Skip to content

Commit

Permalink
Add support for asynchronous body stream writing (#2998)
Browse files Browse the repository at this point in the history
* Add support for asynchronous body stream writing, thereby also fixing crashes when users try to write a body from within a task towards the ELF APIs

* Fix compilation for Swift 5.6 inference

* Add support for asynchronous body stream writing, thereby also fixing crashes when users try to write a body from within a task towards the ELF APIs

* Fix compilation for Swift 5.6 inference

* Fix older swift compilign tests

* Fix the test, as you cannot write a response while the request is still inbound

* Add docc comments, remove TODOs and converted them into an issue

* Be explicit in the return tpe that fails to infer on Swift 5.6

* Remove Swift 5.6 support

* Fix Sendable warning in RFC1123

* Tidy ups

* Tidy ups

---------

Co-authored-by: Gwynne Raskind <gwynne@vapor.codes>
Co-authored-by: Tim Condon <0xTim@users.noreply.github.com>
Co-authored-by: Tim <0xtimc@gmail.com>
  • Loading branch information
4 people committed Apr 30, 2024
1 parent e91021c commit c524f60
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 2 deletions.
20 changes: 20 additions & 0 deletions Sources/Vapor/HTTP/BodyStream.swift
Expand Up @@ -39,8 +39,12 @@ extension BodyStreamResult: CustomDebugStringConvertible {
}
}

/// A type that represents the writable handle of a streamed ``Response`` body.
public protocol BodyStreamWriter: Sendable {
/// The eventloop upon which writes must be sent
var eventLoop: EventLoop { get }

/// Writes an event to a streaming HTTP body. If the `result` is `.end` or `.error`, the stream ends.
func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?)
}

Expand All @@ -63,3 +67,19 @@ extension BodyStreamWriter {
return promise.futureResult
}
}

/// A type that represents the writable handle of a streamed ``Response`` body
public protocol AsyncBodyStreamWriter: Sendable {
/// Writes an event to a streaming HTTP body. If the `result` is `.end` or `.error`, the stream ends.
func write(_ result: BodyStreamResult) async throws

/// Writes a `ByteBuffer` to the stream. Provides a default implementation that calls itself using `BodyStreamResult`
func writeBuffer(_ buffer: ByteBuffer) async throws
}

extension AsyncBodyStreamWriter {
/// Writes the buffer wrapped in a ``BodyStreamResult`` to `self`
public func writeBuffer(_ buffer: ByteBuffer) async throws {
try await write(.buffer(buffer))
}
}
24 changes: 23 additions & 1 deletion Sources/Vapor/HTTP/Server/HTTPServerResponseEncoder.swift
Expand Up @@ -70,6 +70,17 @@ final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelH
count: stream.count == -1 ? nil : stream.count
)
stream.callback(channelStream)
case .asyncStream(let stream):
let channelStream = ChannelResponseBodyStream(
context: context,
handler: self,
promise: nil,
count: stream.count == -1 ? nil : stream.count
)

promise?.completeWithTask {
try await stream.callback(channelStream)
}
}
}
}
Expand All @@ -84,7 +95,7 @@ final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelH
}
}

private final class ChannelResponseBodyStream: BodyStreamWriter {
private final class ChannelResponseBodyStream: BodyStreamWriter, AsyncBodyStreamWriter {
let contextBox: NIOLoopBound<ChannelHandlerContext>
let handlerBox: NIOLoopBound<HTTPServerResponseEncoder>
let promise: EventLoopPromise<Void>?
Expand Down Expand Up @@ -113,15 +124,26 @@ private final class ChannelResponseBodyStream: BodyStreamWriter {
self.eventLoop = context.eventLoop
}

func write(_ result: BodyStreamResult) async throws {
// Explicitly adds the ELF because Swift 5.6 fails to infer the return type
try await self.eventLoop.flatSubmit { () -> EventLoopFuture<Void> in
let promise = self.eventLoop.makePromise(of: Void.self)
self.write(result, promise: promise)
return promise.futureResult
}.get()
}

func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?) {
switch result {
case .buffer(let buffer):
// See: https://github.com/vapor/vapor/issues/2976
self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
if let count = self.count, self.currentCount.wrappingIncrementThenLoad(by: buffer.readableBytes, ordering: .sequentiallyConsistent) > count {
self.promise?.fail(Error.tooManyBytes)
promise?.fail(Error.notEnoughBytes)
}
case .end:
// See: https://github.com/vapor/vapor/issues/2976
self.isComplete.store(true, ordering: .sequentiallyConsistent)
if let count = self.count, self.currentCount.load(ordering: .sequentiallyConsistent) < count {
self.promise?.fail(Error.notEnoughBytes)
Expand Down
11 changes: 10 additions & 1 deletion Sources/Vapor/Request/Request+BodyStream.swift
Expand Up @@ -2,7 +2,7 @@ import NIOCore
import NIOConcurrencyHelpers

extension Request {
final class BodyStream: BodyStreamWriter {
final class BodyStream: BodyStreamWriter, AsyncBodyStreamWriter {
let eventLoop: EventLoop

var isBeingRead: Bool {
Expand Down Expand Up @@ -45,6 +45,15 @@ extension Request {
}
self.handlerBuffer.value.buffer = []
}

func write(_ result: BodyStreamResult) async throws {
// Explicitly adds the ELF because Swift 5.6 fails to infer the return type
try await self.eventLoop.flatSubmit { () -> EventLoopFuture<Void> in
let promise = self.eventLoop.makePromise(of: Void.self)
self.write0(result, promise: promise)
return promise.futureResult
}.get()
}

func write(_ chunk: BodyStreamResult, promise: EventLoopPromise<Void>?) {
// See https://github.com/vapor/vapor/issues/2906
Expand Down
58 changes: 58 additions & 0 deletions Sources/Vapor/Response/Response+Body.swift
Expand Up @@ -12,6 +12,11 @@ extension Response {
let count: Int
let callback: @Sendable (BodyStreamWriter) -> ()
}

struct AsyncBodyStream {
let count: Int
let callback: @Sendable (AsyncBodyStreamWriter) async throws -> ()
}

/// Represents a `Response`'s body.
///
Expand All @@ -29,6 +34,7 @@ extension Response {
case staticString(StaticString)
case string(String)
case stream(BodyStream)
case asyncStream(AsyncBodyStream)
}

/// An empty `Response.Body`.
Expand Down Expand Up @@ -56,6 +62,7 @@ extension Response {
case .buffer(let buffer): return buffer.readableBytes
case .none: return 0
case .stream(let stream): return stream.count
case .asyncStream(let stream): return stream.count
}
}

Expand All @@ -69,6 +76,7 @@ extension Response {
case .string(let string): return Data(string.utf8)
case .none: return nil
case .stream: return nil
case .asyncStream: return nil
}
}

Expand All @@ -89,6 +97,7 @@ extension Response {
return buffer
case .none: return nil
case .stream: return nil
case .asyncStream: return nil
}
}

Expand All @@ -114,6 +123,7 @@ extension Response {
case .staticString(let string): return string.description
case .string(let string): return string
case .stream: return "<stream>"
case .asyncStream: return "<async stream>"
}
}

Expand Down Expand Up @@ -167,6 +177,54 @@ extension Response {
self.init(stream: stream, count: -1, byteBufferAllocator: byteBufferAllocator)
}

/// Creates a chunked HTTP ``Response`` steam using ``AsyncBodyStreamWriter``.
///
/// - Parameters:
/// - asyncStream: The closure that will generate the results. **MUST** call `.end` or `.error` when terminating the stream
/// - count: The amount of bytes that will be written. The `asyncStream` **MUST** produce exactly `count` bytes.
/// - byteBufferAllocator: The allocator that is preferred when writing data to SwiftNIO
public init(asyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), count: Int, byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.byteBufferAllocator = byteBufferAllocator
self.storage = .asyncStream(.init(count: count, callback: asyncStream))
}

/// Creates a chunked HTTP ``Response`` steam using ``AsyncBodyStreamWriter``.
///
/// - Parameters:
/// - asyncStream: The closure that will generate the results. **MUST** call `.end` or `.error` when terminating the stream
/// - byteBufferAllocator: The allocator that is preferred when writing data to SwiftNIO
public init(asyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.init(asyncStream: asyncStream, count: -1, byteBufferAllocator: byteBufferAllocator)
}

/// Creates a _managed_ chunked HTTP ``Response`` steam using ``AsyncBodyStreamWriter`` that automtically closes or fails based if the closure throws an error or returns.
///
/// - Parameters:
/// - asyncStream: The closure that will generate the results, which **MUST NOT** call `.end` or `.error` when terminating the stream.
/// - count: The amount of bytes that will be written. The `asyncStream` **MUST** produce exactly `count` bytes.
/// - byteBufferAllocator: The allocator that is preferred when writing data to SwiftNIO
public init(managedAsyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), count: Int, byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.byteBufferAllocator = byteBufferAllocator
self.storage = .asyncStream(.init(count: count) { writer in
do {
try await managedAsyncStream(writer)
try await writer.write(.end)
} catch {
try await writer.write(.error(error))
}
})
}

/// Creates a _managed_ chunked HTTP ``Response`` steam using ``AsyncBodyStreamWriter`` that automtically closes or fails based if the closure throws an error or returns.
///
/// - Parameters:
/// - asyncStream: The closure that will generate the results, which **MUST NOT** call `.end` or `.error` when terminating the stream.
/// - count: The amount of bytes that will be written
/// - byteBufferAllocator: The allocator that is preferred when writing data to SwiftNIO
public init(managedAsyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.init(managedAsyncStream: managedAsyncStream, count: -1, byteBufferAllocator: byteBufferAllocator)
}

/// `ExpressibleByStringLiteral` conformance.
public init(stringLiteral value: String) {
self.byteBufferAllocator = ByteBufferAllocator()
Expand Down
65 changes: 65 additions & 0 deletions Tests/VaporTests/PipelineTests.swift
@@ -1,6 +1,7 @@
@testable import Vapor
import enum NIOHTTP1.HTTPParserError
import XCTest
import AsyncHTTPClient
import NIOEmbedded
import NIOCore

Expand Down Expand Up @@ -66,6 +67,70 @@ final class PipelineTests: XCTestCase {
try XCTAssertNil(channel.readOutbound(as: ByteBuffer.self)?.string)
}

func testAsyncEchoHandlers() async throws {
let app = Application(.testing)
defer { app.shutdown() }


app.on(.POST, "echo", body: .stream) { request async throws -> Response in
var buffers = [ByteBuffer]()

for try await buffer in request.body {
buffers.append(buffer)
}

return Response(body: .init(managedAsyncStream: { [buffers] writer in
for buffer in buffers {
try await writer.writeBuffer(buffer)
}
}))
}

try app.start()

guard
let localAddress = app.http.server.shared.localAddress,
let port = localAddress.port
else {
XCTFail("couldn't get port from \(app.http.server.shared.localAddress.debugDescription)")
return
}

let client = HTTPClient(eventLoopGroupProvider: .createNew)

let chunks = [
"1\r\n",
"a",
"\r\n",
"1\r\n",
"b",
"\r\n",
"1\r\n",
"c",
"\r\n",
]

let response = try await client.post(url: "http://localhost:\(port)/echo", body: .stream { writer in
@Sendable func write(chunks: [String]) -> EventLoopFuture<Void> {
var chunks = chunks
let chunk = chunks.removeFirst()

if chunks.isEmpty {
return writer.write(.byteBuffer(ByteBuffer(string: chunk)))
} else {
return writer.write(.byteBuffer(ByteBuffer(string: chunk))).flatMap { [chunks] in
return write(chunks: chunks)
}
}
}

return write(chunks: chunks)
}).get()

XCTAssertEqual(response.body?.string, chunks.joined(separator: ""))
try await client.shutdown()
}

func testEOFFraming() throws {
app.on(.POST, "echo", body: .stream) { request -> Response in
Response(body: .init(stream: { writer in
Expand Down

0 comments on commit c524f60

Please sign in to comment.