Skip to content

Commit

Permalink
Optimise View.asList() side inputs for iterating rather than for inde…
Browse files Browse the repository at this point in the history
…xing.

The current implementation is, essentially, a distributed hashmap from
integer keys to the list contents, mediated by each upstream worker starting
at a random value to minimize overlaps and emitting sufficient metadata to map
this onto the contiguous range [0, N). This provides optimal *random-access*
performance, but very poor *iteration* performance (essentially having to do
a key lookup for every advance, and as the keys are hashed and distributed
rather than clustered numerically, there is little to no amortization in these
lookups for adjacent items).

Given that most uses for List side inputs are merely to gather a collection
of values (the user has no control over the ordering when materialized) and
the high costs of providing random access, this is probably the wrong tradeoff
for most pipelines.

This is an update-incompatible change and so has been guarded by the
update compatibility version flag. The old behavior can be explicitly
asked for via a new AsList#withRandomAccess() method.
  • Loading branch information
robertwb committed Apr 23, 2024
1 parent 1a5dc1c commit d0a859f
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 6 deletions.
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@
* ([#X](https://github.com/apache/beam/issues/X)).
-->

# [2.57.0] - Unreleased

* Java's View.asList() side inputs are now optimized for iterating rather than indexing.
This new implementation still supports all (immutable) List methods as before,
but some of the random access methods like get() and size() will be slower.
To use the old implementation one can use View.asList().withRandomAccess().

# [2.56.0] - Unreleased

## Highlights
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
package org.apache.beam.sdk.options;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Comparators;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Options used to configure streaming. */
Expand All @@ -41,4 +45,15 @@ public interface StreamingOptions extends ApplicationNameOptions, PipelineOption
String getUpdateCompatibilityVersion();

void setUpdateCompatibilityVersion(@Nullable String updateCompatibilityVersion);

/**
 * Returns whether the requested update compatibility version is strictly older than {@code
 * version}.
 *
 * <p>Both versions are split on {@code '.'} and compared component by component, left to right.
 * Components that both parse as integers are compared by numeric value, so that {@code "2.9.0"}
 * is correctly treated as older than {@code "2.57.0"}; any other pair of components falls back
 * to plain string ordering. If one version is a prefix of the other, the shorter one is
 * considered older.
 *
 * @param version the dotted version to compare against, e.g. {@code "2.57.0"}
 * @return {@code false} if no update compatibility version has been set; otherwise whether the
 *     configured version sorts strictly before {@code version}
 */
default boolean updateCompatibilityVersionLessThan(String version) {
  String requested = getUpdateCompatibilityVersion();
  if (requested == null) {
    return false;
  }
  // Plain string ordering would rank "9" after "57"; compare numerically whenever possible and
  // only fall back to string ordering for non-numeric components (e.g. "0-SNAPSHOT").
  Comparator<String> componentOrder =
      (left, right) -> {
        try {
          return Integer.compare(Integer.parseInt(left), Integer.parseInt(right));
        } catch (NumberFormatException e) {
          return left.compareTo(right);
        }
      };
  List<String> requestedVersion = Arrays.asList(requested.split("\\."));
  List<String> targetVersion = Arrays.asList(version.split("\\."));
  return Comparators.lexicographical(componentOrder).compare(requestedVersion, targetVersion)
      < 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
Expand Down Expand Up @@ -173,7 +174,7 @@ public static <T> AsSingleton<T> asSingleton() {
* <p>Some runners may require that the view fits in memory.
*/
public static <T> AsList<T> asList() {
return new AsList<>();
return new AsList<>(null);
}

/**
Expand Down Expand Up @@ -235,7 +236,33 @@ public static <K, V> AsMultimap<K, V> asMultimap() {
*/
@Internal
public static class AsList<T> extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
private AsList() {}
// Tri-state flag: TRUE/FALSE is an explicit caller choice; null means no choice was made, in
// which case expand() decides based on the pipeline's update compatibility version.
private final @Nullable Boolean withRandomAccess;

/**
 * Creates the transform with the given random-access preference.
 *
 * @param withRandomAccess the explicit preference, or {@code null} to leave the decision to
 *     {@code expand}
 */
private AsList(@Nullable Boolean withRandomAccess) {
this.withRandomAccess = withRandomAccess;
}

/**
* Returns a PCollection view like this one, but whose resulting list will have RandomAccess
* (aka fast indexing).
*
* <p>A view with random access will be much more expensive to compute and iterate over, but
* will have a faster {@code get()} method.
*
* <p>Equivalent to {@code withRandomAccess(true)}.
*/
public AsList<T> withRandomAccess() {
return withRandomAccess(true);
}

/**
* Returns a PCollection view like this one, but whose resulting list will have RandomAccess
* (aka fast indexing) according to the input parameter.
*
* <p>A view with random access will be much more expensive to compute and iterate over, but
* will have a faster {@code get()} method.
*
* @param withRandomAccess whether the resulting list should support fast indexing
* @return a new {@code AsList} carrying the given explicit preference; this instance is not
*     modified
*/
public AsList<T> withRandomAccess(boolean withRandomAccess) {
return new AsList<>(withRandomAccess);
}

@Override
public PCollectionView<List<T>> expand(PCollection<T> input) {
Expand All @@ -244,7 +271,32 @@ public PCollectionView<List<T>> expand(PCollection<T> input) {
} catch (IllegalStateException e) {
throw new IllegalStateException("Unable to create a side-input view from input", e);
}
boolean explicitWithRandomAccess =
withRandomAccess != null
? withRandomAccess
: input
.getPipeline()
.getOptions()
.as(StreamingOptions.class)
.updateCompatibilityVersionLessThan("2.57.0");
if (explicitWithRandomAccess) {
return expandWithRandomAccess(input);
} else {
return expandWithoutRandomAccess(input);
}
}

/**
 * Creates the list view directly over {@code input} (no intermediate index/metadata
 * collection) and registers it on the input via {@code CreatePCollectionView}.
 *
 * @param input the collection whose elements make up the list
 * @return the view that was created and applied to {@code input}
 */
private PCollectionView<List<T>> expandWithoutRandomAccess(PCollection<T> input) {
  // Resolve the element type lazily through the input coder.
  TypeDescriptorSupplier<T> elementType = input.getCoder()::getEncodedTypeDescriptor;
  PCollectionView<List<T>> listView =
      PCollectionViews.listView(input, elementType, input.getWindowingStrategy());
  input.apply(CreatePCollectionView.of(listView));
  return listView;
}

private PCollectionView<List<T>> expandWithRandomAccess(PCollection<T> input) {
/**
* The materialized format uses {@link Materializations#MULTIMAP_MATERIALIZATION_URN multimap}
* access pattern where the key is a position and the index of the value in the iterable is a
Expand All @@ -266,7 +318,7 @@ public PCollectionView<List<T>> expand(PCollection<T> input) {
BigEndianLongCoder.of(),
ValueOrMetadataCoder.create(inputCoder, OffsetRange.Coder.of())));
PCollectionView<List<T>> view =
PCollectionViews.listView(
PCollectionViews.listViewWithRandomAccess(
materializationInput,
(TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.facebook.presto.hadoop.$internal.com.google.common.collect.Iterators;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -28,6 +29,7 @@
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -58,6 +60,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
Expand Down Expand Up @@ -173,6 +176,38 @@ public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterable
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
    PCollection<T> pCollection,
    TypeDescriptorSupplier<T> typeDescriptorSupplier,
    WindowingStrategy<?, W> windowingStrategy) {
  // Side-input windows are resolved with the window function's default mapping.
  WindowMappingFn<W> defaultMapping =
      windowingStrategy.getWindowFn().getDefaultWindowMappingFn();
  return new SimplePCollectionView<>(
      pCollection, new ListViewFn3<>(typeDescriptorSupplier), defaultMapping, windowingStrategy);
}

/**
 * Returns a {@code PCollectionView<List<T>>} for the given collection, identified by the
 * supplied {@code tag} and capable of processing elements windowed using the provided {@link
 * WindowingStrategy}.
 *
 * <p>The view adapts an iterable materialization via {@link ListViewFn3} and uses the window
 * function's default window mapping.
 */
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
    PCollection<T> pCollection,
    TupleTag<Materializations.IterableView<T>> tag,
    TypeDescriptorSupplier<T> typeDescriptorSupplier,
    WindowingStrategy<?, W> windowingStrategy) {
  WindowMappingFn<W> defaultMapping =
      windowingStrategy.getWindowFn().getDefaultWindowMappingFn();
  return new SimplePCollectionView<>(
      pCollection,
      tag,
      new ListViewFn3<>(typeDescriptorSupplier),
      defaultMapping,
      windowingStrategy);
}

/**
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewWithRandomAccess(
PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> pCollection,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
WindowingStrategy<?, W> windowingStrategy) {
Expand All @@ -187,7 +222,7 @@ public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
* Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
* provided {@link WindowingStrategy}.
*/
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewWithRandomAccess(
PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> pCollection,
TupleTag<Materializations.MultimapView<Long, ValueOrMetadata<T, OffsetRange>>> tag,
TypeDescriptorSupplier<T> typeDescriptorSupplier,
Expand Down Expand Up @@ -920,6 +955,178 @@ public void verifyDeterministic() throws NonDeterministicException {
}
}

/**
 * Implementation which is able to adapt an iterable materialization to a {@code List<T>}.
 *
 * <p>The list returned by {@link #apply} is a read-only view over the underlying iterable: read
 * methods re-read the iterable on each call (only {@code size()} is memoized), and every
 * mutating method throws {@link UnsupportedOperationException}.
 *
 * <p>For internal use only.
 */
@Deprecated
public static class ListViewFn3<T> extends ViewFn<IterableView<T>, List<T>> {
  private final TypeDescriptorSupplier<T> typeDescriptorSupplier;

  /**
   * Creates a view function whose element type is provided by {@code typeDescriptorSupplier}.
   */
  public ListViewFn3(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
    this.typeDescriptorSupplier = typeDescriptorSupplier;
  }

  @Override
  public Materialization<IterableView<T>> getMaterialization() {
    return Materializations.iterable();
  }

  /**
   * Wraps the given iterable view as an unmodifiable {@link List}.
   *
   * @param primitiveViewT the iterable materialization to expose as a list
   * @return a read-only list backed by {@code primitiveViewT}
   */
  @Override
  public List<T> apply(IterableView<T> primitiveViewT) {
    // Computed at most once, on the first call to size().
    Supplier<Integer> size = Suppliers.memoize(() -> Iterables.size(primitiveViewT.get()));

    return new List<T>() {
      @Override
      public int size() {
        return size.get();
      }

      @Override
      public boolean isEmpty() {
        // Deliberately does not go through size(), so emptiness can be answered without
        // forcing the memoized count.
        return Iterables.isEmpty(primitiveViewT.get());
      }

      @Override
      public boolean contains(Object o) {
        return Iterables.contains(primitiveViewT.get(), o);
      }

      @Override
      public Iterator<T> iterator() {
        return primitiveViewT.get().iterator();
      }

      @Override
      public T get(int index) {
        return Iterables.get(primitiveViewT.get(), index);
      }

      @Override
      public Object[] toArray() {
        return Iterables.toArray(primitiveViewT.get(), Object.class);
      }

      @Override
      public <T1> T1[] toArray(T1[] a) {
        // Always allocates a new array of a's component type; the passed-in array is only used
        // to determine that type.
        return Iterables.toArray(
            (Iterable<T1>) primitiveViewT.get(), (Class<T1>) a.getClass().getComponentType());
      }

      @Override
      public boolean containsAll(Collection<?> c) {
        for (Object o : c) {
          if (!contains(o)) {
            return false;
          }
        }
        return true;
      }

      @Override
      public int indexOf(Object o) {
        return Iterables.indexOf(primitiveViewT.get(), v -> Objects.equals(v, o));
      }

      @Override
      public int lastIndexOf(Object o) {
        // Copies the contents into an in-memory list to search from the end.
        return ImmutableList.copyOf(primitiveViewT.get()).lastIndexOf(o);
      }

      @Override
      public List<T> subList(int fromIndex, int toIndex) {
        Iterator<T> iterator = primitiveViewT.get().iterator();
        // Fewer than fromIndex elements skipped means the start is past the end of the data.
        if (Iterators.skip(iterator, fromIndex) != fromIndex) {
          throw new IndexOutOfBoundsException();
        }
        List<T> subList = ImmutableList.copyOf(Iterators.limit(iterator, toIndex - fromIndex));
        // A short copy means toIndex is past the end of the data.
        if (subList.size() != toIndex - fromIndex) {
          throw new IndexOutOfBoundsException();
        }
        return subList;
      }

      @Override
      public ListIterator<T> listIterator() {
        // Bidirectional iteration is served from an in-memory copy of the contents.
        return ImmutableList.copyOf(primitiveViewT.get()).listIterator();
      }

      @Override
      public ListIterator<T> listIterator(int index) {
        return ImmutableList.copyOf(primitiveViewT.get()).listIterator(index);
      }

      // Unimplemented mutable list methods.

      @Override
      public boolean add(T t) {
        throw new UnsupportedOperationException();
      }

      @Override
      public boolean remove(Object o) {
        throw new UnsupportedOperationException();
      }

      @Override
      public boolean addAll(Collection<? extends T> c) {
        throw new UnsupportedOperationException();
      }

      @Override
      public boolean addAll(int index, Collection<? extends T> c) {
        throw new UnsupportedOperationException();
      }

      @Override
      public boolean removeAll(Collection<?> c) {
        throw new UnsupportedOperationException();
      }

      @Override
      public boolean retainAll(Collection<?> c) {
        throw new UnsupportedOperationException();
      }

      @Override
      public void clear() {
        throw new UnsupportedOperationException();
      }

      @Override
      public T set(int index, T element) {
        throw new UnsupportedOperationException();
      }

      @Override
      public void add(int index, T element) {
        throw new UnsupportedOperationException();
      }

      @Override
      public T remove(int index) {
        throw new UnsupportedOperationException();
      }
    };
  }

  @Override
  public TypeDescriptor<List<T>> getTypeDescriptor() {
    return TypeDescriptors.lists(typeDescriptorSupplier.get());
  }

  /**
   * Two view functions are equal when both are {@code ListViewFn3} instances; the type
   * descriptor supplier does not take part in equality.
   */
  @Override
  public boolean equals(@Nullable Object other) {
    // Must test against this class (not ListViewFn) so that equality is reflexive and symmetric.
    return other instanceof ListViewFn3;
  }

  @Override
  public int hashCode() {
    // Consistent with equals(): all ListViewFn3 instances share one hash code.
    return ListViewFn3.class.hashCode();
  }
}

/**
* Implementation which is able to adapt a multimap materialization to a {@code List<T>}.
*
Expand Down

0 comments on commit d0a859f

Please sign in to comment.