Flink写ClickHouse的分布式表

编程入门 行业动态 更新时间:2024-10-09 19:16:07

Flink写ClickHouse的分布式表

Flink写ClickHouse的分布式表

  1. 导入 clickhouse-jdbc 依赖
<!-- https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc -->
<dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.4</version>
</dependency>
  1. AbstractClickHouseJDBCOutputFormat.java
package cn.hooli.flink.connector.jdbc.internal;import org.apache.flink.apimon.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;/*** 写出数据到 ClickHouse** @param <T>*/
public abstract class AbstractClickHouseJDBCOutputFormat<T> extends RichOutputFormat<T> {private static final long serialVersionUID = 1L;private static final Logger LOG = LoggerFactory.getLogger(AbstractClickHouseJDBCOutputFormat.class);// ClickHouse 用户名private final String username;// ClickHouse 密码private final String password;// ClickHouse JDBC 驱动类private final String drivername = "ru.yandex.clickhouse.ClickHouseDriver";// ClickHouse 集群 所有IPpublic final String[] ips;// ClickHouse JDBC 连接集合protected final List<Connection> connectionList;/*** 构造方法** @param username ClickHouse 用户名* @param password ClickHouse 密码* @param ips      ClickHouse 集群 所有IP*/public AbstractClickHouseJDBCOutputFormat(String username, String password, String[] ips) {this.username = username;this.password = password;this.ips = ips;this.connectionList = new ArrayList<>();}/*** 创建 ClickHouse JDBC 连接** @throws SQLException* @throws ClassNotFoundException*/protected void establishConnection() throws SQLException, ClassNotFoundException {Class.forName(drivername);for (String ip : this.ips) {String url = "jdbc:clickhouse://" + ip;Connection connection = DriverManager.getConnection(url, this.username, this.password);this.connectionList.add(connection);}}@Overridepublic void configure(Configuration parameters) {}/*** 关闭 ClickHouse JDBC 连接** @throws IOException*/protected void closeDbConnection() throws IOException {for (Connection connection : this.connectionList) {if (connection != null) {try {connection.close();} catch (SQLException se) {LOG.warn("JDBC connection could not be closed: " + se.getMessage());} finally {connection = null;}}}}
}
  1. ClickHouseJDBCOutputFormat.java
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     .0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package cn.hooli.flink.connector.jdbc.internal;import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.*;/*** OutputFormat to write Rows into a JDBC database.* The OutputFormat has to be configured using the supplied OutputFormatBuilder.** @see Row* @see DriverManager*/
public class ClickHouseJDBCOutputFormat extends AbstractClickHouseJDBCOutputFormat<Row> {private static final long serialVersionUID = 1L;private static final Logger LOG = LoggerFactory.getLogger(ClickHouseJDBCOutputFormat.class);// SQL 语句private final String query;// PreparedStatementprivate final List<PreparedStatement> preparedStatementList;// ClickHouse 的集群 IP 和 数据进行绑定存储,记录数据写出的 ClickHouse IPprivate final Map<Integer, List<Row>> ipWithDataList;// 满足此时间条件写出数据private final long insertCkTimeInterval; // 4000L// 插入的批次private final int insertCkBatchSize;  // 开发测试用10条// 数据库名private final String dataBaseName;// 数据表名private final String tableName;// 表列名private final String[] tableColumns;private transient volatile boolean closed = false;private transient ScheduledExecutorService scheduler;private transient ScheduledFuture<?> scheduledFuture;private transient volatile Exception flushException;public ClickHouseJDBCOutputFormat(String username, String password, String[] ips, long insertCkTimeInterval, int insertCkBatchSize, String dataBaseName, String tablename, String[] tableColums) {super(username, password, ips);this.insertCkTimeInterval = insertCkTimeInterval;this.insertCkBatchSize = insertCkBatchSize;this.ipWithDataList = new ConcurrentHashMap<>();this.dataBaseName = dataBaseName;this.tableName = tablename;this.tableColumns = tableColums;this.preparedStatementList = Collections.synchronizedList(new ArrayList<>());this.query = StrUtils.clickhouseInsertValue(this.tableColumns, this.tableName, this.dataBaseName);}/*** Connects to the target database and initializes the prepared statement.** @param taskNumber The number of the parallel instance.* @throws IOException Thrown, if the output could not be opened due to an*                     I/O problem.*/@Overridepublic void open(int taskNumber, int numTasks) throws IOException {try {establishConnection();for (Connection connection : connectionList) {PreparedStatement preparedStatement = 
connection.prepareStatement(query);this.preparedStatementList.add(preparedStatement);}if (insertCkTimeInterval != 0 && insertCkBatchSize != 1) {this.scheduler =Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("jdbc-upsert-output-format"));this.scheduledFuture =this.scheduler.scheduleWithFixedDelay(() -> {synchronized (ClickHouseJDBCOutputFormat.this) {if (!closed) {try {LOG.info("ck_flush.............");flush();} catch (Exception e) {flushException = e;}}}},insertCkTimeInterval,insertCkTimeInterval,TimeUnit.MILLISECONDS);}} catch (SQLException sqe) {throw new IllegalArgumentException("open() failed.", sqe);} catch (ClassNotFoundException cnfe) {throw new IllegalArgumentException("JDBC driver class not found.", cnfe);}}@Overridepublic final synchronized void writeRecord(Row row) throws IOException {checkFlushException();/*** 1. 将数据写入CK*/final int[] size = {0};ipWithDataList.values().forEach(rows -> {size[0] += rows.size();});if (size[0] >= this.insertCkBatchSize) {ipWithDataList.forEach((index, rows) -> {try {flush(rows, preparedStatementList.get(index), connectionList.get(index));LOG.info("insertCkBatchSize");} catch (SQLException e) {throw new RuntimeException("Preparation of JDBC statement failed.", e);}});}/*** 将当前行数据添加到List中*/// 轮询写入各个local表,避免单节点数据过多if (null != row) {Random random = new Random();int index = random.nextInt(super.ips.length);List<Row> rows = ipWithDataList.get(index);if (rows == null) {rows = new ArrayList<>();}rows.add(row);ipWithDataList.put(index, rows);}}private void checkFlushException() {if (flushException != null) {throw new RuntimeException("Writing records to JDBC failed.", flushException);}}// 插入数据private synchronized void flush(List<Row> rows, PreparedStatement preparedStatement, Connection connection) throws SQLException {checkFlushException();for (int i = 0; i < rows.size(); ++i) {Row row = rows.get(i);for (int j = 0; j < this.tableColumns.length; ++j) {if (null != row.getField(j)) {preparedStatement.setObject(j + 1, 
row.getField(j));} else {preparedStatement.setObject(j + 1, "null");}}preparedStatement.addBatch();}preparedStatement.executeBatch();connectionmit();preparedStatement.clearBatch();rows.clear();}private synchronized void flush() {checkFlushException();ipWithDataList.forEach((index, rows) -> {try {flush(rows, preparedStatementList.get(index), connectionList.get(index));} catch (SQLException e) {// throw new RuntimeException("Preparation of JDBC statement failed.", e);flushException = e;}});}/*** Executes prepared statement and closes all resources of this instance.** @throws IOException Thrown, if the input could not be closed properly.*/@Overridepublic synchronized void close() throws IOException {if (!closed) {closed = true;if (this.scheduledFuture != null) {scheduledFuture.cancel(false);this.scheduler.shutdown();}for (PreparedStatement preparedStatement : this.preparedStatementList) {if (preparedStatement != null) {flush();try {preparedStatement.close();} catch (SQLException e) {// LOG.info("JDBC statement could not be closed: " + e.getMessage());flushException = e;} finally {preparedStatement = null;}}}closeDbConnection();}checkFlushException();}}
  1. ClickHouseJDBCSinkFunction.java
package cn.hooli.flink.connector.jdbc.internal;import org.apache.flink.apimon.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;/*** 数据写出到 ClickHouse*/
public class ClickHouseJDBCSinkFunction extends RichSinkFunction<Row> {final ClickHouseJDBCOutputFormat outputFormat;public ClickHouseJDBCSinkFunction(ClickHouseJDBCOutputFormat outputFormat) {this.outputFormat = outputFormat;}/*** 数据写出** @param value   数据* @param context Flink Context* @throws Exception*/@Overridepublic void invoke(Row value, Context context) throws Exception {outputFormat.writeRecord(value);}/*** 初始化参数化* @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);RuntimeContext ctx = getRuntimeContext();outputFormat.setRuntimeContext(ctx);outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());}/*** 停止写出数据* @throws Exception*/@Overridepublic void close() throws Exception {outputFormat.close();super.close();}
}
  1. StrUtils.java
package cn.hooli.flink.connector.jdbc.internal;/*** @description:* @author: 林志明* @time: 2021/3/31 17:28*/
public class StrUtils {

    /**
     * Builds a parameterized ClickHouse INSERT statement with one {@code ?} placeholder per column.
     *
     * <p>Example: columns {@code [a, b]}, table {@code t}, database {@code db} produce
     * {@code insert into db.t ( a,b ) values ( ?,?)}. Names are inserted verbatim (not quoted).
     *
     * @param tableColums  column names, in the order values will be bound; must not be empty
     * @param tablename    target table name
     * @param dataBaseName database containing the table
     * @return the INSERT statement text
     * @throws IllegalArgumentException if {@code tableColums} is empty
     * @throws NullPointerException     if {@code tableColums} is null
     */
    public static String clickhouseInsertValue(String[] tableColums, String tablename, String dataBaseName) {
        if (tableColums.length == 0) {
            throw new IllegalArgumentException("tableColums must contain at least one column");
        }
        String colums = String.join(",", tableColums);
        String values = String.join(",", java.util.Collections.nCopies(tableColums.length, "?"));
        return "insert into " + dataBaseName + "." + tablename + " ( " + colums + " ) values ( " + values + ")";
    }
}
  1. FlinkJobApplication.java
package cn.hooli.flink.connector.jdbc.internal;import org.apache.flink.apimon.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;import java.util.Arrays;/*** @description: Flink 写 ClickHouse 的示例* @author: 林志明* @time: 2021/3/31 17:35*/
public class FlinkJobApplication {public static void main(String[] args) throws Exception {// 初始化 StreamExecutionEnvironmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 创建 SourceDataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList("a|b|c", "c|d|e", "x|y|z"));// 解析数据 String -> RowDataStream<Row> dataStreamTransform = dataStreamSource.map((MapFunction<String, Row>) value -> {String[] fields = value.split("\\|");Row row = new Row(3);row.setField(0, fields[0]);row.setField(1, fields[1]);row.setField(2, fields[2]);return row;});/*** 写出数据到 ClickHouse** 1. 初始化 ClickHouseJDBCOutputFormat* 2. 初始化 ClickHouseJDBCSinkFunction* 3. 增加 Sink*/ClickHouseJDBCOutputFormat clickHouseJDBCOutputFormat = new ClickHouseJDBCOutputFormat(null,null,new String[]{"1.1.1.1:8123", "1.1.1.2:8123", "1.1.1.3:8123"},10000,1000,"test_db","test_table_local", // 配置 local 表new String[]{"column_1", "column_2", "column_3"});ClickHouseJDBCSinkFunction clickHouseJDBCSinkFunction = new ClickHouseJDBCSinkFunction(clickHouseJDBCOutputFormat);dataStreamTransform.addSink(clickHouseJDBCSinkFunction);env.execute("write_to_clickhouse");}
}

更多推荐

Flink写ClickHouse的分布式表

本文发布于:2024-03-23 20:09:36,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1742307.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:分布式   Flink   ClickHouse

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!