Skip to content

Commit

Permalink
Merge pull request #89 from mreiferson/docs_89
Browse files Browse the repository at this point in the history
NSQ package docs
  • Loading branch information
jehiah committed Nov 1, 2012
2 parents 762dd84 + 2fb1275 commit ae28471
Show file tree
Hide file tree
Showing 14 changed files with 253 additions and 85 deletions.
34 changes: 34 additions & 0 deletions nsq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
## nsq

`nsq` is the official Go package for [NSQ][nsq].

It provides the building blocks for developing applications on the [NSQ][nsq] platform in Go.

Low-level functions and types are provided to communicate over the [NSQ protocol][protocol] as well
as a high-level [Reader][reader] library to implement consumers.

See the [examples][examples] directory for utilities built using this package that provide support
for common tasks.

### Installing

$ go get github.com/bitly/nsq/nsq

### Importing

```go
import "github.com/bitly/nsq/nsq"
```

### Docs

See [gopkgdoc][nsq_gopkgdoc] for pretty documentation or:

# in the nsq package directory
$ go doc

[nsq]: https://github.com/bitly/nsq
[nsq_gopkgdoc]: http://go.pkgdoc.org/github.com/bitly/nsq/nsq
[protocol]: https://github.com/bitly/nsq/blob/master/docs/protocol.md
[examples]: https://github.com/bitly/nsq/tree/master/examples
[reader]: http://go.pkgdoc.org/github.com/bitly/nsq/nsq#Reader
4 changes: 4 additions & 0 deletions nsq/api_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func (c *deadlinedConn) Write(b []byte) (n int, err error) {
return c.Conn.Write(b)
}

// ApiRequest is a helper function to perform an HTTP request
// and parse our NSQ daemon's expected response format, with deadlines.
//
// {"status_code":200, "status_txt":"OK", "data":{...}}
func ApiRequest(endpoint string) (*simplejson.Json, error) {
transport := &http.Transport{
Dial: func(netw, addr string) (net.Conn, error) {
Expand Down
21 changes: 15 additions & 6 deletions nsq/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,24 @@ import (
"strings"
)

// Command represents a command from a client to an NSQ daemon
type Command struct {
Name []byte
Params [][]byte
Body []byte
}

// String returns the name and parameters of the Command
func (c *Command) String() string {
if len(c.Params) > 0 {
return fmt.Sprintf("%s %s", c.Name, string(bytes.Join(c.Params, []byte(" "))))
}
return string(c.Name)
}

// Write serializes the Command to the supplied Writer.
//
// It is suggested that the target Writer is buffered to avoid performing many system calls.
func (c *Command) Write(w io.Writer) error {
_, err := w.Write(c.Name)
if err != nil {
Expand Down Expand Up @@ -69,7 +74,8 @@ func Announce(topic string, channel string, port int, ips []string) *Command {
return &Command{[]byte("ANNOUNCE"), params, []byte(strings.Join(ips, "\n"))}
}

// Identify is the first message sent to the Lookupd and provides information about the client
// Identify creates a new Command to provide information about the client to nsqlookupd.
// After connecting, it is the first message sent to nsqlookupd.
func Identify(version string, tcpPort int, httpPort int, address string) *Command {
body, err := json.Marshal(struct {
Version string `json:"version"`
Expand All @@ -88,7 +94,7 @@ func Identify(version string, tcpPort int, httpPort int, address string) *Comman
return &Command{[]byte("IDENTIFY"), [][]byte{}, body}
}

// REGISTER a topic/channel for this nsqd
// Register creates a new Command to add a topic/channel for the connected nsqd
func Register(topic string, channel string) *Command {
params := [][]byte{[]byte(topic)}
if len(channel) > 0 {
Expand All @@ -97,7 +103,7 @@ func Register(topic string, channel string) *Command {
return &Command{[]byte("REGISTER"), params, nil}
}

// UNREGISTER removes a topic/channel from this nsqd
// Unregister creates a new Command to remove a topic/channel for the connected nsqd
func UnRegister(topic string, channel string) *Command {
params := [][]byte{[]byte(topic)}
if len(channel) > 0 {
Expand All @@ -118,6 +124,8 @@ func Publish(topic string, body []byte) *Command {
return &Command{[]byte("PUB"), params, body}
}

// MultiPublish creates a new Command to write more than one message to a given topic.
// This is useful for high-throughput situations to avoid roundtrips and saturate the pipe.
func MultiPublish(topic string, bodies [][]byte) (*Command, error) {
var params = [][]byte{[]byte(topic)}

Expand Down Expand Up @@ -147,8 +155,7 @@ func MultiPublish(topic string, bodies [][]byte) (*Command, error) {
return &Command{[]byte("MPUB"), params, body}, nil
}

// Subscribe creates a new Command to subscribe
// to the given topic/channel
// Subscribe creates a new Command to subscribe to the given topic/channel
func Subscribe(topic string, channel string, shortIdentifier string, longIdentifier string) *Command {
var params = [][]byte{[]byte(topic), []byte(channel), []byte(shortIdentifier), []byte(longIdentifier)}
return &Command{[]byte("SUB"), params, nil}
Expand Down Expand Up @@ -179,11 +186,13 @@ func Requeue(id []byte, timeoutMs int) *Command {
// StartClose creates a new Command to indicate that the
// client would like to start a close cycle. nsqd will no longer
// send messages to a client in this state and the client is expected
// to ACK after which it can finish pending messages and close the connection
// finish pending messages and close the connection
func StartClose() *Command {
return &Command{[]byte("CLS"), nil, nil}
}

// Nop creates a new Command that has no effect server side.
// Commonly used to respond to heartbeats
func Nop() *Command {
return &Command{[]byte("NOP"), nil, nil}
}
25 changes: 15 additions & 10 deletions nsq/errors.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
package nsq

// E_INVALID
// E_BAD_PROTOCOL
// E_BAD_TOPIC
// E_BAD_CHANNEL
// E_BAD_BODY
// E_REQ_FAILED
// E_FIN_FAILED
// E_PUT_FAILED
// E_MISSING_PARAMS

// ClientErr provides a way for NSQ daemons to log a human reabable
// error string and return a machine readable string to the client.
//
// E_INVALID
// E_BAD_PROTOCOL
// E_BAD_TOPIC
// E_BAD_CHANNEL
// E_BAD_BODY
// E_REQ_FAILED
// E_FIN_FAILED
// E_PUT_FAILED
// E_MISSING_PARAMS
type ClientErr struct {
Err string
Desc string
}

// Error returns the machine readable form
func (e *ClientErr) Error() string {
return e.Err
}

// Description return the human readable form
func (e *ClientErr) Description() string {
return e.Desc
}

// NewClientErr creates a ClientErr with the supplied human and machine readable strings
func NewClientErr(err string, description string) *ClientErr {
return &ClientErr{err, description}
}
22 changes: 20 additions & 2 deletions nsq/lookup_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,29 @@ import (
)

// LookupPeer is a low-level type for connecting/reading/writing to nsqlookupd
//
// A LookupPeer instance is designed to connect lazily to nsqlookupd and reconnect
// gracefully (i.e. it is all handled by the library). Clients can simply use the
// Command interface to perform a round-trip.
type LookupPeer struct {
addr string
conn net.Conn
state int32
connectCallback func(*LookupPeer)
PeerInfo PeerInfo
Info PeerInfo
}

// PeerInfo contains metadata for a LookupPeer instance (and is JSON marshalable)
type PeerInfo struct {
TcpPort int `json:"tcp_port"`
HttpPort int `json:"http_port"`
Version string `json:"version"`
Address string `json:"address"`
}

// NewLookupPeer creates a new LookupPeer instance
// NewLookupPeer creates a new LookupPeer instance connecting to the supplied address.
//
// The supplied connectCallback will be called *every* time the instance connects.
func NewLookupPeer(addr string, connectCallback func(*LookupPeer)) *LookupPeer {
return &LookupPeer{
addr: addr,
Expand All @@ -31,6 +38,7 @@ func NewLookupPeer(addr string, connectCallback func(*LookupPeer)) *LookupPeer {
}
}

// Connect will Dial the specified address, with timeouts
func (lp *LookupPeer) Connect() error {
log.Printf("LOOKUP connecting to %s", lp.addr)
conn, err := net.DialTimeout("tcp", lp.addr, time.Second)
Expand All @@ -41,24 +49,34 @@ func (lp *LookupPeer) Connect() error {
return nil
}

// String returns the specified address
func (lp *LookupPeer) String() string {
return lp.addr
}

// Read implements the io.Reader interface, adding deadlines
func (lp *LookupPeer) Read(data []byte) (int, error) {
lp.conn.SetReadDeadline(time.Now().Add(time.Second))
return lp.conn.Read(data)
}

// Write implements the io.Writer interface, adding deadlines
func (lp *LookupPeer) Write(data []byte) (int, error) {
lp.conn.SetWriteDeadline(time.Now().Add(time.Second))
return lp.conn.Write(data)
}

// Close implements the io.Closer interface
func (lp *LookupPeer) Close() error {
return lp.conn.Close()
}

// Command performs a round-trip for the specified Command.
//
// It will lazily connect to nsqlookupd and gracefully handle
// reconnecting in the event of a failure.
//
// It returns the response from nsqlookupd as []byte
func (lp *LookupPeer) Command(cmd *Command) ([]byte, error) {
initialState := lp.state
if lp.state != StateConnected {
Expand Down
14 changes: 8 additions & 6 deletions nsq/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@ import (
"time"
)

// The number of bytes for a Message.Id
const MsgIdLength = 16

// Message is the fundamental data type containing
// the id, body, and meta-data
// the id, body, and metadata
type Message struct {
Id []byte
Body []byte
Timestamp int64
Attempts uint16
}

// NewMessage creates a Message, initializes some meta-data,
// NewMessage creates a Message, initializes some metadata,
// and returns a pointer
func NewMessage(id []byte, body []byte) *Message {
return &Message{
Expand All @@ -29,7 +30,7 @@ func NewMessage(id []byte, body []byte) *Message {
}
}

// EncodeBytes serializes the message into a new []byte
// EncodeBytes serializes the message into a new, returned, []byte
func (m *Message) EncodeBytes() ([]byte, error) {
var buf bytes.Buffer
err := m.Write(&buf)
Expand All @@ -39,7 +40,9 @@ func (m *Message) EncodeBytes() ([]byte, error) {
return buf.Bytes(), nil
}

// Write serializes the message into the supplied writer
// Write serializes the message into the supplied writer.
//
// It is suggested that the target Writer is buffered to avoid performing many system calls.
func (m *Message) Write(w io.Writer) error {
err := binary.Write(w, binary.BigEndian, &m.Timestamp)
if err != nil {
Expand All @@ -64,8 +67,7 @@ func (m *Message) Write(w io.Writer) error {
return nil
}

// DecodeMessage deseralizes data (as []byte) and creates/returns
// a pointer to a new Message
// DecodeMessage deseralizes data (as []byte) and creates a new Message
func DecodeMessage(byteBuf []byte) (*Message, error) {
var timestamp int64
var attempts uint16
Expand Down
Loading

0 comments on commit ae28471

Please sign in to comment.