Creating a custom windowing function in Apache Beam

Updated: 2024-10-21 23:26:28

Question

I have a Beam pipeline that starts by reading multiple text files, where each line in a file represents a row that is inserted into Bigtable later in the pipeline. The scenario requires confirming that the number of rows extracted from each file matches the number of rows later inserted into Bigtable. For this, I am planning to develop a custom windowing strategy so that lines from a single file are assigned to a single window, using the file name as the key passed to the windowing function.

Is there a code sample for creating a custom windowing function?

Answer

Although I changed my strategy for confirming the number of inserted rows, here is the code for a custom windowing strategy, for anyone interested in windowing elements read from a bounded source (e.g. FileIO) in a batch job:

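import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Assigns every element to the 1 ms window [timestamp, timestamp + 1).
// Elements share a window only if they carry the same timestamp, so all lines
// of one file must be given the same, file-specific timestamp upstream.
public class FileWindows extends PartitioningWindowFn<Object, IntervalWindow> {

    private static final long serialVersionUID = -476922142925927415L;
    private static final Logger LOG = LoggerFactory.getLogger(FileWindows.class);

    @Override
    public IntervalWindow assignWindow(Instant timestamp) {
        Instant end = new Instant(timestamp.getMillis() + 1);
        IntervalWindow interval = new IntervalWindow(timestamp, end);
        // Logged once per element; consider DEBUG level for large inputs.
        LOG.info("FileWindows >> assignWindow(): Window assigned with Start: {}, End: {}", timestamp, end);
        return interval;
    }

    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
        // FileWindows has no configuration, so any two instances are interchangeable.
        // this.equals(other) would compare object identity, because equals() is not overridden.
        return other instanceof FileWindows;
    }

    @Override
    public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
        if (!this.isCompatible(other)) {
            throw new IncompatibleWindowException(other, String.format("Only %s objects are compatible.", FileWindows.class.getSimpleName()));
        }
    }

    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }
}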
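Because FileWindows places each element in the 1 ms window [timestamp, timestamp + 1), two elements share a window exactly when their timestamps are equal. The following is a plain-Java model of that rule, not Beam code; the class and method names are hypothetical. It counts elements per window start, which equals the per-file line count when every line of a file carries the same file-specific timestamp.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of FileWindows' assignment rule (no Beam dependency).
final class OneMillisWindowModel {
    private OneMillisWindowModel() {}

    // Mirrors assignWindow(): the window for timestamp t is the half-open interval [t, t + 1).
    static long[] windowFor(long timestampMillis) {
        return new long[] {timestampMillis, timestampMillis + 1};
    }

    // Counts elements per window, keyed by window start; equal timestamps land in the same window.
    static Map<Long, Long> countPerWindow(List<Long> timestampsMillis) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMillis) {
            long start = windowFor(ts)[0];
            counts.merge(start, 1L, Long::sum);
        }
        return counts;
    }
}
```

For example, timestamps 5, 5, 9, 5 fall into two windows: [5, 6) with three elements and [9, 10) with one.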

It can then be used in the pipeline as follows:

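// `lines` is the PCollection produced by the (omitted) file-reading step.
// AssignTimestampFn is user code and must output KV<String, String> to match Window.<KV<String, String>>into.
// Duration is org.joda.time.Duration.
PCollection<KV<String, String>> windowedLines = lines
    .apply("Assign_Timestamp_to_Each_Message", ParDo.of(new AssignTimestampFn()))
    .apply("Assign_Window_to_Each_Message", Window.<KV<String, String>>into(new FileWindows())
        .withAllowedLateness(Duration.standardMinutes(1))
        .discardingFiredPanes());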

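Keep in mind that you need to write AssignTimestampFn yourself so that each message carries a timestamp. For the windows to correspond to files, all lines of one file must receive the same timestamp, and different files must receive different timestamps.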
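One way to meet that requirement is to derive the timestamp deterministically from the file name. The sketch below assumes the file name is available for each line (for instance as the key of a KV<String, String>, as the question suggests); FileTimestamps and timestampMillisForFile are hypothetical names, and hashing is a swapped-in technique rather than the answer's own. It reads the first 48 bits of the name's SHA-256 digest as epoch milliseconds, which stays well inside Beam's valid timestamp range. Two different names would share a window only on a hash collision, which is possible but extremely unlikely.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: maps a file name to a deterministic epoch-millis value in [0, 2^48).
final class FileTimestamps {
    private FileTimestamps() {}

    static long timestampMillisForFile(String fileName) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(fileName.getBytes(StandardCharsets.UTF_8));
            long millis = 0L;
            // Pack the first 6 digest bytes (48 bits) into a non-negative long.
            for (int i = 0; i < 6; i++) {
                millis = (millis << 8) | (digest[i] & 0xFFL);
            }
            return millis;
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

Inside a hypothetical AssignTimestampFn written as a DoFn<KV<String, String>, KV<String, String>>, the @ProcessElement method could then call c.outputWithTimestamp(c.element(), new Instant(FileTimestamps.timestampMillisForFile(c.element().getKey()))), with Instant being org.joda.time.Instant. Beam rejects output timestamps that move an element earlier than its input timestamp by more than the DoFn's allowed skew, so this assumes the incoming elements carry timestamps no later than the computed ones.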

Published: 2023-04-19 22:59:58