有什么方法可以在kafka-console-producer中产生一个带有空值的消息(即,将其标记为供压缩程序使用逻辑删除的消息)?
Is there any way to produce a message in the kafka-console-producer with a null value (ie. mark it for the compactor to delete it with a tombstone)?
我尝试生成"mykey"和"mykey |".前者产生错误,而后者使值成为空字符串.像这样运行生产者:
I've tried producing "mykey" and "mykey|". The former produces an error and the later makes the value the empty string. Running producer like this:
$KAFKA_HOME/bin/kafka-console-producer --broker-list localhost:9092 --topic mytopic --property "parse.key=true" --property "key.separator=|"推荐答案
不幸的是,无法使用console-producer来做到这一点
Unfortunately, there is no way to do that using console-producer
这是ConsoleProducer类的代码片段(它如何读取数据). Kafka 0.11.0(不要认为它在不同版本之间进行了重大更改).
this is a code snippet from ConsoleProducer class (how it reads the data). Kafka 0.11.0 (don't think that it was changed significantly between different versions).
override def readMessage() = { lineNumber += 1 print(">") (reader.readLine(), parseKey) match { case (null, _) => null case (line, true) => line.indexOf(keySeparator) match { case -1 => if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8)) else throw new KafkaException(s"No key found on line $lineNumber: $line") case n => val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8) new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value) } case (line, false) => new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8)) } }如您所见,该值始终是一个不可为空的字节数组
as you can see, the value is always an non-nullable array of bytes
更多推荐
从控制台产生具有空值(墓碑)的Kafka消息
发布评论