Using CoGroupByKey with a custom type causes a Coder error

This article describes how to resolve a Coder error that occurs when using CoGroupByKey with a custom type; it may serve as a useful reference for anyone hitting the same problem.

Problem Description

I want to join two PCollections (each from a different input) by following the steps described in the "Joins with CoGroupByKey" section here: cloud.google.com/dataflow/model/group-by-key

In my case, I want to join GeoIP's "block" information and "location" information, so I defined Block and Location as custom classes and then wrote the following:

final TupleTag<Block> t1 = new TupleTag<Block>();
final TupleTag<Location> t2 = new TupleTag<Location>();
PCollection<KV<Long, CoGbkResult>> coGbkResultColl =
    KeyedPCollectionTuple.of(t1, kvGeoNameIDBlock)
        .and(t2, kvGeoNameIDLocation)
        .apply(CoGroupByKey.<Long>create());
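For context (an editorial sketch, not part of the original question), the joined result could then be consumed by reading both sides out of the CoGbkResult via the tuple tags; the output format below is an assumption for illustration:

// Sketch: iterate over the joined Block/Location pairs for each key.
// Assumes t1 and t2 are the final TupleTags declared above.
coGbkResultColl.apply(ParDo.of(new DoFn<KV<Long, CoGbkResult>, String>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    Long geonameId = c.element().getKey();
    Iterable<Block> blocks = c.element().getValue().getAll(t1);
    Iterable<Location> locations = c.element().getValue().getAll(t2);
    for (Block b : blocks) {
      for (Location l : locations) {
        c.output(geonameId + "," + b + "," + l); // toString output is illustrative only
      }
    }
  }
}));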

The key has a Long value. I thought that was all, but when I run mvn compile, it outputs the following error:

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (default-cli) on project xxxx: An exception occured while executing the Java class. null: InvocationTargetException: Unable to return a default Coder for Extract GeoNameID-Block KV/ParMultiDo(ExtractGeoNameIDBlock).out0 [PCollection]. Correct one of the following root causes:
[ERROR]   No Coder has been manually specified; you may do so using .setCoder().
[ERROR]   Inferring a Coder from the CoderRegistry failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<java.lang.Long, com.xxx.platform.geoip2.Block>: Unable to provide a Coder for com.xxx.platform.geoip2.Block.
[ERROR]   Building a Coder using a registered CoderProvider failed.
[ERROR]   See suppressed exceptions for detailed failures.
[ERROR]   Using the default output Coder from the producing PTransform failed: Cannot provide coder for parameterized type org.apache.beam.sdk.values.KV<java.lang.Long, com.xxx.platform.geoip2.Block>: Unable to provide a Coder for com.xxx.platform.geoip2.Block.

The exact DoFn that outputs the error is ExtractGeoNameIDBlock, which simply creates a key-value pair of the key (to be joined on) and the element itself.

// ExtractGeoNameIDBlock creates KV collection while reading from block CSV
static class ExtractGeoNameIDBlock extends DoFn<String, KV<Long, Block>> {
  private static final long serialVersionUID = 1L;

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    String line = c.element();
    if (!line.startsWith("network,")) { // exclude header line
      Block b = new Block();
      b.loadFromCsvLine(line);
      if (b.getGeonameId() != null) {
        c.output(KV.of(b.getGeonameId(), b));
      }
    }
  }
}
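As the error message itself suggests, one workaround (an editorial sketch, not from the original post) is to set the coder explicitly on the DoFn's output rather than relying on inference; lines below stands in for the PCollection<String> read from the block CSV:

PCollection<KV<Long, Block>> kvGeoNameIDBlock =
    lines.apply("Extract GeoNameID-Block KV", ParDo.of(new ExtractGeoNameIDBlock()))
         .setCoder(KvCoder.of(VarLongCoder.of(), AvroCoder.of(Block.class)));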

loadFromCsvLine just parses a CSV line, converts each field to its corresponding type, and assigns it to the matching private field.
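loadFromCsvLine is not shown in the question; a hypothetical sketch (field names and column positions are assumptions) might look like this, with empty CSV fields mapped to null:

public void loadFromCsvLine(String line) {
  String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
  this.network = fields[0];
  this.geonameId = fields[1].isEmpty() ? null : Long.parseLong(fields[1]);
  this.registeredCountryGeonameId = fields[2].isEmpty() ? null : Long.parseLong(fields[2]);
  // ... remaining columns follow the same pattern
}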

So it looks like I need to set a coder for my custom class to make this work. I found a document about coders but am still not sure how to implement mine: cloud.google.com/dataflow/model/data-encoding

Is there any real example I can follow to create a custom coder for my custom class?

[Update 13:02 09/26/2017] I added:

CoderRegistry cr = p.getCoderRegistry();
cr.registerCoderForClass(Block.class, AvroCoder.of(Block.class));

Then I got this error:

java.lang.NullPointerException: in com.xxx.platform.geoip2.Block in long null of long in field representedCountryGeonameId of com.xxx.platform.geoip2.Block

[Update 14:05 09/26/2017] I changed the implementation like this:

@DefaultCoder(AvroCoder.class)
public class Block {
  private static final Logger LOG = LoggerFactory.getLogger(Block.class);

  @Nullable
  public String network;

  @Nullable
  public Long registeredCountryGeonameId;
  :
  :

(I set @Nullable on all properties.)
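One detail worth checking here (an editorial note, not from the original post): Beam's AvroCoder builds the schema through Avro reflection, so the annotation it honors is org.apache.avro.reflect.Nullable; a javax.annotation.Nullable annotation compiles fine but leaves the inferred Avro field non-nullable, which would still fail on null values:

import org.apache.avro.reflect.Nullable; // not javax.annotation.Nullable

@DefaultCoder(AvroCoder.class)
public class Block {
  @Nullable public String network;
  @Nullable public Long registeredCountryGeonameId;
  // ...
}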

But I still get this error:

(22eeaf3dfb26f8cc): java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null Long
    at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:191)
    at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructUnionTableFn.processElement(CoGroupByKey.java:185)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null Long
    at org.apache.beam.sdk.coders.VarLongCoder.encode(VarLongCoder.java:51)
    at org.apache.beam.sdk.coders.VarLongCoder.encode(VarLongCoder.java:35)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
    at com.google.cloud.dataflow.worker.ShuffleSink$ShuffleSinkWriter.encodeToChunk(ShuffleSink.java:320)
    at com.google.cloud.dataflow.worker.ShuffleSink$ShuffleSinkWriter.add(ShuffleSink.java:216)
    at com.google.cloud.dataflow.worker.ShuffleSink$ShuffleSinkWriter.add(ShuffleSink.java:178)
    at com.google.cloud.dataflow.worker.util.common.worker.WriteOperation.process(WriteOperation.java:80)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.ReifyTimestampAndWindowsParDoFnFactory$ReifyTimestampAndWindowsParDoFn.processElement(ReifyTimestampAndWindowsParDoFnFactory.java:68)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:183)
    at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructUnionTableFn.processElement(CoGroupByKey.java:185)
    at org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructUnionTableFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:183)
    at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at com.bandainamcoent.platform.GeoIpPopulateTable$ExtractGeoNameIDBlock.processElement(GeoIpPopulateTable.java:79)
    at com.bandainamcoent.platform.GeoIpPopulateTable$ExtractGeoNameIDBlock$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:187)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:336)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:294)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Thanks.

Recommended Answer

It looks like your custom class Block doesn't have a coder specified. You can create your own Coder, or use one of the general-purpose ones such as AvroCoder. You should also register it with the CoderRegistry so the pipeline knows how to encode Blocks.
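As a concrete illustration of the hand-written option, here is a minimal sketch of a Coder for Block, assuming only the two fields shown earlier; a real implementation would encode and decode every field in the same fixed order:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;

public class BlockCoder extends CustomCoder<Block> {
  // Wrap each field coder in NullableCoder so null field values are legal.
  private static final NullableCoder<String> STRING_CODER =
      NullableCoder.of(StringUtf8Coder.of());
  private static final NullableCoder<Long> LONG_CODER =
      NullableCoder.of(VarLongCoder.of());

  @Override
  public void encode(Block value, OutputStream outStream) throws IOException {
    STRING_CODER.encode(value.network, outStream);
    LONG_CODER.encode(value.registeredCountryGeonameId, outStream);
    // ... encode the remaining fields in the same order as decode()
  }

  @Override
  public Block decode(InputStream inStream) throws IOException {
    Block b = new Block();
    b.network = STRING_CODER.decode(inStream);
    b.registeredCountryGeonameId = LONG_CODER.decode(inStream);
    // ... decode the remaining fields
    return b;
  }
}

It could then be registered much like the AvroCoder attempt above, e.g. p.getCoderRegistry().registerCoderForClass(Block.class, new BlockCoder());. Separately, note that the "cannot encode a null Long" in the final stack trace is thrown by VarLongCoder, which here encodes the KV key rather than the Block value; that suggests a null geonameId key reached CoGroupByKey from one of the inputs, so the null-key filter used in ExtractGeoNameIDBlock may be needed on the Location side as well.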
