Skip to content

Commit

Permalink
Provide AsyncFileStreaming API (#3184)
Browse files Browse the repository at this point in the history
* Provide async stream implemetation

* Add AsyncFileTests

* Fix double/int bug

* Allow the stream writer to complete the writer promise for async streams while cascading errors thrown from the stream callback to the promise.

* Use the same ordering for both stores to the isComplete atomic in ChannelResponseBodyStream

* Actually call the onCompleted() callback in asyncStreamFile()

* Make sure the promise is never dropped on the floor in release builds (in which assertions like the one in ChannelResponseBodyStream's deinit aren't checked).

* Make completion handler async

* Use NIOFilesytemFoundationCompat

* Fix imports

* Make sure the NIO FileHandle is always closed on error.

* Make sure the onCompleted() callback is always invoked in the error case even if trying to write the error indication to the stream fails

* Migrate old ELF stream file function to use async version under the hood

* Migrate FileMiddleware to async

* Heavily revise ChannelResponseBodyStream's logic to eliminate races, atomics, and to handle errors and the promise completions more consistently. Especially affects async streaming, but eliminates error handling issues in non-async streaming as well.

* Fix off-by-one error in FileIO.readFile(at:chunkSize:offset:byteCount:) async

* Use XCTAssertNoThrow() to simplify several of the AsyncFileTests

* Fix testSimpleETagHeaders test

* Add async versions for XCTVapor and mark wait() calls with noasync

* Fix XCTVapor error messages

* Try and make tests async

* Fix some warnings in tests

* One less test running on 8080

* Fix the tests

* Revert "Fix the tests"

This reverts commit c98f0bf.

* Hook up the response body callback for async streams

* Remove a couple of instances of FileManager

* Remove FileManager from AsyncFileTests

* Fix default file

* Fix the tests

* Rework it to reduce all the returns

* Update Sources/Vapor/Utilities/FileIO.swift

Co-authored-by: Gwynne Raskind <gwynne@vapor.codes>

* PR Reviews

* Fix merge issue:

* Test the correct behaviour

---------

Co-authored-by: Gwynne Raskind <gwynne@vapor.codes>
  • Loading branch information
0xTim and gwynne committed May 7, 2024
1 parent af86ea4 commit cdbbd04
Show file tree
Hide file tree
Showing 14 changed files with 795 additions and 226 deletions.
4 changes: 2 additions & 2 deletions Package.swift
Expand Up @@ -34,7 +34,7 @@ let package = Package(
.package(url: "https://github.com/swift-server/swift-backtrace.git", from: "1.1.1"),

// Event-driven network application framework for high performance protocol servers & clients, non-blocking.
.package(url: "https://github.com/apple/swift-nio.git", from: "2.63.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"),

// Bindings to OpenSSL-compatible libraries for TLS support in SwiftNIO
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
Expand Down Expand Up @@ -92,8 +92,8 @@ let package = Package(
.product(name: "WebSocketKit", package: "websocket-kit"),
.product(name: "MultipartKit", package: "multipart-kit"),
.product(name: "Atomics", package: "swift-atomics"),

.product(name: "_NIOFileSystem", package: "swift-nio"),
.product(name: "_NIOFileSystemFoundationCompat", package: "swift-nio"),
]),

// Development
Expand Down
4 changes: 2 additions & 2 deletions Package@swift-5.9.swift
Expand Up @@ -31,7 +31,7 @@ let package = Package(
.package(url: "https://github.com/vapor/routing-kit.git", from: "4.9.0"),

// Event-driven network application framework for high performance protocol servers & clients, non-blocking.
.package(url: "https://github.com/apple/swift-nio.git", from: "2.63.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"),

// Bindings to OpenSSL-compatible libraries for TLS support in SwiftNIO
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
Expand Down Expand Up @@ -90,8 +90,8 @@ let package = Package(
.product(name: "WebSocketKit", package: "websocket-kit"),
.product(name: "MultipartKit", package: "multipart-kit"),
.product(name: "Atomics", package: "swift-atomics"),

.product(name: "_NIOFileSystem", package: "swift-nio"),
.product(name: "_NIOFileSystemFoundationCompat", package: "swift-nio"),
],
swiftSettings: [.enableExperimentalFeature("StrictConcurrency=complete")]
),
Expand Down
15 changes: 12 additions & 3 deletions Sources/Vapor/HTTP/Server/HTTPServerHandler.swift
Expand Up @@ -59,6 +59,10 @@ final class HTTPServerHandler: ChannelInboundHandler, RemovableChannelHandler {
case .failure(let error):
if case .stream(let stream) = response.body.storage {
stream.callback(ErrorBodyStreamWriter(eventLoop: request.eventLoop, error: error))
} else if case .asyncStream(let stream) = response.body.storage {
Task {
try? await stream.callback(ErrorBodyStreamWriter(eventLoop: request.eventLoop, error: error))
}
}
handler.errorCaught(context: context, error: error)
}
Expand All @@ -77,10 +81,15 @@ final class HTTPServerHandler: ChannelInboundHandler, RemovableChannelHandler {
}
}

struct ErrorBodyStreamWriter: BodyStreamWriter {
var eventLoop: EventLoop
var error: Error
fileprivate struct ErrorBodyStreamWriter: BodyStreamWriter, AsyncBodyStreamWriter {
let eventLoop: EventLoop
let error: Error

func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?) {
promise?.fail(error)
}

func write(_ result: BodyStreamResult) async throws {
throw error
}
}
96 changes: 66 additions & 30 deletions Sources/Vapor/HTTP/Server/HTTPServerResponseEncoder.swift
@@ -1,7 +1,6 @@
import NIOCore
import NIOHTTP1
import NIOConcurrencyHelpers
import Atomics

final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelHandler {
typealias OutboundIn = Response
Expand Down Expand Up @@ -74,12 +73,21 @@ final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelH
let channelStream = ChannelResponseBodyStream(
context: context,
handler: self,
promise: nil,
promise: promise,
count: stream.count == -1 ? nil : stream.count
)

promise?.completeWithTask {
try await stream.callback(channelStream)
Task {
do {
try await stream.callback(channelStream)
// We assert in ChannelResponseBodyStream that either .end or .error gets sent, so once we
// get here the promise can be assumed to already be completed. However, just in case, succeed
// it here anyway. This guarantees we never leave the callback without completing the promise
// one way or the other in release builds.
promise?.succeed()
} catch {
promise?.fail(error)
}
}
}
}
Expand All @@ -100,13 +108,14 @@ private final class ChannelResponseBodyStream: BodyStreamWriter, AsyncBodyStream
let handlerBox: NIOLoopBound<HTTPServerResponseEncoder>
let promise: EventLoopPromise<Void>?
let count: Int?
let currentCount: ManagedAtomic<Int>
let isComplete: ManagedAtomic<Bool>
let currentCount: NIOLoopBoundBox<Int>
let isComplete: NIOLockedValueBox<Bool>
let eventLoop: EventLoop

enum Error: Swift.Error {
case tooManyBytes
case notEnoughBytes
case apiMisuse // tried to send a buffer or end indication after already ending or erroring the stream
}

init(
Expand All @@ -115,52 +124,79 @@ private final class ChannelResponseBodyStream: BodyStreamWriter, AsyncBodyStream
promise: EventLoopPromise<Void>?,
count: Int?
) {
context.eventLoop.assertInEventLoop()

self.contextBox = .init(context, eventLoop: context.eventLoop)
self.handlerBox = .init(handler, eventLoop: context.eventLoop)
self.promise = promise
self.count = count
self.currentCount = .init(0)
self.currentCount = .init(0, eventLoop: context.eventLoop)
self.isComplete = .init(false)
self.eventLoop = context.eventLoop
}

func write(_ result: BodyStreamResult) async throws {
// Explicitly adds the ELF because Swift 5.6 fails to infer the return type
try await self.eventLoop.flatSubmit { () -> EventLoopFuture<Void> in
let promise = self.eventLoop.makePromise(of: Void.self)
self.write(result, promise: promise)
return promise.futureResult
}.get()
let promise = self.eventLoop.makePromise(of: Void.self)

self.eventLoop.execute { self.write(result, promise: promise) }
try await promise.futureResult.get()
}

/// > Note: `self.promise` is the promise that completes the original write to `HTTPServerResponseEncoder` that
/// > triggers the streaming response; it should only be succeeded when the stream ends. The `promise` parameter
/// > of this method is specific to the particular invocation and signals that a buffer has finished writing or
/// > that the stream has been fully completed, and should always be completed or pending completion by the time
/// > this method returns. Both promises should be failed when an error occurs, unless otherwise specifically noted.
func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?) {
self.eventLoop.assertInEventLoop() // Only check in debug, just in case...

func finishStream() {
self.isComplete.withLockedValue { $0 = true }
self.contextBox.value.fireUserInboundEventTriggered(HTTPServerResponseEncoder.ResponseEndSentEvent())
// Don't forward the current promise (if any) to the write completion of the end-response signal, as we
// will be notified of errors through other paths and can get spurious I/O errors from this write that
// ought to be ignored.
self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.end(nil)), promise: nil)
}

// See https://github.com/vapor/vapor/issues/2976 for why we do some of these checks.
switch result {
case .buffer(let buffer):
// See: https://github.com/vapor/vapor/issues/2976
self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
if let count = self.count, self.currentCount.wrappingIncrementThenLoad(by: buffer.readableBytes, ordering: .sequentiallyConsistent) > count {
guard !self.isComplete.withLockedValue({ $0 }) else { // Don't try to send data if we already ended
return promise?.fail(Error.apiMisuse) ?? () // self.promise is already completed, so fail the local one and bail
}
if let count = self.count, (self.currentCount.value + buffer.readableBytes) > count {
self.promise?.fail(Error.tooManyBytes)
promise?.fail(Error.notEnoughBytes)
promise?.fail(Error.tooManyBytes)
} else {
self.currentCount.value += buffer.readableBytes
// Cascade the completion of the buffer write to the local promise (if any).
self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
}
case .end:
// See: https://github.com/vapor/vapor/issues/2976
self.isComplete.store(true, ordering: .sequentiallyConsistent)
if let count = self.count, self.currentCount.load(ordering: .sequentiallyConsistent) < count {
self.promise?.fail(Error.notEnoughBytes)
promise?.fail(Error.notEnoughBytes)
if !self.isComplete.withLockedValue({ $0 }) { // Don't send the response end events more than once.
finishStream()
// check this only after sending the stream end; we want to make send that regardless
if let count = self.count, self.currentCount.value < count {
self.promise?.fail(Error.notEnoughBytes)
promise?.fail(Error.notEnoughBytes)
} else {
self.promise?.succeed()
promise?.succeed()
}
} else {
promise?.fail(Error.apiMisuse) // If we already ended, fail the local promise with API misuse
}
self.contextBox.value.fireUserInboundEventTriggered(HTTPServerResponseEncoder.ResponseEndSentEvent())
self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.end(nil)), promise: promise)
self.promise?.succeed(())
case .error(let error):
self.isComplete.store(true, ordering: .relaxed)
self.contextBox.value.fireUserInboundEventTriggered(HTTPServerResponseEncoder.ResponseEndSentEvent())
self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.end(nil)), promise: promise)
self.promise?.fail(error)
if !self.isComplete.withLockedValue({ $0 }) { // Don't send the response end events more than once.
finishStream()
self.promise?.fail(error)
}
promise?.fail(error) // We want to fail the local promise regardless. Echo the error back.
}
}

deinit {
assert(self.isComplete.load(ordering: .sequentiallyConsistent), "Response body stream writer deinitialized before .end or .error was sent.")
assert(self.isComplete.withLockedValue { $0 }, "Response body stream writer deinitialized before .end or .error was sent.")
}
}
72 changes: 33 additions & 39 deletions Sources/Vapor/Middleware/FileMiddleware.swift
@@ -1,10 +1,11 @@
import Foundation
import NIOCore
import _NIOFileSystem

/// Serves static files from a public directory.
///
/// `FileMiddleware` will default to `DirectoryConfig`'s working directory with `"/Public"` appended.
public final class FileMiddleware: Middleware {
public final class FileMiddleware: AsyncMiddleware {
/// The public directory. Guaranteed to end with a slash.
private let publicDirectory: String
private let defaultFile: String?
Expand Down Expand Up @@ -47,62 +48,55 @@ public final class FileMiddleware: Middleware {
self.advancedETagComparison = advancedETagComparison
}

public func respond(to request: Request, chainingTo next: Responder) -> EventLoopFuture<Response> {
public func respond(to request: Request, chainingTo next: any AsyncResponder) async throws -> Response {
// make a copy of the percent-decoded path
guard var path = request.url.path.removingPercentEncoding else {
return request.eventLoop.makeFailedFuture(Abort(.badRequest))
throw Abort(.badRequest)
}

// path must be relative.
path = path.removeLeadingSlashes()

// protect against relative paths
guard !path.contains("../") else {
return request.eventLoop.makeFailedFuture(Abort(.forbidden))
throw Abort(.forbidden)
}

// create absolute path
var absPath = self.publicDirectory + path

// check if path exists and whether it is a directory
var isDir: ObjCBool = false
guard FileManager.default.fileExists(atPath: absPath, isDirectory: &isDir) else {
return next.respond(to: request)
}

if isDir.boolValue {
guard absPath.hasSuffix("/") else {
switch directoryAction.kind {
case .redirect:
var redirectUrl = request.url
redirectUrl.path += "/"
return request.eventLoop.future(
request.redirect(to: redirectUrl.string, redirectType: .permanent)
)
case .none:
return next.respond(to: request)
if let fileInfo = try await FileSystem.shared.info(forFileAt: .init(absPath)) {
// path exists, check for directory or file
if fileInfo.type == .directory {
// directory exists, see if we can return a file
if absPath.hasSuffix("/") {
// If a directory, check for the default file
if let defaultFile = defaultFile {
if defaultFile.isAbsolute() {
absPath = self.publicDirectory + defaultFile.removeLeadingSlashes()
} else {
absPath = absPath + defaultFile
}

if try await FileSystem.shared.info(forFileAt: .init(absPath)) != nil {
// If the default file exists, stream it
return try await request.fileio.asyncStreamFile(at: absPath, advancedETagComparison: advancedETagComparison)
}
}
} else {
if directoryAction.kind == .redirect {
var redirectUrl = request.url
redirectUrl.path += "/"
return request.redirect(to: redirectUrl.string, redirectType: .permanent)
}
}
}

// If a directory, check for the default file
guard let defaultFile = defaultFile else {
return next.respond(to: request)
}

if defaultFile.isAbsolute() {
absPath = self.publicDirectory + defaultFile.removeLeadingSlashes()
} else {
absPath = absPath + defaultFile
}

// If the default file doesn't exist, pass on request
guard FileManager.default.fileExists(atPath: absPath) else {
return next.respond(to: request)
// file exists, stream it
return try await request.fileio.asyncStreamFile(at: absPath, advancedETagComparison: advancedETagComparison)
}
}

// stream the file
return request.fileio.streamFile(at: absPath, advancedETagComparison: advancedETagComparison)

return try await next.respond(to: request)
}

/// Creates a new `FileMiddleware` for a server contained in an Xcode Project.
Expand Down
16 changes: 15 additions & 1 deletion Sources/Vapor/Response/Response+Body.swift
Expand Up @@ -108,6 +108,13 @@ extension Response {
stream.callback(collector)
return collector.promise.futureResult
.map { $0 }
case .asyncStream(let stream):
let collector = ResponseBodyCollector(eventLoop: eventLoop, byteBufferAllocator: self.byteBufferAllocator)
return eventLoop.makeFutureWithTask {
try await stream.callback(collector)
}.flatMap {
collector.promise.futureResult.map { $0 }
}
default:
return eventLoop.makeSucceededFuture(self.buffer)
}
Expand Down Expand Up @@ -242,7 +249,7 @@ extension Response {
// Since all buffer mutation is done on the event loop, we can be unchecked here.
// This removes the need for a lock and performance hits from that
// Any changes to this type need to be carefully considered
private final class ResponseBodyCollector: BodyStreamWriter, @unchecked Sendable {
private final class ResponseBodyCollector: BodyStreamWriter, AsyncBodyStreamWriter, @unchecked Sendable {
var buffer: ByteBuffer
let eventLoop: EventLoop
let promise: EventLoopPromise<ByteBuffer>
Expand All @@ -268,4 +275,11 @@ private final class ResponseBodyCollector: BodyStreamWriter, @unchecked Sendable
// Fixes an issue where errors in the stream should fail the individual write promise.
if let promise { future.cascade(to: promise) }
}

func write(_ result: BodyStreamResult) async throws {
let promise = self.eventLoop.makePromise(of: Void.self)

self.eventLoop.execute { self.write(result, promise: promise) }
try await promise.futureResult.get()
}
}

0 comments on commit cdbbd04

Please sign in to comment.