From 38a400bf2d0c0e8f300b8ad851c9528d10e7b8cf Mon Sep 17 00:00:00 2001 From: Mahmoud Ben Hassine Date: Sat, 9 Jul 2022 05:21:03 +0200 Subject: [PATCH] [WIP] Use chunk API consistently Issue #3954 --- .../core/step/item/ChunkOrientedTasklet.java | 4 +++- .../batch/core/step/item/ChunkProcessor.java | 3 ++- .../batch/core/step/item/ChunkProvider.java | 3 ++- .../item/FaultTolerantChunkProcessor.java | 2 ++ .../step/item/FaultTolerantChunkProvider.java | 1 + .../core/step/item/SimpleChunkProcessor.java | 1 + .../core/step/item/SimpleChunkProvider.java | 1 + .../core/job/builder/FlowJobBuilderTests.java | 8 ++++++- .../core/observability/BatchMetricsTests.java | 22 ++++++++++++++----- .../item/AlmostStatefulRetryChunkTests.java | 3 +++ .../step/item/ChunkOrientedTaskletTests.java | 1 + .../FaultTolerantChunkProcessorTests.java | 1 + .../item/FaultTolerantChunkProviderTests.java | 1 + .../step/item/SimpleChunkProcessorTests.java | 1 + .../step/item/SimpleChunkProviderTests.java | 1 + .../core/step/item/SkipWrapperTests.java | 7 +++--- .../ChunkOrientedStepIntegrationTests.java | 17 +++++++++----- .../FaultTolerantStepIntegrationTests.java | 10 ++++++--- .../springframework/batch}/item/Chunk.java | 9 ++++---- .../batch/item/ItemWriter.java | 21 ++++++++++++++++-- .../batch}/item/SkipWrapper.java | 4 ++-- .../item/data/GemfireItemWriterTests.java | 4 +++- .../batch/item/data/MongoItemWriterTests.java | 4 +++- .../batch/item/data/Neo4jItemWriterTests.java | 6 +++-- .../item/data/RepositoryItemWriterTests.java | 4 +++- ...sifierCompositeItemWriterBuilderTests.java | 15 +++++++++++-- .../chunk/ChunkProcessorChunkHandler.java | 5 +++-- .../chunk/RemoteChunkHandlerFactoryBean.java | 4 ++-- .../MessageChannelPartitionHandler.java | 7 +++--- .../ChunkProcessorChunkHandlerTests.java | 2 +- ...RemoteChunkingManagerStepBuilderTests.java | 7 +++++- .../RemoteChunkingWorkerBuilderTests.java | 19 +++++++++++++--- .../sample/metrics/Job2Configuration.java | 20 ++++++++++------- .../remotechunking/WorkerConfiguration.java | 11 +++++++--- ...SkippableExceptionDuringProcessSample.java | 12 ++++++---- .../SkippableExceptionDuringReadSample.java | 12 ++++++---- .../SkippableExceptionDuringWriteSample.java | 18 +++++++++------ 37 files changed, 198 insertions(+), 73 deletions(-) rename {spring-batch-core/src/main/java/org/springframework/batch/core/step => spring-batch-infrastructure/src/main/java/org/springframework/batch}/item/Chunk.java (95%) rename {spring-batch-core/src/main/java/org/springframework/batch/core/step => spring-batch-infrastructure/src/main/java/org/springframework/batch}/item/SkipWrapper.java (93%) diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java index 1a7fc7d0a3..fbeb0425bf 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2019 the original author or authors. + * Copyright 2006-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.item.Chunk; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.lang.Nullable; @@ -28,6 +29,7 @@ * A {@link Tasklet} implementing variations on read-process-write item handling. * * @author Dave Syer + * @author Mahmoud Ben Hassine * @param input item type */ public class ChunkOrientedTasklet implements Tasklet { diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProcessor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProcessor.java index 5ca36744dd..ece181f5a4 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProcessor.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2007 the original author or authors. + * Copyright 2006-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.batch.core.step.item; import org.springframework.batch.core.StepContribution; +import org.springframework.batch.item.Chunk; /** * Interface defined for processing {@link Chunk}s. diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProvider.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProvider.java index 36b778d2c2..bd93f800b7 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProvider.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2007 the original author or authors. + * Copyright 2006-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.batch.core.step.item; import org.springframework.batch.core.StepContribution; +import org.springframework.batch.item.Chunk; /** * Interface for providing {@link Chunk}s to be processed, used by the diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java index dbcdc83705..d46366c7dc 100755 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessor.java @@ -34,8 +34,10 @@ import org.springframework.batch.core.step.skip.SkipLimitExceededException; import org.springframework.batch.core.step.skip.SkipListenerFailedException; import org.springframework.batch.core.step.skip.SkipPolicy; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.SkipWrapper; import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.classify.Classifier; import org.springframework.retry.ExhaustedRetryException; diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProvider.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProvider.java index 47ec677fb2..768bf0f793 100755 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProvider.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/FaultTolerantChunkProvider.java @@ -23,6 +23,7 @@ import org.springframework.batch.core.step.skip.SkipListenerFailedException; import org.springframework.batch.core.step.skip.SkipPolicy; import org.springframework.batch.core.step.skip.SkipPolicyFailedException; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemReader; import org.springframework.batch.repeat.RepeatOperations; import org.springframework.classify.BinaryExceptionClassifier; diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java index 0799d24530..05f460087f 100755 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProcessor.java @@ -26,6 +26,7 @@ import org.springframework.batch.core.StepListener; import org.springframework.batch.core.listener.MulticasterBatchListener; import org.springframework.batch.core.observability.BatchMetrics; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.InitializingBean; diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProvider.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProvider.java index 6bb97894f1..8cb63e60c5 100755 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProvider.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SimpleChunkProvider.java @@ -28,6 +28,7 @@ import org.springframework.batch.core.StepListener; import org.springframework.batch.core.listener.MulticasterBatchListener; import org.springframework.batch.core.observability.BatchMetrics; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemReader; import org.springframework.batch.repeat.RepeatCallback; import org.springframework.batch.repeat.RepeatContext; diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java index 7feb030a20..a59b1b818b 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/job/builder/FlowJobBuilderTests.java @@ -44,6 +44,8 @@ import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.batch.core.step.StepSupport; +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.ListItemReader; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; @@ -292,7 +294,11 @@ static class JobConfiguration { public Step step(StepBuilderFactory stepBuilderFactory, @Value("#{jobParameters['chunkSize']}") Integer chunkSize) { return stepBuilderFactory.get("step").chunk(chunkSize) - .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4))).writer(items -> { + .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4))).writer(new ItemWriter() { + @Override + public void write(Chunk chunk) throws Exception { + + } }).build(); } diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/observability/BatchMetricsTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/observability/BatchMetricsTests.java index 5aee7586ae..eaa4d2c861 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/observability/BatchMetricsTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/observability/BatchMetricsTests.java @@ -38,6 +38,8 @@ import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.ListItemReader; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.context.ApplicationContext; @@ -237,17 +239,27 @@ public Step step1() { @Bean public Step step2() { + ItemWriter itemWriter = new ItemWriter() { + @Override + public void write(Chunk chunk) throws Exception { + chunk.getItems().forEach(System.out::println); + } + }; return stepBuilderFactory.get("step2").chunk(2) - .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5))) - .writer(items -> items.forEach(System.out::println)).build(); + .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5))).writer(itemWriter).build(); } @Bean public Step step3() { + ItemWriter itemWriter = new ItemWriter() { + @Override + public void write(Chunk chunk) throws Exception { + chunk.getItems().forEach(System.out::println); + } + }; return stepBuilderFactory.get("step3").chunk(2) - .reader(new ListItemReader<>(Arrays.asList(6, 7, 8, 9, 10))) - .writer(items -> items.forEach(System.out::println)).faultTolerant().skip(Exception.class) - .skipLimit(3).build(); + .reader(new ListItemReader<>(Arrays.asList(6, 7, 8, 9, 10))).writer(itemWriter).faultTolerant() + .skip(Exception.class).skipLimit(3).build(); } @Bean diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/AlmostStatefulRetryChunkTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/AlmostStatefulRetryChunkTests.java index f9cd6c3f28..7a7165327a 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/AlmostStatefulRetryChunkTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/AlmostStatefulRetryChunkTests.java @@ -26,12 +26,15 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.springframework.batch.item.Chunk; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; /** * @author Dave Syer + * @author Mahmoud Ben Hassine * */ class AlmostStatefulRetryChunkTests { diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedTaskletTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedTaskletTests.java index c883e59a24..800a4c86ff 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedTaskletTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/ChunkOrientedTaskletTests.java @@ -27,6 +27,7 @@ import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.item.Chunk; /** * @author Dave Syer diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessorTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessorTests.java index 24d2099213..5acc5d78e3 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessorTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProcessorTests.java @@ -36,6 +36,7 @@ import org.springframework.batch.core.listener.ItemListenerSupport; import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy; import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.PassThroughItemProcessor; diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProviderTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProviderTests.java index e418b747b8..b0f87b7ed4 100755 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProviderTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/FaultTolerantChunkProviderTests.java @@ -28,6 +28,7 @@ import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ParseException; import org.springframework.batch.item.UnexpectedInputException; diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleChunkProcessorTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleChunkProcessorTests.java index 7b1f48779b..6e7d812702 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleChunkProcessorTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleChunkProcessorTests.java @@ -28,6 +28,7 @@ import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.lang.Nullable; diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleChunkProviderTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleChunkProviderTests.java index b4161133c9..eb5f4196d0 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleChunkProviderTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SimpleChunkProviderTests.java @@ -26,6 +26,7 @@ import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.support.ListItemReader; import org.springframework.batch.repeat.support.RepeatTemplate; diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SkipWrapperTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SkipWrapperTests.java index 32a3cf6678..0ca4275b9a 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SkipWrapperTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/item/SkipWrapperTests.java @@ -21,6 +21,8 @@ import org.junit.jupiter.api.Test; +import org.springframework.batch.item.SkipWrapper; + /** * @author Dave Syer * @@ -41,7 +43,7 @@ void testItemWrapperT() { /** * Test method for - * {@link org.springframework.batch.core.step.item.SkipWrapper#SkipWrapper(java.lang.Object, java.lang.Throwable)}. + * {@link SkipWrapper#SkipWrapper(java.lang.Object, java.lang.Throwable)}. */ @Test void testItemWrapperTException() { @@ -51,8 +53,7 @@ void testItemWrapperTException() { } /** - * Test method for - * {@link org.springframework.batch.core.step.item.SkipWrapper#toString()}. + * Test method for {@link SkipWrapper#toString()}. */ @Test void testToString() { diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/ChunkOrientedStepIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/ChunkOrientedStepIntegrationTests.java index 9192bcd83c..8632f7edfb 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/ChunkOrientedStepIntegrationTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/tasklet/ChunkOrientedStepIntegrationTests.java @@ -26,8 +26,10 @@ import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.job.JobSupport; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.ListItemReader; import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; import org.springframework.batch.repeat.support.RepeatTemplate; @@ -87,13 +89,18 @@ void onSetUp() { @Disabled void testStatusForCommitFailedException() throws Exception { - step.setTasklet(new TestingChunkOrientedTasklet<>(getReader(new String[] { "a", "b", "c" }), - data -> TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + step.setTasklet( + new TestingChunkOrientedTasklet<>(getReader(new String[] { "a", "b", "c" }), new ItemWriter() { @Override - public void beforeCommit(boolean readOnly) { - throw new RuntimeException("Simulate commit failure"); + public void write(Chunk chunk) throws Exception { + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void beforeCommit(boolean readOnly) { + throw new RuntimeException("Simulate commit failure"); + } + }); } - }), chunkOperations)); + }, chunkOperations)); JobExecution jobExecution = jobRepository.createJobExecution(job.getName(), new JobParameters(Collections.singletonMap("run.id", new JobParameter(getClass().getName() + ".1")))); diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/test/step/FaultTolerantStepIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/test/step/FaultTolerantStepIntegrationTests.java index 08513018fb..e58259c9fe 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/test/step/FaultTolerantStepIntegrationTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/test/step/FaultTolerantStepIntegrationTests.java @@ -35,6 +35,7 @@ import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy; import org.springframework.batch.core.step.skip.SkipLimitExceededException; import org.springframework.batch.core.step.skip.SkipPolicy; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; @@ -70,9 +71,12 @@ class FaultTolerantStepIntegrationTests { @BeforeEach void setUp() { ItemReader itemReader = new ListItemReader<>(createItems()); - ItemWriter itemWriter = chunk -> { - if (chunk.contains(1)) { - throw new IllegalArgumentException(); + ItemWriter itemWriter = new ItemWriter() { + @Override + public void write(Chunk chunk) throws Exception { + if (chunk.getItems().contains(1)) { + throw new IllegalArgumentException(); + } } }; skipPolicy = new SkipIllegalArgumentExceptionSkipPolicy(); diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/Chunk.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/Chunk.java similarity index 95% rename from spring-batch-core/src/main/java/org/springframework/batch/core/step/item/Chunk.java rename to spring-batch-infrastructure/src/main/java/org/springframework/batch/item/Chunk.java index 9fa7f46825..c3201c9579 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/Chunk.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/Chunk.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2013 the original author or authors. + * Copyright 2006-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.batch.core.step.item; +package org.springframework.batch.item; import java.util.ArrayList; import java.util.Collection; @@ -26,10 +26,11 @@ * Encapsulation of a list of items to be processed and possibly a list of failed items to * be skipped. To mark an item as skipped clients should iterate over the chunk using the * {@link #iterator()} method, and if there is a failure call - * {@link org.springframework.batch.core.step.item.Chunk.ChunkIterator#remove()} on the - * iterator. The skipped items are then available through the chunk. + * {@link Chunk.ChunkIterator#remove()} on the iterator. The skipped items are then + * available through the chunk. * * @author Dave Syer + * @author Mahmoud Ben Hassine * @since 2.0 */ public class Chunk implements Iterable { diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/ItemWriter.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/ItemWriter.java index c5af2028d6..054a7f7c23 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/ItemWriter.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/ItemWriter.java @@ -36,8 +36,9 @@ * @author Dave Syer * @author Lucas Ward * @author Taeik Lim + * @author Mahmoud Ben Hassine */ -@FunctionalInterface +// TODO in v5.2 add @FunctionalInterface once write(List items) is removed public interface ItemWriter { /** @@ -46,7 +47,23 @@ public interface ItemWriter { * @param items items to be written * @throws Exception if there are errors. The framework will catch the exception and * convert or rethrow it as appropriate. + * @deprecated since 5.0 in favor of {@link ItemWriter#write(Chunk)} instead. Will be + * removed in v5.2 */ - void write(List items) throws Exception; + @Deprecated(since = "5.0") + default void write(List items) throws Exception { + write(new Chunk<>(items)); + } + + /** + * Write items to a target. Will not be called with any null items in normal + * operation. + * @param items to write + * @throws Exception if there are errors. The framework will catch the exception and + * convert or rethrow it as appropriate. + */ + default void write(Chunk items) throws Exception { + + } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SkipWrapper.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/SkipWrapper.java similarity index 93% rename from spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SkipWrapper.java rename to spring-batch-infrastructure/src/main/java/org/springframework/batch/item/SkipWrapper.java index 2ad4eac4b5..41b5dd977f 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/SkipWrapper.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/SkipWrapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2018 the original author or authors. + * Copyright 2006-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.batch.core.step.item; +package org.springframework.batch.item; import org.springframework.lang.Nullable; diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/GemfireItemWriterTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/GemfireItemWriterTests.java index 68bad56ea4..fa1de737cb 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/GemfireItemWriterTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/GemfireItemWriterTests.java @@ -27,6 +27,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; + +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.SpELItemKeyMapper; import org.springframework.data.gemfire.GemfireTemplate; import org.springframework.core.convert.converter.Converter; @@ -116,7 +118,7 @@ public String convert(Foo item) { @Test void testWriteNoTransactionNoItems() throws Exception { - writer.write(null); + writer.write(new Chunk<>()); verifyNoInteractions(template); } diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoItemWriterTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoItemWriterTests.java index ccb54fdc68..162ce4417f 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoItemWriterTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/MongoItemWriterTests.java @@ -38,6 +38,8 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; + +import org.springframework.batch.item.Chunk; import org.springframework.batch.support.transaction.ResourcelessTransactionManager; import org.springframework.data.mapping.context.MappingContext; import org.springframework.data.mongodb.core.BulkOperations; @@ -126,7 +128,7 @@ void testWriteNoTransactionWithCollection() throws Exception { @Test void testWriteNoTransactionNoItems() throws Exception { - writer.write(null); + writer.write(new Chunk<>()); verifyNoInteractions(template); verifyNoInteractions(bulkOperations); diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/Neo4jItemWriterTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/Neo4jItemWriterTests.java index 0a9584a48d..b0a29a7c2c 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/Neo4jItemWriterTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/Neo4jItemWriterTests.java @@ -25,6 +25,8 @@ import org.neo4j.ogm.session.Session; import org.neo4j.ogm.session.SessionFactory; +import org.springframework.batch.item.Chunk; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.verify; @@ -70,7 +72,7 @@ void testWriteNullSession() throws Exception { writer.setSessionFactory(this.sessionFactory); writer.afterPropertiesSet(); - writer.write(null); + writer.write(new Chunk<>()); verifyNoInteractions(this.session); } @@ -83,7 +85,7 @@ void testWriteNullWithSession() throws Exception { writer.afterPropertiesSet(); when(this.sessionFactory.openSession()).thenReturn(this.session); - writer.write(null); + writer.write(new Chunk<>()); verifyNoInteractions(this.session); } diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/RepositoryItemWriterTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/RepositoryItemWriterTests.java index 11cdbceee9..05d44bd2b7 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/RepositoryItemWriterTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/data/RepositoryItemWriterTests.java @@ -31,6 +31,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; + +import org.springframework.batch.item.Chunk; import org.springframework.data.repository.CrudRepository; @ExtendWith(MockitoExtension.class) @@ -64,7 +66,7 @@ void testAfterPropertiesSet() throws Exception { @Test void testWriteNoItems() throws Exception { - writer.write(null); + writer.write(new Chunk<>()); writer.write(new ArrayList<>()); diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/builder/ClassifierCompositeItemWriterBuilderTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/builder/ClassifierCompositeItemWriterBuilderTests.java index d35a8452f4..ace9390698 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/builder/ClassifierCompositeItemWriterBuilderTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/builder/ClassifierCompositeItemWriterBuilderTests.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.ClassifierCompositeItemWriter; import org.springframework.classify.PatternMatchingClassifier; @@ -43,8 +44,18 @@ class ClassifierCompositeItemWriterBuilderTests { @Test void testWrite() throws Exception { Map> map = new HashMap<>(); - ItemWriter fooWriter = items -> foos.addAll(items); - ItemWriter defaultWriter = items -> defaults.addAll(items); + ItemWriter fooWriter = new ItemWriter() { + @Override + public void write(Chunk chunk) throws Exception { + foos.addAll(chunk.getItems()); + } + }; + ItemWriter defaultWriter = new ItemWriter() { + @Override + public void write(Chunk chunk) throws Exception { + defaults.addAll(chunk.getItems()); + } + }; map.put("foo", fooWriter); map.put("*", defaultWriter); ClassifierCompositeItemWriter writer = new ClassifierCompositeItemWriterBuilder() diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/ChunkProcessorChunkHandler.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/ChunkProcessorChunkHandler.java index 7ff9115bd5..95fa3bc49b 100644 --- a/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/ChunkProcessorChunkHandler.java +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/ChunkProcessorChunkHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2013 the original author or authors. + * Copyright 2006-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ import org.apache.commons.logging.LogFactory; import org.springframework.batch.core.JobInterruptedException; import org.springframework.batch.core.StepContribution; -import org.springframework.batch.core.step.item.Chunk; +import org.springframework.batch.item.Chunk; import org.springframework.batch.core.step.item.ChunkProcessor; import org.springframework.batch.core.step.item.FaultTolerantChunkProcessor; import org.springframework.batch.core.step.skip.NonSkippableReadException; @@ -40,6 +40,7 @@ * * @author Dave Syer * @author Michael Minella + * @author Mahmoud Ben Hassine * @param the type of the items in the chunk to be handled */ @MessageEndpoint diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkHandlerFactoryBean.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkHandlerFactoryBean.java index d6ccd43251..1d6428f974 100644 --- a/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkHandlerFactoryBean.java +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkHandlerFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2021 the original author or authors. + * Copyright 2006-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecutionListener; -import org.springframework.batch.core.step.item.Chunk; +import org.springframework.batch.item.Chunk; import org.springframework.batch.core.step.item.ChunkOrientedTasklet; import org.springframework.batch.core.step.item.ChunkProcessor; import org.springframework.batch.core.step.item.FaultTolerantChunkProcessor; diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java index 23a905388c..4b48a6836a 100644 --- a/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java @@ -222,7 +222,8 @@ public void setReplyChannel(PollableChannel replyChannel) { * @see PartitionHandler#handle(StepExecutionSplitter, StepExecution) */ @Override - protected Set doHandle(StepExecution managerStepExecution, Set partitionStepExecutions) throws Exception { + protected Set doHandle(StepExecution managerStepExecution, + Set partitionStepExecutions) throws Exception { if (CollectionUtils.isEmpty(partitionStepExecutions)) { return partitionStepExecutions; @@ -248,8 +249,8 @@ protected Set doHandle(StepExecution managerStepExecution, Set pollReplies(final StepExecution managerStepExecution, - final Set split) throws Exception { + private Set pollReplies(final StepExecution managerStepExecution, final Set split) + throws Exception { final Set result = new HashSet<>(split.size()); Callable> callback = new Callable>() { diff --git a/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/ChunkProcessorChunkHandlerTests.java b/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/ChunkProcessorChunkHandlerTests.java index 9b66e0df7b..7fbfb2bebf 100644 --- a/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/ChunkProcessorChunkHandlerTests.java +++ b/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/ChunkProcessorChunkHandlerTests.java @@ -20,7 +20,7 @@ import org.junit.jupiter.api.Test; import org.springframework.batch.core.StepContribution; -import org.springframework.batch.core.step.item.Chunk; +import org.springframework.batch.item.Chunk; import org.springframework.batch.core.step.item.ChunkProcessor; import org.springframework.batch.test.MetaDataInstanceFactory; import org.springframework.util.StringUtils; diff --git a/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingManagerStepBuilderTests.java b/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingManagerStepBuilderTests.java index 3d06679642..905dab7c21 100644 --- a/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingManagerStepBuilderTests.java +++ b/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingManagerStepBuilderTests.java @@ -37,6 +37,7 @@ import org.springframework.batch.core.step.item.SimpleChunkProcessor; import org.springframework.batch.core.step.item.SimpleChunkProvider; import org.springframework.batch.core.step.tasklet.TaskletStep; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemStreamSupport; @@ -174,7 +175,11 @@ void testUnsupportedOperationExceptionWhenSpecifyingAnItemWriter() { // when final Exception expectedException = assertThrows(UnsupportedOperationException.class, () -> new RemoteChunkingManagerStepBuilder("step").reader(this.itemReader) - .writer(items -> { + .writer(new ItemWriter() { + @Override + public void write(Chunk items) throws Exception { + + } }).repository(this.jobRepository).transactionManager(this.transactionManager) .inputChannel(this.inputChannel).outputChannel(this.outputChannel).build()); diff --git a/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingWorkerBuilderTests.java b/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingWorkerBuilderTests.java index 753c38f368..d658f9a99f 100644 --- a/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingWorkerBuilderTests.java +++ b/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingWorkerBuilderTests.java @@ -17,6 +17,7 @@ import org.junit.jupiter.api.Test; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.PassThroughItemProcessor; @@ -34,7 +35,11 @@ class RemoteChunkingWorkerBuilderTests { private final ItemProcessor itemProcessor = new PassThroughItemProcessor<>(); - private final ItemWriter itemWriter = items -> { + private final ItemWriter itemWriter = new ItemWriter() { + @Override + public void write(Chunk items) throws Exception { + + } }; @Test @@ -93,7 +98,11 @@ void testMandatoryItemWriter() { void testMandatoryInputChannel() { // given RemoteChunkingWorkerBuilder builder = new RemoteChunkingWorkerBuilder() - .itemWriter(items -> { + .itemWriter(new ItemWriter() { + @Override + public void write(Chunk items) throws Exception { + + } }); // when @@ -107,7 +116,11 @@ void testMandatoryInputChannel() { void testMandatoryOutputChannel() { // given RemoteChunkingWorkerBuilder builder = new RemoteChunkingWorkerBuilder() - .itemWriter(items -> { + .itemWriter(new ItemWriter() { + @Override + public void write(Chunk items) throws Exception { + ItemWriter.super.write(items); + } }).inputChannel(new DirectChannel()); // when diff --git a/spring-batch-samples/src/main/java/org/springframework/batch/sample/metrics/Job2Configuration.java b/spring-batch-samples/src/main/java/org/springframework/batch/sample/metrics/Job2Configuration.java index bf6865b827..b791f955e6 100644 --- a/spring-batch-samples/src/main/java/org/springframework/batch/sample/metrics/Job2Configuration.java +++ b/spring-batch-samples/src/main/java/org/springframework/batch/sample/metrics/Job2Configuration.java @@ -9,6 +9,7 @@ import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.support.ListItemReader; import org.springframework.context.annotation.Bean; @@ -52,15 +53,18 @@ public ListItemReader itemReader() { @Bean public ItemWriter itemWriter() { - return items -> { - for (Integer item : items) { - int nextInt = random.nextInt(1000); - Thread.sleep(nextInt); - // simulate write failure - if (nextInt % 57 == 0) { - throw new Exception("Boom!"); + return new ItemWriter() { + @Override + public void write(Chunk chunk) throws Exception { + for (Integer item : chunk.getItems()) { + int nextInt = random.nextInt(1000); + Thread.sleep(nextInt); + // simulate write failure + if (nextInt % 57 == 0) { + throw new Exception("Boom!"); + } + System.out.println("item = " + item); } - System.out.println("item = " + item); } }; } diff --git a/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/WorkerConfiguration.java b/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/WorkerConfiguration.java index 1da466a5d2..2772fa791f 100644 --- a/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/WorkerConfiguration.java +++ b/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/WorkerConfiguration.java @@ -21,6 +21,7 @@ import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder; import org.springframework.batch.integration.config.annotation.EnableBatchIntegration; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; @@ -108,9 +109,13 @@ public ItemProcessor itemProcessor() { @Bean public ItemWriter itemWriter() { - return items -> { - for (Integer item : items) { - System.out.println("writing item " + item); + return new ItemWriter() { + @Override + public void write(Chunk chunk) throws Exception { + for (Integer item : chunk.getItems()) { + System.out.println("writing item " + item); + } + } }; } diff --git a/spring-batch-samples/src/main/java/org/springframework/batch/sample/skip/SkippableExceptionDuringProcessSample.java b/spring-batch-samples/src/main/java/org/springframework/batch/sample/skip/SkippableExceptionDuringProcessSample.java index 2609fdb105..4d7186444a 100644 --- a/spring-batch-samples/src/main/java/org/springframework/batch/sample/skip/SkippableExceptionDuringProcessSample.java +++ b/spring-batch-samples/src/main/java/org/springframework/batch/sample/skip/SkippableExceptionDuringProcessSample.java @@ -23,6 +23,7 @@ import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; @@ -75,10 +76,13 @@ public ItemProcessor itemProcessor() { @Bean public ItemWriter itemWriter() { - return items -> { - System.out.println("About to write chunk: " + items); - for (Integer item : items) { - System.out.println("writing item = " + item); + return new ItemWriter() { + @Override + public void write(Chunk chunk) throws Exception { + System.out.println("About to write chunk: " + chunk.getItems()); + for (Integer item : chunk.getItems()) { + System.out.println("writing item = " + item); + } } }; } diff --git a/spring-batch-samples/src/main/java/org/springframework/batch/sample/skip/SkippableExceptionDuringReadSample.java b/spring-batch-samples/src/main/java/org/springframework/batch/sample/skip/SkippableExceptionDuringReadSample.java index 8f15f901e3..1a67fac4af 100644 --- a/spring-batch-samples/src/main/java/org/springframework/batch/sample/skip/SkippableExceptionDuringReadSample.java +++ b/spring-batch-samples/src/main/java/org/springframework/batch/sample/skip/SkippableExceptionDuringReadSample.java @@ -23,6 +23,7 @@ import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; @@ -75,10 +76,13 @@ public ItemProcessor itemProcessor() { @Bean public ItemWriter itemWriter() { - return items -> { - System.out.println("About to write chunk: " + items); - for (Integer item : items) { - System.out.println("writing item = " + item); + return new ItemWriter() { + @Override + public void write(Chunk chunk) throws Exception { + System.out.println("About to write chunk: " + chunk.getItems()); + for (Integer item : chunk.getItems()) { + System.out.println("writing item = " + item); + } } }; } diff --git a/spring-batch-samples/src/main/java/org/springframework/batch/sample/skip/SkippableExceptionDuringWriteSample.java b/spring-batch-samples/src/main/java/org/springframework/batch/sample/skip/SkippableExceptionDuringWriteSample.java index b4298051c8..14f7b9d8cb 100644 --- a/spring-batch-samples/src/main/java/org/springframework/batch/sample/skip/SkippableExceptionDuringWriteSample.java +++ b/spring-batch-samples/src/main/java/org/springframework/batch/sample/skip/SkippableExceptionDuringWriteSample.java @@ -23,6 +23,7 @@ import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.item.Chunk; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; @@ -71,14 +72,17 @@ public ItemProcessor itemProcessor() { @Bean public ItemWriter itemWriter() { - return items -> { - System.out.println("About to write chunk: " + items); - for (Integer item : items) { - if (item.equals(5)) { - System.out.println("Throwing exception on item " + item); - throw new IllegalArgumentException("Sorry, no 5 here!"); + return new ItemWriter() { + @Override + public void write(Chunk chunk) throws Exception { + System.out.println("About to write chunk: " + chunk.getItems()); + for (Integer item : chunk.getItems()) { + if (item.equals(5)) { + System.out.println("Throwing exception on item " + item); + throw new IllegalArgumentException("Sorry, no 5 here!"); + } + System.out.println("writing item = " + item); } - System.out.println("writing item = " + item); } }; }