forked from spring-projects/spring-batch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
RemotePartitioningWorkerStepBuilder.java
257 lines (222 loc) · 8.85 KB
/
RemotePartitioningWorkerStepBuilder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
/*
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.integration.partition;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.StepLocator;
import org.springframework.batch.core.step.builder.FlowStepBuilder;
import org.springframework.batch.core.step.builder.JobStepBuilder;
import org.springframework.batch.core.step.builder.PartitionStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.builder.TaskletStepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.CompletionPolicy;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.StandardIntegrationFlow;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.messaging.MessageChannel;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.Assert;
/**
 * Builder for a worker step in a remote partitioning setup. This builder creates an
 * {@link IntegrationFlow} that:
 *
 * <ul>
 * <li>listens to {@link StepExecutionRequest}s coming from the manager on the input
 * channel</li>
 * <li>invokes the {@link StepExecutionRequestHandler} to execute the worker step for each
 * incoming request. The worker step is located using the provided {@link StepLocator}. If
 * no {@link StepLocator} is provided, a {@link BeanFactoryStepLocator} configured with
 * the current {@link BeanFactory} will be used</li>
 * <li>replies to the manager on the output channel (when the manager step is configured
 * to aggregate replies from workers). If no output channel is provided, a
 * {@link NullChannel} will be used (assuming the manager side is configured to poll the
 * job repository for workers status)</li>
 * </ul>
 *
 * <p>
 * The input channel, the job explorer and the bean factory are mandatory: they are
 * validated when the integration flow is configured, i.e. when one of the step type
 * methods ({@code tasklet}, {@code chunk}, {@code partitioner}, {@code job},
 * {@code flow}) is called. The bean factory is used to look up the
 * {@link IntegrationFlowContext} in which the flow is registered.
 *
 * @since 4.1
 * @author Mahmoud Ben Hassine
 */
public class RemotePartitioningWorkerStepBuilder extends StepBuilder {

    /** Name of the {@link StepExecutionRequestHandler} method used as service activator. */
    private static final String SERVICE_ACTIVATOR_METHOD_NAME = "handle";

    private static final Log logger = LogFactory.getLog(RemotePartitioningWorkerStepBuilder.class);

    private MessageChannel inputChannel;

    private MessageChannel outputChannel;

    private JobExplorer jobExplorer;

    private StepLocator stepLocator;

    private BeanFactory beanFactory;

    /**
     * Initialize a step builder for a step with the given name.
     * @param name the name of the step
     */
    public RemotePartitioningWorkerStepBuilder(String name) {
        super(name);
    }

    /**
     * Set the input channel on which step execution requests sent by the manager are
     * received.
     * @param inputChannel the input channel
     * @return this builder instance for fluent chaining
     * @throws IllegalArgumentException if {@code inputChannel} is {@code null}
     */
    public RemotePartitioningWorkerStepBuilder inputChannel(MessageChannel inputChannel) {
        Assert.notNull(inputChannel, "inputChannel must not be null");
        this.inputChannel = inputChannel;
        return this;
    }

    /**
     * Set the output channel on which replies will be sent to the manager step.
     * @param outputChannel the output channel
     * @return this builder instance for fluent chaining
     * @throws IllegalArgumentException if {@code outputChannel} is {@code null}
     */
    public RemotePartitioningWorkerStepBuilder outputChannel(MessageChannel outputChannel) {
        Assert.notNull(outputChannel, "outputChannel must not be null");
        this.outputChannel = outputChannel;
        return this;
    }

    /**
     * Set the job explorer.
     * @param jobExplorer the job explorer to use
     * @return this builder instance for fluent chaining
     * @throws IllegalArgumentException if {@code jobExplorer} is {@code null}
     */
    public RemotePartitioningWorkerStepBuilder jobExplorer(JobExplorer jobExplorer) {
        Assert.notNull(jobExplorer, "jobExplorer must not be null");
        this.jobExplorer = jobExplorer;
        return this;
    }

    /**
     * Set the step locator used to locate the worker step to execute.
     * @param stepLocator the step locator to use
     * @return this builder instance for fluent chaining
     * @throws IllegalArgumentException if {@code stepLocator} is {@code null}
     */
    public RemotePartitioningWorkerStepBuilder stepLocator(StepLocator stepLocator) {
        Assert.notNull(stepLocator, "stepLocator must not be null");
        this.stepLocator = stepLocator;
        return this;
    }

    /**
     * Set the bean factory. It is used to look up the {@link IntegrationFlowContext} and,
     * when no {@link StepLocator} is provided, to configure the default
     * {@link BeanFactoryStepLocator}.
     * @param beanFactory the bean factory
     * @return this builder instance for fluent chaining
     * @throws IllegalArgumentException if {@code beanFactory} is {@code null}
     */
    public RemotePartitioningWorkerStepBuilder beanFactory(BeanFactory beanFactory) {
        Assert.notNull(beanFactory, "beanFactory must not be null");
        this.beanFactory = beanFactory;
        return this;
    }

    /**
     * Delegate to the parent builder, narrowing the return type so that chaining keeps
     * returning this builder.
     * @param jobRepository the job repository passed on to the parent builder
     * @return this builder instance for fluent chaining
     */
    @Override
    public RemotePartitioningWorkerStepBuilder repository(JobRepository jobRepository) {
        super.repository(jobRepository);
        return this;
    }

    /**
     * Delegate to the parent builder, narrowing the return type so that chaining keeps
     * returning this builder.
     * @param transactionManager the transaction manager passed on to the parent builder
     * @return this builder instance for fluent chaining
     */
    @Override
    public RemotePartitioningWorkerStepBuilder transactionManager(PlatformTransactionManager transactionManager) {
        super.transactionManager(transactionManager);
        return this;
    }

    /**
     * Delegate to the parent builder, narrowing the return type so that chaining keeps
     * returning this builder.
     * @param startLimit the start limit passed on to the parent builder
     * @return this builder instance for fluent chaining
     */
    @Override
    public RemotePartitioningWorkerStepBuilder startLimit(int startLimit) {
        super.startLimit(startLimit);
        return this;
    }

    /**
     * Delegate to the parent builder, narrowing the return type so that chaining keeps
     * returning this builder.
     * @param listener the listener object passed on to the parent builder
     * @return this builder instance for fluent chaining
     */
    @Override
    public RemotePartitioningWorkerStepBuilder listener(Object listener) {
        super.listener(listener);
        return this;
    }

    /**
     * Delegate to the parent builder, narrowing the return type so that chaining keeps
     * returning this builder.
     * @param listener the step execution listener passed on to the parent builder
     * @return this builder instance for fluent chaining
     */
    @Override
    public RemotePartitioningWorkerStepBuilder listener(StepExecutionListener listener) {
        super.listener(listener);
        return this;
    }

    /**
     * Delegate to the parent builder, narrowing the return type so that chaining keeps
     * returning this builder.
     * @param allowStartIfComplete the flag passed on to the parent builder
     * @return this builder instance for fluent chaining
     */
    @Override
    public RemotePartitioningWorkerStepBuilder allowStartIfComplete(boolean allowStartIfComplete) {
        super.allowStartIfComplete(allowStartIfComplete);
        return this;
    }

    /**
     * Configure and register the worker integration flow, then delegate to the parent
     * builder.
     * @param tasklet the tasklet passed on to the parent builder
     * @return the builder returned by the parent implementation
     * @throws IllegalArgumentException if the input channel, the job explorer or the bean
     * factory has not been set
     */
    @Override
    public TaskletStepBuilder tasklet(Tasklet tasklet) {
        configureWorkerIntegrationFlow();
        return super.tasklet(tasklet);
    }

    /**
     * Configure and register the worker integration flow, then delegate to the parent
     * builder.
     * @param chunkSize the chunk size passed on to the parent builder
     * @param <I> the type of item to be processed as input
     * @param <O> the type of item to be output
     * @return the builder returned by the parent implementation
     * @throws IllegalArgumentException if the input channel, the job explorer or the bean
     * factory has not been set
     */
    @Override
    public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
        configureWorkerIntegrationFlow();
        return super.chunk(chunkSize);
    }

    /**
     * Configure and register the worker integration flow, then delegate to the parent
     * builder.
     * @param completionPolicy the completion policy passed on to the parent builder
     * @param <I> the type of item to be processed as input
     * @param <O> the type of item to be output
     * @return the builder returned by the parent implementation
     * @throws IllegalArgumentException if the input channel, the job explorer or the bean
     * factory has not been set
     */
    @Override
    public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
        configureWorkerIntegrationFlow();
        return super.chunk(completionPolicy);
    }

    /**
     * Configure and register the worker integration flow, then delegate to the parent
     * builder.
     * @param stepName the step name passed on to the parent builder
     * @param partitioner the partitioner passed on to the parent builder
     * @return the builder returned by the parent implementation
     * @throws IllegalArgumentException if the input channel, the job explorer or the bean
     * factory has not been set
     */
    @Override
    public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) {
        configureWorkerIntegrationFlow();
        return super.partitioner(stepName, partitioner);
    }

    /**
     * Configure and register the worker integration flow, then delegate to the parent
     * builder.
     * @param step the step passed on to the parent builder
     * @return the builder returned by the parent implementation
     * @throws IllegalArgumentException if the input channel, the job explorer or the bean
     * factory has not been set
     */
    @Override
    public PartitionStepBuilder partitioner(Step step) {
        configureWorkerIntegrationFlow();
        return super.partitioner(step);
    }

    /**
     * Configure and register the worker integration flow, then delegate to the parent
     * builder.
     * @param job the job passed on to the parent builder
     * @return the builder returned by the parent implementation
     * @throws IllegalArgumentException if the input channel, the job explorer or the bean
     * factory has not been set
     */
    @Override
    public JobStepBuilder job(Job job) {
        configureWorkerIntegrationFlow();
        return super.job(job);
    }

    /**
     * Configure and register the worker integration flow, then delegate to the parent
     * builder.
     * @param flow the flow passed on to the parent builder
     * @return the builder returned by the parent implementation
     * @throws IllegalArgumentException if the input channel, the job explorer or the bean
     * factory has not been set
     */
    @Override
    public FlowStepBuilder flow(Flow flow) {
        configureWorkerIntegrationFlow();
        return super.flow(flow);
    }

    /**
     * Create an {@link IntegrationFlow} with a {@link StepExecutionRequestHandler}
     * configured as a service activator listening to the input channel and replying on
     * the output channel, and register it in the {@link IntegrationFlowContext} obtained
     * from the bean factory.
     * <p>
     * Every invocation builds and registers a new flow, so each step type method of this
     * builder that calls it triggers one registration.
     * @throws IllegalArgumentException if the input channel, the job explorer or the bean
     * factory has not been set
     */
    private void configureWorkerIntegrationFlow() {
        // Validate all mandatory collaborators up front so that a misconfigured builder
        // fails with a descriptive message before any of its fields is modified.
        Assert.notNull(this.inputChannel, "An InputChannel must be provided");
        Assert.notNull(this.jobExplorer, "A JobExplorer must be provided");
        // The bean factory is always needed below to look up the IntegrationFlowContext;
        // without this check a missing bean factory surfaced as a NullPointerException.
        Assert.notNull(this.beanFactory, "A BeanFactory must be provided");

        if (this.stepLocator == null) {
            // Default: locate worker steps as beans of the current bean factory.
            BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator();
            beanFactoryStepLocator.setBeanFactory(this.beanFactory);
            this.stepLocator = beanFactoryStepLocator;
        }
        if (this.outputChannel == null) {
            if (logger.isDebugEnabled()) {
                logger.debug("The output channel is set to a NullChannel. "
                        + "The manager step must poll the job repository for workers status.");
            }
            this.outputChannel = new NullChannel();
        }

        StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
        stepExecutionRequestHandler.setJobExplorer(this.jobExplorer);
        stepExecutionRequestHandler.setStepLocator(this.stepLocator);

        StandardIntegrationFlow standardIntegrationFlow = IntegrationFlow.from(this.inputChannel)
                .handle(stepExecutionRequestHandler, SERVICE_ACTIVATOR_METHOD_NAME)
                .channel(this.outputChannel)
                .get();

        IntegrationFlowContext integrationFlowContext = this.beanFactory.getBean(IntegrationFlowContext.class);
        // The flow is registered with auto-startup disabled, exactly as before.
        integrationFlowContext.registration(standardIntegrationFlow).autoStartup(false).register();
    }

}