Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: make importable #325

Merged
merged 1 commit into from
Mar 15, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
build
nsqd/nsqd
apps/nsqlookupd/nsqlookupd
apps/nsqd/nsqd
nsqreader/nsqreader
nsqstatsd/nsqstatsd
nsqadmin/nsqadmin
Expand Down
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ DESTDIR=
GOFLAGS=
BINDIR=${PREFIX}/bin

NSQD_SRCS = $(wildcard nsqd/*.go nsq/*.go util/*.go util/pqueue/*.go)
NSQD_SRCS = $(wildcard apps/nsqd/*.go nsqd/*.go nsq/*.go util/*.go util/pqueue/*.go)
NSQLOOKUPD_SRCS = $(wildcard apps/nsqlookupd/*.go nsqlookupd/*.go nsq/*.go util/*.go)
NSQADMIN_SRCS = $(wildcard nsqadmin/*.go nsqadmin/templates/*.go util/*.go)
NSQ_PUBSUB_SRCS = $(wildcard apps/nsq_pubsub/*.go nsq/*.go util/*.go)
Expand All @@ -13,8 +13,8 @@ NSQ_TO_HTTP_SRCS = $(wildcard apps/nsq_to_http/*.go nsq/*.go util/*.go)
NSQ_TAIL_SRCS = $(wildcard apps/nsq_tail/*.go nsq/*.go util/*.go)
NSQ_STAT_SRCS = $(wildcard apps/nsq_stat/*.go util/*.go util/lookupd/*.go)

BINARIES = nsqd nsqadmin
APPS = nsqlookupd nsq_pubsub nsq_to_nsq nsq_to_file nsq_to_http nsq_tail nsq_stat
BINARIES = nsqadmin
APPS = nsqlookupd nsqd nsq_pubsub nsq_to_nsq nsq_to_file nsq_to_http nsq_tail nsq_stat
BLDDIR = build

all: $(BINARIES) $(APPS)
Expand All @@ -26,7 +26,7 @@ $(BLDDIR)/%:
$(BINARIES): %: $(BLDDIR)/%
$(APPS): %: $(BLDDIR)/apps/%

$(BLDDIR)/nsqd: $(NSQD_SRCS)
$(BLDDIR)/apps/nsqd: $(NSQD_SRCS)
$(BLDDIR)/apps/nsqlookupd: $(NSQLOOKUPD_SRCS)
$(BLDDIR)/nsqadmin: $(NSQADMIN_SRCS)
$(BLDDIR)/apps/nsq_pubsub: $(NSQ_PUBSUB_SRCS)
Expand All @@ -45,8 +45,8 @@ clean:

install: $(BINARIES) $(EXAMPLES)
install -m 755 -d ${DESTDIR}${BINDIR}
install -m 755 $(BLDDIR)/nsqd ${DESTDIR}${BINDIR}/nsqd
install -m 755 $(BLDDIR)/apps/nsqlookupd ${DESTDIR}${BINDIR}/nsqlookupd
install -m 755 $(BLDDIR)/apps/nsqd ${DESTDIR}${BINDIR}/nsqd
install -m 755 $(BLDDIR)/nsqadmin ${DESTDIR}${BINDIR}/nsqadmin
install -m 755 $(BLDDIR)/apps/nsq_pubsub ${DESTDIR}${BINDIR}/nsq_pubsub
install -m 755 $(BLDDIR)/apps/nsq_to_nsq ${DESTDIR}${BINDIR}/nsq_to_nsq
Expand Down
5 changes: 3 additions & 2 deletions nsqd/main.go → apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/bitly/nsq/nsqd"
"github.com/bitly/nsq/util"
"github.com/mreiferson/go-options"
)
Expand Down Expand Up @@ -100,9 +101,9 @@ func main() {
}
}

opts := NewNSQDOptions()
opts := nsqd.NewNSQDOptions()
options.Resolve(opts, flagSet, cfg)
nsqd := NewNSQD(opts)
nsqd := nsqd.NewNSQD(opts)

log.Println(util.Version("nsqd"))
log.Printf("worker id %d", opts.ID)
Expand Down
20 changes: 10 additions & 10 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package nsqd

import (
"bytes"
Expand Down Expand Up @@ -46,7 +46,7 @@ type Channel struct {

topicName string
name string
context *Context
context *context

backend BackendQueue

Expand Down Expand Up @@ -86,7 +86,7 @@ type inFlightMessage struct {
}

// NewChannel creates a new instance of the Channel type and returns a pointer
func NewChannel(topicName string, channelName string, context *Context,
func NewChannel(topicName string, channelName string, context *context,
deleteCallback func(*Channel)) *Channel {

c := &Channel{
Expand All @@ -111,11 +111,11 @@ func NewChannel(topicName string, channelName string, context *Context,

if strings.HasSuffix(channelName, "#ephemeral") {
c.ephemeralChannel = true
c.backend = NewDummyBackendQueue()
c.backend = newDummyBackendQueue()
} else {
// backend names, for uniqueness, automatically include the topic... <topic>:<channel>
backendName := topicName + ":" + channelName
c.backend = NewDiskQueue(backendName,
c.backend = newDiskQueue(backendName,
context.nsqd.options.DataPath,
context.nsqd.options.MaxBytesPerFile,
context.nsqd.options.SyncEvery,
Expand Down Expand Up @@ -243,7 +243,7 @@ func (c *Channel) flush() error {
// this will read until its closed (exited)
for msg := range c.clientMsgChan {
log.Printf("CHANNEL(%s): recovered buffered message from clientMsgChan", c.name)
WriteMessageToBackend(&msgBuf, msg, c.backend)
writeMessageToBackend(&msgBuf, msg, c.backend)
}

if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
Expand All @@ -254,7 +254,7 @@ func (c *Channel) flush() error {
for {
select {
case msg := <-c.memoryMsgChan:
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
err := writeMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
Expand All @@ -266,15 +266,15 @@ func (c *Channel) flush() error {
finish:
for _, item := range c.inFlightMessages {
msg := item.Value.(*inFlightMessage).msg
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
err := writeMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
}

for _, item := range c.deferredMessages {
msg := item.Value.(*nsq.Message)
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
err := writeMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("ERROR: failed to write message to backend - %s", err.Error())
}
Expand Down Expand Up @@ -560,7 +560,7 @@ func (c *Channel) router() {
select {
case c.memoryMsgChan <- msg:
default:
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
err := writeMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
log.Printf("CHANNEL(%s) ERROR: failed to write message to backend - %s", c.name, err.Error())
// theres not really much we can do at this point, you're certainly
Expand Down
4 changes: 2 additions & 2 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package nsqd

import (
"io/ioutil"
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestChannelEmptyConsumer(t *testing.T) {
topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")
client := NewClientV2(0, conn, &Context{nsqd})
client := newClientV2(0, conn, &context{nsqd})
client.SetReadyCount(25)
channel.AddClient(client.ID, client)

Expand Down
Loading