From 8af88db0793706e702730500c21e8bea0a59ba98 Mon Sep 17 00:00:00 2001 From: Lei Liu Date: Thu, 4 Jul 2024 16:50:59 +0800 Subject: [PATCH] stub: support restart after ttrpc connection lost In some scenarios such as runtime restart or the occurrence of nri request timeout, the ttrpc connections between the plugin and the runtime will be actively closed by the runtime, even the underlying network connection will be closed together. After this, the plugin must need to re-register to the adaptation side, but now the stub object cannot be reused for this; if the running plugin wants to reconnect to the runtime, the only way is to create a new stub for the plugin. This commit introduce a new stub option `WithAutoRestartDelay`. The plugin developer can use this to build a reuseable stub, which will wait for a confiurable time duration and restart automatically after the ttrpc connection is lost, then plugin will be auto-reregistered to the adaptation. Signed-off-by: Lei Liu --- pkg/stub/stub.go | 48 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/pkg/stub/stub.go b/pkg/stub/stub.go index 3cc837b0..403953b1 100644 --- a/pkg/stub/stub.go +++ b/pkg/stub/stub.go @@ -153,6 +153,9 @@ type Stub interface { const ( // Plugin registration timeout. registrationTimeout = 2 * time.Second + + // Plugin auto restart delay duration, default negative value indicates no restart. + restartDelay = -1 * time.Second ) var ( @@ -236,6 +239,15 @@ func WithTTRPCOptions(clientOpts []ttrpc.ClientOpts, serverOpts []ttrpc.ServerOp } } +// WithAutoRestartDelay sets the delay duration for auto restart +// after ttrpc connection lost. +func WithAutoRestartDelay(d time.Duration) Option { + return func(s *stub) error { + s.restartDelay = d + return nil + } +} + // stub implements Stub. type stub struct { sync.Mutex @@ -260,6 +272,11 @@ type stub struct { doneC chan struct{} srvErrC chan error cfgErrC chan error + + // when ttrpc connection closed by runtime: such as in scenarios like + // runtime restart or nri request timeout, plugin(stub) will automatically + // restart after waiting for the restartDelay time. + restartDelay time.Duration } // Handlers for NRI plugin event and request. @@ -283,12 +300,12 @@ type handlers struct { // New creates a stub with the given plugin and options. func New(p interface{}, opts ...Option) (Stub, error) { stub := &stub{ - plugin: p, - name: os.Getenv(api.PluginNameEnvVar), - idx: os.Getenv(api.PluginIdxEnvVar), - socketPath: api.DefaultSocketPath, - dialer: func(p string) (stdnet.Conn, error) { return stdnet.Dial("unix", p) }, - doneC: make(chan struct{}), + plugin: p, + name: os.Getenv(api.PluginNameEnvVar), + idx: os.Getenv(api.PluginIdxEnvVar), + socketPath: api.DefaultSocketPath, + dialer: func(p string) (stdnet.Conn, error) { return stdnet.Dial("unix", p) }, + restartDelay: restartDelay, } for _, o := range opts { @@ -320,6 +337,7 @@ func (stub *stub) Start(ctx context.Context) (retErr error) { return fmt.Errorf("stub already started") } stub.started = true + stub.doneC = make(chan struct{}) err := stub.connect() if err != nil { @@ -426,23 +444,37 @@ func (stub *stub) close() { } if stub.rpcm != nil { stub.rpcm.Close() + stub.conn = nil } if stub.srvErrC != nil { <-stub.doneC } + stub.started = false }) + if stub.willRestart() { + stub.closeOnce = sync.Once{} + } +} + +func (stub *stub) willRestart() bool { + return stub.restartDelay >= 0 } // Run the plugin. Start event processing then wait for an error or getting stopped. func (stub *stub) Run(ctx context.Context) error { var err error +START: if err = stub.Start(ctx); err != nil { return err } err = <-stub.srvErrC if err == ttrpc.ErrServerClosed { + if stub.willRestart() { + time.Sleep(stub.restartDelay) + goto START // auto restart plugin when ttrpc closed by server + } return nil } @@ -524,7 +556,9 @@ func (stub *stub) connClosed() { return } - os.Exit(0) + if !stub.willRestart() { + os.Exit(0) + } } //