Apache Beam: update values based on the values from the previous row


Problem description



I have grouped the values from a CSV file. Within the grouped rows, a few amounts are missing and need to be filled in from the previous row of the same group; if the amount is missing in a group's first row, it should be set to 0.

I am able to group the records, but I am unable to figure out the logic to update the values. How do I achieve this?

Records

Customer ID    Date        Amount
BS:89481       1/1/2012    100
BS:89482       1/1/2012
BS:89483       1/1/2012    300
BS:89481       1/2/2012    900
BS:89482       1/2/2012    200
BS:89483       1/2/2012

Grouped records

Customer ID    Date        Amount
BS:89481       1/1/2012    100
BS:89481       1/2/2012    900
BS:89482       1/1/2012
BS:89482       1/2/2012    200
BS:89483       1/1/2012    300
BS:89483       1/2/2012

Updated missing values

Customer ID    Date        Amount
BS:89481       1/1/2012    100
BS:89481       1/2/2012    900
BS:89482       1/1/2012    000
BS:89482       1/2/2012    200
BS:89483       1/1/2012    300
BS:89483       1/2/2012    300

Code so far:

public class GroupByTest {
    public static void main(String[] args) throws IOException {
        System.out.println("We are about to start!!");

        final File schemaFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");

        File csvFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Convert the Avro schema to a Beam schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline

                // Reading csv input
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                // Reading the files that match the pattern
                .apply("2", FileIO.readMatches())

                // Reading schema and validating with schema and converts to row and returns
                // valid and invalid list
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));

        // Fetching only valid rows
        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

        // Transformation
        //Convert row to KV
        final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
            .aggregateField("balance", Sum.ofDoubles(), "balances");

        final PCollection<Row> aggregate = rows.apply(combine);

        PCollection<String> pOutput = aggregate.apply(Select.flattenedSchema()).apply(ParDo.of(new RowToString()));

        pipeline.run().waitUntilFinish();
        System.out.println("The end");

    }

    private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
        String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
        LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
        if (logicalType != null) {
            type = logicalType.getName();
        }

        switch (type) {
        case "string":
            return row.getString(columnName);
        case "int":
            return Objects.requireNonNull(row.getInt32(columnName)).toString();
        case "bigint":
            return Objects.requireNonNull(row.getInt64(columnName)).toString();
        case "double":
            return Objects.requireNonNull(row.getDouble(columnName)).toString();
        case "timestamp-millis":
            return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime("eventTime")).getMillis()).toString();

        default:
            return row.getString(columnName);

        }
    }



}

Modified code:

Original code:

final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
        .aggregateField("amount", Sum.ofDoubles(), "balances");

Grouping by customer ID

class ToKV extends DoFn<Row, KV<String, Row>> {

    private static final long serialVersionUID = -8093837716944809689L;
    String columnName1 = null;

    @ProcessElement
    public void processElement(ProcessContext context) {
        Row row = context.element();
        context.output(KV.of(row.getValue(columnName1).toString(), row));
    }

    public void setColumnName1(String columnName1) {
        this.columnName1 = columnName1;
    }


}

Grouping by customer ID:

ToKV toKV = new ToKV();
toKV.setColumnName1("customerId"); // the key field name from the Avro schema below
PCollection<KV<String, Row>> kvRows = rows
        .apply(ParDo.of(toKV))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));

PCollection<KV<String, Iterable<Row>>> groupedKVRows = kvRows.apply(GroupByKey.<String, Row>create());

// Trying to group by date

    PCollection<Row> outputRow = 
            groupedKVRows
            .apply(ParDo.of(new GroupByDate()))
            .setCoder(RowCoder.of(AvroUtils.toBeamSchema(schema)));

How do I write the logic to convert the Iterable<Row> to a PCollection so that the rows can be sorted by date?

class GroupByDate extends DoFn<KV<String, Iterable<Row>>, Row> {

    private static final long serialVersionUID = -1345126662309830332L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        String strKey = context.element().getKey();
        Iterable<Row> rows = context.element().getValue();

        // TODO: sort each group by date and fill in the missing amounts
    }
}

Avro schema:

{
  "type" : "record",
  "name" : "Entry",
  "namespace" : "transform",
  "fields" : [  {
    "name" : "customerId",
    "type" : [ "string", "null" ]
  }, {
    "name" : "date",
    "type" : [ "string", "null" ],
    "logicalType": "date"
    
  }, {
    "name" : "amount",
    "type" : [ "double", "null" ]
  } ]
}

Update to convert the PCollection to Row[]

class KVToRow extends DoFn<KV<String, Iterable<Row>>, Row[]> {

    private static final long serialVersionUID = -1345126662309830332L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        String strKey = context.element().getKey();
        List<Row> rowList = new ArrayList<>();
        Iterable<Row> rowValue = context.element().getValue();
        rowValue.forEach(rowList::add);
        // Size the array to the full list; the original size() - 1 was off by one
        Row[] rowArray = rowList.toArray(new Row[rowList.size()]);
        context.output(rowArray);
    }
}

Suggested code

Row[] rowArray = Iterables.toArray(rows, Row.class);

Error:

The method toArray(Iterable<? extends T>, Class<T>) in the type Iterables is not applicable for the arguments (PCollection<Row>, Class<Row>)

Converting the iterable to an array

Row[] rowArray =  groupedKVRows.apply(ParDo.of(new KVToRow()));

Error:

Multiple markers at this line - Type mismatch: cannot convert from PCollection<Row[]> to Row[] - 1 changed line, 2 deleted

Answer

Beam does not provide any order guarantees, so you will have to group them as you did.

But as far as I can understand from your case, you need to group by customerId. After that, you can apply a PTransform like ParDo to sort the grouped rows by date and fill in the missing values however you wish. (This also explains the compile errors above: a PCollection is not an in-memory collection, so the Iterable<Row>-to-array conversion has to happen inside a DoFn, not on the PCollection itself.)
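
As a rough sketch of that wiring, reusing the groupedKVRows and beamSchema variables from the question (FillMissingAmounts is a hypothetical DoFn name, sketched after the sorting example below):

// Apply a hypothetical sort-and-fill DoFn to each customer's grouped rows
PCollection<Row> filledRows = groupedKVRows
        .apply("SortAndFill", ParDo.of(new FillMissingAmounts()))
        .setCoder(RowCoder.of(beamSchema));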

Example of sorting by converting to an array

// Create a formatter for parsing dates
DateTimeFormatter formatter = DateTimeFormat.forPattern("dd/MM/yyyy HH:mm:ss");

// Convert the iterable to an array
Row[] rowArray = Iterables.toArray(rows, Row.class);

// Sort the array using the dates
Arrays.sort(
    rowArray,
    Comparator.comparingLong(row -> formatter.parseDateTime(row.getString("date")).getMillis()));

// Store the last amount
Double lastAmount = 0.0;

// Iterate over the array and fill in the missing parts
for (Row row : rowArray) {

    // Get current amount
    Double currentAmount = row.getDouble("amount");

    // If null, fill the previous value
    ...
}
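
To make that concrete, here is a minimal self-contained sketch of such a DoFn, assuming a Beam version that provides Row.fromRow(...).withFieldValue(...) for copying an immutable Row with one field changed; the class name FillMissingAmounts and the date pattern borrowed from the snippet above are illustrative only:

import java.util.Arrays;
import java.util.Comparator;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import com.google.common.collect.Iterables;

class FillMissingAmounts extends DoFn<KV<String, Iterable<Row>>, Row> {

    private static final long serialVersionUID = 1L;

    // Pattern copied from the snippet above; adjust it to the data's actual
    // format (the sample rows look more like "d/M/yyyy" with no time part)
    private static final DateTimeFormatter FORMATTER = DateTimeFormat.forPattern("dd/MM/yyyy HH:mm:ss");

    @ProcessElement
    public void processElement(ProcessContext context) {
        // Convert the grouped iterable to an array, as suggested above
        Row[] rowArray = Iterables.toArray(context.element().getValue(), Row.class);

        // Sort by date so that "previous row" is well defined
        Arrays.sort(
            rowArray,
            Comparator.comparingLong(row -> FORMATTER.parseDateTime(row.getString("date")).getMillis()));

        // The first row of a group falls back to 0 per the problem statement
        Double lastAmount = 0.0;

        for (Row row : rowArray) {
            Double currentAmount = row.getDouble("amount");
            if (currentAmount == null) {
                // Rows are immutable, so emit a copy with the filled-in amount
                context.output(Row.fromRow(row).withFieldValue("amount", lastAmount).build());
            } else {
                lastAmount = currentAmount;
                context.output(row);
            }
        }
    }
}

Because each group is buffered and sorted in memory, this assumes a single customer's history fits on one worker, which matches the GroupByKey approach already used in the question.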

