From fc50aef2b4e2b71442695e718bd9509704d8c015 Mon Sep 17 00:00:00 2001 From: Aleksey Baulin Date: Tue, 20 Sep 2016 14:21:43 +0300 Subject: [PATCH] Send responses in correct order - first working version. (#419) --- tempesta_fw/connection.h | 14 +- tempesta_fw/http.c | 440 ++++++++++++++++++++++------- tempesta_fw/http.h | 16 ++ tempesta_fw/http_msg.c | 18 +- tempesta_fw/http_sess.c | 11 +- tempesta_fw/msg.h | 10 +- tempesta_fw/sched/tfw_sched_hash.c | 4 +- tempesta_fw/sched/tfw_sched_rr.c | 2 +- tempesta_fw/sock.c | 2 +- tempesta_fw/sock_srv.c | 2 +- 10 files changed, 388 insertions(+), 131 deletions(-) diff --git a/tempesta_fw/connection.h b/tempesta_fw/connection.h index e3b54b8fd..2cafee18f 100644 --- a/tempesta_fw/connection.h +++ b/tempesta_fw/connection.h @@ -91,6 +91,7 @@ typedef struct { struct list_head msg_queue; spinlock_t msg_qlock; atomic_t refcnt; + unsigned long flags; struct timer_list timer; TfwMsg *msg; TfwPeer *peer; @@ -102,6 +103,9 @@ typedef struct { #define TFW_CONN_TYPE(c) ((c)->proto.type) +/* Connection flags. */ +#define TFW_CONN_FWD_HOLD 0x0001 /* Hold sending messages */ + /** * TLS hardened connection. */ @@ -153,7 +157,7 @@ extern TfwConnHooks *conn_hooks[TFW_CONN_MAX_PROTOS]; tfw_conn_hook_call(TFW_CONN_TYPE2IDX(TFW_CONN_TYPE(c)), c, f) static inline bool -tfw_connection_nfo(TfwConnection *conn) +tfw_connection_live(TfwConnection *conn) { return atomic_read(&conn->refcnt) > 0; } @@ -169,7 +173,7 @@ tfw_connection_get(TfwConnection *conn) * process, i.e. @refcnt wasn't zero. */ static inline bool -tfw_connection_get_if_nfo(TfwConnection *conn) +tfw_connection_get_if_live(TfwConnection *conn) { int old, rc = atomic_read(&conn->refcnt); @@ -277,12 +281,6 @@ tfw_connection_unlink_msg(TfwConnection *conn) conn->msg = NULL; } -static inline bool -tfw_connection_live(TfwConnection *conn) -{ - return conn->sk && ss_sock_live(conn->sk); -} - /** * Check that TfwConnection resources are cleaned up properly. 
*/ diff --git a/tempesta_fw/http.c b/tempesta_fw/http.c index 9eafcf6db..326c01c10 100644 --- a/tempesta_fw/http.c +++ b/tempesta_fw/http.c @@ -184,19 +184,6 @@ tfw_http_prep_302(TfwHttpMsg *resp, TfwHttpReq *req, TfwStr *cookie) return TFW_PASS; } -static inline void -__init_req_ss_flags(TfwHttpReq *req) -{ - ((TfwMsg *)req)->ss_flags |= SS_F_KEEP_SKB; -} - -static inline void -__init_resp_ss_flags(TfwHttpResp *resp, const TfwHttpReq *req) -{ - if (req->flags & TFW_HTTP_CONN_CLOSE) - ((TfwMsg *)resp)->ss_flags |= SS_F_CONN_CLOSE; -} - /* * Perform operations common to sending an error response to a client. * Set current date in the header of an HTTP error response, and set @@ -209,7 +196,7 @@ tfw_http_send_resp(TfwHttpReq *req, TfwStr *msg, const TfwStr *date) { int conn_flag = req->flags & __TFW_HTTP_CONN_MASK; TfwStr *crlf = __TFW_STR_CH(msg, TFW_STR_CHUNKN(msg) - 1); - TfwHttpMsg resp; + TfwHttpMsg *hmresp; TfwMsgIter it; if (conn_flag) { @@ -224,15 +211,16 @@ tfw_http_send_resp(TfwHttpReq *req, TfwStr *msg, const TfwStr *date) msg->len += crlf->len - crlf_len; } - if (!tfw_http_msg_create(&resp, &it, Conn_Srv, msg->len)) + if (!(hmresp = tfw_http_msg_create(NULL, &it, Conn_Srv, msg->len))) return -ENOMEM; tfw_http_prep_date(date->ptr); - tfw_http_msg_write(&it, &resp, msg); + tfw_http_msg_write(&it, hmresp, msg); - __init_resp_ss_flags((TfwHttpResp *)&resp, req); + __init_resp_ss_flags((TfwHttpResp *)hmresp, req); + tfw_http_resp_fwd(req, (TfwHttpResp *)hmresp); - return tfw_cli_conn_send(req->conn, (TfwMsg *)&resp); + return 0; } #define S_200_PART_01 S_200 S_CRLF S_F_DATE @@ -404,7 +392,7 @@ tfw_http_conn_msg_alloc(TfwConnection *conn) spin_lock(&conn->msg_qlock); req = (TfwHttpReq *)list_first_entry_or_null(&conn->msg_queue, - TfwMsg, msg_list); + TfwMsg, fwd_list); spin_unlock(&conn->msg_qlock); if (req && (req->method == TFW_HTTP_METH_HEAD)) hm->flags |= TFW_HTTP_VOID_BODY; @@ -465,28 +453,63 @@ tfw_http_conn_init(TfwConnection *conn) /* * Connection with 
a peer is released. * - * For server connections requests that were sent to that server are kept - * in the queue until a paired response comes. That will never happen now. - * For each request that has been unanswered send an error response, then - * delete the request and drop the connection with the client if required. + * For server connections the requests that were sent to that server are + * kept in the queue until a paired response comes. That will never happen + * now, and requests will remain unanswered. For each request in the queue + * send an error response to the corresponding client connection. Both the + * request and the response will be freed when the response is sent out. * * Called when a connection is released. There are no users at that time, * so locks are not needed. */ static void -tfw_http_conn_release(TfwConnection *conn) +tfw_http_conn_release(TfwConnection *srv_conn) { - TfwMsg *msg, *tmp; - - list_for_each_entry_safe(msg, tmp, &conn->msg_queue, msg_list) { - BUG_ON(((TfwHttpMsg *)msg)->conn - && (((TfwHttpMsg *)msg)->conn == conn)); - list_del(&msg->msg_list); - tfw_http_send_404((TfwHttpReq *)msg); - tfw_http_conn_msg_free((TfwHttpMsg *)msg); + TfwHttpReq *req, *tmp; + struct list_head *zap_queue = &srv_conn->msg_queue; + + TFW_DBG3("%s: conn = %p\n", __func__, srv_conn); + BUG_ON(!(TFW_CONN_TYPE(srv_conn) & Conn_Srv)); + + list_for_each_entry_safe(req, tmp, zap_queue, msg.fwd_list) { + BUG_ON(req->conn && (req->conn == srv_conn)); + list_del_init(&req->msg.fwd_list); + tfw_http_send_404(req); TFW_INC_STAT_BH(clnt.msgs_otherr); } - INIT_LIST_HEAD(&conn->msg_queue); + INIT_LIST_HEAD(&srv_conn->msg_queue); +} + +/* + * Drop client connection's resources. + * + * Disintegrate the list, but do not free the requests. These requests + * have not been answered yet. They are held in the lists of respective + * server connections until paired responses come. 
If a response comes + * after the list is destroyed, then both the request and the response + * are dropped at the sight of an empty list. The requests from the + * dead client connection are then removed from that server connection. + * + * Locking is necessary as the list is constantly probed from server + * connection threads. + */ +static void +tfw_http_conn_cli_drop(TfwConnection *cli_conn) +{ + TfwHttpMsg *hmreq, *tmp; + struct list_head *seq_queue = &cli_conn->msg_queue; + + TFW_DBG3("%s: conn = %p\n", __func__, cli_conn); + BUG_ON(!(TFW_CONN_TYPE(cli_conn) & Conn_Clnt)); + + if (list_empty_careful(seq_queue)) + return; + + spin_lock(&cli_conn->msg_qlock); + list_for_each_entry_safe(hmreq, tmp, seq_queue, msg.seq_list) { + list_del_init(&hmreq->msg.seq_list); + } + spin_unlock(&cli_conn->msg_qlock); } /* @@ -500,10 +523,11 @@ static void tfw_http_resp_terminate(TfwHttpMsg *hm); static void tfw_http_conn_drop(TfwConnection *conn) { - if (conn->msg && (TFW_CONN_TYPE(conn) & Conn_Srv)) { - if (tfw_http_parse_terminate((TfwHttpMsg *)conn->msg)) { + if (TFW_CONN_TYPE(conn) & Conn_Clnt) { + tfw_http_conn_cli_drop(conn); + } else if (conn->msg) { + if (tfw_http_parse_terminate((TfwHttpMsg *)conn->msg)) tfw_http_resp_terminate((TfwHttpMsg *)conn->msg); - } } tfw_http_conn_msg_free((TfwHttpMsg *)conn->msg); } @@ -766,6 +790,223 @@ tfw_http_adjust_resp(TfwHttpResp *resp, TfwHttpReq *req) TFW_HTTP_HDR_SERVER, 0); } +static inline bool +tfw_http_req_is_nonidempotent(TfwHttpReq *req) +{ + return (req->flags & TFW_HTTP_NON_IDEMPOTENT); +} + +/* + * Forward request @req to server connection @srv_conn. + */ +static void +tfw_http_conn_req_fwd(TfwConnection *srv_conn, TfwHttpReq *req) +{ + TFW_DBG2("%s: srv_conn=[%p], req=[%p]\n", __func__, srv_conn, req); + BUG_ON(!(TFW_CONN_TYPE(srv_conn) & Conn_Srv)); + + /* + * A request is added to the server connection queue. + * If the connection is not on hold, then the request + * is forwarded to the server immediately. 
Otherwise, + * it is forwarded when the hold is removed. A server + * connection is put on hold when a non-idempotent + * request is forwarded to the server. + */ + spin_lock(&srv_conn->msg_qlock); + list_add_tail(&req->msg.fwd_list, &srv_conn->msg_queue); + if (srv_conn->flags & TFW_CONN_FWD_HOLD) { + spin_unlock(&srv_conn->msg_qlock); + return; + } + if (tfw_connection_send(srv_conn, (TfwMsg *)req)) { + list_del_init(&req->msg.fwd_list); + spin_unlock(&srv_conn->msg_qlock); + tfw_http_send_500(req); + return; + } + if (tfw_http_req_is_nonidempotent(req)) + srv_conn->flags |= TFW_CONN_FWD_HOLD; + spin_unlock(&srv_conn->msg_qlock); +} + +/* + * Forward stalled requests in server connection @srv_conn. + */ +static void +tfw_http_conn_req_fwd_stalled(TfwConnection *srv_conn) +{ + TfwHttpReq *req, *tmp, *end; + struct list_head zap_queue, err_queue; + struct list_head *fwd_queue = &srv_conn->msg_queue; + + TFW_DBG2("%s: conn = %p\n", __func__, srv_conn); + BUG_ON(!(srv_conn->flags & TFW_CONN_FWD_HOLD)); + + INIT_LIST_HEAD(&zap_queue); + INIT_LIST_HEAD(&err_queue); + /* + * Process the server connection's queue of pending requests. + * The queue is locked against concurrent updates: inserts of + * outgoing requests, or closing of the server connection. Do + * it as fast as possible by moving failed requests to other + * queues that can be processed without this lock. + */ + spin_lock(&srv_conn->msg_qlock); + end = container_of(fwd_queue, TfwHttpReq, msg.fwd_list); + list_for_each_entry_safe(req, tmp, fwd_queue, msg.fwd_list) { + /* + * If the client connection is dead, then don't send + * the request to the server. Move it to @zap_queue + * for deletion later. + */ + if (!tfw_connection_live(req->conn)) { + list_move_tail(&req->msg.fwd_list, &zap_queue); + continue; + } + /* + * If the server connection is dead, then there's + * nothing to do here. The procedure of closing the + * server connection will do whatever is necessary. 
+ */ + if (!tfw_connection_live(srv_conn)) + break; + /* + * If unable to send to the server connection due to + * an error, then move the request to @err_queue for + * sending a 500 error response later. That is safe + * as the response will be sent in proper seq order. + */ + if (tfw_connection_send(srv_conn, (TfwMsg *)req)) { + list_move_tail(&req->msg.fwd_list, &err_queue); + continue; + } + /* Stop sending if the request is non-idempotent. */ + if (tfw_http_req_is_nonidempotent(req)) + break; + } + /* + * If the full server connection queue has been processed, + * then upcoming requests may be sent to the server right away. + */ + if (req == end) + srv_conn->flags &= ~TFW_CONN_FWD_HOLD; + spin_unlock(&srv_conn->msg_qlock); + + /* + * Delete requests from dead client connections. The requests + * need to be removed from @seq_list. The process for closing + * a client connection does the same, so there may be certain + * concurrency here. + */ + list_for_each_entry_safe(req, tmp, &zap_queue, msg.fwd_list) { + list_del_init(&req->msg.fwd_list); + if (!list_empty_careful(&req->msg.seq_list)) { + spin_lock_bh(&req->conn->msg_qlock); + list_del_init(&req->msg.seq_list); + spin_unlock_bh(&req->conn->msg_qlock); + } + tfw_http_conn_msg_free((TfwHttpMsg *)req); + } + /* + * Requests that were not forwarded due to an error. Send an + * error response to a client. The response will be attached + * to the request and sent to the client in proper seq order. + */ + list_for_each_entry_safe(req, tmp, &err_queue, msg.fwd_list) { + list_del_init(&req->msg.fwd_list); + tfw_http_send_500(req); + } +} + +/* + * Forward responses to the client in the correct order. 
+ */ +void +tfw_http_resp_fwd(TfwHttpReq *req, TfwHttpResp *resp) +{ + TfwHttpReq *tmp; + TfwConnection *cli_conn = req->conn; + struct list_head out_queue, *seq_queue = &cli_conn->msg_queue; + + TFW_DBG2("%s: req=[%p], resp=[%p]\n", __func__, req, resp); + + INIT_LIST_HEAD(&out_queue); + /* + * Starting with the first request on the list, pick consecutive + * requests that have a paired response. Remove those requests + * from the list, and put them on the list of outgoing responses. + * + * However, if the list is empty, then it's either a bug, + * or the client connection had been closed. If it's a bug, then + * the correct order of responses to requests may be broken. The + * client connection needs to be closed. + */ + spin_lock(&cli_conn->msg_qlock); + if (list_empty(seq_queue)) { + spin_unlock(&cli_conn->msg_qlock); + ss_close_sync(cli_conn->sk, true); + tfw_http_conn_msg_free((TfwHttpMsg *)resp); + tfw_http_conn_msg_free((TfwHttpMsg *)req); + return; + } + req->resp = (TfwHttpMsg *)resp; + do { + req = list_first_entry(seq_queue, TfwHttpReq, msg.seq_list); + if (req->resp == NULL) + break; + list_move_tail(&req->msg.seq_list, &out_queue); + } while(!list_empty(seq_queue)); + spin_unlock(&cli_conn->msg_qlock); + + /* Forward responses to the client. */ + list_for_each_entry_safe(req, tmp, &out_queue, msg.seq_list) { + list_del_init(&req->msg.seq_list); + resp = (TfwHttpResp *)req->resp; + /* + * If the client connection is dead, then discard + * all @req and @resp in the @out_queue. Remaining requests + * from the client in the @seq_queue will be handled when + * the client connection is released. + */ + if (!tfw_connection_live(cli_conn)) + goto loop_discard; + /* + * Close the client connection in case of an error. + * Otherwise, the correct order of responses may be broken. + * + * FIXME Sending is asynchronous. An error may still occur + * when the response is actually sent out. If that happens + * it breaks the correct order of responses. 
Perhaps, the + * client connection needs to be closed in that case. + */ + if (tfw_cli_conn_send(cli_conn, (TfwMsg *)resp)) { + ss_close_sync(cli_conn->sk, true); + goto loop_discard; + } + /* + * If this is a response to a non-idempotent request, then + * it's time to continue forwarding requests to the server + * connection the response has come on. If the server is in + * failover state, then the stalled requests will be taken + * care of by the failover processing. + * + * FIXME It might be better to mark the server connection + * somehow, then forward stalled requests for each marked + * server connection outside of this @out_queue processing. + */ + if (tfw_http_req_is_nonidempotent(req) && resp->conn + && (tfw_connection_get_if_live(resp->conn))) + { + tfw_http_conn_req_fwd_stalled(resp->conn); + tfw_connection_put(resp->conn); + } +loop_discard: + tfw_http_conn_msg_free((TfwHttpMsg *)resp); + tfw_http_conn_msg_free((TfwHttpMsg *)req); + } +} + /** * The request is served from cache. * Send the response as is and unrefer its data. @@ -775,20 +1016,13 @@ tfw_http_req_cache_service(TfwHttpReq *req, TfwHttpResp *resp) { if (tfw_http_adjust_resp(resp, req)) goto resp_err; - - if (tfw_cli_conn_send(req->conn, (TfwMsg *)resp)) - goto resp_err; - + tfw_http_resp_fwd(req, resp); TFW_INC_STAT_BH(clnt.msgs_fromcache); - -resp_out: - tfw_http_conn_msg_free((TfwHttpMsg *)resp); - tfw_http_conn_msg_free((TfwHttpMsg *)req); return; resp_err: tfw_http_send_500(req); TFW_INC_STAT_BH(clnt.msgs_otherr); - goto resp_out; + return; } /** @@ -802,48 +1036,45 @@ tfw_http_req_cache_cb(TfwHttpReq *req, TfwHttpResp *resp) int r; TfwConnection *srv_conn = NULL; + TFW_DBG2("%s: req = %p, resp = %p\n", __func__, req, resp); + if (resp) { tfw_http_req_cache_service(req, resp); return; } - /* * Sticky cookie module used for HTTP session identification may send - * a response to the client when sticky cookie presence is enforced and - * the cookie is missing from the request. 
+ * a response to the client when sticky cookie presence is enforced + * and the cookie is missing from the request. * - * HTTP session can be required for the request schduling, so obtain it - * first. However, req->sess still can be NULL if sticky cookies aren't - * enabled. + * HTTP session may be required for request scheduling, so obtain it + * first. However, req->sess still may be NULL if sticky cookies are + * not enabled. */ r = tfw_http_sess_obtain(req); if (r < 0) goto send_500; - if (r > 0) { - /* Response sent, nothing to do. */ - tfw_http_conn_msg_free((TfwHttpMsg *)req); + if (r > 0) /* Response sent, nothing to do. */ return; - } /* - * Dispatch request to an appropriate server. Schedulers - * should make a decision based on an unmodified request, - * so this must be done before any request mangling. + * Dispatch request to an appropriate server. Schedulers should + * make a decision based on an unmodified request, so this must + * be done before any request mangling. * - * The code below is typically called on remote NUMA node. - * That's not good, but we must run TDB lookup on the node - * before this is executed, to avoid unnecessary work in - * SoftIRQ and to speed up the cache operation. - * At the same time, cache hits are expected to prevail - * over cache misses, so this is not a frequent path. + * The code below is usually called on a remote NUMA node. That's + * not good, but TDB lookup must be run on the node before it is + * executed, to avoid unnecessary work in SoftIRQ and to speed up + * the cache operation. At the same time, cache hits are expected + * to prevail over cache misses, so this is not a frequent path. * - * TODO #593: check whether req->sess->srv_conn is alive or - * get a new connection for req->sess->srv_conn->peer from appropriate - * scheduler otherwise. This eliminates long generic scheduling work - * flow. 
When a first request in the session is scheduled by the generic - * logic, TfwSession->srv_conn must be initialized by poniter to - * appropriate TfwConnection, so all following session hits will be - * scheduled much faster. + * TODO #593: check whether req->sess->srv_conn is alive. If not, + * then get a new connection for req->sess->srv_conn->peer from + * an appropriate scheduler. That eliminates the long generic + * scheduling work flow. When the first request in a session is + * scheduled by the generic logic, TfwSession->srv_conn must be + * initialized to point at the appropriate TfwConnection, so that + * all subsequent session hits are scheduled much faster. */ srv_conn = tfw_sched_get_srv_conn((TfwMsg *)req); if (srv_conn == NULL) { @@ -854,34 +1085,31 @@ tfw_http_req_cache_cb(TfwHttpReq *req, TfwHttpResp *resp) if (tfw_http_adjust_req(req)) goto send_500; - /* Add request to the server connection. */ - spin_lock(&srv_conn->msg_qlock); - list_add_tail(&req->msg.msg_list, &srv_conn->msg_queue); - spin_unlock(&srv_conn->msg_qlock); - /* Send request to the server. */ - if (tfw_connection_send(srv_conn, (TfwMsg *)req)) { - spin_lock(&srv_conn->msg_qlock); - list_del(&req->msg.msg_list); - spin_unlock(&srv_conn->msg_qlock); - goto send_500; - } - TFW_INC_STAT_BH(clnt.msgs_forwarded); + tfw_http_conn_req_fwd(srv_conn, req); goto conn_put; send_404: tfw_http_send_404(req); - tfw_http_conn_msg_free((TfwHttpMsg *)req); TFW_INC_STAT_BH(clnt.msgs_otherr); return; send_500: tfw_http_send_500(req); - tfw_http_conn_msg_free((TfwHttpMsg *)req); TFW_INC_STAT_BH(clnt.msgs_otherr); conn_put: tfw_connection_put(srv_conn); } +/* + * Set a flag if the request is non-idempotent. 
+*/ +static inline void +tfw_http_req_set_nonidempotent(TfwHttpReq *req) +{ + if (req->method == TFW_HTTP_METH_POST) + req->flags |= TFW_HTTP_NON_IDEMPOTENT; +} + static int tfw_http_req_set_context(TfwHttpReq *req) { @@ -1058,6 +1286,17 @@ tfw_http_req_process(TfwConnection *conn, struct sk_buff *skb, unsigned int off) */ tfw_connection_unlink_msg(conn); + /* Set a flag if the request is non-idempotent. */ + tfw_http_req_set_nonidempotent(req); + + /* + * Add the request to the list of the client connection + * to preserve the correct order of responses to requests. + */ + spin_lock(&conn->msg_qlock); + list_add_tail(&req->msg.seq_list, &conn->msg_queue); + spin_unlock(&conn->msg_qlock); + /* * The request should either be stored or released. * Otherwise we lose the reference to it and get a leak. @@ -1109,6 +1348,7 @@ tfw_http_req_process(TfwConnection *conn, struct sk_buff *skb, unsigned int off) static void tfw_http_resp_cache_cb(TfwHttpReq *req, TfwHttpResp *resp) { + TFW_DBG2("%s: req = %p, resp = %p\n", __func__, req, resp); /* * Typically we're at a node far from the node where @resp was * received, so we do an inter-node transfer. However, this is @@ -1116,25 +1356,21 @@ tfw_http_resp_cache_cb(TfwHttpReq *req, TfwHttpResp *resp) * requests will get responded to by the current node without * inter-node data transfers. (see tfw_http_req_cache_cb()) */ - if (tfw_http_adjust_resp(resp, req)) - goto err; - - if (tfw_cli_conn_send(req->conn, (TfwMsg *)resp)) - goto err; - - TFW_INC_STAT_BH(serv.msgs_forwarded); + if (tfw_http_adjust_resp(resp, req)) { + tfw_http_conn_msg_free((TfwHttpMsg *)resp); + tfw_http_send_500(req); + TFW_INC_STAT_BH(serv.msgs_otherr); + return; + } + tfw_http_resp_fwd(req, resp); + /* Responses from cache don't have @resp->conn. 
*/ +TFW_DBG2("%s: resp=[%p] resp->conn=[%p] resp->conn->peer=[%p] resp->conn->peer->apm=[%p]\n", + __func__, resp, resp->conn, resp->conn->peer, ((TfwServer *)resp->conn->peer)->apm); if (resp->conn) tfw_apm_update(((TfwServer *)resp->conn->peer)->apm, resp->jtstamp, resp->jtstamp - req->jtstamp); -out: - /* Now we don't need the request and the response anymore. */ - tfw_http_conn_msg_free((TfwHttpMsg *)resp); - tfw_http_conn_msg_free((TfwHttpMsg *)req); + TFW_INC_STAT_BH(serv.msgs_forwarded); return; -err: - tfw_http_send_500(req); - TFW_INC_STAT_BH(serv.msgs_otherr); - goto out; } /* @@ -1158,8 +1394,8 @@ tfw_http_popreq(TfwHttpMsg *hmresp) TFW_INC_STAT_BH(serv.msgs_otherr); return NULL; } - msg = list_first_entry(&conn->msg_queue, TfwMsg, msg_list); - list_del(&msg->msg_list); + msg = list_first_entry(&conn->msg_queue, TfwMsg, fwd_list); + list_del_init(&msg->fwd_list); spin_unlock(&conn->msg_qlock); return (TfwHttpReq *)msg; diff --git a/tempesta_fw/http.h b/tempesta_fw/http.h index e0c4bcaea..741acd406 100644 --- a/tempesta_fw/http.h +++ b/tempesta_fw/http.h @@ -229,6 +229,7 @@ typedef struct { #define TFW_HTTP_FIELD_DUPENTRY 0x000200 /* Duplicate field */ /* URI has form http://authority/path, not just /path */ #define TFW_HTTP_URI_FULL 0x000400 +#define TFW_HTTP_NON_IDEMPOTENT 0x000800 /* Response flags */ #define TFW_HTTP_VOID_BODY 0x010000 /* Resp to HEAD req */ @@ -326,6 +327,7 @@ typedef struct { unsigned long tm_header; unsigned long tm_bchunk; unsigned long hash; + TfwHttpMsg *resp; } TfwHttpReq; #define TFW_HTTP_REQ_STR_START(r) __MSG_STR_START(r) @@ -375,6 +377,19 @@ tfw_current_timestamp(void) return ts.tv_sec; } +static inline void +__init_req_ss_flags(TfwHttpReq *req) +{ + ((TfwMsg *)req)->ss_flags |= SS_F_KEEP_SKB; +} + +static inline void +__init_resp_ss_flags(TfwHttpResp *resp, const TfwHttpReq *req) +{ + if (req->flags & TFW_HTTP_CONN_CLOSE) + ((TfwMsg *)resp)->ss_flags |= SS_F_CONN_CLOSE; +} + typedef void (*tfw_http_cache_cb_t)(TfwHttpReq 
*, TfwHttpResp *); /* Internal (parser) HTTP functions. */ @@ -386,6 +401,7 @@ bool tfw_http_parse_terminate(TfwHttpMsg *hm); int tfw_http_msg_process(void *conn, struct sk_buff *skb, unsigned int off); unsigned long tfw_http_req_key_calc(TfwHttpReq *req); void tfw_http_req_destruct(void *msg); +void tfw_http_resp_fwd(TfwHttpReq *req, TfwHttpResp *resp); /* * Functions to send an HTTP error response to a client. diff --git a/tempesta_fw/http_msg.c b/tempesta_fw/http_msg.c index 4298c1ffd..37c0b2e45 100644 --- a/tempesta_fw/http_msg.c +++ b/tempesta_fw/http_msg.c @@ -635,10 +635,12 @@ tfw_http_msg_hdr_add(TfwHttpMsg *hm, TfwStr *hdr) } /** - * Allocate skb space for further @hm data writing. - * Put as much as possible to one skb, TCP GSO will care about segmentation. + * Given the total message length as @len, allocate an appropriate number + * of SKBs and page fragments to hold the payload, and add them to the + * message. Put as much as possible in one SKB. TCP GSO will take care of + * segmentation. The allocated payload space will be filled with data. * - * tfw_http_msg_free() is expected to be called for @hm if the function fails. + * Call tfw_http_msg_free() for @hm if the function fails. */ static int __msg_alloc_skb_data(TfwHttpMsg *hm, size_t len) @@ -672,7 +674,8 @@ tfw_http_msg_create(TfwHttpMsg *hm, TfwMsgIter *it, int type, size_t data_len) if (hm) { memset(hm, 0, sizeof(*hm)); ss_skb_queue_head_init(&hm->msg.skb_list); - INIT_LIST_HEAD(&hm->msg.msg_list); + INIT_LIST_HEAD(&hm->msg.seq_list); + INIT_LIST_HEAD(&hm->msg.fwd_list); } else { if (!(hm = tfw_http_msg_alloc(type))) return NULL; @@ -827,9 +830,7 @@ EXPORT_SYMBOL(tfw_http_msg_free); /** * Allocate a new HTTP message. - * Given the total message length as @data_len, it allocates an appropriate - * number of SKBs and page fragments to hold the payload, and sets them up - * in Tempesta message. + * The space to hold the payload is allocated separately. 
*/ TfwHttpMsg * tfw_http_msg_alloc(int type) @@ -853,8 +854,9 @@ tfw_http_msg_alloc(int type) hm->h_tbl->off = TFW_HTTP_HDR_RAW; memset(hm->h_tbl->tbl, 0, __HHTBL_SZ(1) * sizeof(TfwStr)); + INIT_LIST_HEAD(&hm->msg.fwd_list); + INIT_LIST_HEAD(&hm->msg.seq_list); ss_skb_queue_head_init(&hm->msg.skb_list); - INIT_LIST_HEAD(&hm->msg.msg_list); if (type & Conn_Clnt) hm->destructor = tfw_http_req_destruct; diff --git a/tempesta_fw/http_sess.c b/tempesta_fw/http_sess.c index 0fdaabaac..bea948824 100644 --- a/tempesta_fw/http_sess.c +++ b/tempesta_fw/http_sess.c @@ -94,13 +94,14 @@ static struct kmem_cache *sess_cache; static int tfw_http_sticky_send_302(TfwHttpReq *req, StickyVal *sv) { - TfwConnection *conn = req->conn; unsigned long ts_be64 = cpu_to_be64(sv->ts); TfwStr chunks[3], cookie = { 0 }; DEFINE_TFW_STR(s_eq, "="); - TfwHttpMsg resp; + TfwHttpMsg *hmresp; char buf[sizeof(*sv) * 2]; + if (!(hmresp = tfw_http_msg_alloc(Conn_Srv))) + return -ENOMEM; /* * Form the cookie as: * @@ -124,9 +125,11 @@ tfw_http_sticky_send_302(TfwHttpReq *req, StickyVal *sv) cookie.len = chunks[0].len + chunks[1].len + chunks[2].len; __TFW_STR_CHUNKN_SET(&cookie, 3); - if (tfw_http_prep_302(&resp, req, &cookie)) + if (tfw_http_prep_302(hmresp, req, &cookie)) return -1; - tfw_cli_conn_send(conn, (TfwMsg *)&resp); + + __init_resp_ss_flags((TfwHttpResp *)hmresp, req); + tfw_http_resp_fwd(req, (TfwHttpResp *)hmresp); return 0; } diff --git a/tempesta_fw/msg.h b/tempesta_fw/msg.h index 531b80d7e..c47e8bfb7 100644 --- a/tempesta_fw/msg.h +++ b/tempesta_fw/msg.h @@ -30,13 +30,15 @@ #include "sync_socket.h" /** - * @msg_list - messages queue to send to peer; - * @skb_list - list of sk_buff's belonging to the message; + * @seq_list - member in the ordered queue of incoming requests; + * @fwd_list - member in the queue of forwarded/backlogged requests; + * @skb_list - list of sk_buff that belong to the message; * @ss_flags - message processing flags; - * @len - total body length; + * @len - total 
message length; */ typedef struct { - struct list_head msg_list; + struct list_head seq_list; + struct list_head fwd_list; int ss_flags; SsSkbList skb_list; size_t len; diff --git a/tempesta_fw/sched/tfw_sched_hash.c b/tempesta_fw/sched/tfw_sched_hash.c index 1302a6a28..43a40cbf0 100644 --- a/tempesta_fw/sched/tfw_sched_hash.c +++ b/tempesta_fw/sched/tfw_sched_hash.c @@ -146,7 +146,7 @@ tfw_sched_hash_get_srv_conn(TfwMsg *msg, TfwSrvGroup *sg) for (tries = 0; tries < __HLIST_SZ(TFW_SG_MAX_CONN); ++tries) { for (ch = sg->sched_data; ch->conn; ++ch) { curr_weight = msg_hash ^ ch->hash; - if (likely(tfw_connection_nfo(ch->conn)) + if (likely(tfw_connection_live(ch->conn)) && curr_weight > best_weight) { best_weight = curr_weight; @@ -156,7 +156,7 @@ tfw_sched_hash_get_srv_conn(TfwMsg *msg, TfwSrvGroup *sg) if (unlikely(!best_conn)) return NULL; - if (tfw_connection_get_if_nfo(best_conn)) + if (tfw_connection_get_if_live(best_conn)) return best_conn; } return NULL; diff --git a/tempesta_fw/sched/tfw_sched_rr.c b/tempesta_fw/sched/tfw_sched_rr.c index 84cb167ad..4f90b6052 100644 --- a/tempesta_fw/sched/tfw_sched_rr.c +++ b/tempesta_fw/sched/tfw_sched_rr.c @@ -123,7 +123,7 @@ tfw_sched_rr_get_srv_conn(TfwMsg *msg, TfwSrvGroup *sg) i = atomic64_inc_return(&srv_cl->rr_counter) % srv_cl->conn_n; conn = srv_cl->conns[i]; - if (tfw_connection_get_if_nfo(conn)) + if (tfw_connection_get_if_live(conn)) return conn; } } diff --git a/tempesta_fw/sock.c b/tempesta_fw/sock.c index 2401e2582..69d55d485 100644 --- a/tempesta_fw/sock.c +++ b/tempesta_fw/sock.c @@ -428,7 +428,7 @@ ss_droplink(struct sock *sk) int __ss_close(struct sock *sk, int flags) { - if (unlikely(!sk)) + if (unlikely(!(sk && ss_sock_live(sk)))) return SS_OK; sk_incoming_cpu_update(sk); diff --git a/tempesta_fw/sock_srv.c b/tempesta_fw/sock_srv.c index 59a3c523d..e25fe7235 100644 --- a/tempesta_fw/sock_srv.c +++ b/tempesta_fw/sock_srv.c @@ -291,7 +291,7 @@ tfw_sock_srv_do_failover(struct sock *sk, const char *msg) * 
connection reference to indicate that the connection is in the * failover state. */ - if (tfw_connection_nfo(conn)) { + if (tfw_connection_live(conn)) { tfw_connection_put_to_death(conn); tfw_connection_drop(conn); TFW_INC_STAT_BH(serv.conn_disconnects);