Unable to write to S3 using the S3 sink via StreamExecutionEnvironment

Updated: 2024-10-12 01:30:36
This article covers how to resolve the problem of being unable to write to S3 using the S3 sink via StreamExecutionEnvironment in Apache Flink 1.1.4.

Problem description


I have created a simple Apache Flink project that will read data from a Kafka topic and write that data to an S3 bucket. I do not receive any errors when I run the project, and it successfully reads each message from the Kafka topic, but nothing is written to my S3 bucket. Since there are no errors, it is difficult to debug what is going on. Below are my project and my configurations. This only occurs when I am using a StreamExecutionEnvironment. If I just produce to S3 using a regular batch ExecutionEnvironment, it works.

S3 test Java program

public class S3Test {

    public static void main(String[] args) throws Exception {
        // parse input arguments
        final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);

        if (parameterTool.getNumberOfParameters() < 4) {
            System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> "
                    + "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
            return;
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        // write kafka stream to standard out.
        //messageStream.print();

        String id = UUID.randomUUID().toString();
        messageStream.writeAsText("s3://flink-data/" + id + ".txt").setParallelism(1);

        env.execute("Write to S3 Example");
    }
}

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.7.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.2.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.2.5</version>
    </dependency>
    <!-- Apache Kafka Dependencies -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
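One thing worth checking against this dependency list: in Flink 1.1.x, `RollingSink` (used in the answer below) lives in the separate filesystem connector artifact, which is not among the dependencies above. Assuming the standard Maven coordinates for that connector, you would likely also need something along these lines:

```xml
<!-- Assumed coordinates for the Flink 1.1.x filesystem connector
     (home of RollingSink); verify against your Flink/Scala versions. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.10</artifactId>
    <version>1.1.4</version>
</dependency>
```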

core-site.xml (Hadoop configuration)

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <!-- Comma separated list of local directories used to buffer large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>
    <!-- set your AWS access key ID using the key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>***************</value>
    </property>
    <!-- set your AWS secret access key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>****************</value>
    </property>
</configuration>
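For Flink to see this core-site.xml at all, flink-conf.yaml must point at the directory containing the Hadoop configuration files; in Flink 1.x this is done with the `fs.hdfs.hadoopconf` key (the path below is an example placeholder, not from the original question):

```yaml
# flink-conf.yaml: directory that contains core-site.xml / hdfs-site.xml
fs.hdfs.hadoopconf: /path/to/hadoop/conf
```

If this key is missing, the `s3://` scheme mapping and the `fs.s3a.*` credentials above are silently ignored, which can produce exactly the "no errors, no output" symptom described in the question.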

Accepted answer


Persisting from a Kafka topic to S3 via Flink requires the use of the RollingSink. RollingSink uses a Bucketer to determine the names of the directories the part files will be saved to. DateTimeBucketer is the default, but you can also create a custom one. A part file is saved and closed whenever the max batch size is reached, and a new part file is then created. The code below works:

public class TestRollingSink {

    public static void main(String[] args) {
        Map<String, String> configs = ConfigUtils.loadConfigs("/Users/path/to/config.yaml");
        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);
        env.socketTextStream("localhost", 9092);

        DataStream<String> parsed = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        env.enableCheckpointing(2000, CheckpointingMode.AT_LEAST_ONCE);

        RollingSink<String> sink = new RollingSink<String>("s3://flink-test/" + "TEST");
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter<String>());
        sink.setBatchSize(200);
        sink.setPendingPrefix("file-");
        sink.setPendingSuffix(".txt");

        parsed.print();
        parsed.addSink(sink).setParallelism(1);

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
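For intuition about where the part files end up: a DateTimeBucketer builds the bucket directory by formatting the current processing time with the supplied SimpleDateFormat pattern and appending it to the sink's base path. A minimal stdlib-only sketch of that path construction (the `bucketPath` helper is hypothetical; the real sink additionally manages in-progress/pending renames and part-file counters):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class BucketPathSketch {

    // Hypothetical helper mirroring how a DateTimeBucketer derives the
    // bucket directory: basePath + "/" + formatted current time.
    static String bucketPath(String basePath, String formatString, long timestampMillis) {
        SimpleDateFormat fmt = new SimpleDateFormat(formatString);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return basePath + "/" + fmt.format(new Date(timestampMillis));
    }

    public static void main(String[] args) {
        // 2017-01-15 10:30:00 UTC
        long ts = 1484476200000L;
        System.out.println(bucketPath("s3://flink-test/TEST", "yyyy-MM-dd--HHmm", ts));
        // prints s3://flink-test/TEST/2017-01-15--1030
    }
}
```

With the `"yyyy-MM-dd--HHmm"` pattern from the answer, a new bucket directory is started every minute, and the pending prefix/suffix settings yield part files like `file-...txt` inside it.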

Published: 2023-11-24 11:05:08
Link: https://www.elefans.com/category/jswz/34/1624950.html