Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise View.asList() side inputs for iterating rather than for indexing. #31087

Merged
merged 9 commits into from
Apr 30, 2024
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@
## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* Java's View.asList() side inputs are now optimized for iterating rather than
indexing when in the global window.
This new implementation still supports all (immutable) List methods as before,
but some of the random access methods like get() and size() will be slower.
To use the old implementation one can use View.asList().withRandomAccess().

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
package org.apache.beam.sdk.options;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Comparators;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Options used to configure streaming. */
Expand All @@ -41,4 +45,20 @@ public interface StreamingOptions extends ApplicationNameOptions, PipelineOption
String getUpdateCompatibilityVersion();

void setUpdateCompatibilityVersion(@Nullable String updateCompatibilityVersion);

static boolean updateCompatibilityVersionLessThan(PipelineOptions options, String version) {
if (options == null) {
return false;
}
String updateCompatibilityVersion =
options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
if (updateCompatibilityVersion == null) {
return false;
}
List<String> requestedVersion = Arrays.asList(updateCompatibilityVersion.split("\\."));
List<String> targetVersion = Arrays.asList(version.split("\\."));
return Comparators.lexicographical(Comparator.<String>naturalOrder())
.compare(requestedVersion, targetVersion)
< 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -173,7 +175,7 @@ public static <T> AsSingleton<T> asSingleton() {
* <p>Some runners may require that the view fits in memory.
*/
public static <T> AsList<T> asList() {
return new AsList<>();
return new AsList<>(null);
}

/**
Expand Down Expand Up @@ -235,7 +237,33 @@ public static <K, V> AsMultimap<K, V> asMultimap() {
*/
@Internal
public static class AsList<T> extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
private AsList() {}
private final @Nullable Boolean withRandomAccess;

private AsList(@Nullable Boolean withRandomAccess) {
this.withRandomAccess = withRandomAccess;
}

/**
* Returns a PCollection view like this one, but whose resulting list will have RandomAccess
* (aka fast indexing).
*
* <p>A veiw with random access will be much more expensive to compute and iterate over, but
* will have a faster get() method.
*/
public AsList<T> withRandomAccess() {
return withRandomAccess(true);
}

/**
* Returns a PCollection view like this one, but whose resulting list will have RandomAccess
* (aka fast indexing) according to the input parameter.
*
* <p>A veiw with random access will be much more expensive to compute and iterate over, but
* will have a faster get() method.
*/
public AsList<T> withRandomAccess(boolean withRandomAccess) {
return new AsList<>(withRandomAccess);
}

@Override
public PCollectionView<List<T>> expand(PCollection<T> input) {
Expand All @@ -244,7 +272,30 @@ public PCollectionView<List<T>> expand(PCollection<T> input) {
} catch (IllegalStateException e) {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
boolean explicitWithRandomAccess =
withRandomAccess != null
? withRandomAccess
: StreamingOptions.updateCompatibilityVersionLessThan(
input.getPipeline().getOptions(), "2.57.0");
if (explicitWithRandomAccess
|| !(input.getWindowingStrategy().getWindowFn() instanceof GlobalWindows)) {
return expandWithRandomAccess(input);
} else {
return expandWithoutRandomAccess(input);
}
}

private PCollectionView<List<T>> expandWithoutRandomAccess(PCollection<T> input) {
PCollectionView<List<T>> view =
PCollectionViews.listView(
input,
(TypeDescriptorSupplier<T>) input.getCoder()::getEncodedTypeDescriptor,
input.getWindowingStrategy());
input.apply(CreatePCollectionView.of(view));
return view;
}

private PCollectionView<List<T>> expandWithRandomAccess(PCollection<T> input) {
/**
* The materialized format uses {@link Materializations#MULTIMAP_MATERIALIZATION_URN multimap}
* access pattern where the key is a position and the index of the value in the iterable is a
Expand All @@ -266,7 +317,7 @@ public PCollectionView<List<T>> expand(PCollection<T> input) {
BigEndianLongCoder.of(),
ValueOrMetadataCoder.create(inputCoder, OffsetRange.Coder.of())));
PCollectionView<List<T>> view =
PCollectionViews.listView(
PCollectionViews.listViewWithRandomAccess(
materializationInput,
(TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -65,9 +66,11 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSortedMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Ints;
Expand Down Expand Up @@ -175,6 +178,38 @@ public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterable
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
PCollection<T> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
new IterableBackedListViewFn<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}

/**
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
PCollection<T> pCollection,
TupleTag<Materializations.IterableView<T>> tag,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
return new SimplePCollectionView<>(
pCollection,
tag,
new IterableBackedListViewFn<>(typeDescriptorSupplier),
windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
windowingStrategy);
}

/**
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewWithRandomAccess(
PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
Expand All @@ -189,7 +224,7 @@ public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewWithRandomAccess(
PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> pCollection,
TupleTag<Materializations.MultimapView<Long, ValueOrMetadata<T, OffsetRange>>> tag,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
Expand Down Expand Up @@ -960,6 +995,179 @@ public void verifyDeterministic() throws NonDeterministicException {
}
}

/**
* Implementation which is able to adapt an iterable materialization to a {@code List<T>}.
*
* <p>Unlike ListViewFn2, this implementation is optimized for iteration rather than indexing.
*
* <p>For internal use only.
*/
public static class IterableBackedListViewFn<T> extends ViewFn<IterableView<T>, List<T>> {
private TypeDescriptorSupplier<T> typeDescriptorSupplier;

public IterableBackedListViewFn(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
this.typeDescriptorSupplier = typeDescriptorSupplier;
}

@Override
public Materialization<IterableView<T>> getMaterialization() {
return Materializations.iterable();
}

@Override
public List<T> apply(IterableView<T> primitiveViewT) {
Supplier<Integer> size = Suppliers.memoize(() -> Iterables.size(primitiveViewT.get()));

return new List<T>() {
@Override
public int size() {
return size.get();
}

@Override
public boolean isEmpty() {
return Iterables.isEmpty(primitiveViewT.get());
}

@Override
public boolean contains(Object o) {
return Iterables.contains(primitiveViewT.get(), o);
}

@Override
public Iterator<T> iterator() {
return primitiveViewT.get().iterator();
}

@Override
public T get(int index) {
return Iterables.get(primitiveViewT.get(), index);
}

@Override
public Object[] toArray() {
return Iterables.toArray(primitiveViewT.get(), Object.class);
}

@Override
public <T1> T1[] toArray(T1[] a) {
return Iterables.toArray(
(Iterable<T1>) primitiveViewT.get(), (Class<T1>) a.getClass().getComponentType());
}

@Override
public boolean containsAll(Collection<?> c) {
for (Object o : c) {
if (!contains(o)) {
return false;
}
}
return true;
}

@Override
public int indexOf(Object o) {
return Iterables.indexOf(primitiveViewT.get(), v -> Objects.equals(v, o));
}

@Override
public int lastIndexOf(Object o) {
return ImmutableList.copyOf(primitiveViewT.get()).lastIndexOf(o);
}

@Override
public List<T> subList(int fromIndex, int toIndex) {
Iterator<T> iterator = primitiveViewT.get().iterator();
if (Iterators.advance(iterator, fromIndex) != fromIndex) {
throw new IndexOutOfBoundsException();
}
List<T> subList = ImmutableList.copyOf(Iterators.limit(iterator, toIndex - fromIndex));
if (subList.size() != toIndex - fromIndex) {
throw new IndexOutOfBoundsException();
}
return subList;
}

@Override
public ListIterator<T> listIterator() {
return ImmutableList.copyOf(primitiveViewT.get()).listIterator();
}

@Override
public ListIterator<T> listIterator(int index) {
return ImmutableList.copyOf(primitiveViewT.get()).listIterator(index);
}

// Unimplemented mutable list methods.

@Override
public boolean add(T t) {
throw new UnsupportedOperationException();
}

@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}

@Override
public boolean addAll(Collection<? extends T> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean addAll(int index, Collection<? extends T> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException();
}

@Override
public void clear() {
throw new UnsupportedOperationException();
}

@Override
public T set(int index, T element) {
throw new UnsupportedOperationException();
}

@Override
public void add(int index, T element) {
throw new UnsupportedOperationException();
}

@Override
public T remove(int index) {
throw new UnsupportedOperationException();
}
};
}

@Override
public TypeDescriptor<List<T>> getTypeDescriptor() {
return TypeDescriptors.lists(typeDescriptorSupplier.get());
}

@Override
public boolean equals(@Nullable Object other) {
return other instanceof IterableBackedListViewFn;
}

@Override
public int hashCode() {
return ListViewFn.class.hashCode();
}
}

/**
* Implementation which is able to adapt a multimap materialization to a {@code List<T>}.
*
Expand Down