分区数据来自CSV,因此我可以处理较大的补丁,而不是单个行

编程入门 行业动态 更新时间:2024-10-14 12:20:43
本文介绍了分区数据来自CSV,因此我可以处理较大的补丁,而不是单个行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我刚刚开始使用Google Data Flow,我编写了一个简单的流程,该流程从云存储中读取CSV文件.步骤之一涉及调用Web服务以丰富结果.批量发送数百个请求时,所讨论的Web服务的性能要好得多.

I am just getting started with Google Data Flow, I have written a simple flow that reads a CSV file from cloud storage. One of the steps involves calling a web service to enrich results. The web service in question performs much better when sending several 100 requests in bulk.

在查看API时,我没有看到一种将PCollection的100个元素聚合到单个Par.do执行中的好方法.然后需要对结果进行拆分,以处理流程的最后一步,该步骤正在写入BigQuery表.

In looking at API I don't see a great way to aggregate 100 elements of a PCollection into a single Par.do Execution. The results would need to be then split to handle the last step of the flow which is writing to a BigQuery table.

不确定我是否需要使用窗口.我看到的大多数加窗示例都更适合在给定时间段内进行计数.

Not sure if I need to use windowing is what I want. Most of the windowing examples I see are more geared towards counting over a given time period.

推荐答案

您可以在DoFn的本地成员变量中缓冲元素,并在缓冲区足够大时以及在finishBundle中调用Web服务.例如:

You can buffer elements in a local member variable of your DoFn, and call your web service when the buffer is large enough, as well as in finishBundle. For example:

class CallServiceFn extends DoFn<String, String> { private List<String> elements = new ArrayList<>(); public void processElement(ProcessContext c) { elements.add(c.element()); if (elements.size() >= MAX_CALL_SIZE) { for (String result : callServiceWithData(elements)) { c.output(result); } elements.clear(); } } public void finishBundle(Context c) { for (String result : callServiceWithData(elements)) { c.output(result); } } }

更多推荐

分区数据来自CSV,因此我可以处理较大的补丁,而不是单个行

本文发布于:2023-11-24 13:22:22,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1625370.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:分区   而不是   补丁   较大   数据

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!