Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
cdee83e
[DSIP-23][TaskPlugin] DmsTask resource leak repair
Nov 25, 2025
3394bb2
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Nov 25, 2025
ed00657
Merge branch 'DSIP-23-DmsTask' of github.com:niumy0701/dolphinschedul…
Nov 25, 2025
451bf9f
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Nov 26, 2025
80b1ee7
Merge branch 'dev' of github.com:niumy0701/dolphinscheduler into DSIP…
Nov 26, 2025
6de20c9
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 27, 2025
cd182de
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 27, 2025
a08701a
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 28, 2025
2929213
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 28, 2025
03c9b3f
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 28, 2025
8c3d2d7
Merge branch 'dev' of github.com:niumy0701/dolphinscheduler into DSIP…
Nov 29, 2025
7be50e7
Merge branch 'DSIP-23-DmsTask' of github.com:niumy0701/dolphinschedul…
Nov 29, 2025
56115d3
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Nov 29, 2025
f61aadc
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 1, 2025
4137abb
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Dec 2, 2025
f905728
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Dec 2, 2025
f259d12
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 3, 2025
1718eb2
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 4, 2025
5eac924
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 4, 2025
0b04ffe
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,19 @@ public void stopReplicationTask() {
if (replicationTaskArn == null) {
return;
}
StopReplicationTaskRequest request = new StopReplicationTaskRequest()
.withReplicationTaskArn(replicationTaskArn);
client.stopReplicationTask(request);
awaitReplicationTaskStatus(STATUS.STOPPED);
try {
StopReplicationTaskRequest request = new StopReplicationTaskRequest()
.withReplicationTaskArn(replicationTaskArn);
client.stopReplicationTask(request);
awaitReplicationTaskStatus(STATUS.STOPPED);
} catch (Exception e) {
log.error("stopReplicationTask error: ", e);
} finally {
if (client != null) {
// shutdown client
client.shutdown();
}
}
Comment on lines +144 to +156
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it need to close resources when canceling tasks, right

Copy link
Member

@ruanwenjun ruanwenjun Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to close resource when canceling, it will be close at handle lifecycle, have you test this? Please at least test in real environment when submit a PR.

}

public Boolean deleteReplicationTask() {
Expand Down Expand Up @@ -268,8 +277,8 @@ public String replaceFileParameters(String parameter) throws IOException {
}
if (parameter.startsWith("file://")) {
String filePath = parameter.substring(7);
try {
return IOUtils.toString(new FileInputStream(filePath), StandardCharsets.UTF_8);
try (FileInputStream fis = new FileInputStream(filePath)) {
return IOUtils.toString(fis, StandardCharsets.UTF_8);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Files.readString is better?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we minimize the changes and close the resources first, and then optimize them later

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just do it all at once? This is not a big change,

} catch (IOException e) {
throw new IOException("Error reading file: " + filePath, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;

import org.apache.commons.beanutils.BeanUtils;
Expand Down Expand Up @@ -78,6 +80,42 @@ public void init() throws TaskException {
initDmsHook();
}

/**
* If appIds is empty, submit a new remote application; otherwise, just track application status.
*
* @param taskCallBack
* @throws TaskException
*/
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// if appIds is not empty, just track application status, avoid resubmitting remote task
if (StringUtils.isNotEmpty(taskRequest.getAppIds())) {
setAppIds(taskRequest.getAppIds());
trackApplicationStatus();
return;
}

// submit a remote application
submitApplication();

if (StringUtils.isNotEmpty(getAppIds())) {
taskRequest.setAppIds(getAppIds());
// callback to update remote application info
taskCallBack.updateRemoteApplicationInfo(taskRequest.getTaskInstanceId(),
new ApplicationInfo(getAppIds()));
}

// keep tracking application status
trackApplicationStatus();
} finally {
// shutdown dmsHook client
if (dmsHook.getClient() != null) {
dmsHook.getClient().shutdown();
}
}
}

@Override
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;

Expand Down Expand Up @@ -238,4 +243,63 @@ public void testAwaitReplicationTaskStatus() {
}
});
}

@Test
public void testReplaceFileParametersWithNull() throws IOException {
Assertions.assertTimeout(Duration.ofMillis(60000), () -> {
try (MockedStatic<DmsHook> mockHook = Mockito.mockStatic(DmsHook.class)) {
mockHook.when(DmsHook::createClient).thenReturn(client);
DmsHook dmsHook = spy(new DmsHook());
String parameter = null;
String result = dmsHook.replaceFileParameters(parameter);
Assertions.assertNull(result);
}
});
}

@Test
public void testReplaceFileParametersWithNormalString() throws IOException {
Assertions.assertTimeout(Duration.ofMillis(60000), () -> {
try (MockedStatic<DmsHook> mockHook = Mockito.mockStatic(DmsHook.class)) {
mockHook.when(DmsHook::createClient).thenReturn(client);
DmsHook dmsHook = spy(new DmsHook());
String parameter = "normal string";
String result = dmsHook.replaceFileParameters(parameter);
Assertions.assertEquals(parameter, result);
}
});
}

@Test
public void testReplaceFileParametersWithExistingFile() throws IOException {
Assertions.assertTimeout(Duration.ofMillis(60000), () -> {
File tempFile = new File("tempFile.txt");
try (MockedStatic<DmsHook> mockHook = Mockito.mockStatic(DmsHook.class)) {
mockHook.when(DmsHook::createClient).thenReturn(client);
DmsHook dmsHook = spy(new DmsHook());
String fileContent = "content of the file";
FileUtils.writeStringToFile(tempFile, fileContent, StandardCharsets.UTF_8);
String parameter = "file://" + tempFile.getAbsolutePath();
String result = dmsHook.replaceFileParameters(parameter);
Assertions.assertEquals(fileContent, result);
} finally {
tempFile.delete();
}
});
}

@Test
public void testReplaceFileParametersWithNonexistentFile() {
Assertions.assertTimeout(Duration.ofMillis(60000), () -> {
try (MockedStatic<DmsHook> mockHook = Mockito.mockStatic(DmsHook.class)) {
mockHook.when(DmsHook::createClient).thenReturn(client);
DmsHook dmsHook = spy(new DmsHook());
String parameter = "file://nonexistentfile.txt";
Assertions.assertThrows(IOException.class, () -> {
dmsHook.replaceFileParameters(parameter);
});
}
});
}

}