技术博客 技术博客
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • mysql

    • MySQL 问题汇总
    • MySQL 索引介绍
    • MySQL 锁介绍
    • MySQL 索引优化工具 explain
    • MySQL 主从复制(GTID)
    • MySQL 8安装
    • MySQL 8.x新特性总结
    • MySQL UDF以及新类型JSON
    • MySQL 高可用MGR(一) 理论
    • MySQL 高可用MGR(二) 搭建
    • MySQL 高可用MGR(三) 测试
  • Elasticsearch

    • ES 7.8.0(一) 入门介绍
    • ES 7.8.0(二) 读、写和写索引流程以及文档分析过程
    • ES 7.8.0(三) 文档冲突
  • mongodb

    • mongodb
  • hadoop

    • Hadoop 伪分布式及集群
    • Hadoop 指令
    • Hadoop 读写流程详解
    • Hadoop SpringBoot集成
    • Hadoop MapReduce机制
    • Hadoop YARN
    • Hadoop MapReduce配置和编写job及数据倾斜的解决
    • Hadoop MapReduce自定义格式输入输出
  • clickhouse

    • ClickHouse 介绍及安装
    • ClickHouse 数据类型
    • ClickHouse 表引擎
    • ClickHouse SQL操作
    • ClickHouse 副本配置
    • ClickHouse 分片与集群部署
    • ClickHouse Explain及建表优化
    • ClickHouse 语法优化规则
    • ClickHouse 查询优化
    • ClickHouse 数据一致性
    • ClickHouse 物化视图
    • ClickHouse MaterializeMySQL引擎
    • ClickHouse 监控及备份
  • hbase

    • Hbase 介绍及安装
    • Hbase 优化
    • Hbase phoenix安装及使用
    • Hbase LSM-TREE
  • hive

    • Hive 介绍及安装
    • Hive 内外部表、分区表、分桶表概念及hiveSQL命令
    • Hive 数据类型
    • Hive 函数 MySQL联合
    • Hive 数据倾斜和优化
    • Hive Sqoop安装及指令
  • flink

    • Flink 介绍及安装
    • Flink 配置介绍及Demo
    • Flink API讲解
    • Flink 运行架构
    • Flink 时间语义及Watermark
    • Flink 状态管理
    • Flink 容错,检查点,保存点
    • Flink 状态一致性
    • Flink Table API 和 Flink SQL
    • Flink CEP编程
    • Flink Joining编程
    • Flink CDC
  • flume

    • Flume 日志收集系统介绍及安装
    • Flume Source支持的类型
    • Flume Sink支持的类型
    • Flume Channel支持的类型
    • Flume Selector
    • Flume Interceptor拦截器类型
    • Flume Process
      • Failover Sink Processor 故障恢复
        • 配置
      • Load balancing Sink Processor 负载均衡
        • 配置
      • flume 内存通道事务机制
        • put事务
        • put事务流程
        • take事务
        • take 事务流程
  • sqlite

    • SQLite介绍
目录

Flume Process

本文及后续所有文章都以 1.8.0 做为版本讲解和入门学习

# Failover Sink Processor 故障恢复

Sink Group 允许用户将多个 Sink 组合成一个实体。Flume Sink Process 可以通过切换组内 Sink 用来实现负载均衡的效果,或在一个 Sink 故障时切换到另一个 Sink。

Failover Sink Processor 维护一个 sink 们的优先表。确保只要一个可用的,事件就可以被处理。失败处理原则是,为失效的 sink 指定一个冷却时间,在冷却时间达到后再重新使用,这里会有个问题就是重新启动的 flume 和备用的 flume 都会接收到信息。当 node104 宕机不在工作,node113 就会被执行工作。

sink 们可以被配置一个优先级,数字越大优先级越高。

如果 sink 们发送时间失败,则下一个最高优先级的 sink 奖会尝试接着发送事件。

如果没有指定优先级,则优先级顺序取决于 sink 们的配置顺序,先配置的默认优先级高于后配置的。

在配置过程中,设置一个 group processor,并且为每个 sink 指定一个优先级,优先级必须是唯一的。

# 配置

# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1 c2
a1.sinks=s1 s2

# 配置故障恢复
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=s1 s2
a1.sinkgroups.g1.processor.type=failover
a1.sinkgroups.g1.processor.priority.s1=5
a1.sinkgroups.g1.processor.priority.s2=10


# 指定source 的数据来源以及堆外开放的端口
a1.sources.r1.type=netcat
a1.sources.r1.bind=node113
a1.sources.r1.port=8888

# 指定a1的channels基于内存
a1.channels.c1.type=memory
a1.channels.c2.type=memory

# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=avro
a1.sinks.s1.hostname=node103
a1.sinks.s1.port=8888

a1.sinks.s2.type=avro
a1.sinks.s2.hostname=node104
a1.sinks.s2.port=8888

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1 c2
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# Load balancing Sink Processor 负载均衡

Load balancing Sink Processor 提供了再多个 sink 之间实现负载均衡的能力。他维护了一个活动 sink 的索引列表。它支持轮询或所及方式的负载均衡,默认值是轮询方式,可以通过篇日志指定,也可以通过实现 AbstractSinkSelector 接口实现自定义的选择机制。

负载均衡的机制是通过 单个 channel 轮询或随机发送给某个 sink。

# 配置

# a1 代表一个flume 给每个组件匿名
a1.sources=r1
a1.channels=c1
a1.sinks=s1 s2

# 配置负载
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=s1 s2
a1.sinkgroups.g1.processor.type=load_balance
a1.sinkgroups.g1.processor.selector=round_robin


# 指定source 的数据来源以及堆外开放的端口
a1.sources.r1.type=netcat
a1.sources.r1.bind=node113
a1.sources.r1.port=8888

# 指定a1的channels基于内存
a1.channels.c1.type=memory

# 指定a1的sinks 输出到控制台
a1.sinks.s1.type=avro
a1.sinks.s1.hostname=node103
a1.sinks.s1.port=8888

a1.sinks.s2.type=avro
a1.sinks.s2.hostname=node104
a1.sinks.s2.port=8888

# 绑定a1 sources和channle 的关系
a1.sources.r1.channels=c1
# 绑定a1 sinks 和 channel 的关系
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# flume 内存通道事务机制

flume 的事务机制与可靠性保证的实现,最核心的组件是 channel,如果没有 channel 组件,而仅靠 source 与 sink 组件是无从谈起的。

# put 事务

# put 事务流程

  • doput 将批数据先写入临时缓冲区 putList(LinkedBlockingDequeue)链表结构组成的双向阻塞队列。
  • doCommit 检查 memoryChannel 内存队列是否足够合并
  • doRollback memoryChannel 内存队列空间不足,回滚,等待内存通道容量满足合并

putList 就是一个临时的缓冲区,数据会先 put 到 putList,最后由 commit 方法检查 memoryChannel 是否有足够的缓冲区,有则合并到 memoryChannel 的队列。

# take 事务

# take 事务流程

  • doTake 先将数据发往临时缓冲区 takeList(LinkedBlockingDequeue)将输出发送到下一个节点
  • doCimmit 如果数据全部发送成功,则清除临时缓冲区 takeList
  • doRollback 数据发送过程中出现异常,rollback 将临时缓冲区 takeList 中的数据归还给 memoryChannel
  • bucketWriter 类 会监听数据的发送成功或失败,并响应给 takeList
上次更新: 6/11/2025, 4:10:30 PM
Flume Interceptor拦截器类型
SQLite介绍

← Flume Interceptor拦截器类型 SQLite介绍→

Theme by Vdoing | Copyright © 2023-2025
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式