Skip to content

Commit

Permalink
[WIP] Use chunk API consistently
Browse files Browse the repository at this point in the history
  • Loading branch information
fmbenhassine committed Aug 5, 2022
1 parent 9211c2d commit 38a400b
Show file tree
Hide file tree
Showing 37 changed files with 198 additions and 73 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2019 the original author or authors.
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,13 +21,15 @@
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.lang.Nullable;

/**
* A {@link Tasklet} implementing variations on read-process-write item handling.
*
* @author Dave Syer
* @author Mahmoud Ben Hassine
* @param <I> input item type
*/
public class ChunkOrientedTasklet<I> implements Tasklet {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2007 the original author or authors.
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.batch.core.step.item;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.item.Chunk;

/**
* Interface defined for processing {@link Chunk}s.
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2007 the original author or authors.
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.batch.core.step.item;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.item.Chunk;

/**
* Interface for providing {@link Chunk}s to be processed, used by the
Expand Down
Expand Up @@ -34,8 +34,10 @@
import org.springframework.batch.core.step.skip.SkipLimitExceededException;
import org.springframework.batch.core.step.skip.SkipListenerFailedException;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.SkipWrapper;
import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.classify.Classifier;
import org.springframework.retry.ExhaustedRetryException;
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.springframework.batch.core.step.skip.SkipListenerFailedException;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.core.step.skip.SkipPolicyFailedException;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.repeat.RepeatOperations;
import org.springframework.classify.BinaryExceptionClassifier;
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.springframework.batch.core.StepListener;
import org.springframework.batch.core.listener.MulticasterBatchListener;
import org.springframework.batch.core.observability.BatchMetrics;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.springframework.batch.core.StepListener;
import org.springframework.batch.core.listener.MulticasterBatchListener;
import org.springframework.batch.core.observability.BatchMetrics;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.repeat.RepeatCallback;
import org.springframework.batch.repeat.RepeatContext;
Expand Down
Expand Up @@ -44,6 +44,8 @@
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.core.step.StepSupport;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
Expand Down Expand Up @@ -292,7 +294,11 @@ static class JobConfiguration {
public Step step(StepBuilderFactory stepBuilderFactory,
@Value("#{jobParameters['chunkSize']}") Integer chunkSize) {
return stepBuilderFactory.get("step").<Integer, Integer>chunk(chunkSize)
.reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4))).writer(items -> {
.reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4))).writer(new ItemWriter<Integer>() {
@Override
public void write(Chunk<? extends Integer> chunk) throws Exception {

}
}).build();
}

Expand Down
Expand Up @@ -38,6 +38,8 @@
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.ApplicationContext;
Expand Down Expand Up @@ -237,17 +239,27 @@ public Step step1() {

@Bean
public Step step2() {
ItemWriter<? super Integer> itemWriter = new ItemWriter<Integer>() {
@Override
public void write(Chunk<? extends Integer> chunk) throws Exception {
chunk.getItems().forEach(System.out::println);
}
};
return stepBuilderFactory.get("step2").<Integer, Integer>chunk(2)
.reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5)))
.writer(items -> items.forEach(System.out::println)).build();
.reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5))).writer(itemWriter).build();
}

@Bean
public Step step3() {
ItemWriter<? super Integer> itemWriter = new ItemWriter<Integer>() {
@Override
public void write(Chunk<? extends Integer> chunk) throws Exception {
chunk.getItems().forEach(System.out::println);
}
};
return stepBuilderFactory.get("step3").<Integer, Integer>chunk(2)
.reader(new ListItemReader<>(Arrays.asList(6, 7, 8, 9, 10)))
.writer(items -> items.forEach(System.out::println)).faultTolerant().skip(Exception.class)
.skipLimit(3).build();
.reader(new ListItemReader<>(Arrays.asList(6, 7, 8, 9, 10))).writer(itemWriter).faultTolerant()
.skip(Exception.class).skipLimit(3).build();
}

@Bean
Expand Down
Expand Up @@ -26,12 +26,15 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import org.springframework.batch.item.Chunk;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* @author Dave Syer
* @author Mahmoud Ben Hassine
*
*/
class AlmostStatefulRetryChunkTests {
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.item.Chunk;

/**
* @author Dave Syer
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.springframework.batch.core.listener.ItemListenerSupport;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.PassThroughItemProcessor;
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.lang.Nullable;
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.support.RepeatTemplate;

Expand Down
Expand Up @@ -21,6 +21,8 @@

import org.junit.jupiter.api.Test;

import org.springframework.batch.item.SkipWrapper;

/**
* @author Dave Syer
*
Expand All @@ -41,7 +43,7 @@ void testItemWrapperT() {

/**
* Test method for
* {@link org.springframework.batch.core.step.item.SkipWrapper#SkipWrapper(java.lang.Object, java.lang.Throwable)}.
* {@link SkipWrapper#SkipWrapper(java.lang.Object, java.lang.Throwable)}.
*/
@Test
void testItemWrapperTException() {
Expand All @@ -51,8 +53,7 @@ void testItemWrapperTException() {
}

/**
* Test method for
* {@link org.springframework.batch.core.step.item.SkipWrapper#toString()}.
* Test method for {@link SkipWrapper#toString()}.
*/
@Test
void testToString() {
Expand Down
Expand Up @@ -26,8 +26,10 @@
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.JobSupport;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
import org.springframework.batch.repeat.support.RepeatTemplate;
Expand Down Expand Up @@ -87,13 +89,18 @@ void onSetUp() {
@Disabled
void testStatusForCommitFailedException() throws Exception {

step.setTasklet(new TestingChunkOrientedTasklet<>(getReader(new String[] { "a", "b", "c" }),
data -> TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
step.setTasklet(
new TestingChunkOrientedTasklet<>(getReader(new String[] { "a", "b", "c" }), new ItemWriter<String>() {
@Override
public void beforeCommit(boolean readOnly) {
throw new RuntimeException("Simulate commit failure");
public void write(Chunk<? extends String> chunk) throws Exception {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void beforeCommit(boolean readOnly) {
throw new RuntimeException("Simulate commit failure");
}
});
}
}), chunkOperations));
}, chunkOperations));

JobExecution jobExecution = jobRepository.createJobExecution(job.getName(),
new JobParameters(Collections.singletonMap("run.id", new JobParameter(getClass().getName() + ".1"))));
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.batch.core.step.skip.SkipLimitExceededException;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
Expand Down Expand Up @@ -70,9 +71,12 @@ class FaultTolerantStepIntegrationTests {
@BeforeEach
void setUp() {
ItemReader<Integer> itemReader = new ListItemReader<>(createItems());
ItemWriter<Integer> itemWriter = chunk -> {
if (chunk.contains(1)) {
throw new IllegalArgumentException();
ItemWriter<Integer> itemWriter = new ItemWriter<Integer>() {
@Override
public void write(Chunk<? extends Integer> chunk) throws Exception {
if (chunk.getItems().contains(1)) {
throw new IllegalArgumentException();
}
}
};
skipPolicy = new SkipIllegalArgumentExceptionSkipPolicy();
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2013 the original author or authors.
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.springframework.batch.core.step.item;
package org.springframework.batch.item;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -26,10 +26,11 @@
* Encapsulation of a list of items to be processed and possibly a list of failed items to
* be skipped. To mark an item as skipped clients should iterate over the chunk using the
* {@link #iterator()} method, and if there is a failure call
* {@link org.springframework.batch.core.step.item.Chunk.ChunkIterator#remove()} on the
* iterator. The skipped items are then available through the chunk.
* {@link Chunk.ChunkIterator#remove()} on the iterator. The skipped items are then
* available through the chunk.
*
* @author Dave Syer
* @author Mahmoud Ben Hassine
* @since 2.0
*/
public class Chunk<W> implements Iterable<W> {
Expand Down
Expand Up @@ -36,8 +36,9 @@
* @author Dave Syer
* @author Lucas Ward
* @author Taeik Lim
* @author Mahmoud Ben Hassine
*/
@FunctionalInterface
// TODO in v5.2 add @FunctionalInterface once write(List<? extends T> items) is removed
public interface ItemWriter<T> {

/**
Expand All @@ -46,7 +47,23 @@ public interface ItemWriter<T> {
* @param items items to be written
* @throws Exception if there are errors. The framework will catch the exception and
* convert or rethrow it as appropriate.
* @deprecated since 5.0 in favor of {@link ItemWriter#write(Chunk)} instead. Will be
* removed in v5.2
*/
void write(List<? extends T> items) throws Exception;
@Deprecated(since = "5.0")
default void write(List<? extends T> items) throws Exception {
write(new Chunk<>(items));
}

/**
* Write items to a target. Will not be called with any null items in normal
* operation.
* @param items to write
* @throws Exception if there are errors. The framework will catch the exception and
* convert or rethrow it as appropriate.
*/
default void write(Chunk<? extends T> items) throws Exception {

}

}
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2018 the original author or authors.
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.springframework.batch.core.step.item;
package org.springframework.batch.item;

import org.springframework.lang.Nullable;

Expand Down
Expand Up @@ -27,6 +27,8 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.SpELItemKeyMapper;
import org.springframework.data.gemfire.GemfireTemplate;
import org.springframework.core.convert.converter.Converter;
Expand Down Expand Up @@ -116,7 +118,7 @@ public String convert(Foo item) {

@Test
void testWriteNoTransactionNoItems() throws Exception {
writer.write(null);
writer.write(new Chunk<>());
verifyNoInteractions(template);
}

Expand Down

0 comments on commit 38a400b

Please sign in to comment.