python调用kafka拉取数据失败

编程入门 行业动态 更新时间:2024-10-16 02:30:53

python调用kafka拉取<a href=https://www.elefans.com/category/jswz/34/1771445.html style=数据失败"/>

python调用kafka拉取数据失败

我试图通过Kafka发送一个非常简单的JSON对象,并使用Python和kafkapython从另一端读取它。但是,我一直看到以下错误:2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback

Traceback (most recent call last):

File "C:\Anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs

f(value)

File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response

unpacked = list(self._unpack_message_set(tp, messages))

File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set

tp.topic, msg.value)

File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize

return f(bytes_)

File "C:\Users\myUser\workspace\PythonKafkaTest\src\example.py", line 55, in

value_deserializer=lambda m: json.loads(m).decode('utf-8'))

File "C:\Anaconda2\lib\json\__init__.py", line 339, in loads

return _default_decoder.decode(s)

File "C:\Anaconda2\lib\json\decoder.py", line 364, in decode

obj, end = self.raw_decode(s, idx=_w(s, 0).end())

File "C:\Anaconda2\lib\json\decoder.py", line 382, in raw_decode

raise ValueError("No JSON object could be decoded")

ValueError: No JSON object could be decoded

我做了一些研究,这个错误最常见的原因是JSON错误。在发送JSON之前,我已经尝试打印出JSON,方法是在代码中添加以下内容,JSON打印时没有错误。在

^{pr2}$

这使我怀疑我可以生成json,但不能使用它。在

这是我的代码:import threading

import logging

import time

import json

from kafka import KafkaConsumer, KafkaProducer

class Producer(threading.Thread):

daemon = True

def run(self):

producer = KafkaProducer(bootstrap_servers='localhost:9092',

value_serializer=lambda v: json.dumps(v).encode('utf-8'))

while True:

producer.send('my-topic', {"dataObjectID": "test1"})

producer.send('my-topic', {"dataObjectID": "test2"})

time.sleep(1)

class Consumer(threading.Thread):

daemon = True

def run(self):

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',

auto_offset_reset='earliest',

value_deserializer=lambda m: json.loads(m).decode('utf-8'))

consumer.subscribe(['my-topic'])

for message in consumer:

print (message)

def main():

threads = [

Producer(),

Consumer()

]

for t in threads:

t.start()

time.sleep(10)

if __name__ == "__main__":

logging.basicConfig(

format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:' +

'%(levelname)s:%(process)d:%(message)s',

level=logging.INFO

)

main()

如果删除value_序列化程序和value_反序列化程序,则可以成功发送和接收字符串。当我运行该代码时,我可以看到我正在发送的JSON。这里有一个小狙击手:ConsumerRecord(topic=u'my-topic', partition=0, offset=5742, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test1"}', checksum=-1301891455, serialized_key_size=-1, serialized_value_size=25)

ConsumerRecord(topic=u'my-topic', partition=0, offset=5743, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test2"}', checksum=-1340077864, serialized_key_size=-1, serialized_value_size=25)

ConsumerRecord(topic=u'my-topic', partition=0, offset=5744, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)

ConsumerRecord(topic=u'my-topic', partition=0, offset=5745, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)

ConsumerRecord(topic=u'my-topic', partition=0, offset=5746, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)

ConsumerRecord(topic=u'my-topic', partition=0, offset=5747, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)

因此,我尝试从使用者中移除value_反序列化程式,程式码会执行,但如果没有反序列化程式,讯息会以字串的形式出现,这不是我需要的。那么,为什么值反序列化器不能工作?有没有其他方法可以从我应该使用的Kafka消息中获取JSON?在

更多推荐

python调用kafka拉取数据失败

本文发布于:2024-02-07 00:42:06,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1751882.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:数据   python   kafka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!