本文介绍了单个作业中的Flink EXECUTE语句集和数据流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
不知何故,我无法在单个环境中执行语句集和可查询流,如果我的最后一条语句是flinkEnv.ecute,它将执行可查询流,而不执行语句集中的其他语句,反之亦然
val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment(); val tableEnv = StreamTableEnvironment.create(flinkEnv); val statementSet = tableEnv.createStatementSet(); statementSet.addInsertSql("INSERT INTO OUTPUT (SELECT * FROM INPUT_TRANSFORFM)") tableEnv.toChangelogStream(tableEnv.sqlQuery("SELECT * FROM OUTPUT")).keyBy(row -> row.getField(0)).asQueryableState("OUTPUT_CHANGELOG_STATE"); flinkEnv.execute("job"); // only execute queryable operator //statementSet.execute(); // only execute insert statement, not queryable stateOUTPUT表定义为upsert-kafka->;OUPUT(pkey, name)
接口推荐答案当前限制此功能。即使使用较低的层也是可能的。
本ticket跟踪语句集+输出到数据流API的用例。
作为一种解决办法,您可以使用toChangelogStreamforsqlQuery("SELECT * FROM INPUT_TRANSFORM"),然后使用fromChangelogStream再次将其注册为表,以便在多个sqlQuery()s中引用同一管道。
更多推荐
单个作业中的Flink EXECUTE语句集和数据流
发布评论