Issues with streams during rolling deploy (Orleans.Streams.QueueCacheMissException) #8986

Open
erikljung opened this issue May 8, 2024 · 0 comments

erikljung commented May 8, 2024

I've spent a couple of days fighting stream events that go missing during a rolling deploy. I created a minimal repro to rule out business logic issues, and I can still reproduce the error. This configuration may be sub-optimal, or I may be doing something wrong, so please advise.

Setup

Orleans version: 7.2.6
Stream pub sub configuration: StreamPubSubType.ImplicitOnly
Stream storage: MemoryGrainStorage for "PubSubStore" (not sure whether this applies with ImplicitOnly)
GrainDirectory: Redis
DefaultCompatibilityStrategy: BackwardCompatible
DefaultVersionSelectorStrategy: LatestVersion
Grain storage and membership: Postgres
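
For reference, the settings above roughly correspond to the silo configuration below. This is a sketch, not the exact repro code. It assumes the in-memory stream provider (the stack trace goes through `MemoryPooledCache`) and the Microsoft.Orleans.Server, Microsoft.Orleans.Streaming and Microsoft.Extensions.Hosting packages. The provider name "StreamProvider" is a placeholder. The Postgres clustering/storage and Redis grain directory registrations are left out.

```csharp
using Microsoft.Extensions.Hosting;
using Orleans.Configuration;
using Orleans.Hosting;
using Orleans.Streams;
using Orleans.Versions.Compatibility;
using Orleans.Versions.Selector;

// Sketch of the "Setup" settings. Clustering, grain storage and the grain
// directory (Postgres/Redis) are omitted; "StreamProvider" is a placeholder name.
await Host.CreateDefaultBuilder(args)
    .UseOrleans(silo =>
    {
        // Mirrors the "PubSubStore" registration from the setup list.
        silo.AddMemoryGrainStorage("PubSubStore");

        // In-memory persistent streams with implicit-only pub/sub.
        silo.AddMemoryStreams("StreamProvider", streams =>
            streams.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly));

        // Grain versioning strategies used during the rolling deploy.
        silo.Configure<GrainVersioningOptions>(options =>
        {
            options.DefaultCompatibilityStrategy = nameof(BackwardCompatible);
            options.DefaultVersionSelectorStrategy = nameof(LatestVersion);
        });
    })
    .RunConsoleAsync();
```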

Repro

  1. Start silo A
  2. Start sending traffic to the cluster (see Producer and Consumer below)
  3. Consumer (V1) activates and starts receiving events
  4. Start silo B, which joins the cluster (2 silos in total)
  5. StreamGrain (V1) deactivates on silo A and activates (V2) on silo B. This happens because of the version selector strategy combined with a call to another grain method (implicit stream subscription grains won't move on their own)
  6. Stop silo A after some time
  7. One (1) Orleans.Streams.QueueCacheMissException occurs on silo B on the consumer side. This is the only error I get; no errors occur on the producer side
  8. Everything seems to recover, but some events never reach the consumer (500 out of 7000)
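
One way to arrive at a count like the 500 missing events in step 8 is to diff the IDs the producer sent against the consumer's persisted `ReceivedEvents`. A minimal sketch, assuming event IDs are `Guid`s; `FindMissing` is a hypothetical helper and the data is synthetic:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: IDs that were sent but never appear in the
// consumer's persisted ReceivedEvents. Guid IDs are an assumption.
static IReadOnlyList<Guid> FindMissing(IEnumerable<Guid> sent, IEnumerable<Guid> received)
{
    var seen = new HashSet<Guid>(received);
    // Keep send order so a contiguous gap stays visible in the result.
    return sent.Where(id => !seen.Contains(id)).ToList();
}

// Synthetic data: 10 events sent, events at positions 3..5 never delivered.
var sentIds = Enumerable.Range(0, 10).Select(_ => Guid.NewGuid()).ToList();
var receivedIds = sentIds.Where((_, i) => i < 3 || i > 5).ToList();

var missing = FindMissing(sentIds, receivedIds);
Console.WriteLine($"Missing {missing.Count} of {sentIds.Count} events"); // Missing 3 of 10 events
```

Keeping the result in send order shows whether the missing events form one contiguous block (for example, around the moment silo A stopped) or are scattered across the run.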

Questions

  1. Why is this happening, and can it be prevented?
  2. Is there any way to recover the events that the consumer grain never received?
  3. Is there another configuration that would be more suitable for rolling deploys?

Error

```
2024-05-08 12:30:23.571 [FTL] StreamGrain - Stream failed for grain "fd4e338d-36dd-4533-99c8-909c3a4ac187" Orleans.Streams.QueueCacheMissException: Item not found in cache.  Requested: [EventSequenceToken: SeqNum=638507681451236680, EventIndex=0], Low: [EventSequenceToken: SeqNum=638507681451236716, EventIndex=0], High: [EventSequenceToken: SeqNum=638507681451236729, EventIndex=0]
   at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
   at Orleans.Providers.Streams.Common.PooledQueueCache.GetCursor(StreamId streamId, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 118
   at Orleans.Providers.MemoryPooledCache`1.Cursor..ctor(PooledQueueCache cache, StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 107
   at Orleans.Providers.MemoryPooledCache`1.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Orleans.Streaming/MemoryStreams/MemoryPooledCache.cs:line 170
   at Orleans.Streams.PersistentStreamPullingAgent.DoHandshakeWithConsumer(StreamConsumerData consumerData, StreamSequenceToken cacheToken) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 303
--- End of stack trace from previous location ---
```
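
Reading the exception message: the consumer asked to resume from a token older than the oldest entry still in the cache (Requested < Low), so the cache could not serve it. A toy check using the numbers from the log. `IsCacheMiss` is a hypothetical simplification, not Orleans' `PooledQueueCache` logic, and tokens newer than High are not modeled:

```csharp
using System;

// Toy model, NOT Orleans' PooledQueueCache: a start token below the oldest
// cached sequence number (Low) cannot be served from the cache.
static bool IsCacheMiss(long requested, long low) => requested < low;

// SeqNum values copied from the QueueCacheMissException message.
const long requested = 638_507_681_451_236_680;
const long low = 638_507_681_451_236_716;
const long high = 638_507_681_451_236_729;

Console.WriteLine($"Miss: {IsCacheMiss(requested, low)}"); // Miss: True
Console.WriteLine($"Behind Low by: {low - requested}");    // Behind Low by: 36
Console.WriteLine($"High - Low: {high - low}");            // High - Low: 13
```

With these values the requested token is 36 sequence numbers behind Low. It had already been evicted when the handshake in `DoHandshakeWithConsumer` ran.
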
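
Producer

```csharp
// `clusterClient` is the IClusterClient instance; "StreamProvider" is a
// placeholder for the name the stream provider is registered under.
var stream = clusterClient
    .GetStreamProvider( "StreamProvider" )
    .GetStream<StreamEvent>(
        StreamId.Create(
            "Namespace.StreamEvent",
            request.GrainId ) );

await stream.OnNextAsync( new StreamEvent( request.Id ) );
```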

Consumer

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans;
using Orleans.Runtime;
using Orleans.Streams;

[ImplicitStreamSubscription( streamNamespace: STREAM_NS )]
public class StreamGrain : IGrainBase, IStreamGrain
{
    private const string STREAM_NS = "Namespace.StreamEvent";

    private IPersistentState<StreamState> Storage { get; }
    private ILogger<StreamGrain> Logger { get; }

    public IGrainContext GrainContext { get; }

    public StreamGrain(
        [PersistentState( nameof( StreamGrain ) )]
        IPersistentState<StreamState> storage,
        IGrainContext grainContext,
        ILogger<StreamGrain> logger )
    {
        Storage = storage;
        GrainContext = grainContext;
        Logger = logger;
    }

    public async Task OnActivateAsync( CancellationToken token )
    {
        LogGrainStatus( "Activating" );

        // Resubscribe to this grain's implicit stream, resuming from the
        // last token persisted by Register.
        var streamId = StreamId.Create( STREAM_NS, this.GetPrimaryKey() );
        var stream = this.GetDefaultStreamProvider().GetStream<StreamEvent>( streamId );

        await stream.SubscribeAsync(
            onNextAsync: Register,
            onErrorAsync: OnError,
            token: Storage.State.LastStreamToken );
    }

    public Task OnDeactivateAsync( DeactivationReason reason, CancellationToken token )
    {
        LogGrainStatus( "Deactivating" );
        return Task.CompletedTask;
    }

    public async Task Register( StreamEvent input, StreamSequenceToken token )
    {
        Logger.LogInformation( "Storing event: {Id}; token {Token}", input.Id, token );

        // Persist the event and its token together so the next activation
        // can resume from this point.
        Storage.State.LastStreamToken = token;
        Storage.State.ReceivedEvents.Add( input.Id );
        await Storage.WriteStateAsync();
    }

    private Task OnError( Exception ex )
    {
        // The QueueCacheMissException shown under "Error" is logged here.
        Logger.LogCritical( ex, "Stream failed for grain {Id}", this.GetPrimaryKey() );
        return Task.CompletedTask;
    }

    private void LogGrainStatus( string state ) =>
        Logger.LogInformation(
            "{State} stream grain {Id}; Stream token: {Token}; Events: {Events}",
            state,
            this.GetPrimaryKey(),
            Storage.State.LastStreamToken,
            Storage.State.ReceivedEvents.Count );
}
```

erikljung changed the title from "Issues with Orleans.Streams.QueueCacheMissException during rolling deploy" to "Issues with streams during rolling deploy (Orleans.Streams.QueueCacheMissException)" on May 8, 2024