Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: deleting a "live" topic #182

Merged
merged 1 commit into from
Apr 28, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,22 +292,26 @@ func (n *NSQd) GetExistingTopic(topicName string) (*Topic, error) {

// DeleteExistingTopic removes a topic only if it exists
func (n *NSQd) DeleteExistingTopic(topicName string) error {
n.Lock()
n.RLock()
topic, ok := n.topicMap[topicName]
if !ok {
n.Unlock()
n.RUnlock()
return errors.New("topic does not exist")
}
delete(n.topicMap, topicName)
// not defered so that we can continue while the topic async closes
n.Unlock()

log.Printf("TOPIC(%s): deleting", topic.name)
n.RUnlock()

// delete empties all channels and the topic itself before closing
// (so that we dont leave any messages around)
//
// we do this before removing the topic from map below (with no lock)
// so that any incoming writes will error and not create a new topic
// to enforce ordering
topic.Delete()

n.Lock()
delete(n.topicMap, topicName)
n.Unlock()

return nil
}

Expand Down
17 changes: 11 additions & 6 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,10 @@ func (t *Topic) router() {

// Delete empties the topic and all its channels and closes
func (t *Topic) Delete() error {
err := t.exit(true)
// since we are explicitly deleting a topic (not just at system exit time)
// de-register this from the lookupd
go t.notifier.Notify(t)
return err
return t.exit(true)
}

// Close persists all outstanding topic data and closes all its channels
func (t *Topic) Close() error {
return t.exit(false)
}
Expand All @@ -235,7 +232,15 @@ func (t *Topic) exit(deleted bool) error {
return errors.New("exiting")
}

log.Printf("TOPIC(%s): closing", t.name)
if deleted {
log.Printf("TOPIC(%s): deleting", t.name)

// since we are explicitly deleting a topic (not just at system exit time)
// de-register this from the lookupd
go t.notifier.Notify(t)
} else {
log.Printf("TOPIC(%s): closing", t.name)
}

close(t.exitChan)

Expand Down