Skip to content

Commit

Permalink
Fix a miscalculation in SSE's indices during parsing, by normalizing …
Browse files Browse the repository at this point in the history
…the offset to 0, using `readableBytesView.startIndex`
  • Loading branch information
Joannis committed Feb 22, 2023
1 parent 5d622a4 commit 995b7d3
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 8 deletions.
23 changes: 15 additions & 8 deletions Sources/Vapor/SSE/Client+SSE.swift
Expand Up @@ -35,7 +35,7 @@ extension HTTPClientResponse {

internal enum SSEParser {
enum ParsingStatus {
case nextField, nextEvent, haltParsing
case nextField, haltParsing
}

static func process(sse text: inout ByteBuffer) throws -> [SSEvent] {
Expand Down Expand Up @@ -69,7 +69,9 @@ internal enum SSEParser {
data.removeAll(keepingCapacity: true)
id = nil

return text.readableBytes > 0 ? .nextEvent : .haltParsing
lastEventReaderIndex = text.readerIndex

return text.readableBytes > 0 ? .nextField : .haltParsing
}

return .nextField
Expand All @@ -79,24 +81,29 @@ internal enum SSEParser {

repeat {
switch checkEndOfEventAndStream() {
case .nextEvent:
lastEventReaderIndex = text.readerIndex
fallthrough
case .nextField:
var value = ""
let colonIndex = text.readableBytesView.firstIndex(where: { byte in

let readableBytesView = text.readableBytesView
let colonIndex = readableBytesView.firstIndex(where: { byte in
byte == 0x3a // `:`
})

guard var lineEndingIndex = text.readableBytesView.firstIndex(where: { byte in
guard var lineEndingIndex = readableBytesView.firstIndex(where: { byte in
byte == 0x0a || byte == 0x0d // `\n` or `\r`
}) else {
// Reset to before this event, as we didn't fully process this
text.moveReaderIndex(to: lastEventReaderIndex)
return events
}

if let colonIndex = colonIndex {
// The indices are offset from the start of the buffer, not the start of the readable bytes
lineEndingIndex -= readableBytesView.startIndex

if var colonIndex = colonIndex {
// The indices are offset from the start of the buffer, not the start of the readable bytes
colonIndex -= readableBytesView.startIndex

guard let key = text.readString(length: colonIndex) else {
// Reset to before this event, as we didn't fully process this
text.moveReaderIndex(to: lastEventReaderIndex)
Expand Down
54 changes: 54 additions & 0 deletions Tests/VaporTests/SSETests.swift
@@ -0,0 +1,54 @@
import XCTVapor
import AsyncHTTPClient

final class SSETests: XCTestCase {
func testEndToEndUse() async throws {
let app = Application()
defer { app.shutdown() }

let allEvents = ["hello\nworld", "1", "2", "3"]

app.get("sse") { req in
try await req.serverSentEvents { producer in
for event in allEvents {
try await producer.sendEvent(.init(data: SSEValue(string: event)))
}
}
}

app.environment.arguments = ["serve"]
try app.boot()
try app.start()

guard
let localAddress = app.http.server.shared.localAddress,
let port = localAddress.port
else {
XCTFail("couldn't get port from \(app.http.server.shared.localAddress.debugDescription)")
return
}

let client = HTTPClient(eventLoopGroupProvider: .createNew)

let request = HTTPClientRequest(url: "http://localhost:\(port)/sse")
defer { _ = try? client.syncShutdown() }

let events = try await client.execute(request, timeout: .seconds(1))
.getServerSentEvents(allocator: app.allocator)

var expectedEvents = allEvents
for try await event in events {
if expectedEvents.isEmpty {
return XCTFail("Unexpected event received")
}

XCTAssertEqual(event.data.string, expectedEvents.removeFirst())
}

XCTAssertTrue(expectedEvents.isEmpty)

// retain util the end
_ = app
_ = client
}
}

0 comments on commit 995b7d3

Please sign in to comment.