Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public enum TaskLifecycleEventType implements ILifecycleEventType {
* Dispatch the task instance to target.
*/
DISPATCH,
/**
* Severe and unrecoverable errors, such as initialization failure.
*/
FATAL,
/**
* The task instance is dispatched to the target executor server.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event;

import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.AbstractTaskLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;

import java.util.Date;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

@Data
@Builder
@AllArgsConstructor
public class TaskFatalLifecycleEvent extends AbstractTaskLifecycleEvent {

private final ITaskExecutionRunnable taskExecutionRunnable;

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
AbstractTaskLifecycleEvent.getTaskExecutionRunnable
; it is advisable to add an Override annotation.
Copy link

Copilot AI Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method overrides AbstractTaskLifecycleEvent.getTaskExecutionRunnable; it is advisable to add an Override annotation.

Copilot uses AI. Check for mistakes.

private final Date endTime;

@Override
public ILifecycleEventType getEventType() {
return TaskLifecycleEventType.FATAL;
}

@Override
public String toString() {
return "TaskFatalLifecycleEvent{" +
"task=" + taskExecutionRunnable.getName() +
", endTime=" + endTime +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine.task.lifecycle.handler;

import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.task.statemachine.ITaskStateAction;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;

import org.springframework.stereotype.Component;

@Component
public class TaskFatalLifecycleEventHandler extends AbstractTaskLifecycleEventHandler<TaskFatalLifecycleEvent> {

@Override
public void handle(final ITaskStateAction taskStateAction,
final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskFatalLifecycleEvent taskFatalEvent) {
taskStateAction.onFatalEvent(workflowExecutionRunnable, taskExecutionRunnable, taskFatalEvent);
}

@Override
public ILifecycleEventType matchEventType() {
return TaskLifecycleEventType.FATAL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPausedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskRetryLifecycleEvent;
Expand Down Expand Up @@ -99,6 +100,29 @@ protected void releaseTaskInstanceResourcesIfNeeded(final ITaskExecutionRunnable
}
}

@Override
public void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskFatalLifecycleEvent taskFatalEvent) {
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);

final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
taskInstance.setState(TaskExecutionStatus.FAILURE);
taskInstance.setEndTime(taskFatalEvent.getEndTime());
taskInstanceDao.updateById(taskInstance);
Comment on lines +109 to +112
Copy link

Copilot AI Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] For consistency with other event handlers in this class (e.g., onFailedEvent, onPausedEvent, onKilledEvent), consider extracting the persistence logic into a private helper method like persistentTaskInstanceFatalEventToDB. This improves code maintainability and follows the established pattern in this class.

Copilot uses AI. Check for mistakes.

// If all successors are condition tasks, then the task will not be marked as failure.
// And the DAG will continue to execute.
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
if (workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) {
mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, taskExecutionRunnable);
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
return;
}
taskExecutionRunnable.getWorkflowExecutionGraph().markTaskExecutionRunnableChainFailure(taskExecutionRunnable);
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
}

@Override
public void onDispatchedEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
Expand Down Expand Up @@ -91,6 +92,14 @@ void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskDispatchLifecycleEvent taskDispatchEvent);

/**
* Perform the necessary actions when the task in a certain state receive a {@link TaskFatalLifecycleEvent}.
* <p> This method is called when the task encounters an unrecoverable error (e.g., initialization failure).
*/
void onFatalEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskFatalLifecycleEvent taskFatalEvent);

/**
* Perform the necessary actions when the task in a certain state receive a {@link TaskDispatchedLifecycleEvent}.
* <p> This method is called when the task has been dispatched to executor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFatalLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKilledLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
Expand All @@ -37,6 +38,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;

import java.util.Date;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -109,7 +111,17 @@ public void onDispatchEvent(final IWorkflowExecutionRunnable workflowExecutionRu
taskInstance.getDelayTime(),
remainTimeMills);
}
taskExecutionRunnable.initializeTaskExecutionContext();
try {
taskExecutionRunnable.initializeTaskExecutionContext();
} catch (Exception ex) {
log.error("Current taskInstance: {} initializeTaskExecutionContext error", taskInstance.getName(), ex);
final TaskFatalLifecycleEvent taskFatalEvent = TaskFatalLifecycleEvent.builder()
.taskExecutionRunnable(taskExecutionRunnable)
.endTime(new Date())
.build();
taskExecutionRunnable.getWorkflowEventBus().publish(taskFatalEvent);
return;
}
Comment on lines +114 to +124
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to create a TaskFatalException, and throw TaskFatalException here and handle TaskFatalException at WorkflowEventBusFireWorker.

workerGroupDispatcherCoordinator.dispatchTask(taskExecutionRunnable, remainTimeMills);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@

@Test
@DisplayName("Test start a workflow with one fake task(A) failed")
public void testStartWorkflow_with_oneFailedTask() {

Check warning on line 824 in dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace these 3 tests with a single Parameterized one.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZrseilDIYxZ4fmsqdKs&open=AZrseilDIYxZ4fmsqdKs&pullRequest=17759
final String yaml = "/it/start/workflow_with_one_fake_task_failed.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getOneWorkflow();
Expand Down Expand Up @@ -849,6 +849,36 @@
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one fake task(A) fatal")
public void testStartWorkflow_with_oneFatalTask() {

Check warning on line 854 in dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZrseilDIYxZ4fmsqdKr&open=AZrseilDIYxZ4fmsqdKr&pullRequest=17759
final String yaml = "/it/start/workflow_with_one_fake_task_fatal.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getOneWorkflow();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(workflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflow))
.satisfiesExactly(workflowInstance -> assertThat(workflowInstance.getState())
.isEqualTo(WorkflowExecutionStatus.FAILURE));
Assertions
.assertThat(repository.queryTaskInstance(workflow))
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
});
});
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one fake task(A) failed")
public void testStartWorkflow_with_oneFailedTaskWithRetry() {
Expand Down Expand Up @@ -1403,6 +1433,46 @@
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one condition task(B) when one fake predecessor task(A) run fatal")
void testStartWorkflow_with_oneConditionTaskWithOneFakePredecessor_runFatal() {
final String yaml = "/it/start/workflow_with_one_condition_task_with_one_fake_predecessor_fatal.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(parentWorkflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);

Assertions
.assertThat(repository.queryTaskInstance(workflowInstanceId))
.hasSize(3)
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
})
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("B");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
})
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("D");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
});
});
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one condition task(B) which is forbidden when one fake predecessor task(A) run failed")
void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFailed() {
Expand Down Expand Up @@ -1435,4 +1505,37 @@
});
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one condition task(B) which is forbidden when one fake predecessor task(A) run fatal")
void testStartWorkflow_with_oneForbiddenConditionTaskWithOneFakePredecessor_runFatal() {
final String yaml =
"/it/start/workflow_with_one_forbidden_condition_task_with_one_fake_predecessor_fatal.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(parentWorkflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE);

Assertions
.assertThat(repository.queryTaskInstance(workflowInstanceId))
.hasSize(1)
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
});
});
masterContainer.assertAllResourceReleased();
}
}
Loading
Loading