From 49900961de72f1ed80b355e70b54d3a6e73109c0 Mon Sep 17 00:00:00 2001 From: Kiril Zvezdarov Date: Tue, 11 Mar 2014 20:35:38 -0500 Subject: [PATCH] Made nsqd into an importable package with the daemon program moved to apps/nsdq --- .gitignore | 1 + Makefile | 10 ++--- nsqd/main.go => apps/nsqd/nsqd.go | 5 ++- nsqd/channel.go | 20 ++++----- nsqd/channel_test.go | 4 +- nsqd/client_v2.go | 68 +++++++++++++++---------------- nsqd/context.go | 4 +- nsqd/diskqueue.go | 52 +++++++++++------------ nsqd/diskqueue_test.go | 46 ++++++++++----------- nsqd/guid.go | 12 +++--- nsqd/guid_test.go | 4 +- nsqd/http.go | 6 +-- nsqd/http_test.go | 2 +- nsqd/lookup.go | 6 +-- nsqd/lookup_peer.go | 34 ++++++++-------- nsqd/nsqd.go | 10 ++--- nsqd/nsqd_test.go | 4 +- nsqd/options.go | 3 +- nsqd/protocol_v2.go | 46 ++++++++++----------- nsqd/protocol_v2_test.go | 34 ++++++++-------- nsqd/queue.go | 22 +++++----- nsqd/stats.go | 4 +- nsqd/stats_test.go | 6 +-- nsqd/statsd.go | 4 +- nsqd/tcp.go | 6 +-- nsqd/topic.go | 12 +++--- nsqd/topic_test.go | 2 +- test.sh | 4 +- 28 files changed, 217 insertions(+), 214 deletions(-) rename nsqd/main.go => apps/nsqd/nsqd.go (98%) diff --git a/.gitignore b/.gitignore index 829246e65..15823fd74 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ build nsqd/nsqd apps/nsqlookupd/nsqlookupd +apps/nsqd/nsqd nsqreader/nsqreader nsqstatsd/nsqstatsd nsqadmin/nsqadmin diff --git a/Makefile b/Makefile index 540105457..939dde7ec 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ DESTDIR= GOFLAGS= BINDIR=${PREFIX}/bin -NSQD_SRCS = $(wildcard nsqd/*.go nsq/*.go util/*.go util/pqueue/*.go) +NSQD_SRCS = $(wildcard apps/nsqd/*.go nsqd/*.go nsq/*.go util/*.go util/pqueue/*.go) NSQLOOKUPD_SRCS = $(wildcard apps/nsqlookupd/*.go nsqlookupd/*.go nsq/*.go util/*.go) NSQADMIN_SRCS = $(wildcard nsqadmin/*.go nsqadmin/templates/*.go util/*.go) NSQ_PUBSUB_SRCS = $(wildcard apps/nsq_pubsub/*.go nsq/*.go util/*.go) @@ -13,8 +13,8 @@ NSQ_TO_HTTP_SRCS = $(wildcard apps/nsq_to_http/*.go nsq/*.go util/*.go) NSQ_TAIL_SRCS = $(wildcard apps/nsq_tail/*.go nsq/*.go util/*.go) NSQ_STAT_SRCS = $(wildcard apps/nsq_stat/*.go util/*.go util/lookupd/*.go) -BINARIES = nsqd nsqadmin -APPS = nsqlookupd nsq_pubsub nsq_to_nsq nsq_to_file nsq_to_http nsq_tail nsq_stat +BINARIES = nsqadmin +APPS = nsqlookupd nsqd nsq_pubsub nsq_to_nsq nsq_to_file nsq_to_http nsq_tail nsq_stat BLDDIR = build all: $(BINARIES) $(APPS) @@ -26,7 +26,7 @@ $(BLDDIR)/%: $(BINARIES): %: $(BLDDIR)/% $(APPS): %: $(BLDDIR)/apps/% -$(BLDDIR)/nsqd: $(NSQD_SRCS) +$(BLDDIR)/apps/nsqd: $(NSQD_SRCS) $(BLDDIR)/apps/nsqlookupd: $(NSQLOOKUPD_SRCS) $(BLDDIR)/nsqadmin: $(NSQADMIN_SRCS) $(BLDDIR)/apps/nsq_pubsub: $(NSQ_PUBSUB_SRCS) @@ -45,8 +45,8 @@ clean: install: $(BINARIES) $(EXAMPLES) install -m 755 -d ${DESTDIR}${BINDIR} - install -m 755 $(BLDDIR)/nsqd ${DESTDIR}${BINDIR}/nsqd install -m 755 $(BLDDIR)/apps/nsqlookupd ${DESTDIR}${BINDIR}/nsqlookupd + install -m 755 $(BLDDIR)/apps/nsqd ${DESTDIR}${BINDIR}/nsqd install -m 755 $(BLDDIR)/nsqadmin ${DESTDIR}${BINDIR}/nsqadmin install -m 755 $(BLDDIR)/apps/nsq_pubsub ${DESTDIR}${BINDIR}/nsq_pubsub install -m 755 $(BLDDIR)/apps/nsq_to_nsq ${DESTDIR}${BINDIR}/nsq_to_nsq diff --git a/nsqd/main.go b/apps/nsqd/nsqd.go similarity index 98% rename from nsqd/main.go rename to apps/nsqd/nsqd.go index e972084d9..a55a5ba8a 100644 --- a/nsqd/main.go +++ b/apps/nsqd/nsqd.go @@ -11,6 +11,7 @@ import ( "time" "github.com/BurntSushi/toml" + "github.com/bitly/nsq/nsqd" "github.com/bitly/nsq/util" "github.com/mreiferson/go-options" ) @@ -100,9 +101,9 @@ func main() { } } - opts := NewNSQDOptions() + opts := nsqd.NewNSQDOptions() options.Resolve(opts, flagSet, cfg) - nsqd := NewNSQD(opts) + nsqd := nsqd.NewNSQD(opts) log.Println(util.Version("nsqd")) log.Printf("worker id %d", opts.ID) diff --git a/nsqd/channel.go b/nsqd/channel.go index 837593e2c..4e31c0b5b 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "bytes" @@ -46,7 +46,7 @@ type Channel struct { topicName string name string - context *Context + context *context backend BackendQueue @@ -86,7 +86,7 @@ type inFlightMessage struct { } // NewChannel creates a new instance of the Channel type and returns a pointer -func NewChannel(topicName string, channelName string, context *Context, +func NewChannel(topicName string, channelName string, context *context, deleteCallback func(*Channel)) *Channel { c := &Channel{ @@ -111,11 +111,11 @@ func NewChannel(topicName string, channelName string, context *Context, if strings.HasSuffix(channelName, "#ephemeral") { c.ephemeralChannel = true - c.backend = NewDummyBackendQueue() + c.backend = newDummyBackendQueue() } else { // backend names, for uniqueness, automatically include the topic... : backendName := topicName + ":" + channelName - c.backend = NewDiskQueue(backendName, + c.backend = newDiskQueue(backendName, context.nsqd.options.DataPath, context.nsqd.options.MaxBytesPerFile, context.nsqd.options.SyncEvery, @@ -243,7 +243,7 @@ func (c *Channel) flush() error { // this will read until its closed (exited) for msg := range c.clientMsgChan { log.Printf("CHANNEL(%s): recovered buffered message from clientMsgChan", c.name) - WriteMessageToBackend(&msgBuf, msg, c.backend) + writeMessageToBackend(&msgBuf, msg, c.backend) } if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 { @@ -254,7 +254,7 @@ func (c *Channel) flush() error { for { select { case msg := <-c.memoryMsgChan: - err := WriteMessageToBackend(&msgBuf, msg, c.backend) + err := writeMessageToBackend(&msgBuf, msg, c.backend) if err != nil { log.Printf("ERROR: failed to write message to backend - %s", err.Error()) } @@ -266,7 +266,7 @@ func (c *Channel) flush() error { finish: for _, item := range c.inFlightMessages { msg := item.Value.(*inFlightMessage).msg - err := WriteMessageToBackend(&msgBuf, msg, c.backend) + err := writeMessageToBackend(&msgBuf, msg, c.backend) if err != nil { log.Printf("ERROR: failed to write message to backend - %s", err.Error()) } @@ -274,7 +274,7 @@ finish: for _, item := range c.deferredMessages { msg := item.Value.(*nsq.Message) - err := WriteMessageToBackend(&msgBuf, msg, c.backend) + err := writeMessageToBackend(&msgBuf, msg, c.backend) if err != nil { log.Printf("ERROR: failed to write message to backend - %s", err.Error()) } @@ -560,7 +560,7 @@ func (c *Channel) router() { select { case c.memoryMsgChan <- msg: default: - err := WriteMessageToBackend(&msgBuf, msg, c.backend) + err := writeMessageToBackend(&msgBuf, msg, c.backend) if err != nil { log.Printf("CHANNEL(%s) ERROR: failed to write message to backend - %s", c.name, err.Error()) // theres not really much we can do at this point, you're certainly diff --git a/nsqd/channel_test.go b/nsqd/channel_test.go index 9bf5305ee..15cf169b3 100644 --- a/nsqd/channel_test.go +++ b/nsqd/channel_test.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "io/ioutil" @@ -150,7 +150,7 @@ func TestChannelEmptyConsumer(t *testing.T) { topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) channel := topic.GetChannel("channel") - client := NewClientV2(0, conn, &Context{nsqd}) + client := newClientV2(0, conn, &context{nsqd}) client.SetReadyCount(25) channel.AddClient(client.ID, client) diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index fc4eb99d4..522266fbb 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "bufio" @@ -18,7 +18,7 @@ import ( const DefaultBufferSize = 16 * 1024 -type IdentifyDataV2 struct { +type identifyDataV2 struct { ShortId string `json:"short_id"` // TODO: deprecated, remove in 1.0 LongId string `json:"long_id"` // TODO: deprecated, remove in 1.0 @@ -37,14 +37,14 @@ type IdentifyDataV2 struct { MsgTimeout int `json:"msg_timeout"` } -type IdentifyEvent struct { +type identifyEvent struct { OutputBufferTimeout time.Duration HeartbeatInterval time.Duration SampleRate int32 MsgTimeout time.Duration } -type ClientV2 struct { +type clientV2 struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms ReadyCount int64 LastReadyCount int64 @@ -56,7 +56,7 @@ type ClientV2 struct { sync.RWMutex ID int64 - context *Context + context *context UserAgent string // original connection @@ -88,7 +88,7 @@ type ClientV2 struct { SampleRate int32 - IdentifyEventChan chan IdentifyEvent + IdentifyEventChan chan identifyEvent SubEventChan chan *Channel TLS int32 @@ -100,13 +100,13 @@ type ClientV2 struct { lenSlice []byte } -func NewClientV2(id int64, conn net.Conn, context *Context) *ClientV2 { +func newClientV2(id int64, conn net.Conn, context *context) *clientV2 { var identifier string if conn != nil { identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String()) } - c := &ClientV2{ + c := &clientV2{ ID: id, context: context, @@ -131,7 +131,7 @@ func NewClientV2(id int64, conn net.Conn, context *Context) *ClientV2 { Hostname: identifier, SubEventChan: make(chan *Channel, 1), - IdentifyEventChan: make(chan IdentifyEvent, 1), + IdentifyEventChan: make(chan identifyEvent, 1), // heartbeats are client configurable but default to 30s HeartbeatInterval: context.nsqd.options.ClientTimeout / 2, @@ -140,11 +140,11 @@ func NewClientV2(id int64, conn net.Conn, context *Context) *ClientV2 { return c } -func (c *ClientV2) String() string { +func (c *clientV2) String() string { return c.RemoteAddr().String() } -func (c *ClientV2) Identify(data IdentifyDataV2) error { +func (c *clientV2) Identify(data identifyDataV2) error { // TODO: for backwards compatibility, remove in 1.0 hostname := data.Hostname if hostname == "" { @@ -187,7 +187,7 @@ func (c *ClientV2) Identify(data IdentifyDataV2) error { return err } - ie := IdentifyEvent{ + ie := identifyEvent{ OutputBufferTimeout: c.OutputBufferTimeout, HeartbeatInterval: c.HeartbeatInterval, SampleRate: c.SampleRate, @@ -203,7 +203,7 @@ func (c *ClientV2) Identify(data IdentifyDataV2) error { return nil } -func (c *ClientV2) Stats() ClientStats { +func (c *clientV2) Stats() ClientStats { c.RLock() // TODO: deprecated, remove in 1.0 name := c.ClientID @@ -235,7 +235,7 @@ func (c *ClientV2) Stats() ClientStats { } } -func (c *ClientV2) IsReadyForMessages() bool { +func (c *clientV2) IsReadyForMessages() bool { if c.Channel.IsPaused() { return false } @@ -244,7 +244,7 @@ func (c *ClientV2) IsReadyForMessages() bool { lastReadyCount := atomic.LoadInt64(&c.LastReadyCount) inFlightCount := atomic.LoadInt64(&c.InFlightCount) - if *verbose { + if c.context.nsqd.options.Verbose { log.Printf("[%s] state rdy: %4d lastrdy: %4d inflt: %4d", c, readyCount, lastReadyCount, inFlightCount) } @@ -256,13 +256,13 @@ func (c *ClientV2) IsReadyForMessages() bool { return true } -func (c *ClientV2) SetReadyCount(count int64) { +func (c *clientV2) SetReadyCount(count int64) { atomic.StoreInt64(&c.ReadyCount, count) atomic.StoreInt64(&c.LastReadyCount, count) c.tryUpdateReadyState() } -func (c *ClientV2) tryUpdateReadyState() { +func (c *clientV2) tryUpdateReadyState() { // you can always *try* to write to ReadyStateChan because in the cases // where you cannot the message pump loop would have iterated anyway. // the atomic integer operations guarantee correctness of the value. @@ -272,50 +272,50 @@ func (c *ClientV2) tryUpdateReadyState() { } } -func (c *ClientV2) FinishedMessage() { +func (c *clientV2) FinishedMessage() { atomic.AddUint64(&c.FinishCount, 1) atomic.AddInt64(&c.InFlightCount, -1) c.tryUpdateReadyState() } -func (c *ClientV2) Empty() { +func (c *clientV2) Empty() { atomic.StoreInt64(&c.InFlightCount, 0) c.tryUpdateReadyState() } -func (c *ClientV2) SendingMessage() { +func (c *clientV2) SendingMessage() { atomic.AddInt64(&c.ReadyCount, -1) atomic.AddInt64(&c.InFlightCount, 1) atomic.AddUint64(&c.MessageCount, 1) } -func (c *ClientV2) TimedOutMessage() { +func (c *clientV2) TimedOutMessage() { atomic.AddInt64(&c.InFlightCount, -1) c.tryUpdateReadyState() } -func (c *ClientV2) RequeuedMessage() { +func (c *clientV2) RequeuedMessage() { atomic.AddUint64(&c.RequeueCount, 1) atomic.AddInt64(&c.InFlightCount, -1) c.tryUpdateReadyState() } -func (c *ClientV2) StartClose() { +func (c *clientV2) StartClose() { // Force the client into ready 0 c.SetReadyCount(0) // mark this client as closing atomic.StoreInt32(&c.State, nsq.StateClosing) } -func (c *ClientV2) Pause() { +func (c *clientV2) Pause() { c.tryUpdateReadyState() } -func (c *ClientV2) UnPause() { +func (c *clientV2) UnPause() { c.tryUpdateReadyState() } -func (c *ClientV2) SetHeartbeatInterval(desiredInterval int) error { +func (c *clientV2) SetHeartbeatInterval(desiredInterval int) error { c.Lock() defer c.Unlock() @@ -334,7 +334,7 @@ func (c *ClientV2) SetHeartbeatInterval(desiredInterval int) error { return nil } -func (c *ClientV2) SetOutputBufferSize(desiredSize int) error { +func (c *clientV2) SetOutputBufferSize(desiredSize int) error { var size int switch { @@ -363,7 +363,7 @@ func (c *ClientV2) SetOutputBufferSize(desiredSize int) error { return nil } -func (c *ClientV2) SetOutputBufferTimeout(desiredTimeout int) error { +func (c *clientV2) SetOutputBufferTimeout(desiredTimeout int) error { c.Lock() defer c.Unlock() @@ -382,7 +382,7 @@ func (c *ClientV2) SetOutputBufferTimeout(desiredTimeout int) error { return nil } -func (c *ClientV2) SetSampleRate(sampleRate int32) error { +func (c *clientV2) SetSampleRate(sampleRate int32) error { if sampleRate < 0 || sampleRate > 99 { return errors.New(fmt.Sprintf("sample rate (%d) is invalid", sampleRate)) } @@ -390,7 +390,7 @@ func (c *ClientV2) SetSampleRate(sampleRate int32) error { return nil } -func (c *ClientV2) SetMsgTimeout(msgTimeout int) error { +func (c *clientV2) SetMsgTimeout(msgTimeout int) error { c.Lock() defer c.Unlock() @@ -407,7 +407,7 @@ func (c *ClientV2) SetMsgTimeout(msgTimeout int) error { return nil } -func (c *ClientV2) UpgradeTLS() error { +func (c *clientV2) UpgradeTLS() error { c.Lock() defer c.Unlock() @@ -426,7 +426,7 @@ func (c *ClientV2) UpgradeTLS() error { return nil } -func (c *ClientV2) UpgradeDeflate(level int) error { +func (c *clientV2) UpgradeDeflate(level int) error { c.Lock() defer c.Unlock() @@ -446,7 +446,7 @@ func (c *ClientV2) UpgradeDeflate(level int) error { return nil } -func (c *ClientV2) UpgradeSnappy() error { +func (c *clientV2) UpgradeSnappy() error { c.Lock() defer c.Unlock() @@ -463,7 +463,7 @@ func (c *ClientV2) UpgradeSnappy() error { return nil } -func (c *ClientV2) Flush() error { +func (c *clientV2) Flush() error { c.SetWriteDeadline(time.Now().Add(time.Second)) err := c.Writer.Flush() diff --git a/nsqd/context.go b/nsqd/context.go index be17b7fa1..d8938494f 100644 --- a/nsqd/context.go +++ b/nsqd/context.go @@ -1,5 +1,5 @@ -package main +package nsqd -type Context struct { +type context struct { nsqd *NSQD } diff --git a/nsqd/diskqueue.go b/nsqd/diskqueue.go index 63348781d..8e48d8870 100644 --- a/nsqd/diskqueue.go +++ b/nsqd/diskqueue.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "bufio" @@ -15,9 +15,9 @@ import ( "time" ) -// DiskQueue implements the BackendQueue interface +// diskQueue implements the BackendQueue interface // providing a filesystem backed FIFO queue -type DiskQueue struct { +type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) @@ -60,10 +60,10 @@ type DiskQueue struct { exitSyncChan chan int } -// NewDiskQueue instantiates a new instance of DiskQueue, retrieving metadata +// newDiskQueue instantiates a new instance of diskQueue, retrieving metadata // from the filesystem and starting the read ahead goroutine -func NewDiskQueue(name string, dataPath string, maxBytesPerFile int64, syncEvery int64, syncTimeout time.Duration) BackendQueue { - d := DiskQueue{ +func newDiskQueue(name string, dataPath string, maxBytesPerFile int64, syncEvery int64, syncTimeout time.Duration) BackendQueue { + d := diskQueue{ name: name, dataPath: dataPath, maxBytesPerFile: maxBytesPerFile, @@ -90,17 +90,17 @@ func NewDiskQueue(name string, dataPath string, maxBytesPerFile int64, syncEvery } // Depth returns the depth of the queue -func (d *DiskQueue) Depth() int64 { +func (d *diskQueue) Depth() int64 { return atomic.LoadInt64(&d.depth) } // ReadChan returns the []byte channel for reading data -func (d *DiskQueue) ReadChan() chan []byte { +func (d *diskQueue) ReadChan() chan []byte { return d.readChan } // Put writes a []byte to the queue -func (d *DiskQueue) Put(data []byte) error { +func (d *diskQueue) Put(data []byte) error { d.RLock() defer d.RUnlock() @@ -113,7 +113,7 @@ func (d *DiskQueue) Put(data []byte) error { } // Close cleans up the queue and persists metadata -func (d *DiskQueue) Close() error { +func (d *diskQueue) Close() error { err := d.exit(false) if err != nil { return err @@ -121,11 +121,11 @@ func (d *DiskQueue) Close() error { return d.sync() } -func (d *DiskQueue) Delete() error { +func (d *diskQueue) Delete() error { return d.exit(true) } -func (d *DiskQueue) exit(deleted bool) error { +func (d *diskQueue) exit(deleted bool) error { d.Lock() defer d.Unlock() @@ -156,7 +156,7 @@ func (d *DiskQueue) exit(deleted bool) error { // Empty destructively clears out any pending data in the queue // by fast forwarding read positions and removing intermediate files -func (d *DiskQueue) Empty() error { +func (d *diskQueue) Empty() error { d.RLock() defer d.RUnlock() @@ -170,7 +170,7 @@ func (d *DiskQueue) Empty() error { return <-d.emptyResponseChan } -func (d *DiskQueue) deleteAllFiles() error { +func (d *diskQueue) deleteAllFiles() error { err := d.skipToNextRWFile() innerErr := os.Remove(d.metaDataFileName()) @@ -182,7 +182,7 @@ func (d *DiskQueue) deleteAllFiles() error { return err } -func (d *DiskQueue) skipToNextRWFile() error { +func (d *diskQueue) skipToNextRWFile() error { var err error if d.readFile != nil { @@ -217,7 +217,7 @@ func (d *DiskQueue) skipToNextRWFile() error { // readOne performs a low level filesystem read for a single []byte // while advancing read positions and rolling files, if necessary -func (d *DiskQueue) readOne() ([]byte, error) { +func (d *diskQueue) readOne() ([]byte, error) { var err error var msgSize int32 @@ -282,7 +282,7 @@ func (d *DiskQueue) readOne() ([]byte, error) { // writeOne performs a low level filesystem write for a single []byte // while advancing write positions and rolling files, if necessary -func (d *DiskQueue) writeOne(data []byte) error { +func (d *diskQueue) writeOne(data []byte) error { var err error if d.writeFile == nil { @@ -349,7 +349,7 @@ func (d *DiskQueue) writeOne(data []byte) error { } // sync fsyncs the current writeFile and persists metadata -func (d *DiskQueue) sync() error { +func (d *diskQueue) sync() error { if d.writeFile != nil { err := d.writeFile.Sync() if err != nil { @@ -369,7 +369,7 @@ func (d *DiskQueue) sync() error { } // retrieveMetaData initializes state from the filesystem -func (d *DiskQueue) retrieveMetaData() error { +func (d *diskQueue) retrieveMetaData() error { var f *os.File var err error @@ -396,7 +396,7 @@ func (d *DiskQueue) retrieveMetaData() error { } // persistMetaData atomically writes state to the filesystem -func (d *DiskQueue) persistMetaData() error { +func (d *diskQueue) persistMetaData() error { var f *os.File var err error @@ -424,15 +424,15 @@ func (d *DiskQueue) persistMetaData() error { return os.Rename(tmpFileName, fileName) } -func (d *DiskQueue) metaDataFileName() string { +func (d *diskQueue) metaDataFileName() string { return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.meta.dat"), d.name) } -func (d *DiskQueue) fileName(fileNum int64) string { +func (d *diskQueue) fileName(fileNum int64) string { return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum) } -func (d *DiskQueue) checkTailCorruption(depth int64) { +func (d *diskQueue) checkTailCorruption(depth int64) { if d.readFileNum < d.writeFileNum || d.readPos < d.writePos { return } @@ -464,7 +464,7 @@ func (d *DiskQueue) checkTailCorruption(depth int64) { } } -func (d *DiskQueue) moveForward() { +func (d *diskQueue) moveForward() { oldReadFileNum := d.readFileNum d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos @@ -485,7 +485,7 @@ func (d *DiskQueue) moveForward() { d.checkTailCorruption(depth) } -func (d *DiskQueue) handleReadError() { +func (d *diskQueue) handleReadError() { // jump to the next read file and rename the current (bad) file if d.readFileNum == d.writeFileNum { // if you can't properly read from the current write file it's safe to @@ -525,7 +525,7 @@ func (d *DiskQueue) handleReadError() { // go channels // // conveniently this also means that we're asynchronously reading from the filesystem -func (d *DiskQueue) ioLoop() { +func (d *diskQueue) ioLoop() { var dataRead []byte var err error var count int64 diff --git a/nsqd/diskqueue_test.go b/nsqd/diskqueue_test.go index 24d1842d7..cfac03255 100644 --- a/nsqd/diskqueue_test.go +++ b/nsqd/diskqueue_test.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "io/ioutil" @@ -18,7 +18,7 @@ func TestDiskQueue(t *testing.T) { defer log.SetOutput(os.Stdout) dqName := "test_disk_queue" + strconv.Itoa(int(time.Now().Unix())) - dq := NewDiskQueue(dqName, os.TempDir(), 1024, 2500, 2*time.Second) + dq := newDiskQueue(dqName, os.TempDir(), 1024, 2500, 2*time.Second) assert.NotEqual(t, dq, nil) assert.Equal(t, dq.Depth(), int64(0)) @@ -36,7 +36,7 @@ func TestDiskQueueRoll(t *testing.T) { defer log.SetOutput(os.Stdout) dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix())) - dq := NewDiskQueue(dqName, os.TempDir(), 100, 2500, 2*time.Second) + dq := newDiskQueue(dqName, os.TempDir(), 100, 2500, 2*time.Second) assert.NotEqual(t, dq, nil) assert.Equal(t, dq.Depth(), int64(0)) @@ -47,8 +47,8 @@ func TestDiskQueueRoll(t *testing.T) { assert.Equal(t, dq.Depth(), int64(i+1)) } - assert.Equal(t, dq.(*DiskQueue).writeFileNum, int64(1)) - assert.Equal(t, dq.(*DiskQueue).writePos, int64(28)) + assert.Equal(t, dq.(*diskQueue).writeFileNum, int64(1)) + assert.Equal(t, dq.(*diskQueue).writePos, int64(28)) } func assertFileNotExist(t *testing.T, fn string) { @@ -62,7 +62,7 @@ func TestDiskQueueEmpty(t *testing.T) { defer log.SetOutput(os.Stdout) dqName := "test_disk_queue_empty" + strconv.Itoa(int(time.Now().Unix())) - dq := NewDiskQueue(dqName, os.TempDir(), 100, 2500, 2*time.Second) + dq := newDiskQueue(dqName, os.TempDir(), 100, 2500, 2*time.Second) assert.NotEqual(t, dq, nil) assert.Equal(t, dq.Depth(), int64(0)) @@ -86,18 +86,18 @@ func TestDiskQueueEmpty(t *testing.T) { } assert.Equal(t, dq.Depth(), int64(97)) - numFiles := dq.(*DiskQueue).writeFileNum + numFiles := dq.(*diskQueue).writeFileNum dq.Empty() - assertFileNotExist(t, dq.(*DiskQueue).metaDataFileName()) + assertFileNotExist(t, dq.(*diskQueue).metaDataFileName()) for i := int64(0); i <= numFiles; i++ { - assertFileNotExist(t, dq.(*DiskQueue).fileName(i)) + assertFileNotExist(t, dq.(*diskQueue).fileName(i)) } assert.Equal(t, dq.Depth(), int64(0)) - assert.Equal(t, dq.(*DiskQueue).readFileNum, dq.(*DiskQueue).writeFileNum) - assert.Equal(t, dq.(*DiskQueue).readPos, dq.(*DiskQueue).writePos) - assert.Equal(t, dq.(*DiskQueue).nextReadPos, dq.(*DiskQueue).readPos) - assert.Equal(t, dq.(*DiskQueue).nextReadFileNum, dq.(*DiskQueue).readFileNum) + assert.Equal(t, dq.(*diskQueue).readFileNum, dq.(*diskQueue).writeFileNum) + assert.Equal(t, dq.(*diskQueue).readPos, dq.(*diskQueue).writePos) + assert.Equal(t, dq.(*diskQueue).nextReadPos, dq.(*diskQueue).readPos) + assert.Equal(t, dq.(*diskQueue).nextReadFileNum, dq.(*diskQueue).readFileNum) for i := 0; i < 100; i++ { err := dq.Put(msg) @@ -117,9 +117,9 @@ func TestDiskQueueEmpty(t *testing.T) { } assert.Equal(t, dq.Depth(), int64(0)) - assert.Equal(t, dq.(*DiskQueue).readFileNum, dq.(*DiskQueue).writeFileNum) - assert.Equal(t, dq.(*DiskQueue).readPos, dq.(*DiskQueue).writePos) - assert.Equal(t, dq.(*DiskQueue).nextReadPos, dq.(*DiskQueue).readPos) + assert.Equal(t, dq.(*diskQueue).readFileNum, dq.(*diskQueue).writeFileNum) + assert.Equal(t, dq.(*diskQueue).readPos, dq.(*diskQueue).writePos) + assert.Equal(t, dq.(*diskQueue).nextReadPos, dq.(*diskQueue).readPos) } func TestDiskQueueCorruption(t *testing.T) { @@ -127,7 +127,7 @@ func TestDiskQueueCorruption(t *testing.T) { defer log.SetOutput(os.Stdout) dqName := "test_disk_queue_corruption" + strconv.Itoa(int(time.Now().Unix())) - dq := NewDiskQueue(dqName, os.TempDir(), 1000, 5, 2*time.Second) + dq := newDiskQueue(dqName, os.TempDir(), 1000, 5, 2*time.Second) msg := make([]byte, 123) for i := 0; i < 25; i++ { @@ -137,7 +137,7 @@ func TestDiskQueueCorruption(t *testing.T) { assert.Equal(t, dq.Depth(), int64(25)) // corrupt the 2nd file - dqFn := dq.(*DiskQueue).fileName(1) + dqFn := dq.(*diskQueue).fileName(1) os.Truncate(dqFn, 500) for i := 0; i < 19; i++ { @@ -145,7 +145,7 @@ func TestDiskQueueCorruption(t *testing.T) { } // corrupt the 4th (current) file - dqFn = dq.(*DiskQueue).fileName(3) + dqFn = dq.(*diskQueue).fileName(3) os.Truncate(dqFn, 100) dq.Put(msg) @@ -160,7 +160,7 @@ func TestDiskQueueTorture(t *testing.T) { var wg sync.WaitGroup dqName := "test_disk_queue_torture" + strconv.Itoa(int(time.Now().Unix())) - dq := NewDiskQueue(dqName, os.TempDir(), 262144, 2500, 2*time.Second) + dq := newDiskQueue(dqName, os.TempDir(), 262144, 2500, 2*time.Second) assert.NotEqual(t, dq, nil) assert.Equal(t, dq.Depth(), int64(0)) @@ -200,7 +200,7 @@ func TestDiskQueueTorture(t *testing.T) { wg.Wait() log.Printf("restarting diskqueue") - dq = NewDiskQueue(dqName, os.TempDir(), 262144, 2500, 2*time.Second) + dq = newDiskQueue(dqName, os.TempDir(), 262144, 2500, 2*time.Second) assert.NotEqual(t, dq, nil) assert.Equal(t, dq.Depth(), depth) @@ -244,7 +244,7 @@ func BenchmarkDiskQueuePut(b *testing.B) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) dqName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - dq := NewDiskQueue(dqName, os.TempDir(), 1024, 2500, 2*time.Second) + dq := newDiskQueue(dqName, os.TempDir(), 1024, 2500, 2*time.Second) b.StartTimer() for i := 0; i < b.N; i++ { @@ -260,7 +260,7 @@ func BenchmarkDiskQueueGet(b *testing.B) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) dqName := "bench_disk_queue_get" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix())) - dq := NewDiskQueue(dqName, os.TempDir(), 1024768, 2500, 2*time.Second) + dq := newDiskQueue(dqName, os.TempDir(), 1024768, 2500, 2*time.Second) for i := 0; i < b.N; i++ { dq.Put([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa")) } diff --git a/nsqd/guid.go b/nsqd/guid.go index 8a1a1422f..fac528110 100644 --- a/nsqd/guid.go +++ b/nsqd/guid.go @@ -1,4 +1,4 @@ -package main +package nsqd // the core algorithm here was borrowed from: // Blake Mizerany's `noeqd` https://github.com/bmizerany/noeqd @@ -31,14 +31,14 @@ const ( var ErrTimeBackwards = errors.New("time has gone backwards") var ErrSequenceExpired = errors.New("sequence expired") -type GUID int64 +type guid int64 -type GUIDFactory struct { +type guidFactory struct { sequence int64 lastTimestamp int64 } -func (f *GUIDFactory) NewGUID(workerId int64) (GUID, error) { +func (f *guidFactory) NewGUID(workerId int64) (guid, error) { ts := time.Now().UnixNano() / 1e6 if ts < f.lastTimestamp { @@ -60,10 +60,10 @@ func (f *GUIDFactory) NewGUID(workerId int64) (GUID, error) { (workerId << workerIdShift) | f.sequence - return GUID(id), nil + return guid(id), nil } -func (g GUID) Hex() nsq.MessageID { +func (g guid) Hex() nsq.MessageID { var h nsq.MessageID var b [8]byte diff --git a/nsqd/guid_test.go b/nsqd/guid_test.go index acf82c2bf..899005254 100644 --- a/nsqd/guid_test.go +++ b/nsqd/guid_test.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "testing" @@ -25,7 +25,7 @@ func BenchmarkGUIDUnsafe(b *testing.B) { } func BenchmarkGUID(b *testing.B) { - factory := &GUIDFactory{} + factory := &guidFactory{} for i := 0; i < b.N; i++ { guid, err := factory.NewGUID(0) if err != nil { diff --git a/nsqd/http.go b/nsqd/http.go index d4dccc6dd..a7861a27b 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "bufio" @@ -20,7 +20,7 @@ import ( import httpprof "net/http/pprof" type httpServer struct { - context *Context + context *context } func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -500,7 +500,7 @@ func (s *httpServer) statsHandler(w http.ResponseWriter, req *http.Request) { io.WriteString(w, fmt.Sprintf("%s\n", util.Version("nsqd"))) } - stats := s.context.nsqd.getStats() + stats := s.context.nsqd.GetStats() if jsonFormat { util.ApiResponse(w, 200, "OK", struct { diff --git a/nsqd/http_test.go b/nsqd/http_test.go index e3337c534..425ab5461 100644 --- a/nsqd/http_test.go +++ b/nsqd/http_test.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "bytes" diff --git a/nsqd/lookup.go b/nsqd/lookup.go index f14bf8a56..5953364aa 100644 --- a/nsqd/lookup.go +++ b/nsqd/lookup.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "bytes" @@ -14,7 +14,7 @@ import ( ) func (n *NSQD) lookupLoop() { - syncTopicChan := make(chan *LookupPeer) + syncTopicChan := make(chan *lookupPeer) hostname, err := os.Hostname() if err != nil { @@ -23,7 +23,7 @@ func (n *NSQD) lookupLoop() { for _, host := range n.options.NSQLookupdTCPAddresses { log.Printf("LOOKUP: adding peer %s", host) - lookupPeer := NewLookupPeer(host, func(lp *LookupPeer) { + lookupPeer := newLookupPeer(host, func(lp *lookupPeer) { ci := make(map[string]interface{}) ci["version"] = util.BINARY_VERSION ci["tcp_port"] = n.tcpAddr.Port diff --git a/nsqd/lookup_peer.go b/nsqd/lookup_peer.go index b1d6ca6d7..c6c5c9230 100644 --- a/nsqd/lookup_peer.go +++ b/nsqd/lookup_peer.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "log" @@ -8,32 +8,32 @@ import ( "github.com/bitly/go-nsq" ) -// LookupPeer is a low-level type for connecting/reading/writing to nsqlookupd +// lookupPeer is a low-level type for connecting/reading/writing to nsqlookupd // -// A LookupPeer instance is designed to connect lazily to nsqlookupd and reconnect +// A lookupPeer instance is designed to connect lazily to nsqlookupd and reconnect // gracefully (i.e. it is all handled by the library). Clients can simply use the // Command interface to perform a round-trip. -type LookupPeer struct { +type lookupPeer struct { addr string conn net.Conn state int32 - connectCallback func(*LookupPeer) - Info PeerInfo + connectCallback func(*lookupPeer) + Info peerInfo } -// PeerInfo contains metadata for a LookupPeer instance (and is JSON marshalable) -type PeerInfo struct { +// peerInfo contains metadata for a lookupPeer instance (and is JSON marshalable) +type peerInfo struct { TcpPort int `json:"tcp_port"` HttpPort int `json:"http_port"` Version string `json:"version"` BroadcastAddress string `json:"broadcast_address"` } -// NewLookupPeer creates a new LookupPeer instance connecting to the supplied address. +// newLookupPeer creates a new lookupPeer instance connecting to the supplied address. // // The supplied connectCallback will be called *every* time the instance connects. -func NewLookupPeer(addr string, connectCallback func(*LookupPeer)) *LookupPeer { - return &LookupPeer{ +func newLookupPeer(addr string, connectCallback func(*lookupPeer)) *lookupPeer { + return &lookupPeer{ addr: addr, state: nsq.StateDisconnected, connectCallback: connectCallback, @@ -41,7 +41,7 @@ func NewLookupPeer(addr string, connectCallback func(*LookupPeer)) *LookupPeer { } // Connect will Dial the specified address, with timeouts -func (lp *LookupPeer) Connect() error { +func (lp *lookupPeer) Connect() error { log.Printf("LOOKUP connecting to %s", lp.addr) conn, err := net.DialTimeout("tcp", lp.addr, time.Second) if err != nil { @@ -52,24 +52,24 @@ func (lp *LookupPeer) Connect() error { } // String returns the specified address -func (lp *LookupPeer) String() string { +func (lp *lookupPeer) String() string { return lp.addr } // Read implements the io.Reader interface, adding deadlines -func (lp *LookupPeer) Read(data []byte) (int, error) { +func (lp *lookupPeer) Read(data []byte) (int, error) { lp.conn.SetReadDeadline(time.Now().Add(time.Second)) return lp.conn.Read(data) } // Write implements the io.Writer interface, adding deadlines -func (lp *LookupPeer) Write(data []byte) (int, error) { +func (lp *lookupPeer) Write(data []byte) (int, error) { lp.conn.SetWriteDeadline(time.Now().Add(time.Second)) return lp.conn.Write(data) } // Close implements the io.Closer interface -func (lp *LookupPeer) Close() error { +func (lp *lookupPeer) Close() error { lp.state = nsq.StateDisconnected return lp.conn.Close() } @@ -80,7 +80,7 @@ func (lp *LookupPeer) Close() error { // reconnecting in the event of a failure. // // It returns the response from nsqlookupd as []byte -func (lp *LookupPeer) Command(cmd *nsq.Command) ([]byte, error) { +func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) { initialState := lp.state if lp.state != nsq.StateConnected { err := lp.Connect() diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index bd927aa73..2ff8dff7a 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "crypto/tls" @@ -32,7 +32,7 @@ type NSQD struct { topicMap map[string]*Topic - lookupPeers []*LookupPeer + lookupPeers []*lookupPeer tcpAddr *net.TCPAddr httpAddr *net.TCPAddr @@ -102,7 +102,7 @@ func NewNSQD(options *nsqdOptions) *NSQD { } func (n *NSQD) Main() { - context := &Context{n} + context := &context{n} n.waitGroup.Wrap(func() { n.lookupLoop() }) @@ -289,7 +289,7 @@ func (n *NSQD) GetTopic(topicName string) *Topic { n.Unlock() return t } else { - t = NewTopic(topicName, &Context{n}) + t = NewTopic(topicName, &context{n}) n.topicMap[topicName] = t log.Printf("TOPIC(%s): created", t.name) @@ -359,7 +359,7 @@ func (n *NSQD) DeleteExistingTopic(topicName string) error { } func (n *NSQD) idPump() { - factory := &GUIDFactory{} + factory := &guidFactory{} lastError := time.Now() for { id, err := factory.NewGUID(n.options.ID) diff --git a/nsqd/nsqd_test.go b/nsqd/nsqd_test.go index 4fe00695c..ea5c2cc4f 100644 --- a/nsqd/nsqd_test.go +++ b/nsqd/nsqd_test.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "fmt" @@ -165,7 +165,7 @@ func TestEphemeralChannel(t *testing.T) { body := []byte("an_ephemeral_message") topic := nsqd.GetTopic(topicName) ephemeralChannel := topic.GetChannel("ch1#ephemeral") - client := NewClientV2(0, nil, &Context{nsqd}) + client := newClientV2(0, nil, &context{nsqd}) ephemeralChannel.AddClient(client.ID, client) msg := nsq.NewMessage(<-nsqd.idChan, body) diff --git a/nsqd/options.go b/nsqd/options.go index 8b2e8c105..474b29dee 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "crypto/md5" @@ -12,6 +12,7 @@ import ( type nsqdOptions struct { // basic options ID int64 `flag:"worker-id" cfg:"id"` + Verbose bool `flag:"verbose"` TCPAddress string `flag:"tcp-address"` HTTPAddress string `flag:"http-address"` BroadcastAddress string `flag:"broadcast-address"` diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 5c98dffac..9cc2cda58 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "bytes" @@ -24,17 +24,17 @@ var separatorBytes = []byte(" ") var heartbeatBytes = []byte("_heartbeat_") var okBytes = []byte("OK") -type ProtocolV2 struct { - context *Context +type protocolV2 struct { + context *context } -func (p *ProtocolV2) IOLoop(conn net.Conn) error { +func (p *protocolV2) IOLoop(conn net.Conn) error { var err error var line []byte var zeroTime time.Time clientID := atomic.AddInt64(&p.context.nsqd.clientIDSequence, 1) - client := NewClientV2(clientID, conn, p.context) + client := newClientV2(clientID, conn, p.context) // synchronize the startup of messagePump in order // to guarantee that it gets a chance to initialize @@ -67,7 +67,7 @@ func (p *ProtocolV2) IOLoop(conn net.Conn) error { } params := bytes.Split(line, separatorBytes) - if *verbose { + if p.context.nsqd.options.Verbose { log.Printf("PROTOCOL(V2): [%s] %s", client, params) } @@ -109,8 +109,8 @@ func (p *ProtocolV2) IOLoop(conn net.Conn) error { return err } -func (p *ProtocolV2) SendMessage(client *ClientV2, msg *nsq.Message, buf *bytes.Buffer) error { - if *verbose { +func (p *protocolV2) SendMessage(client *clientV2, msg *nsq.Message, buf *bytes.Buffer) error { + if p.context.nsqd.options.Verbose { log.Printf("PROTOCOL(V2): writing msg(%s) to client(%s) - %s", msg.Id, client, msg.Body) } @@ -129,7 +129,7 @@ func (p *ProtocolV2) SendMessage(client *ClientV2, msg *nsq.Message, buf *bytes. return nil } -func (p *ProtocolV2) Send(client *ClientV2, frameType int32, data []byte) error { +func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error { client.Lock() client.SetWriteDeadline(time.Now().Add(time.Second)) @@ -148,7 +148,7 @@ func (p *ProtocolV2) Send(client *ClientV2, frameType int32, data []byte) error return err } -func (p *ProtocolV2) Exec(client *ClientV2, params [][]byte) ([]byte, error) { +func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) { switch { case bytes.Equal(params[0], []byte("FIN")): return p.FIN(client, params) @@ -174,7 +174,7 @@ func (p *ProtocolV2) Exec(client *ClientV2, params [][]byte) ([]byte, error) { return nil, util.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) } -func (p *ProtocolV2) messagePump(client *ClientV2, startedChan chan bool) { +func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { var err error var buf bytes.Buffer var clientMsgChan chan *nsq.Message @@ -301,7 +301,7 @@ exit: } } -func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error) { +func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) { var err error if atomic.LoadInt32(&client.State) != nsq.StateInit { @@ -325,13 +325,13 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error) } // body is a json structure with producer information - var identifyData IdentifyDataV2 + var identifyData identifyDataV2 err = json.Unmarshal(body, &identifyData) if err != nil { return nil, util.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body") } - if *verbose { + if p.context.nsqd.options.Verbose { log.Printf("PROTOCOL(V2): [%s] %+v", client, identifyData) } @@ -434,7 +434,7 @@ func (p *ProtocolV2) IDENTIFY(client *ClientV2, params [][]byte) ([]byte, error) return nil, nil } -func (p *ProtocolV2) SUB(client *ClientV2, params [][]byte) ([]byte, error) { +func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) { if atomic.LoadInt32(&client.State) != nsq.StateInit { return nil, util.NewFatalClientErr(nil, "E_INVALID", "cannot SUB in current state") } @@ -471,7 +471,7 @@ func (p *ProtocolV2) SUB(client *ClientV2, params [][]byte) ([]byte, error) { return okBytes, nil } -func (p *ProtocolV2) RDY(client *ClientV2, params [][]byte) ([]byte, error) { +func (p *protocolV2) RDY(client *clientV2, params [][]byte) ([]byte, error) { state := atomic.LoadInt32(&client.State) if state == nsq.StateClosing { @@ -506,7 +506,7 @@ func (p *ProtocolV2) RDY(client *ClientV2, params [][]byte) ([]byte, error) { return nil, nil } -func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) { +func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) { state := atomic.LoadInt32(&client.State) if state != nsq.StateSubscribed && state != nsq.StateClosing { return nil, util.NewFatalClientErr(nil, "E_INVALID", "cannot FIN in current state") @@ -528,7 +528,7 @@ func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) { return nil, nil } -func (p *ProtocolV2) REQ(client *ClientV2, params [][]byte) ([]byte, error) { +func (p *protocolV2) REQ(client *clientV2, params [][]byte) ([]byte, error) { state := atomic.LoadInt32(&client.State) if state != nsq.StateSubscribed && state != nsq.StateClosing { return nil, util.NewFatalClientErr(nil, "E_INVALID", "cannot REQ in current state") @@ -562,7 +562,7 @@ func (p *ProtocolV2) REQ(client *ClientV2, params [][]byte) ([]byte, error) { return nil, nil } -func (p *ProtocolV2) CLS(client *ClientV2, params [][]byte) ([]byte, error) { +func (p *protocolV2) CLS(client *clientV2, params [][]byte) ([]byte, error) { if atomic.LoadInt32(&client.State) != nsq.StateSubscribed { return nil, util.NewFatalClientErr(nil, "E_INVALID", "cannot CLS in current state") } @@ -572,11 +572,11 @@ func (p *ProtocolV2) CLS(client *ClientV2, params [][]byte) ([]byte, error) { return []byte("CLOSE_WAIT"), nil } -func (p *ProtocolV2) NOP(client *ClientV2, params [][]byte) ([]byte, error) { +func (p *protocolV2) NOP(client *clientV2, params [][]byte) ([]byte, error) { return nil, nil } -func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) { +func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { var err error if len(params) < 2 { @@ -620,7 +620,7 @@ func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) { return okBytes, nil } -func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) { +func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { var err error if len(params) < 2 { @@ -666,7 +666,7 @@ func (p *ProtocolV2) MPUB(client *ClientV2, params [][]byte) ([]byte, error) { return okBytes, nil } -func (p *ProtocolV2) TOUCH(client *ClientV2, params [][]byte) ([]byte, error) { +func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error) { state := atomic.LoadInt32(&client.State) if state != nsq.StateSubscribed && state != nsq.StateClosing { return nil, util.NewFatalClientErr(nil, "E_INVALID", "cannot TOUCH in current state") diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 6338725ad..d8db2704a 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "bufio" @@ -187,9 +187,9 @@ func TestClientTimeout(t *testing.T) { topicName := "test_client_timeout_v2" + strconv.Itoa(int(time.Now().Unix())) - *verbose = true options := NewNSQDOptions() options.ClientTimeout = 50 * time.Millisecond + options.Verbose = true tcpAddr, _, nsqd := mustStartNSQD(options) defer nsqd.Exit() @@ -260,9 +260,9 @@ func TestClientHeartbeatDisableSUB(t *testing.T) { topicName := "test_hb_v2" + strconv.Itoa(int(time.Now().Unix())) - *verbose = true options := NewNSQDOptions() options.ClientTimeout = 200 * time.Millisecond + options.Verbose = true tcpAddr, _, nsqd := mustStartNSQD(options) defer nsqd.Exit() @@ -420,7 +420,7 @@ func TestSizeLimits(t *testing.T) { defer log.SetOutput(os.Stdout) options := NewNSQDOptions() - *verbose = true + options.Verbose = true options.MaxMsgSize = 100 options.MaxBodySize = 1000 tcpAddr, _, nsqd := mustStartNSQD(options) @@ -532,8 +532,8 @@ func TestTouch(t *testing.T) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) - *verbose = true options := NewNSQDOptions() + options.Verbose = true options.MsgTimeout = 50 * time.Millisecond tcpAddr, _, nsqd := mustStartNSQD(options) defer nsqd.Exit() @@ -578,8 +578,8 @@ func TestMaxRdyCount(t *testing.T) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) - *verbose = true options := NewNSQDOptions() + options.Verbose = true options.MaxRdyCount = 50 tcpAddr, _, nsqd := mustStartNSQD(options) defer nsqd.Exit() @@ -649,8 +649,8 @@ func TestOutputBuffering(t *testing.T) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) - *verbose = true options := NewNSQDOptions() + options.Verbose = true options.MaxOutputBufferSize = 512 * 1024 options.MaxOutputBufferTimeout = time.Second tcpAddr, _, nsqd := mustStartNSQD(options) @@ -694,8 +694,8 @@ func TestOutputBufferingValidity(t *testing.T) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) - *verbose = true options := NewNSQDOptions() + options.Verbose = true options.MaxOutputBufferSize = 512 * 1024 options.MaxOutputBufferTimeout = time.Second tcpAddr, _, nsqd := mustStartNSQD(options) @@ -736,8 +736,8 @@ func TestTLS(t *testing.T) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) - *verbose = true options := NewNSQDOptions() + options.Verbose = true options.TLSCert = "./test/cert.pem" options.TLSKey = "./test/key.pem" tcpAddr, _, nsqd := mustStartNSQD(options) @@ -775,8 +775,8 @@ func TestDeflate(t *testing.T) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) - *verbose = true options := NewNSQDOptions() + options.Verbose = true options.DeflateEnabled = true tcpAddr, _, nsqd := mustStartNSQD(options) defer nsqd.Exit() @@ -811,8 +811,8 @@ func TestSnappy(t *testing.T) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) - *verbose = true options := NewNSQDOptions() + options.Verbose = true options.SnappyEnabled = true tcpAddr, _, nsqd := mustStartNSQD(options) defer nsqd.Exit() @@ -864,8 +864,8 @@ func TestTLSDeflate(t *testing.T) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) - *verbose = true options := NewNSQDOptions() + options.Verbose = true options.DeflateEnabled = true options.TLSCert = "./test/cert.pem" options.TLSKey = "./test/key.pem" @@ -921,8 +921,8 @@ func TestSampling(t *testing.T) { sampleRate := 42 slack := 5 - *verbose = true options := NewNSQDOptions() + options.Verbose = true options.MaxRdyCount = int64(num) tcpAddr, _, nsqd := mustStartNSQD(options) defer nsqd.Exit() @@ -979,8 +979,8 @@ func TestTLSSnappy(t *testing.T) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) - *verbose = true options := NewNSQDOptions() + options.Verbose = true options.SnappyEnabled = true options.TLSCert = "./test/cert.pem" options.TLSKey = "./test/key.pem" @@ -1030,8 +1030,8 @@ func TestClientMsgTimeout(t *testing.T) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) - *verbose = true options := NewNSQDOptions() + options.Verbose = true tcpAddr, _, nsqd := mustStartNSQD(options) defer nsqd.Exit() @@ -1078,8 +1078,8 @@ func BenchmarkProtocolV2Exec(b *testing.B) { b.StopTimer() log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stdout) - p := &ProtocolV2{} - c := NewClientV2(0, nil, nil) + p := &protocolV2{} + c := newClientV2(0, nil, nil) params := [][]byte{[]byte("NOP")} b.StartTimer() diff --git a/nsqd/queue.go b/nsqd/queue.go index e157aa06f..de12463f6 100644 --- a/nsqd/queue.go +++ b/nsqd/queue.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "bytes" @@ -17,39 +17,39 @@ type BackendQueue interface { Empty() error } -type DummyBackendQueue struct { +type dummyBackendQueue struct { readChan chan []byte } -func NewDummyBackendQueue() BackendQueue { - return &DummyBackendQueue{readChan: make(chan []byte)} +func newDummyBackendQueue() BackendQueue { + return &dummyBackendQueue{readChan: make(chan []byte)} } -func (d *DummyBackendQueue) Put([]byte) error { +func (d *dummyBackendQueue) Put([]byte) error { return nil } -func (d *DummyBackendQueue) ReadChan() chan []byte { +func (d *dummyBackendQueue) ReadChan() chan []byte { return d.readChan } -func (d *DummyBackendQueue) Close() error { +func (d *dummyBackendQueue) Close() error { return nil } -func (d *DummyBackendQueue) Delete() error { +func (d *dummyBackendQueue) Delete() error { return nil } -func (d *DummyBackendQueue) Depth() int64 { +func (d *dummyBackendQueue) Depth() int64 { return int64(0) } -func (d *DummyBackendQueue) Empty() error { +func (d *dummyBackendQueue) Empty() error { return nil } -func WriteMessageToBackend(buf *bytes.Buffer, msg *nsq.Message, bq BackendQueue) error { +func writeMessageToBackend(buf *bytes.Buffer, msg *nsq.Message, bq BackendQueue) error { buf.Reset() err := msg.Write(buf) if err != nil { diff --git a/nsqd/stats.go b/nsqd/stats.go index c017aaadf..fccd6646a 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "sort" @@ -106,7 +106,7 @@ type ChannelsByName struct { func (c ChannelsByName) Less(i, j int) bool { return c.Channels[i].name < c.Channels[j].name } -func (n *NSQD) getStats() []TopicStats { +func (n *NSQD) GetStats() []TopicStats { n.RLock() defer n.RUnlock() diff --git a/nsqd/stats_test.go b/nsqd/stats_test.go index 72fa82fea..2e9139a35 100644 --- a/nsqd/stats_test.go +++ b/nsqd/stats_test.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "encoding/json" @@ -35,7 +35,7 @@ func TestStats(t *testing.T) { identify(t, conn, nil, nsq.FrameTypeResponse) sub(t, conn, topicName, "ch") - stats := nsqd.getStats() + stats := nsqd.GetStats() assert.Equal(t, len(stats), 1) assert.Equal(t, len(stats[0].Channels), 1) assert.Equal(t, len(stats[0].Channels[0].Clients), 1) @@ -48,8 +48,8 @@ func TestClientAttributes(t *testing.T) { userAgent := "Test User Agent" - *verbose = true options := NewNSQDOptions() + options.Verbose = true options.SnappyEnabled = true tcpAddr, httpAddr, nsqd := mustStartNSQD(options) defer nsqd.Exit() diff --git a/nsqd/statsd.go b/nsqd/statsd.go index 612d75931..acbe9fcef 100644 --- a/nsqd/statsd.go +++ b/nsqd/statsd.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "fmt" @@ -43,7 +43,7 @@ func (n *NSQD) statsdLoop() { log.Printf("STATSD: pushing stats to %s", statsd) - stats := n.getStats() + stats := n.GetStats() for _, topic := range stats { // try to find the topic in the last collection lastTopic := TopicStats{} diff --git a/nsqd/tcp.go b/nsqd/tcp.go index fb85ac0b5..85c586fb6 100644 --- a/nsqd/tcp.go +++ b/nsqd/tcp.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "io" @@ -10,7 +10,7 @@ import ( ) type tcpServer struct { - context *Context + context *context } func (p *tcpServer) Handle(clientConn net.Conn) { @@ -32,7 +32,7 @@ func (p *tcpServer) Handle(clientConn net.Conn) { var prot util.Protocol switch protocolMagic { case " V2": - prot = &ProtocolV2{context: p.context} + prot = &protocolV2{context: p.context} default: util.SendFramedResponse(clientConn, nsq.FrameTypeError, []byte("E_BAD_PROTOCOL")) clientConn.Close() diff --git a/nsqd/topic.go b/nsqd/topic.go index 77c443083..b08a1aa50 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "bytes" @@ -31,12 +31,12 @@ type Topic struct { pauseChan chan bool options *nsqdOptions - context *Context + context *context } // Topic constructor -func NewTopic(topicName string, context *Context) *Topic { - diskQueue := NewDiskQueue(topicName, +func NewTopic(topicName string, context *context) *Topic { + diskQueue := newDiskQueue(topicName, context.nsqd.options.DataPath, context.nsqd.options.MaxBytesPerFile, context.nsqd.options.SyncEvery, @@ -254,7 +254,7 @@ func (t *Topic) router() { select { case t.memoryMsgChan <- msg: default: - err := WriteMessageToBackend(&msgBuf, msg, t.backend) + err := writeMessageToBackend(&msgBuf, msg, t.backend) if err != nil { log.Printf("ERROR: failed to write message to backend - %s", err.Error()) // theres not really much we can do at this point, you're certainly @@ -350,7 +350,7 @@ func (t *Topic) flush() error { for { select { case msg := <-t.memoryMsgChan: - err := WriteMessageToBackend(&msgBuf, msg, t.backend) + err := writeMessageToBackend(&msgBuf, msg, t.backend) if err != nil { log.Printf("ERROR: failed to write message to backend - %s", err.Error()) } diff --git a/nsqd/topic_test.go b/nsqd/topic_test.go index 8969214a4..bd40f4da5 100644 --- a/nsqd/topic_test.go +++ b/nsqd/topic_test.go @@ -1,4 +1,4 @@ -package main +package nsqd import ( "io/ioutil" diff --git a/test.sh b/test.sh index fd5f7105d..ef3268a9c 100755 --- a/test.sh +++ b/test.sh @@ -8,9 +8,9 @@ apps/nsqlookupd/nsqlookupd >/dev/null 2>&1 & LOOKUPD_PID=$! # build and run nsqd configured to use our lookupd above -cmd="nsqd/nsqd --data-path=/tmp --lookupd-tcp-address=127.0.0.1:4160 --tls-cert=nsqd/test/cert.pem --tls-key=nsqd/test/key.pem" +cmd="apps/nsqd/nsqd --data-path=/tmp --lookupd-tcp-address=127.0.0.1:4160 --tls-cert=nsqd/test/cert.pem --tls-key=nsqd/test/key.pem" echo "building and starting $cmd" -go build -o nsqd/nsqd ./nsqd +go build -o apps/nsqd/nsqd ./apps/nsqd $cmd >/dev/null 2>&1 & NSQD_PID=$!