diff --git a/pkg/stub/stub.go b/pkg/stub/stub.go index 3cc837b0..403953b1 100644 --- a/pkg/stub/stub.go +++ b/pkg/stub/stub.go @@ -153,6 +153,9 @@ type Stub interface { const ( // Plugin registration timeout. registrationTimeout = 2 * time.Second + + // Plugin auto restart delay duration, default negative value indicates no restart. + restartDelay = -1 * time.Second ) var ( @@ -236,6 +239,15 @@ func WithTTRPCOptions(clientOpts []ttrpc.ClientOpts, serverOpts []ttrpc.ServerOp } } +// WithAutoRestartDelay sets the delay duration for auto restart +// after ttrpc connection lost. +func WithAutoRestartDelay(d time.Duration) Option { + return func(s *stub) error { + s.restartDelay = d + return nil + } +} + // stub implements Stub. type stub struct { sync.Mutex @@ -260,6 +272,11 @@ type stub struct { doneC chan struct{} srvErrC chan error cfgErrC chan error + + // when ttrpc connection closed by runtime: such as in scenarios like + // runtime restart or nri request timeout, plugin(stub) will automatically + // restart after waiting for the restartDelay time. + restartDelay time.Duration } // Handlers for NRI plugin event and request. @@ -283,12 +300,12 @@ type handlers struct { // New creates a stub with the given plugin and options. func New(p interface{}, opts ...Option) (Stub, error) { stub := &stub{ - plugin: p, - name: os.Getenv(api.PluginNameEnvVar), - idx: os.Getenv(api.PluginIdxEnvVar), - socketPath: api.DefaultSocketPath, - dialer: func(p string) (stdnet.Conn, error) { return stdnet.Dial("unix", p) }, - doneC: make(chan struct{}), + plugin: p, + name: os.Getenv(api.PluginNameEnvVar), + idx: os.Getenv(api.PluginIdxEnvVar), + socketPath: api.DefaultSocketPath, + dialer: func(p string) (stdnet.Conn, error) { return stdnet.Dial("unix", p) }, + restartDelay: restartDelay, } for _, o := range opts { @@ -320,6 +337,7 @@ func (stub *stub) Start(ctx context.Context) (retErr error) { return fmt.Errorf("stub already started") } stub.started = true + stub.doneC = make(chan struct{}) err := stub.connect() if err != nil { @@ -426,23 +444,37 @@ func (stub *stub) close() { } if stub.rpcm != nil { stub.rpcm.Close() + stub.conn = nil } if stub.srvErrC != nil { <-stub.doneC } + stub.started = false }) + if stub.willRestart() { + stub.closeOnce = sync.Once{} + } +} + +func (stub *stub) willRestart() bool { + return stub.restartDelay >= 0 } // Run the plugin. Start event processing then wait for an error or getting stopped. func (stub *stub) Run(ctx context.Context) error { var err error +START: if err = stub.Start(ctx); err != nil { return err } err = <-stub.srvErrC if err == ttrpc.ErrServerClosed { + if stub.willRestart() { + time.Sleep(stub.restartDelay) + goto START // auto restart plugin when ttrpc closed by server + } return nil } @@ -524,7 +556,9 @@ func (stub *stub) connClosed() { return } - os.Exit(0) + if !stub.willRestart() { + os.Exit(0) + } } //