从postgresql读取数据时,无法在flink中获取Json数据

编程入门 行业动态 更新时间:2024-10-28 07:29:34
本文介绍了从postgresql读取数据时,无法在flink中获取Json数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时送ChatGPT账号..

我试图使用 flink 从 postgre 获取数据.代码如下:

I was trying to fetch data from postgre using flink. The following is the code:

dbData =env.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(Utils.properties_fetch("drivername"))
.setDBUrl(Utils.properties_fetch("dbURL"))
.setUsername(Utils.properties_fetch("username"))
.setPassword(Utils.properties_fetch("password"))
.setQuery(sourcequery)
.setRowTypeInfo(newRowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.DATE_TYPE_INFO,BasicTypeInfo.DATE_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO))
.finish());

第三个 BasicTypeInfo.STRING_TYPE_INFO 是从 postgre 获取 jsonb 数据类型.

The third BasicTypeInfo.STRING_TYPE_INFO is fetching a jsonb data type from postgre.

我收到以下错误:

06/28/2018 14:02:09 Job execution switched to status FAILING.
java.lang.ClassCastException: org.postgresql.util.PGobject cannot be 
cast to java.lang.String at 
org.apache.flink.apimon.typeutils.base.StringSerializer
.serialize(StringSerializer.java:28) at 
org.apache.flink.api.java.typeutils.runtime
.RowSerializer.serialize(RowSerializer.java:160) at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer
.serialize(RowSerializer.java:46) at 
org.apache.flink.runtime.plugable.SerializationDelegate
.write(SerializationDelegate.java:54) at 
org.apache.flink.runtime.iowork.api.serialization
.SpanningRecordSerializer.addRecord(SpanningRecordSerializer
.java:93) at 
org.apache.flink.runtime.iowork.api.writer
.RecordWriter.sendToTarget(RecordWriter.java:114) at 
org.apache.flink.runtime.iowork.api.writer
.RecordWriter.emit(RecordWriter.java:89) at 
org.apache.flink.runtime.operators.shipping.OutputCollector
.collect(OutputCollector.java:65) at 
org.apache.flink.runtime.operators.util.metrics
.CountingCollector.collect(CountingCollector.java:35) at 
org.apache.flink.runtime.operators.DataSourceTask
.invoke(DataSourceTask.java:168)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

推荐答案

您的查询返回的字段之一似乎是 PGobject,其中 flink 需要一个字符串.

It seems like one of the field returned from your query is a PGobject, where flink expected a string.

您可以将此字段的 BasicTypeInfo.STRING_TYPE_INFO 更改为 TypeInformation.of(PGobject.class)

You can change the BasicTypeInfo.STRING_TYPE_INFO for this field to TypeInformation.of(PGobject.class)

后面可以添加一个map函数调用PGobject#value来获取这个字段的底层字符串值

Later you can add a map function to call PGobject#value to get the underlying string value of this field

这篇关于从postgresql读取数据时,无法在flink中获取Json数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

更多推荐

[db:关键词]

本文发布于:2023-04-19 15:33:34,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/963843.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:数据   postgresql   Json   flink

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!