Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Server-Sent Events #2960

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 29 additions & 17 deletions Sources/Development/routes.swift
Expand Up @@ -11,16 +11,16 @@ public func routes(_ app: Application) throws {
app.on(.GET, "ping") { req -> StaticString in
return "123" as StaticString
}


// ( echo -e 'POST /slow-stream HTTP/1.1\r\nContent-Length: 1000000000\r\n\r\n'; dd if=/dev/zero; ) | nc localhost 8080
app.on(.POST, "slow-stream", body: .stream) { req -> EventLoopFuture<String> in
let done = req.eventLoop.makePromise(of: String.self)

var total = 0
req.body.drain { result in
let promise = req.eventLoop.makePromise(of: Void.self)

switch result {
case .buffer(let buffer):
req.eventLoop.scheduleTask(in: .milliseconds(1000)) {
Expand All @@ -33,7 +33,7 @@ public func routes(_ app: Application) throws {
promise.succeed(())
done.succeed(total.description)
}

// manually return pre-completed future
// this should balloon in memory
// return req.eventLoop.makeSucceededFuture(())
Expand All @@ -42,14 +42,14 @@ public func routes(_ app: Application) throws {
// this should use very little memory
return promise.futureResult
}

return done.futureResult
}

app.get("test", "head") { req -> String in
return "OK!"
}

app.post("test", "head") { req -> String in
return "OK!"
}
Expand All @@ -62,7 +62,7 @@ public func routes(_ app: Application) throws {
app.on(.POST, "large-file", body: .collect(maxSize: 1_000_000_000)) { req -> String in
return req.body.data?.readableBytes.description ?? "none"
}

app.get("json") { req -> [String: String] in
return ["foo": "bar"]
}.description("returns some test json")
Expand All @@ -74,7 +74,7 @@ public func routes(_ app: Application) throws {
ws.close(promise: nil)
}
}

let ip = req.remoteAddress?.description ?? "<no ip>"
ws.send("Hello 👋 \(ip)")
}
Expand All @@ -94,15 +94,15 @@ public func routes(_ app: Application) throws {
}
return promise.futureResult
}

app.get("shutdown") { req -> HTTPStatus in
guard let running = req.application.running else {
throw Abort(.internalServerError)
}
running.stop()
return .ok
}

let cache = MemoryCache()
app.get("cache", "get", ":key") { req -> String in
guard let key = req.parameters.get("key") else {
Expand All @@ -120,15 +120,15 @@ public func routes(_ app: Application) throws {
cache.set(key, to: value)
return "\(key) = \(value)"
}

app.get("hello", ":name") { req in
return req.parameters.get("name") ?? "<nil>"
}

app.get("search") { req in
return req.query["q"] ?? "none"
}

let sessions = app.grouped("sessions")
.grouped(app.sessions.middleware)
sessions.get("set", ":value") { req -> HTTPStatus in
Expand All @@ -142,11 +142,11 @@ public func routes(_ app: Application) throws {
req.session.destroy()
return "done"
}

app.get("client") { req in
return req.client.get("http://httpbin.org/status/201").map { $0.description }
}

app.get("client-json") { req -> EventLoopFuture<String> in
struct HTTPBinResponse: Decodable {
struct Slideshow: Decodable {
Expand All @@ -159,6 +159,18 @@ public func routes(_ app: Application) throws {
.map { $0.slideshow.title }
}

app.grouped(CORSMiddleware()).get("sse") { req in
try await req.serverSentEvents { producer in
for i in 0..<1000 {
let event = SSEvent(
data: SSEValue(string: "\(i)")
)
try await producer.sendEvent(event)
try await Task.sleep(nanoseconds: 1_000_000_000) // 1 sec
}
}
}

let users = app.grouped("users")
users.get { req in
return "users"
Expand Down
11 changes: 11 additions & 0 deletions Sources/Vapor/HTTP/BodyStream.swift
Expand Up @@ -51,3 +51,14 @@ extension BodyStreamWriter {
return promise.futureResult
}
}

public protocol AsyncBodyStreamWriter {
func write(_ result: BodyStreamResult) async throws
func writeBuffer(_ buffer: ByteBuffer) async throws
}

extension AsyncBodyStreamWriter {
public func writeBuffer(_ buffer: ByteBuffer) async throws {
try await write(.buffer(buffer))
}
}
2 changes: 2 additions & 0 deletions Sources/Vapor/HTTP/Headers/HTTPMediaType.swift
Expand Up @@ -236,6 +236,8 @@ public extension HTTPMediaType {
static let avi = HTTPMediaType(type: "video", subType: "avi")
/// MPEG video.
static let mpeg = HTTPMediaType(type: "video", subType: "mpeg")
/// Event-Stream.
static let eventStream = HTTPMediaType(type: "text", subType: "event-stream")
}

// MARK: Extensions
Expand Down
25 changes: 23 additions & 2 deletions Sources/Vapor/HTTP/Server/HTTPServerResponseEncoder.swift
Expand Up @@ -32,7 +32,6 @@ final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelH
status: response.status,
headers: response.headers
))), promise: nil)


if response.status == .noContent || response.forHeadRequest {
// don't send bodies for 204 (no content) responses
Expand Down Expand Up @@ -66,6 +65,17 @@ final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelH
count: stream.count == -1 ? nil : stream.count
)
stream.callback(channelStream)
case .asyncStream(let stream):
let channelStream = ChannelResponseBodyStream(
context: context,
handler: self,
promise: nil,
count: stream.count == -1 ? nil : stream.count
)

promise?.completeWithTask {
try await stream.callback(channelStream)
}
}
}
}
Expand All @@ -80,7 +90,7 @@ final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelH
}
}

private final class ChannelResponseBodyStream: BodyStreamWriter {
private final class ChannelResponseBodyStream: BodyStreamWriter, AsyncBodyStreamWriter {
let context: ChannelHandlerContext
let handler: HTTPServerResponseEncoder
let promise: EventLoopPromise<Void>?
Expand Down Expand Up @@ -111,16 +121,27 @@ private final class ChannelResponseBodyStream: BodyStreamWriter {
self.isComplete = false
}

func write(_ result: BodyStreamResult) async throws {
// Explicitly adds the ELF because Swift 5.6 fails to infer the return type
try await self.eventLoop.flatSubmit { () -> EventLoopFuture<Void> in
let promise = self.eventLoop.makePromise(of: Void.self)
self.write(result, promise: promise)
return promise.futureResult
}.get()
}

func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?) {
switch result {
case .buffer(let buffer):
// TODO: What if it's complete at this point?
self.context.writeAndFlush(self.handler.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
self.currentCount += buffer.readableBytes
if let count = self.count, self.currentCount > count {
self.promise?.fail(Error.tooManyBytes)
promise?.fail(Error.notEnoughBytes)
}
case .end:
// TODO: What if it's already complete?
self.isComplete = true
if let count = self.count, self.currentCount != count {
self.promise?.fail(Error.notEnoughBytes)
Expand Down
11 changes: 10 additions & 1 deletion Sources/Vapor/Request/Request+BodyStream.swift
Expand Up @@ -2,7 +2,7 @@ import NIOCore
import NIOConcurrencyHelpers

extension Request {
final class BodyStream: BodyStreamWriter {
final class BodyStream: BodyStreamWriter, AsyncBodyStreamWriter {
let eventLoop: EventLoop

var isBeingRead: Bool {
Expand All @@ -28,6 +28,15 @@ extension Request {
}
self.buffer = []
}

func write(_ result: BodyStreamResult) async throws {
// Explicitly adds the ELF because Swift 5.6 fails to infer the return type
try await self.eventLoop.flatSubmit { () -> EventLoopFuture<Void> in
let promise = self.eventLoop.makePromise(of: Void.self)
self.write0(result, promise: promise)
return promise.futureResult
}.get()
}

func write(_ chunk: BodyStreamResult, promise: EventLoopPromise<Void>?) {
// See https://github.com/vapor/vapor/issues/2906
Expand Down
35 changes: 35 additions & 0 deletions Sources/Vapor/Response/Response+Body.swift
Expand Up @@ -6,6 +6,11 @@ extension Response {
let count: Int
let callback: (BodyStreamWriter) -> ()
}

struct AsyncBodyStream {
let count: Int
let callback: @Sendable (AsyncBodyStreamWriter) async throws -> ()
}

/// Represents a `Response`'s body.
///
Expand All @@ -23,6 +28,7 @@ extension Response {
case staticString(StaticString)
case string(String)
case stream(BodyStream)
case asyncStream(AsyncBodyStream)
}

/// An empty `Response.Body`.
Expand Down Expand Up @@ -50,6 +56,7 @@ extension Response {
case .buffer(let buffer): return buffer.readableBytes
case .none: return 0
case .stream(let stream): return stream.count
case .asyncStream(let stream): return stream.count
}
}

Expand All @@ -63,6 +70,7 @@ extension Response {
case .string(let string): return Data(string.utf8)
case .none: return nil
case .stream: return nil
case .asyncStream: return nil
}
}

Expand All @@ -83,6 +91,7 @@ extension Response {
return buffer
case .none: return nil
case .stream: return nil
case .asyncStream: return nil
}
}

Expand All @@ -108,6 +117,7 @@ extension Response {
case .staticString(let string): return string.description
case .string(let string): return string
case .stream: return "<stream>"
case .asyncStream: return "<async stream>"
}
}

Expand Down Expand Up @@ -159,6 +169,31 @@ extension Response {
self.init(stream: stream, count: -1, byteBufferAllocator: byteBufferAllocator)
}

public init(asyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), count: Int, byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.byteBufferAllocator = byteBufferAllocator
self.storage = .asyncStream(.init(count: count, callback: asyncStream))
}

public init(asyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.init(asyncStream: asyncStream, count: -1, byteBufferAllocator: byteBufferAllocator)
}

public init(managedAsyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), count: Int, byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.byteBufferAllocator = byteBufferAllocator
self.storage = .asyncStream(.init(count: count) { writer in
do {
try await managedAsyncStream(writer)
try await writer.write(.end)
} catch {
try await writer.write(.error(error))
}
})
}

public init(managedAsyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.init(managedAsyncStream: managedAsyncStream, count: -1, byteBufferAllocator: byteBufferAllocator)
}

/// `ExpressibleByStringLiteral` conformance.
public init(stringLiteral value: String) {
self.byteBufferAllocator = ByteBufferAllocator()
Expand Down