Analysis of Data Exchange Between Flink Tasks


Local Data Exchange

If an InputChannel and the upstream ResultPartition it consumes belong to Tasks running in the same TaskManager, the data exchange between them takes place between different threads inside the same JVM process, without going over the network. As we have already seen, the buffers in a ResultSubpartition can be consumed through a ResultSubpartitionView. LocalInputChannel is precisely the component used for this in-process, cross-thread data exchange:

public class LocalInputChannel extends InputChannel implements BufferAvailabilityListener {

    /** The local partition manager. */
    private final ResultPartitionManager partitionManager;

    /** Task event dispatcher for backwards events. */
    private final TaskEventPublisher taskEventPublisher;

    /** The consumed subpartition. */
    private volatile ResultSubpartitionView subpartitionView;

    // Request consumption of the corresponding subpartition.
    @Override
    void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
        boolean retriggerRequest = false;

        // The lock is required to request only once in the presence of retriggered requests.
        synchronized (requestLock) {
            checkState(!isReleased, "LocalInputChannel has been released already");

            if (subpartitionView == null) {
                try {
                    // Local exchange, so no network communication is needed: create a
                    // ResultSubpartitionView via the ResultPartitionManager.
                    // LocalInputChannel implements BufferAvailabilityListener, so when data
                    // becomes available it is notified: notifyDataAvailable() is called,
                    // which adds this channel to the InputGate's queue of available channels.
                    ResultSubpartitionView subpartitionView = partitionManager.createSubpartitionView(
                        partitionId, subpartitionIndex, this);

                    if (subpartitionView == null) {
                        throw new IOException("Error requesting subpartition.");
                    }

                    // make the subpartition view visible
                    this.subpartitionView = subpartitionView;

                    // check if the channel was released in the meantime
                    if (isReleased) {
                        subpartitionView.releaseAllResources();
                        this.subpartitionView = null;
                    }
                } catch (PartitionNotFoundException notFound) {
                    if (increaseBackoff()) {
                        retriggerRequest = true;
                    } else {
                        throw notFound;
                    }
                }
            }
        }

        // Do this outside of the lock scope as this might lead to a
        // deadlock with a concurrent release of the channel via the
        // input gate.
        if (retriggerRequest) {
            inputGate.retriggerPartitionRequest(partitionId.getPartitionId());
        }
    }

    // Read data: consume the buffers of the ResultSubpartition through the ResultSubpartitionView.
    @Override
    Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException {
        checkError();

        ResultSubpartitionView subpartitionView = this.subpartitionView;
        if (subpartitionView == null) {
            // There is a possible race condition between writing a EndOfPartitionEvent (1) and flushing (3) the Local
            // channel on the sender side, and reading EndOfPartitionEvent (2) and processing flush notification (4). When
            // they happen in that order (1 - 2 - 3 - 4), flush notification can re-enqueue LocalInputChannel after (or
            // during) it was released during reading the EndOfPartitionEvent (2).
            if (isReleased) {
                return Optional.empty();
            }

            // this can happen if the request for the partition was triggered asynchronously
            // by the time trigger
            // would be good to avoid that, by guaranteeing that the requestPartition() and
            // getNextBuffer() always come from the same thread
            // we could do that by letting the timer insert a special "requesting channel" into the input gate's queue
            subpartitionView = checkAndWaitForSubpartitionView();
        }

        // fetch the next buffer through the ResultSubpartitionView
        BufferAndBacklog next = subpartitionView.getNextBuffer();

        if (next == null) {
            if (subpartitionView.isReleased()) {
                throw new CancelTaskException("Consumed partition " + subpartitionView + " has been released.");
            } else {
                return Optional.empty();
            }
        }

        numBytesIn.inc(next.buffer().getSizeUnsafe());
        numBuffersIn.inc();
        return Optional.of(new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog()));
    }

    // Callback: invoked when the ResultSubpartition notifies the ResultSubpartitionView
    // that data is available for consumption.
    @Override
    public void notifyDataAvailable() {
        // LocalInputChannel notifies the InputGate
        notifyChannelNonEmpty();
    }

    @Override
    void sendTaskEvent(TaskEvent event) throws IOException {
        checkError();
        checkState(subpartitionView != null, "Tried to send task event to producer before requesting the subpartition.");

        // dispatch the event to the producer
        if (!taskEventPublisher.publish(partitionId, event)) {
            throw new IOException("Error while publishing event " + event + " to producer. The producer could not be found.");
        }
    }
}

The logic here is relatively straightforward. LocalInputChannel implements the InputChannel interface and also the BufferAvailabilityListener interface. It asks the ResultPartitionManager to create the ResultSubpartitionView associated with the target ResultSubpartition, registering itself as the view's callback. This way, as soon as the ResultSubpartition produces data, the ResultSubpartitionView is notified and LocalInputChannel's callback is invoked, so the consumer side learns promptly that data has been produced and can consume it in time.
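To make this callback wiring concrete, here is a minimal sketch of the interaction, using hypothetical, simplified types (SubpartitionSketch and its methods are illustrative only; the real ResultSubpartition carries far more state):

interface BufferAvailabilityListener {
    void notifyDataAvailable();
}

class SubpartitionSketch {
    private final java.util.ArrayDeque<Object> buffers = new java.util.ArrayDeque<>();
    private BufferAvailabilityListener listener;

    // The consumer registers itself as listener when the view is created,
    // mirroring partitionManager.createSubpartitionView(partitionId, index, this).
    synchronized void createReadView(BufferAvailabilityListener availabilityListener) {
        this.listener = availabilityListener;
    }

    // The producer enqueues a buffer and immediately wakes the consumer,
    // mirroring how a flush ends up calling notifyDataAvailable() on the listener.
    synchronized void add(Object buffer) {
        buffers.add(buffer);
        if (listener != null) {
            listener.notifyDataAvailable();
        }
    }
}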

The detailed flow of local data exchange is as follows:

  1. The SourceStreamTask and the OneInputStreamTask are scheduled and executed by the same TaskExecutor. Both register themselves with the same NetworkEnvironment, request dedicated local buffer pools for input and output from the network buffers, and register their output ResultPartitions with the ResultPartitionManager.
  2. The downstream OneInputStreamTask builds its input component, the InputGate, which internally holds the LocalInputChannel actually used to fetch the output of the upstream ResultPartition. In its data-fetching loop, while (running && inputProcessor.processInput()), it calls barrierHandler.getNextNonBlocked(), which delegates to the concrete input component, the InputGate, for a blocking inputGate.getNextBufferOrEvent(). Inside the InputGate, requestPartitions() is called first to request the ResultPartitions produced upstream; this is where local and remote modes diverge. In local mode, the InputChannel is a single channel shared by the producing and consuming ends: through the ResultPartitionManager it tries to create the ResultSubpartitionView that reads the upstream BufferConsumer data (the view reads the buffers produced by the corresponding ResultSubpartition on the producer side, with the InputChannel registering itself as the view's availabilityListener). If the upstream SourceStreamTask has not yet been scheduled and has not registered its output ResultPartition with the ResultPartitionManager, the downstream OneInputStreamTask falls back to a timer that periodically retries obtaining the upstream ResultSubpartitionView. The task then blocks on inputChannelsWithData.wait(), waiting until an available channel is added to inputChannelsWithData and inputChannelsWithData.notify() signals that an InputChannel has data to consume. Once such an InputChannel becomes consumable, it is removed from inputChannelsWithData (after which the loop blocks on inputChannelsWithData.wait() again) and InputChannel.getNextBuffer() is called, which delegates to the internal ResultSubpartitionView to fetch the upstream buffers.
  3. During its initialization, the SourceStreamTask first registers its output ResultPartition with the ResultPartitionManager. In local mode, the downstream OneInputStreamTask can therefore obtain the ResultPartition produced by the SourceStreamTask through the same ResultPartitionManager and create the ResultSubpartitionView associated with the given ResultSubpartition. When the SourceStreamTask produces buffer data, it writes the data through its RecordWriter into the local buffers of the target ResultPartition and triggers ResultSubpartition.flush() either eagerly or periodically. The flush notifies the associated ResultSubpartitionView that the buffer is available for reading from the BufferConsumer, via the availabilityListener.notifyDataAvailable() callback, which in turn notifies the corresponding LocalInputChannel. That channel is finally added to inputChannelsWithData, and notify() wakes up the blocked consumer in the downstream OneInputStreamTask so that it can fetch data from the available channel. This is a classic producer-consumer model; a simplified sketch of the wait/notify hand-off follows this list.
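The inputChannelsWithData hand-off described above is, at its core, a plain wait()/notify() queue. The following is a simplified sketch of that mechanism, assuming the same queue-and-monitor structure (the real InputGate code tracks additional state, such as per-channel enqueued flags, which is omitted here):

import java.util.ArrayDeque;

class InputGateSketch {

    private final ArrayDeque<Integer> inputChannelsWithData = new ArrayDeque<>();

    // Consumer side: called on the path of getNextBufferOrEvent(); blocks until
    // some channel has data, then removes it from the queue (after which the
    // next call blocks again until another notification arrives).
    int waitForAvailableChannel() throws InterruptedException {
        synchronized (inputChannelsWithData) {
            while (inputChannelsWithData.isEmpty()) {
                inputChannelsWithData.wait();
            }
            return inputChannelsWithData.poll();
        }
    }

    // Producer side: reached via notifyDataAvailable() -> notifyChannelNonEmpty();
    // enqueues the channel and wakes up the blocked consumer.
    void queueChannel(int channelIndex) {
        synchronized (inputChannelsWithData) {
            inputChannelsWithData.add(channelIndex);
            inputChannelsWithData.notify();
        }
    }
}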

Data Exchange Between Tasks over the Network

In Flink, network transfer between different Tasks is based on Netty. The NetworkEnvironment manages all network connections through a ConnectionManager, and NettyConnectionManager is the concrete implementation of ConnectionManager. On startup, NettyConnectionManager creates and starts a NettyClient and a NettyServer; the NettyServer starts a server-side listener, waiting for connections from other NettyClients:

public class NettyConnectionManager implements ConnectionManager {

    private final NettyServer server;

    private final NettyClient client;  // holds the NettyProtocol object

    private final NettyBufferPool bufferPool;

    private final PartitionRequestClientFactory partitionRequestClientFactory;

    private final boolean isCreditBased;

    public NettyConnectionManager(NettyConfig nettyConfig) {
        this.server = new NettyServer(nettyConfig);
        this.client = new NettyClient(nettyConfig);
        this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());
        this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
    }

    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
        NettyProtocol partitionRequestProtocol = new NettyProtocol(
            partitionProvider,
            taskEventDispatcher,
            client.getConfig().isCreditBasedEnabled());

        client.init(partitionRequestProtocol, bufferPool);   // initialize the Netty client
        server.init(partitionRequestProtocol, bufferPool);   // initialize and start the Netty server
    }
}

NettyProtocol provides the set of ChannelHandlers that NettyClient and NettyServer register during bootstrap.

public class NettyProtocol {

    /**
     * Returns the server channel handlers.
     *
     * <pre>
     * +-------------------------------------------------------------------+
     * |                        SERVER CHANNEL PIPELINE                    |
     * |                                                                   |
     * |    +----------+----------+ (3) write  +----------------------+    |
     * |    | Queue of queues     +----------->| Message encoder      |    |
     * |    +----------+----------+            +-----------+----------+    |
     * |              /|\                                 \|/              |
     * |               | (2) enqueue                       |               |
     * |    +----------+----------+                        |               |
     * |    | Request handler     |                        |               |
     * |    +----------+----------+                        |               |
     * |              /|\                                  |               |
     * |               |                                   |               |
     * |   +-----------+-----------+                       |               |
     * |   | Message+Frame decoder |                       |               |
     * |   +-----------+-----------+                       |               |
     * |              /|\                                  |               |
     * +---------------+-----------------------------------+---------------+
     * |               | (1) client request               \|/
     * +---------------+-----------------------------------+---------------+
     * |               |                                   |               |
     * |       [ Socket.read() ]                    [ Socket.write() ]     |
     * |                                                                   |
     * |  Netty Internal I/O Threads (Transport Implementation)            |
     * +-------------------------------------------------------------------+
     * </pre>
     *
     * @return channel handlers
     */
    public ChannelHandler[] getServerChannelHandlers() {
        // ChannelHandlers on the Netty server side
        PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
        PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
            partitionProvider, taskEventDispatcher, queueOfPartitionQueues, creditBasedEnabled);

        return new ChannelHandler[] {
            messageEncoder,
            new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
            serverHandler,
            queueOfPartitionQueues};
    }

    /**
     * Returns the client channel handlers.
     *
     * <pre>
     *     +-----------+----------+            +----------------------+
     *     | Remote input channel |            | request client       |
     *     +-----------+----------+            +-----------+----------+
     *                 |                                   | (1) write
     * +---------------+-----------------------------------+---------------+
     * |               |     CLIENT CHANNEL PIPELINE       |               |
     * |               |                                  \|/              |
     * |    +----------+----------+            +----------------------+    |
     * |    | Request handler     +            | Message encoder      |    |
     * |    +----------+----------+            +-----------+----------+    |
     * |              /|\                                 \|/              |
     * |               |                                   |               |
     * |    +----------+------------+                      |               |
     * |    | Message+Frame decoder |                      |               |
     * |    +----------+------------+                      |               |
     * |              /|\                                  |               |
     * +---------------+-----------------------------------+---------------+
     * |               | (3) server response              \|/ (2) client request
     * +---------------+-----------------------------------+---------------+
     * |               |                                   |               |
     * |       [ Socket.read() ]                    [ Socket.write() ]     |
     * |                                                                   |
     * |  Netty Internal I/O Threads (Transport Implementation)            |
     * +-------------------------------------------------------------------+
     * </pre>
     *
     * @return channel handlers
     */
    public ChannelHandler[] getClientChannelHandlers() {
        // ChannelHandlers on the Netty client side
        NetworkClientHandler networkClientHandler = creditBasedEnabled
            ? new CreditBasedPartitionRequestClientHandler()
            : new PartitionRequestClientHandler();

        return new ChannelHandler[] {
            messageEncoder,
            new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
            networkClientHandler};
    }
}
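For reference, a handler array like this is typically installed through a ChannelInitializer during bootstrap. The sketch below illustrates what the server-side wiring plausibly looks like; the actual code lives in NettyServer.init, so treat this as an illustration of the standard Netty pattern rather than Flink's verbatim source:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

class ServerPipelineSketch {

    // Install the handlers returned by NettyProtocol#getServerChannelHandlers
    // into every accepted channel's pipeline. Inbound bytes flow through the
    // frame/message decoder into the request handler; outbound responses pass
    // through the message encoder.
    static void installServerHandlers(ServerBootstrap bootstrap, ChannelHandler[] serverHandlers) {
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) {
                channel.pipeline().addLast(serverHandlers);
            }
        });
    }
}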

When NettyServer starts, it configures watermarks: if the number of bytes in Netty's output buffer exceeds the high watermark, writing waits until the count drops below the low watermark before continuing. This watermark mechanism ensures that not too much data is written into the network:

class NettyServer {

    void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws IOException {
        ......

        // Low and high water marks for flow control
        // hack around the impossibility (in the current netty version) to set both watermarks at
        // the same time:
        final int defaultHighWaterMark = 64 * 1024; // from DefaultChannelConfig (not exposed)
        final int newLowWaterMark = config.getMemorySegmentSize() + 1;
        final int newHighWaterMark = 2 * config.getMemorySegmentSize();

        // Configure the watermarks so that not too much data is written into the network:
        // when the bytes in the output buffer exceed the high watermark, Channel.isWritable() returns false;
        // when they drop below the low watermark, Channel.isWritable() returns true again.
        if (newLowWaterMark > defaultHighWaterMark) {
            bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, newHighWaterMark);
            bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, newLowWaterMark);
        } else { // including (newHighWaterMark < defaultLowWaterMark)
            bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, newLowWaterMark);
            bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, newHighWaterMark);
        }
    }
}
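On the writing side, the watermarks take effect through Channel.isWritable(). A minimal sketch of how a sender would honor them (illustrative only; Flink's actual writer logic in the handlers is more involved):

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;

class WritabilitySketch {

    // Write only while the channel is writable. Once the outbound buffer
    // exceeds the high watermark, isWritable() flips to false; Netty fires
    // channelWritabilityChanged() when it drops below the low watermark again.
    static boolean tryWrite(Channel channel, Object msg) {
        if (!channel.isWritable()) {
            return false; // caller should back off and resume on channelWritabilityChanged()
        }
        channel.writeAndFlush(msg).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        return true;
    }
}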

When a RemoteInputChannel requests a remote ResultSubpartition, the NettyClient initiates a connection to the NettyServer of the Task that owns the requested ResultSubpartition, and all subsequent data exchange takes place over this connection. Only one connection is established between any two Tasks, and it is multiplexed across the different RemoteInputChannels and ResultSubpartitions:

public class NettyConnectionManager implements ConnectionManager {

    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
            throws IOException, InterruptedException {
        // This actually establishes the connection to the other Task's server.
        // The returned PartitionRequestClient wraps the Netty channel and channel handler.
        return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
    }
}

class PartitionRequestClientFactory {

    private final NettyClient nettyClient;

    private final ConcurrentMap<ConnectionID, Object> clients = new ConcurrentHashMap<ConnectionID, Object>();

    PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException {
        Object entry;
        PartitionRequestClient client = null;

        while (client == null) {
            entry = clients.get(connectionId);

            if (entry != null) {
                // The connection has already been established (or is being established).
                // Existing channel or connecting channel
                if (entry instanceof PartitionRequestClient) {
                    client = (PartitionRequestClient) entry;
                } else {
                    ConnectingChannel future = (ConnectingChannel) entry;
                    client = future.waitForChannel();
                    clients.replace(connectionId, future, client);
                }
            } else {
                // No channel yet. Create one, but watch out for a race.
                // We create a "connecting future" and atomically add it to the map.
                // Only the thread that really added it establishes the channel.
                // The others need to wait on that original establisher's future.
                // handInChannel is called back once the connection has been established.
                ConnectingChannel connectingChannel = new ConnectingChannel(connectionId, this);
                Object old = clients.putIfAbsent(connectionId, connectingChannel);

                if (old == null) {
                    // connect to the Netty server
                    nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
                    // wait for the connection; the handInChannel callback creates the PartitionRequestClient
                    client = connectingChannel.waitForChannel();
                    clients.replace(connectionId, connectingChannel, client);
                } else if (old instanceof ConnectingChannel) {
                    client = ((ConnectingChannel) old).waitForChannel();
                    clients.replace(connectionId, old, client);
                } else {
                    client = (PartitionRequestClient) old;
                }
            }

            // Make sure to increment the reference count before handing a client
            // out to ensure correct bookkeeping for channel closing.
            if (!client.incrementReferenceCounter()) {
                destroyPartitionRequestClient(connectionId, client);
                client = null;
            }
        }
        return client;
    }
}
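The essence of this factory is a "first thread connects, everyone else waits on the same future" pattern. Here is a condensed, generic sketch of that idea using CompletableFuture; this is an assumption-laden simplification for illustration, whereas the real code uses ConnectingChannel and reference counting as shown above:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

class SharedClientPool<K, C> {

    private final ConcurrentMap<K, CompletableFuture<C>> clients = new ConcurrentHashMap<>();
    private final Function<K, C> connector; // performs the actual connect

    SharedClientPool(Function<K, C> connector) {
        this.connector = connector;
    }

    C getOrConnect(K connectionId) {
        CompletableFuture<C> fresh = new CompletableFuture<>();
        CompletableFuture<C> existing = clients.putIfAbsent(connectionId, fresh);

        if (existing == null) {
            // This thread won the race: it alone establishes the connection.
            try {
                fresh.complete(connector.apply(connectionId));
            } catch (Exception e) {
                clients.remove(connectionId, fresh);
                fresh.completeExceptionally(e);
            }
            return fresh.join();
        }
        // All other threads block until the winner's connection is ready.
        return existing.join();
    }
}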

In addition, NetworkBuffer, the implementation class of Flink's Buffer, directly extends Netty's AbstractReferenceCountedByteBuf. This lets Netty use Flink's buffers directly, avoiding data copies between Flink buffers and Netty buffers:

public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Buffer {

    private final MemorySegment memorySegment;

    // ......

    @Override
    protected void deallocate() {
        // Recycle the current buffer; LocalBufferPool implements the BufferRecycler interface.
        recycler.recycle(memorySegment);
    }
}
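Because NetworkBuffer inherits Netty's reference counting, retain/release calls made by Netty and by Flink operate on the same counter, and the final release triggers deallocate(), which returns the MemorySegment to its pool. A short sketch of that contract, assuming the retainBuffer()/recycleBuffer() methods of Flink's Buffer interface:

import org.apache.flink.runtime.io.network.buffer.Buffer;

class BufferRefCountSketch {

    // Hand the buffer to a second consumer without copying: each consumer holds
    // one reference, and only the last recycleBuffer() call runs deallocate(),
    // which gives the MemorySegment back to the BufferRecycler (LocalBufferPool).
    static void shareAndRelease(Buffer buffer) {
        buffer.retainBuffer();    // second reference for the other consumer
        // ... both sides read the data ...
        buffer.recycleBuffer();   // first release: buffer still alive
        buffer.recycleBuffer();   // last release: deallocate() recycles the segment
    }
}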

 
