Skip to content

Commit

Permalink
More Sendable Conformances (#3057)
Browse files Browse the repository at this point in the history
* Make use of NIOFileHandle safe

* Fix sendable closure warnings in FileIO

* Make BodyStreamWriter Sendable

* Fix Sendable warnings in HTTPServerHandler

* Fix Sendable warnings in HTTPServerRequestDecoder

* Fix Sendable warnings in ChannelResponseBodyStream

* Fix Sendable warnings in HTTPServerUpgradeHandler

* Fix warnings in Tests

* Fix some test warnings

* Make Request+BodyStream sendable

* Use LoopBound instead of Locks as we know we're on an EventLoop

* Fix some test warnings

* Fix some test warnings

* Fix the last warnings in the tests

* Remove the unchecked Sendable on request and response

* Changes from code review

* Reduce inits of NIOLoopBound

* Remove unnecessary checks on NIOLoopBound

* Remove reminders

* Revert "Remove unnecessary checks on NIOLoopBound"

This reverts commit de0d52f.

* Improve routes

* Remove unnecessary checks on NIOLoopBound

* Show error

* Compiles but failing test

* Don't make stuff sendable when we don't need to be

* Fix code review comments

* Fix a couple of logic bugs in response

* Fix some outdated comment docs

* PR Review

* HTTPServerErrorHandler: Improve Error Handling for HTTPParserError (#2922)

* fix: HTTPServerErrorHandler catch HTTPParserError
Refs: #2921
* add reference to https://github.com/apple/swift-nio/blob/00341c92770e0a7bebdc5fda783f08765eb3ff56/Sources/NIOHTTP1/HTTPServerProtocolErrorHandler.swift
* set hasUnterminatedResponse in write
* error-handler needs to be before response encoder
* add test for invalid http
* rephrase and improve comments slightly
* remove http1 error handler from http2 pipeline
---------

Co-authored-by: Tim Condon <0xTim@users.noreply.github.com>
Co-authored-by: Gwynne Raskind <gwynne@vapor.codes>

* Fix mismatching manifests

* PR feedback

* Trigger CodeQL

* Trip CI

---------

Co-authored-by: Gwynne Raskind <gwynne@vapor.codes>
Co-authored-by: fred-sch <73998525+fred-sch@users.noreply.github.com>
  • Loading branch information
3 people committed Sep 26, 2023
1 parent 1f2b44b commit 446b877
Show file tree
Hide file tree
Showing 23 changed files with 271 additions and 197 deletions.
6 changes: 5 additions & 1 deletion Package.swift
Expand Up @@ -34,7 +34,7 @@ let package = Package(
.package(url: "https://github.com/swift-server/swift-backtrace.git", from: "1.1.1"),

// Event-driven network application framework for high performance protocol servers & clients, non-blocking.
.package(url: "https://github.com/apple/swift-nio.git", from: "2.56.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.59.0"),

// Bindings to OpenSSL-compatible libraries for TLS support in SwiftNIO
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
Expand All @@ -59,6 +59,9 @@ let package = Package(

// MultipartKit, Multipart encoding and decoding
.package(url: "https://github.com/vapor/multipart-kit.git", from: "4.2.1"),

// Low-level atomic operations
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.1.0"),
],
targets: [
// C helpers
Expand Down Expand Up @@ -90,6 +93,7 @@ let package = Package(
.product(name: "RoutingKit", package: "routing-kit"),
.product(name: "WebSocketKit", package: "websocket-kit"),
.product(name: "MultipartKit", package: "multipart-kit"),
.product(name: "Atomics", package: "swift-atomics"),
]),

// Development
Expand Down
6 changes: 5 additions & 1 deletion Package@swift-5.9.swift
Expand Up @@ -31,7 +31,7 @@ let package = Package(
.package(url: "https://github.com/vapor/routing-kit.git", from: "4.5.0"),

// Event-driven network application framework for high performance protocol servers & clients, non-blocking.
.package(url: "https://github.com/apple/swift-nio.git", from: "2.56.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.59.0"),

// Bindings to OpenSSL-compatible libraries for TLS support in SwiftNIO
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.8.0"),
Expand All @@ -56,6 +56,9 @@ let package = Package(

// MultipartKit, Multipart encoding and decoding
.package(url: "https://github.com/vapor/multipart-kit.git", from: "4.2.1"),

// Low-level atomic operations
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.1.0"),
],
targets: [
// C helpers
Expand Down Expand Up @@ -88,6 +91,7 @@ let package = Package(
.product(name: "RoutingKit", package: "routing-kit"),
.product(name: "WebSocketKit", package: "websocket-kit"),
.product(name: "MultipartKit", package: "multipart-kit"),
.product(name: "Atomics", package: "swift-atomics"),
],
swiftSettings: [.enableExperimentalFeature("StrictConcurrency")]
),
Expand Down
7 changes: 4 additions & 3 deletions Sources/Development/routes.swift
Expand Up @@ -205,16 +205,17 @@ public func routes(_ app: Application) throws {
let promise = req.eventLoop.makePromise(of: HTTPStatus.self)
let fileHandleBox = NIOLoopBound(fileHandle, eventLoop: req.eventLoop)
req.body.drain { part in
let fileHandle = fileHandleBox.value
switch part {
case .buffer(let buffer):
return req.application.fileio.write(
fileHandle: fileHandleBox.value,
fileHandle: fileHandle,
buffer: buffer,
eventLoop: req.eventLoop
)
case .error(let drainError):
do {
try fileHandleBox.value.close()
try fileHandle.close()
promise.fail(BodyStreamWritingToDiskError.streamFailure(drainError))
} catch {
promise.fail(BodyStreamWritingToDiskError.multipleFailures([
Expand All @@ -225,7 +226,7 @@ public func routes(_ app: Application) throws {
return req.eventLoop.makeSucceededFuture(())
case .end:
do {
try fileHandleBox.value.close()
try fileHandle.close()
promise.succeed(.ok)
} catch {
promise.fail(BodyStreamWritingToDiskError.fileHandleClosedFailure(error))
Expand Down
2 changes: 1 addition & 1 deletion Sources/Vapor/Authentication/BasicAuthorization.swift
Expand Up @@ -2,7 +2,7 @@ import Foundation
import NIOHTTP1

/// A basic username and password.
public struct BasicAuthorization {
public struct BasicAuthorization: Sendable {
/// The username, sometimes an email address
public let username: String

Expand Down
2 changes: 1 addition & 1 deletion Sources/Vapor/Authentication/BearerAuthorization.swift
@@ -1,7 +1,7 @@
import NIOHTTP1

/// A bearer token.
public struct BearerAuthorization {
public struct BearerAuthorization: Sendable {
/// The plaintext token
public let token: String

Expand Down
12 changes: 6 additions & 6 deletions Sources/Vapor/Concurrency/ResponseCodable+Concurrency.swift
Expand Up @@ -7,11 +7,11 @@ import NIOHTTP1
///
/// This is the async version of `ResponseEncodable`
public protocol AsyncResponseEncodable {
/// Encodes an instance of `Self` to a `HTTPResponse`.
/// Encodes an instance of `Self` to a `Response`.
///
/// - parameters:
/// - for: The `HTTPRequest` associated with this `HTTPResponse`.
/// - returns: An `HTTPResponse`.
/// - for: The `Request` associated with this `Response`.
/// - returns: An `Response`.
func encodeResponse(for request: Request) async throws -> Response
}

Expand All @@ -21,10 +21,10 @@ public protocol AsyncResponseEncodable {
///
/// This is the async version of `RequestDecodable`
public protocol AsyncRequestDecodable {
/// Decodes an instance of `HTTPRequest` to a `Self`.
/// Decodes an instance of `Request` to a `Self`.
///
/// - parameters:
/// - request: The `HTTPRequest` to be decoded.
/// - request: The `Request` to be decoded.
/// - returns: An asynchronous `Self`.
static func decodeRequest(_ request: Request) async throws -> Self
}
Expand All @@ -39,7 +39,7 @@ extension Request: AsyncRequestDecodable {
extension AsyncResponseEncodable {
/// Asynchronously encodes `Self` into a `Response`, setting the supplied status and headers.
///
/// router.post("users") { req async throws -> HTTPResponse in
/// router.post("users") { req async throws -> Response in
/// return try await req.content
/// .decode(User.self)
/// .save(on: req)
Expand Down
4 changes: 2 additions & 2 deletions Sources/Vapor/HTTP/BodyStream.swift
@@ -1,6 +1,6 @@
import NIOCore

public enum BodyStreamResult {
public enum BodyStreamResult: Sendable {
/// A normal data chunk.
/// There will be 0 or more of these.
case buffer(ByteBuffer)
Expand Down Expand Up @@ -39,7 +39,7 @@ extension BodyStreamResult: CustomDebugStringConvertible {
}
}

public protocol BodyStreamWriter {
public protocol BodyStreamWriter: Sendable {
var eventLoop: EventLoop { get }
func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?)
}
Expand Down
8 changes: 6 additions & 2 deletions Sources/Vapor/HTTP/Server/HTTPServerHandler.swift
Expand Up @@ -16,9 +16,11 @@ final class HTTPServerHandler: ChannelInboundHandler, RemovableChannelHandler {
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let box = NIOLoopBound((context, self), eventLoop: context.eventLoop)
let request = self.unwrapInboundIn(data)
self.responder.respond(to: request).whenComplete { response in
self.serialize(response, for: request, context: context)
let (context, handler) = box.value
handler.serialize(response, for: request, context: context)
}
}

Expand All @@ -45,7 +47,9 @@ final class HTTPServerHandler: ChannelInboundHandler, RemovableChannelHandler {
}
response.headers.add(name: .connection, value: keepAlive ? "keep-alive" : "close")
let done = context.write(self.wrapOutboundOut(response))
let box = NIOLoopBound((context, self), eventLoop: context.eventLoop)
done.whenComplete { result in
let (context, handler) = box.value
switch result {
case .success:
if !keepAlive {
Expand All @@ -55,7 +59,7 @@ final class HTTPServerHandler: ChannelInboundHandler, RemovableChannelHandler {
if case .stream(let stream) = response.body.storage {
stream.callback(ErrorBodyStreamWriter(eventLoop: request.eventLoop, error: error))
}
self.errorCaught(context: context, error: error)
handler.errorCaught(context: context, error: error)
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions Sources/Vapor/HTTP/Server/HTTPServerRequestDecoder.swift
Expand Up @@ -158,19 +158,21 @@ final class HTTPServerRequestDecoder: ChannelDuplexHandler, RemovableChannelHand
switch result.action {
case .nothing: break
case .write(let buffer):
let box = NIOLoopBound((context, self), eventLoop: context.eventLoop)
stream.write(.buffer(buffer)).whenComplete { writeResult in
let (context, handler) = box.value
switch writeResult {
case .failure(let error):
self.handleBodyStreamStateResult(
handler.handleBodyStreamStateResult(
context: context,
self.bodyStreamState.didError(error),
handler.bodyStreamState.didError(error),
stream: stream
)
case .success: break
}
self.handleBodyStreamStateResult(
handler.handleBodyStreamStateResult(
context: context,
self.bodyStreamState.didWrite(),
handler.bodyStreamState.didWrite(),
stream: stream
)
}
Expand Down
45 changes: 22 additions & 23 deletions Sources/Vapor/HTTP/Server/HTTPServerResponseEncoder.swift
@@ -1,5 +1,7 @@
import NIOCore
import NIOHTTP1
import NIOConcurrencyHelpers
import Atomics

final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelHandler {
typealias OutboundIn = Response
Expand Down Expand Up @@ -81,16 +83,13 @@ final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelH
}

private final class ChannelResponseBodyStream: BodyStreamWriter {
let context: ChannelHandlerContext
let handler: HTTPServerResponseEncoder
let contextBox: NIOLoopBound<ChannelHandlerContext>
let handlerBox: NIOLoopBound<HTTPServerResponseEncoder>
let promise: EventLoopPromise<Void>?
let count: Int?
var currentCount: Int
var isComplete: Bool

var eventLoop: EventLoop {
return self.context.eventLoop
}
let currentCount: ManagedAtomic<Int>
let isComplete: ManagedAtomic<Bool>
let eventLoop: EventLoop

enum Error: Swift.Error {
case tooManyBytes
Expand All @@ -103,41 +102,41 @@ private final class ChannelResponseBodyStream: BodyStreamWriter {
promise: EventLoopPromise<Void>?,
count: Int?
) {
self.context = context
self.handler = handler
self.contextBox = .init(context, eventLoop: context.eventLoop)
self.handlerBox = .init(handler, eventLoop: context.eventLoop)
self.promise = promise
self.count = count
self.currentCount = 0
self.isComplete = false
self.currentCount = .init(0)
self.isComplete = .init(false)
self.eventLoop = context.eventLoop
}

func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?) {
switch result {
case .buffer(let buffer):
self.context.writeAndFlush(self.handler.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
self.currentCount += buffer.readableBytes
if let count = self.count, self.currentCount > count {
self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
if let count = self.count, self.currentCount.wrappingIncrementThenLoad(by: buffer.readableBytes, ordering: .sequentiallyConsistent) > count {
self.promise?.fail(Error.tooManyBytes)
promise?.fail(Error.notEnoughBytes)
}
case .end:
self.isComplete = true
if let count = self.count, self.currentCount != count {
self.isComplete.store(true, ordering: .sequentiallyConsistent)
if let count = self.count, self.currentCount.load(ordering: .sequentiallyConsistent) < count {
self.promise?.fail(Error.notEnoughBytes)
promise?.fail(Error.notEnoughBytes)
}
self.context.fireUserInboundEventTriggered(HTTPServerResponseEncoder.ResponseEndSentEvent())
self.context.writeAndFlush(self.handler.wrapOutboundOut(.end(nil)), promise: promise)
self.contextBox.value.fireUserInboundEventTriggered(HTTPServerResponseEncoder.ResponseEndSentEvent())
self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.end(nil)), promise: promise)
self.promise?.succeed(())
case .error(let error):
self.isComplete = true
self.context.fireUserInboundEventTriggered(HTTPServerResponseEncoder.ResponseEndSentEvent())
self.context.writeAndFlush(self.handler.wrapOutboundOut(.end(nil)), promise: promise)
self.isComplete.store(true, ordering: .relaxed)
self.contextBox.value.fireUserInboundEventTriggered(HTTPServerResponseEncoder.ResponseEndSentEvent())
self.contextBox.value.writeAndFlush(self.handlerBox.value.wrapOutboundOut(.end(nil)), promise: promise)
self.promise?.fail(error)
}
}

deinit {
assert(self.isComplete, "Response body stream writer deinitialized before .end or .error was sent.")
assert(self.isComplete.load(ordering: .sequentiallyConsistent), "Response body stream writer deinitialized before .end or .error was sent.")
}
}
35 changes: 26 additions & 9 deletions Sources/Vapor/HTTP/Server/HTTPServerUpgradeHandler.swift
Expand Up @@ -46,36 +46,53 @@ final class HTTPServerUpgradeHandler: ChannelDuplexHandler, RemovableChannelHand
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let res = self.unwrapOutboundIn(data)

struct SendableBox {
let context: ChannelHandlerContext
let buffer: UpgradeBufferHandler
var handler: HTTPServerUpgradeHandler
let protocolUpgrader: HTTPServerProtocolUpgrader
}

// check upgrade
switch self.upgradeState {
case .pending(let req, let buffer):
self.upgradeState = .upgraded
if res.status == .switchingProtocols, let upgrader = res.upgrader {
let protocolUpgrader = upgrader.applyUpgrade(req: req, res: res)
let sendableBox = SendableBox(
context: context,
buffer: buffer,
handler: self,
protocolUpgrader: protocolUpgrader)
let box = NIOLoopBound(sendableBox, eventLoop: context.eventLoop)

var head = HTTPRequestHead(
let head = HTTPRequestHead(
version: req.version,
method: req.method,
uri: req.url.string
uri: req.url.string,
headers: req.headers
)
head.headers = req.headers

protocolUpgrader.buildUpgradeResponse(
channel: context.channel,
upgradeRequest: head,
initialResponseHeaders: [:]
).map { headers in
let sendableBox = box.value
res.headers = headers
context.write(self.wrapOutboundOut(res), promise: promise)
sendableBox.context.write(sendableBox.handler.wrapOutboundOut(res), promise: promise)
}.flatMap {
let handlers: [RemovableChannelHandler] = [self] + self.httpHandlers
let sendableBox = box.value
let handlers: [RemovableChannelHandler] = [sendableBox.handler] + sendableBox.handler.httpHandlers
return .andAllComplete(handlers.map { handler in
return context.pipeline.removeHandler(handler)
}, on: context.eventLoop)
return sendableBox.context.pipeline.removeHandler(handler)
}, on: box.value.context.eventLoop)
}.flatMap {
return protocolUpgrader.upgrade(context: context, upgradeRequest: head)
let sendableBox = box.value
return sendableBox.protocolUpgrader.upgrade(context: sendableBox.context, upgradeRequest: head)
}.flatMap {
return context.pipeline.removeHandler(buffer)
let sendableBox = box.value
return sendableBox.context.pipeline.removeHandler(sendableBox.buffer)
}.cascadeFailure(to: promise)
} else {
// reset handlers
Expand Down
4 changes: 2 additions & 2 deletions Sources/Vapor/Request/Request+Body.swift
@@ -1,7 +1,7 @@
import NIOCore

extension Request {
public struct Body: CustomStringConvertible {
public struct Body: CustomStringConvertible, Sendable {
let request: Request

init(_ request: Request) {
Expand All @@ -23,7 +23,7 @@ extension Request {
}
}

public func drain(_ handler: @escaping (BodyStreamResult) -> EventLoopFuture<Void>) {
public func drain(_ handler: @Sendable @escaping (BodyStreamResult) -> EventLoopFuture<Void>) {
switch self.request.bodyStorage {
case .stream(let stream):
stream.read { (result, promise) in
Expand Down

0 comments on commit 446b877

Please sign in to comment.