环境:springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2
Kafka在0.10版本推出了Stream API,整合提供了对存储在Kafka内的实时统数据进行流式处理和分析的能力。
流式计算一般被用来和批量计算做比较。计数据批量计算往往有一个固定的整合数据集作为输入并计算结果。而流式计算的实时统输入往往是“无界”的(Unbounded Data),持续输入的计数据,即永远拿不到全量数据去做计算;同时,整合计算结果也是实时统持续输出的,只能拿到某一个时刻的计数据结果,而不是整合最终的结果。
Kafka Streams是实时统一个客户端类库,用于处理和分析存储在Kafka中的计数据数据。它建立在流式处理的整合一些重要的概念之上:如何区分事件时间和处理时间、Windowing的实时统支持、简单高效的计数据管理和实时查询应用程序状态。
Kafka Streams的云服务器提供商门槛非常低:和编写一个普通的Kafka消息处理程序没有太大的差异,可以通过多进程部署来完成扩容、负载均衡、高可用(Kafka Consumer的并行模型)。
Kafka Streams的一些特点:
被设计成一个简单的、轻量级的客户端类库,能够被集成到任何Java应用中 除了Kafka之外没有任何额外的依赖,利用Kafka的分区模型支持水平扩容和保证顺序性 通过可容错的状态存储实现高效的状态操作(windowed joins and aggregations) 支持exactly-once语义 支持纪录级的处理,实现毫秒级的延迟 提供High-Level的Stream DSL和Low-Level的Processor API有两种特殊的processor:
Source Processor 源处理器是一种特殊类型的流处理器,它没有任何上游处理器。它通过使用来自一个或多个kafka topic的记录并将其转发到其下游处理器,从而从一个或多个kafka topic生成其拓扑的输入流。
Sink Processor 接收器处理器是一种特殊类型的流处理器,没有下游处理器。它将从其上游处理器接收到的任何记录发送到指定的kafka topic。源码库
相关的核心概念查看如下链接
下面演示Kafka Stream 在Springboot中的应用
消息转换并转发其它Topic
@Bean public KStream<Object, Object> kStream(StreamsBuilder streamsBuilder) { KStream<Object, Object> stream = streamsBuilder.stream("test"); stream.map((key, value) -> { System.out.println("原始消息内容:" + new String((byte[]) value, Charset.forName("UTF-8"))) ; return new KeyValue<>(key, "{ \"title\": \"123123\", \"message\": \"重新定义内容\"}".getBytes(Charset.forName("UTF-8"))) ; }).to("demo") ; return stream; }执行结果:
Stream对象处理
@Bean public KStream<String, Message> kStream4(StreamsBuilder streamsBuilder) { JsonSerde<Message> jsonSerde = new JsonSerde<>() ; JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; descri.addTrustedPackages("*") ; KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); stream.map((key, value) -> { value.setTitle("XXXXXXX") ; return new KeyValue<>(key, value) ; }).to("demo", Produced.with(Serdes.String(), jsonSerde)) ; return stream; }执行结果:
分组处理
@Bean public KStream<String, Message> kStream5(StreamsBuilder streamsBuilder) { JsonSerde<Message> jsonSerde = new JsonSerde<>() ; JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; descri.addTrustedPackages("*") ; KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); stream.selectKey(new KeyValueMapper<String, Message, String>() { @Override public String apply(String key, Message value) { return value.getOrgCode() ; } }) .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) .count() .toStream().print(Printed.toSysOut()); return stream; }执行结果:
聚合
@Bean public KStream<String, Message> kStream6(StreamsBuilder streamsBuilder) { JsonSerde<Message> jsonSerde = new JsonSerde<>() ; JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; descri.addTrustedPackages("*") ; KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); stream.selectKey(new KeyValueMapper<String, Message, String>() { @Override public String apply(String key, Message value) { return value.getOrgCode() ; } }) .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) .aggregate(() -> 0L, (key, value ,aggValue) -> { System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ; return aggValue + 1 ; }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long())) .toStream().print(Printed.toSysOut()); return stream; }执行结果:
Filter过滤数据
@Bean public KStream<String, Message> kStream7(StreamsBuilder streamsBuilder) { JsonSerde<Message> jsonSerde = new JsonSerde<>() ; JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; descri.addTrustedPackages("*") ; KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); stream.selectKey(new KeyValueMapper<String, Message, String>() { @Override public String apply(String key, Message value) { return value.getOrgCode() ; } }) .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) .aggregate(() -> 0L, (key, value ,aggValue) -> { System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ; return aggValue + 1 ; }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long())) .toStream() .filter((key, value) -> !"2".equals(key)) .print(Printed.toSysOut()); return stream; }执行结果:
过滤Key不等于"2"
分支多流处理
@Bean public KStream<String, Message> kStream8(StreamsBuilder streamsBuilder) { JsonSerde<Message> jsonSerde = new JsonSerde<>() ; JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; descri.addTrustedPackages("*") ; KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); // 分支,多流处理 KStream<String, Message>[] arrStream = stream.branch( (key, value) -> "男".equals(value.getSex()), (key, value) -> "女".equals(value.getSex())); Stream.of(arrStream).forEach(as -> { as.foreach((key, message) -> { System.out.println(Thread.currentThread().getName() + ", key = " + key + ", message = " + message) ; }); }); return stream; }执行结果:
多字段分组
不能使用多个selectKey,后面的会覆盖前面的
@Bean public KStream<String, Message> kStreamM2(StreamsBuilder streamsBuilder) { JsonSerde<Message> jsonSerde = new JsonSerde<>() ; JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ; descri.addTrustedPackages("*") ; KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde)); stream .selectKey(new KeyValueMapper<String, Message, String>() { @Override public String apply(String key, Message value) { System.out.println(Thread.currentThread().getName()) ; return value.getTime() + " | " + value.getOrgCode() ; } }) .groupByKey(Grouped.with(Serdes.String(), jsonSerde)) .count() .toStream().print(Printed.toSysOut()); return stream; }执行结果: