Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise View.asList() side inputs for iterating rather than for indexing. #31087

Merged
merged 9 commits into from
Apr 30, 2024
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@
## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* Java's View.asList() side inputs are now optimized for iterating rather than indexing.
This new implementation still supports all (immutable) List methods as before,
but some of the random access methods like get() and size() will be slower.
To use the old implementation one can use View.asList().withRandomAccess().

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
package org.apache.beam.sdk.options;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Comparators;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Options used to configure streaming. */
Expand All @@ -41,4 +45,20 @@ public interface StreamingOptions extends ApplicationNameOptions, PipelineOption
String getUpdateCompatibilityVersion();

void setUpdateCompatibilityVersion(@Nullable String updateCompatibilityVersion);

static boolean updateCompatibilityVersionLessThan(PipelineOptions options, String version) {
if (options == null) {
return false;
}
String updateCompatibilityVersion =
options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
if (updateCompatibilityVersion == null) {
return false;
}
List<String> requestedVersion = Arrays.asList(updateCompatibilityVersion.split("\\."));
List<String> targetVersion = Arrays.asList(version.split("\\."));
return Comparators.lexicographical(Comparator.<String>naturalOrder())
.compare(requestedVersion, targetVersion)
< 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
Expand Down Expand Up @@ -173,7 +174,7 @@ public static <T> AsSingleton<T> asSingleton() {
* <p>Some runners may require that the view fits in memory.
*/
public static <T> AsList<T> asList() {
return new AsList<>();
return new AsList<>(null);
}

/**
Expand Down Expand Up @@ -235,7 +236,33 @@ public static <K, V> AsMultimap<K, V> asMultimap() {
*/
@Internal
public static class AsList<T> extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
private AsList() {}
private final @Nullable Boolean withRandomAccess;

private AsList(@Nullable Boolean withRandomAccess) {
this.withRandomAccess = withRandomAccess;
}

/**
* Returns a PCollection view like this one, but whose resulting list will have RandomAccess
* (aka fast indexing).
*
* <p>A veiw with random access will be much more expensive to compute and iterate over, but
* will have a faster get() method.
*/
public AsList<T> withRandomAccess() {
return withRandomAccess(true);
}

/**
* Returns a PCollection view like this one, but whose resulting list will have RandomAccess
* (aka fast indexing) according to the input parameter.
*
* <p>A veiw with random access will be much more expensive to compute and iterate over, but
* will have a faster get() method.
*/
public AsList<T> withRandomAccess(boolean withRandomAccess) {
return new AsList<>(withRandomAccess);
}

@Override
public PCollectionView<List<T>> expand(PCollection<T> input) {
Expand All @@ -244,7 +271,29 @@ public PCollectionView<List<T>> expand(PCollection<T> input) {
} catch (IllegalStateException e) {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
boolean explicitWithRandomAccess =
withRandomAccess != null
? withRandomAccess
: StreamingOptions.updateCompatibilityVersionLessThan(
input.getPipeline().getOptions(), "2.57.0");
if (explicitWithRandomAccess) {
return expandWithRandomAccess(input);
} else {
return expandWithoutRandomAccess(input);
}
}

private PCollectionView<List<T>> expandWithoutRandomAccess(PCollection<T> input) {
PCollectionView<List<T>> view =
PCollectionViews.listView(
input,
(TypeDescriptorSupplier<T>) input.getCoder()::getEncodedTypeDescriptor,
input.getWindowingStrategy());
input.apply(CreatePCollectionView.of(view));
return view;
}

private PCollectionView<List<T>> expandWithRandomAccess(PCollection<T> input) {
/**
* The materialized format uses {@link Materializations#MULTIMAP_MATERIALIZATION_URN multimap}
* access pattern where the key is a position and the index of the value in the iterable is a
Expand All @@ -266,7 +315,7 @@ public PCollectionView<List<T>> expand(PCollection<T> input) {
BigEndianLongCoder.of(),
ValueOrMetadataCoder.create(inputCoder, OffsetRange.Coder.of())));
PCollectionView<List<T>> view =
PCollectionViews.listView(
PCollectionViews.listViewWithRandomAccess(
materializationInput,
(TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -65,9 +66,11 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSortedMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Ints;
Expand Down Expand Up @@ -175,6 +178,38 @@ public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterable
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
PCollection<T> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new ListViewFn3<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}

/**
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
PCollection<T> pCollection,
TupleTag<Materializations.IterableView<T>> tag,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
tag,
new ListViewFn3<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}

/**
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewWithRandomAccess(
PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
Expand All @@ -189,7 +224,7 @@ public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewWithRandomAccess(
PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> pCollection,
TupleTag<Materializations.MultimapView<Long, ValueOrMetadata<T, OffsetRange>>> tag,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
Expand Down Expand Up @@ -960,6 +995,179 @@ public void verifyDeterministic() throws NonDeterministicException {
}
}

/**
* Implementation which is able to adapt an iterable materialization to a {@code List<T>}.
*
* <p>Unlike ListViewFn2, this implementation is optimized for iteration rather than indexing.
*
* <p>For internal use only.
*/
public static class ListViewFn3<T> extends ViewFn<IterableView<T>, List<T>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love that these are just named 1, 2, 3...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is equivalent to View.asIterable() plus implementing the List methods so nothing breaks. Good question about the Window -> Iterable map; this is handled at a lower level, but I don't know all the details there (though in that case I can see that constructing the mapping would be more worthwhile). In the interest of being conservative while capturing the most important gains I'll restrict this to the global window case.

private TypeDescriptorSupplier<T> typeDescriptorSupplier;

public ListViewFn3(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
this.typeDescriptorSupplier = typeDescriptorSupplier;
}

@Override
public Materialization<IterableView<T>> getMaterialization() {
return Materializations.iterable();
}

@Override
public List<T> apply(IterableView<T> primitiveViewT) {
Supplier<Integer> size = Suppliers.memoize(() -> Iterables.size(primitiveViewT.get()));

return new List<T>() {
@Override
public int size() {
return size.get();
}

@Override
public boolean isEmpty() {
return Iterables.isEmpty(primitiveViewT.get());
}

@Override
public boolean contains(Object o) {
return Iterables.contains(primitiveViewT.get(), o);
}

@Override
public Iterator<T> iterator() {
return primitiveViewT.get().iterator();
}

@Override
public T get(int index) {
return Iterables.get(primitiveViewT.get(), index);
}

@Override
public Object[] toArray() {
return Iterables.toArray(primitiveViewT.get(), Object.class);
}

@Override
public <T1> T1[] toArray(T1[] a) {
return Iterables.toArray(
(Iterable<T1>) primitiveViewT.get(), (Class<T1>) a.getClass().getComponentType());
}

@Override
public boolean containsAll(Collection<?> c) {
for (Object o : c) {
if (!contains(o)) {
return false;
}
}
return true;
}

@Override
public int indexOf(Object o) {
return Iterables.indexOf(primitiveViewT.get(), v -> Objects.equals(v, o));
}

@Override
public int lastIndexOf(Object o) {
return ImmutableList.copyOf(primitiveViewT.get()).lastIndexOf(o);
}

@Override
public List<T> subList(int fromIndex, int toIndex) {
Iterator<T> iterator = primitiveViewT.get().iterator();
if (Iterators.advance(iterator, fromIndex) != fromIndex) {
throw new IndexOutOfBoundsException();
}
List<T> subList = ImmutableList.copyOf(Iterators.limit(iterator, toIndex - fromIndex));
if (subList.size() != toIndex - fromIndex) {
throw new IndexOutOfBoundsException();
}
return subList;
}

@Override
public ListIterator<T> listIterator() {
return ImmutableList.copyOf(primitiveViewT.get()).listIterator();
}

@Override
public ListIterator<T> listIterator(int index) {
return ImmutableList.copyOf(primitiveViewT.get()).listIterator(index);
}

// Unimplemented mutable list methods.

@Override
public boolean add(T t) {
throw new UnsupportedOperationException();
}

@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}

@Override
public boolean addAll(Collection<? extends T> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean addAll(int index, Collection<? extends T> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public void clear() {
throw new UnsupportedOperationException();
}

@Override
public T set(int index, T element) {
throw new UnsupportedOperationException();
}

@Override
public void add(int index, T element) {
throw new UnsupportedOperationException();
}

@Override
public T remove(int index) {
throw new UnsupportedOperationException();
}
};
}

@Override
public TypeDescriptor<List<T>> getTypeDescriptor() {
return TypeDescriptors.lists(typeDescriptorSupplier.get());
}

@Override
public boolean equals(@Nullable Object other) {
return other instanceof ListViewFn;
}

@Override
public int hashCode() {
return ListViewFn.class.hashCode();
}
}

/**
* Implementation which is able to adapt a multimap materialization to a {@code List<T>}.
*
Expand Down