Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
stephentoub committed Jun 3, 2020
1 parent 4395ed6 commit 5d3c742
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 62 deletions.
2 changes: 1 addition & 1 deletion src/libraries/Common/src/System/Net/ArrayBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ public void Dispose()

public int ActiveLength => _availableStart - _activeStart;
public Span<byte> ActiveSpan => new Span<byte>(_bytes, _activeStart, _availableStart - _activeStart);
public ReadOnlySpan<byte> ActiveReadOnlySpan => new ReadOnlySpan<byte>(_bytes, _activeStart, _availableStart - _activeStart);
public Memory<byte> ActiveMemory => new Memory<byte>(_bytes, _activeStart, _availableStart - _activeStart);

public int AvailableLength => _bytes.Length - _availableStart;
public Span<byte> AvailableSpan => _bytes.AsSpan(_availableStart);
public Memory<byte> AvailableMemory => _bytes.AsMemory(_availableStart);
public Memory<byte> AvailableMemorySliced(int length) => new Memory<byte>(_bytes, _availableStart, length);


public int Capacity => _bytes.Length;

public void Discard(int byteCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,15 +753,14 @@ private void ProcessGoAwayFrame(FrameHeader frameHeader)
}

internal Task FlushAsync(CancellationToken cancellationToken) =>
InvokeLockedAsync(0, this, (thisRef, writeBuffer) => thisRef.FinishWrite(writeBuffer.Length, FlushTiming.Now), null, cancellationToken);
PerformWriteAsync(0, 0, (_, __) => FlushTiming.Now, cancellationToken);

/// <summary>Performs a write operation serialized via the <see cref="_writerLock"/>.</summary>
/// <param name="writeBytes">The number of bytes to be written.</param>
/// <param name="state">The state to pass through to the callbacks.</param>
/// <param name="lockedAction">The action to be invoked while the writer lock is held and that actually writes the data to the provided buffer.</param>
/// <param name="failedAcquireAction">The action to invoke if waiting for the write lock is canceled.</param>
/// <param name="cancellationToken">The cancellation token to use while waiting.</param>
private async Task InvokeLockedAsync<T>(int writeBytes, T state, Action<T, Memory<byte>> lockedAction, Action<T>? failedAcquireAction = null, CancellationToken cancellationToken = default)
private async Task PerformWriteAsync<T>(int writeBytes, T state, Func<T, Memory<byte>, FlushTiming> lockedAction, CancellationToken cancellationToken = default)
{
if (NetEventSource.IsEnabled) Trace($"{nameof(writeBytes)}={writeBytes}");

Expand Down Expand Up @@ -794,7 +793,6 @@ private async Task InvokeLockedAsync<T>(int writeBytes, T state, Action<T, Memor
LogExceptions(FlushAsync(cancellationToken: default));
}

failedAcquireAction?.Invoke(state);
throw;
}
Interlocked.Decrement(ref _pendingWriters);
Expand All @@ -804,7 +802,6 @@ private async Task InvokeLockedAsync<T>(int writeBytes, T state, Action<T, Memor
if (_abortException != null)
{
_writerLock.Exit();
failedAcquireAction?.Invoke(state);
ThrowRequestAborted(_abortException);
}

Expand Down Expand Up @@ -833,33 +830,19 @@ private async Task InvokeLockedAsync<T>(int writeBytes, T state, Action<T, Memor

// Invoke the callback with the supplied state and the target write buffer.
_outgoingBuffer.EnsureAvailableSpace(writeBytes);
lockedAction(state, _outgoingBuffer.AvailableMemorySliced(writeBytes));
FlushTiming flush = lockedAction(state, _outgoingBuffer.AvailableMemorySliced(writeBytes));

// Finish the write
_outgoingBuffer.Commit(writeBytes);
_lastPendingWriterShouldFlush |= flush == FlushTiming.AfterPendingWrites;
EndWrite(forceFlush: flush == FlushTiming.Now);
}
finally
{
_writerLock.Exit();
}
}

/// <summary>Flushes buffered bytes to the wire.</summary>
/// <param name="bytesWritten">The number of bytes written that are being flushed.</param>
/// <param name="flush">When a flush should be performed for this write.</param>
/// <remarks>
/// Writes here need to be atomic, so as to avoid killing the whole connection.
/// Callers must hold the write lock.
/// </remarks>
private void FinishWrite(int bytesWritten, FlushTiming flush)
{
// We can't validate that we hold the mutex, but we can at least validate that someone is holding it.
Debug.Assert(_writerLock.IsHeld);

if (NetEventSource.IsEnabled) Trace($"{nameof(flush)}={flush}");

_outgoingBuffer.Commit(bytesWritten);
_lastPendingWriterShouldFlush |= flush == FlushTiming.AfterPendingWrites;
EndWrite(forceFlush: flush == FlushTiming.Now);
}

private void EndWrite(bool forceFlush)
{
// We can't validate that we hold the mutex, but we can at least validate that someone is holding it.
Expand All @@ -878,44 +861,40 @@ private void EndWrite(bool forceFlush)
}

private Task SendSettingsAckAsync() =>
InvokeLockedAsync(FrameHeader.Size, this, (thisRef, writeBuffer) =>
PerformWriteAsync(FrameHeader.Size, this, (thisRef, writeBuffer) =>
{
if (NetEventSource.IsEnabled) thisRef.Trace("Started writing.");
Span<byte> span = writeBuffer.Span;
FrameHeader.WriteTo(writeBuffer.Span, 0, FrameType.Settings, FrameFlags.Ack, streamId: 0);
FrameHeader.WriteTo(span, 0, FrameType.Settings, FrameFlags.Ack, streamId: 0);
FinishWrite(span.Length, FlushTiming.AfterPendingWrites);
return FlushTiming.AfterPendingWrites;
});

/// <param name="pingContent">The 8-byte ping content to send, read as a big-endian integer.</param>
private Task SendPingAckAsync(long pingContent) =>
InvokeLockedAsync(FrameHeader.Size + FrameHeader.PingLength, (thisRef: this, pingContent), (state, writeBuffer) =>
PerformWriteAsync(FrameHeader.Size + FrameHeader.PingLength, (thisRef: this, pingContent), (state, writeBuffer) =>
{
if (NetEventSource.IsEnabled) state.thisRef.Trace("Started writing.");
Debug.Assert(sizeof(long) == FrameHeader.PingLength);
Span<byte> span = writeBuffer.Span;
FrameHeader.WriteTo(span, FrameHeader.PingLength, FrameType.Ping, FrameFlags.Ack, streamId: 0);
BinaryPrimitives.WriteInt64BigEndian(span.Slice(FrameHeader.Size), state.pingContent);
state.thisRef.FinishWrite(span.Length, FlushTiming.AfterPendingWrites);
return FlushTiming.AfterPendingWrites;
});

private Task SendRstStreamAsync(int streamId, Http2ProtocolErrorCode errorCode) =>
InvokeLockedAsync(FrameHeader.Size + FrameHeader.RstStreamLength, (thisRef: this, streamId, errorCode), (s, writeBuffer) =>
PerformWriteAsync(FrameHeader.Size + FrameHeader.RstStreamLength, (thisRef: this, streamId, errorCode), (s, writeBuffer) =>
{
if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, $"Started writing. {nameof(s.errorCode)}={s.errorCode}");
Span<byte> span = writeBuffer.Span;
FrameHeader.WriteTo(span, FrameHeader.RstStreamLength, FrameType.RstStream, FrameFlags.None, s.streamId);
BinaryPrimitives.WriteInt32BigEndian(span.Slice(FrameHeader.Size), (int)s.errorCode);
s.thisRef.FinishWrite(span.Length, FlushTiming.Now); // ensure cancellation is seen as soon as possible
return FlushTiming.Now; // ensure cancellation is seen as soon as possible
});

private static (ReadOnlyMemory<byte> first, ReadOnlyMemory<byte> rest) SplitBuffer(ReadOnlyMemory<byte> buffer, int maxSize) =>
Expand Down Expand Up @@ -1236,7 +1215,7 @@ private async ValueTask<Http2Stream> SendHeadersAsync(HttpRequestMessage request
// Start the write. This serializes access to write to the connection, and ensures that HEADERS
// and CONTINUATION frames stay together, as they must do. We use the lock as well to ensure new
// streams are created and started in order.
await InvokeLockedAsync(totalSize, (thisRef: this, http2Stream, current, remaining, totalSize, flags, mustFlush), (s, writeBuffer) =>
await PerformWriteAsync(totalSize, (thisRef: this, http2Stream, current, remaining, totalSize, flags, mustFlush), (s, writeBuffer) =>
{
try
{
Expand Down Expand Up @@ -1288,14 +1267,14 @@ private async ValueTask<Http2Stream> SendHeadersAsync(HttpRequestMessage request
Debug.Assert(span.Length == 0);
s.thisRef.FinishWrite(writeBuffer.Length, s.mustFlush || (s.flags & FrameFlags.EndStream) != 0 ? FlushTiming.AfterPendingWrites : FlushTiming.Eventually);
return s.mustFlush || (s.flags & FrameFlags.EndStream) != 0 ? FlushTiming.AfterPendingWrites : FlushTiming.Eventually;
}
catch
{
s.thisRef.EndWrite(forceFlush: false);
throw;
}
}).ConfigureAwait(false);
}, cancellationToken).ConfigureAwait(false);
return http2Stream;
}
catch
Expand All @@ -1321,55 +1300,51 @@ private async Task SendStreamDataAsync(int streamId, ReadOnlyMemory<byte> buffer

ReadOnlyMemory<byte> current;
(current, remaining) = SplitBuffer(remaining, frameSize);

await InvokeLockedAsync(
FrameHeader.Size + current.Length,
(thisRef: this, streamId, frameSize, current),
(s, writeBuffer) =>
try
{
await PerformWriteAsync(FrameHeader.Size + current.Length, (thisRef: this, streamId, current), (s, writeBuffer) =>
{
// Invoked while holding the lock:
if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, $"Started writing. {nameof(writeBuffer.Length)}={writeBuffer.Length}");
FrameHeader.WriteTo(writeBuffer.Span, s.current.Length, FrameType.Data, FrameFlags.None, s.streamId);
s.current.CopyTo(writeBuffer.Slice(FrameHeader.Size));
s.thisRef.FinishWrite(writeBuffer.Length, FlushTiming.Eventually); // no need to flush, as the request content may do so explicitly, or worst case we'll do so as part of the end data frame
},
s =>
{
// Invoked if waiting for the lock is canceled (in that case, we need to return the credit that we have acquired and don't plan to use):
s.thisRef._connectionWindow.AdjustCredit(s.frameSize);
},
cancellationToken).ConfigureAwait(false);
return FlushTiming.Eventually; // no need to flush, as the request content may do so explicitly, or worst case we'll do so as part of the end data frame
}, cancellationToken).ConfigureAwait(false);
}
catch
{
// Invoked if waiting for the lock is canceled (in that case, we need to return the credit that we have acquired and don't plan to use):
_connectionWindow.AdjustCredit(frameSize);
throw;
}
}
}

private Task SendEndStreamAsync(int streamId) =>
InvokeLockedAsync(FrameHeader.Size, (thisRef: this, streamId), (s, writeBuffer) =>
PerformWriteAsync(FrameHeader.Size, (thisRef: this, streamId), (s, writeBuffer) =>
{
if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, "Started writing.");
Span<byte> span = writeBuffer.Span;
FrameHeader.WriteTo(span, 0, FrameType.Data, FrameFlags.EndStream, s.streamId);
FrameHeader.WriteTo(writeBuffer.Span, 0, FrameType.Data, FrameFlags.EndStream, s.streamId);
s.thisRef.FinishWrite(span.Length, FlushTiming.AfterPendingWrites); // finished sending request body, so flush soon (but ok to wait for pending packets)
return FlushTiming.AfterPendingWrites; // finished sending request body, so flush soon (but ok to wait for pending packets)
});

private Task SendWindowUpdateAsync(int streamId, int amount)
{
// We update both the connection-level and stream-level windows at the same time
Debug.Assert(amount > 0);
return InvokeLockedAsync(FrameHeader.Size + FrameHeader.WindowUpdateLength, (thisRef: this, streamId, amount), (s, writeBuffer) =>
return PerformWriteAsync(FrameHeader.Size + FrameHeader.WindowUpdateLength, (thisRef: this, streamId, amount), (s, writeBuffer) =>
{
if (NetEventSource.IsEnabled) s.thisRef.Trace(s.streamId, $"Started writing. {nameof(s.amount)}={s.amount}");
Span<byte> span = writeBuffer.Span;
FrameHeader.WriteTo(span, FrameHeader.WindowUpdateLength, FrameType.WindowUpdate, FrameFlags.None, s.streamId);
BinaryPrimitives.WriteInt32BigEndian(span.Slice(FrameHeader.Size), s.amount);
s.thisRef.FinishWrite(span.Length, FlushTiming.Now); // make sure window updates are seen as soon as possible
return FlushTiming.Now; // make sure window updates are seen as soon as possible
});
}

Expand Down Expand Up @@ -1424,7 +1399,6 @@ private void Abort(Exception abortException)
/// terminate it, which would be considered a failure, so this race condition is largely benign and inherent to
/// the nature of connection pooling.
/// </returns>

public bool IsExpired(long nowTicks,
TimeSpan connectionLifetime,
TimeSpan connectionIdleTimeout)
Expand Down

0 comments on commit 5d3c742

Please sign in to comment.