Skip to content

Commit

Permalink
Immediate body streaming (#2413)
Browse files Browse the repository at this point in the history
* immediate body streaming

* fix echo handler expectations
  • Loading branch information
tanner0101 committed Jun 26, 2020
1 parent 0875ed1 commit 5adcd2e
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Sources/Development/routes.swift
Expand Up @@ -6,7 +6,7 @@ struct Creds: Content {
}

public func routes(_ app: Application) throws {
app.on(.GET, "ping", body: .stream) { req in
app.on(.POST, "ping") { req in
return "123" as StaticString
}

Expand Down
38 changes: 17 additions & 21 deletions Sources/Vapor/HTTP/Server/HTTPServerRequestDecoder.swift
Expand Up @@ -56,24 +56,22 @@ final class HTTPServerRequestDecoder: ChannelInboundHandler, RemovableChannelHan
}
case .body(let buffer):
switch self.requestState {
case .ready: assertionFailure("Unexpected state: \(self.requestState)")
case .ready, .awaitingEnd:
assertionFailure("Unexpected state: \(self.requestState)")
case .awaitingBody(let request):
self.requestState = .awaitingEnd(request, buffer)
case .awaitingEnd(let request, let previousBuffer):
let stream = Request.BodyStream(on: context.eventLoop)
request.bodyStorage = .stream(stream)
context.fireChannelRead(self.wrapInboundOut(request))
self.handleBodyStreamStateResult(
context: context,
self.bodyStreamState.didReadBytes(previousBuffer),
stream: stream
)
self.handleBodyStreamStateResult(
context: context,
self.bodyStreamState.didReadBytes(buffer),
stream: stream
)
self.requestState = .streamingBody(stream)
if request.headers.first(name: .contentLength) == buffer.readableBytes.description {
self.requestState = .awaitingEnd(request, buffer)
} else {
let stream = Request.BodyStream(on: context.eventLoop)
request.bodyStorage = .stream(stream)
context.fireChannelRead(self.wrapInboundOut(request))
self.handleBodyStreamStateResult(
context: context,
self.bodyStreamState.didReadBytes(buffer),
stream: stream
)
self.requestState = .streamingBody(stream)
}
case .streamingBody(let stream):
self.handleBodyStreamStateResult(
context: context,
Expand Down Expand Up @@ -191,15 +189,13 @@ final class HTTPServerRequestDecoder: ChannelInboundHandler, RemovableChannelHan
promise?.succeed(())
}
}
case .awaitingEnd, .awaitingBody:
case .awaitingBody, .awaitingEnd:
// Response ended before request started streaming.
self.logger.trace("Response already sent, skipping request body.")
self.requestState = .skipping
case .ready:
case .ready, .skipping:
// Response ended after request had been read.
break
default:
fatalError("Unexpected request state: \(self.requestState)")
}
}
}
Expand Down
11 changes: 5 additions & 6 deletions Sources/Vapor/Request/Request+BodyStream.swift
@@ -1,23 +1,22 @@
extension Request {
final class BodyStream: BodyStreamWriter {
typealias Handler = (BodyStreamResult, EventLoopPromise<Void>?) -> ()
private(set) var isClosed: Bool
private var handler: Handler?
private var buffer: [(BodyStreamResult, EventLoopPromise<Void>?)]

let eventLoop: EventLoop

var isBeingRead: Bool {
self.handler != nil
}

private(set) var isClosed: Bool
private var handler: ((BodyStreamResult, EventLoopPromise<Void>?) -> ())?
private var buffer: [(BodyStreamResult, EventLoopPromise<Void>?)]

init(on eventLoop: EventLoop) {
self.eventLoop = eventLoop
self.isClosed = false
self.buffer = []
}

func read(_ handler: @escaping Handler) {
func read(_ handler: @escaping (BodyStreamResult, EventLoopPromise<Void>?) -> ()) {
self.handler = handler
for (result, promise) in self.buffer {
handler(result, promise)
Expand Down
6 changes: 3 additions & 3 deletions Tests/VaporTests/PipelineTests.swift
Expand Up @@ -29,16 +29,16 @@ final class PipelineTests: XCTestCase {
).wait()

try channel.writeInbound(ByteBuffer(string: "POST /echo HTTP/1.1\r\ntransfer-encoding: chunked\r\n\r\n1\r\na\r\n"))
try XCTAssertNil(channel.readOutbound(as: ByteBuffer.self)?.string)

try channel.writeInbound(ByteBuffer(string: "1\r\nb\r\n"))
let chunk = try channel.readOutbound(as: ByteBuffer.self)?.string
XCTAssertContains(chunk, "HTTP/1.1 200 OK")
XCTAssertContains(chunk, "connection: keep-alive")
XCTAssertContains(chunk, "transfer-encoding: chunked")
try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "1\r\n")
try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "a")
try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "\r\n")
try XCTAssertNil(channel.readOutbound(as: ByteBuffer.self)?.string)

try channel.writeInbound(ByteBuffer(string: "1\r\nb\r\n"))
try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "1\r\n")
try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "b")
try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "\r\n")
Expand Down

0 comments on commit 5adcd2e

Please sign in to comment.