flink的副输出sideoutput单元测试

编程入门 行业动态 更新时间:2024-10-22 11:09:39

flink的副输出sideoutput<a href=https://www.elefans.com/category/jswz/34/1771349.html style=单元测试"/>

flink的副输出sideoutput单元测试

背景

处理函数中处理输出主输出的数据流数据外,也可以输出多个其他的副输出的数据流数据,当我们的处理函数有副输出时,我们需要测试他们功能的正确性,本文就提供一个测试flink副输出单元测试的例子

测试flink副输出单元测试

首先看一下处理函数,其中包含副输出逻辑

public class MySideOutputProcessFunction extends ProcessFunction<String, String> {public static final OutputTag<String> OUTPUT_TAG = new OutputTag<String>("sideoutput") {};@Overridepublic void processElement(String value, Context ctx, Collector<String> out) throws Exception {out.collect("normal:" + value);ctx.output(OUTPUT_TAG, "side:" + value);}
}

其次,看下对应的单元测试

/*** 测试sideOutput的输出功能*/
@Test
public void testSideOutput() throws Exception {MySideOutputProcessFunction mySideOutputProcessFunction = new MySideOutputProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forProcessFunction(mySideOutputProcessFunction);testHarness.open();testHarness.processElement("hello", 10);// 测试主输出Assert.assertEquals(Lists.newArrayList("normal:hello"), testHarness.extractOutputValues());ConcurrentLinkedQueue<StreamRecord<String>> sideOutPutQueue =testHarness.getSideOutput(MySideOutputProcessFunction.OUTPUT_TAG);// 测试副输出Assert.assertEquals(Lists.newArrayList("side:hello"),sideOutPutQueue.stream().map(StreamRecord::getValue).collect(Collectors.toList()));testHarness.close();
}

更多推荐

flink的副输出sideoutput单元测试

本文发布于:2023-11-15 00:32:31,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1590520.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:单元测试   flink   sideoutput

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!