Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for asynchronous body stream writing #2998

Merged
merged 21 commits into from Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
3c97f93
Add support for asynchronous body stream writing, thereby also fixing…
Joannis Jan 19, 2023
875881f
Fix compilation for Swift 5.6 inference
Joannis Jan 19, 2023
fa426b9
Add support for asynchronous body stream writing, thereby also fixing…
Joannis Jan 19, 2023
c672309
Fix compilation for Swift 5.6 inference
Joannis Jan 19, 2023
d300db7
Fix older swift compilign tests
Joannis Jan 31, 2023
d04d1ec
Merge branch 'feature/jo-async-body-stream' of github.com:vapor/vapor…
Joannis Jan 31, 2023
11bdbba
Merge branch 'main' into feature/jo-async-body-stream
gwynne Feb 21, 2023
6efc1c0
Fix the test, as you cannot write a response while the request is sti…
Joannis Feb 22, 2023
afbeaf2
Merge branch 'main' into feature/jo-async-body-stream
0xTim Mar 7, 2023
19e4706
Add docc comments, remove TODOs and converted them into an issue
Joannis Mar 7, 2023
9629929
Merge remote-tracking branch 'origin/main' into feature/jo-async-body…
Joannis Mar 7, 2023
6b5764e
Be explicit in the return tpe that fails to infer on Swift 5.6
Joannis Mar 8, 2023
168b822
Merge branch 'main' into feature/jo-async-body-stream
0xTim Mar 8, 2023
fdc3c9f
Merge remote-tracking branch 'origin/main' into feature/jo-async-body…
Joannis Apr 14, 2023
204b047
Remove Swift 5.6 support
Joannis Apr 14, 2023
1b6f942
Merge branch 'main' into feature/jo-async-body-stream
Joannis May 15, 2023
15a3fc3
Fix Sendable warning in RFC1123
0xTim Apr 26, 2024
c80ebda
Merge branch 'main' into feature/jo-async-body-stream
0xTim Apr 30, 2024
b611047
Tidy ups
0xTim Apr 30, 2024
f11d3fe
Tidy ups
0xTim Apr 30, 2024
ef1b218
Merge branch 'main' into feature/jo-async-body-stream
0xTim Apr 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions Sources/Vapor/HTTP/BodyStream.swift
Expand Up @@ -39,8 +39,12 @@ extension BodyStreamResult: CustomDebugStringConvertible {
}
}

/// A type that represents the writable handle of a streamed ``Response`` body.
public protocol BodyStreamWriter: Sendable {
/// The eventloop upon which writes must be sent
var eventLoop: EventLoop { get }

/// Writes an event to a streaming HTTP body. If the `result` is `.end` or `.error`, the stream ends.
func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?)
}

Expand All @@ -63,3 +67,19 @@ extension BodyStreamWriter {
return promise.futureResult
}
}

/// A type that represents the writable handle of a streamed ``Response`` body
public protocol AsyncBodyStreamWriter: Sendable {
/// Writes an event to a streaming HTTP body. If the `result` is `.end` or `.error`, the stream ends.
func write(_ result: BodyStreamResult) async throws

/// Writes a `ByteBuffer` to the stream. Provides a default implementation that calls itself using `BodyStreamResult`
func writeBuffer(_ buffer: ByteBuffer) async throws
}

extension AsyncBodyStreamWriter {
/// Writes the buffer wrapped in a ``BodyStreamResult`` to `self`
public func writeBuffer(_ buffer: ByteBuffer) async throws {
try await write(.buffer(buffer))
}
}
24 changes: 23 additions & 1 deletion Sources/Vapor/HTTP/Server/HTTPServerResponseEncoder.swift
Expand Up @@ -70,6 +70,17 @@ final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelH
count: stream.count == -1 ? nil : stream.count
)
stream.callback(channelStream)
case .asyncStream(let stream):
let channelStream = ChannelResponseBodyStream(
context: context,
handler: self,
promise: nil,
count: stream.count == -1 ? nil : stream.count
)

promise?.completeWithTask {
try await stream.callback(channelStream)
}
}
}
}
Expand All @@ -84,7 +95,7 @@ final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelH
}
}

private final class ChannelResponseBodyStream: BodyStreamWriter {
private final class ChannelResponseBodyStream: BodyStreamWriter, AsyncBodyStreamWriter {
let contextBox: NIOLoopBound<ChannelHandlerContext>
let handlerBox: NIOLoopBound<HTTPServerResponseEncoder>
let promise: EventLoopPromise<Void>?
Expand Down Expand Up @@ -113,15 +124,26 @@ private final class ChannelResponseBodyStream: BodyStreamWriter {
self.eventLoop = context.eventLoop
}

func write(_ result: BodyStreamResult) async throws {
// Explicitly adds the ELF because Swift 5.6 fails to infer the return type
try await self.eventLoop.flatSubmit { () -> EventLoopFuture<Void> in
let promise = self.eventLoop.makePromise(of: Void.self)
self.write(result, promise: promise)
return promise.futureResult
}.get()
}

func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?) {
switch result {
case .buffer(let buffer):
// See: https://github.com/vapor/vapor/issues/2976
self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
if let count = self.count, self.currentCount.wrappingIncrementThenLoad(by: buffer.readableBytes, ordering: .sequentiallyConsistent) > count {
self.promise?.fail(Error.tooManyBytes)
promise?.fail(Error.notEnoughBytes)
}
case .end:
// See: https://github.com/vapor/vapor/issues/2976
self.isComplete.store(true, ordering: .sequentiallyConsistent)
if let count = self.count, self.currentCount.load(ordering: .sequentiallyConsistent) < count {
self.promise?.fail(Error.notEnoughBytes)
Expand Down
11 changes: 10 additions & 1 deletion Sources/Vapor/Request/Request+BodyStream.swift
Expand Up @@ -2,7 +2,7 @@ import NIOCore
import NIOConcurrencyHelpers

extension Request {
final class BodyStream: BodyStreamWriter {
final class BodyStream: BodyStreamWriter, AsyncBodyStreamWriter {
let eventLoop: EventLoop

var isBeingRead: Bool {
Expand Down Expand Up @@ -45,6 +45,15 @@ extension Request {
}
self.handlerBuffer.value.buffer = []
}

func write(_ result: BodyStreamResult) async throws {
// Explicitly adds the ELF because Swift 5.6 fails to infer the return type
try await self.eventLoop.flatSubmit { () -> EventLoopFuture<Void> in
let promise = self.eventLoop.makePromise(of: Void.self)
self.write0(result, promise: promise)
return promise.futureResult
}.get()
}

func write(_ chunk: BodyStreamResult, promise: EventLoopPromise<Void>?) {
// See https://github.com/vapor/vapor/issues/2906
Expand Down
58 changes: 58 additions & 0 deletions Sources/Vapor/Response/Response+Body.swift
Expand Up @@ -12,6 +12,11 @@ extension Response {
let count: Int
let callback: @Sendable (BodyStreamWriter) -> ()
}

struct AsyncBodyStream {
let count: Int
let callback: @Sendable (AsyncBodyStreamWriter) async throws -> ()
}

/// Represents a `Response`'s body.
///
Expand All @@ -29,6 +34,7 @@ extension Response {
case staticString(StaticString)
case string(String)
case stream(BodyStream)
case asyncStream(AsyncBodyStream)
}

/// An empty `Response.Body`.
Expand Down Expand Up @@ -56,6 +62,7 @@ extension Response {
case .buffer(let buffer): return buffer.readableBytes
case .none: return 0
case .stream(let stream): return stream.count
case .asyncStream(let stream): return stream.count
}
}

Expand All @@ -69,6 +76,7 @@ extension Response {
case .string(let string): return Data(string.utf8)
case .none: return nil
case .stream: return nil
case .asyncStream: return nil
}
}

Expand All @@ -89,6 +97,7 @@ extension Response {
return buffer
case .none: return nil
case .stream: return nil
case .asyncStream: return nil
}
}

Expand All @@ -114,6 +123,7 @@ extension Response {
case .staticString(let string): return string.description
case .string(let string): return string
case .stream: return "<stream>"
case .asyncStream: return "<async stream>"
}
}

Expand Down Expand Up @@ -167,6 +177,54 @@ extension Response {
self.init(stream: stream, count: -1, byteBufferAllocator: byteBufferAllocator)
}

/// Creates a chunked HTTP ``Response`` steam using ``AsyncBodyStreamWriter``.
///
/// - Parameters:
/// - asyncStream: The closure that will generate the results. **MUST** call `.end` or `.error` when terminating the stream
/// - count: The amount of bytes that will be written. The `asyncStream` **MUST** produce exactly `count` bytes.
/// - byteBufferAllocator: The allocator that is preferred when writing data to SwiftNIO
public init(asyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), count: Int, byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.byteBufferAllocator = byteBufferAllocator
self.storage = .asyncStream(.init(count: count, callback: asyncStream))
}

/// Creates a chunked HTTP ``Response`` steam using ``AsyncBodyStreamWriter``.
///
/// - Parameters:
/// - asyncStream: The closure that will generate the results. **MUST** call `.end` or `.error` when terminating the stream
/// - byteBufferAllocator: The allocator that is preferred when writing data to SwiftNIO
public init(asyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.init(asyncStream: asyncStream, count: -1, byteBufferAllocator: byteBufferAllocator)
}

/// Creates a _managed_ chunked HTTP ``Response`` steam using ``AsyncBodyStreamWriter`` that automtically closes or fails based if the closure throws an error or returns.
///
/// - Parameters:
/// - asyncStream: The closure that will generate the results, which **MUST NOT** call `.end` or `.error` when terminating the stream.
/// - count: The amount of bytes that will be written. The `asyncStream` **MUST** produce exactly `count` bytes.
/// - byteBufferAllocator: The allocator that is preferred when writing data to SwiftNIO
public init(managedAsyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), count: Int, byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.byteBufferAllocator = byteBufferAllocator
self.storage = .asyncStream(.init(count: count) { writer in
do {
try await managedAsyncStream(writer)
try await writer.write(.end)
} catch {
try await writer.write(.error(error))
}
})
}

/// Creates a _managed_ chunked HTTP ``Response`` steam using ``AsyncBodyStreamWriter`` that automtically closes or fails based if the closure throws an error or returns.
///
/// - Parameters:
/// - asyncStream: The closure that will generate the results, which **MUST NOT** call `.end` or `.error` when terminating the stream.
/// - count: The amount of bytes that will be written
/// - byteBufferAllocator: The allocator that is preferred when writing data to SwiftNIO
public init(managedAsyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.init(managedAsyncStream: managedAsyncStream, count: -1, byteBufferAllocator: byteBufferAllocator)
}

/// `ExpressibleByStringLiteral` conformance.
public init(stringLiteral value: String) {
self.byteBufferAllocator = ByteBufferAllocator()
Expand Down
65 changes: 65 additions & 0 deletions Tests/VaporTests/PipelineTests.swift
@@ -1,6 +1,7 @@
@testable import Vapor
import enum NIOHTTP1.HTTPParserError
import XCTest
import AsyncHTTPClient
import NIOEmbedded
import NIOCore

Expand Down Expand Up @@ -66,6 +67,70 @@ final class PipelineTests: XCTestCase {
try XCTAssertNil(channel.readOutbound(as: ByteBuffer.self)?.string)
}

func testAsyncEchoHandlers() async throws {
let app = Application(.testing)
defer { app.shutdown() }


app.on(.POST, "echo", body: .stream) { request async throws -> Response in
var buffers = [ByteBuffer]()

for try await buffer in request.body {
buffers.append(buffer)
}

return Response(body: .init(managedAsyncStream: { [buffers] writer in
for buffer in buffers {
try await writer.writeBuffer(buffer)
}
}))
}

try app.start()

guard
let localAddress = app.http.server.shared.localAddress,
let port = localAddress.port
else {
XCTFail("couldn't get port from \(app.http.server.shared.localAddress.debugDescription)")
return
}

let client = HTTPClient(eventLoopGroupProvider: .createNew)

let chunks = [
"1\r\n",
"a",
"\r\n",
"1\r\n",
"b",
"\r\n",
"1\r\n",
"c",
"\r\n",
]

let response = try await client.post(url: "http://localhost:\(port)/echo", body: .stream { writer in
@Sendable func write(chunks: [String]) -> EventLoopFuture<Void> {
var chunks = chunks
let chunk = chunks.removeFirst()

if chunks.isEmpty {
return writer.write(.byteBuffer(ByteBuffer(string: chunk)))
} else {
return writer.write(.byteBuffer(ByteBuffer(string: chunk))).flatMap { [chunks] in
return write(chunks: chunks)
}
}
}

return write(chunks: chunks)
}).get()

XCTAssertEqual(response.body?.string, chunks.joined(separator: ""))
try await client.shutdown()
}

func testEOFFraming() throws {
app.on(.POST, "echo", body: .stream) { request -> Response in
Response(body: .init(stream: { writer in
Expand Down