-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
SystemCommandTasklet.java
278 lines (233 loc) · 9.09 KB
/
SystemCommandTasklet.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
/*
* Copyright 2006-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.step.tasklet;
import java.io.File;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInterruptedException;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
 * {@link Tasklet} that executes a system command.
 *
 * The system command is executed asynchronously using injected
 * {@link #setTaskExecutor(TaskExecutor)} - timeout value is required to be set, so that
 * the batch job does not hang forever if the external process hangs.
 *
 * Tasklet periodically checks for termination status (i.e. {@link #setCommand(String)}
 * finished its execution or {@link #setTimeout(long)} expired or job was interrupted).
 * The check interval is given by {@link #setTerminationCheckInterval(long)}.
 *
 * When job interrupt is detected (at the next termination check) tasklet's execution is
 * terminated by throwing {@link JobInterruptedException}.
 *
 * {@link #setInterruptOnCancel(boolean)} specifies whether the tasklet should attempt to
 * interrupt the thread that executes the system command if it is still running when
 * tasklet exits (abnormally).
 *
 * The {@link CommandRunner} is resolved per instance: a runner injected through
 * {@link #setCommandRunner(CommandRunner)} is used by that instance only, otherwise the
 * class-wide default installed with {@link #presetCommandRunner(CommandRunner)} applies.
 *
 * @author Robert Kasanicky
 * @author Will Schipp
 * @author Mahmoud Ben Hassine
 */
public class SystemCommandTasklet implements StepExecutionListener, StoppableTasklet, InitializingBean {

	protected static final Log logger = LogFactory.getLog(SystemCommandTasklet.class);

	/** Class-wide fallback runner, used by every instance that has no runner of its own. */
	private static CommandRunner defaultCommandRunner = new JvmCommandRunner();

	/** Runner injected into this instance; {@code null} means "use the class-wide default". */
	private CommandRunner commandRunner = null;

	private String command;

	private String[] environmentParams = null;

	private File workingDirectory = null;

	private SystemProcessExitCodeMapper systemProcessExitCodeMapper = new SimpleSystemProcessExitCodeMapper();

	private long timeout = 0;

	private long checkInterval = 1000;

	private StepExecution execution = null;

	private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();

	private boolean interruptOnCancel = false;

	/** Written by {@link #stop()} (possibly from another thread) and read by the polling loop. */
	private volatile boolean stopped = false;

	private JobExplorer jobExplorer;

	/** Set in {@link #afterPropertiesSet()}: {@code true} when a {@link JobExplorer} is available. */
	private boolean stoppable = false;

	/**
	 * Execute system command and map its exit code to {@link ExitStatus} using
	 * {@link SystemProcessExitCodeMapper}.
	 *
	 * The command runs on the configured {@link TaskExecutor} while the calling thread
	 * polls, once per check interval, in this order: command finished, timeout expired,
	 * step flagged terminate-only, stop requested.
	 * @param contribution receives the exit status mapped from the process exit code, or
	 * {@link ExitStatus#STOPPED} when a stop was requested
	 * @param chunkContext used to look up the current step execution
	 * @return always {@link RepeatStatus#FINISHED} when the method returns normally
	 * @throws SystemCommandException if the command did not finish within the timeout
	 * @throws JobInterruptedException if the step execution was flagged terminate-only
	 * @throws Exception any failure raised while sleeping or while retrieving the command
	 * result
	 */
	@Nullable
	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
		// Resolve once so the worker thread uses a single, stable runner for this execution.
		final CommandRunner runner = resolveCommandRunner();

		FutureTask<Integer> systemCommandTask = new FutureTask<>(new Callable<Integer>() {
			@Override
			public Integer call() throws Exception {
				Process process = runner.exec(command, environmentParams, workingDirectory);
				return process.waitFor();
			}
		});

		long t0 = System.currentTimeMillis();
		taskExecutor.execute(systemCommandTask);

		while (true) {
			// Wait one check interval before every round of termination checks.
			Thread.sleep(checkInterval);

			if (stoppable && isJobStopping(chunkContext)) {
				stopped = true;
			}

			if (systemCommandTask.isDone()) {
				contribution.setExitStatus(systemProcessExitCodeMapper.getExitStatus(systemCommandTask.get()));
				return RepeatStatus.FINISHED;
			}
			else if (System.currentTimeMillis() - t0 > timeout) {
				systemCommandTask.cancel(interruptOnCancel);
				throw new SystemCommandException("Execution of system command did not finish within the timeout");
			}
			else if (isTerminateOnly(chunkContext)) {
				systemCommandTask.cancel(interruptOnCancel);
				throw new JobInterruptedException("Job interrupted while executing system command '" + command + "'");
			}
			else if (stopped) {
				systemCommandTask.cancel(interruptOnCancel);
				contribution.setExitStatus(ExitStatus.STOPPED);
				return RepeatStatus.FINISHED;
			}
		}
	}

	/**
	 * Returns the runner this instance should use: its own if one was injected, otherwise
	 * the class-wide default.
	 */
	private CommandRunner resolveCommandRunner() {
		return (this.commandRunner != null) ? this.commandRunner : defaultCommandRunner;
	}

	/**
	 * Asks the {@link JobExplorer} whether the surrounding job execution is stopping. A
	 * missing job execution (explorer returned {@code null}) is treated as "not stopping".
	 */
	private boolean isJobStopping(ChunkContext chunkContext) {
		JobExecution jobExecution = jobExplorer
				.getJobExecution(chunkContext.getStepContext().getStepExecution().getJobExecutionId());
		return jobExecution != null && jobExecution.isStopping();
	}

	/**
	 * Tells whether the current step execution is flagged terminate-only. Uses the
	 * execution captured in {@link #beforeStep(StepExecution)}; when the tasklet was not
	 * invoked as a listener it falls back to the execution held by the chunk context.
	 */
	private boolean isTerminateOnly(ChunkContext chunkContext) {
		StepExecution stepExecution = this.execution;
		if (stepExecution == null && chunkContext != null && chunkContext.getStepContext() != null) {
			stepExecution = chunkContext.getStepContext().getStepExecution();
		}
		return stepExecution != null && stepExecution.isTerminateOnly();
	}

	/**
	 * Static setter for the default {@link CommandRunner} so it can be adjusted before
	 * dependency injection. Applies to every instance that has no runner of its own;
	 * overridden per instance by {@link #setCommandRunner(CommandRunner)}.
	 * @param commandRunner {@link CommandRunner} instance to be used by
	 * SystemCommandTasklet instances by default.
	 */
	public static void presetCommandRunner(CommandRunner commandRunner) {
		SystemCommandTasklet.defaultCommandRunner = commandRunner;
	}

	/**
	 * Injection setter for the {@link CommandRunner}. Only this instance is affected;
	 * passing {@code null} reverts this instance to the class-wide default.
	 * @param commandRunner {@link CommandRunner} instance to be used by this
	 * SystemCommandTasklet instance.
	 */
	public void setCommandRunner(CommandRunner commandRunner) {
		this.commandRunner = commandRunner;
	}

	/**
	 * @param command command to be executed in a separate system process
	 */
	public void setCommand(String command) {
		this.command = command;
	}

	/**
	 * @param envp environment parameter values, inherited from parent process when not
	 * set (or set to null).
	 */
	public void setEnvironmentParams(String[] envp) {
		this.environmentParams = envp;
	}

	/**
	 * @param dir working directory of the spawned process, inherited from parent process
	 * when not set (or set to null).
	 * @throws IllegalArgumentException if {@code dir} does not exist or is not a
	 * directory; the previously configured working directory is kept in that case
	 */
	public void setWorkingDirectory(String dir) {
		if (dir == null) {
			this.workingDirectory = null;
			return;
		}
		// Validate first so a rejected value never ends up in the field.
		File candidate = new File(dir);
		Assert.isTrue(candidate.exists(), "working directory must exist");
		Assert.isTrue(candidate.isDirectory(), "working directory value must be a directory");
		this.workingDirectory = candidate;
	}

	/**
	 * Validates the mandatory configuration and records whether stop detection through
	 * the {@link JobExplorer} is available.
	 * @throws Exception if a mandatory property is missing or invalid
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		Assert.notNull(resolveCommandRunner(), "CommandRunner must be set");
		Assert.hasLength(command, "'command' property value is required");
		Assert.notNull(systemProcessExitCodeMapper, "SystemProcessExitCodeMapper must be set");
		Assert.isTrue(timeout > 0, "timeout value must be greater than zero");
		Assert.notNull(taskExecutor, "taskExecutor is required");
		stoppable = jobExplorer != null;
	}

	/**
	 * Optional explorer used to detect that the job execution is stopping. It is taken
	 * into account by {@link #afterPropertiesSet()}.
	 * @param jobExplorer the {@link JobExplorer} to query during execution
	 */
	public void setJobExplorer(JobExplorer jobExplorer) {
		this.jobExplorer = jobExplorer;
	}

	/**
	 * @param systemProcessExitCodeMapper maps system process return value to
	 * <code>ExitStatus</code> returned by Tasklet.
	 * {@link SimpleSystemProcessExitCodeMapper} is used by default.
	 */
	public void setSystemProcessExitCodeMapper(SystemProcessExitCodeMapper systemProcessExitCodeMapper) {
		this.systemProcessExitCodeMapper = systemProcessExitCodeMapper;
	}

	/**
	 * Timeout in milliseconds.
	 * @param timeout upper limit for how long the execution of the external program is
	 * allowed to last.
	 */
	public void setTimeout(long timeout) {
		this.timeout = timeout;
	}

	/**
	 * The time interval how often the tasklet will check for termination status.
	 * @param checkInterval time interval in milliseconds (1 second by default).
	 */
	public void setTerminationCheckInterval(long checkInterval) {
		this.checkInterval = checkInterval;
	}

	/**
	 * Get a reference to {@link StepExecution} for interrupt checks during system command
	 * execution.
	 */
	@Override
	public void beforeStep(StepExecution stepExecution) {
		this.execution = stepExecution;
	}

	/**
	 * Sets the task executor that will be used to execute the system command NB! Avoid
	 * using a synchronous task executor
	 * @param taskExecutor instance of {@link TaskExecutor}.
	 */
	public void setTaskExecutor(TaskExecutor taskExecutor) {
		this.taskExecutor = taskExecutor;
	}

	/**
	 * If <code>true</code> tasklet will attempt to interrupt the thread executing the
	 * system command if {@link #setTimeout(long)} has been exceeded or user interrupts
	 * the job. <code>false</code> by default
	 * @param interruptOnCancel boolean determines if process should be interrupted
	 */
	public void setInterruptOnCancel(boolean interruptOnCancel) {
		this.interruptOnCancel = interruptOnCancel;
	}

	/**
	 * Requests a stop. The request is picked up at the next termination check, where the
	 * command task is cancelled and the step contribution is marked
	 * {@link ExitStatus#STOPPED}. The thread executing the system command is interrupted
	 * only if {@link #setInterruptOnCancel(boolean)} has been set to true; otherwise it is
	 * left running and the tasklet returns without waiting for it.
	 *
	 * @since 3.0
	 * @see StoppableTasklet#stop()
	 */
	@Override
	public void stop() {
		stopped = true;
	}

}