Kafka Streams API: KStream to KTable

Updated: 2024-10-11 17:27:55
Question

I have a Kafka topic where I send location events (key=user_id, value=user_location). I am able to read and process it as a KStream:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Location> locations = builder
    .stream("location_topic")
    .map((k, v) -> {
        // some processing here, omitted for clarity
        Location location = new Location(lat, lon);
        return new KeyValue<>(k, location);
    });

That works well, but I'd like to have a KTable with the last known position of each user. How could I do it?

I am able to do it writing to and reading from an intermediate topic:

// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
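For reference, the same write-then-read-back approach might be sketched against the newer StreamsBuilder API like this (topic and store names follow the question; String serdes stand in for the question's LocationSerde to keep the sketch self-contained):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class IntermediateTopicExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> locations = builder.stream(
                "location_topic", Consumed.with(Serdes.String(), Serdes.String()));

        // write to the intermediate topic (the topic must exist beforehand)
        locations.to("location_topic_aux",
                Produced.with(Serdes.String(), Serdes.String()));

        // read it back as a table: the latest record per key wins
        KTable<String, String> table = builder.table(
                "location_topic_aux",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.as("store"));

        System.out.println(builder.build().describe());
    }
}
```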

Is there a simple way to obtain a KTable from a KStream? This is my first app using Kafka Streams, so I'm probably missing something obvious.

Answer

Update:

In Kafka 2.5, a new method KStream#toTable() will be added that provides a convenient way to transform a KStream into a KTable. For details see: cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
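A minimal sketch of what that looks like, assuming Kafka Streams 2.5+ and the newer StreamsBuilder API (String serdes and the store name are illustrative):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class ToTableExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> locations = builder.stream(
                "location_topic",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Kafka 2.5+: convert the stream directly; each record is treated
        // as an upsert, so the table holds the latest value per key.
        KTable<String, String> table = locations.toTable(
                Materialized.as("latest-location-store"));

        System.out.println(builder.build().describe());
    }
}
```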

Original answer:

There is no straightforward way to do this at the moment. Your approach is absolutely valid, as discussed in the Confluent FAQ: docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

This is the simplest approach with regard to the code. However, it has the disadvantages that (a) you need to manage an additional topic and that (b) it results in additional network traffic because data is written to and re-read from Kafka.

There is one alternative, using a "dummy-reduce":

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream

KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() {
        @Override
        public Long apply(Long aggValue, Long newValue) {
            return newValue;
        }
    },
    "dummy-aggregation-store");
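With a lambda, the dummy reduce collapses to a one-liner. A sketch against the newer StreamsBuilder API (serdes, topic, and store names are illustrative):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class DummyReduceExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Long> stream = builder.stream(
                "some_topic", Consumed.with(Serdes.String(), Serdes.Long()));

        // The reducer always returns the newer value, so the resulting
        // table simply tracks the latest value per key.
        KTable<String, Long> table = stream
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .reduce((aggValue, newValue) -> newValue,
                        Materialized.as("dummy-aggregation-store"));

        System.out.println(builder.build().describe());
    }
}
```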

This approach is somewhat more complex with regard to the code compared to option 1 but has the advantage that (a) no manual topic management is required and (b) re-reading the data from Kafka is not necessary.

Overall, you need to decide for yourself which approach you prefer:

In option 2, Kafka Streams will create an internal changelog topic to back up the KTable for fault tolerance. Thus, both approaches require some additional storage in Kafka and result in additional network traffic. Overall, it’s a trade-off between slightly more complex code in option 2 versus manual topic management in option 1.
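The semantics that both options implement, latest value per key, can be illustrated independently of Kafka with plain Java (a toy model of changelog replay, not the Streams API):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LatestPerKey {
    // Toy model of KTable upsert semantics: replaying a changelog of
    // (key, value) records keeps only the last value seen per key,
    // which is exactly what the dummy reduce (return newValue) does.
    static Map<String, String> materialize(List<String[]> records) {
        Map<String, String> table = new HashMap<>();
        for (String[] record : records) {
            table.put(record[0], record[1]); // later records overwrite earlier ones
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> events = List.of(
                new String[] {"alice", "48.85,2.35"},
                new String[] {"bob",   "52.52,13.40"},
                new String[] {"alice", "51.51,-0.13"});
        // alice's latest position wins
        System.out.println(materialize(events).get("alice")); // prints 51.51,-0.13
    }
}
```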


Published: 2023-11-25 15:03:14
Link: https://www.elefans.com/category/jswz/34/1630217.html