Flink通过JDBC写ClickHouse

编程入门 行业动态 更新时间:2024-10-10 01:17:55

<a href="https://www.elefans.com/category/jswz/34/1769678.html">Flink通过JDBC写ClickHouse</a>

Flink通过JDBC写ClickHouse

Flink写ClickHouse的分布式表
实现方式:数据直接通过JDBC写每个本地表

定义 AbstractClickHouseJDBCOutputFormat

package cn.hooli.flink.api.java.io.jdbc;

import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;public abstract class AbstractClickHouseJDBCOutputFormat<T> extends RichOutputFormat<T> {private static final long serialVersionUID = 1L;private static final Logger LOG = LoggerFactory.getLogger(AbstractClickHouseJDBCOutputFormat.class);private final String username;private final String password;private final String driverName = "ru.yandex.clickhouse.ClickHouseDriver";protected final String[] hosts;protected final List<Connection> connectionList;public AbstractClickHouseJDBCOutputFormat(String username, String password, String[] hosts) {this.username = username;this.password = password;this.hosts = hosts;this.connectionList = new ArrayList<>();}@Overridepublic void configure(Configuration parameters) {}protected void establishConnection() throws SQLException, ClassNotFoundException {Class.forName(driverName);for (String host : this.hosts) {// jdbc:clickhouse://10.138.41.146:8123String url = "jdbc:clickhouse://" + host;Connection connection = DriverManager.getConnection(url, this.username, this.password);this.connectionList.add(connection);}}protected void closeDbConnection() throws IOException {for (Connection connection : this.connectionList) {if (connection != null) {try {connection.close();} catch (SQLException se) {LOG.warn("JDBC connection could not be closed: " + se.getMessage());} finally {connection = null;}}}}
}

定义ClickHouseJDBCOutputFormat

package cn.hooli.flink.api.java.io.jdbc;import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;/*** OutputFormat to write Rows into a JDBC database.* The OutputFormat has to be configured using the supplied OutputFormatBuilder.** @see Row* @see DriverManager*/
public class ClickHouseJDBCOutputFormat extends AbstractClickHouseJDBCOutputFormat<Row> {private static final long serialVersionUID = 1L;private static final Logger LOG = LoggerFactory.getLogger(ClickHouseJDBCOutputFormat.class);private final String query;private final List<PreparedStatement> preparedStatementList;private final Map<Integer, List<Row>> ipWithDataList;private final long insertCkTimenterval; // 4000L// 插入的批次private final int insertCkBatchSize;  // 开发测试用10条// 上次写入时间private Long lastInsertTime;private final String dataBaseName;private final String tablename;private final String[] tableColums;/**** @param username              用户名* @param password              密码* @param hosts                 格式 {"1.1.1.1:8123", "1.1.1.2:8123", "1.1.1.3:8123"}* @param insertCkTimenterval   flush数据到 ClickHouse (ms)* @param insertCkBatchSize     达到多少条写 ClickHouse* @param dataBaseName          数据库名* @param tablename             表名 (本地表名)* @param tableColums           列名*/public ClickHouseJDBCOutputFormat(String username, String password, String[] hosts, long insertCkTimenterval, int insertCkBatchSize, String dataBaseName, String tablename, String[] tableColums) {super(username, password, hosts);this.insertCkTimenterval = insertCkTimenterval;this.insertCkBatchSize = insertCkBatchSize;this.lastInsertTime = System.currentTimeMillis();this.ipWithDataList = new HashMap<>();this.dataBaseName = dataBaseName;this.tablename = tablename;this.tableColums = tableColums;this.preparedStatementList = new ArrayList<>();this.query = clickhouseInsertValue(this.tableColums, this.tablename, this.dataBaseName);}/*** Connects to the target database and initializes the prepared statement.** @param taskNumber The number of the parallel instance.* @throws IOException Thrown, if the output could not be opened due to an*                     I/O problem.*/@Overridepublic void open(int taskNumber, int numTasks) throws IOException {try {establishConnection();for (Connection connection : connectionList) 
{PreparedStatement preparedStatement = connection.prepareStatement(query);this.preparedStatementList.add(preparedStatement);}} catch (SQLException sqe) {throw new IllegalArgumentException("open() failed.", sqe);} catch (ClassNotFoundException cnfe) {throw new IllegalArgumentException("JDBC driver class not found.", cnfe);}}@Overridepublic void writeRecord(Row row) throws IOException {/*** 1. 将数据写入CK*/final int[] size = {0};ipWithDataList.values().forEach(rows -> {size[0] += rows.size();});if (size[0] >= this.insertCkBatchSize) {ipWithDataList.forEach((index, rows) -> {try {flush(rows, preparedStatementList.get(index), connectionList.get(index));LOG.info("insertCkBatchSize");} catch (SQLException e) {throw new RuntimeException("Preparation of JDBC statement failed.", e);}});this.lastInsertTime = System.currentTimeMillis();}/*** 将当前行数据添加到List中*/// 轮询写入各个local表,避免单节点数据过多if (null != row) {Random random = new Random();int index = random.nextInt(super.hosts.length);List<Row> rows = ipWithDataList.get(index);if (rows == null) {rows = new ArrayList<>();}rows.add(row);ipWithDataList.put(index, rows);}}// 插入数据public void flush(List<Row> rows, PreparedStatement preparedStatement, Connection connection) throws SQLException {for (int i = 0; i < rows.size(); ++i) {Row row = rows.get(i);for (int j = 0; j < this.tableColums.length; ++j) {if (null != row.getField(j)) {preparedStatement.setObject(j + 1, row.getField(j));} else {preparedStatement.setObject(j + 1, "null");}}preparedStatement.addBatch();}preparedStatement.executeBatch();connectionmit();preparedStatement.clearBatch();rows.clear();}public void snapshotStateFlush() {if (this.isTimeToDoInsert()) {LOG.info("timeToDoInsert");flush();}}public void flush() {ipWithDataList.forEach((index, rows) -> {try {flush(rows, preparedStatementList.get(index), connectionList.get(index));} catch (SQLException e) {throw new RuntimeException("Preparation of JDBC statement failed.", e);}});}/*** 根据时间判断是否插入数据** @return*/private boolean 
isTimeToDoInsert() {long currTime = System.currentTimeMillis();return currTime - this.lastInsertTime >= this.insertCkTimenterval;}/*** Executes prepared statement and closes all resources of this instance.** @throws IOException Thrown, if the input could not be closed properly.*/@Overridepublic void close() throws IOException {for (PreparedStatement preparedStatement : this.preparedStatementList) {if (null != preparedStatement) {if (preparedStatement != null) {flush();try {preparedStatement.close();} catch (SQLException e) {LOG.info("JDBC statement could not be closed: " + e.getMessage());} finally {preparedStatement = null;}}}}closeDbConnection();}private String clickhouseInsertValue(String[] tableColums, String tablename, String dataBaseName) {StringBuffer sbCloums = new StringBuffer();StringBuffer sbValues = new StringBuffer();for (String s : tableColums) {sbCloums.append(s).append(",");sbValues.append("?").append(",");}String colums = sbCloums.toString().substring(0, sbCloums.toString().length() - 1);String values = sbValues.toString().substring(0, sbValues.toString().length() - 1);String insertSQL = "INSERT INTO " + dataBaseName + "." + tablename + " ( " + colums + " ) VALUES ( " + values + ")";return insertSQL;}}

定义ClickHouseJDBCSinkFunction

package cn.hooli.flink.api.java.io.jdbc;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;public class ClickHouseJDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {final ClickHouseJDBCOutputFormat outputFormat;public ClickHouseJDBCSinkFunction(ClickHouseJDBCOutputFormat outputFormat) {this.outputFormat = outputFormat;}@Overridepublic void invoke(Row value, Context context) throws Exception {outputFormat.writeRecord(value);}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {outputFormat.snapshotStateFlush();}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);RuntimeContext ctx = getRuntimeContext();outputFormat.setRuntimeContext(ctx);outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());}@Overridepublic void close() throws Exception {outputFormat.close();super.close();}
}

使用方式

// How to choose insertCkBatchSize:
// e.g. for a batch of 500 rows with parallelism 3, each parallel subtask (task)
// only needs to reach 500 / 3 rows before it writes out.
// insertCkBatchSize = 500 / 3
logDataStream.addSink(
        new ClickHouseJDBCSinkFunction(
                new ClickHouseJDBCOutputFormat(username, password, hosts, insertCkTimenterval,
                        insertCkBatchSize, dataBaseName, tablename, tableColums))
).name("clickhouse-sink");

更多推荐

Flink通过JDBC写ClickHouse

本文发布于:2024-03-23 20:09:52,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1742310.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:Flink   JDBC   ClickHouse

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!