From 5adcd2efaa2940db2e1eeec5c9c3006da6c5a1f4 Mon Sep 17 00:00:00 2001 From: Tanner Date: Fri, 26 Jun 2020 14:25:23 -0400 Subject: [PATCH] Immediate body streaming (#2413) * immediate body streaming * fix echo handler expectations --- Sources/Development/routes.swift | 2 +- .../Server/HTTPServerRequestDecoder.swift | 38 +++++++++---------- .../Vapor/Request/Request+BodyStream.swift | 11 +++--- Tests/VaporTests/PipelineTests.swift | 6 +-- 4 files changed, 26 insertions(+), 31 deletions(-) diff --git a/Sources/Development/routes.swift b/Sources/Development/routes.swift index e28a0d4a05..5dcdd4ef36 100644 --- a/Sources/Development/routes.swift +++ b/Sources/Development/routes.swift @@ -6,7 +6,7 @@ struct Creds: Content { } public func routes(_ app: Application) throws { - app.on(.GET, "ping", body: .stream) { req in + app.on(.POST, "ping") { req in return "123" as StaticString } diff --git a/Sources/Vapor/HTTP/Server/HTTPServerRequestDecoder.swift b/Sources/Vapor/HTTP/Server/HTTPServerRequestDecoder.swift index 3c3a558274..a9141cc434 100644 --- a/Sources/Vapor/HTTP/Server/HTTPServerRequestDecoder.swift +++ b/Sources/Vapor/HTTP/Server/HTTPServerRequestDecoder.swift @@ -56,24 +56,22 @@ final class HTTPServerRequestDecoder: ChannelInboundHandler, RemovableChannelHan } case .body(let buffer): switch self.requestState { - case .ready: assertionFailure("Unexpected state: \(self.requestState)") + case .ready, .awaitingEnd: + assertionFailure("Unexpected state: \(self.requestState)") case .awaitingBody(let request): - self.requestState = .awaitingEnd(request, buffer) - case .awaitingEnd(let request, let previousBuffer): - let stream = Request.BodyStream(on: context.eventLoop) - request.bodyStorage = .stream(stream) - context.fireChannelRead(self.wrapInboundOut(request)) - self.handleBodyStreamStateResult( - context: context, - self.bodyStreamState.didReadBytes(previousBuffer), - stream: stream - ) - self.handleBodyStreamStateResult( - context: context, - self.bodyStreamState.didReadBytes(buffer), - stream: stream - ) - self.requestState = .streamingBody(stream) + if request.headers.first(name: .contentLength) == buffer.readableBytes.description { + self.requestState = .awaitingEnd(request, buffer) + } else { + let stream = Request.BodyStream(on: context.eventLoop) + request.bodyStorage = .stream(stream) + context.fireChannelRead(self.wrapInboundOut(request)) + self.handleBodyStreamStateResult( + context: context, + self.bodyStreamState.didReadBytes(buffer), + stream: stream + ) + self.requestState = .streamingBody(stream) + } case .streamingBody(let stream): self.handleBodyStreamStateResult( context: context, @@ -191,15 +189,13 @@ final class HTTPServerRequestDecoder: ChannelInboundHandler, RemovableChannelHan promise?.succeed(()) } } - case .awaitingEnd, .awaitingBody: + case .awaitingBody, .awaitingEnd: // Response ended before request started streaming. self.logger.trace("Response already sent, skipping request body.") self.requestState = .skipping - case .ready: + case .ready, .skipping: // Response ended after request had been read. break - default: - fatalError("Unexpected request state: \(self.requestState)") } } } diff --git a/Sources/Vapor/Request/Request+BodyStream.swift b/Sources/Vapor/Request/Request+BodyStream.swift index f499a7f0cb..c9c829216d 100644 --- a/Sources/Vapor/Request/Request+BodyStream.swift +++ b/Sources/Vapor/Request/Request+BodyStream.swift @@ -1,23 +1,22 @@ extension Request { final class BodyStream: BodyStreamWriter { - typealias Handler = (BodyStreamResult, EventLoopPromise?) -> () - private(set) var isClosed: Bool - private var handler: Handler? - private var buffer: [(BodyStreamResult, EventLoopPromise?)] - let eventLoop: EventLoop var isBeingRead: Bool { self.handler != nil } + private(set) var isClosed: Bool + private var handler: ((BodyStreamResult, EventLoopPromise?) -> ())? + private var buffer: [(BodyStreamResult, EventLoopPromise?)] + init(on eventLoop: EventLoop) { self.eventLoop = eventLoop self.isClosed = false self.buffer = [] } - func read(_ handler: @escaping Handler) { + func read(_ handler: @escaping (BodyStreamResult, EventLoopPromise?) -> ()) { self.handler = handler for (result, promise) in self.buffer { handler(result, promise) diff --git a/Tests/VaporTests/PipelineTests.swift b/Tests/VaporTests/PipelineTests.swift index 2ed33e2680..b3f9789157 100644 --- a/Tests/VaporTests/PipelineTests.swift +++ b/Tests/VaporTests/PipelineTests.swift @@ -29,9 +29,6 @@ final class PipelineTests: XCTestCase { ).wait() try channel.writeInbound(ByteBuffer(string: "POST /echo HTTP/1.1\r\ntransfer-encoding: chunked\r\n\r\n1\r\na\r\n")) - try XCTAssertNil(channel.readOutbound(as: ByteBuffer.self)?.string) - - try channel.writeInbound(ByteBuffer(string: "1\r\nb\r\n")) let chunk = try channel.readOutbound(as: ByteBuffer.self)?.string XCTAssertContains(chunk, "HTTP/1.1 200 OK") XCTAssertContains(chunk, "connection: keep-alive") @@ -39,6 +36,9 @@ final class PipelineTests: XCTestCase { try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "1\r\n") try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "a") try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "\r\n") + try XCTAssertNil(channel.readOutbound(as: ByteBuffer.self)?.string) + + try channel.writeInbound(ByteBuffer(string: "1\r\nb\r\n")) try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "1\r\n") try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "b") try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "\r\n")