Skip to content

Commit

Permalink
Merge pull request #182 from mreiferson/active_deletion_182
Browse files Browse the repository at this point in the history
nsqd: deleting a "live" topic
  • Loading branch information
jehiah committed Apr 28, 2013
2 parents 6375db4 + a0a6ce6 commit 1f833d2
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
18 changes: 11 additions & 7 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,22 +292,26 @@ func (n *NSQd) GetExistingTopic(topicName string) (*Topic, error) {

// DeleteExistingTopic removes a topic only if it exists
func (n *NSQd) DeleteExistingTopic(topicName string) error {
n.Lock()
n.RLock()
topic, ok := n.topicMap[topicName]
if !ok {
n.Unlock()
n.RUnlock()
return errors.New("topic does not exist")
}
delete(n.topicMap, topicName)
// not defered so that we can continue while the topic async closes
n.Unlock()

log.Printf("TOPIC(%s): deleting", topic.name)
n.RUnlock()

// delete empties all channels and the topic itself before closing
// (so that we dont leave any messages around)
//
// we do this before removing the topic from map below (with no lock)
// so that any incoming writes will error and not create a new topic
// to enforce ordering
topic.Delete()

n.Lock()
delete(n.topicMap, topicName)
n.Unlock()

return nil
}

Expand Down
17 changes: 11 additions & 6 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,10 @@ func (t *Topic) router() {

// Delete empties the topic and all its channels and closes
func (t *Topic) Delete() error {
err := t.exit(true)
// since we are explicitly deleting a topic (not just at system exit time)
// de-register this from the lookupd
go t.notifier.Notify(t)
return err
return t.exit(true)
}

// Close persists all outstanding topic data and closes all its channels
func (t *Topic) Close() error {
return t.exit(false)
}
Expand All @@ -235,7 +232,15 @@ func (t *Topic) exit(deleted bool) error {
return errors.New("exiting")
}

log.Printf("TOPIC(%s): closing", t.name)
if deleted {
log.Printf("TOPIC(%s): deleting", t.name)

// since we are explicitly deleting a topic (not just at system exit time)
// de-register this from the lookupd
go t.notifier.Notify(t)
} else {
log.Printf("TOPIC(%s): closing", t.name)
}

close(t.exitChan)

Expand Down

0 comments on commit 1f833d2

Please sign in to comment.