Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid throwing InvalidOpEx in server stream #1435

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ async Task<bool> MoveNextAsync(ValueTask<TRequest?> readStreamTask)

if (_completed || _serverCallContext.CancellationToken.IsCancellationRequested)
{
return Task.FromException<bool>(new InvalidOperationException("Can't read messages after the request is complete."));
// gRPC specification indicates that MoveNext() should not throw. Simply return false.
return CommonGrpcProtocolHelpers.FalseTask;
}

var request = _serverCallContext.HttpContext.Request.BodyReader.ReadStreamMessageAsync(_serverCallContext, _deserializer, cancellationToken);
Expand Down
27 changes: 13 additions & 14 deletions test/FunctionalTests/Client/StreamingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace Grpc.AspNetCore.FunctionalTests.Client
public class StreamingTests : FunctionalTestBase
{
[Test]
public async Task DuplexStream_SendLargeFileBatchedAndRecieveLargeFileBatched_Success()
public async Task DuplexStream_SendLargeFileBatchedAndReceiveLargeFileBatched_Success()
{
// Arrange
var data = CreateTestData(1024 * 1024 * 1); // 1 MB
Expand Down Expand Up @@ -306,7 +306,7 @@ await foreach (var message in call.ResponseStream.ReadAllAsync().DefaultTimeout(
[TestCase(1)]
[TestCase(5)]
[TestCase(20)]
public async Task DuplexStreaming_SimultaniousSendAndReceiveInParallel_Success(int tasks)
public async Task DuplexStreaming_SimultaneousSendAndReceiveInParallel_Success(int tasks)
{
// Arrange
const int total = 1024 * 1024 * 1;
Expand All @@ -316,7 +316,7 @@ public async Task DuplexStreaming_SimultaniousSendAndReceiveInParallel_Success(i

var client = new StreamService.StreamServiceClient(Channel);

await TestHelpers.RunParallel(tasks, async taskIndex =>
await TestHelpers.RunParallel(tasks, async _ =>
{
var (sent, received) = await EchoData(total, data, client).DefaultTimeout();

Expand Down Expand Up @@ -421,7 +421,7 @@ await foreach (var message in requestStream.ReadAllAsync())
[Test]
public async Task DuplexStreaming_ParallelCallsFromOneChannel_Success()
{
async Task UnaryDeadlineExceeded(IAsyncStreamReader<DataMessage> requestStream, IServerStreamWriter<DataMessage> responseStream, ServerCallContext context)
static async Task UnaryDeadlineExceeded(IAsyncStreamReader<DataMessage> requestStream, IServerStreamWriter<DataMessage> responseStream, ServerCallContext context)
{
await foreach (var message in requestStream.ReadAllAsync())
{
Expand Down Expand Up @@ -460,7 +460,7 @@ await foreach (var message in requestStream.ReadAllAsync())
[Test]
public async Task ServerStreaming_GetTrailersAndStatus_Success()
{
async Task ServerStreamingWithTrailers(DataMessage request, IServerStreamWriter<DataMessage> responseStream, ServerCallContext context)
static async Task ServerStreamingWithTrailers(DataMessage request, IServerStreamWriter<DataMessage> responseStream, ServerCallContext context)
{
await responseStream.WriteAsync(new DataMessage());
context.ResponseTrailers.Add("my-trailer", "value");
Expand Down Expand Up @@ -625,7 +625,7 @@ async Task ServerStreamingWithTrailers(DataMessage request, IServerStreamWriter<

[TestCase(true)]
[TestCase(false)]
public async Task ClientStreaming_ReadAfterMethodComplete_Error(bool readBeforeExit)
public async Task ClientStreaming_ReadAfterMethodComplete_False(bool readBeforeExit)
{
SetExpectedErrorsFilter(writeContext =>
{
Expand All @@ -641,7 +641,7 @@ public async Task ClientStreaming_ReadAfterMethodComplete_Error(bool readBeforeE
var tcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
var readTcs = new TaskCompletionSource<Task>(TaskCreationOptions.RunContinuationsAsynchronously);
var syncPoint = new SyncPoint(runContinuationsAsynchronously: true);
async Task<DataMessage> ClientStreamingWithTrailers(IAsyncStreamReader<DataMessage> requestStream, ServerCallContext context)
async Task<DataMessage> ClientStreamingWithTrailersAsync(IAsyncStreamReader<DataMessage> requestStream, ServerCallContext context)
{
var readTask = Task.Run(async () =>
{
Expand All @@ -661,7 +661,7 @@ async Task<DataMessage> ClientStreamingWithTrailers(IAsyncStreamReader<DataMessa
}

// Arrange
var method = Fixture.DynamicGrpc.AddClientStreamingMethod<DataMessage, DataMessage>(ClientStreamingWithTrailers);
var method = Fixture.DynamicGrpc.AddClientStreamingMethod<DataMessage, DataMessage>(ClientStreamingWithTrailersAsync);

var channel = CreateChannel();

Expand All @@ -680,21 +680,21 @@ async Task<DataMessage> ClientStreamingWithTrailers(IAsyncStreamReader<DataMessa

tcs.SetResult(null);

var response = await call;
DataMessage response = await call;
Assert.IsNotNull(response);

syncPoint.Continue();

var readTask = await readTcs.Task.DefaultTimeout();
var ex = await ExceptionAssert.ThrowsAsync<InvalidOperationException>(() => readTask).DefaultTimeout();
Assert.AreEqual("Can't read messages after the request is complete.", ex.Message);
await readTask.DefaultTimeout();

var clientException = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.RequestStream.WriteAsync(new DataMessage())).DefaultTimeout();
Assert.AreEqual(StatusCode.OK, clientException.StatusCode);
}

[TestCase(true)]
[TestCase(false)]
public async Task ClientStreaming_ReadAfterMethodCancelled_Error(bool readBeforeExit)
public async Task ClientStreaming_ReadAfterMethodCancelled_False(bool readBeforeExit)
{
SetExpectedErrorsFilter(writeContext =>
{
Expand Down Expand Up @@ -759,8 +759,7 @@ async Task<DataMessage> ClientStreamingWithTrailers(IAsyncStreamReader<DataMessa
syncPoint.Continue();

var readTask = await readTcs.Task.DefaultTimeout();
var serverException = await ExceptionAssert.ThrowsAsync<InvalidOperationException>(() => readTask).DefaultTimeout();
Assert.AreEqual("Can't read messages after the request is complete.", serverException.Message);
await readTask;

// Ensure the server abort reaches the client
await Task.Delay(100);
Expand Down