/
HTTPServerResponseEncoder.swift
151 lines (135 loc) · 5.95 KB
/
HTTPServerResponseEncoder.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import NIO
import NIOHTTP1
/// Outbound channel handler that serializes a `Response` into a sequence of
/// `HTTPServerResponsePart`s: one `.head`, an optional `.body`, and one `.end`.
///
/// After the `.end` part has been written, a `ResponseEndSentEvent` is fired
/// as a user inbound event on the same context.
final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelHandler {
    typealias OutboundIn = Response
    typealias OutboundOut = HTTPServerResponsePart

    /// Optional server header.
    private let serverHeader: String?

    /// Supplies the timestamp written to the `date` header of each response.
    private let dateCache: RFC1123DateCache

    /// User inbound event fired once the `.end` part of a response has been written.
    struct ResponseEndSentEvent { }

    /// Creates a new encoder.
    ///
    /// - Parameters:
    ///   - serverHeader: Value for the `server` header; no header is added when `nil`.
    ///   - dateCache: Source of the current timestamp for the `date` header.
    init(serverHeader: String?, dateCache: RFC1123DateCache) {
        self.serverHeader = serverHeader
        self.dateCache = dateCache
    }

    /// Serializes the outgoing `Response` into head / body / end parts.
    ///
    /// - Parameters:
    ///   - context: The channel handler context to write the parts to.
    ///   - data: The wrapped `Response` to serialize.
    ///   - promise: Completed with the result of writing the final `.end` part
    ///     (or handed to the body stream writer for streamed bodies).
    func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        let response = self.unwrapOutboundIn(data)

        // add a RFC1123 timestamp to the Date header to make this
        // a valid response
        response.headers.add(name: "date", value: self.dateCache.currentTimestamp())

        if let server = self.serverHeader {
            response.headers.add(name: "server", value: server)
        }

        // begin serializing
        context.write(wrapOutboundOut(.head(.init(
            version: response.version,
            status: response.status,
            headers: response.headers
        ))), promise: nil)

        if response.status == .noContent || response.forHeadRequest {
            // don't send bodies for 204 (no content) responses
            // or HEAD requests
            context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: promise)
            context.fireUserInboundEventTriggered(ResponseEndSentEvent())
        } else {
            switch response.body.storage {
            case .none:
                context.writeAndFlush(wrapOutboundOut(.end(nil)), promise: promise)
                context.fireUserInboundEventTriggered(ResponseEndSentEvent())
            case .buffer(let buffer):
                self.writeAndflush(buffer: buffer, context: context, promise: promise)
            case .string(let string):
                // Reserve by UTF-8 byte count: `String.count` counts Characters,
                // which under-reserves for any non-ASCII content.
                var buffer = context.channel.allocator.buffer(capacity: string.utf8.count)
                buffer.writeString(string)
                self.writeAndflush(buffer: buffer, context: context, promise: promise)
            case .staticString(let string):
                var buffer = context.channel.allocator.buffer(capacity: string.utf8CodeUnitCount)
                buffer.writeStaticString(string)
                self.writeAndflush(buffer: buffer, context: context, promise: promise)
            case .data(let data):
                var buffer = context.channel.allocator.buffer(capacity: data.count)
                buffer.writeBytes(data)
                self.writeAndflush(buffer: buffer, context: context, promise: promise)
            case .dispatchData(let data):
                var buffer = context.channel.allocator.buffer(capacity: data.count)
                buffer.writeDispatchData(data)
                self.writeAndflush(buffer: buffer, context: context, promise: promise)
            case .stream(let stream):
                // The stream writer takes over the promise and is responsible
                // for writing the body chunks and the final `.end` part.
                let channelStream = ChannelResponseBodyStream(
                    context: context,
                    handler: self,
                    promise: promise,
                    count: stream.count
                )
                stream.callback(channelStream)
            }
        }
    }

    /// Writes a `ByteBuffer` to the context.
    ///
    /// A `.body` part is written only when the buffer has readable bytes; the
    /// `.end` part is always written and flushed, after which
    /// `ResponseEndSentEvent` is fired.
    ///
    /// - Parameters:
    ///   - buffer: The complete response body.
    ///   - context: The channel handler context to write to.
    ///   - promise: Passed to the write of the `.end` part.
    private func writeAndflush(buffer: ByteBuffer, context: ChannelHandlerContext, promise: EventLoopPromise<Void>?) {
        if buffer.readableBytes > 0 {
            context.write(wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
        }
        context.writeAndFlush(wrapOutboundOut(.end(nil)), promise: promise)
        context.fireUserInboundEventTriggered(ResponseEndSentEvent())
    }
}
/// `BodyStreamWriter` that forwards streamed body results to the channel as
/// HTTP response parts, keeping a running total of the bytes written so it can
/// be compared against the expected `count`.
private final class ChannelResponseBodyStream: BodyStreamWriter {
    /// Context the response parts are written to.
    let context: ChannelHandlerContext

    /// Handler used to wrap parts into the outbound type.
    let handler: HTTPServerResponseEncoder

    /// Promise for the response as a whole; completed on `.end` / `.error`
    /// or failed when the byte count does not match.
    let promise: EventLoopPromise<Void>?

    /// Expected total number of body bytes.
    let count: Int

    /// Number of body bytes written so far.
    var currentCount: Int

    /// Set once `.end` or `.error` has been received.
    var isComplete: Bool

    var eventLoop: EventLoop {
        return self.context.eventLoop
    }

    enum Error: Swift.Error {
        /// The running total of written bytes exceeded `count`.
        case tooManyBytes
        /// `.end` was received while the running total differs from `count`.
        case notEnoughBytes
    }

    /// Creates a stream writer with a zero byte total and `isComplete == false`.
    ///
    /// - Parameters:
    ///   - context: Context the response parts are written to.
    ///   - handler: Encoder used to wrap outbound parts.
    ///   - promise: Promise for the response as a whole.
    ///   - count: Expected total number of body bytes.
    init(
        context: ChannelHandlerContext,
        handler: HTTPServerResponseEncoder,
        promise: EventLoopPromise<Void>?,
        count: Int
    ) {
        self.context = context
        self.handler = handler
        self.promise = promise
        self.count = count
        self.currentCount = 0
        self.isComplete = false
    }

    /// Handles one result from the body stream.
    ///
    /// - Parameters:
    ///   - result: A body chunk, the end-of-stream marker, or an error.
    ///   - promise: Per-call promise, passed to the channel write for this result.
    func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?) {
        switch result {
        case .buffer(let buffer):
            // The chunk is written before the running total is checked.
            self.context.writeAndFlush(self.handler.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
            self.currentCount += buffer.readableBytes
            guard self.currentCount <= self.count else {
                // Report the overflow consistently on both promises.
                self.promise?.fail(Error.tooManyBytes)
                promise?.fail(Error.tooManyBytes)
                // NOTE(review): the channel is not closed here; the close call
                // below was left disabled in the original code.
                // self.context.close(promise: nil)
                return
            }
        case .end:
            self.isComplete = true
            guard self.currentCount == self.count else {
                // No `.end` part is written when the byte total does not match.
                self.promise?.fail(Error.notEnoughBytes)
                promise?.fail(Error.notEnoughBytes)
                // NOTE(review): the channel is not closed here; the close call
                // below was left disabled in the original code.
                // self.context.close(promise: nil)
                return
            }
            self.context.writeAndFlush(self.handler.wrapOutboundOut(.end(nil)), promise: promise)
            self.context.fireUserInboundEventTriggered(HTTPServerResponseEncoder.ResponseEndSentEvent())
            self.promise?.succeed(())
        case .error(let error):
            self.isComplete = true
            // The response is still terminated with `.end`; the error is
            // reported through the response promise.
            self.context.writeAndFlush(self.handler.wrapOutboundOut(.end(nil)), promise: promise)
            self.context.fireUserInboundEventTriggered(HTTPServerResponseEncoder.ResponseEndSentEvent())
            self.promise?.fail(error)
        }
    }

    deinit {
        // Debug-only check that the stream was terminated before being released.
        assert(self.isComplete, "Response body stream writer deinitialized before .end or .error was sent.")
    }
}