From 2536f278b164bba1d7a0e9f6b8249ce83d42db05 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 18 Oct 2022 06:22:05 +0300 Subject: [PATCH 1/6] Add idle connection timeouts for HTTP client's connections pool. Add timestamps and duration for both HTTP client requests/responses. Add test. --- chronos/apps/http/httpclient.nim | 166 +++++++++++++++++++++++-------- tests/testhttpclient.nim | 62 ++++++++++++ 2 files changed, 185 insertions(+), 43 deletions(-) diff --git a/chronos/apps/http/httpclient.nim b/chronos/apps/http/httpclient.nim index f63b24c4e..4e855273e 100644 --- a/chronos/apps/http/httpclient.nim +++ b/chronos/apps/http/httpclient.nim @@ -22,6 +22,9 @@ const ## Timeout for connecting to host (12 sec) HttpHeadersTimeout* = 120.seconds ## Timeout for receiving response headers (120 sec) + HttpConnectionIdleTimeout* = 120.seconds + ## Time after which idle connections are removed from the HttpSession's + ## connections pool (120 sec) HttpMaxRedirections* = 10 ## Maximum number of Location redirections. HttpClientConnectionTrackerName* = "httpclient.connection" @@ -100,6 +103,7 @@ type error*: ref HttpError remoteHostname*: string flags*: set[HttpClientConnectionFlag] + timestamp*: Moment HttpClientConnectionRef* = ref HttpClientConnection @@ -109,6 +113,7 @@ type maxRedirections*: int connectTimeout*: Duration headersTimeout*: Duration + idleTimeout*: Duration connectionBufferSize*: int maxConnections*: int connectionsCount*: int @@ -140,6 +145,8 @@ type buffer*: seq[byte] writer*: HttpBodyWriter redirectCount: int + timestamp*: Moment + duration*: Duration HttpClientRequestRef* = ref HttpClientRequest @@ -160,6 +167,8 @@ type transferEncoding*: set[TransferEncodingFlags] contentLength*: uint64 contentType*: Opt[ContentTypeData] + timestamp*: Moment + duration*: Duration HttpClientResponseRef* = ref HttpClientResponse @@ -284,13 +293,40 @@ template checkClosed(reqresp: untyped): untyped = reqresp.setError(e) raise e +template setTimestamp(conn: HttpClientConnectionRef, + moment: Moment): untyped = + if not(isNil(conn)): + conn.timestamp = moment + +template setTimestamp( + reqresp: HttpClientRequestRef|HttpClientRequestRef + ): untyped = + if not(isNil(reqresp)): + let timestamp = Moment.now() + reqresp.timestamp = timestamp + reqresp.connection.setTimestamp(timestamp) + +template setTimestamp(resp: HttpClientResponseRef, moment: Moment): untyped = + if not(isNil(resp)): + resp.timestamp = moment + resp.connection.setTimestamp(moment) + +template setDuration( + reqresp: HttpClientRequestRef|HttpClientResponseRef + ): untyped = + if not(isNil(reqresp)): + let timestamp = Moment.now() + reqresp.duration = timestamp - reqresp.timestamp + reqresp.connection.setTimestamp(timestamp) + proc new*(t: typedesc[HttpSessionRef], flags: HttpClientFlags = {}, maxRedirections = HttpMaxRedirections, connectTimeout = HttpConnectTimeout, headersTimeout = HttpHeadersTimeout, connectionBufferSize = DefaultStreamBufferSize, - maxConnections = -1): HttpSessionRef {. + maxConnections = -1, + idleTimeout = HttpConnectionIdleTimeout): HttpSessionRef {. raises: [Defect] .} = ## Create new HTTP session object. ## @@ -305,6 +341,7 @@ proc new*(t: typedesc[HttpSessionRef], headersTimeout: headersTimeout, connectionBufferSize: connectionBufferSize, maxConnections: maxConnections, + idleTimeout: idleTimeout, connections: initTable[string, seq[HttpClientConnectionRef]]() ) @@ -583,6 +620,31 @@ proc connect(session: HttpSessionRef, # If all attempts to connect to the remote host have failed. raiseHttpConnectionError("Could not connect to remote host") +template isReady(conn: HttpClientConnectionRef): bool = + (conn.state == HttpClientConnectionState.Ready) and + (HttpClientConnectionFlag.KeepAlive in conn.flags) and + (HttpClientConnectionFlag.Request notin conn.flags) and + (HttpClientConnectionFlag.Response notin conn.flags) + +template isIdle(conn: HttpClientConnectionRef, timestamp: Moment, + timeout: Duration): bool = + (timestamp - conn.timestamp) >= timeout + +proc removeConnection(session: HttpSessionRef, + conn: HttpClientConnectionRef) {.async.} = + let removeHost = + block: + var res = false + session.connections.withValue(conn.remoteHostname, connections): + connections[].keepItIf(it != conn) + if len(connections[]) == 0: + res = true + res + if removeHost: + session.connections.del(conn.remoteHostname) + dec(session.connectionsCount) + await conn.closeWait() + proc acquireConnection( session: HttpSessionRef, ha: HttpAddress, @@ -604,18 +666,35 @@ proc acquireConnection( else: let conn = block: - let conns = session.connections.getOrDefault(ha.id) - if len(conns) > 0: - var res: HttpClientConnectionRef = nil - for item in conns: - if item.state == HttpClientConnectionState.Ready: - res = item - break - res - else: - nil + var idleConnections: seq[HttpClientConnectionRef] + let + timestamp = Moment.now() + res = + block: + var cres: HttpClientConnectionRef = nil + session.connections.withValue(ha.id, connections): + for connection in connections[]: + if connection.isReady(): + if connection.isIdle(timestamp, session.idleTimeout): + idleConnections.add(connection) + else: + if isNil(cres): cres = connection + cres + + if not(isNil(res)): + # We mark connection as acquired to avoid race, because we need to + # prune idle connections. + res.state = HttpClientConnectionState.Acquired + if len(idleConnections) > 0: + # Closing connections which was idle for`session.idleTimeout` + # time. + var pending: seq[Future[void]] + for connection in idleConnections: + pending.add(session.removeConnection(connection)) + await allFutures(pending) + res + if not(isNil(conn)): - conn[].state = HttpClientConnectionState.Acquired return conn else: var default: seq[HttpClientConnectionRef] @@ -629,21 +708,6 @@ proc acquireConnection( inc(session.connectionsCount) return res -proc removeConnection(session: HttpSessionRef, - conn: HttpClientConnectionRef) {.async.} = - let removeHost = - block: - var res = false - session.connections.withValue(conn.remoteHostname, connections): - connections[].keepItIf(it != conn) - if len(connections[]) == 0: - res = true - res - if removeHost: - session.connections.del(conn.remoteHostname) - dec(session.connectionsCount) - await conn.closeWait() - proc releaseConnection(session: HttpSessionRef, connection: HttpClientConnectionRef) {.async.} = ## Return connection back to the ``session``. @@ -676,7 +740,9 @@ proc releaseConnection(session: HttpSessionRef, await session.removeConnection(connection) else: connection.state = HttpClientConnectionState.Ready - connection.flags = {} + connection.flags.excl({HttpClientConnectionFlag.Request, + HttpClientConnectionFlag.Response, + HttpClientConnectionFlag.NoBody}) proc releaseConnection(request: HttpClientRequestRef) {.async.} = let @@ -793,10 +859,10 @@ proc prepareResponse(request: HttpClientRequestRef, data: openArray[byte] case resp.version of HttpVersion11, HttpVersion20: let header = toLowerAscii(headers.getString(ConnectionHeader)) - if header == "keep-alive": - true - else: + if header == "close": false + else: + true else: false @@ -836,22 +902,27 @@ proc prepareResponse(request: HttpClientRequestRef, data: openArray[byte] proc getResponse(req: HttpClientRequestRef): Future[HttpClientResponseRef] {. async.} = var buffer: array[HttpMaxHeadersSize, byte] - let bytesRead = - try: - await req.connection.reader.readUntil(addr buffer[0], - len(buffer), HeadersMark).wait( - req.session.headersTimeout) - except CancelledError as exc: - raise exc - except AsyncTimeoutError: - raiseHttpReadError("Reading response headers timed out") - except AsyncStreamError: - raiseHttpReadError("Could not read response headers") + let timestamp = Moment.now() + req.connection.setTimestamp(timestamp) + let + bytesRead = + try: + await req.connection.reader.readUntil(addr buffer[0], + len(buffer), HeadersMark).wait( + req.session.headersTimeout) + except CancelledError as exc: + raise exc + except AsyncTimeoutError: + raiseHttpReadError("Reading response headers timed out") + except AsyncStreamError: + raiseHttpReadError("Could not read response headers") let response = prepareResponse(req, buffer.toOpenArray(0, bytesRead - 1)) if response.isErr(): raiseHttpProtocolError(response.error()) - return response.get() + let res = response.get() + res.setTimestamp(timestamp) + return res proc new*(t: typedesc[HttpClientRequestRef], session: HttpSessionRef, ha: HttpAddress, meth: HttpMethod = MethodGet, @@ -1029,6 +1100,7 @@ proc send*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {. let headers = request.prepareRequest() request.connection.state = HttpClientConnectionState.RequestHeadersSending request.state = HttpReqRespState.Open + request.setTimestamp() await request.connection.writer.write(headers) request.connection.state = HttpClientConnectionState.RequestHeadersSent request.connection.state = HttpClientConnectionState.RequestBodySending @@ -1036,10 +1108,13 @@ proc send*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {. await request.connection.writer.write(request.buffer) request.connection.state = HttpClientConnectionState.RequestBodySent request.state = HttpReqRespState.Finished + request.setDuration() except CancelledError as exc: + request.setDuration() request.setError(newHttpInterruptError()) raise exc except AsyncStreamError: + request.setDuration() let error = newHttpWriteError("Could not send request headers") request.setError(error) raise error @@ -1079,13 +1154,16 @@ proc open*(request: HttpClientRequestRef): Future[HttpBodyWriter] {. try: let headers = request.prepareRequest() request.connection.state = HttpClientConnectionState.RequestHeadersSending + request.setTimestamp() await request.connection.writer.write(headers) request.connection.state = HttpClientConnectionState.RequestHeadersSent except CancelledError as exc: + request.setDuration() request.setError(newHttpInterruptError()) raise exc except AsyncStreamError: let error = newHttpWriteError("Could not send request headers") + request.setDuration() request.setError(error) raise error @@ -1123,6 +1201,7 @@ proc finish*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {. "Body writer instance must be closed before finish(request) call") request.state = HttpReqRespState.Finished request.connection.state = HttpClientConnectionState.RequestBodySent + request.setDuration() let response = try: await request.getResponse() @@ -1190,6 +1269,7 @@ proc finish*(response: HttpClientResponseRef) {.async.} = "Body reader instance must be closed before finish(response) call") response.connection.state = HttpClientConnectionState.ResponseBodyReceived response.state = HttpReqRespState.Finished + response.setDuration() proc getBodyBytes*(response: HttpClientResponseRef): Future[seq[byte]] {. async.} = diff --git a/tests/testhttpclient.nim b/tests/testhttpclient.nim index ec55ddf44..8d03e80c8 100644 --- a/tests/testhttpclient.nim +++ b/tests/testhttpclient.nim @@ -863,6 +863,64 @@ suite "HTTP client testing suite": return true + proc testIdleConnection(address: TransportAddress): Future[bool] {. + async.} = + let + ha = getAddress(address, HttpClientScheme.NonSecure, "/test") + + proc test( + session: HttpSessionRef, + a: HttpAddress + ): Future[TestResponseTuple] {.async.} = + + var + data: HttpResponseTuple + request = HttpClientRequestRef.new(session, a, version = HttpVersion11) + try: + data = await request.fetch() + finally: + await request.closeWait() + return (data.status, data.data.bytesToString(), 0) + + proc process(r: RequestFence): Future[HttpResponseRef] {.async.} = + if r.isOk(): + let request = r.get() + case request.uri.path + of "/test": + return await request.respond(Http200, "ok") + else: + return await request.respond(Http404, "Page not found") + else: + return dumbResponse() + + var server = createServer(address, process, false) + server.start() + let session = HttpSessionRef.new(idleTimeout = 1000.milliseconds) + try: + var f1 = test(session, ha) + var f2 = test(session, ha) + await allFutures(f1, f2) + check: + f1.finished() + f1.done() + f2.finished() + f2.done() + f1.read() == (200, "ok", 0) + f2.read() == (200, "ok", 0) + session.connectionsCount == 2 + + await sleepAsync(1100.milliseconds) + let resp = await test(session, ha) + check: + resp == (200, "ok", 0) + session.connectionsCount == 1 + finally: + await session.closeWait() + await server.stop() + await server.closeWait() + + return true + test "HTTP all request methods test": let address = initTAddress("127.0.0.1:30080") check waitFor(testMethods(address, false)) == 18 @@ -934,6 +992,10 @@ suite "HTTP client testing suite": let address = initTAddress("127.0.0.1:30080") check waitFor(testConnectionManagement(address)) == true + test "HTTP client idle connection test": + let address = initTAddress("127.0.0.1:30080") + check waitFor(testIdleConnection(address)) == true + test "Leaks test": proc getTrackerLeaks(tracker: string): bool = let tracker = getTracker(tracker) From be77bfbb1cdb8008b4087ee14a1478c7b426a87f Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 18 Oct 2022 13:01:02 +0300 Subject: [PATCH 2/6] Add comments on `connectionFlag` decisions. --- chronos/apps/http/httpclient.nim | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/chronos/apps/http/httpclient.nim b/chronos/apps/http/httpclient.nim index 4e855273e..e1763783c 100644 --- a/chronos/apps/http/httpclient.nim +++ b/chronos/apps/http/httpclient.nim @@ -857,14 +857,26 @@ proc prepareResponse(request: HttpClientRequestRef, data: openArray[byte] let connectionFlag = block: case resp.version - of HttpVersion11, HttpVersion20: + of HttpVersion11: + # Keeping a connection open is the default on HTTP/1.1 requests. + # https://www.rfc-editor.org/rfc/rfc2068.html#section-19.7.1 let header = toLowerAscii(headers.getString(ConnectionHeader)) if header == "close": false else: true - else: + of HttpVersion10: + # This is the default on HTTP/1.0 requests. false + else: + # HTTP/2 does not use the Connection header field (Section 7.6.1 of + # [HTTP]) to indicate connection-specific header fields. + # https://httpwg.org/specs/rfc9113.html#rfc.section.8.2.2 + # + # HTTP/3 does not use the Connection header field to indicate + # connection-specific fields; + # https://httpwg.org/specs/rfc9114.html#rfc.section.4.2 + true let contentType = block: From 2089f2c50d48a23e956e54487892ce532dddd90a Mon Sep 17 00:00:00 2001 From: cheatfate Date: Mon, 27 Feb 2023 12:39:31 +0200 Subject: [PATCH 3/6] Address review comments. Adjust default idle connection timeout to 60 seconds. --- chronos/apps/http/httpclient.nim | 171 +++++++++++++++++++------------ tests/testhttpclient.nim | 3 +- 2 files changed, 110 insertions(+), 64 deletions(-) diff --git a/chronos/apps/http/httpclient.nim b/chronos/apps/http/httpclient.nim index e1763783c..fa9c1256a 100644 --- a/chronos/apps/http/httpclient.nim +++ b/chronos/apps/http/httpclient.nim @@ -22,9 +22,12 @@ const ## Timeout for connecting to host (12 sec) HttpHeadersTimeout* = 120.seconds ## Timeout for receiving response headers (120 sec) - HttpConnectionIdleTimeout* = 120.seconds + HttpConnectionIdleTimeout* = 60.seconds ## Time after which idle connections are removed from the HttpSession's ## connections pool (120 sec) + HttpConnectionCheckPeriod* = 10.seconds + ## Period of time between idle connections checks in HttpSession's + ## connection pool (10 sec) HttpMaxRedirections* = 10 ## Maximum number of Location redirections. HttpClientConnectionTrackerName* = "httpclient.connection" @@ -113,7 +116,9 @@ type maxRedirections*: int connectTimeout*: Duration headersTimeout*: Duration - idleTimeout*: Duration + idleTimeout: Duration + idlePeriod: Duration + watcherFut: Future[void] connectionBufferSize*: int maxConnections*: int connectionsCount*: int @@ -319,6 +324,18 @@ template setDuration( reqresp.duration = timestamp - reqresp.timestamp reqresp.connection.setTimestamp(timestamp) +template isReady(conn: HttpClientConnectionRef): bool = + (conn.state == HttpClientConnectionState.Ready) and + (HttpClientConnectionFlag.KeepAlive in conn.flags) and + (HttpClientConnectionFlag.Request notin conn.flags) and + (HttpClientConnectionFlag.Response notin conn.flags) + +template isIdle(conn: HttpClientConnectionRef, timestamp: Moment, + timeout: Duration): bool = + (timestamp - conn.timestamp) >= timeout + +proc sessionWatcher(session: HttpSessionRef) {.async.} + proc new*(t: typedesc[HttpSessionRef], flags: HttpClientFlags = {}, maxRedirections = HttpMaxRedirections, @@ -326,15 +343,18 @@ proc new*(t: typedesc[HttpSessionRef], headersTimeout = HttpHeadersTimeout, connectionBufferSize = DefaultStreamBufferSize, maxConnections = -1, - idleTimeout = HttpConnectionIdleTimeout): HttpSessionRef {. + idleTimeout = HttpConnectionIdleTimeout, + idlePeriod = HttpConnectionCheckPeriod): HttpSessionRef {. raises: [Defect] .} = ## Create new HTTP session object. ## ## ``maxRedirections`` - maximum number of HTTP 3xx redirections ## ``connectTimeout`` - timeout for ongoing HTTP connection ## ``headersTimeout`` - timeout for receiving HTTP response headers + ## ``idleTimeout`` - timeout to consider HTTP connection as idle + ## ``idlePeriod`` - period of time to check HTTP connections for inactivity doAssert(maxRedirections >= 0, "maxRedirections should not be negative") - HttpSessionRef( + var res = HttpSessionRef( flags: flags, maxRedirections: maxRedirections, connectTimeout: connectTimeout, @@ -342,8 +362,11 @@ proc new*(t: typedesc[HttpSessionRef], connectionBufferSize: connectionBufferSize, maxConnections: maxConnections, idleTimeout: idleTimeout, - connections: initTable[string, seq[HttpClientConnectionRef]]() + idlePeriod: idlePeriod, + connections: initTable[string, seq[HttpClientConnectionRef]](), ) + res.watcherFut = sessionWatcher(res) + res proc getTLSFlags(flags: HttpClientFlags): set[TLSFlags] {.raises: [Defect] .} = var res: set[TLSFlags] @@ -620,16 +643,6 @@ proc connect(session: HttpSessionRef, # If all attempts to connect to the remote host have failed. raiseHttpConnectionError("Could not connect to remote host") -template isReady(conn: HttpClientConnectionRef): bool = - (conn.state == HttpClientConnectionState.Ready) and - (HttpClientConnectionFlag.KeepAlive in conn.flags) and - (HttpClientConnectionFlag.Request notin conn.flags) and - (HttpClientConnectionFlag.Response notin conn.flags) - -template isIdle(conn: HttpClientConnectionRef, timestamp: Moment, - timeout: Duration): bool = - (timestamp - conn.timestamp) >= timeout - proc removeConnection(session: HttpSessionRef, conn: HttpClientConnectionRef) {.async.} = let removeHost = @@ -651,62 +664,46 @@ proc acquireConnection( flags: set[HttpClientRequestFlag] ): Future[HttpClientConnectionRef] {.async.} = ## Obtain connection from ``session`` or establish a new one. + var default: seq[HttpClientConnectionRef] if (HttpClientFlag.NewConnectionAlways in session.flags) or (HttpClientRequestFlag.DedicatedConnection in flags): - var default: seq[HttpClientConnectionRef] - let res = + var conn = try: await session.connect(ha).wait(session.connectTimeout) except AsyncTimeoutError: raiseHttpConnectionError("Connection timed out") - res[].state = HttpClientConnectionState.Acquired - session.connections.mgetOrPut(ha.id, default).add(res) + conn.state = HttpClientConnectionState.Acquired + session.connections.mgetOrPut(ha.id, default).add(conn) inc(session.connectionsCount) - return res + return conn else: - let conn = + var conn = block: - var idleConnections: seq[HttpClientConnectionRef] + var cres: HttpClientConnectionRef = nil let timestamp = Moment.now() - res = - block: - var cres: HttpClientConnectionRef = nil - session.connections.withValue(ha.id, connections): - for connection in connections[]: - if connection.isReady(): - if connection.isIdle(timestamp, session.idleTimeout): - idleConnections.add(connection) - else: - if isNil(cres): cres = connection - cres - - if not(isNil(res)): - # We mark connection as acquired to avoid race, because we need to - # prune idle connections. - res.state = HttpClientConnectionState.Acquired - if len(idleConnections) > 0: - # Closing connections which was idle for`session.idleTimeout` - # time. - var pending: seq[Future[void]] - for connection in idleConnections: - pending.add(session.removeConnection(connection)) - await allFutures(pending) - res - - if not(isNil(conn)): - return conn - else: - var default: seq[HttpClientConnectionRef] - let res = - try: - await session.connect(ha).wait(session.connectTimeout) - except AsyncTimeoutError: - raiseHttpConnectionError("Connection timed out") - res[].state = HttpClientConnectionState.Acquired - session.connections.mgetOrPut(ha.id, default).add(res) - inc(session.connectionsCount) - return res + connections = session.connections.getOrDefault(ha.id, default) + # We looking for non-idle connection, all idle connections will be + # freed by sessionWatcher(). + for connection in connections: + if connection.isReady() and + not(connection.isIdle(timestamp, session.idleTimeout)): + cres = connection + cres.state = HttpClientConnectionState.Acquired + break + cres + + if not(isNil(conn)): return conn + + conn = + try: + await session.connect(ha).wait(session.connectTimeout) + except AsyncTimeoutError: + raiseHttpConnectionError("Connection timed out") + conn.state = HttpClientConnectionState.Acquired + session.connections.mgetOrPut(ha.id, default).add(conn) + inc(session.connectionsCount) + return conn proc releaseConnection(session: HttpSessionRef, connection: HttpClientConnectionRef) {.async.} = @@ -773,11 +770,59 @@ proc closeWait*(session: HttpSessionRef) {.async.} = ## ## This closes all the connections opened to remote servers. var pending: seq[Future[void]] - for items in session.connections.values(): - for item in items: - pending.add(closeWait(item)) + # Closing sessionWatcher to avoid race condition. + await cancelAndWait(session.watcherFut) + for connections in session.connections.values(): + for conn in connections: + pending.add(closeWait(conn)) await allFutures(pending) +proc sessionWatcher(session: HttpSessionRef) {.async.} = + while true: + let firstBreak = + try: + await sleepAsync(session.idlePeriod) + false + except CancelledError: + true + + if firstBreak: + break + + var idleConnections: seq[HttpClientConnectionRef] + let timestamp = Moment.now() + for peer, connections in session.connections.pairs(): + if len(connections) > 0: + var toClose: seq[HttpClientConnectionRef] + var toKeep: seq[HttpClientConnectionRef] + for conn in connections.items(): + doAssert(not(isNil(conn)), "nil connection inside connections pool") + if conn.isReady() and conn.isIdle(timestamp, session.idleTimeout): + # Idle connection + toClose.add(conn) + else: + # Active connection (currently used) or not idle connection + toKeep.add(conn) + session.connections[peer] = toKeep + idleConnections.add(toClose) + + if len(idleConnections) > 0: + dec(session.connectionsCount, len(idleConnections)) + var pending = newSeqOfCap[Future[void]](len(idleConnections)) + let secondBreak = + try: + for connection in idleConnections: + pending.add(connection.closeWait()) + await allFutures(pending) + false + except CancelledError: + # We still want to close connections to avoid socket leaks. + await allFutures(pending) + true + + if secondBreak: + break + proc closeWait*(request: HttpClientRequestRef) {.async.} = if request.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}: request.state = HttpReqRespState.Closing diff --git a/tests/testhttpclient.nim b/tests/testhttpclient.nim index 8d03e80c8..16a52fb2e 100644 --- a/tests/testhttpclient.nim +++ b/tests/testhttpclient.nim @@ -895,7 +895,8 @@ suite "HTTP client testing suite": var server = createServer(address, process, false) server.start() - let session = HttpSessionRef.new(idleTimeout = 1000.milliseconds) + let session = HttpSessionRef.new(idleTimeout = 1.seconds, + idlePeriod = 200.milliseconds) try: var f1 = test(session, ha) var f2 = test(session, ha) From cd1493d88142da835f4f372bc1bf58c618fef0f1 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Mon, 27 Feb 2023 14:39:01 +0200 Subject: [PATCH 4/6] Increase timeout for test. --- tests/testhttpclient.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/testhttpclient.nim b/tests/testhttpclient.nim index 16a52fb2e..5bcd5df6d 100644 --- a/tests/testhttpclient.nim +++ b/tests/testhttpclient.nim @@ -910,7 +910,7 @@ suite "HTTP client testing suite": f2.read() == (200, "ok", 0) session.connectionsCount == 2 - await sleepAsync(1100.milliseconds) + await sleepAsync(2.seconds) let resp = await test(session, ha) check: resp == (200, "ok", 0) From 605a8a868c443e6de15bb277443389569e9760e7 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Mon, 27 Feb 2023 14:42:26 +0200 Subject: [PATCH 5/6] Adjust timeout to lower value. --- tests/testhttpclient.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/testhttpclient.nim b/tests/testhttpclient.nim index 5bcd5df6d..15f77d3f8 100644 --- a/tests/testhttpclient.nim +++ b/tests/testhttpclient.nim @@ -910,7 +910,7 @@ suite "HTTP client testing suite": f2.read() == (200, "ok", 0) session.connectionsCount == 2 - await sleepAsync(2.seconds) + await sleepAsync(1500.milliseconds) let resp = await test(session, ha) check: resp == (200, "ok", 0) From bac8aea0741c28592feb3942870ea8142e26ad9a Mon Sep 17 00:00:00 2001 From: cheatfate Date: Fri, 17 Mar 2023 15:43:00 +0200 Subject: [PATCH 6/6] Address review comments. --- chronos/apps/http/httpclient.nim | 90 ++++++++++++-------------------- 1 file changed, 33 insertions(+), 57 deletions(-) diff --git a/chronos/apps/http/httpclient.nim b/chronos/apps/http/httpclient.nim index fa9c1256a..2997d0429 100644 --- a/chronos/apps/http/httpclient.nim +++ b/chronos/apps/http/httpclient.nim @@ -665,45 +665,27 @@ proc acquireConnection( ): Future[HttpClientConnectionRef] {.async.} = ## Obtain connection from ``session`` or establish a new one. var default: seq[HttpClientConnectionRef] - if (HttpClientFlag.NewConnectionAlways in session.flags) or - (HttpClientRequestFlag.DedicatedConnection in flags): - var conn = - try: - await session.connect(ha).wait(session.connectTimeout) - except AsyncTimeoutError: - raiseHttpConnectionError("Connection timed out") - conn.state = HttpClientConnectionState.Acquired - session.connections.mgetOrPut(ha.id, default).add(conn) - inc(session.connectionsCount) - return conn - else: - var conn = - block: - var cres: HttpClientConnectionRef = nil - let - timestamp = Moment.now() - connections = session.connections.getOrDefault(ha.id, default) - # We looking for non-idle connection, all idle connections will be - # freed by sessionWatcher(). - for connection in connections: - if connection.isReady() and - not(connection.isIdle(timestamp, session.idleTimeout)): - cres = connection - cres.state = HttpClientConnectionState.Acquired - break - cres - - if not(isNil(conn)): return conn - - conn = - try: - await session.connect(ha).wait(session.connectTimeout) - except AsyncTimeoutError: - raiseHttpConnectionError("Connection timed out") - conn.state = HttpClientConnectionState.Acquired - session.connections.mgetOrPut(ha.id, default).add(conn) - inc(session.connectionsCount) - return conn + if (HttpClientFlag.NewConnectionAlways notin session.flags) and + (HttpClientRequestFlag.DedicatedConnection notin flags): + # Trying to reuse existing connection from our connection's pool. + let timestamp = Moment.now() + # We looking for non-idle connection at `Ready` state, all idle connections + # will be freed by sessionWatcher(). + for connection in session.connections.getOrDefault(ha.id): + if connection.isReady() and + not(connection.isIdle(timestamp, session.idleTimeout)): + connection.state = HttpClientConnectionState.Acquired + return connection + + let connection = + try: + await session.connect(ha).wait(session.connectTimeout) + except AsyncTimeoutError: + raiseHttpConnectionError("Connection timed out") + connection.state = HttpClientConnectionState.Acquired + session.connections.mgetOrPut(ha.id, default).add(connection) + inc(session.connectionsCount) + return connection proc releaseConnection(session: HttpSessionRef, connection: HttpClientConnectionRef) {.async.} = @@ -791,28 +773,24 @@ proc sessionWatcher(session: HttpSessionRef) {.async.} = var idleConnections: seq[HttpClientConnectionRef] let timestamp = Moment.now() - for peer, connections in session.connections.pairs(): - if len(connections) > 0: - var toClose: seq[HttpClientConnectionRef] - var toKeep: seq[HttpClientConnectionRef] - for conn in connections.items(): - doAssert(not(isNil(conn)), "nil connection inside connections pool") - if conn.isReady() and conn.isIdle(timestamp, session.idleTimeout): - # Idle connection - toClose.add(conn) + for _, connections in session.connections.mpairs(): + connections.keepItIf( + if isNil(it): + false + else: + if it.isReady() and it.isIdle(timestamp, session.idleTimeout): + idleConnections.add(it) + false else: - # Active connection (currently used) or not idle connection - toKeep.add(conn) - session.connections[peer] = toKeep - idleConnections.add(toClose) + true + ) if len(idleConnections) > 0: dec(session.connectionsCount, len(idleConnections)) - var pending = newSeqOfCap[Future[void]](len(idleConnections)) + var pending: seq[Future[void]] let secondBreak = try: - for connection in idleConnections: - pending.add(connection.closeWait()) + pending = idleConnections.mapIt(it.closeWait()) await allFutures(pending) false except CancelledError: @@ -967,8 +945,6 @@ proc getResponse(req: HttpClientRequestRef): Future[HttpClientResponseRef] {. await req.connection.reader.readUntil(addr buffer[0], len(buffer), HeadersMark).wait( req.session.headersTimeout) - except CancelledError as exc: - raise exc except AsyncTimeoutError: raiseHttpReadError("Reading response headers timed out") except AsyncStreamError: