Streaming aggregation not writing to the sink

Problem description

I have to process some files which arrive daily. The data has a primary key of (date, client_id, operation_id), so I created a stream that appends only new data into a Delta table:

operations \
    .repartition('date') \
    .writeStream \
    .outputMode('append') \
    .trigger(once=True) \
    .option("checkpointLocation", "/mnt/sandbox/operations/_chk") \
    .format('delta') \
    .partitionBy('date') \
    .start('/mnt/sandbox/operations')

This works fine, but I need to summarize this information grouped by (date, client_id), so I created another stream from this operations table into a new table. To be able to use append mode while writing an aggregated stream, I tried converting my date field to a timestamp:

import pyspark.sql.functions as F

summarized = spark.readStream.format('delta').load('/mnt/sandbox/operations')
summarized = summarized.withColumn('timestamp_date', F.to_timestamp('date'))
summarized = summarized \
    .withWatermark('timestamp_date', '1 second') \
    .groupBy('client_id', 'date', 'timestamp_date') \
    .agg(<lot of aggs>)

summarized \
    .repartition('date') \
    .writeStream \
    .outputMode('append') \
    .option("checkpointLocation", "/mnt/sandbox/summarized/_chk") \
    .trigger(once=True) \
    .format('delta') \
    .partitionBy('date') \
    .start('/mnt/sandbox/summarized')

This code runs, but it does not write anything to the sink.

Why isn't it writing results to the sink?

Recommended answer

There could be two issues here.

I'm quite sure that the issue is with F.to_timestamp('date'), which gives null due to malformed input.

If so, withWatermark('timestamp_date', '1 second') can never be "materialized" and triggers no output.
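As a quick illustration of that failure mode (a minimal sketch with hypothetical sample values), to_timestamp silently returns null when the input string does not match the expected pattern, and passing an explicit format makes the expectation visible:

import pyspark.sql.functions as F

# Hypothetical rows: one matches the default 'yyyy-MM-dd' parse, one does not
df = spark.createDataFrame([('2019-06-01',), ('01/06/2019',)], ['date'])

df.select(
    F.to_timestamp('date').alias('default_parse'),           # null for '01/06/2019'
    F.to_timestamp('date', 'yyyy-MM-dd').alias('explicit')   # same result, but the pattern is explicit
).show()

If every row parses to null, the watermark never advances and, in append mode, no groups are ever finalized, which matches the "runs but writes nothing" symptom.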

Could you run spark.read.format('delta').load('/mnt/sandbox/operations') (read, not readStream) and see whether the conversion gives proper values?

spark \
    .read \
    .format('delta') \
    .load('/mnt/sandbox/operations') \
    .withColumn('timestamp_date', F.to_timestamp('date')) \
    .show()
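If the conversion is the culprit, counting nulls in the derived column makes it obvious (a sketch along the same lines, assuming the same path as above):

import pyspark.sql.functions as F

ops = spark.read.format('delta').load('/mnt/sandbox/operations') \
    .withColumn('timestamp_date', F.to_timestamp('date'))

# A non-zero count here means the watermark column contains nulls
print(ops.filter(F.col('timestamp_date').isNull()).count())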

All Rows Use Same Timestamp

It is also possible that withWatermark('timestamp_date', '1 second') never finishes (and so never "completes" an aggregation) because all rows share the same timestamp, so event time does not advance.

You should have rows with different timestamps so that the notion of time carried by timestamp_date can get past the '1 second' lateness window.
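Putting both hints together, a hedged sketch of a fix (assuming date is stored as a 'yyyy-MM-dd' string; the aggregate shown is a placeholder for the original <lot of aggs>) is to parse with an explicit pattern so the watermark column cannot be null by accident:

import pyspark.sql.functions as F

summarized = spark.readStream.format('delta').load('/mnt/sandbox/operations')

# Explicit pattern: a mismatch still yields null, but now by deliberate
# choice rather than by a silently mismatched default
summarized = summarized.withColumn(
    'timestamp_date', F.to_timestamp('date', 'yyyy-MM-dd')
)

summarized = summarized \
    .withWatermark('timestamp_date', '1 second') \
    .groupBy('client_id', 'date', 'timestamp_date') \
    .agg(F.count('operation_id').alias('n_operations'))  # placeholder aggregate

Since the files arrive daily, each new day's batch carries a later timestamp, which lets the watermark advance past earlier days and release their finalized groups in append mode.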
