Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public enum TaskLifecycleEventType implements ILifecycleEventType {
* Dispatch the task instance to target.
*/
DISPATCH,
/**
* Severe and unrecoverable errors, such as initialization failure.
*/
FATAL,
/**
* The task instance is dispatched to the target executor server.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event;

import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;

import java.util.Date;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

@Data
@Builder
@AllArgsConstructor
public class TaskFatalLifecycleEvent extends AbstractTaskLifecycleEvent {

private final ITaskExecutionRunnable taskExecutionRunnable;

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
AbstractTaskLifecycleEvent.getTaskExecutionRunnable
; it is advisable to add an Override annotation.
Copy link

Copilot AI Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method overrides AbstractTaskLifecycleEvent.getTaskExecutionRunnable; it is advisable to add an Override annotation.

Copilot uses AI. Check for mistakes.

private final Date endTime;

@Override
public ILifecycleEventType getEventType() {
return TaskLifecycleEventType.FATAL;
}

@Override
public String toString() {
return "TaskFatalLifecycleEvent{" +
"task=" + taskExecutionRunnable.getName() +
", endTime=" + endTime +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.handler;

import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.task.statemachine.ITaskStateAction;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;

import org.springframework.stereotype.Component;

@Component
public class TaskFatalLifecycleEventHandler extends AbstractTaskLifecycleEventHandler<TaskFatalLifecycleEvent> {

@Override
public void handle(final ITaskStateAction taskStateAction,
final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskFatalLifecycleEvent taskFatalEvent) {
taskStateAction.onFatalEvent(workflowExecutionRunnable, taskExecutionRunnable, taskFatalEvent);
}

@Override
public ILifecycleEventType matchEventType() {
return TaskLifecycleEventType.FATAL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPausedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRetryLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRunningLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRuntimeContextChangedEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskSuccessLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStopLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;

Expand Down Expand Up @@ -99,6 +101,29 @@ protected void releaseTaskInstanceResourcesIfNeeded(final ITaskExecutionRunnable
}
}

@Override
public void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskFatalLifecycleEvent taskFatalEvent) {
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();

// Update task state to FAILURE
taskInstance.setState(TaskExecutionStatus.FAILURE);
taskInstance.setEndTime(taskFatalEvent.getEndTime());
taskInstanceDao.updateById(taskInstance);

// Mark task as inactive in the execution graph
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableInActive(taskExecutionRunnable);
// Mark taskExecutionRunnable as failure in the execution graph
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable);

log.info(
"Publishing WorkflowStopLifecycleEvent due to fatal failure of task[id={}, name={}] in workflow[processInstanceId={}].",
taskInstance.getId(), taskInstance.getName(), workflowExecutionRunnable.getWorkflowInstance().getId());
workflowExecutionRunnable.getWorkflowEventBus()
.publish(WorkflowStopLifecycleEvent.of(workflowExecutionRunnable));
}

@Override
public void onDispatchedEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
Expand Down Expand Up @@ -91,6 +92,14 @@ void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskDispatchLifecycleEvent taskDispatchEvent);

/**
* Perform the necessary actions when the task in a certain state receive a {@link TaskFatalLifecycleEvent}.
* <p> This method is called when the task encounters an unrecoverable error (e.g., initialization failure).
*/
void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskFatalLifecycleEvent taskFatalEvent);

/**
* Perform the necessary actions when the task in a certain state receive a {@link TaskDispatchedLifecycleEvent}.
* <p> This method is called when the task has been dispatched to executor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
Expand All @@ -37,6 +38,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;

import java.util.Date;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -109,7 +111,17 @@ public void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRu
taskInstance.getDelayTime(),
remainTimeMills);
}
taskExecutionRunnable.initializeTaskExecutionContext();
try {
taskExecutionRunnable.initializeTaskExecutionContext();
} catch (Exception ex) {
log.error("Current taskInstance: {} initializeTaskExecutionContext error", taskInstance.getName(), ex);
final TaskFatalLifecycleEvent taskFatalEvent = TaskFatalLifecycleEvent.builder()
.taskExecutionRunnable(taskExecutionRunnable)
.endTime(new Date())
.build();
taskExecutionRunnable.getWorkflowEventBus().publish(taskFatalEvent);
return;
}
Comment on lines +114 to +124
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to create a TaskFatalException, and throw TaskFatalException here and handle TaskFatalException at WorkflowEventBusFireWorker.

workerGroupDispatcherCoordinator.dispatchTask(taskExecutionRunnable, remainTimeMills);
}

Expand Down
Loading