Spark增量加载会覆盖旧记录

编程入门 行业动态 更新时间:2024-10-24 18:24:48
本文介绍了Spark增量加载会覆盖旧记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我需要使用Spark(PySpark)对表进行增量加载

I have a requirement to do the incremental loading to a table by using Spark (PySpark)

这里是例子:

第1天

id | value ----------- 1 | abc 2 | def

第2天

id | value ----------- 2 | cde 3 | xyz

预期结果

id | value ----------- 1 | abc 2 | cde 3 | xyz

这可以在关系数据库中轻松完成, 想知道这是否可以在Spark或其他转换工具中完成,例如Presto?

This can be done easily in relational database, Wondering whether this can be done in Spark or other transformational tool, e.g. Presto?

推荐答案

去这里! 第一个数据框:

Here you go! First Dataframe:

>>> list1 = [(1, 'abc'),(2,'def')] >>> olddf = spark.createDataFrame(list1, ['id', 'value']) >>> olddf.show(); +---+-----+ | id|value| +---+-----+ | 1| abc| | 2| def| +---+-----+

第二个数据框:

>>> list2 = [(2, 'cde'),(3,'xyz')] >>> newdf = spark.createDataFrame(list2, ['id', 'value']) >>> newdf.show(); +---+-----+ | id|value| +---+-----+ | 2| cde| | 3| xyz| +---+-----+

现在使用完全外部联接来联接和合并这两个数据名望,并在选择并可以使用用户定义的值替换空值时使用合并功能.

Now join and merge these two datafame using full outer join and use coalesce function while select and can replace the null values wih user defined values.

from pyspark.sql.functions import * >>> df = olddf.join(newdf, olddf.id == newdf.id,'full_outer').select(coalesce(olddf.id,newdf.id).alias("id"),coalesce(newdf.value,olddf.value).alias("value")) >>> df.show(); +---+-----+ | id|value| +---+-----+ | 1| abc| | 3| xyz| | 2| cde| +---+-----+

我希望这可以解决您的问题. :-)

I hope this should solve your problem. :-)

更多推荐

Spark增量加载会覆盖旧记录

本文发布于:2023-08-01 08:53:39,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1267018.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:增量   加载   Spark

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!