Apache Flink: performance problems when running many jobs

Problem description


With a high number of Flink SQL queries (100 copies of the query below), the Flink command line client fails on a YARN cluster with "JobManager did not respond within 600000 ms", i.e. the job is never started on the cluster.

  • The JobManager log has nothing after the last TaskManager starts, except DEBUG entries such as "job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not found in JobManager", indicating it is likely stuck (creating the ExecutionGraph?).
  • The same setup works as a standalone Java program locally (with high CPU initially).
  • Note: each Row in structStream contains 515 columns (many end up null), including a column that holds the raw message.
  • On the YARN cluster we specify 18GB for the TaskManager and 18GB for the JobManager, 5 slots each, and a parallelism of 725 (the partition count of our Kafka source); a sketch of the matching YARN session command follows this list.
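For reference, a YARN session sized as described could be started along these lines (flags as in the Flink 1.x yarn-session.sh CLI; the container count of 145 is our own arithmetic, 725 slots needed / 5 slots per TaskManager, not something stated above):

# 145 TaskManagers x 5 slots = 725 slots, matching the parallelism above
./bin/yarn-session.sh -n 145 -jm 18432 -tm 18432 -s 5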
select count(*), 'idnumber' as criteria, Environment, CollectedTimestamp,
       EventTimestamp, RawMsg, Source
from structStream
where Environment = 'MyEnvironment'
  and Rule = 'MyRule'
  and LogType = 'MyLogType'
  and Outcome = 'Success'
group by tumble(proctime, INTERVAL '1' SECOND),
         Environment, CollectedTimestamp, EventTimestamp, RawMsg, Source

Code

public static void main(String[] args) throws Exception {
    FileSystems.newFileSystem(KafkaReadingStreamingJob.class
            .getResource(WHITELIST_CSV).toURI(), new HashMap<>());

    final StreamExecutionEnvironment streamingEnvironment = getStreamExecutionEnvironment();
    final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(streamingEnvironment);

    final DataStream<Row> structStream = getKafkaStreamOfRows(streamingEnvironment);
    tableEnv.registerDataStream("structStream", structStream);
    tableEnv.scan("structStream").printSchema();

    for (int i = 0; i < 100; i++) {
        for (String query : Queries.sample) {
            // Queries.sample has one query, the one shown above.
            Table selectQuery = tableEnv.sqlQuery(query);
            DataStream<Row> selectQueryStream = tableEnv.toAppendStream(selectQuery, Row.class);
            selectQueryStream.print();
        }
    }

    // execute program
    streamingEnvironment.execute("Kafka Streaming SQL");
}

private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment) throws Exception {
    Properties properties = getKafkaProperties();
    // TestDeserializer deserializes the JSON to a ROW of string columns (515)
    // and also adds a column for the raw message.
    FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011(KAFKA_TOPIC_TO_CONSUME,
            new TestDeserializer(getRowTypeInfo()), properties);
    DataStream<Row> stream = environment.addSource(consumer);
    return stream;
}

private static RowTypeInfo getRowTypeInfo() throws Exception {
    // This has 515 fields.
    List<String> fieldNames = DDIManager.getDDIFieldNames();
    fieldNames.add("rawkafka"); // rawMessage added by TestDeserializer
    fieldNames.add("proctime");

    // Fill typeInformationArray with StringType for all but the last field,
    // which is of type Time
    .....
    return new RowTypeInfo(typeInformationArray, fieldNamesArray);
}

private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.enableCheckpointing(60000);
    env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
    env.setParallelism(725);
    return env;
}

Recommended answer


This looks to me as if the JobManager is overloaded with too many concurrently running jobs. I'd suggest distributing the jobs across more JobManagers / Flink clusters.
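One way to read this: all 100 copies of the query go into a single job graph, so at a parallelism of 725 the JobManager has on the order of 100 × 725 ≈ 72,500 subtasks to deploy and track in one job. A minimal sketch of the suggested split, written as a variant of the main() above and reusing its helper methods; the batchIndex program argument and the batchSize of 10 are our own assumptions, not part of the original code:

public static void main(String[] args) throws Exception {
    // Hypothetical argument selecting which slice of the 100 queries this submission runs.
    final int batchIndex = Integer.parseInt(args[0]);
    // Assumed batch size: 10 queries per job, i.e. 10 small jobs instead of 1 job with 100 queries.
    final int batchSize = 10;

    // getStreamExecutionEnvironment() and getKafkaStreamOfRows() as defined in the code above.
    final StreamExecutionEnvironment env = getStreamExecutionEnvironment();
    final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    tableEnv.registerDataStream("structStream", getKafkaStreamOfRows(env));

    // Only batchSize copies of the query end up in this job's graph,
    // so each JobManager schedules roughly batchSize * 725 subtasks instead of 100 * 725.
    for (int i = 0; i < batchSize; i++) {
        for (String query : Queries.sample) {
            Table selectQuery = tableEnv.sqlQuery(query);
            tableEnv.toAppendStream(selectQuery, Row.class).print();
        }
    }

    env.execute("Kafka Streaming SQL, batch " + batchIndex);
}

Each batch can then be submitted as its own per-job YARN cluster, giving every batch a dedicated JobManager, e.g.:

# one detached per-job cluster per batch (flags as in the Flink 1.x CLI)
for i in $(seq 0 9); do ./bin/flink run -m yarn-cluster -d batched-job.jar $i; done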
