PipelineDB consumer from Kafka JSON with array

Updated: 2024-10-28 01:24:16

My Kafka topic sends the following JSON:

'{ "eventSummaryList": [ { "customer": 1, "data": "{\"cliente\":\"52264\",\"data_posicao\":\"1484250682\",\"gps_valido\":\"1\",\"horimetro\":\"0\",\"ibuttonPart1\":\"0\",\"ibuttonPart2\":\"0\",\"id_evento\":\"null\",\"id_motorista\":\"0\",\"ignicao\":\"0\",\"latitude\":\"-25.5385123\",\"longitude\":\"-49.1995068\",\"odometro\":\"0\",\"pos_memoria\":\"0\",\"veiculo\":\"103970\",\"velocidade\":\"0\"}", "identifierRule": 1770, "identifierSummary": 17, "rule": "rota_fora", "status": 1, "vehicle": 103970 }, { "customer": 2, "data": "{\"cliente\":\"52264\",\"data_posicao\":\"1484250682\",\"gps_valido\":\"1\",\"horimetro\":\"0\",\"ibuttonPart1\":\"0\",\"ibuttonPart2\":\"0\",\"id_evento\":\"null\",\"id_motorista\":\"0\",\"ignicao\":\"0\",\"latitude\":\"-25.5385123\",\"longitude\":\"-49.1995068\",\"odometro\":\"0\",\"pos_memoria\":\"0\",\"veiculo\":\"103970\",\"velocidade\":\"0\"}", "identifierRule": 8, "identifierSummary": 7, "rule": "velocidade_maior", "status": 1, "vehicle": 103970 } ]

}'

I created this continuous transform:

CREATE CONTINUOUS TRANSFORM sensor_event_process_transform AS
  SELECT cast(cast(pack ->> 'eventSummaryList' AS json) ->> 'customer' AS bigint) AS customer
  FROM pipeline_kafka.sensor_event_process_stream
  THEN EXECUTE PROCEDURE update_sensor_event_process_t();

But the PipelineDB log returns this:

CONTEXT: JSON data, line 1: {
COPY sensor_event_process_stream, line 1, column pack: "{"
LOG: [pipeline_kafka] sensor_event_process_stream <- topicNotificationProcess (PID 25201): failed to process batch, dropped 8 messages
ERROR: invalid input syntax for type json
DETAIL: The input string ended unexpectedly.

How do I iterate over the JSON array and get only the contents of the customer field?

Best answer

Hi all, I solved my problem by using the function json_array_elements. The transform ended up like this:

CREATE CONTINUOUS TRANSFORM sensor_event_process_transform AS
  SELECT
    cast(value::json ->> 'identifierRule' AS bigint) AS id_regra,
    cast(value::json ->> 'rule' AS varchar) AS regra,
    -- "data" is a JSON-encoded string in the sample payload, so it is
    -- extracted as text (->>) and parsed again before reading its keys
    cast((value::json ->> 'data')::json ->> 'veiculo' AS bigint) AS id_veiculo,
    cast(value::json ->> 'customer' AS bigint) AS id_cliente,
    cast((value::json ->> 'data')::json ->> 'velocidade' AS int) AS velocidade,
    cast((value::json ->> 'data')::json ->> 'odometro' AS int) AS odometro,
    cast((value::json ->> 'data')::json ->> 'data_posicao' AS bigint) AS data_posicao,
    cast((value::json ->> 'data')::json ->> 'id_motorista' AS bigint) AS id_motorista,
    cast((value::json ->> 'data')::json ->> 'latitude' AS float) AS latitude,
    cast((value::json ->> 'data')::json ->> 'longitude' AS float) AS longitude,
    cast(value::json ->> 'status' AS boolean) AS status
  FROM (
    -- one row per element of the eventSummaryList array
    SELECT (json_array_elements(pack -> 'eventSummaryList'))::json AS value
    FROM pipeline_kafka.sensor_event_process_stream
  ) b
  THEN EXECUTE PROCEDURE update_sensor_event_process_t();
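
For readers who want to check the shape of the data outside the database, here is a rough Python sketch of the same flattening: one output row per element of eventSummaryList. It is only an illustration, not part of the PipelineDB setup. The helper name flatten_events and the shortened sample message are made up for this example. In the sample payload from the question, data is itself a JSON-encoded string, so the sketch parses it a second time.

```python
import json

# Shortened, hypothetical message with the same shape as the payload in the question.
# "data" is stored as a JSON-encoded string, as in the original sample.
message = json.dumps({
    "eventSummaryList": [
        {"customer": 1, "identifierRule": 1770, "rule": "rota_fora", "status": 1,
         "data": json.dumps({"veiculo": "103970", "velocidade": "0"})},
        {"customer": 2, "identifierRule": 8, "rule": "velocidade_maior", "status": 1,
         "data": json.dumps({"veiculo": "103970", "velocidade": "0"})},
    ]
})


def flatten_events(raw):
    """Return one flat dict per array element (hypothetical helper)."""
    pack = json.loads(raw)
    rows = []
    # Looping over the list plays the role of json_array_elements.
    for event in pack["eventSummaryList"]:
        # "data" is a string containing JSON, so it needs its own parse step.
        data = json.loads(event["data"])
        rows.append({
            "id_regra": int(event["identifierRule"]),
            "regra": event["rule"],
            "id_cliente": int(event["customer"]),
            "id_veiculo": int(data["veiculo"]),
            "velocidade": int(data["velocidade"]),
            "status": bool(event["status"]),
        })
    return rows


rows = flatten_events(message)
for row in rows:
    print(row)
```

The key point is the same as in the SQL: the array has to be expanded into individual elements first, and only then can keys such as customer be read from each element.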

An important detail: do not send JSON that contains spaces and line breaks; the pipeline does not accept it.
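
On the producer side, one way to follow this advice is to serialize each message as compact, single-line JSON. The sketch below assumes a Python producer, which the original post does not mention, and uses a made-up payload. The log in the question shows column pack: "{", which suggests that only the first line of a multi-line message reached the JSON parser; the last part of the sketch reproduces that situation.

```python
import json

# Hypothetical payload, shortened from the one in the question.
payload = {"eventSummaryList": [{"customer": 1, "rule": "rota_fora"}]}

# Pretty-printed output spans several lines.
pretty = json.dumps(payload, indent=2)

# Compact output: one line, no padding after ',' or ':'.
compact = json.dumps(payload, separators=(",", ":"))


def parses(text):
    """Return True if text is a complete JSON document."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


# Only the first line of the pretty-printed message, as a line-based reader would see it.
first_line = pretty.splitlines()[0]

print(repr(first_line), parses(first_line))
print(compact, parses(compact))
```

Sending the compact form keeps each Kafka message on a single line, so the whole document arrives in the pack column in one piece.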

Published: 2023-07-15 18:50:00
Link: https://www.elefans.com/category/jswz/34/1117472.html
Tags: array, consumer, JSON, KAFKA, PipelineDB
