Problem description
Apache Beam: update values based on the values from the previous row
I have grouped the values from a CSV file. In the grouped rows there are a few missing values that need to be updated based on the values from the previous row. If the first column of the row is empty, it needs to be set to 0.
I am able to group the records, but I cannot work out the logic for updating the values. How do I achieve this?
Records
Grouped records
Updated missing values
Code so far:
public class GroupByTest {
    public static void main(String[] args) throws IOException {
        System.out.println("We are about to start!!");
        final File schemaFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");
        File csvFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
        Schema schema = new Schema.Parser().parse(schemaFile);
        Pipeline pipeline = Pipeline.create();
        // Reading schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);
        final PCollectionTuple tuples = pipeline
                // Reading csv input
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
                // Reading files that matches conditions
                .apply("2", FileIO.readMatches())
                // Reading schema and validating with schema and converts to row and returns
                // valid and invalid list
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));
        // Fetching only valid rows
        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));
        // Transformation
        // Convert row to KV
        final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
                .aggregateField("balance", Sum.ofDoubles(), "balances");
        final PCollection<Row> aggregagte = rows.apply(combine);
        PCollection<String> pOutput = aggregagte.apply(Select.flattenedSchema()).apply(ParDo.of(new RowToString()));
        pipeline.run().waitUntilFinish();
        System.out.println("The end");
    }
    private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
        String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
        LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
        if (logicalType != null) {
            type = logicalType.getName();
        }
        switch (type) {
            case "string":
                return row.getString(columnName);
            case "int":
                return Objects.requireNonNull(row.getInt32(columnName)).toString();
            case "bigint":
                return Objects.requireNonNull(row.getInt64(columnName)).toString();
            case "double":
                return Objects.requireNonNull(row.getDouble(columnName)).toString();
            case "timestamp-millis":
                // Read the requested column rather than a hard-coded "eventTime" field
                return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime(columnName)).getMillis()).toString();
            default:
                return row.getString(columnName);
        }
    }
}
Modified code: original code
final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
        .aggregateField("amount", Sum.ofDoubles(), "balances");
Grouping by customer ID
class ToKV extends DoFn<Row, KV<String, Row>> {
    private static final long serialVersionUID = -8093837716944809689L;
    String columnName1 = null;

    @ProcessElement
    public void processElement(ProcessContext context) {
        Row row = context.element();
        org.apache.beam.sdk.schemas.Schema schema = row.getSchema();
        context.output(KV.of(row.getValue(columnName1).toString(), row));
    }

    public void setColumnName1(String columnName1) {
        this.columnName1 = columnName1;
    }
}
Grouping by customer ID:
ToKV toKV = new ToKV();
toKV.setColumnName1("ID");
PCollection<KV<String, Row>> kvRows = rows.apply(ParDo.of(toKV)).setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));
PCollection<KV<String,Iterable<Row>>> groupedKVRows = kvRows.apply(GroupByKey.<String,Row>create());
// Trying to group by date
PCollection<Row> outputRow =
        groupedKVRows
                .apply(ParDo.of(new GroupByDate()))
                .setCoder(RowCoder.of(AvroUtils.toBeamSchema(schema)));
How do I write the logic to convert the Iterable to a PCollection so that the rows can be sorted by date?
class GroupByDate extends DoFn<KV<String, Iterable<Row>>, Row> {
    private static final long serialVersionUID = -1345126662309830332L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        String strKey = context.element().getKey();
        Iterable<Row> rows = context.element().getValue();
    }
}
Avro schema:
{
  "type" : "record",
  "name" : "Entry",
  "namespace" : "transform",
  "fields" : [ {
    "name" : "customerId",
    "type" : [ "string", "null" ]
  }, {
    "name" : "date",
    "type" : [ "string", "null" ],
    "logicalType": "date"
  }, {
    "name" : "amount",
    "type" : [ "double", "null" ]
  } ]
}
Update: converting the PCollection to Row[]
class KVToRow extends DoFn<KV<String, Iterable<Row>>, Row[]> {
    private static final long serialVersionUID = -1345126662309830332L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        String strKey = context.element().getKey();
        // Copy the grouped values into a list, then into an array
        List<Row> rowList = new ArrayList<>();
        Iterable<Row> rowValue = context.element().getValue();
        rowValue.forEach(rowList::add);
        // toArray allocates an array of the right size
        Row[] rowArray = rowList.toArray(new Row[0]);
        context.output(rowArray);
    }
}
Suggested code
Row[] rowArray = Iterables.toArray(rows, Row.class);
Error:
The method toArray(Iterable<? extends T>, Class<T>) in the type Iterables is not applicable for the arguments (PCollection<Row>, Class<Row>)
Converting the iterable to an array
Row[] rowArray = groupedKVRows.apply(ParDo.of(new KVToRow()));
Error:
Multiple markers at this line - Type mismatch: cannot convert from PCollection<Row[]> to Row[] - 1 changed line, 2 deleted
Recommended answer
Beam does not provide any ordering guarantees, so you will have to group the rows as you did.
But as far as I can understand from your case, you need to group by customerId. After that, you can apply a PTransform such as ParDo to sort the grouped Rows by date and fill in the missing values however you wish.
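To make the group-then-sort step concrete outside of a pipeline, here is a minimal plain-Java sketch. It is not Beam code: `CustomerEntry` is a hypothetical stand-in for a Row, the `dd/MM/yyyy` date format is an assumption, and the map plays the role that GroupByKey plays in the pipeline. Inside a DoFn, the same sorting would be applied to the Iterable of Rows received for each key.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Beam Row with the three schema fields.
final class CustomerEntry {
    final String customerId;
    final String date;   // assumed format: dd/MM/yyyy
    final Double amount; // null when the value is missing

    CustomerEntry(String customerId, String date, Double amount) {
        this.customerId = customerId;
        this.date = date;
        this.amount = amount;
    }
}

final class GroupAndSort {
    // Assumed date format; adjust to whatever the CSV actually contains.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("dd/MM/yyyy");

    // Groups entries by customerId, then sorts each group by parsed date.
    static Map<String, List<CustomerEntry>> groupAndSort(List<CustomerEntry> entries) {
        Map<String, List<CustomerEntry>> grouped = new LinkedHashMap<>();
        for (CustomerEntry e : entries) {
            grouped.computeIfAbsent(e.customerId, k -> new ArrayList<>()).add(e);
        }
        for (List<CustomerEntry> group : grouped.values()) {
            group.sort(Comparator.comparing((CustomerEntry e) -> LocalDate.parse(e.date, FORMAT)));
        }
        return grouped;
    }
}
```

Sorting on the parsed date rather than on the raw string matters here, because a `dd/MM/yyyy` string does not sort chronologically as text.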
Example of sorting by converting to an array
// Create a formatter for parsing dates
DateTimeFormatter formatter = DateTimeFormat.forPattern("dd/MM/yyyy HH:mm:ss");
// Convert iterable to array
Row[] rowArray = Iterables.toArray(rows, Row.class);
// Sort array using dates
Arrays
    .sort(
        rowArray,
        Comparator
            .comparingLong(row -> formatter.parseDateTime(row.getString("date")).getMillis())
    );
// Store the last amount
Double lastAmount = 0.0;
// Iterate over the array and fill in the missing parts
for (Row row : rowArray) {
    // Get current amount
    Double currentAmount = row.getDouble("amount");
    // If null, fill the previous value
    ...
}
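The fill step that the loop above leaves open could look roughly like the following. This is a plain-Java sketch of the idea only, not the answer's own code: it works on a list of nullable amounts that is assumed to be already sorted by date, and `ForwardFill` and `fill` are hypothetical names. A missing value takes the most recent non-missing value, and a missing value at the start becomes 0, as the question asks.

```java
import java.util.ArrayList;
import java.util.List;

final class ForwardFill {
    // Returns a copy of the amounts in which every null is replaced by the
    // last non-null value seen so far, or by 0.0 if there is none yet.
    static List<Double> fill(List<Double> amounts) {
        List<Double> filled = new ArrayList<>(amounts.size());
        double last = 0.0; // default for a missing value in the first row
        for (Double amount : amounts) {
            if (amount != null) {
                last = amount;
            }
            filled.add(last);
        }
        return filled;
    }
}
```

In the DoFn itself the amounts would come from `row.getDouble("amount")`, and each filled value would have to be written into a newly built Row before it is output, since an existing Row cannot be modified in place.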