Skip to content

Commit

Permalink
Merge pull request #325 from kzvezdarov/split-nsqd-app
Browse files Browse the repository at this point in the history
nsqd: make importable
  • Loading branch information
mreiferson committed Mar 15, 2014
2 parents 1b894b3 + 4990096 commit 76b9dcb
Show file tree
Hide file tree
Showing 28 changed files with 217 additions and 214 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
build
nsqd/nsqd
apps/nsqlookupd/nsqlookupd
apps/nsqd/nsqd
nsqreader/nsqreader
nsqstatsd/nsqstatsd
nsqadmin/nsqadmin
Expand Down
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ DESTDIR=
GOFLAGS=
BINDIR=${PREFIX}/bin

NSQD_SRCS = $(wildcard nsqd/*.go nsq/*.go util/*.go util/pqueue/*.go)
NSQD_SRCS = $(wildcard apps/nsqd/*.go nsqd/*.go nsq/*.go util/*.go util/pqueue/*.go)
NSQLOOKUPD_SRCS = $(wildcard apps/nsqlookupd/*.go nsqlookupd/*.go nsq/*.go util/*.go)
NSQADMIN_SRCS = $(wildcard nsqadmin/*.go nsqadmin/templates/*.go util/*.go)
NSQ_PUBSUB_SRCS = $(wildcard apps/nsq_pubsub/*.go nsq/*.go util/*.go)
Expand All @@ -13,8 +13,8 @@ NSQ_TO_HTTP_SRCS = $(wildcard apps/nsq_to_http/*.go nsq/*.go util/*.go)
NSQ_TAIL_SRCS = $(wildcard apps/nsq_tail/*.go nsq/*.go util/*.go)
NSQ_STAT_SRCS = $(wildcard apps/nsq_stat/*.go util/*.go util/lookupd/*.go)

BINARIES = nsqd nsqadmin
APPS = nsqlookupd nsq_pubsub nsq_to_nsq nsq_to_file nsq_to_http nsq_tail nsq_stat
BINARIES = nsqadmin
APPS = nsqlookupd nsqd nsq_pubsub nsq_to_nsq nsq_to_file nsq_to_http nsq_tail nsq_stat
BLDDIR = build

all: $(BINARIES) $(APPS)
Expand All @@ -26,7 +26,7 @@ $(BLDDIR)/%:
$(BINARIES): %: $(BLDDIR)/%
$(APPS): %: $(BLDDIR)/apps/%

$(BLDDIR)/nsqd: $(NSQD_SRCS)
$(BLDDIR)/apps/nsqd: $(NSQD_SRCS)
$(BLDDIR)/apps/nsqlookupd: $(NSQLOOKUPD_SRCS)
$(BLDDIR)/nsqadmin: $(NSQADMIN_SRCS)
$(BLDDIR)/apps/nsq_pubsub: $(NSQ_PUBSUB_SRCS)
Expand All @@ -45,8 +45,8 @@ clean:

install: $(BINARIES) $(EXAMPLES)
install -m 755 -d ${DESTDIR}${BINDIR}
install -m 755 $(BLDDIR)/nsqd ${DESTDIR}${BINDIR}/nsqd
install -m 755 $(BLDDIR)/apps/nsqlookupd ${DESTDIR}${BINDIR}/nsqlookupd
install -m 755 $(BLDDIR)/apps/nsqd ${DESTDIR}${BINDIR}/nsqd
install -m 755 $(BLDDIR)/nsqadmin ${DESTDIR}${BINDIR}/nsqadmin
install -m 755 $(BLDDIR)/apps/nsq_pubsub ${DESTDIR}${BINDIR}/nsq_pubsub
install -m 755 $(BLDDIR)/apps/nsq_to_nsq ${DESTDIR}${BINDIR}/nsq_to_nsq
Expand Down
5 changes: 3 additions & 2 deletions nsqd/main.go → apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/bitly/nsq/nsqd"
"github.com/bitly/nsq/util"
"github.com/mreiferson/go-options"
)
Expand Down Expand Up @@ -100,9 +101,9 @@ func main() {
}
}

opts := NewNSQDOptions()
opts := nsqd.NewNSQDOptions()
options.Resolve(opts, flagSet, cfg)
nsqd := NewNSQD(opts)
nsqd := nsqd.NewNSQD(opts)

log.Println(util.Version("nsqd"))
log.Printf("worker id %d", opts.ID)
Expand Down
20 changes: 10 additions & 10 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package nsqd

import (
"bytes"
Expand Down Expand Up @@ -46,7 +46,7 @@ type Channel struct {

topicName string
name string
context *Context
context *context

backend BackendQueue

Expand Down Expand Up @@ -86,7 +86,7 @@ type inFlightMessage struct {
}

// NewChannel creates a new instance of the Channel type and returns a pointer
func NewChannel(topicName string, channelName string, context *Context,
func NewChannel(topicName string, channelName string, context *context,
deleteCallback func(*Channel)) *Channel {

c := &Channel{
Expand All @@ -111,11 +111,11 @@ func NewChannel(topicName string, channelName string, context *Context,

if strings.HasSuffix(channelName, "#ephemeral") {
c.ephemeralChannel = true
c.backend = NewDummyBackendQueue()
c.backend = newDummyBackendQueue()
} else {
// backend names, for uniqueness, automatically include the topic... <topic>:<channel>
backendName := topicName + ":" + channelName
c.backend = NewDiskQueue(backendName,
c.backend = newDiskQueue(backendName,
context.nsqd.options.DataPath,
context.nsqd.options.MaxBytesPerFile,
context.nsqd.options.SyncEvery,
Expand Down Expand Up @@ -243,7 +243,7 @@ func (c *Channel) flush() error {
// this will read until its closed (exited)
for msg := range c.clientMsgChan {
log.Printf("CHANNEL(%s): recovered buffered message from clientMsgChan", c.name)
WriteMessageToBackend(&msgBuf, msg, c.backend)
writeMessageToBackend(&msgBuf, msg, c.backend)
}

if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
Expand All @@ -254,7 +254,7 @@ func (c *Channel) flush() error {
for {
select {
case msg := <-c.memoryMsgChan:
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
err := writeMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
Expand All @@ -266,15 +266,15 @@ func (c *Channel) flush() error {
finish:
for _, item := range c.inFlightMessages {
msg := item.Value.(*inFlightMessage).msg
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
err := writeMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
}

for _, item := range c.deferredMessages {
msg := item.Value.(*nsq.Message)
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
err := writeMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
Expand Down Expand Up @@ -560,7 +560,7 @@ func (c *Channel) router() {
select {
case c.memoryMsgChan <- msg:
default:
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
err := writeMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("CHANNEL(%s) ERROR: failed to write message to backend - %s", c.name, err.Error())
// theres not really much we can do at this point, you're certainly
Expand Down
4 changes: 2 additions & 2 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package nsqd

import (
"io/ioutil"
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestChannelEmptyConsumer(t *testing.T) {
topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")
client := NewClientV2(0, conn, &Context{nsqd})
client := newClientV2(0, conn, &context{nsqd})
client.SetReadyCount(25)
channel.AddClient(client.ID, client)

Expand Down
Loading

0 comments on commit 76b9dcb

Please sign in to comment.