Skip to content

Commit

Permalink
Optimise View.asList() side inputs for iterating rather than for inde…
Browse files Browse the repository at this point in the history
…xing. (#31087)

The current implementation is, essentially, a distributed hashmap from
integer keys to the list contents, mediated by each upstream worker starting
at a random value to minimize overlaps and emitting sufficient metadata to map
this onto the contiguous range [0, N). This provides optimal *random-access*
performance, but very poor *iteration* performance (essentially having to do
a key lookup for every advance, and as the keys are hashed and distributed
rather than clustered numerically, there is little to no amortization in these
lookups for adjacent items.

Given that most uses for List side inputs are merely to gather a collection
of values (the user has no control over the ordering when materialized) and
the high costs of providing random access, this is probably the wrong tradeoff
for most pipelines.

This is an update-incompatible change and so has been guarded by the
update compatibility version flag. The old behavior can be explicitly
asked for via a new AsList#withRandomAccess() method.
  • Loading branch information
robertwb committed Apr 30, 2024
1 parent 4a134b5 commit 7f7bc3e
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 6 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@
## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* Java's View.asList() side inputs are now optimized for iterating rather than
indexing when in the global window.
This new implementation still supports all (immutable) List methods as before,
but some of the random access methods like get() and size() will be slower.
To use the old implementation one can use View.asList().withRandomAccess().

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
package org.apache.beam.sdk.options;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Comparators;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Options used to configure streaming. */
Expand All @@ -41,4 +45,20 @@ public interface StreamingOptions extends ApplicationNameOptions, PipelineOption
String getUpdateCompatibilityVersion();

void setUpdateCompatibilityVersion(@Nullable String updateCompatibilityVersion);

static boolean updateCompatibilityVersionLessThan(PipelineOptions options, String version) {
if (options == null) {
return false;
}
String updateCompatibilityVersion =
options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
if (updateCompatibilityVersion == null) {
return false;
}
List<String> requestedVersion = Arrays.asList(updateCompatibilityVersion.split("\\."));
List<String> targetVersion = Arrays.asList(version.split("\\."));
return Comparators.lexicographical(Comparator.<String>naturalOrder())
.compare(requestedVersion, targetVersion)
< 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -173,7 +175,7 @@ public static <T> AsSingleton<T> asSingleton() {
* <p>Some runners may require that the view fits in memory.
*/
public static <T> AsList<T> asList() {
return new AsList<>();
return new AsList<>(null);
}

/**
Expand Down Expand Up @@ -235,7 +237,33 @@ public static <K, V> AsMultimap<K, V> asMultimap() {
*/
@Internal
public static class AsList<T> extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
private AsList() {}
private final @Nullable Boolean withRandomAccess;

private AsList(@Nullable Boolean withRandomAccess) {
this.withRandomAccess = withRandomAccess;
}

/**
* Returns a PCollection view like this one, but whose resulting list will have RandomAccess
* (aka fast indexing).
*
* <p>A veiw with random access will be much more expensive to compute and iterate over, but
* will have a faster get() method.
*/
public AsList<T> withRandomAccess() {
return withRandomAccess(true);
}

/**
* Returns a PCollection view like this one, but whose resulting list will have RandomAccess
* (aka fast indexing) according to the input parameter.
*
* <p>A veiw with random access will be much more expensive to compute and iterate over, but
* will have a faster get() method.
*/
public AsList<T> withRandomAccess(boolean withRandomAccess) {
return new AsList<>(withRandomAccess);
}

@Override
public PCollectionView<List<T>> expand(PCollection<T> input) {
Expand All @@ -244,7 +272,30 @@ public PCollectionView<List<T>> expand(PCollection<T> input) {
} catch (IllegalStateException e) {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
boolean explicitWithRandomAccess =
withRandomAccess != null
? withRandomAccess
: StreamingOptions.updateCompatibilityVersionLessThan(
input.getPipeline().getOptions(), "2.57.0");
if (explicitWithRandomAccess
|| !(input.getWindowingStrategy().getWindowFn() instanceof GlobalWindows)) {
return expandWithRandomAccess(input);
} else {
return expandWithoutRandomAccess(input);
}
}

private PCollectionView<List<T>> expandWithoutRandomAccess(PCollection<T> input) {
PCollectionView<List<T>> view =
PCollectionViews.listView(
input,
(TypeDescriptorSupplier<T>) input.getCoder()::getEncodedTypeDescriptor,
input.getWindowingStrategy());
input.apply(CreatePCollectionView.of(view));
return view;
}

private PCollectionView<List<T>> expandWithRandomAccess(PCollection<T> input) {
/**
* The materialized format uses {@link Materializations#MULTIMAP_MATERIALIZATION_URN multimap}
* access pattern where the key is a position and the index of the value in the iterable is a
Expand All @@ -266,7 +317,7 @@ public PCollectionView<List<T>> expand(PCollection<T> input) {
BigEndianLongCoder.of(),
ValueOrMetadataCoder.create(inputCoder, OffsetRange.Coder.of())));
PCollectionView<List<T>> view =
PCollectionViews.listView(
PCollectionViews.listViewWithRandomAccess(
materializationInput,
(TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -65,9 +66,11 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSortedMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Ints;
Expand Down Expand Up @@ -175,6 +178,38 @@ public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterable
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
PCollection<T> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new IterableBackedListViewFn<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}

/**
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
PCollection<T> pCollection,
TupleTag<Materializations.IterableView<T>> tag,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
tag,
new IterableBackedListViewFn<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}

/**
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewWithRandomAccess(
PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
Expand All @@ -189,7 +224,7 @@ public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewWithRandomAccess(
PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> pCollection,
TupleTag<Materializations.MultimapView<Long, ValueOrMetadata<T, OffsetRange>>> tag,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
Expand Down Expand Up @@ -960,6 +995,179 @@ public void verifyDeterministic() throws NonDeterministicException {
}
}

/**
* Implementation which is able to adapt an iterable materialization to a {@code List<T>}.
*
* <p>Unlike ListViewFn2, this implementation is optimized for iteration rather than indexing.
*
* <p>For internal use only.
*/
public static class IterableBackedListViewFn<T> extends ViewFn<IterableView<T>, List<T>> {
private TypeDescriptorSupplier<T> typeDescriptorSupplier;

public IterableBackedListViewFn(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
this.typeDescriptorSupplier = typeDescriptorSupplier;
}

@Override
public Materialization<IterableView<T>> getMaterialization() {
return Materializations.iterable();
}

@Override
public List<T> apply(IterableView<T> primitiveViewT) {
Supplier<Integer> size = Suppliers.memoize(() -> Iterables.size(primitiveViewT.get()));

return new List<T>() {
@Override
public int size() {
return size.get();
}

@Override
public boolean isEmpty() {
return Iterables.isEmpty(primitiveViewT.get());
}

@Override
public boolean contains(Object o) {
return Iterables.contains(primitiveViewT.get(), o);
}

@Override
public Iterator<T> iterator() {
return primitiveViewT.get().iterator();
}

@Override
public T get(int index) {
return Iterables.get(primitiveViewT.get(), index);
}

@Override
public Object[] toArray() {
return Iterables.toArray(primitiveViewT.get(), Object.class);
}

@Override
public <T1> T1[] toArray(T1[] a) {
return Iterables.toArray(
(Iterable<T1>) primitiveViewT.get(), (Class<T1>) a.getClass().getComponentType());
}

@Override
public boolean containsAll(Collection<?> c) {
for (Object o : c) {
if (!contains(o)) {
return false;
}
}
return true;
}

@Override
public int indexOf(Object o) {
return Iterables.indexOf(primitiveViewT.get(), v -> Objects.equals(v, o));
}

@Override
public int lastIndexOf(Object o) {
return ImmutableList.copyOf(primitiveViewT.get()).lastIndexOf(o);
}

@Override
public List<T> subList(int fromIndex, int toIndex) {
Iterator<T> iterator = primitiveViewT.get().iterator();
if (Iterators.advance(iterator, fromIndex) != fromIndex) {
throw new IndexOutOfBoundsException();
}
List<T> subList = ImmutableList.copyOf(Iterators.limit(iterator, toIndex - fromIndex));
if (subList.size() != toIndex - fromIndex) {
throw new IndexOutOfBoundsException();
}
return subList;
}

@Override
public ListIterator<T> listIterator() {
return ImmutableList.copyOf(primitiveViewT.get()).listIterator();
}

@Override
public ListIterator<T> listIterator(int index) {
return ImmutableList.copyOf(primitiveViewT.get()).listIterator(index);
}

// Unimplemented mutable list methods.

@Override
public boolean add(T t) {
throw new UnsupportedOperationException();
}

@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}

@Override
public boolean addAll(Collection<? extends T> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean addAll(int index, Collection<? extends T> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public void clear() {
throw new UnsupportedOperationException();
}

@Override
public T set(int index, T element) {
throw new UnsupportedOperationException();
}

@Override
public void add(int index, T element) {
throw new UnsupportedOperationException();
}

@Override
public T remove(int index) {
throw new UnsupportedOperationException();
}
};
}

@Override
public TypeDescriptor<List<T>> getTypeDescriptor() {
return TypeDescriptors.lists(typeDescriptorSupplier.get());
}

@Override
public boolean equals(@Nullable Object other) {
return other instanceof IterableBackedListViewFn;
}

@Override
public int hashCode() {
return ListViewFn.class.hashCode();
}
}

/**
* Implementation which is able to adapt a multimap materialization to a {@code List<T>}.
*
Expand Down

0 comments on commit 7f7bc3e

Please sign in to comment.