数据流:在事件流中查找上一个事件

编程入门 行业动态 更新时间:2024-10-19 04:28:12
本文介绍了数据流:在事件流中查找上一个事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时送ChatGPT账号..

在 Google Dataflow 中恢复我正在寻找的与 Apache Beam 相关的内容类似于 LAG 在 Azure 流分析

Resuming what I'm looking for to do with Apache Beam in Google Dataflow is something like LAG in the Azure Stream Analytics

使用 X 分钟的窗口接收数据:

Using a window of X minutes where I'm receiving data:

||||||  ||||||  ||||||  ||||||  ||||||  ||||||
|  1 |  |  2 |  |  3 |  |  4 |  |  5 |  |  6 | 
|id=x|  |id=x|  |id=x|  |id=x|  |id=x|  |id=x| 
|||||| ,|||||| ,|||||| ,|||||| ,|||||| ,|||||| , ...

我需要比较数据(n)和数据(n-1),例如,按照前面的例子,它会是这样的:

I need to compare the data(n) with data(n-1), for example, following with the previous example, it will be something like this:

if data(6) inside and data(5)  outside then ... 
if data(5) inside and data(4)  outside then ... 
if data(4) inside and data(3)  outside then ... 
if data(3) inside and data(2)  outside then ... 
if data(2) inside and data(1)  outside then ... 

有什么实用的"方法可以做到这一点?

Is there any "practical "way to do this?

推荐答案

使用 Beam,如 docs,按键和窗口维护状态.因此,您无法访问先前窗口中的值.

With Beam, as explained in the docs, state is maintained per key and window. Therefore, you can't access values from previous windows.

要完成您想做的事情,您可能需要更复杂的管道设计.我的想法,在这里作为一个例子,是在 ParDo 中复制你的消息:

To do what you want to do you might need a more complex pipeline design. My idea, developed here as an example, would be to duplicate your messages in a ParDo:

将它们未经修改地发送到主输出同时,将它们发送到具有单窗口延迟的侧输出

为了做第二个要点,我们可以将窗口的持续时间(WINDOW_SECONDS)添加到元素时间戳:

To do the second bullet point, we can add the duration of a window (WINDOW_SECONDS) to the element timestamp:

class DuplicateWithLagDoFn(beam.DoFn):

  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    # Main output gets unmodified element
    yield element
    # The same element is emitted to the side output with a 1-window lag added to timestamp
    yield beam.pvalue.TaggedOutput('lag_output', beam.window.TimestampedValue(element, timestamp + WINDOW_SECONDS))

我们调用指定正确标签的函数:

We call the function specifying the correct tags:

beam.ParDo(DuplicateWithLagDoFn()).with_outputs('lag_output', main='main_output')

然后对两者应用相同的窗口方案,通过键组合等.

and then apply the same windowing scheme to both, co-group by key, etc.

windowed_main = results.main_output | 'Window main output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))
windowed_lag = results.lag_output | 'Window lag output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))

merged = (windowed_main, windowed_lag) | 'Join Pcollections' >> beam.CoGroupByKey()

最后,我们可以在同一个 ParDo 中同时拥有两个值(旧的和新的):

Finally, we can have both values (old and new) inside the same ParDo:

class CompareDoFn(beam.DoFn):

  def process(self, element):
    logging.info("Combined with previous vale: {}".format(element))

    try:
      old_value = int(element[1][1][0].split(',')[1])
    except:
      old_value = 0

    try:
      new_value = int(element[1][0][0].split(',')[1])
    except:
      new_value = 0

    logging.info("New value: {}, Old value: {}, Difference: {}".format(new_value, old_value, new_value - old_value))
    return (element[0], new_value - old_value)

为了测试这一点,我使用直接运行器运行管道,并在单独的 shell 上发布两条间隔超过 10 秒的消息(在我的情况下,WINDOW_SECONDS 是 10 秒):

To test this I run the pipeline with the direct runner and, on a separate shell, I publish two messages more than 10 seconds apart (in my case WINDOW_SECONDS was 10s):

gcloud pubsub topics publish lag --message="test,120"
sleep 12
gcloud pubsub topics publish lag --message="test,40"

并且作业输出显示了预期的差异:

And the job output shows the expected difference:

INFO:root:New message: (u'test', u'test,120')
INFO:root:Combined with previous vale: (u'test', ([u'test,120'], []))
INFO:root:New value: 120, Old value: 0, Difference: 120
INFO:root:New message: (u'test', u'test,40')
INFO:root:Combined with previous vale: (u'test', ([u'test,40'], [u'test,120']))
INFO:root:New value: 40, Old value: 120, Difference: -80
INFO:root:Combined with previous vale: (u'test', ([], [u'test,40']))
INFO:root:New value: 0, Old value: 40, Difference: -40

我的示例的完整代码此处.在复制元素时要考虑性能因素,但如果您需要在两个窗口期间提供可用值,这是有道理的.

Full code for my example here. Take into account performance considerations as you are duplicating elements but it makes sense if you need to have values available during two windows.

这篇关于数据流:在事件流中查找上一个事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

更多推荐

[db:关键词]

本文发布于:2023-04-19 23:31:18,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/972739.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:事件   数据流

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!