kafka-2.7.0 Kafka Streams 流处理
# 简述
简单来说,Kafka Streams 就是可以在 kafka 内部实现一套零延时、可定制的数据计算处理逻辑,可以把不连续的数据进行计算。
# 入门
启动 zookeeper 和 kafka
bin/zookeeper-server-start.sh config/zookeeper.properties & bin/kafka-server-start.sh -daemon config/server.properties
创建 topic
bin/kafka-topics.sh --create --zookeeper 192.168.81.62:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input
bin/kafka-topics.sh --create --zookeeper 192.168.81.62:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-output
2
查看 topic 状态
bin/kafka-topics.sh --zookeeper 192.168.81.62:2181 --describe
准备好了这些之后我们就可以运行 Kafka Streams 的 Word Count 程序了。 Kafka 自带了 Demo 程序供用户使用
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
上面的 WordCountDemo 会固定地读取名为 streams-plaintext-input topic ,为读取的每条消息执行 Word Count 程序的转换计算逻辑,然后持续地把处理结果固定写入 streams-wordcount-output 中 。当运行上述命令时,会看不到任何输出。需要启动 kafka-console-consumer 才能看到最终消息。但我们先启动 Kafka 自带的 console producer 来生产一些输入数据供 Word Count 程序消费。
bin/kafka-console-producer.sh --broker-list 192.168.81.62:9092 --topic streams-plaintext-input
启动后暂时不要发送任何数据,接下来新建窗口启动 kafka-console-consumer
bin/kafka-console-consumer.sh --bootstrap-server 192.168.81.62:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
2
3
4
5
6
7
8
如果以上命令出现 WARN 警告信息,则是正常的,这是因为当 clients 首次向 broker 发送请求获取该 topic 数据时,很可能尚未有该 topic 的元数据信息,故 broker 向 clients 返回的响应中会带上 LEADER_NOT_AVAILABLE 异常,表明 clients 应该主动更新元数据。
接下来发送数据,而在 kafka-console-consumer 窗口就能看到数据统计。
如果设置的 kafka 配置不是 localhost:9092 或者 127.0.01:9092,则运行不了,因为他的 demo 默认 bootstrap.server 就是 localhost:9092,也没地方修改配置,除非修改源码,那就等于自定义一样。
# 自定义 stream
在上面的例子,因为我本身 kafka 配置的原因导致我无法使用 stream,所以把代码改了改,让 bootstrap.server 指向我的配置。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
2
3
4
package com.example.demo.stream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
/**
* @author big uncle
* @date 2021/3/2 13:29
* @module
**/
public class TJStream {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-wordcount");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.81.62:9092") ;
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
//使得每次运行程序时都能保证从头消费一次消息。
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
StreamsBuilder builder= new StreamsBuilder();
// 指定输入 topic
KStream<String, String> source = builder.stream("streams-plaintext-input");
KTable<String,Long> counts = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String s) {
return Arrays.asList(s.toLowerCase(Locale.getDefault()).split(" "));
}
}).groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String s, String s2) {
return s2;
}
}).count();
counts
// 转换 KStream 类型
.toStream()
// 把 value 的 long 类型转换位 string 类型
.map((k,v) -> new KeyValue<String,String>(k,String.valueOf(v)))
// 发送到这个 topic
.to("streams-wordcount-output",Produced.with(Serdes.String(),Serdes.String()));
final KafkaStreams streams = new KafkaStreams (builder.build(), properties) ;
// 添加监控,关闭之后释放资源
final CountDownLatch latch = new CountDownLatch (1) ;
Runtime.getRuntime().addShutdownHook (new Thread ("streams-wordcount-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
// 运行 这里不会阻塞
streams.start();
// 阻塞主线程
latch.await();
}catch(Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
kafka stream 里面的 Api 还是有必要学习一下的
打成 jar 包,然后把 jar 放入 /kafka/libs 下,启动
bin/kafka-run-class.sh com.example.demo.stream.TJStream
启动之后会有很多警告,不需要管,按照如上的步骤我们继续操作就行,到此就能看到输入的数据。以下是我输出的统计信息,但是 consumer 在消费的时候比较慢,不是即时的。
haha 4
hah 1
ha 5
2
3
我们也可以使用 spring boot 来监听 streams-wordcount-output 这个 topic 来接收数据