Make ServerTransport protocol possible to use with structured concurrency (#1865)

Motivation:

The existing `ServerTransport` protocol has some design issues: its `listen` method returns an async sequence of streams to be handled, while at the same time it's `async` and must run for as long as the transport is up and running. In most transport implementations (such as the HTTP/2 one), `listen` therefore never returns while the transport is running, so we never actually get the sequence of streams back.

Modifications:

This PR solves this by changing the protocol so that the `listen` method now takes a closure for handling an individual stream. This forces transport implementations to maintain the right structured task hierarchy, because the tasks that run this stream handler have to be added as children of the long-running `listen` task.
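As a rough sketch of what a conforming transport might look like under the new shape (everything here — `ExampleTransport`, the `Stream` placeholder, and the accept loop — is hypothetical, not the actual grpc-swift implementation):

```swift
/// Placeholder standing in for the real RPCStream<Inbound, Outbound> type.
struct Stream: Sendable {}

/// Hypothetical transport conforming to the new closure-based 'listen' shape.
struct ExampleTransport: Sendable {
    /// Stands in for a real accept loop (e.g. accepting HTTP/2 streams).
    let acceptedStreams: AsyncStream<Stream>

    func listen(
        _ streamHandler: @escaping @Sendable (Stream) async -> Void
    ) async throws {
        // Each handler runs as a child task of the long-running 'listen'
        // task, so cancelling 'listen' also cancels in-flight handlers.
        await withTaskGroup(of: Void.self) { group in
            for await stream in self.acceptedStreams {
                group.addTask { await streamHandler(stream) }
            }
        }
    }
}
```

On platforms where it is available, `withDiscardingTaskGroup` would be a better fit than `withTaskGroup`, since the handler results are discarded and the group then doesn't accumulate per-child `Void` results.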

Result:

Server transports can now be used with structured concurrency.
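Illustratively (assuming a `server` configured as in the documentation example in this diff), the server can now be owned by a task group and shut down in a structured way:

```swift
// Sketch only: 'server' is a configured GRPCServer.
try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask { try await server.run() }

    // ... later, on a shutdown signal:
    server.stopListening()        // stop accepting new streams
    try await group.waitForAll()  // wait for in-flight RPCs to finish
}
```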
gjcairo committed May 16, 2024
1 parent 0e9def8 commit 0b47aea
Showing 25 changed files with 167 additions and 370 deletions.
2 changes: 1 addition & 1 deletion Package.swift
@@ -36,7 +36,7 @@ let packageDependencies: [Package.Dependency] = [
),
.package(
url: "https://github.com/apple/swift-nio-http2.git",
from: "1.24.1"
from: "1.31.0"
),
.package(
url: "https://github.com/apple/swift-nio-transport-services.git",
216 changes: 31 additions & 185 deletions Sources/GRPCCore/GRPCServer.swift
@@ -23,8 +23,8 @@ import Atomics
/// streams to a service to handle the RPC or rejects them with an appropriate error if no service
/// can handle the RPC.
///
/// A ``GRPCServer`` may listen with multiple transports (for example, HTTP/2 and in-process) and route
/// requests from each transport to the same service instance. You can also use "interceptors",
/// A ``GRPCServer`` listens with a specific transport implementation (for example, HTTP/2 or in-process),
/// and routes requests from the transport to the service instance. You can also use "interceptors",
/// to implement cross-cutting logic which applies to all accepted RPCs. Example uses of interceptors
/// include request filtering, authentication, and logging. Once requests have been intercepted
/// they are passed to a handler which in turn returns a response to send back to the client.
@@ -46,17 +46,17 @@ import Atomics
///
/// // Finally create the server.
/// let server = GRPCServer(
/// transports: [inProcessTransport],
/// transport: inProcessTransport,
/// services: [greeter, echo],
/// interceptors: [statsRecorder]
/// )
/// ```
///
/// ## Starting and stopping the server
///
/// Once you have configured the server call ``run()`` to start it. Calling ``run()`` starts each
/// of the server's transports. A ``RuntimeError`` is thrown if any of the transports can't be
/// started.
/// Once you have configured the server call ``run()`` to start it. Calling ``run()`` starts the server's
/// transport too. A ``RuntimeError`` is thrown if the transport can't be started or encounters some other
/// runtime error.
///
/// ```swift
/// // Start running the server.
@@ -73,9 +73,8 @@ import Atomics
public struct GRPCServer: Sendable {
typealias Stream = RPCStream<ServerTransport.Inbound, ServerTransport.Outbound>

/// A collection of ``ServerTransport`` implementations that the server uses to listen
/// for new requests.
private let transports: [any ServerTransport]
/// The ``ServerTransport`` implementation that the server uses to listen for new requests.
private let transport: any ServerTransport

/// The services registered which the server is serving.
private let router: RPCRouter
@@ -92,11 +91,8 @@ public struct GRPCServer: Sendable {
private let state: ManagedAtomic<State>

private enum State: UInt8, AtomicValue {
/// The server hasn't been started yet. Can transition to `starting` or `stopped`.
/// The server hasn't been started yet. Can transition to `running` or `stopped`.
case notStarted
/// The server is starting but isn't accepting requests yet. Can transition to `running`
/// and `stopping`.
case starting
/// The server is running and accepting RPCs. Can transition to `stopping`.
case running
/// The server is stopping and no new RPCs will be accepted. Existing RPCs may run to
@@ -110,15 +106,15 @@
/// Creates a new server with no resources.
///
/// - Parameters:
/// - transports: The transports the server should listen on.
/// - transport: The transport the server should listen on.
/// - services: Services offered by the server.
/// - interceptors: A collection of interceptors providing cross-cutting functionality to each
/// accepted RPC. The order in which interceptors are added reflects the order in which they
/// are called. The first interceptor added will be the first interceptor to intercept each
/// request. The last interceptor added will be the final interceptor to intercept each
/// request before calling the appropriate handler.
public init(
transports: [any ServerTransport],
transport: any ServerTransport,
services: [any RegistrableRPCService],
interceptors: [any ServerInterceptor] = []
) {
@@ -127,37 +123,37 @@
service.registerMethods(with: &router)
}

self.init(transports: transports, router: router, interceptors: interceptors)
self.init(transport: transport, router: router, interceptors: interceptors)
}

/// Creates a new server with no resources.
///
/// - Parameters:
/// - transports: The transports the server should listen on.
/// - transport: The transport the server should listen on.
/// - router: A ``RPCRouter`` used by the server to route accepted streams to method handlers.
/// - interceptors: A collection of interceptors providing cross-cutting functionality to each
/// accepted RPC. The order in which interceptors are added reflects the order in which they
/// are called. The first interceptor added will be the first interceptor to intercept each
/// request. The last interceptor added will be the final interceptor to intercept each
/// request before calling the appropriate handler.
public init(
transports: [any ServerTransport],
transport: any ServerTransport,
router: RPCRouter,
interceptors: [any ServerInterceptor] = []
) {
self.state = ManagedAtomic(.notStarted)
self.transports = transports
self.transport = transport
self.router = router
self.interceptors = interceptors
}

/// Starts the server and runs until all registered transports have closed.
/// Starts the server and runs until the registered transport has closed.
///
/// No RPCs are processed until all transports are listening. If a transport fails to start
/// listening then all open transports are closed and a ``RuntimeError`` is thrown.
/// No RPCs are processed until the configured transport is listening. If the transport fails to start
/// listening, or if it encounters a runtime error, then ``RuntimeError`` is thrown.
///
/// This function returns when all transports have stopped listening and all requests have been
/// handled. You can signal to transports that they should stop listening by calling
/// This function returns when the configured transport has stopped listening and all requests have been
/// handled. You can signal to the transport that it should stop listening by calling
/// ``stopListening()``. The server will continue to process existing requests.
///
/// To stop the server more abruptly you can cancel the task that this function is running in.
@@ -167,15 +163,15 @@
public func run() async throws {
let (wasNotStarted, actualState) = self.state.compareExchange(
expected: .notStarted,
desired: .starting,
desired: .running,
ordering: .sequentiallyConsistent
)

guard wasNotStarted else {
switch actualState {
case .notStarted:
fatalError()
case .starting, .running:
case .running:
throw RuntimeError(
code: .serverIsAlreadyRunning,
message: "The server is already running and can only be started once."
@@ -194,152 +190,16 @@
self.state.store(.stopped, ordering: .sequentiallyConsistent)
}

if self.transports.isEmpty {
throw RuntimeError(
code: .noTransportsConfigured,
message: """
Can't start server, no transports are configured. You must add at least one transport \
to the server before calling 'run()'.
"""
)
}

var listeners: [RPCAsyncSequence<Stream>] = []
listeners.reserveCapacity(self.transports.count)

for transport in self.transports {
do {
let listener = try await transport.listen()
listeners.append(listener)
} catch let cause {
// Failed to start, so start stopping.
self.state.store(.stopping, ordering: .sequentiallyConsistent)
// Some listeners may have started and have streams which need closing.
await self.rejectRequests(listeners)

throw RuntimeError(
code: .failedToStartTransport,
message: """
Server didn't start because the '\(type(of: transport))' transport threw an error \
while starting.
""",
cause: cause
)
do {
try await transport.listen { stream in
await self.router.handle(stream: stream, interceptors: self.interceptors)
}
}

// May have been told to stop listening while starting the transports.
let (wasStarting, _) = self.state.compareExchange(
expected: .starting,
desired: .running,
ordering: .sequentiallyConsistent
)

// If the server is stopping then notify the transport and then consume them: there may be
// streams opened at a lower level (e.g. HTTP/2) which are already open and need to be consumed.
if wasStarting {
await self.handleRequests(listeners)
} else {
await self.rejectRequests(listeners)
}
}

private func rejectRequests(_ listeners: [RPCAsyncSequence<Stream>]) async {
// Tell the active listeners to stop listening.
for transport in self.transports.prefix(listeners.count) {
transport.stopListening()
}

// Drain any open streams on active listeners.
await withTaskGroup(of: Void.self) { group in
let unavailable = Status(
code: .unavailable,
message: "The server isn't ready to accept requests."
} catch {
throw RuntimeError(
code: .transportError,
message: "Server transport threw an error.",
cause: error
)

for listener in listeners {
do {
for try await stream in listener {
group.addTask {
try? await stream.outbound.write(.status(unavailable, [:]))
stream.outbound.finish()
}
}
} catch {
// Suppress any errors, the original error from the transport which failed to start
// should be thrown.
}
}
}
}

private func handleRequests(_ listeners: [RPCAsyncSequence<Stream>]) async {
#if swift(>=5.9)
if #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) {
await self.handleRequestsInDiscardingTaskGroup(listeners)
} else {
await self.handleRequestsInTaskGroup(listeners)
}
#else
await self.handleRequestsInTaskGroup(listeners)
#endif
}

#if swift(>=5.9)
@available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *)
private func handleRequestsInDiscardingTaskGroup(_ listeners: [RPCAsyncSequence<Stream>]) async {
await withDiscardingTaskGroup { group in
for listener in listeners {
group.addTask {
await withDiscardingTaskGroup { subGroup in
do {
for try await stream in listener {
subGroup.addTask {
await self.router.handle(stream: stream, interceptors: self.interceptors)
}
}
} catch {
// If the listener threw then the connection must be broken, cancel all work.
subGroup.cancelAll()
}
}
}
}
}
}
#endif

private func handleRequestsInTaskGroup(_ listeners: [RPCAsyncSequence<Stream>]) async {
// If the discarding task group isn't available then fall back to using a regular task group
// with a limit on subtasks. Most servers will use an HTTP/2 based transport, most
// implementations limit connections to 100 concurrent streams. A limit of 4096 gives the server
// scope to handle nearly 41 completely saturated connections.
let maxConcurrentSubTasks = 4096
let tasks = ManagedAtomic(0)

await withTaskGroup(of: Void.self) { group in
for listener in listeners {
group.addTask {
await withTaskGroup(of: Void.self) { subGroup in
do {
for try await stream in listener {
let taskCount = tasks.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent)
if taskCount >= maxConcurrentSubTasks {
_ = await subGroup.next()
tasks.wrappingDecrement(ordering: .sequentiallyConsistent)
}

subGroup.addTask {
await self.router.handle(stream: stream, interceptors: self.interceptors)
}
}
} catch {
// If the listener threw then the connection must be broken, cancel all work.
subGroup.cancelAll()
}
}
}
}
}
}

@@ -357,9 +217,7 @@
)

if wasRunning {
for transport in self.transports {
transport.stopListening()
}
self.transport.stopListening()
} else {
switch actual {
case .notStarted:
Expand All @@ -374,18 +232,6 @@ public struct GRPCServer: Sendable {
self.stopListening()
}

case .starting:
let (exchanged, _) = self.state.compareExchange(
expected: .starting,
desired: .stopping,
ordering: .sequentiallyConsistent
)

// Lost a race with 'run()', try again.
if !exchanged {
self.stopListening()
}

case .running:
// Unreachable, this branch only happens when the initial exchange didn't take place.
fatalError()
12 changes: 0 additions & 12 deletions Sources/GRPCCore/RuntimeError.swift
@@ -110,8 +110,6 @@ extension RuntimeError {
case invalidArgument
case serverIsAlreadyRunning
case serverIsStopped
case failedToStartTransport
case noTransportsConfigured
case clientIsAlreadyRunning
case clientIsStopped
case transportError
@@ -137,16 +135,6 @@
Self(.serverIsStopped)
}

/// The server couldn't be started because a transport failed to start.
public static var failedToStartTransport: Self {
Self(.failedToStartTransport)
}

/// The server couldn't be started because no transports were configured.
public static var noTransportsConfigured: Self {
Self(.noTransportsConfigured)
}

/// An attempt to start the client was made but it is already running.
public static var clientIsAlreadyRunning: Self {
Self(.clientIsAlreadyRunning)
11 changes: 7 additions & 4 deletions Sources/GRPCCore/Transport/ServerTransport.swift
@@ -14,22 +14,25 @@
* limitations under the License.
*/

/// A protocol server transport implementations must conform to.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public protocol ServerTransport: Sendable {
typealias Inbound = RPCAsyncSequence<RPCRequestPart>
typealias Outbound = RPCWriter<RPCResponsePart>.Closable

/// Starts the transport and returns a sequence of accepted streams to handle.
/// Starts the transport.
///
/// Implementations will typically bind to a listening port when this function is called
/// and start accepting new connections. Each accepted inbound RPC stream should be published
/// to the async sequence returned by the function.
/// and start accepting new connections. Each accepted inbound RPC stream will be handed over to
/// the provided `streamHandler` to handle accordingly.
///
/// You can call ``stopListening()`` to stop the transport from accepting new streams. Existing
/// streams must be allowed to complete naturally. However, transports may also enforce a grace
/// period after which any open streams may be cancelled. You can also cancel the task running
/// ``listen()`` to abruptly close connections and streams.
func listen() async throws -> RPCAsyncSequence<RPCStream<Inbound, Outbound>>
func listen(
_ streamHandler: @escaping (RPCStream<Inbound, Outbound>) async -> Void
) async throws

/// Indicates to the transport that no new streams should be accepted.
///
