Why is GroupByKey in a Beam pipeline duplicating elements (when run on Google Dataflow)?

Problem description

We have a pipeline that starts by receiving messages from PubSub, each with the name of a file. These files are exploded to line level, parsed to JSON object nodes and then sent to an external decoding service (which decodes some encoded data). Object nodes are eventually converted to Table Rows and written to Big Query.
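
(For reference, a minimal sketch of the pipeline shape described above. This is a fragment, not a full program; the DoFn class names ReadFileLinesFn, ParseJsonFn, DecodeFn and ToTableRowFn, and the subscription/tableSpec variables, are placeholders rather than the actual code from this job.)

    // Hypothetical outline of the pipeline described above (fragment only).
    // All DoFn names and the subscription/tableSpec values are placeholders.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read file names from PubSub", PubsubIO.readStrings().fromSubscription(subscription))
        .apply("Explode files to lines", ParDo.of(new ReadFileLinesFn()))   // placeholder DoFn
        .apply("Parse lines to JSON nodes", ParDo.of(new ParseJsonFn()))    // placeholder DoFn
        .apply("Decode via external service", ParDo.of(new DecodeFn()))     // placeholder DoFn
        .apply("Convert to TableRows", ParDo.of(new ToTableRowFn()))        // placeholder DoFn
        .apply("Write to BigQuery", BigQueryIO.writeTableRows().to(tableSpec));
    pipeline.run();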

It appeared that Dataflow was not acknowledging the PubSub messages until they arrived at the decoding service. The decoding service is slow, resulting in a backlog when many messages are sent at once. This means that lines associated with a PubSub message can take some time to arrive at the decoding service. As a result, PubSub was receiving no acknowledgement and resending the message. My first attempt to remedy this was adding an attribute to each PubSub message that is passed to the Reader using withAttributeId(). However, on testing, this only prevented duplicates that arrived close together.
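
(Note: in current Beam releases this PubsubIO option is spelled withIdAttribute. A minimal sketch, assuming the publisher stamps a unique "uniqueId" attribute on every message, might look like this. As far as documented, Dataflow only deduplicates ids seen within a limited time window, which would be consistent with only duplicates arriving close together being suppressed.)

    // Sketch only: assumes the publisher sets a unique "uniqueId" attribute on every message.
    // Dataflow deduplicates messages whose id attribute value it has seen recently.
    PCollection<String> fileNames = pipeline.apply(
        "Read file names from PubSub",
        PubsubIO.readStrings()
            .fromSubscription("projects/my-project/subscriptions/my-subscription")  // placeholder
            .withIdAttribute("uniqueId"));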

My second attempt was to add a fusion breaker (example) after the PubSub read. This simply performs a needless GroupByKey and then ungroups, the idea being that the GroupByKey forces Dataflow to acknowledge the PubSub message.

The fusion breaker discussed above works in that it prevents PubSub from resending messages, but I am finding that this GroupByKey outputs more elements than it receives: See image.

To try and diagnose this I have removed parts of the pipeline to get a simple pipeline that still exhibits this behavior. The behavior remains even when

  • PubSub is replaced by a dummy transform that sends out a fixed list of messages, with a slight delay between each send.
  • The write transform is removed.
  • All side inputs/outputs are removed.

The behavior I observe is:

  • Some of the received messages pass straight through the GroupByKey.
  • After a certain point, messages are 'held' by the GroupByKey (presumably due to the backlog after the GroupByKey).
  • These messages eventually exit the GroupByKey (in groups of size 1).
  • After a short delay (around 3 minutes), the same messages exit the GroupByKey again (still in groups of size 1). This can happen several times (I suspect it is proportional to the time they spent waiting to enter the GroupByKey).

Example job id is 2017-10-11_03_50_42-6097948956276262224. I have not run the beam on any other runner.

The Fusion Breaker is below:

    @Slf4j
    public class FusionBreaker<T> extends PTransform<PCollection<T>, PCollection<T>> {

      @Override
      public PCollection<T> expand(PCollection<T> input) {
        // Log, window, key with a random integer, group, then undo the grouping and log again.
        return group(window(input.apply(ParDo.of(new PassthroughLogger<>(PassthroughLogger.Level.Info, "Fusion break in")))))
            .apply("Getting iterables after breaking fusion", Values.create())
            .apply("Flattening iterables after breaking fusion", Flatten.iterables())
            .apply(ParDo.of(new PassthroughLogger<>(PassthroughLogger.Level.Info, "Fusion break out")));
      }

      private PCollection<T> window(PCollection<T> input) {
        return input.apply("Windowing before breaking fusion", Window.<T>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes());
      }

      private PCollection<KV<Integer, Iterable<T>>> group(PCollection<T> input) {
        return input.apply("Keying with random number", ParDo.of(new RandomKeyFn<>()))
            .apply("Grouping by key to break fusion", GroupByKey.create());
      }

      private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
        private Random random;

        @Setup
        public void setup() {
          random = new Random();
        }

        @ProcessElement
        public void processElement(ProcessContext context) {
          context.output(KV.of(random.nextInt(), context.element()));
        }
      }
    }
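
(For context, a hypothetical way the transform is applied right after the PubSub read; the surrounding steps are placeholders, not the actual pipeline code.)

    // Hypothetical usage of the FusionBreaker directly after the PubSub read;
    // the surrounding transforms and variables are placeholders.
    pipeline
        .apply("Read file names from PubSub", PubsubIO.readStrings().fromSubscription(subscription))
        .apply("Break fusion after the read", new FusionBreaker<String>())
        .apply("Explode files to lines", ParDo.of(new ReadFileLinesFn()));  // placeholder DoFn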

The PassthroughLoggers simply log the elements passing through (I use these to confirm that elements are indeed repeated, rather than there being an issue with the counts).
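
(The PassthroughLogger class itself is not shown in the question; a minimal sketch of such a pass-through logging DoFn, with the Level enum and constructor signature inferred from the usage in FusionBreaker above, could look like this.)

    // Minimal sketch of a pass-through logging DoFn; the Level enum and constructor
    // signature are assumptions inferred from how PassthroughLogger is used above.
    @Slf4j
    public class PassthroughLogger<T> extends DoFn<T, T> {
      public enum Level { Info }

      private final Level level;
      private final String prefix;

      public PassthroughLogger(Level level, String prefix) {
        this.level = level;
        this.prefix = prefix;
      }

      @ProcessElement
      public void processElement(ProcessContext context) {
        // Log the element with the configured prefix, then emit it unchanged.
        if (level == Level.Info) {
          log.info("{}: {}", prefix, context.element());
        }
        context.output(context.element());
      }
    }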

I suspect this is something to do with windows/triggers, but my understanding is that elements should never be repeated when .discardingFiredPanes() is used - regardless of the windowing setup. I have also tried FixedWindows with no success.
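
(For completeness, the FixedWindows variant presumably looked roughly like the following; the 10-second window size and zero allowed lateness are arbitrary assumptions, not the values actually used.)

    // Sketch of the FixedWindows variant that was also tried; the 10-second window
    // size and zero allowed lateness are assumptions, not the real configuration.
    input.apply("Windowing before breaking fusion",
        Window.<T>into(FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes());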

Answer

First, the Reshuffle transform is equivalent to your Fusion Breaker, but has some additional performance improvements that should make it preferable.
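
(A minimal sketch of swapping the custom transform for the built-in one; in current Beam releases, Reshuffle.viaRandomKey() assigns random keys, groups by key, and strips the keys again in a single step. The fileNames collection is a placeholder for the PubSub read output.)

    // Drop-in replacement for the custom FusionBreaker using Beam's built-in Reshuffle.
    PCollection<String> reshuffled = fileNames
        .apply("Break fusion after the read", Reshuffle.viaRandomKey());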

Second, both counters and logging may see an element multiple times if it is retried. As described in the Beam Execution Model, an element at a step may be retried if anything that is fused into it is retried.

Have you actually observed duplicates in what is written as the output of the pipeline?
