[INLONG-16070][SDK] Fix potential block in Golang SDK #10674

Merged 1 commit on Jul 19, 2024
@@ -91,23 +91,23 @@ func NewClient(opts ...Option) (Client, error) {

func (c *client) initAll() error {
	// the following initialization order must not be changed.
-	err := c.initDiscoverer()
+	err := c.initMetrics()
	if err != nil {
		return err
	}
-	err = c.initNetClient()
+	err = c.initDiscoverer()
	if err != nil {
		return err
	}
-	err = c.initConns()
+	err = c.initNetClient()
	if err != nil {
		return err
	}
-	err = c.initFramer()
+	err = c.initConns()
	if err != nil {
		return err
	}
-	err = c.initMetrics()
+	err = c.initFramer()
	if err != nil {
		return err
	}
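The reorder moves initMetrics to the head of the chain, presumably so the metrics sink is ready before any later step can fail and report into it. Since the comment warns that the order is load-bearing, a table-driven variant would keep that contract in one visible place; a minimal sketch (only the initX names come from the diff, the rest is illustrative):

```go
// Sketch: run the init steps in a fixed, explicit order; metrics first.
func (c *client) initAll() error {
	steps := []func() error{
		c.initMetrics,    // must run first
		c.initDiscoverer, // resolves the endpoint list
		c.initNetClient,  // starts the gnet client
		c.initConns,      // dials the connection pool
		c.initFramer,     // sets up response framing
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}
```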
@@ -162,10 +162,8 @@ func (c *client) initConns() error {
		endpoints[i] = epList[i].Addr
	}

-	// maximum connection number per endpoint is 3
-	connsPerEndpoint := c.options.WorkerNum/epLen + 1
-	connsPerEndpoint = int(math.Min(3, float64(connsPerEndpoint)))
-
+	// minimum connection number per endpoint is 1
+	connsPerEndpoint := int(math.Ceil(float64(c.options.WorkerNum) * 1.2 / float64(epLen)))
	pool, err := connpool.NewConnPool(endpoints, connsPerEndpoint, 512, c, c.log)
	if err != nil {
		return err
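The new formula drops the old hard cap of three conns per endpoint and instead provisions 20% more conns than workers, spread over the endpoints, so a worker that discards a dead conn can always pick up a spare. As a worked example (numbers are illustrative, not from the PR): with WorkerNum = 8 and epLen = 3, the old code gave 8/3 + 1 = 3, capped by min(3, 3) = 3 conns per endpoint, i.e. 9 in total for 8 workers; the new code gives ceil(8 × 1.2 / 3) = ceil(3.2) = 4 per endpoint, i.e. 12 in total.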
@@ -296,6 +294,18 @@ func (c *client) OnClose(conn gnet.Conn, err error) gnet.Action {
		c.log.Warn("connection closed: ", conn.RemoteAddr(), ", err: ", err)
		c.metrics.incError(errConnClosedByPeer.strCode)
	}
+
+	// delete this conn from conn pool
+	if c.connPool != nil {
+		c.connPool.OnConnClosed(conn, err)
+	}
+
+	if err != nil {
+		for _, w := range c.workers {
+			w.onConnClosed(conn, err)
+		}
+	}
+
	return gnet.None
}
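Together these two hooks close the gap that could leave a worker stuck on a dead conn: the pool forgets the conn so it can not be handed out again, and every worker holding that conn is asked to replace it (see worker.onConnClosed below). The notification is gated on err != nil, since a nil error should mean a close the SDK initiated itself, which needs no recovery.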

@@ -25,11 +25,12 @@ import (

	"github.com/gofrs/uuid"

-	"github.com/panjf2000/gnet/v2"
-	"go.uber.org/atomic"
-
	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/bufferpool"
	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/logger"
	"github.com/apache/inlong/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/syncx"
+
+	"github.com/panjf2000/gnet/v2"
+	"go.uber.org/atomic"
)

const (
@@ -114,6 +115,7 @@ type worker struct {
	pendingBatches     map[string]*batchReq     // pending batches
	unackedBatches     map[string]*batchReq     // sent but not acknowledged batches
	sendFailedBatches  chan *sendFailedBatchReq // send failed batches channel
+	updateConnChan     chan error               // update conn channel
	retryBatches       chan *batchReq           // retry batches channel
	responseBatches    chan *batchRsp           // batch response channel
	batchTimeoutTicker *time.Ticker             // batch timeout ticker
@@ -145,6 +147,7 @@ func newWorker(cli *client, index int, opts *Options) (*worker, error) {
		pendingBatches:     make(map[string]*batchReq),
		unackedBatches:     make(map[string]*batchReq),
		sendFailedBatches:  make(chan *sendFailedBatchReq, opts.MaxPendingMessages),
+		updateConnChan:     make(chan error, 64),
		retryBatches:       make(chan *batchReq, opts.MaxPendingMessages),
		responseBatches:    make(chan *batchRsp, opts.MaxPendingMessages),
		batchTimeoutTicker: time.NewTicker(opts.BatchingMaxPublishDelay),
@@ -166,6 +169,7 @@ func newWorker(cli *client, index int, opts *Options) (*worker, error) {
	if err != nil {
		return nil, err
	}
+	w.log.Debug("use conn: ", conn.RemoteAddr().String())
	w.setConn(conn)

	// start the worker
@@ -184,7 +188,7 @@ func (w *worker) start() {
	go func() {
		defer func() {
			if rec := recover(); rec != nil {
-				w.log.Errorf("panic:", rec)
+				w.log.Error("panic:", rec)
				w.log.Error(string(debug.Stack()))
				w.metrics.incError(errServerPanic.getStrCode())
			}
@@ -220,6 +224,12 @@ func (w *worker) start() {
		case <-w.updateConnTicker.C:
			// update connection periodically
			w.handleUpdateConn()
+		case e, ok := <-w.updateConnChan:
+			if !ok {
+				continue
+			}
+			// update conn
+			w.updateConn(nil, e)
		case batch, ok := <-w.sendFailedBatches:
			// handle send failed batches
			if !ok {
@@ -369,7 +379,7 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
	b.lastSendTime = time.Now()
	b.encode()

-	//error callback
+	// error callback
	onErr := func(c gnet.Conn, e error, inCallback bool) {
		defer func() {
			if rec := recover(); rec != nil {
@@ -380,27 +390,29 @@ func (w *worker) sendBatch(b *batchReq, retryOnFail bool) {
		}()

		w.metrics.incError(errConnWriteFailed.getStrCode())
-		w.log.Error("send batch failed, err:", e)
+		w.log.Error("send batch failed, err: ", e, ", inCallback: ", inCallback, ", logNum:", len(b.dataReqs))

		// closed already
		if w.getState() == stateClosed {
			b.done(errConnWriteFailed)
			return
		}

-		// network error, change a new connection
-		w.updateConn(c, errConnWriteFailed)
-
		// important: when AsyncWrite() succeeds, the batch is put into w.unackedBatches; now that it has failed, we
		// need to delete it from w.unackedBatches. As onErr() is called concurrently in different goroutines, we can
		// not delete it from this callback directly, or it will panic, so we put it into the w.sendFailedBatches
		// channel, and it will be deleted and retried in handleSendFailed() one by one
		if inCallback {
+			// we can not call w.updateConn() in the callback: updateConn() may open a new conn, which calls
+			// gnet.Client.Dial(), and gnet.Client.Dial() and this callback run in the same goroutine, so it would block
+			w.updateConnAsync(errConnWriteFailed)
			w.sendFailedBatches <- &sendFailedBatchReq{batch: b, retry: retryOnFail}
			return
		}

+		// in the same goroutine, retry it directly
+		// network error, switch to a new connection
+		w.updateConn(c, errConnWriteFailed)
		if retryOnFail {
			// w.retryBatches <- b
			w.backoffRetry(context.Background(), b)
@@ -411,15 +423,18 @@

	// very important: because we use gnet, we must call AsyncWrite to send data in goroutines other than the gnet.OnTraffic() callback
	conn := w.getConn()
+	if b.retries > 0 {
+		w.log.Debug("retry batch to conn:", conn.RemoteAddr(), ", workerID:", w.index, ", batchID:", b.batchID, ", logNum:", len(b.dataReqs))
+	}
	err := conn.AsyncWrite(b.buffer.Bytes(), func(c gnet.Conn, e error) error {
		if e != nil {
-			onErr(c, e, true) //error callback
+			onErr(c, e, true) // error callback
		}
		return nil
	})

	if err != nil {
-		onErr(conn, err, false) //error callback
+		onErr(conn, err, false) // error callback
		return
	}
@@ -485,6 +500,7 @@ func (w *worker) backoffRetry(ctx context.Context, batch *batchReq) {
		}

		// put the batch into the retry channel
+		w.log.Debug("put to retry...")
		w.retryBatches <- batch
	case <-ctx.Done():
		// in case the process exits, just end the batch sending routine
@@ -501,6 +517,7 @@ func (w *worker) handleRetry(batch *batchReq, retryOnFail bool) {
	}

	// retry
+	w.log.Debug("retry batch...", ", workerID:", w.index, ", batchID:", batch.batchID)
	w.metrics.incRetry(w.indexStr)
	w.sendBatch(batch, retryOnFail)
}
@@ -553,25 +570,31 @@ func (w *worker) handleSendHeartbeat() {
	bb := w.bufferPool.Get()
	bytes := hb.encode(bb)

-	onErr := func(c gnet.Conn, e error) {
+	onErr := func(c gnet.Conn, e error, inCallback bool) {
		w.metrics.incError(errConnWriteFailed.getStrCode())
		w.log.Error("send heartbeat failed, err:", e)
-		w.updateConn(c, errConnWriteFailed)
+		if inCallback {
+			// we can not call w.updateConn() in the callback: updateConn() may open a new conn, which calls
+			// gnet.Client.Dial(), and gnet.Client.Dial() and this callback run in the same goroutine, so it would block
+			w.updateConnAsync(errConnWriteFailed)
+		} else {
+			w.updateConn(c, errConnWriteFailed)
+		}
	}

	// very important: because we use gnet, we must call AsyncWrite to send data in goroutines other than the gnet.OnTraffic() callback
	conn := w.getConn()
	err := conn.AsyncWrite(bytes, func(c gnet.Conn, e error) error {
		if e != nil {
-			onErr(c, e)
+			onErr(c, e, true)
		}
		// recycle the buffer
		w.bufferPool.Put(bb)
		return nil
	})

	if err != nil {
-		onErr(conn, err)
+		onErr(conn, err, false)
		// recycle the buffer
		w.bufferPool.Put(bb)
	}
@@ -681,7 +704,9 @@ func (w *worker) handleClose(req *closeReq) {
	close(w.retryBatches)
	// close the send failed channel
	close(w.sendFailedBatches)
-	// close the response channel
+	// close the update conn channel
+	close(w.updateConnChan)
+	// close the response channel
	close(w.responseBatches)
	// close the done channel of the close request to notify the close is done
	close(req.doneCh)
@@ -713,6 +738,18 @@ func (w *worker) handleUpdateConn() {
	w.updateConn(nil, nil)
}

+func (w *worker) updateConnAsync(err error) {
+	// already in the closed state
+	if w.getState() == stateClosed {
+		return
+	}
+
+	select {
+	case w.updateConnChan <- err:
+	default:
+	}
+}
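This function is the heart of the fix named in the PR title. gnet runs AsyncWrite error callbacks on an event-loop goroutine, and opening a replacement conn goes through gnet.Client.Dial(), which would run on that same goroutine, so dialing inline from a callback can block forever. The worker therefore queues a request and lets its own goroutine do the dial; the select with a default arm makes the hand-off non-blocking, and dropping the request when the buffer is full is safe because an update is already pending. A standalone sketch of the pattern (names and messages are illustrative, not from the SDK):

```go
package main

import "fmt"

func main() {
	// buffered like updateConnChan in the diff
	updateCh := make(chan error, 64)

	// what updateConnAsync does: hand the request over without ever blocking
	requestUpdate := func(err error) {
		select {
		case updateCh <- err: // delivered to the worker goroutine
		default: // channel full: an update is already pending, drop this one
		}
	}

	// called from the goroutine that must never block (gnet's event loop)
	requestUpdate(fmt.Errorf("conn write failed"))
	requestUpdate(fmt.Errorf("conn closed by peer"))

	// the worker's own select loop does the slow part (the redial) safely
	for len(updateCh) > 0 {
		fmt.Println("updating conn because:", <-updateCh)
	}
}
```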

func (w *worker) updateConn(old gnet.Conn, err error) {
newConn, newErr := w.client.getConn()
if newErr != nil {
@@ -726,9 +763,16 @@ func (w *worker) updateConn(old gnet.Conn, err error) {
		oldConn = w.getConn()
	}

-	w.client.putConn(oldConn, err)
	ok := w.casConn(oldConn, newConn)
	if ok {
+		// put the old conn back to the pool only if there is no error
+		if err == nil {
+			w.client.putConn(oldConn, err)
+		} else { // nolint:staticcheck
+			// if there is an error, the conn was most likely closed by the peer;
+			// gnet will call Client.OnClose() to delete it from the pool,
+			// so it is fine not to put it back here
+		}
		w.metrics.incUpdateConn(getErrorCode(err))
	} else {
		w.client.putConn(newConn, nil)
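The compare-and-swap is what keeps concurrent updates safe: if two goroutines try to replace the same old conn, only one CAS succeeds, and the loser returns its freshly obtained conn to the pool instead of leaking it. A minimal standalone sketch of the same pattern on a sync/atomic Value (Go 1.17+), with strings standing in for conns:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var conn atomic.Value
	conn.Store("conn-A") // the worker's current conn

	// two updaters race to replace conn-A; only one CAS can win
	if conn.CompareAndSwap("conn-A", "conn-B") {
		fmt.Println("winner installed conn-B")
	}
	if conn.CompareAndSwap("conn-A", "conn-C") {
		fmt.Println("never reached: conn-A is already gone")
	} else {
		fmt.Println("loser returns conn-C to the pool")
	}
	fmt.Println("current:", conn.Load())
}
```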
Expand All @@ -743,6 +787,13 @@ func (w *worker) getConn() gnet.Conn {
return w.conn.Load().(gnet.Conn)
}

func (w *worker) onConnClosed(conn gnet.Conn, err error) {
oldConn := w.conn.Load().(gnet.Conn)
if oldConn == conn {
w.updateConnAsync(err)
}
}
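Note that the worker compares the closed conn against the one it currently holds, so a single OnClose fan-out disturbs only the workers that were actually using that conn; and because updateConnAsync collapses duplicate requests, repeated notifications for the same conn cost at most one redial per worker.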

func (w *worker) casConn(oldConn, newConn gnet.Conn) bool {
return w.conn.CompareAndSwap(oldConn, newConn)
}