Skip to content

Commit

Permalink
Merge pull request #37 from andygrunwald/client-example-session
Browse files Browse the repository at this point in the history
Client example: Renamed concept "Session" to "Client"
  • Loading branch information
Zerpet authored Jan 31, 2022
2 parents 51fade5 + b146d26 commit 5127ea9
Showing 1 changed file with 67 additions and 67 deletions.
134 changes: 67 additions & 67 deletions example_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
)

// This exports a Session object that wraps this library. It
// This exports a Client object that wraps this library. It
// automatically reconnects when the connection fails, and
// blocks all pushes until the connection succeeds. It also
// confirms every outgoing message, so none are lost.
Expand All @@ -40,7 +40,7 @@ func Example() {
}
}

type Session struct {
type Client struct {
name string
logger *log.Logger
connection *amqp.Connection
Expand All @@ -66,93 +66,93 @@ const (
var (
errNotConnected = errors.New("not connected to a server")
errAlreadyClosed = errors.New("already closed: not connected to the server")
errShutdown = errors.New("session is shutting down")
errShutdown = errors.New("client is shutting down")
)

// New creates a new consumer state instance, and automatically
// attempts to connect to the server.
func New(name string, addr string) *Session {
session := Session{
func New(name string, addr string) *Client {
client := Client{
logger: log.New(os.Stdout, "", log.LstdFlags),
name: name,
done: make(chan bool),
}
go session.handleReconnect(addr)
return &session
go client.handleReconnect(addr)
return &client
}

// handleReconnect will wait for a connection error on
// notifyConnClose, and then continuously attempt to reconnect.
func (session *Session) handleReconnect(addr string) {
func (client *Client) handleReconnect(addr string) {
for {
session.isReady = false
session.logger.Println("Attempting to connect")
client.isReady = false
client.logger.Println("Attempting to connect")

conn, err := session.connect(addr)
conn, err := client.connect(addr)

if err != nil {
session.logger.Println("Failed to connect. Retrying...")
client.logger.Println("Failed to connect. Retrying...")

select {
case <-session.done:
case <-client.done:
return
case <-time.After(reconnectDelay):
}
continue
}

if done := session.handleReInit(conn); done {
if done := client.handleReInit(conn); done {
break
}
}
}

// connect will create a new AMQP connection
func (session *Session) connect(addr string) (*amqp.Connection, error) {
func (client *Client) connect(addr string) (*amqp.Connection, error) {
conn, err := amqp.Dial(addr)

if err != nil {
return nil, err
}

session.changeConnection(conn)
session.logger.Println("Connected!")
client.changeConnection(conn)
client.logger.Println("Connected!")
return conn, nil
}

// handleReconnect will wait for a channel error
// and then continuously attempt to re-initialize both channels
func (session *Session) handleReInit(conn *amqp.Connection) bool {
func (client *Client) handleReInit(conn *amqp.Connection) bool {
for {
session.isReady = false
client.isReady = false

err := session.init(conn)
err := client.init(conn)

if err != nil {
session.logger.Println("Failed to initialize channel. Retrying...")
client.logger.Println("Failed to initialize channel. Retrying...")

select {
case <-session.done:
case <-client.done:
return true
case <-time.After(reInitDelay):
}
continue
}

select {
case <-session.done:
case <-client.done:
return true
case <-session.notifyConnClose:
session.logger.Println("Connection closed. Reconnecting...")
case <-client.notifyConnClose:
client.logger.Println("Connection closed. Reconnecting...")
return false
case <-session.notifyChanClose:
session.logger.Println("Channel closed. Re-running init...")
case <-client.notifyChanClose:
client.logger.Println("Channel closed. Re-running init...")
}
}
}

// init will initialize channel & declare queue
func (session *Session) init(conn *amqp.Connection) error {
func (client *Client) init(conn *amqp.Connection) error {
ch, err := conn.Channel()

if err != nil {
Expand All @@ -165,7 +165,7 @@ func (session *Session) init(conn *amqp.Connection) error {
return err
}
_, err = ch.QueueDeclare(
session.name,
client.name,
false, // Durable
false, // Delete when unused
false, // Exclusive
Expand All @@ -177,76 +177,76 @@ func (session *Session) init(conn *amqp.Connection) error {
return err
}

session.changeChannel(ch)
session.isReady = true
session.logger.Println("Setup!")
client.changeChannel(ch)
client.isReady = true
client.logger.Println("Setup!")

return nil
}

// changeConnection takes a new connection to the queue,
// and updates the close listener to reflect this.
func (session *Session) changeConnection(connection *amqp.Connection) {
session.connection = connection
session.notifyConnClose = make(chan *amqp.Error)
session.connection.NotifyClose(session.notifyConnClose)
func (client *Client) changeConnection(connection *amqp.Connection) {
client.connection = connection
client.notifyConnClose = make(chan *amqp.Error)
client.connection.NotifyClose(client.notifyConnClose)
}

// changeChannel takes a new channel to the queue,
// and updates the channel listeners to reflect this.
func (session *Session) changeChannel(channel *amqp.Channel) {
session.channel = channel
session.notifyChanClose = make(chan *amqp.Error)
session.notifyConfirm = make(chan amqp.Confirmation, 1)
session.channel.NotifyClose(session.notifyChanClose)
session.channel.NotifyPublish(session.notifyConfirm)
func (client *Client) changeChannel(channel *amqp.Channel) {
client.channel = channel
client.notifyChanClose = make(chan *amqp.Error)
client.notifyConfirm = make(chan amqp.Confirmation, 1)
client.channel.NotifyClose(client.notifyChanClose)
client.channel.NotifyPublish(client.notifyConfirm)
}

// Push will push data onto the queue, and wait for a confirm.
// If no confirms are received until within the resendTimeout,
// it continuously re-sends messages until a confirm is received.
// This will block until the server sends a confirm. Errors are
// only returned if the push action itself fails, see UnsafePush.
func (session *Session) Push(data []byte) error {
if !session.isReady {
func (client *Client) Push(data []byte) error {
if !client.isReady {
return errors.New("failed to push: not connected")
}
for {
err := session.UnsafePush(data)
err := client.UnsafePush(data)
if err != nil {
session.logger.Println("Push failed. Retrying...")
client.logger.Println("Push failed. Retrying...")
select {
case <-session.done:
case <-client.done:
return errShutdown
case <-time.After(resendDelay):
}
continue
}
select {
case confirm := <-session.notifyConfirm:
case confirm := <-client.notifyConfirm:
if confirm.Ack {
session.logger.Println("Push confirmed!")
client.logger.Println("Push confirmed!")
return nil
}
case <-time.After(resendDelay):
}
session.logger.Println("Push didn't confirm. Retrying...")
client.logger.Println("Push didn't confirm. Retrying...")
}
}

// UnsafePush will push to the queue without checking for
// confirmation. It returns an error if it fails to connect.
// No guarantees are provided for whether the server will
// receive the message.
func (session *Session) UnsafePush(data []byte) error {
if !session.isReady {
func (client *Client) UnsafePush(data []byte) error {
if !client.isReady {
return errNotConnected
}
return session.channel.Publish(
"", // Exchange
session.name, // Routing key
false, // Mandatory
false, // Immediate
return client.channel.Publish(
"", // Exchange
client.name, // Routing key
false, // Mandatory
false, // Immediate
amqp.Publishing{
ContentType: "text/plain",
Body: data,
Expand All @@ -258,12 +258,12 @@ func (session *Session) UnsafePush(data []byte) error {
// It is required to call delivery.Ack when it has been
// successfully processed, or delivery.Nack when it fails.
// Ignoring this will cause data to build up on the server.
func (session *Session) Stream() (<-chan amqp.Delivery, error) {
if !session.isReady {
func (client *Client) Stream() (<-chan amqp.Delivery, error) {
if !client.isReady {
return nil, errNotConnected
}
return session.channel.Consume(
session.name,
return client.channel.Consume(
client.name,
"", // Consumer
false, // Auto-Ack
false, // Exclusive
Expand All @@ -274,20 +274,20 @@ func (session *Session) Stream() (<-chan amqp.Delivery, error) {
}

// Close will cleanly shutdown the channel and connection.
func (session *Session) Close() error {
if !session.isReady {
func (client *Client) Close() error {
if !client.isReady {
return errAlreadyClosed
}
close(session.done)
err := session.channel.Close()
close(client.done)
err := client.channel.Close()
if err != nil {
return err
}
err = session.connection.Close()
err = client.connection.Close()
if err != nil {
return err
}

session.isReady = false
client.isReady = false
return nil
}

0 comments on commit 5127ea9

Please sign in to comment.