Skip to content

Commit

Permalink
Upgrade connection to websocket and provision new connection
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksey Mikhaylov <[email protected]>
  • Loading branch information
ttaym committed Mar 18, 2022
1 parent 94be89e commit eac58dd
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 29 deletions.
23 changes: 21 additions & 2 deletions fw/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ enum {
/* HTTPS */
Conn_HttpsClnt = Conn_Clnt | TFW_FSM_HTTPS,
Conn_HttpsSrv = Conn_Srv | TFW_FSM_HTTPS,

/* Websocket plain */
Conn_WsClnt = Conn_HttpClnt | TFW_FSM_WS,
Conn_WsSrv = Conn_HttpSrv | TFW_FSM_WS,

/* Websocket secure */
Conn_WssClnt = Conn_HttpsClnt | TFW_FSM_WS,
Conn_WssSrv = Conn_HttpsSrv | TFW_FSM_WS,
};

#define TFW_CONN_TYPE2IDX(t) TFW_FSM_TYPE(t)
Expand Down Expand Up @@ -100,7 +108,7 @@ enum {
struct sock *sk; \
void (*destructor)(void *);

typedef struct TfwConn {
typedef struct tfw_conn_t {
TFW_CONN_COMMON;
} TfwConn;

Expand Down Expand Up @@ -199,7 +207,9 @@ enum {
/* Connection is in use or at least scheduled to be established. */
TFW_CONN_B_ACTIVE,
/* Connection is disconnected and stopped. */
TFW_CONN_B_STOPPED
TFW_CONN_B_STOPPED,
/* Mark connection as unavailable to schedulers */
TFW_CONN_B_UNSCHED
};

/**
Expand Down Expand Up @@ -297,6 +307,15 @@ tfw_srv_conn_restricted(TfwSrvConn *srv_conn)
return test_bit(TFW_CONN_B_RESEND, &srv_conn->flags);
}

/*
* Connection is unavailable to scheduler and may be removed from it
*/
static inline bool
tfw_srv_conn_unscheduled(TfwSrvConn *srv_conn)
{
return test_bit(TFW_CONN_B_UNSCHED, &srv_conn->flags);
}

/*
* Tell if a connection has non-idempotent requests.
*/
Expand Down
4 changes: 3 additions & 1 deletion fw/gfsm.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ enum {
/* Protocols */
TFW_FSM_HTTP,
TFW_FSM_HTTPS,
/* Not really a FSM */
TFW_FSM_WS,

/* Security rules enforcement. */
TFW_FSM_FRANG_REQ,
Expand Down Expand Up @@ -181,7 +183,7 @@ typedef struct {
& ((TFW_GFSM_FSM_MASK << TFW_GFSM_FSM_SHIFT) \
| TFW_GFSM_STATE_MASK))

typedef struct TfwConn TfwConn;
typedef struct tfw_conn_t TfwConn;
typedef int (*tfw_gfsm_handler_t)(TfwConn *conn, TfwFsmData *data);

void tfw_gfsm_state_init(TfwGState *st, void *obj, int st0);
Expand Down
41 changes: 40 additions & 1 deletion fw/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -2339,7 +2339,6 @@ static int
tfw_http_conn_init(TfwConn *conn)
{
T_DBG2("%s: conn=[%p]\n", __func__, conn);

if (TFW_CONN_TYPE(conn) & Conn_Srv) {
TfwSrvConn *srv_conn = (TfwSrvConn *)conn;
if (!list_empty(&srv_conn->fwd_queue)) {
Expand Down Expand Up @@ -5931,6 +5930,7 @@ tfw_http_resp_process(TfwConn *conn, TfwStream *stream, struct sk_buff *skb)
unsigned int chunks_unused, parsed;
TfwHttpReq *bad_req;
TfwHttpMsg *hmresp, *hmsib;
TfwHttpResp *resp;
TfwFsmData data_up;
bool conn_stop, filtout = false;

Expand All @@ -5953,6 +5953,7 @@ tfw_http_resp_process(TfwConn *conn, TfwStream *stream, struct sk_buff *skb)
parsed = 0;
hmsib = NULL;
hmresp = (TfwHttpMsg *)stream->msg;
resp = (TfwHttpResp *)hmresp;

r = ss_skb_process(skb, tfw_http_parse_resp, hmresp, &chunks_unused,
&parsed);
Expand Down Expand Up @@ -6098,6 +6099,44 @@ tfw_http_resp_process(TfwConn *conn, TfwStream *stream, struct sk_buff *skb)
r = TFW_PASS;
goto next_resp;
}

/*
* Upgrade client and server connection to websocket, remove it
* from scheduler and provision new connection.
*
* TODO #755: set existent client and server connection to Conn_Ws*
* when websocket proxing protocol will be implemented
*/
if (unlikely(test_bit(TFW_HTTP_B_CONN_UPGRADE, hmresp->flags)
&& test_bit(TFW_HTTP_B_UPGRADE_WEBSOCKET, hmresp->flags)
&& resp->status == 101))
{
TfwServer *srv = (TfwServer *)resp->conn->peer;
TfwSrvConn *srv_conn;

/* Cannot proceed with upgrade websocket due to error
* in creation of new http connection. While it will not be
* inherently erroneous to upgrade existing connection, but
* we would pay for it with essentially dropping connection with
* server. Better just drop upgrade request and
* reestablish connection.
*/
if (!(srv_conn = tfw_sock_srv_new_conn(srv))) {
tfw_http_conn_error_log(conn, "Can't create new "
"connection for websocket"
" upgrade response");
return TFW_BLOCK;
}

set_bit(TFW_CONN_B_UNSCHED,
&((TfwSrvConn *)hmresp->conn)->flags);

tfw_sock_srv_conn_activate(srv, srv_conn);
tfw_sock_srv_connect_try(srv_conn);

srv->sg->sched->upd_srv(srv);
}

/*
* Pass the response to cache for further processing.
* In the end, the response is sent on to the client.
Expand Down
2 changes: 1 addition & 1 deletion fw/http_frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ typedef struct {
unsigned char data_off;
} TfwH2Ctx;

typedef struct TfwConn TfwConn;
typedef struct tfw_conn_t TfwConn;

int tfw_h2_init(void);
void tfw_h2_cleanup(void);
Expand Down
28 changes: 28 additions & 0 deletions fw/http_sched_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ __is_conn_suitable(TfwSrvConn *conn, bool hmonitor)
{
return (hmonitor || !tfw_srv_suspended((TfwServer *)conn->peer))
&& !tfw_srv_conn_restricted(conn)
&& !tfw_srv_conn_unscheduled(conn)
&& !tfw_srv_conn_busy(conn)
&& !tfw_srv_conn_queue_full(conn)
&& tfw_srv_conn_get_if_live(conn);
Expand Down Expand Up @@ -333,6 +334,8 @@ tfw_sched_hash_add_conns(TfwServer *srv, TfwHashConnList *cl, size_t *seed,

list_for_each_entry(conn, &srv->conn_list, list) {
unsigned long hash;
if (tfw_srv_conn_unscheduled(conn))
continue;
do {
hash = __hash_64(srv_hash ^ *seed);
*seed += seed_inc;
Expand Down Expand Up @@ -432,13 +435,38 @@ tfw_sched_hash_del_srv(TfwServer *srv)
call_rcu(&cl->rcu, tfw_sched_hash_put_srv_data);
}

static int
tfw_sched_hash_upd_srv(TfwServer *srv)
{
size_t size, seed, seed_inc = 0;
TfwHashConnList *cl = rcu_dereference_bh_check(srv->sched_data, 1);
TfwHashConnList *cl_copy;

seed = get_random_long();
seed_inc = get_random_int();

size = sizeof(TfwHashConnList) + srv->conn_n * sizeof(TfwHashConn);
if (!(cl_copy = kzalloc(size, GFP_ATOMIC)))
return -ENOMEM;

tfw_sched_hash_add_conns(srv, cl_copy, &seed, seed_inc);

rcu_assign_pointer(srv->sched_data, cl_copy);

if (cl)
call_rcu(&cl->rcu, tfw_sched_hash_put_srv_data);

return 0;
}

static TfwScheduler tfw_sched_hash = {
.name = "hash",
.list = LIST_HEAD_INIT(tfw_sched_hash.list),
.add_grp = tfw_sched_hash_add_grp,
.del_grp = tfw_sched_hash_del_grp,
.add_srv = tfw_sched_hash_add_srv,
.del_srv = tfw_sched_hash_del_srv,
.upd_srv = tfw_sched_hash_upd_srv,
.sched_sg_conn = tfw_sched_hash_get_sg_conn,
.sched_srv_conn = tfw_sched_hash_get_srv_conn,
};
Expand Down
90 changes: 73 additions & 17 deletions fw/http_sched_ratio.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@

#define TFW_SCHED_RATIO_INTVL (HZ / 20) /* The timer periodicity. */

typedef struct {
struct rcu_head rcu;
size_t conn_n;
TfwSrvConn *conns[0];
} TfwRatioSrvConnList;

/**
* Individual upstream server descriptor.
*
Expand All @@ -38,17 +44,15 @@
*
* @rcu - RCU control structure.
* @srv - pointer to server structure.
* @conn - list of pointers to server connection structures.
* @cl - pointer to list of pointers to server connection structures.
* @counter - monotonic counter for choosing the next connection.
* @conn_n - number of connections to server.
* @seq - current sequence number for APM stats.
*/
typedef struct {
struct rcu_head rcu;
TfwServer *srv;
TfwSrvConn **conn;
TfwRatioSrvConnList *cl;
atomic64_t counter;
size_t conn_n;
unsigned int seq;
} TfwRatioSrvDesc;

Expand Down Expand Up @@ -827,12 +831,15 @@ static inline TfwSrvConn *
__sched_srv(TfwRatioSrvDesc *srvdesc, int skipnip, int *nipconn)
{
size_t ci;
TfwRatioSrvConnList *cl = rcu_dereference_bh_check(srvdesc->cl, 1);

for (ci = 0; ci < srvdesc->conn_n; ++ci) {
rcu_read_lock_bh();
for (ci = 0; ci < cl->conn_n; ++ci) {
unsigned long idxval = atomic64_inc_return(&srvdesc->counter);
TfwSrvConn *srv_conn = srvdesc->conn[idxval % srvdesc->conn_n];
TfwSrvConn *srv_conn = cl->conns[idxval % cl->conn_n];

if (unlikely(tfw_srv_conn_restricted(srv_conn)
|| tfw_srv_conn_unscheduled(srv_conn)
|| tfw_srv_conn_busy(srv_conn)
|| tfw_srv_conn_queue_full(srv_conn)))
continue;
Expand All @@ -841,9 +848,12 @@ __sched_srv(TfwRatioSrvDesc *srvdesc, int skipnip, int *nipconn)
++(*nipconn);
continue;
}
if (likely(tfw_srv_conn_get_if_live(srv_conn)))
if (likely(tfw_srv_conn_get_if_live(srv_conn))) {
rcu_read_unlock_bh();
return srv_conn;
}
}
rcu_read_unlock_bh();

return NULL;
}
Expand Down Expand Up @@ -981,7 +991,7 @@ tfw_sched_ratio_cleanup(TfwRatio *ratio)

/* Data that is shared between pool entries. */
for (si = 0; si < ratio->srv_n; ++si)
kfree(ratio->srvdesc[si].conn);
kfree(ratio->srvdesc[si].cl);

kfree(ratio->hstdata);
kfree(ratio->rtodata);
Expand Down Expand Up @@ -1039,28 +1049,32 @@ static int
tfw_sched_ratio_srvdesc_setup_srv(TfwServer *srv, TfwRatioSrvDesc *srvdesc)
{
size_t size, ci = 0;
TfwSrvConn **conn, *srv_conn;
TfwSrvConn *srv_conn;
TfwRatioSrvConnList *cl;

size = sizeof(TfwSrvConn *) * srv->conn_n;
if (!(srvdesc->conn = kzalloc(size, GFP_KERNEL)))
size = sizeof(TfwRatioSrvConnList) + sizeof(TfwSrvConn *) * srv->conn_n;
if (!(cl = kzalloc(size, GFP_KERNEL)))
return -ENOMEM;

conn = srvdesc->conn;
list_for_each_entry(srv_conn, &srv->conn_list, list) {
if (tfw_srv_conn_unscheduled(srv_conn))
continue;
if (unlikely(ci++ == srv->conn_n))
goto err;
*conn++ = srv_conn;

cl->conns[ci-1] = srv_conn;
}
if (unlikely(ci != srv->conn_n))
goto err;

srvdesc->conn_n = srv->conn_n;
cl->conn_n = ci;
srvdesc->srv = srv;
atomic64_set(&srvdesc->counter, 0);

rcu_assign_pointer(srvdesc->cl, cl);
return 0;
err:
kfree(srvdesc->conn);
kfree(srvdesc->cl);
return -EINVAL;
}

Expand All @@ -1081,7 +1095,7 @@ tfw_sched_ratio_srvdesc_setup(TfwSrvGroup *sg, TfwRatio *ratio)
int r;
size_t si = 0;
TfwServer *srv;
TfwRatioSrvDesc *srvdesc = ratio->srvdesc;
TfwRatioSrvDesc *srvdesc = rcu_dereference_bh_check(ratio->srvdesc, 1);

list_for_each_entry(srv, &sg->srv_list, list) {
if (unlikely((si++ == sg->srv_n) || !srv->conn_n
Expand Down Expand Up @@ -1277,10 +1291,17 @@ static void
tfw_sched_ratio_put_srv_data(struct rcu_head *rcu)
{
TfwRatioSrvDesc *srvdesc = container_of(rcu, TfwRatioSrvDesc, rcu);
kfree(srvdesc->conn);
kfree(srvdesc->cl);
kfree(srvdesc);
}

static void
tfw_sched_ratio_put_conn_data(struct rcu_head *rcu)
{
TfwRatioSrvConnList *cl = container_of(rcu, TfwRatioSrvConnList, rcu);
kfree(cl);
}

static void
tfw_sched_ratio_del_srv(TfwServer *srv)
{
Expand All @@ -1291,13 +1312,48 @@ tfw_sched_ratio_del_srv(TfwServer *srv)
call_rcu(&srvdesc->rcu, tfw_sched_ratio_put_srv_data);
}

static int
tfw_sched_ratio_upd_srv(TfwServer *srv)
{
TfwRatioSrvDesc *srvdesc = rcu_dereference_bh_check(srv->sched_data, 1);
size_t size, ci = 0;
TfwRatioSrvConnList *cl_copy;
TfwRatioSrvConnList *cl = rcu_dereference_bh_check(srvdesc->cl, 1);
TfwSrvConn *srv_conn;

size = sizeof(TfwRatioSrvConnList) + sizeof(TfwSrvConn *) * srv->conn_n;
if (!(cl_copy = kzalloc(size, GFP_ATOMIC)))
return -ENOMEM;

list_for_each_entry(srv_conn, &srv->conn_list, list) {
if (tfw_srv_conn_unscheduled(srv_conn))
continue;
if (unlikely(ci++ == srv->conn_n))
goto err;
cl_copy->conns[ci-1] = srv_conn;
}
if (unlikely(ci != srv->conn_n))
goto err;
cl->conn_n = ci;
rcu_assign_pointer(srvdesc->cl, cl_copy);

if (srvdesc)
call_rcu(&cl->rcu, tfw_sched_ratio_put_conn_data);

return 0;
err:
kfree(cl_copy);
return -EINVAL;
}

static TfwScheduler tfw_sched_ratio = {
.name = "ratio",
.list = LIST_HEAD_INIT(tfw_sched_ratio.list),
.add_grp = tfw_sched_ratio_add_grp,
.del_grp = tfw_sched_ratio_del_grp,
.add_srv = tfw_sched_ratio_add_srv,
.del_srv = tfw_sched_ratio_del_srv,
.upd_srv = tfw_sched_ratio_upd_srv,
.sched_sg_conn = tfw_sched_ratio_sched_sg_conn,
.sched_srv_conn = tfw_sched_ratio_sched_srv_conn,
};
Expand Down
Loading

0 comments on commit eac58dd

Please sign in to comment.