My Kafka topic sends the following JSON:
'{ "eventSummaryList": [ { "customer": 1, "data": "{\"cliente\":\"52264\",\"data_posicao\":\"1484250682\",\"gps_valido\":\"1\",\"horimetro\":\"0\",\"ibuttonPart1\":\"0\",\"ibuttonPart2\":\"0\",\"id_evento\":\"null\",\"id_motorista\":\"0\",\"ignicao\":\"0\",\"latitude\":\"-25.5385123\",\"longitude\":\"-49.1995068\",\"odometro\":\"0\",\"pos_memoria\":\"0\",\"veiculo\":\"103970\",\"velocidade\":\"0\"}", "identifierRule": 1770, "identifierSummary": 17, "rule": "rota_fora", "status": 1, "vehicle": 103970 }, { "customer": 2, "data": "{\"cliente\":\"52264\",\"data_posicao\":\"1484250682\",\"gps_valido\":\"1\",\"horimetro\":\"0\",\"ibuttonPart1\":\"0\",\"ibuttonPart2\":\"0\",\"id_evento\":\"null\",\"id_motorista\":\"0\",\"ignicao\":\"0\",\"latitude\":\"-25.5385123\",\"longitude\":\"-49.1995068\",\"odometro\":\"0\",\"pos_memoria\":\"0\",\"veiculo\":\"103970\",\"velocidade\":\"0\"}", "identifierRule": 8, "identifierSummary": 7, "rule": "velocidade_maior", "status": 1, "vehicle": 103970 } ]}'
I created this continuous transform:
CREATE CONTINUOUS TRANSFORM sensor_event_process_transform AS
  SELECT cast(cast(pack ->> 'eventSummaryList' AS json) ->> 'customer' AS bigint) AS customer
  FROM pipeline_kafka.sensor_event_process_stream
  THEN EXECUTE PROCEDURE update_sensor_event_process_t();
But the PipelineDB log returns this:
CONTEXT: JSON data, line 1: {
COPY sensor_event_process_stream, line 1, column pack: "{"
LOG: [pipeline_kafka] sensor_event_process_stream <- topicNotificationProcess (PID 25201): failed to process batch, dropped 8 messages
ERROR: invalid input syntax for type json
DETAIL: The input string ended unexpectedly.
How do I iterate over the JSON array and get only the contents of the customer field?
Best answer
Hi all, I solved my problem by using the function json_array_elements. The transform ended up like this:
CREATE CONTINUOUS TRANSFORM sensor_event_process_transform AS
  SELECT
    cast(value ->> 'identifierRule' AS bigint) AS id_regra,
    cast(value ->> 'rule' AS varchar) AS regra,
    -- "data" holds a JSON-encoded string, so extract it as text (->>) before casting it to json
    cast((value ->> 'data')::json ->> 'veiculo' AS bigint) AS id_veiculo,
    cast(value ->> 'customer' AS bigint) AS id_cliente,
    cast((value ->> 'data')::json ->> 'velocidade' AS int) AS velocidade,
    cast((value ->> 'data')::json ->> 'odometro' AS int) AS odometro,
    cast((value ->> 'data')::json ->> 'data_posicao' AS bigint) AS data_posicao,
    cast((value ->> 'data')::json ->> 'id_motorista' AS bigint) AS id_motorista,
    cast((value ->> 'data')::json ->> 'latitude' AS float) AS latitude,
    cast((value ->> 'data')::json ->> 'longitude' AS float) AS longitude,
    cast(value ->> 'status' AS boolean) AS status
  FROM (
    -- one output row per element of the eventSummaryList array
    SELECT json_array_elements(pack -> 'eventSummaryList') AS value
    FROM pipeline_kafka.sensor_event_process_stream
  ) b
  THEN EXECUTE PROCEDURE update_sensor_event_process_t();
An important detail: do not send JSON containing spaces and line breaks, because the pipeline does not accept it.
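For the note about spaces and line breaks, here is a minimal sketch of how the producer side might build a compact, single-line message. It is written in Python purely as an illustration, since the thread does not say what the producer is written in. The helper names to_compact_json and extract_rows are hypothetical, and the payload is a shortened version of the sample message. The log line showing column pack containing only "{" seems consistent with a message being split at a line break, but that is an inference, not something confirmed in the thread.

```python
import json


def to_compact_json(payload):
    """Serialize to single-line JSON with no whitespace between tokens."""
    # separators=(",", ":") drops the default ", " and ": " spacing;
    # leaving indent unset means no line breaks are emitted.
    return json.dumps(payload, separators=(",", ":"), ensure_ascii=False)


def extract_rows(message):
    """Mirror the transform: one row per eventSummaryList element."""
    rows = []
    for element in json.loads(message)["eventSummaryList"]:
        # "data" is itself a JSON-encoded string, so it is decoded a second time.
        data = json.loads(element["data"])
        rows.append(
            {
                "id_cliente": int(element["customer"]),
                "id_veiculo": int(data["veiculo"]),
                "regra": element["rule"],
            }
        )
    return rows


# Shortened, illustrative payload based on the sample message in the question.
event = {
    "eventSummaryList": [
        {
            "customer": 1,
            "data": to_compact_json({"cliente": "52264", "veiculo": "103970"}),
            "identifierRule": 1770,
            "rule": "rota_fora",
            "status": 1,
            "vehicle": 103970,
        }
    ]
}

message = to_compact_json(event)  # the string that would be sent to Kafka
rows = extract_rows(message)      # sanity check before sending
```

Running extract_rows on the message before sending it is a cheap way to check that the array and the nested data string decode the way the transform expects. Note that compact serialization only removes whitespace between tokens; spaces inside string values are left untouched.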