Problem description
I'm very new to Flink and cluster computing. I spent all day trying to correctly parse a simple stream from Kafka in Flink, with no results: it's a bit frustrating... I have a stream of JSON-LD messages in Kafka, identified by a string key. I simply want to retrieve them in Flink and then separate the messages by key.
1) Initially I considered sending the messages as String instead of JSON-LD. I thought that would be easier...
I tried every deserializer but none worked. The simple deserializer obviously works, but it completely ignores the keys.
I believed I had to use this (Flink apparently has just two deserializers that support keys):
DataStream<Object> stream = env
.addSource(new FlinkKafkaConsumer010<>("topicTest", new TypeInformationKeyValueSerializationSchema(String.class, String.class, env.getConfig()), properties))
.rebalance();
stream.print();
But I get:
06/12/2017 02:09:12 Source: Custom Source(4/4) switched to FAILED
java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
How can I receive the stream messages without losing the keys?
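A likely explanation of the EOFException above (my reading of the error, not stated in the original post): TypeInformationKeyValueSerializationSchema expects key and value bytes that were written by Flink's own TypeSerializer, i.e. a binary format with length/type headers. A JavaScript producer writes plain UTF-8 text instead, so the deserializer interprets ordinary characters as a header and runs off the end of the buffer. The same failure mode can be reproduced with plain Java I/O, using readUTF (which, like Flink's string format, expects a length prefix) on raw text bytes:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class EofDemo {
    public static void main(String[] args) throws IOException {
        // Plain UTF-8 bytes, as a JavaScript Kafka producer would send them.
        byte[] plain = "hello".getBytes(StandardCharsets.UTF_8);
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(plain));
        try {
            // readUTF reads a 2-byte length prefix first. From "he" it reads
            // 0x6865 = 26725, then tries to read that many bytes while only
            // 3 remain, so the read runs off the end of the buffer.
            in.readUTF();
        } catch (EOFException e) {
            System.out.println("java.io.EOFException, just like the Flink job");
        }
    }
}
```

So the schema itself is not broken; it simply only fits data that was also produced by Flink with the matching serializer.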
2) My Kafka producer is implemented in JavaScript. Since Flink supports JSON deserialization, I thought about sending JSON objects into Kafka directly. I'm not sure this works correctly with JSON-LD, but I used:
json.parse(jsonld_message)
to serialize the message as JSON. Then I sent it with the usual string key.
But in Flink this code doesn't work:
DataStream<ObjectNode> stream = env
.addSource(new FlinkKafkaConsumer010<>("topicTest", new JSONKeyValueDeserializationSchema(false), properties))
.rebalance();
stream.print();
It gives a JsonParserException.
I think the first approach is simpler and I prefer it, because it allows tackling one problem at a time (first: receive the data; second: convert the string back into JSON-LD with an external library, I guess).
Accepted answer
SOLVED:
Finally I decided to write a custom deserializer implementing the KeyedDeserializationSchema interface.
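The answer stops at naming the interface, so here is a minimal sketch of what the core of such a schema could look like. This is hypothetical code, not the author's: in Flink 1.x the actual schema class would implement KeyedDeserializationSchema and return something like a Tuple2&lt;String, String&gt; from deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset). Only the Flink-free decoding logic is spelled out below so the sketch stays self-contained; the UTF-8 charset and the null-key handling are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Flink-free core of a custom keyed deserializer. In the real schema class
// you would implement KeyedDeserializationSchema<Tuple2<String, String>>
// and call logic like this from
//   deserialize(byte[] messageKey, byte[] message,
//               String topic, int partition, long offset)
public class KeyedStringDecoder {

    // Decode raw Kafka key/value bytes into strings. Kafka allows null
    // keys and values; mapping null to "" here is an assumption of this
    // sketch, not documented Flink behavior.
    public static Map.Entry<String, String> decode(byte[] messageKey, byte[] message) {
        String key = messageKey == null
                ? "" : new String(messageKey, StandardCharsets.UTF_8);
        String value = message == null
                ? "" : new String(message, StandardCharsets.UTF_8);
        return new SimpleEntry<>(key, value);
    }
}
```

With the key recovered as a plain string, separating the messages by key is then a keyBy on the key field, and converting the value back into JSON-LD can happen downstream with an external library, as the question suggests.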