从卡夫卡获得最新价值

编程入门 行业动态 更新时间:2024-10-28 06:23:43
本文介绍了从卡夫卡获得最新价值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我有一个名为 A 的Kafka主题.

I have a Kafka topic called A.

格式为:

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000} { id : 2, name:confluent, created_at:2017-09-28 22:00:00.000} { id : 3, name:kafka, created_at:2017-09-28 24:42:00.000} { id : 4, name:apache, created_at:2017-09-28 24:41:00.000}

现在在消费者方面,我只想获取一小时窗口的最新数据,这意味着每隔一小时我需要从基于created_at的主题中获取最新值

Now in consumer side i want to get only latest data of one hour window means every one hour i need to get latest value from topic based on created_at

我的预期输出是:

{ id : 1, name:stackoverflow, created_at:2017-09-28 22:30:00.000} { id : 3, name:kafka, created_at:2017-09-28 24:42:00.000}

我认为可以通过ksql解决,但我不确定.请帮助我.

I think this can be solve by ksql but i m not sure. Please help me.

谢谢.

推荐答案

是的,您可以为此使用KSQL.请尝试以下操作:

Yes, you can use KSQL for this. Try the following:

CREATE STREAM S1 (id BIGINT, name VARCHAR, created_at VARCHAT) WITH (kafka_topic = 'topic_name', value_format = 'JSON');

CREATE TABLE maxRow AS SELECT id, name, max(STRINGTOTIMESTAMP(created_at, 'yyyy-mm-dd hh:mm:ss.SSS')) AS creted_at FROM s1 WINDOW TUMBLING (size 1 hour) GROUP BY id, name;

结果将采用Linux时间戳格式的created_at时间.您可以在新查询中使用TIMESTAMPTOSTRING udf将其更改为所需的格式. 如果发现任何问题,请告诉我.

The result will have the created_at time in linux timestamp format. You can change it into your desired format using TIMESTAMPTOSTRING udf in a new query. Please let me know if you find any issues.

更多推荐

从卡夫卡获得最新价值

本文发布于:2023-10-30 08:17:58,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1542289.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:卡夫卡   价值   最新

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!