Flink CDAS sync to StarRocks: an over-length VARCHAR value makes the data sync fail

Updated: 2024-10-09 14:21:14


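While using Flink CDAS to sync MySQL data to StarRocks, the job failed with org.apache.flink.table.api.TableException: Failed to deserialize the input record. The root cause at the bottom of the stack trace is a StarRocks load failure with the message "too many filtered rows": some rows in the synced data were invalid and were filtered out, so the batch could not be loaded.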

org.apache.flink.table.api.TableException: Failed to deserialize the input record: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1678505077, file=, pos=0}} ConnectRecord{topic='mysql_binlog_source.dd.tableee', kafkaPartition=null, key=Struct{id=6031202774149}, keySchema=Schema{mysql_binlog_source.db.tableee.Key:STRUCT}, value=Struct{after=Struct{id=60312027,name=Ad from a Page post #6,031,202,774,149,run_status=,object_id=0,object_story_id=,effective_object_story_id=1693370527557087_1764951593732313,object_story_spec={"page_id":"1693370527557087","link_data":{"name":"Segredos para usu\u00e1rios Samsung!","image_hash":"efb4d7d867876aa44d33a4ba3b8e12adgb","call_to_action":{"type":"INSTALL_MOBILE_APP"}}},body=,image_hash=,image_file=,image_url=,image_crops=,video_id=,actor_image_hash=,link_url=,object_url=,url_tags=,preview_url=,thumbnail_url=,follow_redirect=,object_store_url=,link_deep_link_url=,call_to_action_type=,object_type=,product_set_id=,adlabels=,applink_treatment=,dynamic_ad_voice=,place_page_set_id=0,instagram_actor_id=0,instagram_permalink_url=,template_url=,object_instagram_id=0,link_og_id=,instagram_story_id=,zipbytes=,copy_from=,asset_feed_spec=,created_time=1655725861000,updated_time=1667633606000,page_id=1693370527557087},source=Struct{version=1.6.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,db=db,table=creative,server_id=0,file=,pos=0,row=0},op=r,ts_ms=1678505077334}, valueSchema=Schema{mysql_binlog_source.db.tableee.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}.
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:150)
    at com.ververica.cdc.connectors.mysql.source.MySqlEvolvingSourceDeserializeSchema.deserialize(MySqlEvolvingSourceDeserializeSchema.java:143)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:131)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:111)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:83)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:56)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:159)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:70)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:641)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1079)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1028)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:933)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
    at java.lang.Thread.run(Thread.java:834)
    Suppressed: java.lang.RuntimeException: Writing records to StarRocks failed.
        at com.starrocks.connector.flink.manager.StarRocksSinkManager.checkFlushException(StarRocksSinkManager.java:396)
        at com.starrocks.connector.flink.manager.StarRocksSinkManager.close(StarRocksSinkManager.java:285)
        at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction.close(StarRocksDynamicSinkFunction.java:242)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1272)
        at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254)
        at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
        at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1191)
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:936)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:936)
        ... 3 more
    Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response: 
{"Status":"Fail","BeginTxnTimeMs":0,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"3ecfb766-d4c0-4a4a-9ace-139e74e39f4a","LoadBytes":51839093,"StreamLoadPlanTimeMs":0,"NumberTotalRows":48570,"WriteDataTimeMs":471,"TxnId":2805956,"LoadTimeMs":473,"ErrorURL":"http://192.16.61.113:8040/api/_load_error_log?file=error_log_3542ac0c208e8ddc_b712c1cb407f85b9","ReadDataTimeMs":52,"NumberLoadedRows":48566,"NumberFilteredRows":4}
{"streamLoadErrorLog":"Error: NULL value in non-nullable column 'name'. Row: [6004092296930, '', 170703647, 6004092298930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\nError: NULL value in non-nullable column 'name'. Row: [6004092297130, '', 170703647, 6004092299930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\nError: NULL value in non-nullable column 'name'. Row: [6004092297730, '', 170703647, 6004092301330, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\nError: NULL value in non-nullable column 'name'. Row: [6004092304530, '', 170703647, 6004092304930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\n"}
        at com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor.doStreamLoad(StarRocksStreamLoadVisitor.java:105)
        at com.starrocks.connector.flink.manager.StarRocksSinkManager.asyncFlush(StarRocksSinkManager.java:320)
        at com.starrocks.connector.flink.manager.StarRocksSinkManager.lambda$startAsyncFlushing$0(StarRocksSinkManager.java:162)
        ... 1 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:168)
    at com.ververica.cdc.connectors.mysql.source.MySqlEvolvingSourceDeserializeSchema$SimpleCollector.collect(MySqlEvolvingSourceDeserializeSchema.java:236)
    at com.ververica.cdc.connectors.mysql.source.MySqlEvolvingSourceDeserializeSchema$SimpleCollector.collect(MySqlEvolvingSourceDeserializeSchema.java:227)
    at com.ververica.cdc.debezium.table.AppendMetadataCollector.collect(AppendMetadataCollector.java:51)
    at com.ververica.cdc.debezium.table.AppendSystemColumnAndMetadataCollector.collect(AppendSystemColumnAndMetadataCollector.java:43)
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:169)
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:130)
    ... 18 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
    at org.apache.flink.streaming.runtime.tasks.CopyingBroadcastingOutputCollector.collect(CopyingBroadcastingOutputCollector.java:40)
    at org.apache.flink.streaming.runtime.tasks.CopyingBroadcastingOutputCollector.collect(CopyingBroadcastingOutputCollector.java:28)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.table.runtime.operators.evolution.SchemaEvolutionOperator.processElement(SchemaEvolutionOperator.java:171)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
    ... 30 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:101)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
    ... 38 more
Caused by: java.lang.RuntimeException: Writing records to StarRocks failed.
    at com.starrocks.connector.flink.manager.StarRocksSinkManager.checkFlushException(StarRocksSinkManager.java:396)
    at com.starrocks.connector.flink.manager.StarRocksSinkManager.writeRecords(StarRocksSinkManager.java:215)
    at com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction.invoke(StarRocksDynamicSinkFunction.java:198)
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:68)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
    ... 44 more
[CIRCULAR REFERENCE:com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response: 
{"Status":"Fail","BeginTxnTimeMs":0,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"3ecfb766-d4c0-4a4a-9ace-139e74e39f4a","LoadBytes":51839093,"StreamLoadPlanTimeMs":0,"NumberTotalRows":48570,"WriteDataTimeMs":471,"TxnId":2805956,"LoadTimeMs":473,"ErrorURL":"http://192.16.61.123:8040/api/_load_error_log?file=error_log_3542ac0c208e8ddc_b712c1cb407f85b9","ReadDataTimeMs":52,"NumberLoadedRows":48566,"NumberFilteredRows":4}
{"streamLoadErrorLog":"Error: NULL value in non-nullable column 'name'. Row: [6004092296930, '', 170703647, 6004092298930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\nError: NULL value in non-nullable column 'name'. Row: [6004092297130, '', 170703647, 6004092299930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\nError: NULL value in non-nullable column 'name'. Row: [6004092297730, '', 170703647, 6004092301330, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\nError: NULL value in non-nullable column 'name'. Row: [6004092304530, '', 170703647, 6004092304930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]\n"}
]
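The "too many filtered rows" failure follows from the counters in the Stream Load response: StarRocks rejects the whole batch when the share of filtered rows exceeds max_filter_ratio, whose default is 0, so 4 bad rows out of 48,570 are enough. A minimal Python sketch of that check; it assumes the ratio is filtered / (loaded + filtered), which here equals NumberTotalRows because NumberUnselectedRows is 0, and the helper names are hypothetical.

```python
import json

# Counters copied from the Stream Load response in the exception above
# (other fields such as ErrorURL and Label omitted).
RESPONSE = json.loads(
    '{"Status": "Fail", "Message": "too many filtered rows",'
    ' "NumberTotalRows": 48570, "NumberLoadedRows": 48566,'
    ' "NumberFilteredRows": 4, "NumberUnselectedRows": 0}'
)


def filtered_ratio(resp: dict) -> float:
    """Share of rows rejected for bad data quality.

    Assumption: ratio = filtered / (loaded + filtered); rows dropped by a
    WHERE condition ("unselected") are not counted as errors.
    """
    filtered = resp["NumberFilteredRows"]
    loaded = resp["NumberLoadedRows"]
    return filtered / (loaded + filtered)


def would_fail(resp: dict, max_filter_ratio: float = 0.0) -> bool:
    """True if the filtered share exceeds the tolerated ratio (StarRocks default: 0)."""
    return filtered_ratio(resp) > max_filter_ratio


if __name__ == "__main__":
    ratio = filtered_ratio(RESPONSE)
    print(f"filtered {RESPONSE['NumberFilteredRows']} of "
          f"{RESPONSE['NumberTotalRows']} rows, ratio={ratio:.6f}")
    print("fails with default max_filter_ratio:", would_fail(RESPONSE))
```

Raising max_filter_ratio would let the load succeed, but it would silently drop those rows, so fixing the column definition is the better route.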

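Using the ErrorURL returned by StarRocks, I checked the StarRocks error log: it reported NULL values in the non-nullable column name. Looking up the source MySQL rows by primary key id showed that name did have values. My first suspicion was a wrong mapping for name, but comparing the DDLs showed they were identical. Debugging with Flink SQL eventually revealed that the name values were too long for the column, so they were set to NULL when sunk to StarRocks, which in turn made the load fail.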
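The lookup step can be scripted. A minimal Python sketch, assuming the error log uses the "Error: ... Row: [...]" format shown in this article and that the first value of each rejected row is the primary key id; it extracts those ids and builds a MySQL query reporting both CHAR_LENGTH(name) (characters) and LENGTH(name) (bytes). The table name db.creative (taken from the source struct in the record above), the truncated sample rows, and the helper names are illustrative assumptions; fetching the ErrorURL itself is left out so the sketch runs offline.

```python
from __future__ import annotations

import re

# Error log text from the ErrorURL; each row is truncated to its first three values here.
ERROR_LOG = (
    "Error: NULL value in non-nullable column 'name'. Row: [6004092296930, '', 170703647, ...]\n"
    "Error: NULL value in non-nullable column 'name'. Row: [6004092297130, '', 170703647, ...]\n"
    "Error: NULL value in non-nullable column 'name'. Row: [6004092297730, '', 170703647, ...]\n"
    "Error: NULL value in non-nullable column 'name'. Row: [6004092304530, '', 170703647, ...]\n"
)

# Captures the offending column and the first value of the rejected row.
ROW_PATTERN = re.compile(r"column '(\w+)'\. Row: \[(\d+),")


def failed_rows(error_log: str) -> list[tuple[str, int]]:
    """Return (column, first row value) pairs; assumes the first value is the primary key id."""
    return [(col, int(pk)) for col, pk in ROW_PATTERN.findall(error_log)]


def lookup_sql(table: str, column: str, ids: list[int]) -> str:
    """Build a MySQL query reporting the column's length in characters and in bytes."""
    id_list = ", ".join(str(i) for i in ids)
    return (
        f"SELECT id, {column}, CHAR_LENGTH({column}) AS char_len, "
        f"LENGTH({column}) AS byte_len FROM {table} WHERE id IN ({id_list})"
    )


if __name__ == "__main__":
    rows = failed_rows(ERROR_LOG)
    column = rows[0][0]
    print(lookup_sql("db.creative", column, [pk for _, pk in rows]))
```

Comparing char_len with byte_len for the rejected ids shows immediately whether the values are long in characters or only in bytes.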

Error: NULL value in non-nullable column 'name'. Row: [6004092296930, '', 170703647, 6004092298930, 0, '', '', NULL, '', 0, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 0, 0, '', '', 0, '', '', '', '', NULL, '', 2022-11-01 14:53:25, 2022-11-01 14:53:25, 0, 0]

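In both the source table and the StarRocks table, name is defined as name varchar(20) not null, yet the name values actually being written exceeded 20. The fix is to widen name in the StarRocks table so that its length is larger than the longest name value in MySQL. Keep in mind that StarRocks counts VARCHAR length in bytes (UTF-8) while MySQL counts characters, so an identical varchar(20) holds less in StarRocks once values contain multi-byte characters; size the StarRocks column by the longest value in bytes.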
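A minimal Python sketch for sizing the StarRocks column, assuming UTF-8 storage and that StarRocks VARCHAR(n) caps the byte length while MySQL VARCHAR(n) caps the character count. The helper names, the sample string, and the db.creative table in the generated ALTER TABLE ... MODIFY COLUMN statement are hypothetical; check the statement against the ALTER TABLE documentation of your StarRocks version before running it.

```python
# Worst case for MySQL utf8mb4: up to 4 bytes per character.
MAX_UTF8_BYTES_PER_CHAR = 4


def utf8_len(value: str) -> int:
    """Byte length of the value when encoded as UTF-8."""
    return len(value.encode("utf-8"))


def fits_starrocks_varchar(value: str, n: int) -> bool:
    """True if the value fits a VARCHAR(n) whose limit is counted in bytes."""
    return utf8_len(value) <= n


def required_length(values) -> int:
    """Smallest byte length that holds every observed value (1 if none)."""
    return max((utf8_len(v) for v in values), default=1)


def worst_case_length(mysql_char_len: int) -> int:
    """Safe byte length for a MySQL VARCHAR(mysql_char_len) utf8mb4 column."""
    return mysql_char_len * MAX_UTF8_BYTES_PER_CHAR


def alter_stmt(table: str, column: str, n: int) -> str:
    """Hypothetical helper: widen the column while keeping NOT NULL."""
    return f"ALTER TABLE {table} MODIFY COLUMN {column} VARCHAR({n}) NOT NULL"


if __name__ == "__main__":
    sample = "Promo\u00e7\u00e3o de ver\u00e3o 22"  # 20 characters, 3 of them 2-byte
    print(len(sample), utf8_len(sample), fits_starrocks_varchar(sample, 20))
    print(alter_stmt("db.creative", "name", worst_case_length(20)))
```

worst_case_length is conservative because it assumes every character takes 4 bytes; required_length only covers the values seen so far, so rows synced later may still overflow a tighter limit.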


Published: 2024-03-13 15:06:35
Article link: https://www.elefans.com/category/jswz/34/1734244.html