Skip to content

Commit

Permalink
Issue 6009, 6010: Controller support for Stream Tags. (pravega#6059)
Browse files Browse the repository at this point in the history
*  Enable StreamTags on Pravega Streams. Implements tag related GRPC APIs on the controller and ensure these APIs can be invoked by the PravegaClient.

Signed-off-by: Sandeep <sandeep.shridhar@emc.com>
  • Loading branch information
shrids committed Jun 23, 2021
1 parent 67e91f5 commit 6a34e64
Show file tree
Hide file tree
Showing 43 changed files with 1,944 additions and 450 deletions.
11 changes: 11 additions & 0 deletions client/src/main/java/io/pravega/client/admin/StreamInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.pravega.client.admin;

import com.google.common.annotations.Beta;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import lombok.Data;

Expand Down Expand Up @@ -49,6 +50,16 @@ public class StreamInfo {
*/
private final String streamName;

/**
* Stream Configuration.
* This includes the {@link io.pravega.client.stream.ScalingPolicy}, {@link io.pravega.client.stream.RetentionPolicy}
* and the tags associated with the stream.
*
* @param streamConfiguration Stream configuration.
* @return Stream configuration.
*/
private final StreamConfiguration streamConfiguration;

/**
* {@link StreamCut} representing the current TAIL of the stream.
*
Expand Down
23 changes: 22 additions & 1 deletion client/src/main/java/io/pravega/client/admin/StreamManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;

/**
Expand Down Expand Up @@ -137,6 +138,25 @@ public static StreamManager create(ClientConfig clientConfig) {
*/
Iterator<Stream> listStreams(String scopeName);

/**
* Gets an iterator to list all streams with the provided tag.
*
* @param scopeName The name of the scope for which to list streams in.
* @param tagName The name of the tag.
*
* @return Iterator of Stream to iterator over all streams in scope with the provided tag.
*/
Iterator<Stream> listStreams(String scopeName, String tagName);

/**
* Gets the Tags associated with a stream.
*
* @param scopeName Scope name.
* @param streamName Stream name.
* @return Tags associated with the stream.
*/
Collection<String> getStreamTags(String scopeName, String streamName);

/**
* Checks if a stream exists in scope.
*
Expand Down Expand Up @@ -169,7 +189,8 @@ public static StreamManager create(ClientConfig clientConfig) {

/**
* Get information about a given Stream, {@link StreamInfo}.
* This includes {@link StreamCut}s pointing to the current HEAD and TAIL of the Stream.
* This includes {@link StreamCut}s pointing to the current HEAD and TAIL of the Stream and the current
* {@link StreamConfiguration}
*
* @param scopeName The scope of the stream.
* @param streamName The stream name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.pravega.shared.NameUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -90,6 +91,7 @@ public boolean createStream(String scopeName, String streamName, StreamConfigura
return Futures.getThrowingException(controller.createStream(scopeName, streamName, StreamConfiguration.builder()
.scalingPolicy(config.getScalingPolicy())
.retentionPolicy(config.getRetentionPolicy())
.tags(config.getTags())
.build()));
}

Expand All @@ -98,10 +100,12 @@ public boolean updateStream(String scopeName, String streamName, StreamConfigura
NameUtils.validateUserStreamName(streamName);
NameUtils.validateUserScopeName(scopeName);
log.info("Updating scope/stream: {}/{} with configuration: {}", scopeName, streamName, config);
return Futures.getThrowingException(controller.updateStream(scopeName, streamName, StreamConfiguration.builder()
.scalingPolicy(config.getScalingPolicy())
.retentionPolicy(config.getRetentionPolicy())
.build()));
return Futures.getThrowingException(controller.updateStream(scopeName, streamName,
StreamConfiguration.builder()
.scalingPolicy(config.getScalingPolicy())
.retentionPolicy(config.getRetentionPolicy())
.tags(config.getTags())
.build()));
}

@Override
Expand Down Expand Up @@ -157,6 +161,23 @@ public Iterator<Stream> listStreams(String scopeName) {
return asyncIterator.asIterator();
}

@Override
public Iterator<Stream> listStreams(String scopeName, String tagName) {
NameUtils.validateUserScopeName(scopeName);
log.info("Listing streams in scope: {} which has tag: {}", scopeName, tagName);
AsyncIterator<Stream> asyncIterator = controller.listStreamsForTag(scopeName, tagName);
return asyncIterator.asIterator();
}

@Override
public Collection<String> getStreamTags(String scopeName, String streamName) {
NameUtils.validateUserScopeName(scopeName);
NameUtils.validateUserStreamName(streamName);
log.info("Fetching tags associated with stream: {}/{}", scopeName, streamName);
return Futures.getThrowingException(controller.getStreamConfiguration(scopeName, streamName)
.thenApply(StreamConfiguration::getTags));
}

@Override
public boolean checkStreamExists(String scopeName, String streamName) {
log.info("Checking if stream {} exists in scope {}", streamName, scopeName);
Expand Down Expand Up @@ -240,16 +261,17 @@ public StreamInfo getStreamInfo(String scopeName, String streamName) {
* @return A future representing {@link StreamInfo}.
*/
private CompletableFuture<StreamInfo> getStreamInfo(final Stream stream) {
//Fetch the stream cut representing the current TAIL and current HEAD of the stream.
// Fetch the stream configuration which includes the tags associated with the stream.
CompletableFuture<StreamConfiguration> streamConfiguration = controller.getStreamConfiguration(stream.getScope(), stream.getStreamName());
// Fetch the stream cut representing the current TAIL and current HEAD of the stream.
CompletableFuture<StreamCut> currentTailStreamCut = streamCutHelper.fetchTailStreamCut(stream);
CompletableFuture<StreamCut> currentHeadStreamCut = streamCutHelper.fetchHeadStreamCut(stream);
return currentTailStreamCut.thenCombine(currentHeadStreamCut,
(tailSC, headSC) -> {
boolean isSealed = ((StreamCutImpl) tailSC).getPositions().isEmpty();
return new StreamInfo(stream.getScope(),
stream.getStreamName(),
tailSC, headSC, isSealed);
});
return CompletableFuture.allOf(streamConfiguration, currentHeadStreamCut, currentTailStreamCut)
.thenApply(v -> {
boolean isSealed = ((StreamCutImpl) currentTailStreamCut.join()).getPositions().isEmpty();
return new StreamInfo(stream.getScope(), stream.getStreamName(), streamConfiguration.join(),
currentTailStreamCut.join(), currentHeadStreamCut.join(), isSealed);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ public interface Controller extends AutoCloseable {
*/
AsyncIterator<Stream> listStreams(final String scopeName);

/**
* Gets an async iterator on streams in scope.
*
* @param scopeName The name of the scope for which to list streams in.
* @param tag The Stream tag.
* @return An AsyncIterator which can be used to iterate over all Streams in the scope.
*/
AsyncIterator<Stream> listStreamsForTag(final String scopeName, final String tag);

/**
* API to delete a scope. Note that a scope can only be deleted in the case is it empty. If
* the scope contains at least one stream, then the delete request will fail.
Expand Down Expand Up @@ -117,6 +126,16 @@ public interface Controller extends AutoCloseable {
*/
CompletableFuture<Boolean> checkStreamExists(final String scopeName, final String streamName);

/**
* Fetch the current Stream Configuration. This includes the {@link io.pravega.client.stream.ScalingPolicy},
* {@link io.pravega.client.stream.RetentionPolicy} and tags for the given stream.
*
* @param scopeName name of scope.
* @param streamName name of stream.
* @return CompletableFuture which returns the current stream configuration.
*/
CompletableFuture<StreamConfiguration> getStreamConfiguration(final String scopeName, final String streamName);

/**
* API to update the configuration of a stream.
* @param scope Scope
Expand Down

0 comments on commit 6a34e64

Please sign in to comment.