
[Bug]: apache_beam.transforms.util.BatchElements allows creating batches that exceed max_batch_size #29350

Closed
1 of 16 tasks
jmdobry opened this issue Nov 8, 2023 · 3 comments · Fixed by #31228

Comments

@jmdobry
Contributor

jmdobry commented Nov 8, 2023

What happened?

When using BatchElements, you can control the size of each batch almost precisely by setting min_batch_size and max_batch_size to the same value and also providing an element_size_fn. This almost works, but batches can still end up larger than max_batch_size.
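For context, here is a minimal sketch of that configuration, assuming a hypothetical downstream API that rejects request bodies over 1,000 bytes and assuming each element is sent as JSON. `MAX_PAYLOAD_BYTES` and `payload_size` are illustrative names, not part of Beam. Note that summing per-element sizes only approximates the final request size, because separators and brackets added when the batch is serialized are not counted.

```python
import json

# Hypothetical limit: assume the downstream API rejects bodies over 1,000 bytes.
MAX_PAYLOAD_BYTES = 1_000


def payload_size(element: dict) -> int:
    """Size of the element as UTF-8 encoded JSON (assumed wire format)."""
    return len(json.dumps(element).encode("utf-8"))


# Pinning min and max to the same value asks BatchElements for fixed-size
# batches, measured in whatever units element_size_fn returns (bytes here).
batch_kwargs = {
    "min_batch_size": MAX_PAYLOAD_BYTES,
    "max_batch_size": MAX_PAYLOAD_BYTES,
    "element_size_fn": payload_size,
}

# With Apache Beam installed, the transform would be applied roughly like:
#   import apache_beam as beam
#   batches = records | beam.BatchElements(**batch_kwargs)
```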

The problem is that the process method of _GlobalWindowsBatchingDoFn appends each element to the current batch before checking whether the batch has reached the target size, so the element that pushes a batch past max_batch_size is always included in it. This can be fixed by changing process to add an element to the current batch only if A) the batch is empty or B) adding the element will not exceed max_batch_size; otherwise, the current batch is emitted first and the element starts a new one. There are cases where exceeding max_batch_size by any amount is unacceptable (for example, APIs that enforce a maximum request payload size).
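To make the ordering problem concrete, here is a small pure-Python model of append-then-check batching (no Beam dependency). It is a simplification: the batch-size estimator is replaced by a fixed target, as when min_batch_size equals max_batch_size, and `batch_append_then_check` is a hypothetical helper, not part of Beam.

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")


def batch_append_then_check(
    elements: Sequence[T], target: int, size_fn: Callable[[T], int]
) -> List[List[T]]:
    """Model of the reported ordering: append first, compare with target after."""
    batches: List[List[T]] = []
    batch: List[T] = []
    running = 0
    for element in elements:
        # The element is always added to the current batch...
        batch.append(element)
        running += size_fn(element)
        # ...and only afterwards is the running size compared with the target.
        if running >= target:
            batches.append(batch)
            batch, running = [], 0
    if batch:  # leftover elements, emitted at the end (like finish_bundle)
        batches.append(batch)
    return batches


def own_size(s: int) -> int:
    """Each element's size is its own value in this model."""
    return s


print(batch_append_then_check([4, 4, 4, 4], target=10, size_fn=own_size))
# [[4, 4, 4], [4]]  -> the first batch has total size 12, above the target of 10
```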

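Example fixed implementation (body of _GlobalWindowsBatchingDoFn.process):

# `window` refers to apache_beam.transforms.window.
element_size = self._element_size_fn(element)
# Emit the current batch first if adding this element would push it past the
# target size. `>` (not `>=`) lets a batch fill up to exactly the target, and
# an element larger than the target on its own still gets a batch of its own.
if self._batch and (
    self._running_batch_size + element_size > self._target_batch_size
):
  with self._batch_size_estimator.record_time(self._running_batch_size):
    yield window.GlobalWindows.windowed_value_at_end_of_window(self._batch)
  self._batch = []
  self._running_batch_size = 0
  self._target_batch_size = self._batch_size_estimator.next_batch_size()

self._batch.append(element)
self._running_batch_size += element_size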
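The same pure-Python model, rewritten to check before appending, illustrates the intended behavior. It is a sketch, not Beam code: the target is fixed (min_batch_size equal to max_batch_size), `batch_check_then_append` is a hypothetical helper, and it uses `>` so that a batch may reach exactly the target size.

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")


def batch_check_then_append(
    elements: Sequence[T], target: int, size_fn: Callable[[T], int]
) -> List[List[T]]:
    """Model of the proposed ordering: flush first if the element would overflow."""
    batches: List[List[T]] = []
    batch: List[T] = []
    running = 0
    for element in elements:
        size = size_fn(element)
        # Emit the current batch before adding an element that would push it
        # past the target; an empty batch always accepts the element.
        if batch and running + size > target:
            batches.append(batch)
            batch, running = [], 0
        batch.append(element)
        running += size
    if batch:  # leftover elements, emitted at the end (like finish_bundle)
        batches.append(batch)
    return batches


def own_size(s: int) -> int:
    """Each element's size is its own value in this model."""
    return s


print(batch_check_then_append([4, 4, 4, 4], 10, own_size))  # [[4, 4], [4, 4]]
print(batch_check_then_append([5, 5, 5], 10, own_size))     # [[5, 5], [5]]
print(batch_check_then_append([3, 15, 2], 10, own_size))    # [[3], [15], [2]]
```

The last call shows the one case no ordering can avoid: an element that is larger than the target on its own still forms a single-element batch that exceeds it.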

Issue Priority

Priority: 3 (minor)

Issue Components

  • [x] Component: Python SDK
  • [ ] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner
@AnandInguva
Contributor

cc: @jrmccluskey, who has some context on BatchElements

@jrmccluskey
Contributor

jrmccluskey commented Nov 9, 2023

The proposed change LGTM; feel free to open a PR. The same pattern is also used in the windowed equivalent and can be updated there too.
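The windowed variant is not shown in this thread. The following rough pure-Python sketch applies the same check-then-append rule per window, assuming the windowed DoFn keeps a separate buffer and running size for each window. Windows are modeled as plain hashable keys, `batch_per_window` is a hypothetical helper, and timestamps, window expiry, and any limit on buffered windows are left out.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def batch_per_window(
    windowed_elements: Sequence[Tuple[Hashable, T]],
    target: int,
    size_fn: Callable[[T], int],
) -> List[Tuple[Hashable, List[T]]]:
    """Check-then-append batching with one independent buffer per window."""
    batches: List[Tuple[Hashable, List[T]]] = []
    buffers: Dict[Hashable, List[T]] = defaultdict(list)
    running: Dict[Hashable, int] = defaultdict(int)
    for win, value in windowed_elements:
        size = size_fn(value)
        # Flush only this window's buffer if the value would overflow it.
        if buffers[win] and running[win] + size > target:
            batches.append((win, buffers[win]))
            buffers[win] = []
            running[win] = 0
        buffers[win].append(value)
        running[win] += size
    # Emit whatever is left in each window (like finish_bundle).
    for win, buf in buffers.items():
        if buf:
            batches.append((win, buf))
    return batches


def own_size(s: int) -> int:
    """Each element's size is its own value in this model."""
    return s


elements = [("w1", 4), ("w2", 4), ("w1", 4), ("w1", 4), ("w2", 7)]
print(batch_per_window(elements, 10, own_size))
# [('w1', [4, 4]), ('w2', [4]), ('w1', [4]), ('w2', [7])]
```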

@tvalentyn
Contributor

Hi @jmdobry! Thanks for reporting. Would you be interested in contributing a fix (and ideally a test) to address this? Let me know if you have any questions. FYI, we also have https://s.apache.org/beam-python-dev-wiki for development tips. Thanks!

5 participants