1. Overview
Reposted from: Flink 1.12.2 Source Code Walkthrough: TaskExecutor
TaskExecutor is the concrete implementation of the TaskManager.
2. TaskExecutorGateway
TaskExecutor implements the TaskExecutorGateway RPC interface, so first look at what the gateway exposes. The methods are listed below.
2.1. Class Diagram
[Figure: TaskExecutor class diagram; image not available]
2.2. Method List
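Name | Description |
---|---|
CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, String targetAddress, ResourceManagerId resourceManagerId, @RpcTimeout Time timeout) | Requests a slot from the TaskManager |
requestTaskBackPressure | Returns back-pressure information for a task |
submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Time timeout) | [Core] Submits a task |
updatePartitions | Updates the partition information of a task's inputs |
releaseOrPromotePartitions | Batch-releases or promotes intermediate result partitions |
releaseClusterPartitions | Releases all cluster partitions belonging to any of the given data sets |
triggerCheckpoint | **Triggers a checkpoint for the given task.** The checkpoint is identified by its checkpoint ID and checkpoint timestamp. |
confirmCheckpoint | Confirms a checkpoint for the given task. The checkpoint is identified by its checkpoint ID and checkpoint timestamp. |
abortCheckpoint | Aborts a checkpoint for the given task |
cancelTask | Cancels a task |
heartbeatFromJobManager | Heartbeat request from the JobManager |
heartbeatFromResourceManager | Heartbeat request from the ResourceManager |
disconnectJobManager | Disconnects the given JobManager from the TaskManager |
disconnectResourceManager | Disconnects the given ResourceManager from the TaskManager |
freeSlot | Frees a slot |
requestFileUploadByType | Requests that a file of the given type be uploaded to the cluster's BlobServer |
requestFileUploadByName | Requests that the file with the given name be uploaded to the cluster's BlobServer |
requestMetricQueryServiceAddress | Returns the address of the metric query service on the TaskManager |
canBeReleased | Checks whether the task executor can be released. It cannot be released while there are unconsumed result partitions. |
requestLogList | Requests the list of log file names on the TaskManager |
sendOperatorEventToTask | Sends an operator event to a task |
requestThreadDump | Requests a thread dump of the TaskManager |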
3. Code Walkthrough
3.1. Fields
3.1.1. Services
// HA
/** The access to the leader election and retrieval services. */
private final HighAvailabilityServices haServices;
// Services used by the TaskExecutor, e.g. IOManager, ShuffleEnvironment, KvStateService
private final TaskManagerServices taskExecutorServices;
/** The task manager configuration. */
private final TaskManagerConfiguration taskManagerConfiguration;
/** The fatal error handler to use in case of a fatal error. */
private final FatalErrorHandler fatalErrorHandler;
// The BLOB cache provides access to the BLOB services for permanent and transient BLOBs.
private final BlobCacheService blobCacheService;
private final LibraryCacheManager libraryCacheManager;
/** The address to metric query service on this Task Manager. */
@Nullable private final String metricQueryServiceAddress;
3.1.2. TaskManager-related Services
/** The connection information of this task manager. */
private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation;
private final TaskManagerMetricGroup taskManagerMetricGroup;
/** The state manager for this task, providing state managers per slot. */
private final TaskExecutorLocalStateStoresManager localStateStoresManager;
/** Information provider for external resources. */
private final ExternalResourceInfoProvider externalResourceInfoProvider;
/** The network component in the task manager. */
private final ShuffleEnvironment<?, ?> shuffleEnvironment;
/** The kvState registration service in the task manager. */
private final KvStateService kvStateService;
private final Executor ioExecutor;
3.1.3. Task Slot Table
private final TaskSlotTable<Task> taskSlotTable;
private final JobTable jobTable;
private final JobLeaderService jobLeaderService;
private final LeaderRetrievalService resourceManagerLeaderRetriever;
3.1.4. ResourceManager Connection
// ResourceManager connection state
@Nullable private ResourceManagerAddress resourceManagerAddress;
@Nullable private EstablishedResourceManagerConnection establishedResourceManagerConnection;
@Nullable private TaskExecutorToResourceManagerConnection resourceManagerConnection;
@Nullable private UUID currentRegistrationTimeoutId;
private Map<JobID, Collection<CompletableFuture<ExecutionState>>>
taskResultPartitionCleanupFuturesPerJob = new HashMap<>(8);
3.1.5. Other
// Hardware description
private final HardwareDescription hardwareDescription;
// Memory configuration
private final TaskExecutorMemoryConfiguration memoryConfiguration;
// File cache
private FileCache fileCache;
/** The heartbeat manager for job manager in the task manager. */
private final HeartbeatManager<AllocatedSlotReport, TaskExecutorToJobManagerHeartbeatPayload>
jobManagerHeartbeatManager;
/** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload>
resourceManagerHeartbeatManager;
// Tracks the result partitions produced by this TaskExecutor's tasks
private final TaskExecutorPartitionTracker partitionTracker;
// Back-pressure sampling
private final BackPressureSampleService backPressureSampleService;
3.2. Core Methods
3.2.1. requestSlot
The SlotManager inside the ResourceManager calls requestSlot to request a slot from the TaskExecutor.
org.apache.flink.runtime.taskexecutor.TaskExecutor#requestSlot
@Override
public CompletableFuture<Acknowledge> requestSlot(
final SlotID slotId,
final JobID jobId,
final AllocationID allocationId,
final ResourceProfile resourceProfile,
final String targetAddress,
final ResourceManagerId resourceManagerId,
final Time timeout) {
// TODO: Filter invalid requests from the resource manager by using the
// instance/registration Id
// Log the request, e.g.:
// Receive slot request 3755cb8f9962a9a7738db04f2a02084c for job 694474d11da6100e82744c9e47e2f511
// from resource manager with leader id 00000000000000000000000000000000.
log.info(
"Receive slot request {} for job {} from resource manager with leader id {}.",
allocationId,
jobId,
resourceManagerId);
// Reject the request unless we are connected to this ResourceManager leader
if (!isConnectedToResourceManager(resourceManagerId)) {
final String message =
String.format(
"TaskManager is not connected to the resource manager %s.",
resourceManagerId);
log.debug(message);
return FutureUtils.completedExceptionally(new TaskManagerException(message));
}
try {
// [Key step] Allocate the slot
allocateSlot(slotId, jobId, allocationId, resourceProfile);
} catch (SlotAllocationException sae) {
return FutureUtils.completedExceptionally(sae);
}
final JobTable.Job job;
try {
// Get or create the JobTable.Job entry
job = jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
} catch (Exception e) {
// free the allocated slot
try {
taskSlotTable.freeSlot(allocationId);
} catch (SlotNotFoundException slotNotFoundException) {
// slot no longer existent, this should actually never happen, because we've
// just allocated the slot. So let's fail hard in this case!
onFatalError(slotNotFoundException);
}
// release local state under the allocation id.
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
// sanity check
if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
onFatalError(new Exception("Could not free slot " + slotId));
}
return FutureUtils.completedExceptionally(
new SlotAllocationException("Could not create new job.", e));
}
if (job.isConnected()) {
// [Important] Offer the slots to the JobManager
offerSlotsToJobManager(jobId);
}
return CompletableFuture.completedFuture(Acknowledge.get());
}
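To make the order of checks in requestSlot easier to follow, here is a minimal Java model of it. Everything in it is hypothetical: SlotRequestModel, its two maps (standing in for TaskSlotTable and JobTable), the failJobCreation flag and the string IDs are for illustration only, not Flink API. It shows the four steps: reject an unknown ResourceManager leader, allocate, get or create the job (rolling the allocation back on failure), and offer slots only if the JobMaster connection already exists.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of TaskExecutor#requestSlot (not Flink code).
class SlotRequestModel {
    final Map<Integer, String> slotToAllocation = new HashMap<>(); // stands in for TaskSlotTable
    final Map<String, Boolean> jobConnected = new HashMap<>();      // stands in for JobTable
    String connectedResourceManagerId = "rm-1";
    boolean failJobCreation = false; // simulates registerNewJobAndCreateServices throwing
    int offersSent = 0;

    CompletableFuture<String> requestSlot(int slot, String jobId, String allocationId, String rmId) {
        // 1. Reject requests from a ResourceManager we are not connected to.
        if (!rmId.equals(connectedResourceManagerId)) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("not connected to " + rmId));
        }
        // 2. Allocate the slot (simplified: fail if another allocation holds it).
        String current = slotToAllocation.get(slot);
        if (current != null && !current.equals(allocationId)) {
            return CompletableFuture.failedFuture(new IllegalStateException("slot occupied"));
        }
        slotToAllocation.put(slot, allocationId);
        // 3. Get or create the job entry; on failure, roll back the allocation.
        try {
            if (!jobConnected.containsKey(jobId)) {
                if (failJobCreation) {
                    throw new RuntimeException("could not create job");
                }
                jobConnected.put(jobId, false); // no JobMaster connection yet
            }
        } catch (RuntimeException e) {
            slotToAllocation.remove(slot);
            return CompletableFuture.failedFuture(e);
        }
        // 4. Offer slots only if the JobMaster connection already exists.
        if (jobConnected.get(jobId)) {
            offersSent++;
        }
        return CompletableFuture.completedFuture("ack");
    }
}
```

If the job is not connected yet, no offer is made here; in the real code the slots are offered later, once the JobMaster connection is established.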
org.apache.flink.runtime.taskexecutor.TaskExecutor#allocateSlot(slotId, jobId, allocationId, resourceProfile);
private void allocateSlot(
SlotID slotId, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile)
throws SlotAllocationException {
// Example values observed in a debugger for this call:
// slotId          = "container_1619273419318_0032_01_000002_0"
//                   (resourceId = "container_1619273419318_0032_01_000002", slotNumber = 0)
// jobId           = "05fdf1bc744b274be1525c918c1ad378"
// allocationId    = "a9ce7abc6f1d6f264dbdce5564efcb76"
// resourceProfile = "ResourceProfile{UNKNOWN}"
//                   (cpuCores, taskHeapMemory, taskOffHeapMemory, managedMemory, networkMemory all null;
//                    no extended resources)
// taskSlotTable   = TaskSlotTableImpl
//   numberSlots = 4
//   defaultSlotResourceProfile = "ResourceProfile{cpuCores=1.0000000000000000,
//       taskHeapMemory=96.000mb (100663293 bytes), taskOffHeapMemory=0 bytes,
//       managedMemory=128.000mb (134217730 bytes), networkMemory=32.000mb (33554432 bytes)}"
//       (no extended resources)
//   memoryPageSize = 32768
//   taskSlots, allocatedSlots, taskSlotMappings, slotsPerJob: all empty
//   slotActions = TaskExecutor$SlotActionsImpl
//   state = RUNNING
//   budgetManager = ResourceBudgetManager
//   closingFuture: not completed
//   timerService and mainThreadExecutor (RpcEndpoint$MainThreadExecutor) are set
//   memoryVerificationExecutor = ThreadPoolExecutor [Running, pool size = 0, active threads = 0,
//       queued tasks = 0, completed tasks = 0]
if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
// Perform the allocation (TaskSlotTableImpl#allocateSlot)
if (taskSlotTable.allocateSlot(
slotId.getSlotNumber(),
jobId,
allocationId,
resourceProfile,
taskManagerConfiguration.getTimeout())) {
// Allocated slot for 3755cb8f9962a9a7738db04f2a02084c.
log.info("Allocated slot for {}.", allocationId);
} else {
log.info("Could not allocate slot for {}.", allocationId);
throw new SlotAllocationException("Could not allocate slot.");
}
} else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
final String message =
"The slot " + slotId + " has already been allocated for a different job.";
log.info(message);
final AllocationID allocationID =
taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
throw new SlotOccupiedException(
message, allocationID, taskSlotTable.getOwningJob(allocationID));
}
}
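allocateSlot has three outcomes, and the middle one is easy to miss: a request for a slot that is already allocated to the same job and allocation ID is accepted without doing anything, which keeps retried requests harmless. The sketch below models only these branches. SlotAllocationModel, Outcome and the string-keyed map are hypothetical, and the case where TaskSlotTable#allocateSlot itself returns false is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the three branches in TaskExecutor#allocateSlot (not Flink code).
class SlotAllocationModel {
    enum Outcome { ALLOCATED, ALREADY_ALLOCATED }

    // Unchecked here for brevity; Flink's SlotOccupiedException is a checked exception.
    static class SlotOccupiedException extends RuntimeException {
        SlotOccupiedException(String message) {
            super(message);
        }
    }

    // slot number -> "jobId/allocationId" of the current owner
    private final Map<Integer, String> owners = new HashMap<>();

    Outcome allocateSlot(int slotNumber, String jobId, String allocationId) {
        String requester = jobId + "/" + allocationId;
        String owner = owners.get(slotNumber);
        if (owner == null) {
            // Branch 1: the slot is free, so allocate it.
            owners.put(slotNumber, requester);
            return Outcome.ALLOCATED;
        }
        if (owner.equals(requester)) {
            // Branch 2: already allocated for this job and allocation ID (e.g. a retried request).
            return Outcome.ALREADY_ALLOCATED;
        }
        // Branch 3: the slot belongs to a different allocation.
        throw new SlotOccupiedException(
                "The slot " + slotNumber + " has already been allocated to " + owner);
    }
}
```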
org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager(jobId);
// ------------------------------------------------------------------------
// Internal job manager connection methods
// ------------------------------------------------------------------------
private void offerSlotsToJobManager(final JobID jobId) {
// Offer slots to the JobManager via internalOfferSlotsToJobManager, if a connection exists
jobTable.getConnection(jobId).ifPresent(this::internalOfferSlotsToJobManager);
}
private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnection) {
// Get the JobID
final JobID jobId = jobManagerConnection.getJobId();
// Check whether any slots are allocated to this job
if (taskSlotTable.hasAllocatedSlots(jobId)) {
// Offer reserved slots to the leader of job 694474d11da6100e82744c9e47e2f511.
log.info("Offer reserved slots to the leader of job {}.", jobId);
// Get the JobMaster gateway
final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
// Get all TaskSlots allocated to jobId
final Iterator<TaskSlot<Task>> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
// Get the JobMasterId
final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();
// Slot offers for the reserved slots
final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
while (reservedSlotsIterator.hasNext()) {
SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
reservedSlots.add(offer);
}
// offerSlots
// Offers the given slots to the job manager.
// The response contains the set of accepted slots.
// JobMaster#offerSlots
CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture =
jobMasterGateway.offerSlots(
getResourceID(), reservedSlots, taskManagerConfiguration.getTimeout());
// Handle the response asynchronously on the main thread: process errors and mark accepted slots as active
acceptedSlotsFuture.whenCompleteAsync(
handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots),
getMainThreadExecutor());
} else {
log.debug("There are no unassigned slots for the job {}.", jobId);
}
}
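The key pattern in internalOfferSlotsToJobManager is that the RPC reply is handled with whenCompleteAsync on the endpoint's main-thread executor, so slot state is only mutated from that single thread. The sketch below reproduces the pattern with plain JDK classes. SlotOfferSketch is hypothetical, a single-threaded executor plays the main thread, and freeing rejected offers is an assumption about handleAcceptedSlotOffers, whose body is not shown above.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical sketch (not Flink code): handle the JobMaster's reply to a slot offer
// on a single-threaded executor that plays the role of the endpoint's main thread.
class SlotOfferSketch {
    final Set<String> activeSlots = new HashSet<>();
    final Set<String> freedSlots = new HashSet<>();

    CompletableFuture<Void> handleOffer(
            Collection<String> reservedSlots,
            CompletableFuture<Collection<String>> acceptedSlotsFuture,
            Executor mainThreadExecutor) {
        return acceptedSlotsFuture
                .whenCompleteAsync(
                        (acceptedSlots, failure) -> {
                            if (failure != null) {
                                return; // Flink's error handling is omitted in this sketch
                            }
                            for (String slot : reservedSlots) {
                                if (acceptedSlots.contains(slot)) {
                                    activeSlots.add(slot); // accepted: mark as active
                                } else {
                                    freedSlots.add(slot); // assumption: rejected offers are freed
                                }
                            }
                        },
                        mainThreadExecutor)
                .thenAccept(acceptedSlots -> {});
    }
}
```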
3.2.2. freeSlot
Frees the slot with the given allocation ID.
@Override
public CompletableFuture<Acknowledge> freeSlot(
AllocationID allocationId, Throwable cause, Time timeout) {
freeSlotInternal(allocationId, cause);
return CompletableFuture.completedFuture(Acknowledge.get());
}
private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
checkNotNull(allocationId);
log.debug("Free slot with allocation id {} because: {}", allocationId, cause.getMessage());
try {
final JobID jobId = taskSlotTable.getOwningJob(allocationId);
// Free the slot; returns the slot index, or -1 if the slot still contains tasks and could not be freed yet
final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);
if (slotIndex != -1) {
if (isConnectedToResourceManager()) {
// The slot was freed. Tell the RM about it.
ResourceManagerGateway resourceManagerGateway =
establishedResourceManagerConnection.getResourceManagerGateway();
// Notify the RM that the slot is available again
resourceManagerGateway.notifySlotAvailable(
establishedResourceManagerConnection.getTaskExecutorRegistrationId(),
new SlotID(getResourceID(), slotIndex),
allocationId);
}
if (jobId != null) {
closeJobManagerConnectionIfNoAllocatedResources(jobId);
}
}
} catch (SlotNotFoundException e) {
log.debug("Could not free slot for allocation id {}.", allocationId, e);
}
// Release the local state stored under this allocation ID
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
}
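The return-value contract matters here: TaskSlotTable#freeSlot returns the index of the freed slot, or -1 when the slot still hosts tasks and cannot be freed yet, and only a real index leads to a notifySlotAvailable call. The sketch below models that bookkeeping. FreeSlotSketch, its maps and the running-task counter are hypothetical, and an unknown allocation ID (SlotNotFoundException in Flink) is modelled with IllegalStateException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of freeSlotInternal's bookkeeping (not Flink code).
class FreeSlotSketch {
    private final Map<String, Integer> slotIndexByAllocation = new HashMap<>();
    private final Map<String, Integer> runningTasksByAllocation = new HashMap<>();
    final List<Integer> notifiedSlots = new ArrayList<>(); // stands in for notifySlotAvailable
    final Set<String> releasedLocalState = new HashSet<>();
    boolean connectedToResourceManager = true;

    void allocate(String allocationId, int slotIndex, int runningTasks) {
        slotIndexByAllocation.put(allocationId, slotIndex);
        runningTasksByAllocation.put(allocationId, runningTasks);
    }

    // Same contract as TaskSlotTable#freeSlot: slot index if freed, -1 if tasks remain.
    int freeSlot(String allocationId) {
        Integer index = slotIndexByAllocation.get(allocationId);
        if (index == null) {
            throw new IllegalStateException("unknown allocation " + allocationId);
        }
        if (runningTasksByAllocation.getOrDefault(allocationId, 0) > 0) {
            return -1; // the slot is released later, once its tasks are gone
        }
        slotIndexByAllocation.remove(allocationId);
        runningTasksByAllocation.remove(allocationId);
        return index;
    }

    void freeSlotInternal(String allocationId) {
        try {
            int slotIndex = freeSlot(allocationId);
            if (slotIndex != -1 && connectedToResourceManager) {
                notifiedSlots.add(slotIndex);
            }
        } catch (IllegalStateException e) {
            // Flink only logs this case at debug level
        }
        // Local state is released whether or not the slot was found.
        releasedLocalState.add(allocationId);
    }
}
```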
3.2.3. submitTask
At its core, submitTask builds a Task from the TaskDeploymentDescriptor and hands it to a dedicated thread for execution.
// ----------------------------------------------------------------------
// Task lifecycle RPCs
// Submit a task
// ----------------------------------------------------------------------
@Override
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
try {
final JobID jobId = tdd.getJobId();
final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();
final JobTable.Connection jobManagerConnection =
jobTable.getConnection(jobId)
.orElseThrow(
() -> {
final String message =
"Could not submit task because there is no JobManager "
+ "associated for the job "
+ jobId
+ '.';
log.debug(message);
return new TaskSubmissionException(message);
});
if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
final String message =
"Rejecting the task submission because the job manager leader id "
+ jobMasterId
+ " does not match the expected job manager leader id "
+ jobManagerConnection.getJobMasterId()
+ '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
final String message =
"No task slot allocated for job ID "
+ jobId
+ " and allocation ID "
+ tdd.getAllocationId()
+ '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
// re-integrate offloaded data:
try {
tdd.loadBigData(blobCacheService.getPermanentBlobService());
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException(
"Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
}
// deserialize the pre-serialized information
final JobInformation jobInformation;
final TaskInformation taskInformation;
try {
jobInformation =
tdd.getSerializedJobInformation()
.deserializeValue(getClass().getClassLoader());
taskInformation =
tdd.getSerializedTaskInformation()
.deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException(
"Could not deserialize the job or task information.", e);
}
if (!jobId.equals(jobInformation.getJobId())) {
throw new TaskSubmissionException(
"Inconsistent job ID information inside TaskDeploymentDescriptor ("
+ tdd.getJobId()
+ " vs. "
+ jobInformation.getJobId()
+ ")");
}
TaskMetricGroup taskMetricGroup =
taskManagerMetricGroup.addTaskForJob(
jobInformation.getJobId(),
jobInformation.getJobName(),
taskInformation.getJobVertexId(),
tdd.getExecutionAttemptId(),
taskInformation.getTaskName(),
tdd.getSubtaskIndex(),
tdd.getAttemptNumber());
InputSplitProvider inputSplitProvider =
new RpcInputSplitProvider(
jobManagerConnection.getJobManagerGateway(),
taskInformation.getJobVertexId(),
tdd.getExecutionAttemptId(),
taskManagerConfiguration.getTimeout());
final TaskOperatorEventGateway taskOperatorEventGateway =
new RpcTaskOperatorEventGateway(
jobManagerConnection.getJobManagerGateway(),
executionAttemptID,
(t) -> runAsync(() -> failTask(executionAttemptID, t)));
TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
GlobalAggregateManager aggregateManager =
jobManagerConnection.getGlobalAggregateManager();
LibraryCacheManager.ClassLoaderHandle classLoaderHandle =
jobManagerConnection.getClassLoaderHandle();
ResultPartitionConsumableNotifier resultPartitionConsumableNotifier =
jobManagerConnection.getResultPartitionConsumableNotifier();
PartitionProducerStateChecker partitionStateChecker =
jobManagerConnection.getPartitionStateChecker();
final TaskLocalStateStore localStateStore =
localStateStoresManager.localStateStoreForSubtask(
jobId,
tdd.getAllocationId(),
taskInformation.getJobVertexId(),
tdd.getSubtaskIndex());
final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();
// Build the TaskStateManager
final TaskStateManager taskStateManager =
new TaskStateManagerImpl(
jobId,
tdd.getExecutionAttemptId(),
localStateStore,
taskRestore,
checkpointResponder);
MemoryManager memoryManager;
try {
memoryManager = taskSlotTable.getTaskMemoryManager(tdd.getAllocationId());
} catch (SlotNotFoundException e) {
throw new TaskSubmissionException("Could not submit task.", e);
}
// Create the new Task
Task task =
new Task(
jobInformation,
taskInformation,
tdd.getExecutionAttemptId(),
tdd.getAllocationId(),
tdd.getSubtaskIndex(),
tdd.getAttemptNumber(),
tdd.getProducedPartitions(),
tdd.getInputGates(),
tdd.getTargetSlotNumber(),
memoryManager,
taskExecutorServices.getIOManager(),
taskExecutorServices.getShuffleEnvironment(),
taskExecutorServices.getKvStateService(),
taskExecutorServices.getBroadcastVariableManager(),
taskExecutorServices.getTaskEventDispatcher(),
externalResourceInfoProvider,
taskStateManager,
taskManagerActions,
inputSplitProvider,
checkpointResponder,
taskOperatorEventGateway,
aggregateManager,
classLoaderHandle,
fileCache,
taskManagerConfiguration,
taskMetricGroup,
resultPartitionConsumableNotifier,
partitionStateChecker,
getRpcService().getExecutor());
taskMetricGroup.gauge(MetricNames.IS_BACKPRESSURED, task::isBackPressured);
// Logs e.g.:
// Received task Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger,
// ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1)#0
// (141dd597dc560a831b2b4bc195943f0b), deploy into slot with allocation id
// 3755cb8f9962a9a7738db04f2a02084c.
log.info(
"Received task {} ({}), deploy into slot with allocation id {}.",
task.getTaskInfo().getTaskNameWithSubtasks(),
tdd.getExecutionAttemptId(),
tdd.getAllocationId());
boolean taskAdded;
try {
taskAdded = taskSlotTable.addTask(task);
} catch (SlotNotFoundException | SlotNotActiveException e) {
throw new TaskSubmissionException("Could not submit task.", e);
}
if (taskAdded) {
// Start the task thread
task.startTaskThread();
setupResultPartitionBookkeeping(
tdd.getJobId(), tdd.getProducedPartitions(), task.getTerminationFuture());
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message =
"TaskManager already contains a task for id " + task.getExecutionId() + '.';
log.debug(message);
throw new TaskSubmissionException(message);
}
} catch (TaskSubmissionException e) {
return FutureUtils.completedExceptionally(e);
}
}
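Stripped of its validation and wiring, submitTask registers the new Task under its execution attempt ID, refuses duplicates (taskSlotTable.addTask returns false) and runs the task on a dedicated thread via startTaskThread. The sketch below shows that skeleton. SubmitTaskSketch and SketchTask are hypothetical simplifications, and the JobMaster check compares plain strings instead of JobMasterId values.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical skeleton of submitTask (not Flink code).
class SubmitTaskSketch {
    // Stand-in for Flink's Task: a Runnable that owns its executing thread.
    static class SketchTask implements Runnable {
        final String executionId;
        final CompletableFuture<String> terminationFuture = new CompletableFuture<>();
        private final Thread executingThread;

        SketchTask(String executionId, String taskName) {
            this.executionId = executionId;
            this.executingThread = new Thread(this, taskName + " (" + executionId + ")");
        }

        @Override
        public void run() {
            // A real Task would set up its environment and invoke the operator code here.
            terminationFuture.complete("FINISHED");
        }

        void startTaskThread() {
            executingThread.start();
        }
    }

    final Map<String, SketchTask> tasks = new ConcurrentHashMap<>();

    CompletableFuture<String> submitTask(
            String jobMasterId, String expectedJobMasterId, String executionId, String taskName) {
        // Reject submissions from a JobMaster that is not the expected leader.
        if (!expectedJobMasterId.equals(jobMasterId)) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("job manager leader id mismatch"));
        }
        SketchTask task = new SketchTask(executionId, taskName);
        // Like taskSlotTable.addTask: refuse a second task with the same execution ID.
        if (tasks.putIfAbsent(executionId, task) != null) {
            return CompletableFuture.failedFuture(new IllegalStateException(
                    "TaskManager already contains a task for id " + executionId));
        }
        task.startTaskThread();
        return CompletableFuture.completedFuture("ack");
    }
}
```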
3.2.4. updatePartitions
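For each PartitionInfo, updates the task's input gates through the ShuffleEnvironment (org.apache.flink.runtime.io.network.NettyShuffleEnvironment#updatePartitionInfo in the default Netty-based setup), which synchronizes the partition location information.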
@Override
public CompletableFuture<Acknowledge> updatePartitions(
final ExecutionAttemptID executionAttemptID,
Iterable<PartitionInfo> partitionInfos,
Time timeout) {
// Look up the task
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
// Iterate over the partition infos
for (final PartitionInfo partitionInfo : partitionInfos) {
// Run asynchronously because it might be blocking
FutureUtils.assertNoException(
CompletableFuture.runAsync(
() -> {
try {
// Update the partition info
if (!shuffleEnvironment.updatePartitionInfo(executionAttemptID, partitionInfo)) {
log.debug(
"Discard update for input gate partition {} of result {} in task {}. "
+ "The partition is no longer available.",
partitionInfo
.getShuffleDescriptor()
.getResultPartitionID(),
partitionInfo.getIntermediateDataSetID(),
executionAttemptID);
}
} catch (IOException | InterruptedException e) {
log.error(
"Could not update input data location for task {}. Trying to fail task.",
task.getTaskInfo().getTaskName(),
e);
task.failExternally(e);
}
},
getRpcService().getExecutor()));
}
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
log.debug(
"Discard update for input partitions of task {}. Task is no longer running.",
executionAttemptID);
return CompletableFuture.completedFuture(Acknowledge.get());
}
}
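updatePartitions acknowledges the RPC immediately and performs each potentially blocking update on an executor, failing the task if an update throws. The sketch below mirrors that with CompletableFuture.runAsync. The PartitionUpdater interface, the counters and the string partition infos are hypothetical stand-ins for ShuffleEnvironment#updatePartitionInfo and PartitionInfo. Unlike the RPC, it returns the futures so that a caller can wait for them.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

// Hypothetical sketch of the asynchronous update loop in updatePartitions (not Flink code).
class UpdatePartitionsSketch {
    // Stand-in for ShuffleEnvironment#updatePartitionInfo: false means the partition is gone.
    interface PartitionUpdater {
        boolean update(String partitionInfo) throws Exception;
    }

    final AtomicInteger discardedUpdates = new AtomicInteger();
    final AtomicReference<Throwable> taskFailure = new AtomicReference<>();

    List<CompletableFuture<Void>> updatePartitions(
            List<String> partitionInfos, PartitionUpdater updater, Executor executor) {
        return partitionInfos.stream()
                .map(info -> CompletableFuture.runAsync(() -> {
                    try {
                        if (!updater.update(info)) {
                            discardedUpdates.incrementAndGet(); // Flink only logs at debug level
                        }
                    } catch (Exception e) {
                        // Flink catches IOException | InterruptedException and calls task.failExternally(e)
                        taskFailure.compareAndSet(null, e);
                    }
                }, executor))
                .collect(Collectors.toList());
    }
}
```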
3.2.5. triggerCheckpoint
Ultimately this calls the task's triggerCheckpointBarrier method, which triggers the checkpoint.
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
CheckpointOptions checkpointOptions) {
log.debug(
"Trigger checkpoint {}@{} for {}.",
checkpointId,
checkpointTimestamp,
executionAttemptID);
// Get the CheckpointType
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
if (checkpointType.getPostCheckpointAction() == PostCheckpointAction.TERMINATE
&& !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException(
"Only synchronous savepoints are allowed to advance the watermark to MAX.");
}
// Look up the task
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
// Trigger the checkpoint via the task's triggerCheckpointBarrier method
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message =
"TaskManager received a checkpoint request for unknown task "
+ executionAttemptID
+ '.';
log.debug(message);
return FutureUtils.completedExceptionally(
new CheckpointException(
message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
}
}
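The guard at the top of triggerCheckpoint reduces to a small truth table: a checkpoint whose post-checkpoint action is TERMINATE is accepted only if it is both synchronous and a savepoint. The sketch below restates that rule. The enum and method names are hypothetical and model only the two action values needed here.

```java
// Hypothetical restatement (not Flink's classes) of the guard in triggerCheckpoint.
class CheckpointGuardSketch {
    // Only the two values this rule needs.
    enum PostCheckpointAction { NONE, TERMINATE }

    static void validate(PostCheckpointAction action, boolean synchronous, boolean savepoint) {
        if (action == PostCheckpointAction.TERMINATE && !(synchronous && savepoint)) {
            throw new IllegalArgumentException(
                    "Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }
    }

    static boolean isAllowed(PostCheckpointAction action, boolean synchronous, boolean savepoint) {
        try {
            validate(action, synchronous, savepoint);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```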
3.2.6. confirmCheckpoint
Completes the checkpoint on the task side by calling the task's notifyCheckpointComplete method.
@Override
public CompletableFuture<Acknowledge> confirmCheckpoint(
ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
log.debug(
"Confirm checkpoint {}@{} for {}.",
checkpointId,
checkpointTimestamp,
executionAttemptID);
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
// Mark the checkpoint as complete via the task's notifyCheckpointComplete method
task.notifyCheckpointComplete(checkpointId);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message =
"TaskManager received a checkpoint confirmation for unknown task "
+ executionAttemptID
+ '.';
log.debug(message);
return FutureUtils.completedExceptionally(
new CheckpointException(
message,
CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
}
}
3.2.7. abortCheckpoint
Aborts the checkpoint on the task side by calling the task's notifyCheckpointAborted method.
@Override
public CompletableFuture<Acknowledge> abortCheckpoint(
ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
log.debug(
"Abort checkpoint {}@{} for {}.",
checkpointId,
checkpointTimestamp,
executionAttemptID);
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
// abortCheckpoint
task.notifyCheckpointAborted(checkpointId);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message =
"TaskManager received an aborted checkpoint for unknown task "
+ executionAttemptID
+ '.';
log.debug(message);
return FutureUtils.completedExceptionally(
new CheckpointException(
message,
CheckpointFailureReason.UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE));
}
}
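confirmCheckpoint and abortCheckpoint share one shape: look up the task by execution attempt ID, forward the notification, and complete the future exceptionally if the task is unknown. The sketch below factors that shape into one helper. CheckpointNotificationSketch, SketchTask, and the use of IllegalStateException instead of CheckpointException are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical sketch of the shared shape of confirmCheckpoint/abortCheckpoint (not Flink code).
class CheckpointNotificationSketch {
    interface SketchTask {
        void notifyCheckpointComplete(long checkpointId);

        void notifyCheckpointAborted(long checkpointId);
    }

    final Map<String, SketchTask> tasks = new HashMap<>(); // execution attempt ID -> task

    CompletableFuture<String> confirmCheckpoint(String executionId, long checkpointId) {
        return notifyTask(executionId, task -> task.notifyCheckpointComplete(checkpointId),
                "a checkpoint confirmation");
    }

    CompletableFuture<String> abortCheckpoint(String executionId, long checkpointId) {
        return notifyTask(executionId, task -> task.notifyCheckpointAborted(checkpointId),
                "an aborted checkpoint");
    }

    private CompletableFuture<String> notifyTask(
            String executionId, Consumer<SketchTask> notification, String what) {
        SketchTask task = tasks.get(executionId);
        if (task == null) {
            return CompletableFuture.failedFuture(new IllegalStateException(
                    "TaskManager received " + what + " for unknown task " + executionId + '.'));
        }
        notification.accept(task);
        return CompletableFuture.completedFuture("ack");
    }
}
```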
3.2.8. heartbeatFromJobManager
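Handles a heartbeat from the JobManager: the sender's ResourceID and its AllocatedSlotReport are passed to jobManagerHeartbeatManager.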
@Override
public void heartbeatFromJobManager(
ResourceID resourceID, AllocatedSlotReport allocatedSlotReport) {
jobManagerHeartbeatManager.requestHeartbeat(resourceID, allocatedSlotReport);
}
3.2.9. heartbeatFromResourceManager
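Handles a heartbeat from the ResourceManager. It carries no payload, so null is passed to resourceManagerHeartbeatManager.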
@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {
resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}
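Both heartbeat RPCs simply hand the sender's ResourceID and payload to a HeartbeatManager. Going by the field types in 3.1.5, the first type parameter is the payload received and the second the payload sent back (the ResourceManager sends Void, the TaskExecutor answers with a TaskExecutorHeartbeatPayload). The sketch below is a much simplified, hypothetical heartbeat tracker, not Flink's HeartbeatManagerImpl: each request resets the sender's timeout and is answered with a payload built by a callback, and an explicit clock argument keeps it deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical, simplified heartbeat tracker (not Flink's HeartbeatManagerImpl).
// I = payload received with a heartbeat request, O = payload sent back.
class HeartbeatSketch<I, O> {
    private final long timeoutMillis;
    private final Function<String, O> responsePayload; // builds our reply for a given sender
    private final Map<String, Long> lastHeartbeatMillis = new HashMap<>();
    private final Map<String, I> lastPayload = new HashMap<>();

    HeartbeatSketch(long timeoutMillis, Function<String, O> responsePayload) {
        this.timeoutMillis = timeoutMillis;
        this.responsePayload = responsePayload;
    }

    // Called when `sender` requests a heartbeat at time `nowMillis`.
    O requestHeartbeat(String sender, I payload, long nowMillis) {
        lastHeartbeatMillis.put(sender, nowMillis); // resets the sender's timeout
        lastPayload.put(sender, payload);           // remember what the sender reported
        return responsePayload.apply(sender);       // answer with our own payload
    }

    I lastPayloadFrom(String sender) {
        return lastPayload.get(sender);
    }

    // Senders that have not been heard from within the timeout.
    Set<String> timedOutSenders(long nowMillis) {
        return lastHeartbeatMillis.entrySet().stream()
                .filter(e -> nowMillis - e.getValue() > timeoutMillis)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }
}
```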
3.2.10. requestFileUploadByType
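Resolves the file path for the requested file type (LOG or STDOUT; any other type yields null) and passes it to requestFileUploadByFilePath, which returns a TransientBlobKey for the uploaded file.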
@Override
public CompletableFuture<TransientBlobKey> requestFileUploadByType(
FileType fileType, Time timeout) {
final String filePath;
switch (fileType) {
case LOG:
filePath = taskManagerConfiguration.getTaskManagerLogPath();
break;
case STDOUT:
filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
break;
default:
filePath = null;
}
return requestFileUploadByFilePath(filePath, fileType.toString());
}
3.2.11. requestFileUploadByName
@Override
public CompletableFuture<TransientBlobKey> requestFileUploadByName(
String fileName, Time timeout) {
final String filePath;
final String logDir = taskManagerConfiguration.getTaskManagerLogDir();
if (StringUtils.isNullOrWhitespaceOnly(logDir)
|| StringUtils.isNullOrWhitespaceOnly(fileName)) {
filePath = null;
} else {
// Build the path from taskManagerLogDir and the file name (getName() drops any directory part)
filePath = new File(logDir, new File(fileName).getName()).getPath();
}
return requestFileUploadByFilePath(filePath, fileName);
}
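The expression new File(fileName).getName() is what keeps this method inside the log directory: only the last path component of the requested name survives, so a name such as "../../etc/passwd" cannot escape it. The sketch below isolates that logic. LogFilePathSketch and the /opt/flink/log directory are hypothetical, the blank check approximates StringUtils.isNullOrWhitespaceOnly with trim(), and the results assume a Unix-like file separator.

```java
import java.io.File;

// Hypothetical isolation of the path logic in requestFileUploadByName (not Flink code).
class LogFilePathSketch {
    static String resolve(String logDir, String fileName) {
        if (isBlank(logDir) || isBlank(fileName)) {
            return null; // same outcome as the null/whitespace check above
        }
        // Keep only the last path component of the requested name.
        return new File(logDir, new File(fileName).getName()).getPath();
    }

    // Approximation of StringUtils.isNullOrWhitespaceOnly.
    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }
}
```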
3.2.12. sendOperatorEventToTask
@Override
public CompletableFuture<Acknowledge> sendOperatorEventToTask(
ExecutionAttemptID executionAttemptID,
OperatorID operatorId,
SerializedValue<OperatorEvent> evt) {
log.debug("Operator event for {} - {}", executionAttemptID, operatorId);
// Look up the Task
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task == null) {
return FutureUtils.completedExceptionally(
new TaskNotRunningException(
"Task " + executionAttemptID + " not running on TaskManager"));
}
try {
// Deliver the OperatorEvent to the task
task.deliverOperatorEvent(operatorId, evt);
return CompletableFuture.completedFuture(Acknowledge.get());
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalError(t);
return FutureUtils.completedExceptionally(t);
}
}
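sendOperatorEventToTask distinguishes three results: an unknown task (exceptional future), a successful delivery (Acknowledge), and a delivery failure, which is returned as an exceptional future unless it is fatal, in which case it is rethrown. The sketch below models that. OperatorEventDeliverySketch and its map of handlers are hypothetical, and treating VirtualMachineError as fatal is an assumption of this sketch, not a restatement of ExceptionUtils#rethrowIfFatalError.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical sketch of sendOperatorEventToTask's error handling (not Flink code).
class OperatorEventDeliverySketch {
    final Map<String, Consumer<String>> runningTasks = new HashMap<>(); // execution ID -> handler

    CompletableFuture<String> sendOperatorEventToTask(String executionId, String event) {
        Consumer<String> task = runningTasks.get(executionId);
        if (task == null) {
            return CompletableFuture.failedFuture(new IllegalStateException(
                    "Task " + executionId + " not running on TaskManager"));
        }
        try {
            task.accept(event);
            return CompletableFuture.completedFuture("ack");
        } catch (Throwable t) {
            rethrowIfFatal(t);
            return CompletableFuture.failedFuture(t);
        }
    }

    // Assumption for this sketch: VirtualMachineError counts as fatal.
    private static void rethrowIfFatal(Throwable t) {
        if (t instanceof VirtualMachineError) {
            throw (VirtualMachineError) t;
        }
    }
}
```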
3.2.13. requestThreadDump
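Collects a thread dump of the TaskManager JVM and converts each thread into a ThreadDumpInfo.ThreadInfo (the thread name plus its string representation).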
@Override
public CompletableFuture<ThreadDumpInfo> requestThreadDump(Time timeout) {
final Collection<ThreadInfo> threadDump = JvmUtils.createThreadDump();
final Collection<ThreadDumpInfo.ThreadInfo> threadInfos =
threadDump.stream()
.map(
threadInfo ->
ThreadDumpInfo.ThreadInfo.create(
threadInfo.getThreadName(), threadInfo.toString()))
.collect(Collectors.toList());
return CompletableFuture.completedFuture(ThreadDumpInfo.create(threadInfos));
}
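A comparable dump can be produced with the JDK alone: ThreadMXBean#dumpAllThreads returns one ThreadInfo per live thread, and each is mapped to its name plus its toString() text, the same pairing requestThreadDump builds. ThreadDumpSketch and NamedDump are hypothetical names, and this is not Flink's JvmUtils.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical JDK-only sketch of a thread dump (not Flink's JvmUtils).
class ThreadDumpSketch {
    // Thread name plus the ThreadInfo's string form.
    static final class NamedDump {
        final String threadName;
        final String stringifiedThreadInfo;

        NamedDump(String threadName, String stringifiedThreadInfo) {
            this.threadName = threadName;
            this.stringifiedThreadInfo = stringifiedThreadInfo;
        }
    }

    static List<NamedDump> createThreadDump() {
        // true, true: include locked monitors and locked ownable synchronizers
        ThreadInfo[] infos = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
        return Arrays.stream(infos)
                .map(info -> new NamedDump(info.getThreadName(), info.toString()))
                .collect(Collectors.toList());
    }
}
```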
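Copyright notice: The article "[Flink] Flink 1.12.2 Source Code Walkthrough: TaskExecutor" was contributed by a user, and its views are solely the author's. To repost, contact the author and credit the source: https://www.elefans.com/dongtai/1729493674a1202696.html. This site only provides storage space, claims no ownership and bears no legal liability. Content found to infringe copyright or violate laws will be removed once verified.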