技术博客 技术博客
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • mysql

    • MySQL 问题汇总
    • MySQL 索引介绍
    • MySQL 锁介绍
    • MySQL 索引优化工具 explain
    • MySQL 主从复制(GTID)
    • MySQL 8安装
    • MySQL 8.x新特性总结
    • MySQL UDF以及新类型JSON
    • MySQL 高可用MGR(一) 理论
    • MySQL 高可用MGR(二) 搭建
    • MySQL 高可用MGR(三) 测试
  • Elasticsearch

    • ES 7.8.0(一) 入门介绍
    • ES 7.8.0(二) 读、写和写索引流程以及文档分析过程
    • ES 7.8.0(三) 文档冲突
  • mongodb

    • mongodb
  • hadoop

    • Hadoop 伪分布式及集群
    • Hadoop 指令
    • Hadoop 读写流程详解
    • Hadoop SpringBoot集成
    • Hadoop MapReduce机制
    • Hadoop YARN
    • Hadoop MapReduce配置和编写job及数据倾斜的解决
    • Hadoop MapReduce自定义格式输入输出
  • clickhouse

    • ClickHouse 介绍及安装
    • ClickHouse 数据类型
    • ClickHouse 表引擎
    • ClickHouse SQL操作
    • ClickHouse 副本配置
    • ClickHouse 分片与集群部署
    • ClickHouse Explain及建表优化
    • ClickHouse 语法优化规则
    • ClickHouse 查询优化
    • ClickHouse 数据一致性
    • ClickHouse 物化视图
    • ClickHouse MaterializeMySQL引擎
    • ClickHouse 监控及备份
  • hbase

    • Hbase 介绍及安装
    • Hbase 优化
    • Hbase phoenix安装及使用
    • Hbase LSM-TREE
  • hive

    • Hive 介绍及安装
    • Hive 内外部表、分区表、分桶表概念及hiveSQL命令
    • Hive 数据类型
    • Hive 函数 MySQL联合
    • Hive 数据倾斜和优化
    • Hive Sqoop安装及指令
  • flink

    • Flink 介绍及安装
    • Flink 配置介绍及Demo
    • Flink API讲解
    • Flink 运行架构
    • Flink 时间语义及Watermark
    • Flink 状态管理
    • Flink 容错,检查点,保存点
    • Flink 状态一致性
    • Flink Table API 和 Flink SQL
    • Flink CEP编程
    • Flink Joining编程
    • Flink CDC
  • flume

    • Flume 日志收集系统介绍及安装
    • Flume Source支持的类型
      • avro 类型的 source
        • avro source配置说明
        • 启动
        • 测试
      • exec 类型的 source
        • exec 配置
      • spooling directory 类型的 source
        • spooling directory 配置 source
      • sequence generator source(序列发生源) 类型的 source
        • spooling directory 配置 source
      • http 类型的 source
        • spooling directory 配置 source
        • 启动测试
      • Kafka 类型
      • Taildir Source
    • Flume Sink支持的类型
    • Flume Channel支持的类型
    • Flume Selector
    • Flume Interceptor拦截器类型
    • Flume Process
  • sqlite

    • SQLite介绍
目录

Flume Source支持的类型

本文及后续所有文章都以 1.8.0 做为版本讲解和入门学习

# avro 类型的 source

监听 Avro 端口来接收外部 avro 客户端的事件流。和 netcat 不同的是,avro-source 接收到的是经过 avro 序列化后的数据,然后反序列化数据继续传输。所以,如果是 avro-source 的话,源数据必须经过 avro 序列化后的数据。而 netcat 接收的是字符串格式。

利用 avro source 可以实现多级流动、扇出流、扇入流等效果,另外,也可以接收通过 flume 提供的 avro 客户端发送的日志信息。

# avro source 配置说明

在 /opt/software/flume-1.8.0/conf 下创建 source-avro.conf

# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1

# 指定source 的数据来源以及堆外开放的端口
a1.sources.r1.type=avro
a1.sources.r1.bind=node113
a1.sources.r1.port=8888

# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 启动

flume-ng agent -n a1 -c /opt/software/flume-1.8.0/conf -f /opt/software/flume-1.8.0/conf/source-avro.conf -Dflume.root.logger=INFO,console
1

# 测试

在 node103 的 flume 执行命令,把配置文件发过去

./flume-ng avro-client -H node113 -p 8888 -F /opt/software/flume-1.8.0/conf/source-avro.conf -c /opt/software/flume-1.8.0/conf/
1

node113 接收会打印

# exec 类型的 source

可以将命令产生的输出做为源

# exec 配置

在 /opt/software/flume-1.8.0/conf 下创建 source-exec.conf
将 type 改成 exec,并添加 command 命令,会执行命令做为 source 的数据源。

# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1

# 
a1.sources.r1.type=exec
a1.sources.r1.command=ping node103

# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# spooling directory 类型的 source

将指定得文件加入到 "自动搜集" 目录中。flume 会持续监听这个目录,把文件当作 source 来处理。注意:一旦文件被放到 “自动收集” 目录中,便不能修改,如果修改,flume 会报错。此外,他不能有重名的文件,否则也会报错。

当一个文件被 flume 读了以后,会在末尾 添加 .COMPLETED 标识

# spooling directory 配置 source

在 /opt/software/flume-1.8.0/conf 下创建 source-spooldir.conf

# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1

# 
a1.sources.r1.type=spooldir
# 目录需要提前建立
a1.sources.r1.spoolDir=/home/data

# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# sequence generator source (序列发生源) 类型的 source

一个简单的序列发生器,不断的产生事件,值是从 0 开始每次递增 1. 主要用来测试。测试消费能力。

# spooling directory 配置 source

在 /opt/software/flume-1.8.0/conf 下创建 source-seq.conf

# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1


a1.sources.r1.type=seq

# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# http 类型的 source

此 source 接受 htpp 的 get 和 post 请求做为 f lume 的事件。其中 get 方式应该只用于试验。

如果想让 flume 正确解析 http 协议信息,比如解析出请求头、请求体等信息,需要提供一个可插拔的 “处理器” 来将请求转换为事件对象,这个处理器必须实现 HTTPSourceHandler 接口。

这个处理器接受一个 HttpServletRequest 对象,并返回一个 Flume Event 对象集合。

Flume 提供了一些常用的 Handler(处理器)。

  • JSONHandler
    可以处理 JSON 格式的数据,并支持 UTF-8 UTF-16 UTF-32 字符集,该 handler 接受 Event 数组,并根据请求头中的编码将其转换位 Flume Event,如果没有指定的编码,默认编码为 UTF-8.

# spooling directory 配置 source

在 /opt/software/flume-1.8.0/conf 下创建 source-http.conf

# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1


a1.sources.r1.type=http
a1.sources.r1.bind=node113
a1.sources.r1.port=8888

# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 启动测试

启动

flume-ng agent -n a1 -c /opt/software/flume-1.8.0/conf -f /opt/software/flume-1.8.0/conf/source-http.conf -Dflume.root.logger=INFO,console
1

测试,从 node103 发送数据

curl -X POST -d '[{"headers":{"text":"hello wold"},"body":"hello hello"}]' http://node113:8888
1

node113 接收数据

2021-05-17 17:37:23,102 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{text=hello wold} body: 68 65 6C 6C 6F 20 68 65 6C 6C 6F                hello hello }
1

# Kafka 类型

flume-kafka-source 是 flume 内置的 kafka source 数据组件,是为了拉取 kafka 数据。flume-kafka-source 的 offset 是交由 zk 集群去维护 offset。

flume 属于单线程拉取数据并将数据发送内置 channel 并通过 sink 组件进行数据转发和处理,故对于 kafka 集群多副本方式拉取数据的时候,应适当考虑多个 flume 节点拉取 kafka 多副本数据,以避免 flume 节点在多个 kafka 集群副本中轮询。加大 flume 拉取 kafka 数据的速率。

属性 默认值 描述
channels – 配置的 channels 可配置多个 channels 后续文章会说到
type – org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers – 配置 kafka 集群地址
kafka.consumer.group.id flume 唯一确定的消费者群体。 在多个源或代理中设置相同的 ID 表示它们是同一个使用者组的一部分
kafka.topics – 你需要消费的 topic
kafka.topics.regex – 正则表达式,用于定义源订阅的主题集。 此属性的优先级高于 kafka.topics ,如果存在则覆盖 kafka.topics 。
batchSize 1000 一批中写入 Channel 的最大消息数 (优化项)
batchDurationMillis 1000 将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。(优化项)
backoffSleepIncrement 1000 Kafka 主题显示为空时触发的初始和增量等待时间。 等待时间将减少对空 kafka 主题的激进 ping 操作。 一秒钟是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。
maxBackoffSleep 5000 Kafka 主题显示为空时触发的最长等待时间。 5 秒是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。
useFlumeEventFormat false 默认情况下,事件从 Kafka 主题直接作为字节直接进入事件主体。 设置为 true 以将事件读取为 Flume Avro 二进制格式。 与 KafkaSink 上的相同属性或 Kafka Channel 上的 parseAsFlumeEvent 属性一起使用时,这将保留在生成端发送的任何 Flume 标头。
setTopicHeader true 设置为 true 时,将检索到的消息的主题存储到标题中,该标题由 topicHeader 属性定义。
topicHeader topic 如果 setTopicHeader 属性设置为 true ,则定义用于存储接收消息主题名称的标题的名称。 如果与 Kafka Sink topicHeader 属性结合使用,应该小心,以避免在循环中将消息发送回同一主题。
migrateZookeeperOffsets true 如果找不到 Kafka 存储的偏移量,请在 Zookeeper 中查找偏移量并将它们提交给 Kafka。 这应该是支持从旧版本的 Flume 无缝 Kafka 客户端迁移。 迁移后,可以将其设置为 false,但通常不需要这样做。 如果未找到 Zookeeper 偏移量,则 Kafka 配置 kafka.consumer.auto.offset.reset 定义如何处理偏移量。 查看 [Kafka 文档](http://kafka.apache.org/documentation.html#newconsumerconfigs)了解详细信息 (opens new window)
kafka.consumer.security.protocol PLAINTEXT 如果使用某种级别的安全性写入 Kafka,则设置为 SASL_PLAINTEXT,SASL_SSL 或 SSL。
Other Kafka Consumer Properties – 这些属性用于配置 Kafka Consumer。 可以使用 Kafka 支持的任何消费者财产。 唯一的要求是在前缀为 “kafka.consumer” 的前缀中添加属性名称。 例如: kafka.consumer.auto.offset.reset

Kafka source 覆盖了两个 Kafka 消费者参数:source 将 auto.commit.enable 设置为 “false”,以批次进行提交。Kafka source 保证至少一次消息检索策略。source 启动时可能会出现重复项。Kafka Source 还为 key.deserializer (org.apache.kafka.common.serialization.StringSerializer) 和 value.deserializer (org.apache.kafka.common.serialization.ByteArraySerializer) 提供了默认值。不建议修改这些参数。

#1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1

# 设置kafka
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
# 一批写入 Channel 的最大消息数
a1.sources.r1.batchSize=5000
# 将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。(优化项)
a1.sources.r1.batchDurationMillis=2000
a1.sources.r1.kafka.bootstrap.servers=192.168.81.101:9092
a1.sources.r1.kafka.topics=flink_yx_produce,flink_yc_produce
a1.sources.r1.kafka.consumer.group.id=flume_consume_1


# 指定a1的channels基于内存
a1.channels.c1.type=memory
# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=logger

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

agent.sources.r1.batchSize = 5000; agent.sources.r1.batchDurationMillis = 2000,即每 2 秒钟拉取 kafka 一批数据,批数据大小为 5000 放入到 flume-channels 中 。这两个值总和考虑以下两项:

  • 需要配置 kafka 单条数据 broker.conf 中配置 message.max.bytes
  • 当前 flume channel sink 组件最大消费能力如何

文档地址 https://github.com/apache/flume/blob/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst

# Taildir Source

在日志收集服务器的某个目录下,会按照一段时间生成一个日志文件,并且日志会不断的追加到这个文件中,比如,每小时一个命名规则为 log_20151015_10.log 的日志文件,所有 10 点产生的日志都会追加到这个文件中,到了 11 点,就会生成另一个 log_20151015_11.log 的文件。

这种场景如果通过 flume(1.6)收集,当前提供的 Spooling Directory Source 和 Exec Source 均不能满足动态实时收集的需求,在当前正在开发的 flume1.7 版本中,提供了一个非常好用的 TaildirSource,使用这个 source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。

Taildir Source 可实时监控一批文件,并记录每个文件最新消费位置,agent 进程重启后不会有重复消费的问题。

# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1

# 指定source使用的channel
agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1

######## source相关配置 ########
# source类型
agent.sources.s1.type = TAILDIR
# 元数据位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 监控的目录
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true

######## channel相关配置 ########
# channel类型
agent.channels.c1.type = file
# 数据存放路径
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 检查点路径
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多缓存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐给sink多少
agent.channels.c1.transactionCapacity = 100

######## sink相关配置 ########
# sink类型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 压缩
agent.sinks.r1.kafka.producer.compression.type = snappy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

当你修改文件名后,文件的所有内容会被重新加载到 flume

上次更新: 6/11/2025, 4:10:30 PM
Flume 日志收集系统介绍及安装
Flume Sink支持的类型

← Flume 日志收集系统介绍及安装 Flume Sink支持的类型→

Theme by Vdoing | Copyright © 2023-2025
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式