-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathHTTPRequestHandler.swift
206 lines (182 loc) · 8.33 KB
/
HTTPRequestHandler.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
/*
* Copyright IBM Corporation 2018
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import NIO
import NIOHTTP1
import NIOWebSocket
import LoggerAPI
import Foundation
import Dispatch
internal class HTTPRequestHandler: ChannelInboundHandler, RemovableChannelHandler {
/// The HTTPServer instance on which this handler is installed
var server: HTTPServer
var requestSize: Int = 0
/// The serverRequest related to this handler instance
var serverRequest: HTTPServerRequest?
/// The serverResponse related to this handler instance
var serverResponse: HTTPServerResponse?
/// We'd want to send an error response only once
var errorResponseSent = false
var keepAliveState: KeepAliveState {
set {
self.syncQueue.sync {
_keepAliveState = newValue
}
}
get {
return self.syncQueue.sync {
return _keepAliveState
}
}
}
private let syncQueue = DispatchQueue(label: "HTTPServer.keepAliveSync")
private var _keepAliveState: KeepAliveState = .unlimited
static let keepAliveTimeout: TimeInterval = 60
private(set) var clientRequestedKeepAlive = false
private(set) var enableSSLVerification = false
public init(for server: HTTPServer) {
self.server = server
self.keepAliveState = server.keepAliveState
if server.sslConfig != nil {
self.enableSSLVerification = true
}
}
public typealias InboundIn = HTTPServerRequestPart
public typealias OutboundOut = HTTPServerResponsePart
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let request = self.unwrapInboundIn(data)
// If an error response was already sent, we'd want to spare running through this for now.
// If an upgrade to WebSocket fails, both `errorCaught` and `channelRead` are triggered.
// We'd want to return the error via `errorCaught`.
if errorResponseSent { return }
switch request {
case .head(let header):
serverRequest = HTTPServerRequest(channel: context.channel, requestHead: header, enableSSL: enableSSLVerification)
if let requestSizeLimit = server.options.requestSizeLimit,
let contentLength = header.headers["Content-Length"].first,
let contentLengthValue = Int(contentLength) {
if contentLengthValue > requestSizeLimit {
do {
if let (httpStatus, response) = server.options.requestSizeResponseGenerator(requestSizeLimit, serverRequest?.remoteAddress ?? "") {
serverResponse = HTTPServerResponse(channel: context.channel, handler: self)
errorResponseSent = true
try serverResponse?.end(with: httpStatus, message: response)
}
} catch {
Log.error("Failed to send error response")
}
context.close(promise: nil)
}
}
serverRequest = HTTPServerRequest(channel: context.channel, requestHead: header, enableSSL: enableSSLVerification)
self.clientRequestedKeepAlive = header.isKeepAlive
case .body(var buffer):
requestSize += buffer.readableBytes
if let requestSizeLimit = server.options.requestSizeLimit {
if requestSize > requestSizeLimit {
do {
if let (httpStatus, response) = server.options.requestSizeResponseGenerator(requestSizeLimit,serverRequest?.remoteAddress ?? "") {
serverResponse = HTTPServerResponse(channel: context.channel, handler: self)
errorResponseSent = true
try serverResponse?.end(with: httpStatus, message: response)
}
} catch {
Log.error("Failed to send error response")
}
}
}
guard let serverRequest = serverRequest else {
Log.error("No ServerRequest available")
return
}
if serverRequest.buffer == nil {
serverRequest.buffer = BufferList(with: buffer)
} else {
serverRequest.buffer!.byteBuffer.writeBuffer(&buffer)
}
case .end:
requestSize = 0
_ = server.connectionCount.add(1)
if let connectionLimit = server.options.connectionLimit {
if server.connectionCount.load() > connectionLimit {
do {
if let (httpStatus, response) = server.options.connectionResponseGenerator(connectionLimit,serverRequest?.remoteAddress ?? "") {
serverResponse = HTTPServerResponse(channel: context.channel, handler: self)
errorResponseSent = true
try serverResponse?.end(with: httpStatus, message: response)
}
} catch {
Log.error("Failed to send error response")
}
}
}
serverResponse = HTTPServerResponse(channel: context.channel, handler: self)
//Make sure we use the latest delegate registered with the server
DispatchQueue.global().async {
guard let serverRequest = self.serverRequest, let serverResponse = self.serverResponse else { return }
let delegate = self.server.delegate ?? HTTPDummyServerDelegate()
Monitor.delegate?.started(request: serverRequest, response: serverResponse)
delegate.handle(request: serverRequest, response: serverResponse)
}
}
}
//IdleStateEvents are received on this method
public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
if event is IdleStateHandler.IdleStateEvent {
_ = context.close()
}
}
public func channelReadComplete(context: ChannelHandlerContext) {
context.flush()
}
public func errorCaught(context: ChannelHandlerContext, error: Error) {
guard !errorResponseSent else { return }
var message: String?
switch error {
case KituraWebSocketUpgradeError.noWebSocketKeyHeader:
message = "Sec-WebSocket-Key header missing in the upgrade request"
case KituraWebSocketUpgradeError.noWebSocketVersionHeader:
message = "Sec-WebSocket-Version header missing in the upgrade request"
case KituraWebSocketUpgradeError.invalidKeyHeaderCount(_):
break
case KituraWebSocketUpgradeError.invalidVersionHeaderCount(_):
break
case KituraWebSocketUpgradeError.invalidVersionHeader(_):
message = "Only WebSocket protocol version 13 is supported"
default:
// Handle only NIOWebSocketUpgradeError here, nothing else
if let upgradeError = error as? NIOWebSocketUpgradeError, upgradeError == .unsupportedWebSocketTarget {
let target = server.latestWebSocketURI
message = "No service has been registered for the path \(target)"
} else {
context.close(promise: nil)
return
}
}
do {
serverResponse = HTTPServerResponse(channel: context.channel, handler: self)
errorResponseSent = true
try serverResponse?.end(with: .badRequest, message: message)
} catch {
Log.error("Failed to send error response")
}
}
func updateKeepAliveState() {
keepAliveState.decrement()
}
func channelInactive(context: ChannelHandlerContext) {
_ = server.connectionCount.sub(1)
}
}