我们有一个管道,该管道首先从PubSub接收消息,每个消息都有一个文件名.这些文件被分解为行级别,解析为JSON对象节点,然后发送到外部解码服务(该服务对一些编码数据进行解码).对象节点最终将转换为表行,并写入Big Query.
We have a pipeline that starts by receiving messages from PubSub, each with the name of a file. These files are exploded to line level, parsed to JSON object nodes and then sent to an external decoding service (which decodes some encoded data). Object nodes are eventually converted to Table Rows and written to Big Query.
看来,Dataflow直到到达解码服务之前都没有确认PubSub消息.解码服务速度很慢,一次发送许多消息会导致积压.这意味着与PubSub消息关联的行可能需要一些时间才能到达解码服务.结果,PubSub没有收到确认并重新发送该消息.我对此进行的第一个尝试是在使用withAttributeId()传递给Reader的每个PubSub消息中添加一个属性.但是,在测试中,这只能防止重复出现的副本靠近在一起.
It appeared that Dataflow was not acknowledging the PubSub messages until they arrived at the decoding service. The decoding service is slow, resulting in a backlog when many message are sent at once. This means that lines associated with a PubSub message can take some time to arrive at the decoding service. As a result, PubSub was receiving no acknowledgement and resending the message. My first attempt to remedy this was adding an attribute to each PubSub messages that is passed to the Reader using withAttributeId(). However, on testing, this only prevented duplicates that arrived close together.
我的第二次尝试是添加融合断路器a>(示例)读.这只是执行不必要的GroupByKey,然后取消分组,其想法是GroupByKey强制Dataflow确认PubSub消息.
My second attempt was to add a fusion breaker (example) after the PubSub read. This simply performs a needless GroupByKey and then ungroups, the idea being that the GroupByKey forces Dataflow to acknowledge the PubSub message.
上面讨论的融合断路器的工作原理是,它阻止PubSub重新发送消息,但是我发现该GroupByKey输出的元素多于其收到的元素:查看图片.
The fusion breaker discussed above works in that it prevents PubSub from resending messages, but I am finding that this GroupByKey outputs more elements than it receives: See image.
为尝试诊断此问题,我删除了部分管道,以获取仍表现出此行为的简单管道.即使在以下情况下行为依然存在
To try and diagnose this I have removed parts of the pipeline to get a simple pipeline that still exhibits this behavior. The behavior remains even when
- PubSub被一些虚拟转换所代替,这些虚拟转换发送出固定的消息列表,并且每次发送之间都略有延迟.
- 写作转换已删除.
- 所有侧面输入/输出均已删除.
我观察到的行为是:
示例工作ID为2017-10-11_03_50_42-6097948956276262224.我没有在其他任何跑步者身上跑过光束.
Example job id is 2017-10-11_03_50_42-6097948956276262224. I have not run the beam on any other runner.
Fusion Breaker如下:
The Fusion Breaker is below:
@Slf4j public class FusionBreaker<T> extends PTransform<PCollection<T>, PCollection<T>> { @Override public PCollection<T> expand(PCollection<T> input) { return group(window(input.apply(ParDo.of(new PassthroughLogger<>(PassthroughLogger.Level.Info, "Fusion break in"))))) .apply("Getting iterables after breaking fusion", Values.create()) .apply("Flattening iterables after breaking fusion", Flatten.iterables()) .apply(ParDo.of(new PassthroughLogger<>(PassthroughLogger.Level.Info, "Fusion break out"))); } private PCollection<T> window(PCollection<T> input) { return input.apply("Windowing before breaking fusion", Window.<T>into(new GlobalWindows()) .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) .discardingFiredPanes()); } private PCollection<KV<Integer, Iterable<T>>> group(PCollection<T> input) { return input.apply("Keying with random number", ParDo.of(new RandomKeyFn<>())) .apply("Grouping by key to break fusion", GroupByKey.create()); } private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> { private Random random; @Setup public void setup() { random = new Random(); } @ProcessElement public void processElement(ProcessContext context) { context.output(KV.of(random.nextInt(), context.element())); } } }PassthroughLoggers仅记录通过的元素(我使用这些元素来确认元素确实重复,而不是计数有问题).
The PassthroughLoggers simply log the elements passing through (I use these to confirm that elements are indeed repeated, rather than there being an issue with the counts).
我怀疑这与Windows/触发器有关,但是我的理解是,无论使用哪种窗口设置,都不要在使用.discardingFiredPanes()时重复元素.我也尝试过FixedWindows,但没有成功.
I suspect this is something to do with windows/triggers, but my understanding is that elements should never be repeated when .discardingFiredPanes() is used - regardless of the windowing setup. I have also tried FixedWindows with no success.
推荐答案首先, Reshuffle 转换与您的Fusion Breaker等效,但是在性能上进行了一些改进,使其更可取.
First, the Reshuffle transform is equivalent to your Fusion Breaker, but has some additional performance improvements that should make it preferable.
第二,如果重试元素,则计数器和日志记录可能会多次看到该元素.如光束执行模型所述,步骤中的元素可能是重试是否融合了任何内容.
Second, both counters and logging may see an element multiple times if it is retried. As described in the Beam Execution Model, an element at a step may be retried if anything that is fused into it is retried.
您实际上在流水线输出中观察到重复吗?
Have you actually observed duplicates in what is written as the output of the pipeline?
更多推荐
为什么在光束管道复制元素中使用GroupByKey(在Google Dataflow上运行时)?
发布评论