Skip to content

Commit

Permalink
restartable view termination
Browse files Browse the repository at this point in the history
  • Loading branch information
Diogo Behrens committed Mar 14, 2018
1 parent 8acf000 commit ecdeedf
Showing 1 changed file with 20 additions and 3 deletions.
23 changes: 20 additions & 3 deletions view.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ type View struct {
done chan bool
dead chan bool

errors multierr.Errors
stopOnce sync.Once
errors multierr.Errors
stopOnce sync.Once
mInit sync.Mutex
terminated bool
}

// NewView creates a new View object from a group.
Expand Down Expand Up @@ -110,6 +112,12 @@ func (v *View) createPartitions(brokers []string) (err error) {

// reinit (re)initializes the view and its partitions to connect to Kafka
func (v *View) reinit() error {
v.mInit.Lock()
defer v.mInit.Unlock()
if v.terminated {
return fmt.Errorf("view: cannot reinitialize terminated view")
}

consumer, err := v.opts.builders.consumer(v.brokers, "goka-view", v.opts.clientID)
if err != nil {
return fmt.Errorf("view: cannot create Kafka consumer: %v", err)
Expand All @@ -127,7 +135,6 @@ func (v *View) reinit() error {

// Start starts consuming the view's topic.
func (v *View) Start() error {

if err := v.reinit(); err != nil {
return err
}
Expand Down Expand Up @@ -209,6 +216,16 @@ func (v *View) close() {
// Stop stops the view, closes storage partitions, and frees resources
func (v *View) Stop() {
v.opts.log.Printf("View: stopping")

// do not allow any reinitialization
v.mInit.Lock()
if v.terminated {
return
v.mInit.Unlock()
}
v.terminated = true
v.mInit.Unlock()

v.stop()

if v.opts.restartable {
Expand Down

0 comments on commit ecdeedf

Please sign in to comment.