From 2fb1275ee7a78145d9f1d3aac0aed1d2e29611a3 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Wed, 31 Oct 2012 14:16:49 -0400 Subject: [PATCH] nsq: add README and code comments (gopkgdoc) --- nsq/README.md | 34 +++++++++++ nsq/api_request.go | 4 ++ nsq/command.go | 21 +++++-- nsq/errors.go | 25 +++++---- nsq/lookup_peer.go | 22 +++++++- nsq/message.go | 14 +++-- nsq/protocol.go | 52 +++++++++++++---- nsq/reader.go | 133 ++++++++++++++++++++++++++++++++------------ nsq/states.go | 3 +- nsq/version.go | 6 ++ nsqd/diskqueue.go | 6 +- nsqd/lookup.go | 8 +-- nsqd/protocol_v2.go | 8 +-- pynsq/README.md | 2 +- 14 files changed, 253 insertions(+), 85 deletions(-) create mode 100644 nsq/README.md diff --git a/nsq/README.md b/nsq/README.md new file mode 100644 index 000000000..bbf3dd295 --- /dev/null +++ b/nsq/README.md @@ -0,0 +1,34 @@ +## nsq + +`nsq` is the official Go package for [NSQ][nsq]. + +It provides the building blocks for developing applications on the [NSQ][nsq] platform in Go. + +Low-level functions and types are provided to communicate over the [NSQ protocol][protocol] as well +as a high-level [Reader][reader] library to implement consumers. + +See the [examples][examples] directory for utilities built using this package that provide support +for common tasks. + +### Installing + + $ go get github.com/bitly/nsq/nsq + +### Importing + +```go +import "github.com/bitly/nsq/nsq" +``` + +### Docs + +See [gopkgdoc][nsq_gopkgdoc] for pretty documentation or: + + # in the nsq package directory + $ go doc + +[nsq]: https://github.com/bitly/nsq +[nsq_gopkgdoc]: http://go.pkgdoc.org/github.com/bitly/nsq/nsq +[protocol]: https://github.com/bitly/nsq/blob/master/docs/protocol.md +[examples]: https://github.com/bitly/nsq/tree/master/examples +[reader]: http://go.pkgdoc.org/github.com/bitly/nsq/nsq#Reader diff --git a/nsq/api_request.go b/nsq/api_request.go index 5af691090..bab9cb428 100644 --- a/nsq/api_request.go +++ b/nsq/api_request.go @@ -24,6 +24,10 @@ func (c *deadlinedConn) Write(b []byte) (n int, err error) { return c.Conn.Write(b) } +// ApiRequest is a helper function to perform an HTTP request +// and parse our NSQ daemon's expected response format, with deadlines. +// +// {"status_code":200, "status_txt":"OK", "data":{...}} func ApiRequest(endpoint string) (*simplejson.Json, error) { transport := &http.Transport{ Dial: func(netw, addr string) (net.Conn, error) { diff --git a/nsq/command.go b/nsq/command.go index 34bbb5fe3..c70502a26 100644 --- a/nsq/command.go +++ b/nsq/command.go @@ -11,12 +11,14 @@ import ( "strings" ) +// Command represents a command from a client to an NSQ daemon type Command struct { Name []byte Params [][]byte Body []byte } +// String returns the name and parameters of the Command func (c *Command) String() string { if len(c.Params) > 0 { return fmt.Sprintf("%s %s", c.Name, string(bytes.Join(c.Params, []byte(" ")))) @@ -24,6 +26,9 @@ func (c *Command) String() string { return string(c.Name) } +// Write serializes the Command to the supplied Writer. +// +// It is suggested that the target Writer is buffered to avoid performing many system calls. func (c *Command) Write(w io.Writer) error { _, err := w.Write(c.Name) if err != nil { @@ -69,7 +74,8 @@ func Announce(topic string, channel string, port int, ips []string) *Command { return &Command{[]byte("ANNOUNCE"), params, []byte(strings.Join(ips, "\n"))} } -// Identify is the first message sent to the Lookupd and provides information about the client +// Identify creates a new Command to provide information about the client to nsqlookupd. +// After connecting, it is the first message sent to nsqlookupd. func Identify(version string, tcpPort int, httpPort int, address string) *Command { body, err := json.Marshal(struct { Version string `json:"version"` @@ -88,7 +94,7 @@ func Identify(version string, tcpPort int, httpPort int, address string) *Comman return &Command{[]byte("IDENTIFY"), [][]byte{}, body} } -// REGISTER a topic/channel for this nsqd +// Register creates a new Command to add a topic/channel for the connected nsqd func Register(topic string, channel string) *Command { params := [][]byte{[]byte(topic)} if len(channel) > 0 { @@ -97,7 +103,7 @@ func Register(topic string, channel string) *Command { return &Command{[]byte("REGISTER"), params, nil} } -// UNREGISTER removes a topic/channel from this nsqd +// Unregister creates a new Command to remove a topic/channel for the connected nsqd func UnRegister(topic string, channel string) *Command { params := [][]byte{[]byte(topic)} if len(channel) > 0 { @@ -118,6 +124,8 @@ func Publish(topic string, body []byte) *Command { return &Command{[]byte("PUB"), params, body} } +// MultiPublish creates a new Command to write more than one message to a given topic. +// This is useful for high-throughput situations to avoid roundtrips and saturate the pipe. func MultiPublish(topic string, bodies [][]byte) (*Command, error) { var params = [][]byte{[]byte(topic)} @@ -147,8 +155,7 @@ func MultiPublish(topic string, bodies [][]byte) (*Command, error) { return &Command{[]byte("MPUB"), params, body}, nil } -// Subscribe creates a new Command to subscribe -// to the given topic/channel +// Subscribe creates a new Command to subscribe to the given topic/channel func Subscribe(topic string, channel string, shortIdentifier string, longIdentifier string) *Command { var params = [][]byte{[]byte(topic), []byte(channel), []byte(shortIdentifier), []byte(longIdentifier)} return &Command{[]byte("SUB"), params, nil} @@ -179,11 +186,13 @@ func Requeue(id []byte, timeoutMs int) *Command { // StartClose creates a new Command to indicate that the // client would like to start a close cycle. nsqd will no longer // send messages to a client in this state and the client is expected -// to ACK after which it can finish pending messages and close the connection +// finish pending messages and close the connection func StartClose() *Command { return &Command{[]byte("CLS"), nil, nil} } +// Nop creates a new Command that has no effect server side. +// Commonly used to respond to heartbeats func Nop() *Command { return &Command{[]byte("NOP"), nil, nil} } diff --git a/nsq/errors.go b/nsq/errors.go index d3a0d9d9e..69e3c4dbd 100644 --- a/nsq/errors.go +++ b/nsq/errors.go @@ -1,28 +1,33 @@ package nsq -// E_INVALID -// E_BAD_PROTOCOL -// E_BAD_TOPIC -// E_BAD_CHANNEL -// E_BAD_BODY -// E_REQ_FAILED -// E_FIN_FAILED -// E_PUT_FAILED -// E_MISSING_PARAMS - +// ClientErr provides a way for NSQ daemons to log a human reabable +// error string and return a machine readable string to the client. +// +// E_INVALID +// E_BAD_PROTOCOL +// E_BAD_TOPIC +// E_BAD_CHANNEL +// E_BAD_BODY +// E_REQ_FAILED +// E_FIN_FAILED +// E_PUT_FAILED +// E_MISSING_PARAMS type ClientErr struct { Err string Desc string } +// Error returns the machine readable form func (e *ClientErr) Error() string { return e.Err } +// Description return the human readable form func (e *ClientErr) Description() string { return e.Desc } +// NewClientErr creates a ClientErr with the supplied human and machine readable strings func NewClientErr(err string, description string) *ClientErr { return &ClientErr{err, description} } diff --git a/nsq/lookup_peer.go b/nsq/lookup_peer.go index 848454e46..e4d1ed070 100644 --- a/nsq/lookup_peer.go +++ b/nsq/lookup_peer.go @@ -7,14 +7,19 @@ import ( ) // LookupPeer is a low-level type for connecting/reading/writing to nsqlookupd +// +// A LookupPeer instance is designed to connect lazily to nsqlookupd and reconnect +// gracefully (i.e. it is all handled by the library). Clients can simply use the +// Command interface to perform a round-trip. type LookupPeer struct { addr string conn net.Conn state int32 connectCallback func(*LookupPeer) - PeerInfo PeerInfo + Info PeerInfo } +// PeerInfo contains metadata for a LookupPeer instance (and is JSON marshalable) type PeerInfo struct { TcpPort int `json:"tcp_port"` HttpPort int `json:"http_port"` @@ -22,7 +27,9 @@ type PeerInfo struct { Address string `json:"address"` } -// NewLookupPeer creates a new LookupPeer instance +// NewLookupPeer creates a new LookupPeer instance connecting to the supplied address. +// +// The supplied connectCallback will be called *every* time the instance connects. func NewLookupPeer(addr string, connectCallback func(*LookupPeer)) *LookupPeer { return &LookupPeer{ addr: addr, @@ -31,6 +38,7 @@ func NewLookupPeer(addr string, connectCallback func(*LookupPeer)) *LookupPeer { } } +// Connect will Dial the specified address, with timeouts func (lp *LookupPeer) Connect() error { log.Printf("LOOKUP connecting to %s", lp.addr) conn, err := net.DialTimeout("tcp", lp.addr, time.Second) @@ -41,24 +49,34 @@ func (lp *LookupPeer) Connect() error { return nil } +// String returns the specified address func (lp *LookupPeer) String() string { return lp.addr } +// Read implements the io.Reader interface, adding deadlines func (lp *LookupPeer) Read(data []byte) (int, error) { lp.conn.SetReadDeadline(time.Now().Add(time.Second)) return lp.conn.Read(data) } +// Write implements the io.Writer interface, adding deadlines func (lp *LookupPeer) Write(data []byte) (int, error) { lp.conn.SetWriteDeadline(time.Now().Add(time.Second)) return lp.conn.Write(data) } +// Close implements the io.Closer interface func (lp *LookupPeer) Close() error { return lp.conn.Close() } +// Command performs a round-trip for the specified Command. +// +// It will lazily connect to nsqlookupd and gracefully handle +// reconnecting in the event of a failure. +// +// It returns the response from nsqlookupd as []byte func (lp *LookupPeer) Command(cmd *Command) ([]byte, error) { initialState := lp.state if lp.state != StateConnected { diff --git a/nsq/message.go b/nsq/message.go index a8f0ab0de..d893621bf 100644 --- a/nsq/message.go +++ b/nsq/message.go @@ -8,10 +8,11 @@ import ( "time" ) +// The number of bytes for a Message.Id const MsgIdLength = 16 // Message is the fundamental data type containing -// the id, body, and meta-data +// the id, body, and metadata type Message struct { Id []byte Body []byte @@ -19,7 +20,7 @@ type Message struct { Attempts uint16 } -// NewMessage creates a Message, initializes some meta-data, +// NewMessage creates a Message, initializes some metadata, // and returns a pointer func NewMessage(id []byte, body []byte) *Message { return &Message{ @@ -29,7 +30,7 @@ func NewMessage(id []byte, body []byte) *Message { } } -// EncodeBytes serializes the message into a new []byte +// EncodeBytes serializes the message into a new, returned, []byte func (m *Message) EncodeBytes() ([]byte, error) { var buf bytes.Buffer err := m.Write(&buf) @@ -39,7 +40,9 @@ func (m *Message) EncodeBytes() ([]byte, error) { return buf.Bytes(), nil } -// Write serializes the message into the supplied writer +// Write serializes the message into the supplied writer. +// +// It is suggested that the target Writer is buffered to avoid performing many system calls. func (m *Message) Write(w io.Writer) error { err := binary.Write(w, binary.BigEndian, &m.Timestamp) if err != nil { @@ -64,8 +67,7 @@ func (m *Message) Write(w io.Writer) error { return nil } -// DecodeMessage deseralizes data (as []byte) and creates/returns -// a pointer to a new Message +// DecodeMessage deseralizes data (as []byte) and creates a new Message func DecodeMessage(byteBuf []byte) (*Message, error) { var timestamp int64 var attempts uint16 diff --git a/nsq/protocol.go b/nsq/protocol.go index 614b9e36e..22f288029 100644 --- a/nsq/protocol.go +++ b/nsq/protocol.go @@ -13,19 +13,25 @@ import ( var MagicV1 = []byte(" V1") var MagicV2 = []byte(" V2") +// The maximum value a client can specify via RDY const MaxReadyCount = 2500 const ( + // when successful FrameTypeResponse int32 = 0 - FrameTypeError int32 = 1 - FrameTypeMessage int32 = 2 + // when an error occurred + FrameTypeError int32 = 1 + // when it's a serialized message + FrameTypeMessage int32 = 2 ) +// The amount of time nsqd will allow a client to idle, can be overriden const DefaultClientTimeout = 60 * time.Second var validTopicNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+$`) var validChannelNameRegex = regexp.MustCompile(`^[\.a-zA-Z0-9_-]+(#ephemeral)?$`) +// IsValidTopicName checks a topic name for correctness func IsValidTopicName(name string) bool { if len(name) > 32 || len(name) < 1 { return false @@ -33,6 +39,7 @@ func IsValidTopicName(name string) bool { return validTopicNameRegex.MatchString(name) } +// IsValidChannelName checks a channel name for correctness func IsValidChannelName(name string) bool { if len(name) > 32 || len(name) < 1 { return false @@ -40,17 +47,20 @@ func IsValidChannelName(name string) bool { return validChannelNameRegex.MatchString(name) } -// describes the basic behavior of any protocol in the system +// Protocol describes the basic behavior of any protocol in the system type Protocol interface { IOLoop(conn net.Conn) error } +// ReadMagic is a server-side utility function to read the 4-byte magic id +// from the supplied Reader. +// +// The client should initialize itself by sending a 4 byte sequence indicating +// the version of the protocol that it intends to communicate, this will allow us +// to gracefully upgrade the protocol away from text/line oriented to whatever... func ReadMagic(r io.Reader) (int32, error) { var protocolMagic int32 - // the client should initialize itself by sending a 4 byte sequence indicating - // the version of the protocol that it intends to communicate, this will allow us - // to gracefully upgrade the protocol away from text/line oriented to whatever... err := binary.Read(r, binary.BigEndian, &protocolMagic) if err != nil { return 0, err @@ -59,6 +69,8 @@ func ReadMagic(r io.Reader) (int32, error) { return protocolMagic, nil } +// SendResponse is a server side utility function to prefix data with a length header +// and write to the supplied Writer func SendResponse(w io.Writer, data []byte) (int, error) { err := binary.Write(w, binary.BigEndian, int32(len(data))) if err != nil { @@ -73,6 +85,14 @@ func SendResponse(w io.Writer, data []byte) (int, error) { return (n + 4), nil } +// ReadResponse is a client-side utility function to read from the supplied Reader +// according to the NSQ protocol spec: +// +// [x][x][x][x][x][x][x][x]... +// | (int32) || (binary) +// | 4-byte || N-byte +// ------------------------... +// size data func ReadResponse(r io.Reader) ([]byte, error) { var msgSize int32 @@ -92,12 +112,15 @@ func ReadResponse(r io.Reader) ([]byte, error) { return buf, nil } -// DEPRECATED in 0.2.5, use: cmd.Write(w) -// SendCommand writes a serialized command to the supplied Writer +// DEPRECATED in 0.2.5, use: cmd.Write(w). +// +// SendCommand is a client-side utility function to serialize a command to the supplied Writer func SendCommand(w io.Writer, cmd *Command) error { return cmd.Write(w) } +// Frame is a server-side utility function to write the specified frameType +// and data to the supplied Writer func Frame(w io.Writer, frameType int32, data []byte) error { err := binary.Write(w, binary.BigEndian, &frameType) if err != nil { @@ -112,9 +135,16 @@ func Frame(w io.Writer, frameType int32, data []byte) error { return nil } -// UnpackResponse is a helper function that takes serialized data (as []byte), -// unpacks and returns a triplicate of: -// frame type, data ([]byte), error +// UnpackResponse is a client-side utility function that unpacks serialized data +// according to NSQ protocol spec: +// +// [x][x][x][x][x][x][x][x]... +// | (int32) || (binary) +// | 4-byte || N-byte +// ------------------------... +// frame ID data +// +// Returns a triplicate of: frame type, data ([]byte), error func UnpackResponse(response []byte) (int32, []byte, error) { var frameType int32 diff --git a/nsq/reader.go b/nsq/reader.go index a33ea5fbc..793d15033 100644 --- a/nsq/reader.go +++ b/nsq/reader.go @@ -18,35 +18,54 @@ import ( "time" ) +// returned from ConnectToNSQ() when already connected var ErrAlreadyConnected = errors.New("already connected") -// a syncronous handler that returns an error (or nil to indicate success) +// Handler is the synchronous interface to Reader. +// +// Implement this interface for handlers that return whether or not message +// processing completed successfully. +// +// When the return value is nil Reader will automatically handle FINishing. +// +// When the returned value is non-nil Reader will automatically handle REQueing. type Handler interface { HandleMessage(message *Message) error } -type FailedMessageLogger interface { - LogFailedMessage(message *Message) -} - -// an async handler that must send a &FinishedMessage{messageID, requeueDelay, true|false} onto -// responseChannel to indicate that a message has been finished. This is usefull -// if you want to batch work together and delay response that processing is complete +// AsyncHandler is the asynchronous interface to Reader. +// +// Implement this interface for handlers that wish to defer responding until later. +// This is particularly useful if you want to batch work together. +// +// An AsyncHandler must send: +// +// &FinishedMessage{messageID, requeueDelay, true|false} +// +// To the supplied responseChannel to indicate that a message is processed. type AsyncHandler interface { HandleMessage(message *Message, responseChannel chan *FinishedMessage) } -type incomingMessage struct { - *Message - responseChannel chan *FinishedMessage -} - +// FinishedMessage is the data type used over responseChannel in AsyncHandlers type FinishedMessage struct { Id []byte RequeueDelayMs int Success bool } +// FailedMessageLogger is an interface that can be implemented by handlers that wish +// to receive a callback when a message is deemed "failed" (i.e. the number of attempts +// exceeded the Reader specified MaxAttemptCount) +type FailedMessageLogger interface { + LogFailedMessage(message *Message) +} + +type incomingMessage struct { + *Message + responseChannel chan *FinishedMessage +} + type nsqConn struct { net.Conn r *bufio.Reader @@ -116,24 +135,34 @@ func (c *nsqConn) sendCommand(buf *bytes.Buffer, cmd *Command) error { return err } +// Reader is a high-level type to consume from NSQ. +// +// A Reader instance is supplied handler(s) that will be executed +// concurrently via goroutines to handle processing the stream of messages +// consumed from the specified topic/channel. See: AsyncHandler and Handler +// for details on implementing those interfaces to create handlers. +// +// If configured, it will poll nsqlookupd instances and handle connection (and +// reconnection) to any discovered nsqds. type Reader struct { TopicName string // name of topic to subscribe to ChannelName string // name of channel to subscribe to - LookupdPollInterval time.Duration // seconds between polling lookupd's (+/- random 1/10th this value) - MaxAttemptCount uint16 - DefaultRequeueDelay time.Duration - MaxRequeueDelay time.Duration - VerboseLogging bool - ShortIdentifier string // an identifier to send to nsqd when connecting (defaults: short hostname) - LongIdentifier string // an identifier to send to nsqd when connecting (defaults: long hostname) - ReadTimeout time.Duration - WriteTimeout time.Duration - MessagesReceived uint64 - MessagesFinished uint64 - MessagesRequeued uint64 - ExitChan chan int - - maxInFlight int // max number of messages to allow in-flight at a time + LookupdPollInterval time.Duration // seconds between polling lookupd's (+/- random 1/10th this value for jitter) + MaxAttemptCount uint16 // maximum number of times this reader will attempt to process a message + DefaultRequeueDelay time.Duration // the default duration when REQueueing + MaxRequeueDelay time.Duration // the maximum duration when REQueueing (for doubling backoff) + VerboseLogging bool // enable verbose logging + ShortIdentifier string // an identifier to send to nsqd when connecting (defaults: short hostname) + LongIdentifier string // an identifier to send to nsqd when connecting (defaults: long hostname) + ReadTimeout time.Duration // the deadline set for network reads + WriteTimeout time.Duration // the deadline set for network writes + MessagesReceived uint64 // an atomic counter - # of messages received + MessagesFinished uint64 // an atomic counter - # of messages FINished + MessagesRequeued uint64 // an atomic counter - # of messages REQueued + ExitChan chan int // read from this channel to block your main loop + + // internal variables + maxInFlight int incomingMessages chan *incomingMessage nsqConnections map[string]*nsqConn lookupdExitChan chan int @@ -145,6 +174,10 @@ type Reader struct { stopHandler sync.Once } +// NewReader creates a new instance of Reader for the specified topic/channel +// +// The returned Reader instance is setup with sane default values. To modify +// configuration, update the values on the returned instance before connecting. func NewReader(topic string, channel string) (*Reader, error) { if !IsValidTopicName(topic) { return nil, errors.New("invalid topic name") @@ -179,8 +212,10 @@ func NewReader(topic string, channel string) (*Reader, error) { return q, nil } -// calculate the max in flight count per connection -// this may change dynamically based on the number of connections +// ConnectionMaxInFlight calculates the per-connection max-in-flight count. +// +// This may change dynamically based on the number of connections to nsqd the Reader +// is responsible for. func (q *Reader) ConnectionMaxInFlight() int { b := float64(q.maxInFlight) s := b / float64(len(q.nsqConnections)) @@ -200,7 +235,10 @@ func (q *Reader) IsStarved() bool { return false } -// update the reader ready state, updating each connection as appropriate +// SetMaxInFlight sets the maximum number of messages this reader instance +// will allow in-flight. +// +// If already connected, it updates the reader RDY state for each connection. func (q *Reader) SetMaxInFlight(maxInFlight int) { if atomic.LoadInt32(&q.stopFlag) == 1 { return @@ -221,11 +259,17 @@ func (q *Reader) SetMaxInFlight(maxInFlight int) { } } -// max number of messages to allow in-flight at a time +// MaxInFlight returns the configured maximum number of messages to allow in-flight. func (q *Reader) MaxInFlight() int { return q.maxInFlight } +// ConnectToLookupd adds a nsqlookupd address to the list for this Reader instance. +// +// If it is the first to be added, it initiates an HTTP request to discover nsqd +// producers for the configured topic. +// +// A goroutine is spawned to handle continual polling. func (q *Reader) ConnectToLookupd(addr string) error { // make a HTTP req to the lookupd, and ask it for endpoints that have the // topic we are interested in. @@ -248,6 +292,8 @@ func (q *Reader) ConnectToLookupd(addr string) error { // poll all known lookup servers every LookupdPollInterval func (q *Reader) lookupdLoop() { + // add some jitter so that multiple consumers discovering the same topic, + // when restarted at the same time, dont all connect at once. rand.Seed(time.Now().UnixNano()) time.Sleep(time.Duration(rand.Int63n(int64(q.LookupdPollInterval / 10)))) ticker := time.Tick(q.LookupdPollInterval) @@ -296,6 +342,11 @@ func (q *Reader) queryLookupd() { } } +// ConnectToNSQ takes a nsqd address to connect directly to. +// +// It is recommended to use ConnectToLookupd so that topics are discovered +// automatically. This method is useful when you want to connect to a single, local, +// instance. func (q *Reader) ConnectToNSQ(addr string) error { var buf bytes.Buffer @@ -542,7 +593,7 @@ func (q *Reader) updateReady(c *nsqConn) error { return nil } -// stops a Reader gracefully +// Stop will gracefully stop the Reader func (q *Reader) Stop() { var buf bytes.Buffer @@ -579,8 +630,12 @@ func (q *Reader) stopHandlers() { }) } -// this starts a handler on the Reader -// it's ok to start more than one handler simultaneously +// AddHandler adds a Handler for messages received by this Reader. +// +// See Handler for details on implementing this interface. +// +// It's ok to start more than one handler simultaneously, they +// are concurrently executed in goroutines. func (q *Reader) AddHandler(handler Handler) { atomic.AddInt32(&q.runningHandlers, 1) log.Println("starting Handler go-routine") @@ -623,8 +678,12 @@ func (q *Reader) AddHandler(handler Handler) { }() } -// this starts an async handler on the Reader -// it's ok to start more than one handler simultaneously +// AddAsyncHandler adds an AsyncHandler for messages received by this Reader. +// +// See AsyncHandler for details on implementing this interface. +// +// It's ok to start more than one handler simultaneously, they +// are concurrently executed in goroutines. func (q *Reader) AddAsyncHandler(handler AsyncHandler) { atomic.AddInt32(&q.runningHandlers, 1) log.Println("starting AsyncHandler go-routine") diff --git a/nsq/states.go b/nsq/states.go index 64ca221cb..b906da631 100644 --- a/nsq/states.go +++ b/nsq/states.go @@ -5,5 +5,6 @@ const ( StateDisconnected StateConnected StateSubscribed - StateClosing // close has started. responses are ok, but no new messages will be sent + // close has started. responses are ok, but no new messages will be sent + StateClosing ) diff --git a/nsq/version.go b/nsq/version.go index 82f559751..42c2196d9 100644 --- a/nsq/version.go +++ b/nsq/version.go @@ -1,3 +1,9 @@ +// nsq is the official Go package for https://github.com/bitly/nsq +// +// It provides the building blocks for developing applications on the NSQ platform in Go. +// +// Low-level functions and types are provided to communicate over the NSQ protocol as well +// as a high-level Reader library to implement robust consumers. package nsq const VERSION = "0.2.4" diff --git a/nsqd/diskqueue.go b/nsqd/diskqueue.go index 94d772c16..84643bc3b 100644 --- a/nsqd/diskqueue.go +++ b/nsqd/diskqueue.go @@ -20,7 +20,7 @@ import ( type DiskQueue struct { sync.RWMutex - // instatiation time meta-data + // instatiation time metadata name string dataPath string maxBytesPerFile int64 // currently this cannot change once created @@ -56,7 +56,7 @@ type DiskQueue struct { exitSyncChan chan int } -// NewDiskQueue instantiates a new instance of DiskQueue, retrieving meta-data +// NewDiskQueue instantiates a new instance of DiskQueue, retrieving metadata // from the filesystem and starting the read ahead goroutine func NewDiskQueue(name string, dataPath string, maxBytesPerFile int64, syncEvery int64) BackendQueue { d := DiskQueue{ @@ -107,7 +107,7 @@ func (d *DiskQueue) Put(data []byte) error { return <-d.writeResponseChan } -// Close cleans up the queue and persists meta-data +// Close cleans up the queue and persists metadata func (d *DiskQueue) Close() error { d.Lock() defer d.Unlock() diff --git a/nsqd/lookup.go b/nsqd/lookup.go index 28b2862b2..166d8e14b 100644 --- a/nsqd/lookup.go +++ b/nsqd/lookup.go @@ -33,11 +33,11 @@ func (n *NSQd) lookupLoop() { } else if bytes.Equal(resp, []byte("E_INVALID")) { log.Printf("LOOKUPD(%s): lookupd returned %s", lp, resp) } else { - err = json.Unmarshal(resp, &lp.PeerInfo) + err = json.Unmarshal(resp, &lp.Info) if err != nil { log.Printf("LOOKUPD(%s): ERROR parsing response - %v", lp, resp) } else { - log.Printf("LOOKUPD(%s): peer info %+v", lp, lp.PeerInfo) + log.Printf("LOOKUPD(%s): peer info %+v", lp, lp.Info) } } @@ -141,10 +141,10 @@ exit: func (n *NSQd) lookupHttpAddrs() []string { var lookupHttpAddrs []string for _, lp := range n.lookupPeers { - if len(lp.PeerInfo.Address) <= 0 { + if len(lp.Info.Address) <= 0 { continue } - addr := net.JoinHostPort(lp.PeerInfo.Address, strconv.Itoa(lp.PeerInfo.HttpPort)) + addr := net.JoinHostPort(lp.Info.Address, strconv.Itoa(lp.Info.HttpPort)) lookupHttpAddrs = append(lookupHttpAddrs, addr) } return lookupHttpAddrs diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index a99649454..2c6fdb602 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -285,8 +285,8 @@ func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) { return nil, nsq.NewClientErr("E_MISSING_PARAMS", "insufficient number of params") } - idStr := params[1] - err := client.Channel.FinishMessage(client, idStr) + id := params[1] + err := client.Channel.FinishMessage(client, id) if err != nil { return nil, nsq.NewClientErr("E_FIN_FAILED", err.Error()) } @@ -306,7 +306,7 @@ func (p *ProtocolV2) REQ(client *ClientV2, params [][]byte) ([]byte, error) { return nil, nsq.NewClientErr("E_MISSING_PARAMS", "insufficient number of params") } - idStr := params[1] + id := params[1] timeoutMs, err := strconv.Atoi(string(params[2])) if err != nil { return nil, nsq.NewClientErr("E_INVALID", fmt.Sprintf("could not parse timeout %s", params[2])) @@ -317,7 +317,7 @@ func (p *ProtocolV2) REQ(client *ClientV2, params [][]byte) ([]byte, error) { return nil, nsq.NewClientErr("E_INVALID", fmt.Sprintf("timeout %d out of range", timeoutDuration)) } - err = client.Channel.RequeueMessage(client, idStr, timeoutDuration) + err = client.Channel.RequeueMessage(client, id, timeoutDuration) if err != nil { return nil, nsq.NewClientErr("E_REQ_FAILED", err.Error()) } diff --git a/pynsq/README.md b/pynsq/README.md index a56229508..b9da21a13 100644 --- a/pynsq/README.md +++ b/pynsq/README.md @@ -1 +1 @@ -**pynsq** has moved to it's [own repository][https://github.com/bitly/pynsq] +**pynsq** has moved to https://github.com/bitly/pynsq