技术博客 技术博客
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • JAVA
  • 仓颉
  • 设计模式
  • 人工智能
  • Spring
  • Mybatis
  • Maven
  • Git
  • Kafka
  • RabbitMQ
  • RocketMQ
  • Redis
  • Zookeeper
  • Nginx
  • 数据库套件
  • MySQL
  • Elasticsearch
  • MongoDB
  • Hadoop
  • ClickHouse
  • Hbase
  • Hive
  • Flink
  • Flume
  • SQLite
  • linux
  • Docker
  • Jenkins
  • Kubernetes
  • 工具
  • 前端
  • AI
GitHub (opens new window)
  • kafka

    • kafka-2.7.0 基本概念
    • Kafka-2.7.0 搭建及参数解析
    • kafka-2.7.0 spring boot 集成 kafka
    • kafka-2.7.0 kafka Connect
    • kafka-2.7.0 Kafka Streams 流处理
      • 简述
      • 入门
      • 自定义 stream
  • RabbitMQ

    • rabbitmq 简介
  • RocketMQ

    • RocketMQ 基础概念
    • RocketMQ 搭建
    • RocketMQ 整合spring boot
  • redis

    • Redis 介绍及安装
    • Redis 命令介绍
    • Redis 分布式锁介绍
    • Redis 事务介绍
    • Redis 的key失效通知介绍
    • Redis 配置文件解读
    • Redis 记一次宕机排查
    • Redis 高可用(一) 主从理论
    • Redis 高可用(二) 哨兵理论
    • Redis 高可用(三) 搭建
    • Redis 集群搭建
  • zookeeper

    • Zookeeper 介绍及安装
    • Zookeeper 做为锁使用
  • nginx

    • nginx-1.18.0 安装
    • nginx 常见问题总结
    • nginx 高可用
  • 数据库套件

    • MyCat 1.6.7(一)MySQL高可用及分库分表
    • MyCat 1.6.7(二)高可用及权限
    • shardingsphere 4.x(一)Sharding-JDBC使用
    • shardingsphere 4.x(二)Sharding-Proxy使用
目录

kafka-2.7.0 Kafka Streams 流处理

# 简述

简单来说,Kafka Streams 就是可以在 kafka 内部实现一套零延时、可定制的数据计算处理逻辑,可以把不连续的数据进行计算。

# 入门

启动 zookeeper 和 kafka

bin/zookeeper-server-start.sh config/zookeeper.properties & bin/kafka-server-start.sh -daemon config/server.properties 
1

创建 topic

bin/kafka-topics.sh --create --zookeeper 192.168.81.62:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input 
bin/kafka-topics.sh --create --zookeeper 192.168.81.62:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-output
1
2

查看 topic 状态

bin/kafka-topics.sh --zookeeper 192.168.81.62:2181 --describe 
1

准备好了这些之后我们就可以运行 Kafka Streams 的 Word Count 程序了。 Kafka 自带了 Demo 程序供用户使用

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo 
1

上面的 WordCountDemo 会固定地读取名为 streams-plaintext-input topic ,为读取的每条消息执行 Word Count 程序的转换计算逻辑,然后持续地把处理结果固定写入 streams-wordcount-output 中 。当运行上述命令时,会看不到任何输出。需要启动 kafka-console-consumer 才能看到最终消息。但我们先启动 Kafka 自带的 console producer 来生产一些输入数据供 Word Count 程序消费。

bin/kafka-console-producer.sh --broker-list 192.168.81.62:9092 --topic streams-plaintext-input
1

启动后暂时不要发送任何数据,接下来新建窗口启动 kafka-console-consumer

bin/kafka-console-consumer.sh --bootstrap-server 192.168.81.62:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
1
2
3
4
5
6
7
8

如果以上命令出现 WARN 警告信息,则是正常的,这是因为当 clients 首次向 broker 发送请求获取该 topic 数据时,很可能尚未有该 topic 的元数据信息,故 broker 向 clients 返回的响应中会带上 LEADER_NOT_AVAILABLE 异常,表明 clients 应该主动更新元数据。

接下来发送数据,而在 kafka-console-consumer 窗口就能看到数据统计。

如果设置的 kafka 配置不是 localhost:9092 或者 127.0.01:9092,则运行不了,因为他的 demo 默认 bootstrap.server 就是 localhost:9092,也没地方修改配置,除非修改源码,那就等于自定义一样。

# 自定义 stream

在上面的例子,因为我本身 kafka 配置的原因导致我无法使用 stream,所以把代码改了改,让 bootstrap.server 指向我的配置。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
1
2
3
4
package com.example.demo.stream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * @author big uncle
 * @date 2021/3/2 13:29
 * @module
 **/
public class TJStream {


    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-wordcount");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.81.62:9092") ;
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //使得每次运行程序时都能保证从头消费一次消息。
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        StreamsBuilder builder= new StreamsBuilder();
        // 指定输入 topic
        KStream<String, String> source = builder.stream("streams-plaintext-input");
        KTable<String,Long> counts = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String s) {
                return Arrays.asList(s.toLowerCase(Locale.getDefault()).split(" "));
            }
        }).groupBy(new KeyValueMapper<String, String, String>() {
            @Override
            public String apply(String s, String s2) {
                return s2;
            }
        }).count();
        counts
            // 转换 KStream 类型
            .toStream()
            // 把 value 的 long 类型转换位 string 类型
            .map((k,v) -> new KeyValue<String,String>(k,String.valueOf(v)))
            // 发送到这个 topic
            .to("streams-wordcount-output",Produced.with(Serdes.String(),Serdes.String()));

        final KafkaStreams streams = new KafkaStreams (builder.build(), properties) ;
        // 添加监控,关闭之后释放资源
        final CountDownLatch latch = new CountDownLatch (1) ;
        Runtime.getRuntime().addShutdownHook (new Thread ("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        try {
            // 运行 这里不会阻塞
            streams.start();
            // 阻塞主线程
            latch.await();
        }catch(Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

kafka stream 里面的 Api 还是有必要学习一下的

打成 jar 包,然后把 jar 放入 /kafka/libs 下,启动

bin/kafka-run-class.sh com.example.demo.stream.TJStream
1

启动之后会有很多警告,不需要管,按照如上的步骤我们继续操作就行,到此就能看到输入的数据。以下是我输出的统计信息,但是 consumer 在消费的时候比较慢,不是即时的。

haha    4
hah     1
ha      5
1
2
3

我们也可以使用 spring boot 来监听 streams-wordcount-output 这个 topic 来接收数据

上次更新: 6/11/2025, 4:10:30 PM
kafka-2.7.0 kafka Connect
rabbitmq 简介

← kafka-2.7.0 kafka Connect rabbitmq 简介→

Theme by Vdoing | Copyright © 2023-2025
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式