MongoItemReader mongoOperation used in doPageRead to utilize stream method instead of find #4502

Open
ZJTAN97 opened this issue Nov 30, 2023 · 0 comments
Labels
status: waiting-for-triage Issues that we did not analyse yet type: bug

ZJTAN97 commented Nov 30, 2023

Bug description

While using MongoItemReader, I configured my Step bean to be fault tolerant (via the faultTolerant method), with a skipLimit of 5
and IllegalArgumentException as the skippable exception.

@Configuration
@RequiredArgsConstructor
public class PetJobConfig {

    private final PetRepo petRepo;

    private final JobRepository jobRepository;

    private final PlatformTransactionManager platformTransactionManager;

    private final MongoTemplate mongoTemplate;

    @Bean
    public Step readPetFromMongo() {
        return new StepBuilder("petReaderMongo", jobRepository)
                .allowStartIfComplete(true)
                .<PetDomain, PetDomain>chunk(1000, platformTransactionManager)
                .reader(petRepo.petReader())
                .writer(new PetWriter(mongoTemplate))
                .faultTolerant()
                .skipLimit(5)
                .skip(IllegalArgumentException.class)
                .build();
    }

    @Bean
    public Job readPetFromMongoJob() {
        return new JobBuilder("petReaderMongoJob", jobRepository)
                .start(readPetFromMongo())
                .build();
    }

}

In my case, the database can contain documents that do not fully conform to the target type provided to the
reader, and I would like to use the faultTolerant configuration to skip this dirty data.

Type provided to MongoItemReader

public record PetDomain (
  String name,
  Animal animal
) {}


public enum Animal {
   CAT,
   DOG;
}

Repo class


@Repository
@RequiredArgsConstructor
public class PetRepo {

    private final MongoTemplate mongoTemplate;

    public MongoItemReader<PetDomain> petReader() {

        Map<String, Sort.Direction> sorts = new HashMap<>();

        Query query = new Query();

        var reader = new MongoItemReaderBuilder<PetDomain>()
                .name("petReader")
                .collection("pet")
                .pageSize(500)
                .template(mongoTemplate)
                .targetType(PetDomain.class)
                .sorts(sorts)
                .query(query)
                .build();

        return reader;
    }

}


Example of dirty data in MongoDB

{
  name: "Bingo",
  animal: "CAT2" // Does not conform to enum provided
}

However, doPageRead uses the MongoOperations find method, which retrieves the page as a list instead of a stream. The
whole page is converted to the target type up front, so a single dirty document makes the read of the entire page fail.
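To illustrate the difference I mean, here is a minimal plain-Java sketch with no Spring or MongoDB involved. All names in it (EagerVsLazyDemo, Pet, convert, eagerPage, lazyPage, sampleDocs) are hypothetical stand-ins, and it only models the general idea of eager versus per-item conversion, not the actual internals of find or stream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class EagerVsLazyDemo {

    enum Animal { CAT, DOG }

    record Pet(String name, Animal animal) {}

    // Stand-in for document-to-object mapping. Enum.valueOf throws
    // IllegalArgumentException for an unknown constant such as "CAT2".
    static Pet convert(Map<String, String> doc) {
        return new Pet(doc.get("name"), Animal.valueOf(doc.get("animal")));
    }

    // Eager: the whole page is converted before anything is returned,
    // so one bad document fails the entire call.
    static List<Pet> eagerPage(List<Map<String, String>> docs) {
        List<Pet> page = new ArrayList<>();
        for (Map<String, String> doc : docs) {
            page.add(convert(doc));
        }
        return page;
    }

    // Lazy: each document is converted only when next() is called,
    // so a failure affects that single item.
    static Iterator<Pet> lazyPage(List<Map<String, String>> docs) {
        Iterator<Map<String, String>> raw = docs.iterator();
        return new Iterator<>() {
            @Override
            public boolean hasNext() {
                return raw.hasNext();
            }

            @Override
            public Pet next() {
                // raw.next() advances before convert can throw
                return convert(raw.next());
            }
        };
    }

    // Hypothetical sample page: the second document is dirty.
    static List<Map<String, String>> sampleDocs() {
        return List.of(
                Map.of("name", "Rex", "animal", "DOG"),
                Map.of("name", "Bingo", "animal", "CAT2"),
                Map.of("name", "Tom", "animal", "CAT"));
    }

    public static void main(String[] args) {
        try {
            eagerPage(sampleDocs());
        } catch (IllegalArgumentException e) {
            System.out.println("Eager read failed for the whole page: " + e.getMessage());
        }

        Iterator<Pet> it = lazyPage(sampleDocs());
        while (it.hasNext()) {
            try {
                System.out.println("Read " + it.next());
            } catch (IllegalArgumentException e) {
                System.out.println("Skipped one dirty item: " + e.getMessage());
            }
        }
    }
}
```

In the lazy variant the caller sees the exception once per dirty item and can keep iterating, which is the property a skip policy needs.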

To be able to iterate past such documents, I had to override the entire doPageRead method just to change
the MongoOperations call from find to stream.

@AllArgsConstructor
@NoArgsConstructor
@Setter
public class CustomMongoItemReader<T> extends MongoItemReader<T> {

    private MongoOperations template;
    private Query query;
    private String queryString;
    private Class<? extends T> type;
    private Sort sort;
    private String hint;
    private String fields;
    private String collection;
    private List<Object> parameterValues = new ArrayList<>();

    @Override
    protected Iterator<T> doPageRead() {
        PageRequest pageRequest;
        if (this.queryString != null) {
            pageRequest = PageRequest.of(this.page, this.pageSize, this.sort);
            String populatedQuery = this.replacePlaceholders(this.queryString, this.parameterValues);
            BasicQuery mongoQuery;
            if (StringUtils.hasText(this.fields)) {
                mongoQuery = new BasicQuery(populatedQuery, this.fields);
            } else {
                mongoQuery = new BasicQuery(populatedQuery);
            }

            mongoQuery.with(pageRequest);
            if (StringUtils.hasText(this.hint)) {
                mongoQuery.withHint(this.hint);
            }

            return StringUtils.hasText(this.collection) ?
                    // Changing from `find` to `stream`
                    (Iterator<T>) this.template.stream(mongoQuery, this.type, this.collection).iterator() :
                    (Iterator<T>) this.template.stream(mongoQuery, this.type).iterator();
        } else {
            pageRequest = PageRequest.of(this.page, this.pageSize);
            this.query.with(pageRequest);
            return StringUtils.hasText(this.collection) ?
                    // Changing from `find` to `stream`
                    (Iterator<T>) this.template.stream(this.query, this.type, this.collection).iterator() :
                    (Iterator<T>) this.template.stream(this.query, this.type).iterator();
        }
    }

    private String replacePlaceholders(String input, List<Object> values) {
        ParameterBindingJsonReader reader = new ParameterBindingJsonReader(input, values.toArray());
        DecoderContext decoderContext = DecoderContext.builder().build();
        Document document = (new ParameterBindingDocumentCodec()).decode(reader, decoderContext);
        return document.toJson();
    }

}

I was wondering whether it would be better for MongoItemReader to use stream instead, since find prevents the iterator
from advancing at all when a document from the database does not conform to the class type provided.

Environment

  • JDK 17
  • Spring Batch 5.0.3

Expected behavior

If a document from the database does not conform to the type specified in MongoItemReader, the reader should be able
to skip it and move on to the next item in the iterator.
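As a rough sketch of the behavior I would expect, the plain-Java loop below skips items that fail conversion until a skip limit is exceeded. It is only an illustration under my own assumptions: SkipLimitDemo and readWithSkips are hypothetical names, and this is not how Spring Batch implements fault tolerance internally.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SkipLimitDemo {

    enum Animal { CAT, DOG }

    // Reads every item, skipping values that fail conversion.
    // Fails once more than skipLimit items have been skipped.
    static List<Animal> readWithSkips(Iterator<String> raw, int skipLimit) {
        List<Animal> items = new ArrayList<>();
        int skipped = 0;
        while (raw.hasNext()) {
            String value = raw.next();
            try {
                items.add(Animal.valueOf(value));
            } catch (IllegalArgumentException e) {
                skipped++;
                if (skipped > skipLimit) {
                    throw new IllegalStateException(
                            "Skip limit of " + skipLimit + " exceeded", e);
                }
            }
        }
        return items;
    }

    public static void main(String[] args) {
        List<String> raw = List.of("CAT", "CAT2", "DOG");
        // With a limit of 5, the single dirty value is skipped.
        System.out.println(readWithSkips(raw.iterator(), 5));
    }
}
```

This only works if each dirty item surfaces as its own exception during iteration, rather than as one failure for the whole page.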

Example Repository

https://github.com/ZJTAN97/spring-batch-mongo-item-reader-issue/tree/main

Thank you for reading the issue!

@ZJTAN97 ZJTAN97 added status: waiting-for-triage Issues that we did not analyse yet type: bug labels Nov 30, 2023
@ZJTAN97 ZJTAN97 changed the title from "MongoItemReader mongoTemplate used in doPageRead to utilize stream method instead of find" to "MongoItemReader mongoOperation used in doPageRead to utilize stream method instead of find" Dec 1, 2023