Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stub: support restart after stub stopped #91

Merged
merged 1 commit into from
Aug 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 51 additions & 32 deletions pkg/stub/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ type PostUpdateContainerInterface interface {

// Stub is the interface the stub provides for the plugin implementation.
type Stub interface {
// Run the plugin. Starts the plugin then waits for an error or the plugin to stop
// Run starts the plugin then waits for the plugin service to exit, either due to a
// critical error or an explicit call to Stop(). Once Run() returns, the plugin can be
// restarted by calling Run() or Start() again.
Run(context.Context) error
// Start the plugin.
Start(context.Context) error
Expand Down Expand Up @@ -255,7 +257,6 @@ type stub struct {
rpcs *ttrpc.Server
rpcc *ttrpc.Client
runtime api.RuntimeService
closeOnce sync.Once
started bool
doneC chan struct{}
srvErrC chan error
Expand Down Expand Up @@ -288,7 +289,6 @@ func New(p interface{}, opts ...Option) (Stub, error) {
idx: os.Getenv(api.PluginIdxEnvVar),
socketPath: api.DefaultSocketPath,
dialer: func(p string) (stdnet.Conn, error) { return stdnet.Dial("unix", p) },
doneC: make(chan struct{}),
}

for _, o := range opts {
Expand Down Expand Up @@ -316,10 +316,10 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {
stub.Lock()
defer stub.Unlock()

if stub.started {
if stub.isStarted() {
return fmt.Errorf("stub already started")
}
stub.started = true
stub.doneC = make(chan struct{})
fuweid marked this conversation as resolved.
Show resolved Hide resolved

err := stub.connect()
if err != nil {
Expand Down Expand Up @@ -378,10 +378,11 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {

stub.srvErrC = make(chan error, 1)
stub.cfgErrC = make(chan error, 1)
go func() {
stub.srvErrC <- rpcs.Serve(ctx, rpcl)
close(stub.doneC)
}()

go func(l stdnet.Listener, doneC chan struct{}, srvErrC chan error) {
srvErrC <- rpcs.Serve(ctx, l)
close(doneC)
}(rpcl, stub.doneC, stub.srvErrC)

stub.rpcm = rpcm
stub.rpcl = rpcl
Expand All @@ -401,6 +402,7 @@ func (stub *stub) Start(ctx context.Context) (retErr error) {

log.Infof(ctx, "Started plugin %s...", stub.Name())

stub.started = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does line 398 still work? We can fix it in a follow-up.

if err = <-stub.cfgErrC; err != nil {
		return err
	}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we first need to allow stub.Configure() to be called more than once, and this code must run after stub.register(). We can leave it as it is for the time being: the current implementation of the stub's restart depends on stub.register() being executed every time after a restart.

return nil
}

Expand All @@ -413,24 +415,42 @@ func (stub *stub) Stop() {
stub.close()
}

// IsStarted returns true if the plugin has been started either by Start() or by Run().
// It takes the stub lock itself around the check; code that already holds the
// lock uses the unexported isStarted instead.
func (stub *stub) IsStarted() bool {
stub.Lock()
defer stub.Unlock()
return stub.isStarted()
}

// isStarted reports the value of the started flag without taking the stub
// lock; IsStarted is the variant that locks around this call.
func (stub *stub) isStarted() bool {
return stub.started
}

// close resets the stub to a state from which a new NRI connection
// can be initiated. The caller must hold the lock.
func (stub *stub) close() {
stub.closeOnce.Do(func() {
if stub.rpcl != nil {
stub.rpcl.Close()
}
if stub.rpcs != nil {
stub.rpcs.Close()
}
if stub.rpcc != nil {
stub.rpcc.Close()
}
if stub.rpcm != nil {
stub.rpcm.Close()
}
if stub.srvErrC != nil {
<-stub.doneC
}
})
if !stub.isStarted() {
return
}

if stub.rpcl != nil {
stub.rpcl.Close()
}
if stub.rpcs != nil {
stub.rpcs.Close()
}
if stub.rpcc != nil {
stub.rpcc.Close()
}
if stub.rpcm != nil {
stub.rpcm.Close()
}
if stub.srvErrC != nil {
<-stub.doneC
}

stub.started = false
stub.conn = nil
}

// Run the plugin. Start event processing then wait for an error or getting stopped.
Expand All @@ -449,14 +469,11 @@ func (stub *stub) Run(ctx context.Context) error {
return err
}

// Wait for the plugin to stop.
// Wait for the plugin to stop. It should be called after Start() or Run().
func (stub *stub) Wait() {
stub.Lock()
if stub.srvErrC == nil {
return
if stub.IsStarted() {
<-stub.doneC
}
stub.Unlock()
<-stub.doneC
}

// Name returns the full indexed name of the plugin.
Expand Down Expand Up @@ -518,7 +535,9 @@ func (stub *stub) register(ctx context.Context) error {

// Handle a lost connection.
func (stub *stub) connClosed() {
stub.Lock()
stub.close()
stub.Unlock()
if stub.onClose != nil {
stub.onClose()
return
Expand Down
Loading