Support Considering Individual Element Weight in Determining Buffer Boundary instead of Element Counts #3786

Open
moksie opened this issue Apr 17, 2024 · 0 comments
Labels
type/enhancement A general enhancement


Motivation

I have a use case where I need to group incoming objects by a key derived from each object and accumulate them until the size of the accumulated batch, in bytes, reaches a predefined threshold. If adding an object would exceed the threshold, the current batch should be cut off and emitted, and the new object should start a new batch. Along with this size-based boundary condition, a timeout should also apply: when the timeout elapses, the current batch should be emitted and accumulation should continue in a new one.
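The boundary rule described above can be sketched without Reactor as a small, synchronous per-key accumulator. This is only an illustration: `KeyedSizeBatcher`, `offer` and `flushExpired` are hypothetical names, the size of a value is assumed to be its UTF-8 byte length, the timeout is assumed to be measured from the first element of each batch, and time is passed in explicitly instead of being driven by a scheduler.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: per-key batching by UTF-8 byte size plus a timeout.
// Time is passed in explicitly (nowMillis) to keep the logic deterministic;
// a real operator would rely on a scheduler instead.
class KeyedSizeBatcher {
    private final int maxBytes;
    private final long timeoutMillis;
    private final Map<String, Batch> open = new HashMap<>();

    private static final class Batch {
        final List<String> values = new ArrayList<>();
        final long openedAt; // assumption: timeout counts from the first element
        int bytes;

        Batch(long openedAt) { this.openedAt = openedAt; }
    }

    KeyedSizeBatcher(int maxBytes, long timeoutMillis) {
        this.maxBytes = maxBytes;
        this.timeoutMillis = timeoutMillis;
    }

    // Adds a value to its key's batch. Returns the batch that was cut off because
    // the value would have pushed it over maxBytes, or null if nothing was emitted.
    List<String> offer(String key, String value, long nowMillis) {
        int size = value.getBytes(StandardCharsets.UTF_8).length;
        Batch current = open.get(key);
        List<String> emitted = null;
        if (current != null && current.bytes + size > maxBytes) {
            emitted = current.values; // cut off the old batch
            current = null;
        }
        if (current == null) {
            current = new Batch(nowMillis); // the new value starts a fresh batch
            open.put(key, current);
        }
        current.values.add(value);
        current.bytes += size;
        return emitted;
    }

    // Closes and returns every batch that has been open for at least timeoutMillis.
    List<List<String>> flushExpired(long nowMillis) {
        List<List<String>> emitted = new ArrayList<>();
        open.values().removeIf(batch -> {
            boolean expired = nowMillis - batch.openedAt >= timeoutMillis;
            if (expired) {
                emitted.add(batch.values);
            }
            return expired;
        });
        return emitted;
    }
}
```

For example, with a 10-byte limit, offering "hello" and then "world" for the same key fills that key's batch exactly; offering "!" next returns ["hello", "world"], and "!" starts a new batch.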

Here is a simplified workflow that I would like to implement:

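    import java.time.Duration;
    import java.util.Collection;
    import java.util.function.Consumer;
    import java.util.function.Function;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxSink;

    class BatchProvider {
        private Bridge<Pair> bridge;
        private final Consumer<Collection<String>> consumer;

        public BatchProvider(Consumer<Collection<String>> consumer) {
            this.consumer = consumer;
        }

        public void start() {
            initializePipeline();
        }

        // Pushes one (key, value) element into the pipeline.
        public void batch(String key, String value) {
            bridge.emit(new Pair(key, value));
        }

        // Completes the source, which should flush any partially filled batches.
        public void end() {
            bridge.complete();
        }

        private void initializePipeline() {
            bridge = new Bridge<>();
            // Weight of a single element; String::length is a simplified stand-in for its size in bytes.
            Function<String, Integer> weightProvider = String::length;
            Flux.<Pair>create(sink -> bridge.sink = sink)
                    // One sub-stream per key, carrying only the values.
                    .groupBy(Pair::key, Pair::value)
                    // weightedBufferTimeout is the PROPOSED operator; it does not exist in Reactor today.
                    // Emit a batch when the summed weight would exceed 1 << 10 (1024) or after 1 second.
                    .flatMap(group -> group.weightedBufferTimeout(1 << 10, Duration.ofSeconds(1L), weightProvider)
                            .doOnNext(c -> consumer.accept(c)))
                    .subscribe();
        }

        private record Pair(String key, String value) {}

        // Exposes the FluxSink created by Flux.create so elements can be pushed from outside.
        private static class Bridge<T> {
            private FluxSink<T> sink;

            void emit(T value) { sink.next(value); }
            void error(Throwable error) { sink.error(error); }
            void complete() { sink.complete(); }
        }
    }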

Desired solution

This could be achieved by generalizing the buffer size so that it takes the weight of each element into account. By default, every element would have a weight of 1, which gives the same behavior as counting elements toward the buffer size. Either bufferTimeout could be modified to generalize the boundary computation, or a new operator could be introduced, such as the weightedBufferTimeout used in the example above.
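As a minimal sketch of that generalization (plain Java, not Reactor code, and without the timeout), the boundary below is a maximum total weight computed by a weigher function. `WeightedBuffer`, `counting` and `finish` are hypothetical names. With the default weight of 1, the batches come out the same as those of a count-based buffer of the same size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical sketch: batches are bounded by total weight instead of element count.
class WeightedBuffer<T> {
    private final int maxWeight;
    private final ToIntFunction<? super T> weigher;
    private final List<List<T>> emitted = new ArrayList<>();
    private List<T> current = new ArrayList<>();
    private int currentWeight;

    WeightedBuffer(int maxWeight, ToIntFunction<? super T> weigher) {
        this.maxWeight = maxWeight;
        this.weigher = weigher;
    }

    // Default weight of 1 per element, i.e. an ordinary count-based buffer of maxSize.
    static <T> WeightedBuffer<T> counting(int maxSize) {
        return new WeightedBuffer<>(maxSize, element -> 1);
    }

    void add(T element) {
        int weight = weigher.applyAsInt(element);
        // The element does not fit: close the current batch before adding it.
        if (!current.isEmpty() && currentWeight + weight > maxWeight) {
            flush();
        }
        current.add(element);
        currentWeight += weight;
        // The batch is full (or a single element alone exceeds the limit): close it now.
        if (currentWeight >= maxWeight) {
            flush();
        }
    }

    // Closes the last partial batch, as completion (or a timeout) would, and returns all batches.
    List<List<T>> finish() {
        flush();
        return emitted;
    }

    private void flush() {
        if (!current.isEmpty()) {
            emitted.add(current);
            current = new ArrayList<>();
            currentWeight = 0;
        }
    }
}
```

A batch is closed either when the next element would not fit or as soon as its weight reaches the maximum, so an element that is heavier than the limit on its own still ends up in a batch by itself, and every other batch stays at or below the limit.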

Alternatively, bufferUntil could be modified to support timeouts.

Considered alternatives

  • bufferTimeout does not work because its buffer size is a fixed element count and cannot be determined dynamically from the elements.
  • bufferTimeout with the workaround mentioned here did not work either: I could not enforce the strict size limit (each batch should stay at or below the limit whenever possible).
  • bufferUntil does not work because it does not support timeouts. A timeout can be added on top of it, but the workaround is too ugly.

Rather than implementing these ugly workarounds, adding such an operator to Flux would be much more elegant.
