Confluent 4.1.0

编程入门 行业动态 更新时间:2024-10-22 07:31:34
本文介绍了Confluent 4.1.0 -> KSQL : STREAM-TABLE join ->表数据空的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时送ChatGPT账号..

第 1 步:运行生产者以创建样本数据

STEP 1: Run the producer to create sample data

./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic stream-test-topic \
         --property schema.registry.url=http://localhost:8081 \
         --property value.schema='{"type":"record","name":"dealRecord","fields":[{"name":"DEAL_ID","type":"string"},{"name":"DEAL_EXPENSE_CODE","type":"string"},{"name":"DEAL_BRANCH","type":"string"}]}'

样本数据:

{"DEAL_ID":"deal002", "DEAL_EXPENSE_CODE":"EXP002", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal003", "DEAL_EXPENSE_CODE":"EXP003", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal004", "DEAL_EXPENSE_CODE":"EXP004", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal005", "DEAL_EXPENSE_CODE":"EXP005", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal006", "DEAL_EXPENSE_CODE":"EXP006", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal007", "DEAL_EXPENSE_CODE":"EXP001", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal008", "DEAL_EXPENSE_CODE":"EXP002", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal009", "DEAL_EXPENSE_CODE":"EXP003", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal010", "DEAL_EXPENSE_CODE":"EXP004", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal011", "DEAL_EXPENSE_CODE":"EXP005", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal012", "DEAL_EXPENSE_CODE":"EXP006", "DEAL_BRANCH":"AMSTERDAM"}

第 2 步:打开另一个终端并运行消费者以测试数据.

STEP 2: Open another terminal and run the consumer to test the data.

./bin/kafka-avro-console-consumer --topic stream-test-topic \
         --bootstrap-server localhost:9092 \
         --property schema.registry.url=http://localhost:8081 \
         --from-beginning

第 3 步:打开另一个终端并运行生产者.

STEP 3: Open another terminal and run the producer.

./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic expense-test-topic \
--property "parse.key=true" \
--property "key.separator=:" \
--property schema.registry.url=http://localhost:8081 \
--property key.schema='"string"' \
         --property value.schema='{"type":"record","name":"dealRecord","fields":[{"name":"EXPENSE_CODE","type":"string"},{"name":"EXPENSE_DESC","type":"string"}]}'

数据:

"pk1":{"EXPENSE_CODE":"EXP001", "EXPENSE_DESC":"Regulatory Deposit"}
"pk2":{"EXPENSE_CODE":"EXP002", "EXPENSE_DESC":"ABC - Sofia"}
"pk3":{"EXPENSE_CODE":"EXP003", "EXPENSE_DESC":"Apple Corporation"}
"pk4":{"EXPENSE_CODE":"EXP004", "EXPENSE_DESC":"Confluent Europe"}
"pk5":{"EXPENSE_CODE":"EXP005", "EXPENSE_DESC":"Air India"}
"pk6":{"EXPENSE_CODE":"EXP006", "EXPENSE_DESC":"KLM International"}

第 4 步:打开另一个终端并运行消费者

STEP 4: Open another terminal and run the consumer

./bin/kafka-avro-console-consumer --topic expense-test-topic \
         --bootstrap-server localhost:9092 \
--property "parse.key=true" \
--property "key.separator=:" \
--property schema.registry.url=http://localhost:8081 \
         --from-beginning

第 5 步:登录 KSQL 客户端.

STEP 5: Login to KSQL client.

./bin/ksql http://localhost:8088

创建以下流和表并运行连接查询.

create following stream and table and run join query.

KSQL:

流:

    CREATE STREAM SAMPLE_STREAM 
       (DEAL_ID VARCHAR, DEAL_EXPENSE_CODE varchar, DEAL_BRANCH VARCHAR) 
       WITH (kafka_topic='stream-test-topic',value_format='AVRO', key = 'DEAL_ID');

表格:

CREATE TABLE SAMPLE_TABLE 
   (EXPENSE_CODE varchar, EXPENSE_DESC VARCHAR)
   WITH (kafka_topic='expense-test-topic',value_format='AVRO', key = 'EXPENSE_CODE');

以下是输出:

ksql> SELECT STREAM1.DEAL_EXPENSE_CODE, TABLE1.EXPENSE_DESC 
       from SAMPLE_STREAM STREAM1 LEFT JOIN SAMPLE_TABLE TABLE1 
       ON STREAM1.DEAL_EXPENSE_CODE = TABLE1.EXPENSE_CODE  
       WINDOW TUMBLING (SIZE 3 MINUTE) 
       GROUP BY STREAM1.DEAL_EXPENSE_CODE, TABLE1.EXPENSE_DESC;

EXP001 | null
EXP001 | null
EXP002 | null
EXP003 | null
EXP004 | null
EXP005 | null
EXP006 | null
EXP002 | null
EXP002 | null

推荐答案

tl;dr:您的表数据需要在您加入的列上键入.

使用上面的示例数据,了解如何进行调查和修复.

Using the sample data above, here's how to investigate and fix.

使用KSQL检查topic中的数据(不需要kafka-avro-console-consumer).输出数据的格式为时间戳、键、值

Use KSQL to check the data in the topics (no need for kafka-avro-console-consumer). Format of the output data is timestamp, key, value

:

ksql> print 'stream-test-topic' from beginning;
Format:AVRO
30/04/18 15:59:13 BST, null, {"DEAL_ID": "deal002", "DEAL_EXPENSE_CODE": "EXP002", "DEAL_BRANCH": "AMSTERDAM"}
30/04/18 15:59:13 BST, null, {"DEAL_ID": "deal003", "DEAL_EXPENSE_CODE": "EXP003", "DEAL_BRANCH": "AMSTERDAM"}
30/04/18 15:59:13 BST, null, {"DEAL_ID": "deal004", "DEAL_EXPENSE_CODE": "EXP004", "DEAL_BRANCH": "AMSTERDAM"}

表格:

ksql> print 'expense-test-topic' from beginning;
Format:AVRO
30/04/18 16:10:52 BST, pk1, {"EXPENSE_CODE": "EXP001", "EXPENSE_DESC": "Regulatory Deposit"}
30/04/18 16:10:52 BST, pk2, {"EXPENSE_CODE": "EXP002", "EXPENSE_DESC": "ABC - Sofia"}
30/04/18 16:10:52 BST, pk3, {"EXPENSE_CODE": "EXP003", "EXPENSE_DESC": "Apple Corporation"}
30/04/18 16:10:52 BST, pk4, {"EXPENSE_CODE": "EXP004", "EXPENSE_DESC": "Confluent Europe"}
30/04/18 16:10:52 BST, pk5, {"EXPENSE_CODE": "EXP005", "EXPENSE_DESC": "Air India"}
30/04/18 16:10:52 BST, pk6, {"EXPENSE_CODE": "EXP006", "EXPENSE_DESC": "KLM International"}

此时,请注意键 (pk) 与我们将要加入的列不匹配

At this point, note that the key (pk<x>) does not match the column on which we will be joining

注册两个主题:

ksql> CREATE STREAM deals WITH (KAFKA_TOPIC='stream-test-topic', VALUE_FORMAT='AVRO');

 Message
----------------
 Stream created
----------------

ksql> CREATE TABLE expense_codes_table WITH (KAFKA_TOPIC='expense-test-topic', VALUE_FORMAT='AVRO', KEY='EXPENSE_CODE');

 Message
---------------
 Table created
---------------

告诉 KSQL 从每个主题的开头查询事件

Tell KSQL to query events from the beginning of each topic

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

验证表的每个 DDL 声明的键 (KEY='EXPENSE_CODE') 是否与底层 Kafka 消息的实际键匹配(可通过 ROWKEY 获得)系统栏):

Validate that the table's declared key per the DDL (KEY='EXPENSE_CODE') matches the actual key of the underlying Kafka messages (available through the ROWKEY system column):

ksql> SELECT ROWKEY, EXPENSE_CODE FROM expense_codes_table;
pk1 | EXP001
pk2 | EXP002
pk3 | EXP003
pk4 | EXP004
pk5 | EXP005
pk6 | EXP006

密钥不匹配.我们的加入注定失败!

神奇的解决方法——让我们使用 KSQL 重新设置主题!

Magic workaround—let's rekey the topic using KSQL!

将表的源主题注册为 KSQL STREAM:

ksql> CREATE STREAM expense_codes_stream WITH (KAFKA_TOPIC='expense-test-topic', VALUE_FORMAT='AVRO');

 Message
----------------
 Stream created
----------------

创建派生流,键入正确的列.这是由重新加密的 Kafka 主题支持的.

Create a derived stream, keyed on the correct colum. This is underpinned by a re-keyed Kafka topic.

ksql> CREATE STREAM EXPENSE_CODES_REKEY AS SELECT * FROM expense_codes_stream PARTITION BY EXPENSE_CODE;

 Message
----------------------------
 Stream created and running
----------------------------

在重新加密的主题之上重新注册 KSQL _TABLE_:

ksql> DROP TABLE expense_codes_table;

 Message
----------------------------------------
 Source EXPENSE_CODES_TABLE was dropped
----------------------------------------
ksql> CREATE TABLE expense_codes_table WITH (KAFKA_TOPIC='EXPENSE_CODES_REKEY', VALUE_FORMAT='AVRO', KEY='EXPENSE_CODE');

 Message
---------------
 Table created
---------------

检查新表上的键(声明 vs 消息)匹配:

Check the keys (declared vs message) match on the new table:

ksql> SELECT ROWKEY, EXPENSE_CODE FROM expense_codes_table;
EXP005 | EXP005
EXP001 | EXP001
EXP002 | EXP002
EXP003 | EXP003
EXP006 | EXP006
EXP004 | EXP004  

成功加入:

ksql> SELECT D.DEAL_EXPENSE_CODE, E.EXPENSE_DESC \
FROM deals D \
  LEFT JOIN expense_codes_table E \
  ON D.DEAL_EXPENSE_CODE = E.EXPENSE_CODE  \
WINDOW TUMBLING (SIZE 3 MINUTE) \
GROUP BY D.DEAL_EXPENSE_CODE, E.EXPENSE_DESC;

EXP006 | KLM International
EXP003 | Apple Corporation
EXP002 | ABC - Sofia
EXP004 | Confluent Europe
EXP001 | Regulatory Deposit
EXP005 | Air India

这篇关于Confluent 4.1.0 -> KSQL : STREAM-TABLE join ->表数据空的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

更多推荐

[db:关键词]

本文发布于:2023-04-19 14:13:07,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/963087.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:Confluent

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!