Flume Source支持的类型
本文及后续所有文章都以 1.8.0 做为版本讲解和入门学习
# avro 类型的 source
监听 Avro 端口来接收外部 avro 客户端的事件流。和 netcat 不同的是,avro-source 接收到的是经过 avro 序列化后的数据,然后反序列化数据继续传输。所以,如果是 avro-source 的话,源数据必须经过 avro 序列化后的数据。而 netcat 接收的是字符串格式。
利用 avro source 可以实现多级流动、扇出流、扇入流等效果,另外,也可以接收通过 flume 提供的 avro 客户端发送的日志信息。
# avro source 配置说明
在 /opt/software/flume-1.8.0/conf 下创建 source-avro.conf
# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
# 指定source 的数据来源以及堆外开放的端口
a1.sources.r1.type=avro
a1.sources.r1.bind=node113
a1.sources.r1.port=8888
# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger
# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 启动
flume-ng agent -n a1 -c /opt/software/flume-1.8.0/conf -f /opt/software/flume-1.8.0/conf/source-avro.conf -Dflume.root.logger=INFO,console
# 测试
在 node103 的 flume 执行命令,把配置文件发过去
./flume-ng avro-client -H node113 -p 8888 -F /opt/software/flume-1.8.0/conf/source-avro.conf -c /opt/software/flume-1.8.0/conf/
node113 接收会打印
# exec 类型的 source
可以将命令产生的输出做为源
# exec 配置
在 /opt/software/flume-1.8.0/conf 下创建 source-exec.conf
将 type 改成 exec,并添加 command 命令,会执行命令做为 source 的数据源。
# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
#
a1.sources.r1.type=exec
a1.sources.r1.command=ping node103
# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger
# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# spooling directory 类型的 source
将指定得文件加入到 "自动搜集" 目录中。flume 会持续监听这个目录,把文件当作 source 来处理。注意:一旦文件被放到 “自动收集” 目录中,便不能修改,如果修改,flume 会报错。此外,他不能有重名的文件,否则也会报错。
当一个文件被 flume 读了以后,会在末尾 添加 .COMPLETED 标识
# spooling directory 配置 source
在 /opt/software/flume-1.8.0/conf 下创建 source-spooldir.conf
# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
#
a1.sources.r1.type=spooldir
# 目录需要提前建立
a1.sources.r1.spoolDir=/home/data
# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger
# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# sequence generator source (序列发生源) 类型的 source
一个简单的序列发生器,不断的产生事件,值是从 0 开始每次递增 1. 主要用来测试。测试消费能力。
# spooling directory 配置 source
在 /opt/software/flume-1.8.0/conf 下创建 source-seq.conf
# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.type=seq
# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger
# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# http 类型的 source
此 source 接受 htpp 的 get 和 post 请求做为 f lume 的事件。其中 get 方式应该只用于试验。
如果想让 flume 正确解析 http 协议信息,比如解析出请求头、请求体等信息,需要提供一个可插拔的 “处理器” 来将请求转换为事件对象,这个处理器必须实现 HTTPSourceHandler 接口。
这个处理器接受一个 HttpServletRequest 对象,并返回一个 Flume Event 对象集合。
Flume 提供了一些常用的 Handler(处理器)。
- JSONHandler
可以处理 JSON 格式的数据,并支持 UTF-8 UTF-16 UTF-32 字符集,该 handler 接受 Event 数组,并根据请求头中的编码将其转换位 Flume Event,如果没有指定的编码,默认编码为 UTF-8.
# spooling directory 配置 source
在 /opt/software/flume-1.8.0/conf 下创建 source-http.conf
# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.type=http
a1.sources.r1.bind=node113
a1.sources.r1.port=8888
# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger
# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 启动测试
启动
flume-ng agent -n a1 -c /opt/software/flume-1.8.0/conf -f /opt/software/flume-1.8.0/conf/source-http.conf -Dflume.root.logger=INFO,console
测试,从 node103 发送数据
curl -X POST -d '[{"headers":{"text":"hello wold"},"body":"hello hello"}]' http://node113:8888
node113 接收数据
2021-05-17 17:37:23,102 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{text=hello wold} body: 68 65 6C 6C 6F 20 68 65 6C 6C 6F hello hello }
# Kafka 类型
flume-kafka-source 是 flume 内置的 kafka source 数据组件,是为了拉取 kafka 数据。flume-kafka-source 的 offset 是交由 zk 集群去维护 offset。
flume 属于单线程拉取数据并将数据发送内置 channel 并通过 sink 组件进行数据转发和处理,故对于 kafka 集群多副本方式拉取数据的时候,应适当考虑多个 flume 节点拉取 kafka 多副本数据,以避免 flume 节点在多个 kafka 集群副本中轮询。加大 flume 拉取 kafka 数据的速率。
属性 | 默认值 | 描述 |
---|---|---|
channels | – | 配置的 channels 可配置多个 channels 后续文章会说到 |
type | – | org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | – | 配置 kafka 集群地址 |
kafka.consumer.group.id | flume | 唯一确定的消费者群体。 在多个源或代理中设置相同的 ID 表示它们是同一个使用者组的一部分 |
kafka.topics | – | 你需要消费的 topic |
kafka.topics.regex | – | 正则表达式,用于定义源订阅的主题集。 此属性的优先级高于 kafka.topics ,如果存在则覆盖 kafka.topics 。 |
batchSize | 1000 | 一批中写入 Channel 的最大消息数 (优化项) |
batchDurationMillis | 1000 | 将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。(优化项) |
backoffSleepIncrement | 1000 | Kafka 主题显示为空时触发的初始和增量等待时间。 等待时间将减少对空 kafka 主题的激进 ping 操作。 一秒钟是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。 |
maxBackoffSleep | 5000 | Kafka 主题显示为空时触发的最长等待时间。 5 秒是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。 |
useFlumeEventFormat | false | 默认情况下,事件从 Kafka 主题直接作为字节直接进入事件主体。 设置为 true 以将事件读取为 Flume Avro 二进制格式。 与 KafkaSink 上的相同属性或 Kafka Channel 上的 parseAsFlumeEvent 属性一起使用时,这将保留在生成端发送的任何 Flume 标头。 |
setTopicHeader | true | 设置为 true 时,将检索到的消息的主题存储到标题中,该标题由 topicHeader 属性定义。 |
topicHeader | topic | 如果 setTopicHeader 属性设置为 true ,则定义用于存储接收消息主题名称的标题的名称。 如果与 Kafka Sink topicHeader 属性结合使用,应该小心,以避免在循环中将消息发送回同一主题。 |
migrateZookeeperOffsets | true | 如果找不到 Kafka 存储的偏移量,请在 Zookeeper 中查找偏移量并将它们提交给 Kafka。 这应该是支持从旧版本的 Flume 无缝 Kafka 客户端迁移。 迁移后,可以将其设置为 false,但通常不需要这样做。 如果未找到 Zookeeper 偏移量,则 Kafka 配置 kafka.consumer.auto.offset.reset 定义如何处理偏移量。 查看 [Kafka 文档](http://kafka.apache.org/documentation.html#newconsumerconfigs)了解详细信息 (opens new window) |
kafka.consumer.security.protocol | PLAINTEXT | 如果使用某种级别的安全性写入 Kafka,则设置为 SASL_PLAINTEXT,SASL_SSL 或 SSL。 |
Other Kafka Consumer Properties | – | 这些属性用于配置 Kafka Consumer。 可以使用 Kafka 支持的任何消费者财产。 唯一的要求是在前缀为 “kafka.consumer” 的前缀中添加属性名称。 例如: kafka.consumer.auto.offset.reset |
Kafka source 覆盖了两个 Kafka 消费者参数:source 将 auto.commit.enable 设置为 “false”,以批次进行提交。Kafka source 保证至少一次消息检索策略。source 启动时可能会出现重复项。Kafka Source 还为 key.deserializer (org.apache.kafka.common.serialization.StringSerializer) 和 value.deserializer (org.apache.kafka.common.serialization.ByteArraySerializer) 提供了默认值。不建议修改这些参数。
#1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1
# 设置kafka
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
# 一批写入 Channel 的最大消息数
a1.sources.r1.batchSize=5000
# 将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。(优化项)
a1.sources.r1.batchDurationMillis=2000
a1.sources.r1.kafka.bootstrap.servers=192.168.81.101:9092
a1.sources.r1.kafka.topics=flink_yx_produce,flink_yc_produce
a1.sources.r1.kafka.consumer.group.id=flume_consume_1
# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger
# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
agent.sources.r1.batchSize = 5000; agent.sources.r1.batchDurationMillis = 2000,即每 2 秒钟拉取 kafka 一批数据,批数据大小为 5000 放入到 flume-channels 中 。这两个值总和考虑以下两项:
- 需要配置 kafka 单条数据 broker.conf 中配置 message.max.bytes
- 当前 flume channel sink 组件最大消费能力如何
文档地址 https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
# Taildir Source
在日志收集服务器的某个目录下,会按照一段时间生成一个日志文件,并且日志会不断的追加到这个文件中,比如,每小时一个命名规则为 log_20151015_10.log 的日志文件,所有 10 点产生的日志都会追加到这个文件中,到了 11 点,就会生成另一个 log_20151015_11.log 的文件。
这种场景如果通过 flume(1.6)收集,当前提供的 Spooling Directory Source 和 Exec Source 均不能满足动态实时收集的需求,在当前正在开发的 flume1.7 版本中,提供了一个非常好用的 TaildirSource,使用这个 source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。
Taildir Source 可实时监控一批文件,并记录每个文件最新消费位置,agent 进程重启后不会有重复消费的问题。
# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1
# 指定source使用的channel
agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1
######## source相关配置 ########
# source类型
agent.sources.s1.type = TAILDIR
# 元数据位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 监控的目录
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true
######## channel相关配置 ########
# channel类型
agent.channels.c1.type = file
# 数据存放路径
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 检查点路径
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多缓存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐给sink多少
agent.channels.c1.transactionCapacity = 100
######## sink相关配置 ########
# sink类型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 压缩
agent.sinks.r1.kafka.producer.compression.type = snappy
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
当你修改文件名后,文件的所有内容会被重新加载到 flume