Skip to content

Commit

Permalink
Add logging (#84)
Browse files Browse the repository at this point in the history
* Add targets to start and stop RabbitMQ via docker

Correctly exit test on errors

Get rid of a couple more panics

Replace nolint / TODO with logging

Use single Logger

Add more informative log messages

* Remove function as suggested by @fho
  • Loading branch information
lukebakken authored May 19, 2022
1 parent b221bfd commit b068367
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 33 deletions.
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,13 @@ tests: ## Run all tests and requires a running rabbitmq-server
.PHONY: check
check:
golangci-lint run ./...

.PHONY: rabbitmq-server
rabbitmq-server:
docker run --detach --rm --name amqp091-go-rabbitmq \
--publish 5672:5672 --publish 15672:15672 \
--pull always rabbitmq:3-management

.PHONY: stop-rabbitmq-server
stop-rabbitmq-server:
docker stop $$(docker inspect --format='{{.Id}}' amqp091-go-rabbitmq)
10 changes: 6 additions & 4 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,9 @@ func (ch *Channel) dispatch(msg message) {
// to avoid unexpected interleaving with basic.publish frames if
// publishing is happening concurrently
ch.m.Lock()
// TODO check error case
ch.send(&channelCloseOk{}) //nolint
if err := ch.send(&channelCloseOk{}); err != nil {
Logger.Printf("error sending channelCloseOk, channel id: %d error: %+v", ch.id, err)
}
ch.m.Unlock()
ch.connection.closeChannel(ch, newError(m.ReplyCode, m.ReplyText))

Expand All @@ -289,8 +290,9 @@ func (ch *Channel) dispatch(msg message) {
c <- m.Active
}
ch.notifyM.RUnlock()
// TODO check error case
ch.send(&channelFlowOk{Active: m.Active}) //nolint
if err := ch.send(&channelFlowOk{Active: m.Active}); err != nil {
Logger.Printf("error sending channelFlowOk, channel id: %d error: %+v", ch.id, err)
}

case *basicCancel:
ch.notifyM.RLock()
Expand Down
8 changes: 4 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (t *server) expectBytes(b []byte) {
}

func (t *server) send(channel int, m message) {
defer time.AfterFunc(time.Second, func() { panic("send deadlock") }).Stop()
defer time.AfterFunc(time.Second, func() { t.Fatalf("send deadlock") }).Stop()

if msg, ok := m.(messageWithContent); ok {
props, body := msg.getContent()
Expand Down Expand Up @@ -115,7 +115,7 @@ func (t *server) send(channel int, m message) {

// drops all but method frames expected on the given channel
func (t *server) recv(channel int, m message) message {
defer time.AfterFunc(time.Second, func() { panic("recv deadlock") }).Stop()
defer time.AfterFunc(time.Second, func() { t.Fatalf("recv deadlock") }).Stop()

var remaining int
var header *headerFrame
Expand Down Expand Up @@ -777,7 +777,7 @@ func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {
t.Fatalf("couldn't open channel: %v (%s)", ch, err)
}

defer time.AfterFunc(500*time.Millisecond, func() { panic("Publish deadlock") }).Stop()
defer time.AfterFunc(500*time.Millisecond, func() { t.Fatalf("Publish deadlock") }).Stop()
for {
if err := ch.Publish("exchange", "q", false, false, Publishing{Body: []byte("test")}); err != nil {
t.Log("successfully caught disconnect error", err)
Expand All @@ -791,7 +791,7 @@ func TestPublishAndShutdownDeadlockIssue84(t *testing.T) {
// channel.shutdown() which closes all registered notification channels - checks
// for a "send on closed channel" panic
func TestChannelReturnsCloseRace(t *testing.T) {
defer time.AfterFunc(5*time.Second, func() { panic("Shutdown deadlock") }).Stop()
defer time.AfterFunc(5*time.Second, func() { t.Fatalf("Shutdown deadlock") }).Stop()
ch := newChannel(&Connection{}, 1)

// Register a channel to close in channel.shutdown()
Expand Down
22 changes: 13 additions & 9 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,9 +466,10 @@ func (c *Connection) dispatch0(f frame) {
switch m := mf.Method.(type) {
case *connectionClose:
// Send immediately as shutdown will close our side of the writer.
// TODO check error case
f := &methodFrame{ChannelId: 0, Method: &connectionCloseOk{}}
c.send(f) //nolint
if err := c.send(f); err != nil {
Logger.Printf("error sending connectionCloseOk, error: %+v", err)
}
c.shutdown(newError(m.ReplyCode, m.ReplyText))
case *connectionBlocked:
for _, c := range c.blocks {
Expand All @@ -485,8 +486,9 @@ func (c *Connection) dispatch0(f frame) {
// kthx - all reads reset our deadline. so we can drop this
default:
// lolwat - channel0 only responds to methods and heartbeats
// TODO check error case
c.closeWith(ErrUnexpectedFrame) //nolint
if err := c.closeWith(ErrUnexpectedFrame); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrUnexpectedFrame, error: %+v", err)
}
}
}

Expand Down Expand Up @@ -518,15 +520,17 @@ func (c *Connection) dispatchClosed(f frame) {
if mf, ok := f.(*methodFrame); ok {
switch mf.Method.(type) {
case *channelClose:
// TODO check error case
f := &methodFrame{ChannelId: f.channel(), Method: &channelCloseOk{}}
c.send(f) //nolint
if err := c.send(f); err != nil {
Logger.Printf("error sending channelCloseOk, channel id: %d error: %+v", f.channel(), err)
}
case *channelCloseOk:
// we are already closed, so do nothing
default:
// unexpected method on closed channel
// TODO check error case
c.closeWith(ErrClosed) //nolint
if err := c.closeWith(ErrClosed); err != nil {
Logger.Printf("error sending connectionCloseOk with ErrClosed, error: %+v", err)
}
}
}
}
Expand Down Expand Up @@ -603,7 +607,7 @@ func (c *Connection) heartbeater(interval time.Duration, done chan *Error) {
if err := conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval)); err != nil {
var opErr *net.OpError
if !errors.As(err, &opErr) {
// TODO check error case
Logger.Printf("error setting read deadline in heartbeater: %+v", err)
return
}
}
Expand Down
4 changes: 2 additions & 2 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestChannelOpenOnAClosedConnectionFails_ReleasesAllocatedChannel(t *testing
// See https://github.com/streadway/amqp/issues/251 - thanks to jmalloc for the
// test case.
func TestRaceBetweenChannelAndConnectionClose(t *testing.T) {
defer time.AfterFunc(10*time.Second, func() { panic("Close deadlock") }).Stop()
defer time.AfterFunc(10*time.Second, func() { t.Fatalf("Close deadlock") }).Stop()

conn := integrationConnection(t, "allocation/shutdown race")

Expand All @@ -95,7 +95,7 @@ func TestRaceBetweenChannelAndConnectionClose(t *testing.T) {
// more details - thanks to jmalloc again.
func TestRaceBetweenChannelShutdownAndSend(t *testing.T) {
const concurrency = 10
defer time.AfterFunc(10*time.Second, func() { panic("Close deadlock") }).Stop()
defer time.AfterFunc(10*time.Second, func() { t.Fatalf("Close deadlock") }).Stop()

conn := integrationConnection(t, "channel close/send race")
defer conn.Close()
Expand Down
14 changes: 7 additions & 7 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1579,7 +1579,7 @@ func TestChannelExceptionWithCloseIssue43(t *testing.T) {

c1, err := conn.Channel()
if err != nil {
panic(err)
t.Fatalf("failed to create channel, got: %v", err)
}

go func() {
Expand All @@ -1590,7 +1590,7 @@ func TestChannelExceptionWithCloseIssue43(t *testing.T) {

c2, err := conn.Channel()
if err != nil {
panic(err)
t.Fatalf("failed to create channel, got: %v", err)
}

go func() {
Expand All @@ -1604,7 +1604,7 @@ func TestChannelExceptionWithCloseIssue43(t *testing.T) {
// asynchronous method.
err = c1.Publish("nonexisting-exchange", "", false, false, Publishing{})
if err != nil {
panic(err)
t.Fatalf("failed to publish, got: %v", err)
}

// Receive or send the channel close method, the channel shuts down
Expand All @@ -1615,7 +1615,7 @@ func TestChannelExceptionWithCloseIssue43(t *testing.T) {
// on channel 1.
err = c2.ExchangeDeclare("test-channel-still-exists", "direct", false, true, false, false, nil)
if err != nil {
panic(err)
t.Fatalf("failed to declare exchange, got: %v", err)
}
}
}
Expand Down Expand Up @@ -1658,7 +1658,7 @@ func TestCorruptedMessageIssue7(t *testing.T) {

for i := 0; i < messageCount; i++ {
err := pub.Publish("", queue, false, false, Publishing{
Body: generateCrc32Random(7 * i),
Body: generateCrc32Random(t, 7*i),
})

if err != nil {
Expand Down Expand Up @@ -2095,10 +2095,10 @@ func assertMessageCrc32(t *testing.T, msg []byte, assert string) {

// Creates a random body size with a leading 32-bit CRC in network byte order
// that verifies the remaining slice
func generateCrc32Random(size int) []byte {
func generateCrc32Random(t *testing.T, size int) []byte {
msg := make([]byte, size+8)
if _, err := io.ReadFull(devrand.Reader, msg); err != nil {
panic(err)
t.Fatalf("could not get random data: %+v", err)
}

crc := crc32.NewIEEE()
Expand Down
23 changes: 23 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) 2022 VMware, Inc. or its affiliates. All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package amqp091

type Logging interface {
Printf(format string, v ...interface{})
}

var Logger Logging = NullLogger{}

// Enables logging using a custom Logging instance. Note that this is
// not thread safe and should be called at application start
func SetLogger(logger Logging) {
Logger = logger
}

type NullLogger struct {
}

func (l NullLogger) Printf(format string, v ...interface{}) {
}
14 changes: 7 additions & 7 deletions tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (
"time"
)

func tlsServerConfig() *tls.Config {
func tlsServerConfig(t *testing.T) *tls.Config {
cfg := new(tls.Config)

cfg.ClientCAs = x509.NewCertPool()
cfg.ClientCAs.AppendCertsFromPEM([]byte(caCert))

cert, err := tls.X509KeyPair([]byte(serverCert), []byte(serverKey))
if err != nil {
panic(err)
t.Fatalf("TLS server config error: %+v", err)
}

cfg.Certificates = append(cfg.Certificates, cert)
Expand All @@ -32,14 +32,14 @@ func tlsServerConfig() *tls.Config {
return cfg
}

func tlsClientConfig() *tls.Config {
func tlsClientConfig(t *testing.T) *tls.Config {
cfg := new(tls.Config)
cfg.RootCAs = x509.NewCertPool()
cfg.RootCAs.AppendCertsFromPEM([]byte(caCert))

cert, err := tls.X509KeyPair([]byte(clientCert), []byte(clientKey))
if err != nil {
panic(err)
t.Fatalf("TLS client config error: %+v", err)
}

cfg.Certificates = append(cfg.Certificates, cert)
Expand Down Expand Up @@ -68,7 +68,7 @@ func (s *tlsServer) Serve(t *testing.T) {
func startTLSServer(t *testing.T, cfg *tls.Config) tlsServer {
l, err := tls.Listen("tcp", "127.0.0.1:0", cfg)
if err != nil {
panic(err)
t.Fatalf("TLS server Listen error: %+v", err)
}

s := tlsServer{
Expand All @@ -84,7 +84,7 @@ func startTLSServer(t *testing.T, cfg *tls.Config) tlsServer {

// Tests opening a connection of a TLS enabled socket server
func TestTLSHandshake(t *testing.T) {
srv := startTLSServer(t, tlsServerConfig())
srv := startTLSServer(t, tlsServerConfig(t))
defer srv.Close()

success := make(chan bool)
Expand All @@ -102,7 +102,7 @@ func TestTLSHandshake(t *testing.T) {
}()

go func() {
c, err := DialTLS(srv.URL, tlsClientConfig())
c, err := DialTLS(srv.URL, tlsClientConfig(t))
if err != nil {
errs <- fmt.Errorf("expected to open a TLS connection, got err: %v", err)
}
Expand Down

0 comments on commit b068367

Please sign in to comment.