Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/Grpc.AspNetCore.Server/Internal/PipeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ public static async Task WriteSingleMessageAsync<TResponse>(this PipeWriter pipe
GrpcEventSource.Log.MessageSent();
}
}
catch (Exception ex)
catch (Exception ex) when (ex is not OperationCanceledException)
{
// Don't write error when user cancels write
GrpcServerLog.ErrorSendingMessage(logger, ex);
throw;
}
Expand Down Expand Up @@ -121,8 +122,9 @@ public static async Task WriteStreamedMessageAsync<TResponse>(this PipeWriter pi
GrpcEventSource.Log.MessageSent();
}
}
catch (Exception ex)
catch (Exception ex) when (ex is not OperationCanceledException)
{
// Don't write error when user cancels write
GrpcServerLog.ErrorSendingMessage(logger, ex);
throw;
}
Expand Down Expand Up @@ -275,8 +277,9 @@ public static async ValueTask<T> ReadSingleMessageAsync<T>(this PipeReader input
}
}
}
catch (Exception ex)
catch (Exception ex) when (ex is not OperationCanceledException)
{
// Don't write error when user cancels read
GrpcServerLog.ErrorReadingMessage(logger, ex);
throw;
}
Expand Down
16 changes: 13 additions & 3 deletions src/Grpc.Net.Client/Internal/StreamExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,11 @@ public static async ValueTask WriteMessageAsync<TMessage>(
}
catch (Exception ex)
{
GrpcCallLog.ErrorSendingMessage(call.Logger, ex);
if (!IsCancellationException(ex))
{
// Don't write error when user cancels write
GrpcCallLog.ErrorSendingMessage(call.Logger, ex);
}

if (TryCreateCallCompleteException(ex, call, out var statusException))
{
Expand Down Expand Up @@ -350,7 +354,11 @@ public static async ValueTask WriteMessageAsync(
}
catch (Exception ex)
{
GrpcCallLog.ErrorSendingMessage(call.Logger, ex);
if (!IsCancellationException(ex))
{
// Don't write error when user cancels write
GrpcCallLog.ErrorSendingMessage(call.Logger, ex);
}

if (TryCreateCallCompleteException(ex, call, out var statusException))
{
Expand All @@ -361,6 +369,8 @@ public static async ValueTask WriteMessageAsync(
}
}

private static bool IsCancellationException(Exception ex) => ex is OperationCanceledException or ObjectDisposedException;

private static bool TryCreateCallCompleteException(Exception originalException, GrpcCall call, [NotNullWhen(true)] out Exception? exception)
{
// The call may have been completed while WriteAsync was running and caused WriteAsync to throw.
Expand All @@ -369,7 +379,7 @@ private static bool TryCreateCallCompleteException(Exception originalException,
// Replace exception with the status error if:
// 1. The original exception is one Stream.WriteAsync throws if the call was completed during a write, and
// 2. The call has already been successfully completed.
if (originalException is OperationCanceledException or ObjectDisposedException &&
if (IsCancellationException(originalException) &&
call.CallTask.IsCompletedSuccessfully())
{
exception = call.CreateFailureStatusException(call.CallTask.Result);
Expand Down
44 changes: 44 additions & 0 deletions test/Grpc.Net.Client.Tests/AsyncClientStreamingCallTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
using Grpc.Net.Client.Internal.Http;
using Grpc.Net.Client.Tests.Infrastructure;
using Grpc.Tests.Shared;
using Microsoft.Extensions.Logging.Testing;
using NUnit.Framework;

namespace Grpc.Net.Client.Tests;
Expand Down Expand Up @@ -160,6 +161,49 @@ public async Task ClientStreamWriter_WriteWhilePendingWrite_ErrorThrown()
Assert.AreEqual("Can't write the message because the previous write is in progress.", ex.Message);
}

[Test]
public async Task ClientStreamWriter_DisposeWhilePendingWrite_NoReadMessageError()
{
// Arrange
var testSink = new TestSink();
var loggerFactory = new TestLoggerFactory(testSink, true);
PushStreamContent<HelloRequest, HelloReply>? content = null;

var responseTcs = new TaskCompletionSource<HttpResponseMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
var httpClient = ClientTestHelpers.CreateTestClient(request =>
{
content = (PushStreamContent<HelloRequest, HelloReply>)request.Content!;
return responseTcs.Task;
});
var invoker = HttpClientCallInvokerFactory.Create(httpClient, loggerFactory: loggerFactory);

// Act
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());

// Assert
var writeTask1 = call.RequestStream.WriteAsync(new HelloRequest { Name = "1" });

var writeSyncPoint = new SyncPoint(runContinuationsAsynchronously: true);
var testStream = new TestStream(writeSyncPoint);
var serializeToStreamTask = content!.SerializeToStreamAsync(testStream);

Assert.IsFalse(writeTask1.IsCompleted);
await writeSyncPoint.WaitForSyncPoint().DefaultTimeout();

call.Dispose();
writeSyncPoint.Continue();

var ex1 = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.ResponseAsync).DefaultTimeout();
Assert.AreEqual(StatusCode.Cancelled, ex1.StatusCode);
Assert.AreEqual(StatusCode.Cancelled, call.GetStatus().StatusCode);
Assert.AreEqual("gRPC call disposed.", call.GetStatus().Detail);

var ex2 = await ExceptionAssert.ThrowsAsync<RpcException>(() => writeTask1).DefaultTimeout();
Assert.AreEqual(StatusCode.Cancelled, ex2.StatusCode);

Assert.IsFalse(testSink.Writes.Any(w => w.EventId.Name == "ErrorSendingMessage"), "ErrorSendingMessage shouldn't be logged on dispose.");
}

[Test]
public async Task ClientStreamWriter_CompleteWhilePendingWrite_ErrorThrown()
{
Expand Down
39 changes: 38 additions & 1 deletion test/Grpc.Net.Client.Tests/AsyncServerStreamingCallTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
Expand All @@ -23,6 +23,7 @@
using Grpc.Net.Client.Tests.Infrastructure;
using Grpc.Shared;
using Grpc.Tests.Shared;
using Microsoft.Extensions.Logging.Testing;
using NUnit.Framework;

namespace Grpc.Net.Client.Tests;
Expand Down Expand Up @@ -277,6 +278,42 @@ await streamContent.AddDataAndWait(await ClientTestHelpers.GetResponseDataAsync(
Assert.AreEqual("gRPC call disposed.", call.GetStatus().Detail);
}

[Test]
public async Task AsyncServerStreamingCall_DisposeDuringPendingRead_NoReadMessageError()
{
// Arrange
var testSink = new TestSink();
var loggerFactory = new TestLoggerFactory(testSink, true);

var streamContent = new SyncPointMemoryStream();

var httpClient = ClientTestHelpers.CreateTestClient(request =>
{
return Task.FromResult(ResponseUtils.CreateResponse(HttpStatusCode.OK, new StreamContent(streamContent)));
});
var invoker = HttpClientCallInvokerFactory.Create(httpClient, loggerFactory: loggerFactory);

// Act
var call = invoker.AsyncServerStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions(), new HelloRequest());

var responseStream = call.ResponseStream;

// Assert
Assert.IsNull(responseStream.Current);

var moveNextTask1 = responseStream.MoveNext(CancellationToken.None);
Assert.IsFalse(moveNextTask1.IsCompleted);

call.Dispose();

var ex = await ExceptionAssert.ThrowsAsync<RpcException>(() => moveNextTask1).DefaultTimeout();
Assert.AreEqual(StatusCode.Cancelled, ex.StatusCode);
Assert.AreEqual(StatusCode.Cancelled, call.GetStatus().StatusCode);
Assert.AreEqual("gRPC call disposed.", call.GetStatus().Detail);

Assert.IsFalse(testSink.Writes.Any(w => w.EventId.Name == "ErrorReadingMessage"), "ErrorReadingMessage shouldn't be logged on dispose.");
}

[Test]
public async Task ClientStreamReader_WriteWithInvalidHttpStatus_ErrorThrown()
{
Expand Down