From 5733aaabf1ce39e3efaed406c16908aeffdefc27 Mon Sep 17 00:00:00 2001
From: gunli
Date: Fri, 19 Jul 2024 11:52:53 +0800
Subject: [PATCH] [INLONG-16070][SDK] Fix potential block in Golang SDK

---
 .../dataproxy-sdk-golang/dataproxy/client.go | 28 +++++--
 .../dataproxy-sdk-golang/dataproxy/worker.go | 83 +++++++++++++++----
 2 files changed, 86 insertions(+), 25 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
index b6f3a0f2af7..cce1e4e4abc 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
@@ -91,23 +91,23 @@ func NewClient(opts ...Option) (Client, error) {
 
 func (c *client) initAll() error {
 	// the following initialization order must not be changed。
-	err := c.initDiscoverer()
+	err := c.initMetrics()
 	if err != nil {
 		return err
 	}
-	err = c.initNetClient()
+	err = c.initDiscoverer()
 	if err != nil {
 		return err
 	}
-	err = c.initConns()
+	err = c.initNetClient()
 	if err != nil {
 		return err
 	}
-	err = c.initFramer()
+	err = c.initConns()
 	if err != nil {
 		return err
 	}
-	err = c.initMetrics()
+	err = c.initFramer()
 	if err != nil {
 		return err
 	}
@@ -162,10 +162,8 @@ func (c *client) initConns() error {
 		endpoints[i] = epList[i].Addr
 	}
-	// maximum connection number per endpoint is 3
-	connsPerEndpoint := c.options.WorkerNum/epLen + 1
-	connsPerEndpoint = int(math.Min(3, float64(connsPerEndpoint)))
-
+	// minimum connection number per endpoint is 1
+	connsPerEndpoint := int(math.Ceil(float64(c.options.WorkerNum) * 1.2 / float64(epLen)))
 	pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c, c.log)
 	if err != nil {
 		return err
 	}
@@ -296,6 +294,18 @@ func (c *client) OnClose(conn gnet.Conn, err error) gnet.Action {
 		c.log.Warn("connection closed: ", conn.RemoteAddr(), ", err: ", err)
 		c.metrics.incError(errConnClosedByPeer.strCode)
 	}
+
+	// delete this conn from conn pool
+	if c.connPool != nil {
+		c.connPool.OnConnClosed(conn, err)
+	}
+
+	if err != nil {
+		for _, w := range c.workers {
+			w.onConnClosed(conn, err)
+		}
+	}
+
 	return gnet.None
 }
 
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
index 2991364b823..935c69732c6 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/worker.go
@@ -25,11 +25,12 @@ import (
 
 	"github.com/gofrs/uuid"
 
+	"github.com/panjf2000/gnet/v2"
+	"go.uber.org/atomic"
+
 	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
 	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
 	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/syncx"
-	"github.com/panjf2000/gnet/v2"
-	"go.uber.org/atomic"
 )
 
 const (
@@ -114,6 +115,7 @@ type worker struct {
 	pendingBatches     map[string]*batchReq     // pending batches
 	unackedBatches     map[string]*batchReq     // sent but not acknowledged batches
 	sendFailedBatches  chan *sendFailedBatchReq // send failed batches channel
+	updateConnChan     chan error               // update conn channel
 	retryBatches       chan *batchReq           // retry batches channel
 	responseBatches    chan *batchRsp           // batch response channel
 	batchTimeoutTicker *time.Ticker             // batch timeout ticker
@@ -145,6 +147,7 @@ func newWorker(cli *client, index int, opts *Options) (*worker, error) {
 		pendingBatches:     make(map[string]*batchReq),
 		unackedBatches:     make(map[string]*batchReq),
 		sendFailedBatches:  make(chan *sendFailedBatchReq, opts.MaxPendingMessages),
+		updateConnChan:     make(chan error, 64),
 		retryBatches:       make(chan *batchReq, opts.MaxPendingMessages),
 		responseBatches:    make(chan *batchRsp, opts.MaxPendingMessages),
 		batchTimeoutTicker: time.NewTicker(opts.BatchingMaxPublishDelay),
@@ -166,6 +169,7 @@ func newWorker(cli *client, index int, opts *Options) (*worker, error) {
 	if err != nil {
 		return nil, err
 	}
+	w.log.Debug("use conn: ", conn.RemoteAddr().String())
 	w.setConn(conn)
 
 	// start the worker
@@ -184,7 +188,7 @@ func (w *worker) start() {
 	go func() {
 		defer func() {
 			if rec := recover(); rec != nil {
-				w.log.Errorf("panic:", rec)
+				w.log.Error("panic:", rec)
 				w.log.Error(string(debug.Stack()))
 				w.metrics.incError(errServerPanic.getStrCode())
 			}
@@ -220,6 +224,12 @@ func (w *worker) start() {
 			case <-w.updateConnTicker.C:
 				// update connection periodically
 				w.handleUpdateConn()
+			case e, ok := <-w.updateConnChan:
+				if !ok {
+					continue
+				}
+				// update conn
+				w.updateConn(nil, e)
 			case batch, ok := <-w.sendFailedBatches:
 				// handle send failed batches
 				if !ok {
@@ -369,7 +379,7 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
 	b.lastSendTime = time.Now()
 	b.encode()
 
-	//error callback
+	// error callback
 	onErr := func(c gnet.Conn, e error, inCallback bool) {
 		defer func() {
 			if rec := recover(); rec != nil {
@@ -380,7 +390,7 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
 		}()
 
 		w.metrics.incError(errConnWriteFailed.getStrCode())
-		w.log.Error("send batch failed, err:", e)
+		w.log.Error("send batch failed, err: ", e, ", inCallback: ", inCallback, ", logNum:", len(b.dataReqs))
 
 		// close already
 		if w.getState() == stateClosed {
@@ -388,19 +398,21 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
 			return
 		}
 
-		// network error, change a new connection
-		w.updateConn(c, errConnWriteFailed)
-
 		// important:when AsyncWrite() call succeed, the batch will be put into w.unackedBatches,now it failed, we need
 		// to delete from w.unackedBatches, as onErr() is call concurrently in different goroutine, we can not delete it
 		// from this callback directly, or will be panic, so we put into the w.sendFailedBatches channel, and it will be
 		// deleted and retried in handleSendFailed() one by one
 		if inCallback {
+			// cannot call w.updateConn() in this callback: updateConn() may open a new conn, which calls gnet.Client.Dial(),
+			// and gnet.Client.Dial() runs in the same goroutine as this callback, so it would block
+			w.updateConnAsync(errConnWriteFailed)
 			w.sendFailedBatches <- &sendFailedBatchReq{batch: b, retry: retryOnFail}
 			return
 		}
 
 		// in a same goroutine, retry it directly
+		// network error, change a new connection
+		w.updateConn(c, errConnWriteFailed)
 		if retryOnFail {
 			// w.retryBatches <- b
 			w.backoffRetry(context.Background(), b)
@@ -411,15 +423,18 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
 
 	// very important:'cause we use gnet, we must call AsyncWrite to send data in goroutines that are different from gnet.OnTraffic() callback
 	conn := w.getConn()
+	if b.retries > 0 {
+		w.log.Debug("retry batch to conn:", conn.RemoteAddr(), ", workerID:", w.index, ", batchID:", b.batchID, ", logNum:", len(b.dataReqs))
+	}
 	err := conn.AsyncWrite(b.buffer.Bytes(), func(c gnet.Conn, e error) error {
 		if e != nil {
-			onErr(c, e, true) //error callback
+			onErr(c, e, true) // error callback
 		}
 		return nil
 	})
 
 	if err != nil {
-		onErr(conn, err, false) //error callback
+		onErr(conn, err, false) // error callback
 		return
 	}
 
@@ -485,6 +500,7 @@ func (w *worker) backoffRetry(ctx context.Context, batch *batchReq) {
 			}
 
 			// put the batch into the retry channel
+			w.log.Debug("put to retry...")
 			w.retryBatches <- batch
 		case <-ctx.Done():
 			// in the case the process exit, just end up the batch sending routine
@@ -501,6 +517,7 @@ func (w *worker) handleRetry(batch *batchReq, retryOnFail bool) {
 	}
 
 	// retry
+	w.log.Debug("retry batch...", ", workerID:", w.index, ", batchID:", batch.batchID)
 	w.metrics.incRetry(w.indexStr)
 	w.sendBatch(batch, retryOnFail)
 }
@@ -553,17 +570,23 @@ func (w *worker) handleSendHeartbeat() {
 	bb := w.bufferPool.Get()
 	bytes := hb.encode(bb)
 
-	onErr := func(c gnet.Conn, e error) {
+	onErr := func(c gnet.Conn, e error, inCallback bool) {
 		w.metrics.incError(errConnWriteFailed.getStrCode())
 		w.log.Error("send heartbeat failed, err:", e)
-		w.updateConn(c, errConnWriteFailed)
+		if inCallback {
+			// cannot call w.updateConn() in this callback: updateConn() may open a new conn, which calls gnet.Client.Dial(),
+			// and gnet.Client.Dial() runs in the same goroutine as this callback, so it would block
+			w.updateConnAsync(errConnWriteFailed)
+		} else {
+			w.updateConn(c, errConnWriteFailed)
+		}
 	}
 
 	// very important:'cause we use gnet, we must call AsyncWrite to send data in goroutines that are different from gnet.OnTraffic() callback
 	conn := w.getConn()
 	err := conn.AsyncWrite(bytes, func(c gnet.Conn, e error) error {
 		if e != nil {
-			onErr(c, e)
+			onErr(c, e, true)
 		}
 		// recycle the buffer
 		w.bufferPool.Put(bb)
@@ -571,7 +594,7 @@ func (w *worker) handleSendHeartbeat() {
 	})
 
 	if err != nil {
-		onErr(conn, err)
+		onErr(conn, err, false)
 		// recycle the buffer
 		w.bufferPool.Put(bb)
 	}
@@ -681,7 +704,9 @@ func (w *worker) handleClose(req *closeReq) {
 	close(w.retryBatches)
 	// close the send failed channel
 	close(w.sendFailedBatches)
-	// close the response channel
+	// close the update conn chan
+	close(w.updateConnChan)
+	// close the response chan
 	close(w.responseBatches)
 	// close the done channel of the close request to notify the close is done
 	close(req.doneCh)
@@ -713,6 +738,18 @@ func (w *worker) handleUpdateConn() {
 	w.updateConn(nil, nil)
 }
 
+func (w *worker) updateConnAsync(err error) {
+	// already in closed state
+	if w.getState() == stateClosed {
+		return
+	}
+
+	select {
+	case w.updateConnChan <- err:
+	default:
+	}
+}
+
 func (w *worker) updateConn(old gnet.Conn, err error) {
 	newConn, newErr := w.client.getConn()
 	if newErr != nil {
@@ -726,9 +763,16 @@ func (w *worker) updateConn(old gnet.Conn, err error) {
 		oldConn = w.getConn()
 	}
 
-	w.client.putConn(oldConn, err)
 	ok := w.casConn(oldConn, newConn)
 	if ok {
+		// put back to pool only if there is no error
+		if err == nil {
+			w.client.putConn(oldConn, err)
+		} else { // nolint:staticcheck
+			// if there is an error, the conn was basically closed by the peer;
+			// gnet will call Client.OnClose() to delete it from the pool,
+			// so it is fine not to put it back here
+		}
 		w.metrics.incUpdateConn(getErrorCode(err))
 	} else {
 		w.client.putConn(newConn, nil)
@@ -743,6 +787,13 @@ func (w *worker) getConn() gnet.Conn {
 	return w.conn.Load().(gnet.Conn)
 }
 
+func (w *worker) onConnClosed(conn gnet.Conn, err error) {
+	oldConn := w.conn.Load().(gnet.Conn)
+	if oldConn == conn {
+		w.updateConnAsync(err)
+	}
+}
+
 func (w *worker) casConn(oldConn, newConn gnet.Conn) bool {
 	return w.conn.CompareAndSwap(oldConn, newConn)
 }
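
A quick worked example of the new pool sizing in client.go (the numbers below are illustrative, not SDK defaults): with 8 workers and 3 endpoints, the old code gives min(8/3+1, 3) = 3 connections per endpoint, while the new code gives ceil(8*1.2/3) = 4, so the per-endpoint connection count now scales with the worker count instead of being capped at 3.

package main

import (
	"fmt"
	"math"
)

func main() {
	// illustrative values, not taken from the SDK defaults
	workerNum, epLen := 8, 3

	// old sizing: capped at 3 connections per endpoint
	oldConns := int(math.Min(3, float64(workerNum/epLen+1)))

	// new sizing: scales with the worker count, at least 1 per endpoint
	newConns := int(math.Ceil(float64(workerNum) * 1.2 / float64(epLen)))

	fmt.Println(oldConns, newConns) // prints: 3 4
}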
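
The worker.go changes follow a non-blocking hand-off pattern: a write callback never dials a replacement connection itself (gnet.Client.Dial() would run on the callback's own goroutine and block); it only signals the worker loop through the buffered updateConnChan, using a select with a default case so a full channel is never waited on. The sketch below is a minimal, self-contained illustration of that pattern with the gnet specifics elided; the types and names are placeholders, not the SDK's API.

package main

import (
	"errors"
	"fmt"
	"time"
)

// worker models only the hand-off: callbacks request a conn update,
// the event loop performs the (potentially slow) reconnect.
type worker struct {
	updateConnChan chan error
}

// updateConnAsync mirrors the non-blocking send in worker.updateConnAsync:
// if an update request is already queued, the new one is simply dropped.
func (w *worker) updateConnAsync(err error) {
	select {
	case w.updateConnChan <- err:
	default:
	}
}

// run models the worker's select loop, where the reconnect is safe to
// perform because it never runs on a gnet callback goroutine.
func (w *worker) run(done <-chan struct{}) {
	for {
		select {
		case e := <-w.updateConnChan:
			fmt.Println("updating connection, cause:", e) // the real worker dials here
		case <-done:
			return
		}
	}
}

func main() {
	w := &worker{updateConnChan: make(chan error, 64)}
	done := make(chan struct{})
	go w.run(done)

	// a write callback reporting a broken connection never blocks here
	w.updateConnAsync(errors.New("connection reset by peer"))

	time.Sleep(100 * time.Millisecond)
	close(done)
}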