Flink之输出算子Data Sink

编程入门 行业动态 更新时间:2024-10-24 12:26:36

Flink之输出<a href=https://www.elefans.com/category/jswz/34/1748093.html style=算子Data Sink"/>

Flink之输出算子Data Sink

Flink之输出算子Data Sink

  • Data Sink
  • 常见输出算子
    • print()
    • printToErr()
    • writeAsText()
    • writeAsCsv()
    • writeToSocket()
  • 常用连接器
    • File Sink连接器
    • Kafka Sink连接器
    • RabbitMQ Sink连接器
    • JDBC Sink连接器
    • Elasticsearch Sink连接器
    • MongoDB Sink连接器
  • 自定义Sink
    • RichSinkFunction
    • SinkFunction
    • 验证测试

Data Sink

在Apache Flink中,输出算子(Data Sink)用于将数据流发送到外部系统或存储介质中,如数据库、消息队列、文件系统、Apache Kafka等,以便进行后续的持久化、分析或其他操作。输出算子是数据流处理的最后一步,它决定了数据的最终去向。

Flink提供了各种内置的输出算子,可支持许多常见的数据存储系统,如print()、printToErr()、writeAsText()等。

Flink还提供了一部分框架的Sink连接器,支持与许多外部系统集成的连接器,如Apache Kafka、Elasticsearch、JDBC、MongoDB等。这些连接器提供了专门的输出算子,可以直接与这些外部系统进行交互。

也可以自定义输出算子来与其他系统进行集成,只需实现 SinkFunction 接口,并将其用作 addSink 方法的参数。

常见输出算子

在使用Flink进行数据处理时,数据经Data Source流入,然后通过系列Transformations的转化,最终可以通过Sink 将计算结果进行输出,Flink Data Sinks就是用于定义数据流最终的输出位置。

Flink 提供了几个较为简单的 Sink API 用于日常的开发,具体如下:

print()

将数据打印到标准输出stdout,不需要额外配置,它会将数据元素以字符串形式直接打印到标准输出上

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> dataStream = env.fromElements("Hello", "World", "Flink");dataStream.print();env.execute("Print Data");

printToErr()

将数据打印到标准错误输出stderr,类似于 print(),但将数据打印到标准错误输出上。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> dataStream = env.fromElements("Hello", "World", "Flink");dataStream.printToErr();env.execute("Print to StdErr");

writeAsText()

将数据写入文本文件,将数据按照行的形式写入文本文件,每行一个数据元素。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> dataStream = env.fromElements("Hello", "World", "Flink");dataStream.writeAsText("/path/to/output.txt"); // 指定输出路径和文件名env.execute("Write as Text");

可以通过指定第二个参数来定义输出模式,它有以下两个可选值:

WriteMode.NO_OVERWRITE:当指定路径上不存在任何文件时,才执行写出操作WriteMode.OVERWRITE:不论指定路径上是否存在文件,都执行写出操作;如果原来已有文件,则进行覆盖

并行的方式写出到多个文件:

 streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE);

将输出结果全部写出到一个文件,设置其并行度为1:

streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

writeAsCsv()

用于将计算结果以CSV的文件格式写出到指定目录,除了路径参数是必选外,该方法还支持传入输出模式,行分隔符,和字段分隔符三个额外的参数

DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(new Tuple2<>("Alice", 25),new Tuple2<>("Bob", 30),new Tuple2<>("Charlie", 35)
);// writeAsCsv(String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter) 
dataStream.writeAsCsv("/path/to/output.csv", WriteMode.OVERWRITE, "\n", ",");env.execute("Write as CSV");

writeToSocket()

将数据通过网络 socket 发送到指定的主机和端口。可以用于将数据流输出到外部系统或进行简单的网络通信。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> dataStream = env.fromElements("Hello", "World", "Flink");dataStream.writeToSocket("localhost", 9999, new SimpleStringSchema());env.execute("Write to Socket");

常用连接器

连接器可以和多种多样的第三方系统进行交互。Flink官方目前支持以下第三方系统连接器

Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon DynamoDB (sink)
Amazon Kinesis Data Streams (source/sink)
Amazon Kinesis Data Firehose (sink)
DataGen (source)
Elasticsearch (sink)
Opensearch (sink)
FileSystem (sink)
RabbitMQ (source/sink)
Google PubSub (source/sink)
Hybrid Source (source)
Apache Pulsar (source)
JDBC (sink)
MongoDB (source/sink)

除Flink官方之外,还有一些其他第三方系统与Flink的连接器,通过Apache Bahir发布:

Apache ActiveMQ (source/sink)
Apache Flume (sink)
Redis (sink)
Akka (sink)
Netty (source)

File Sink连接器

Flink提供了一个流式文件系统的连接器FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

File Sink支持行编码Row-encoded和批量编码Bulk-encoded格式。这两种不同的方式都有各自的构建器builder,可以直接调用FileSink的静态方法:

// 行编码
FileSink.forRowFormat(basePath,rowEncoder)// 批量编码
FileSink.forBulkFormat(basePath,bulkWriterFactory)

File Sink将传入的数据写入存储桶中。考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的 Part 文件。 完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中。

注意:

在 STREAMING 模式下使用 FileSink 需要开启 Checkpoint 功能。 文件只在 Checkpoint 成功时生成。如果没有开启 Checkpoint 功能,文件将永远停留在 in-progress 或者 pending 的状态,并且下游系统将不能安全读取该文件数据。

 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,有N个并行度的文件在写入env.setParallelism(2);// 开启checkpoint,否则生成文件有后缀: .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);/*** 生成模拟数据*/DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},Long.MAX_VALUE, RateLimiterStrategy.perSecond(1000), Types.STRING);// 读取模拟数据DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");// 输出到文件系统FileSink<String> fieSink = FileSink// 输出行式存储的文件,指定路径、指定编码.<String>forRowFormat(new Path("D:/temp"), new SimpleStringEncoder<>("UTF-8"))// 输出文件的一些配置: 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("fileSink-").withPartSuffix(".log").build())// 按照目录分桶:每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))// 文件滚动策略:  30秒 或 1m 生成一个文件.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(30)).withMaxPartSize(new MemorySize(1024 * 1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();}

Kafka Sink连接器

添加Kafka 连接器依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version>
</dependency>
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 写到kafka的一致性级别如果是精准一次,必须开启checkpointenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> singleOutputStreamOperator = env.socketTextStream("node01", 8888);KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定kafka地址和端口.setBootstrapServers("node01:9092,node01:9092,node03:9092")// 指定序列化器、指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("kafkaSink").setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别: 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("kafkaSink-")// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();singleOutputStreamOperator.sinkTo(kafkaSink);env.execute();}

自定义序列化器,实现带key的record

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("node01", 8888);/*** 自定义序列器:* 实现KafkaRecordSerializationSchema接口,重写ProducerRecord序列化方法* 指定key、value,转成字节数组* @return 返回一个 ProducerRecord对象,把key、value放进去*/class MyKafkaRecordSerializa implements KafkaRecordSerializationSchema<String> {@Nullable@Overridepublic ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext kafkaSinkContext, Long timestamp) {String[] datas = element.split(",");byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);byte[] value = element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord<>("kafkaSink", key, value);}}KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("node01:9092,node02:9092,node03:9092").setRecordSerializer(new MyKafkaRecordSerializa()).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("kafkaSink-").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();sensorDS.sinkTo(kafkaSink);env.execute();}

启动一个消费者,查看是否收到数据

bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic ws

RabbitMQ Sink连接器

要在Flink中使用RabbitMQ作为输出连接器,可以使用Flink提供的RabbitMQ Sink。

添加对RabbitMQ连接器的依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>

定义一个用于序列化数据的简单类

public class MyFlinkBean {public String id;public String name;public Integer age;// getter() setter()public MyFlinkBean() {}public MyFlinkBean(String id, String name, Integer age) {this.id = id;this.name = name;this.age = age;}
}
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*** 构建数据源* 定义要发送到RabbitMQ的数据流*/ArrayList<String> list = new ArrayList<>();for (int i = 0; i < 100; i++) {list.add(UUID.randomUUID() + "," + "flink" + i + "," + i);}DataStreamSource<String> stream = env.fromCollection(list);/*** Map算子转换处理*/class MyMapFunction implements MapFunction<String, MyFlinkBean> {@Overridepublic MyFlinkBean map(String value) throws Exception {String[] data = value.split(",");String uid = data[0].replace("-", "");return new MyFlinkBean(uid, data[1], Integer.valueOf(data[2]));}}SingleOutputStreamOperator<MyFlinkBean> source = stream.map(new MyMapFunction());// 定义RabbitMQ连接配置RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5672).setUserName("work").setPassword("12345678").setVirtualHost("/").build();// 将MyFlinkBean对象序列化为字节数组class MySerializationSchema implements SerializationSchema<MyFlinkBean> {@Overridepublic byte[] serialize(MyFlinkBean flinkBean) {return flinkBean.toString().getBytes();}}// 使用RabbitMQ连接器将数据发送到RabbitMQ队列RMQSink<MyFlinkBean> rabbitMQSink = new RMQSink<>(connectionConfig, "flinkQueue", new MySerializationSchema());source.addSink(rabbitMQSink);env.execute("RabbitMQ Example");}

    /*** MQ其他参数配置*/public class MyRMQSinkPublishOptions implements RMQSinkPublishOptions<MyFlinkBean> {private final String exchangeName;private final String routingKey;public MyRMQSinkPublishOptions(String exchangeName, String routingKey) {this.exchangeName = exchangeName;this.routingKey = routingKey;}@Overridepublic String computeRoutingKey(MyFlinkBean element) {// 根据具体逻辑计算路由键return routingKey;}@Overridepublic AMQP.BasicProperties computeProperties(MyFlinkBean a) {// 其他MQ参数配置AMQP.BasicProperties basicProperties = new AMQP.BasicProperties();return basicProperties;}@Overridepublic String computeExchange(MyFlinkBean a) {// 根据具体逻辑计算交换机名称return exchangeName;}}
        // 交换机名称String exchange = "myExchange";// 路由键名称String routingKey = "myRoutingKey";// 使用RabbitMQ连接器将数据发送到RabbitMQ队列RMQSink<MyFlinkBean> rabbitMQSink = new RMQSink<>(connectionConfig, new MySerializationSchema(), new MyRMQSinkPublishOptions(exchange, routingKey), null);source.addSink(rabbitMQSink);

JDBC Sink连接器

添加MySQL驱动

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.20</version>
</dependency>

添加jdbc连接器

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version>
</dependency>

定义一个用于序列化数据的简单类

public class MyFlinkBean {public String id;public String name;public Integer age;// getter() setter()public MyFlinkBean() {}public MyFlinkBean(String id, String name, Integer age) {this.id = id;this.name = name;this.age = age;}
}

编写输出到MySQL

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*** 构建数据源*/ArrayList<String> list = new ArrayList<>();for (int i = 0; i < 100; i++) {list.add(UUID.randomUUID() + "," + "flink" + i + "," + i);}DataStreamSource<String> stream = env.fromCollection(list);/*** Map算子转换处理*/class MyMapFunction implements MapFunction<String, MyFlinkBean> {@Overridepublic MyFlinkBean map(String value) throws Exception {String[] data = value.split(",");String uid = data[0].replace("-", "");return new MyFlinkBean(uid, data[1], Integer.valueOf(data[2]));}}SingleOutputStreamOperator<MyFlinkBean> source = stream.map(new MyMapFunction());/*** 预编译SQL,对SQL填充占位符*/class MyJdbcStatementBuilder implements JdbcStatementBuilder<MyFlinkBean> {@Overridepublic void accept(PreparedStatement preparedStatement, MyFlinkBean myFlinkBean) throws SQLException {preparedStatement.setString(1, myFlinkBean.getId());preparedStatement.setString(2, myFlinkBean.getName());preparedStatement.setInt(3, myFlinkBean.getAge());}}// 定义执行的sqlString sql = "insert into flink values(?,?,?)";/*** 执行选项参数*/JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()// 重试次数.withMaxRetries(3)// 批次的大小:条数.withBatchSize(100)// 批次的时间.withBatchIntervalMs(3000).build();/*** 连接选项参数*/JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/demo?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("123456")// 重试的超时时间.withConnectionCheckTimeoutSeconds(60).build();SinkFunction<MyFlinkBean> jdbcSink = JdbcSink.sink(sql, new MyJdbcStatementBuilder(), jdbcExecutionOptions, jdbcConnectionOptions);source.addSink(jdbcSink);env.execute();}

创建表

CREATE TABLE `flink` (`id` VARCHAR ( 32 ) NOT NULL,`name` VARCHAR ( 10 ) DEFAULT NULL,`age` TINYINT ( 3 ) DEFAULT NULL,
PRIMARY KEY ( `id` ) 
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4

执行测试

Elasticsearch Sink连接器

注意:

Flink Elasticsearch Sink的每个并行实例使用一个BulkProcessor向集群发送操作请求。 这会在元素批量发送到集群之前进行缓存。 BulkProcessor 一次执行一个批量请求,即不会存在两个并行刷新缓存的操作。

new Elasticsearch7SinkBuilder<String>()// 设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来
.setBulkFlushMaxActions(1)

Elasticsearch Sinks容错:

通过启用Flink checkpoint,Elasticsearch Sink保证至少一次将操作请求发送到Elasticsearch集群。 这是通过在进行 checkpoint时等待BulkProcessor中所有挂起的操作请求来实现。 这有效地保证了在触发checkpoint之前所有的请求被Elasticsearch成功确认,然后继续处理发送到sink的记录。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 5000 毫秒执行一次 checkpoint
env.enableCheckpointing(5000);

Elasticsearch请求处理失败:

ES操作请求可能由于多种原因而失败,包括节点队列容量暂时已满或者要被索引的文档格式错误。 Flink Elasticsearch Sink允许用户通过通过指定一个退避策略来重试请求。

	new Elasticsearch7SinkBuilder<String>()// 启用一个指数退避重试策略,初始延迟为 1000 毫秒且最大重试次数为 5.setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)

配置内部批量处理器:

可以进一步配置内部的 BulkProcessor 关于其如何刷新缓存操作请求的行为:

setBulkFlushMaxActions(int numMaxActions):刷新前最大缓存的操作数。
setBulkFlushMaxSizeMb(int maxSizeMb):刷新前最大缓存的数据量(以兆字节为单位)。
setBulkFlushInterval(long intervalMillis):刷新的时间间隔(不论缓存操作的数量或大小如何)。

配置如何对暂时性请求错误进行重试:

// 退避延迟的类型,CONSTANT 或者 EXPONENTIAL,退避重试次数,退避重试的时间间隔。
// 对于常量延迟来说,此值是每次重试间的间隔。对于指数延迟来说,此值是延迟的初始值
setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int maxRetries, long delayMillis):

示例:
添加Elasticsearch7.x依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7</artifactId><version>3.0.1-1.17</version>
</dependency>
public class Demo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每 5000 毫秒执行一次 checkpointenv.enableCheckpointing(5000);// 模拟数据流DataStream<String> input = env.fromElements("data1", "data2", "data3");// 构建 Elasticsearch Sinkinput.sinkTo(new Elasticsearch7SinkBuilder<String>()// 设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来.setBulkFlushMaxActions(1).setHosts(new HttpHost("127.0.0.1", 9200, "http")).setEmitter((element, context, indexer) -> indexer.add(createIndexRequest(element))).build());env.execute("Elasticsearch Sink Example");}/*** 对每个传入的元素执行单个索引请求* 还有DeleteRequest、 UpdateRequest* 创建一个包含要写入的数据的IndexRequest对象,并设置索引名称、文档ID和数据源** @param element* @return*/private static IndexRequest createIndexRequest(String element) {Map<String, Object> json = new HashMap<>();json.put("data", element);return Requests.indexRequest().index("my-index").id(element).source(json);}
}

MongoDB Sink连接器

Flink 提供了 MongoDB 连接器使用至少一次(At-least-once)的语义在 MongoDB collection 中读取和写入数据。

容错:

默认的写入语义为至少一次 (AT_LEAST_ONCE),但检查点并不是默认开启的,这会导致 Sink 缓冲写入请求,直到完成或 MongoWriter 自动刷新。 默认情况下,MongoWriter 会缓冲 1000 个新增的写入请求。

当开启了 Flink checkpoint,Flink MongoDB Sink 保证至少一次 (at-least-once) 的写入语义. MongoWriter 在检查点时,会确认所有被缓存的写入操作被正确写入

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); 

配置 Mongo Writer:

内部的 MongoWriter 可以使用 MongoSinkBuilder 更精细化地配置来实现不一样的写入行为:

setBatchSize(int batchSize):设置写入的批次大小。 可以设置为 -1 来禁用批式写入。
setBatchIntervalMs(long batchIntervalMs):设置写入的最大间隔时间,单位为毫秒。 可以设置为 -1 来禁用批式写入。

当使用如下设置时,会产生不一样的写入行为:

1.当缓存记录数超过最大批次大小,或者写入时间间隔超过限制时写入

batchSize > 1 and batchInterval > 0

2.仅在检查点写入

batchSize == -1 and batchInterval == -1

3.对每条记录进行写入

batchSize == 1 or batchInterval == 0

示例:

添加依赖到项目:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mongodb</artifactId><version>1.0.1-1.17</version>
</dependency>
    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 模拟数据流DataStream<String> stream = env.fromElements("data1", "data2", "data3");// 构建 MongoDB SinkMongoSink<String> sink = MongoSink.<String>builder()// 设置 MongoDB 连接字符串.setUri("mongodb://user:password@127.0.0.1:27017")// 设置写入的数据库名称.setDatabase("my_db")// 设置写入的集合名称.setCollection("my_coll")// 默认值:1000。 设置写入的最大批次大小。可以设置为 -1 来禁用批式写入.setBatchSize(1000)// 默认值:1000. 设置写入的最大间隔时间,单位为毫秒。可以设置为 -1 来禁用批式写入.setBatchIntervalMs(1000)// 默认值:3。设置写入失败时最大重试次数.setMaxRetries(3)// 默认值:DeliveryGuarantee.AT_LEAST_ONCE. 设置投递保证。 仅一次(EXACTLY_ONCE)的投递保证暂不支持.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)// 将数据对象转换为 MongoDB 中的文档对象.setSerializationSchema((input, context) -> new InsertOneModel<>(BsonDocument.parse(input))).build();// 将数据流写入 MongoDBstream.sinkTo(sink);env.execute("MongoDB Sink Example");}

自定义Sink

当Flink没有提供可以直接使用的连接器,就只能自定义Sink进行输出。与Source类似,Flink提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

在Flink中有两种主要类型的 Sink,分别是SinkFunctionRichSinkFunction

1.SinkFunction:

SinkFunction是 Flink 提供的一个简单的数据接收器接口。可以通过实现SinkFunction接口来定义接收数据和处理数据的逻辑。

它只有一个核心方法`

void invoke(IN value) throws Exception: 用于接收数据并进行处理。

2.RichSinkFunction:

RichSinkFunction是SinkFunction的子类,它提供了更多的生命周期管理方法和访问上下文的功能。通过继承 RichSinkFunction,可以在接收器的生命周期过程中进行一些初始化或清理操作

它定义了三个额外的方法:

void open(Configuration parameters) throws Exception:初始化资源的方法,可以在接收器开始时执行void close() throws Exception:清理资源的方法,可以在接收器停止时执行void setRuntimeContext(RuntimeContext t):设置运行时上下文对象,可以通过该对象访问一些运行时环境信息

RichSinkFunction

实现RichSinkFunction,重写关键方法invoke(),在方法中实现将流里的数据发送出去的逻辑。

    public static class MySink extends RichSinkFunction<String> {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 初始化操作 例如:创建Kafka生产者}@Overridepublic void close() throws Exception {super.close();// 清理、销毁操作 例如:关闭Kafka生产者}/*** sink的核心逻辑** @param value* @param context* @throws Exception*/@Overridepublic void invoke(String value, Context context) throws Exception {// 来一条数据,调用一次// 处理接收到的数据  例如:将数据发送到Kafka// 打印接收到的数据System.out.println(value);}}

SinkFunction

根据logLevel参数的不同,在invoke()方法中将数据以不同的日志级别发送到日志系统中

    public class LogSinkFunction<T> implements SinkFunction<T> {private final String logLevel; // 日志级别public LogSinkFunction(String logLevel) {this.logLevel = logLevel;}@Overridepublic void invoke(T value, Context context) throws Exception {// 根据日志级别将数据发送到日志系统switch (logLevel) {case "INFO":// 发送到INFO日志break;case "WARN":// 发送到WARN日志break;case "ERROR":// 发送到ERROR日志break;default:// 默认发送到INFO日志break;}}}

验证测试

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<String> source = env.fromElements("1", "2", "3");//        source.addSink(new MySink());source.addSink(new LogSinkFunction<>("INFO"));source.print();env.execute();}

更多推荐

Flink之输出算子Data Sink

本文发布于:2023-12-07 01:45:56,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1669697.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:算子   Flink   Sink   Data

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!