Skip to content

Commit

Permalink
Allow lambdas to be passed as item processors
Browse files Browse the repository at this point in the history
Resolves #4061
  • Loading branch information
hpoettker authored and fmbenhassine committed May 13, 2022
1 parent 33338a4 commit 234e28a
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 29 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2021 the original author or authors.
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,6 @@
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Function;

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.ItemProcessListener;
Expand All @@ -47,7 +46,6 @@
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.batch.repeat.CompletionPolicy;
import org.springframework.batch.repeat.RepeatOperations;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
Expand Down Expand Up @@ -78,8 +76,6 @@ public class SimpleStepBuilder<I, O> extends AbstractTaskletStepBuilder<SimpleSt

private ItemProcessor<? super I, ? extends O> processor;

private Function<? super I, ? extends O> itemProcessorFunction;

private int chunkSize = 0;

private RepeatOperations chunkOperations;
Expand Down Expand Up @@ -112,7 +108,6 @@ protected SimpleStepBuilder(SimpleStepBuilder<I, O> parent) {
this.reader = parent.reader;
this.writer = parent.writer;
this.processor = parent.processor;
this.itemProcessorFunction = parent.itemProcessorFunction;
this.itemListeners = parent.itemListeners;
this.readerTransactionalQueue = parent.readerTransactionalQueue;
}
Expand Down Expand Up @@ -236,19 +231,6 @@ public SimpleStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> p
return this;
}

/**
* A {@link Function} to be delegated to as an {@link ItemProcessor}. If this is set,
* it will take precedence over any {@code ItemProcessor} configured via
* {@link #processor(ItemProcessor)}.
*
* @param function the function to delegate item processing to
* @return this for fluent chaining
*/
public SimpleStepBuilder<I, O> processor(Function<? super I, ? extends O> function) {
this.itemProcessorFunction = function;
return this;
}

/**
* Sets a flag to say that the reader is transactional (usually a queue), which is to say that failed items might be
* rolled back and re-presented in a subsequent transaction. Default is false, meaning that the items are read
Expand Down Expand Up @@ -359,10 +341,6 @@ protected ItemWriter<? super O> getWriter() {
}

protected ItemProcessor<? super I, ? extends O> getProcessor() {
if(this.itemProcessorFunction != null) {
this.processor = new FunctionItemProcessor<>(this.itemProcessorFunction);
}

return processor;
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2021 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -245,7 +244,7 @@ private void assertStepFunctions(boolean faultTolerantStep) throws Exception {
.transactionManager(transactionManager)
.<Object, String>chunk(3)
.reader(reader)
.processor((Function<Object, String>) s -> s.toString())
.processor(Object::toString)
.writer(itemWriter)
.listener(new AnnotationBasedStepExecutionListener());

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2021 the original author or authors.
* Copyright 2010-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,7 +70,6 @@ public class FaultTolerantStepIntegrationTests {
@Before
public void setUp() {
ItemReader<Integer> itemReader = new ListItemReader<>(createItems());
ItemProcessor<Integer, Integer> itemProcessor = item -> item > 20 ? null : item;
ItemWriter<Integer> itemWriter = chunk -> {
if (chunk.contains(1)) {
throw new IllegalArgumentException();
Expand All @@ -80,7 +79,7 @@ public void setUp() {
stepBuilder = new StepBuilderFactory(jobRepository, transactionManager).get("step")
.<Integer, Integer>chunk(CHUNK_SIZE)
.reader(itemReader)
.processor(itemProcessor)
.processor(item -> item > 20 ? null : item)
.writer(itemWriter)
.faultTolerant();
}
Expand Down

0 comments on commit 234e28a

Please sign in to comment.