Spark 2.2 Structured Streaming foreach writer JDBC sink lag

This article covers how a Spark 2.2 Structured Streaming foreach writer JDBC sink lag was handled; it may be a useful reference for similar problems.

Problem description

I'm working on a project that uses Spark 2.2 Structured Streaming to read Kafka messages into an Oracle database. The message flow into Kafka is about 4,000-6,000 messages per second.

When using the HDFS file system as the sink destination, it works fine. When using the foreach JDBC writer, a huge delay builds up over time. I think the lag is caused by the foreach loop.

The JDBC sink class (a standalone class file):

class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "oracle.jdbc.driver.OracleDriver"
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _
  val v_sql = "insert INTO sparkdb.t_cf(EntityId,clientmac,stime,flag,id) values(?,?,to_date(?,'YYYY-MM-DD HH24:MI:SS'),?,stream_seq.nextval)"

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    connection.setAutoCommit(false)
    statement = connection.prepareStatement(v_sql)
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.setString(1, value(0).toString)
    statement.setString(2, value(1).toString)
    statement.setString(3, value(2).toString)
    statement.setString(4, value(3).toString)
    statement.executeUpdate()
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.commit()
    connection.close()
  }
}

The sink part:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "namenode:9092")
  .option("fetch.message.max.bytes", "50000000")
  .option("kafka.max.partition.fetch.bytes", "50000000")
  .option("subscribe", "rawdb.raw_data")
  .option("startingOffsets", "latest")
  .load()
  .select($"value".as[Array[Byte]])
  .map(avroDeserialize(_))
  .filter(some logic).select(some logic)
  .writeStream.format("csv")
  .option("checkpointLocation", "/user/root/chk")
  .option("path", "/user/root/testdir")
  .start()

If I change the last line

.writeStream.format("csv")...

into the JDBC foreach sink as follows:

val url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=x.x.x.x)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=fastdb)))"
val user = "user";
val pwd = "password";

val writer = new JDBCSink(url, user, pwd)

// the last line of the streaming query above becomes:
.writeStream.foreach(writer).outputMode("append").start()

the lag appears.

I guess the problem is most likely caused by the foreach loop mechanics: it does not handle rows in batch mode, i.e. several thousand rows per batch. As an Oracle DBA I have also fine-tuned the Oracle database side; mostly the database is just waiting on idle events. Excessive commits are already avoided by setting connection.setAutoCommit(false). Any suggestion would be much appreciated.
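For illustration only, here is a hedged sketch (not the original code) of how the same ForeachWriter could queue rows with JDBC batching instead of issuing one executeUpdate() per row, so that each partition pays roughly one round trip per epoch. The class name BatchedJDBCSink is my own:

class BatchedJDBCSink(url: String, user: String, pwd: String)
  extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {

  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _
  // same insert statement as the original sink
  val v_sql = "insert INTO sparkdb.t_cf(EntityId,clientmac,stime,flag,id) values(?,?,to_date(?,'YYYY-MM-DD HH24:MI:SS'),?,stream_seq.nextval)"

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName("oracle.jdbc.driver.OracleDriver")
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    connection.setAutoCommit(false)
    statement = connection.prepareStatement(v_sql)
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.setString(1, value(0).toString)
    statement.setString(2, value(1).toString)
    statement.setString(3, value(2).toString)
    statement.setString(4, value(3).toString)
    // queue the row instead of executing it immediately
    statement.addBatch()
  }

  def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) {
      // one batched execution and one commit per partition
      statement.executeBatch()
      connection.commit()
    }
    connection.close()
  }
}

In practice one would probably also flush every few thousand rows inside process() instead of holding an entire partition in a single batch, but even this removes the per-row round trip the plain writer pays.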

Recommended answer

The problem was solved by injecting the result into another Kafka topic, then writing a separate program that reads from the new topic and writes the rows into the database in batches.

I think in the next Spark release they might provide a JDBC sink with a parameter to set the batch size.
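(For reference, Spark 2.4 and later did add DataStreamWriter.foreachBatch, which allows this kind of batched JDBC write without the extra Kafka topic. A minimal sketch, assuming streamDF is the deserialized streaming DataFrame from the query above and reusing url/user/pwd from earlier; in this variant the sequence-generated id column would need a database-side default or trigger:)

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

val props = new Properties()
props.setProperty("user", user)
props.setProperty("password", pwd)
props.setProperty("driver", "oracle.jdbc.driver.OracleDriver")

// write each micro-batch with the batch JDBC writer, which groups inserts
// into JDBC batches ("batchsize" option, default 1000)
def writeBatch(batchDF: DataFrame, batchId: Long): Unit = {
  batchDF.write
    .mode(SaveMode.Append)
    .option("batchsize", "5000")
    .jdbc(url, "sparkdb.t_cf", props)
}

streamDF.writeStream
  .foreachBatch(writeBatch _)
  .option("checkpointLocation", "/user/root/chk")
  .start()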

The main code is as follows:

Write to another topic:

.writeStream.format("kafka")
  .option("kafka.bootstrap.servers", "x.x.x.x:9092")
  .option("topic", "fastdbtest")
  .option("checkpointLocation", "/user/root/chk")
  .start()

Read from the topic and write to the database; I'm using a c3p0 connection pool:

lines.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
      // get a connection from the connection pool
      val conn = ConnManager.getManager.getConnection
      val ps = conn.prepareStatement("insert into sparkdb.t_cf(ENTITYID,CLIENTMAC,STIME,FLAG) values(?,?,?,?)")
      try {
        conn.setAutoCommit(false)
        partitionRecords.foreach(record => {
          insertIntoDB(ps, record)
        })
        ps.executeBatch()
        conn.commit()
      } catch {
        case e: Exception => {} // do some logging
      } finally {
        ps.close()
        conn.close()
      }
    })
  }
})
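ConnManager and insertIntoDB are the author's own helpers and are not shown. For the batching above to work, insertIntoDB should only bind parameters and call addBatch(), leaving the single executeBatch()/commit() per partition to the surrounding code. A hypothetical sketch, with the record layout assumed to be a 4-tuple of strings:

// Hypothetical sketch of the elided helper: bind one record and queue it in the JDBC batch.
// The (String, String, String, String) record shape is an assumption, not the original code.
def insertIntoDB(ps: java.sql.PreparedStatement, record: (String, String, String, String)): Unit = {
  ps.setString(1, record._1) // ENTITYID
  ps.setString(2, record._2) // CLIENTMAC
  ps.setString(3, record._3) // STIME
  ps.setString(4, record._4) // FLAG
  ps.addBatch()              // queued only; executed once per partition via ps.executeBatch()
}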
