diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c4ccb56..e04781b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -49,4 +49,6 @@ jobs: go-version: ${{ matrix.go-version }} check-latest: true - name: Tests + env: + RABBITMQ_RABBITMQCTL_PATH: DOCKER:${{ job.services.rabbitmq.id }} run: make check-fmt tests diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index ed1b971..ec86fe5 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -9,11 +9,13 @@ Here is the recommended workflow: 1. Run Static Checks 1. Run integration tests (see below) 1. **Implement tests** -1. Implement fixs -1. Commit your changes (`git commit -am 'Add some feature'`) +1. Implement fixes +1. Commit your changes. Use a [good, descriptive, commit message][good-commit]. 1. Push to a branch (`git push -u origin my-new-feature`) 1. Submit a pull request +[good-commit]: https://cbea.ms/git-commit/ + ## Running Static Checks golangci-lint must be installed to run the static checks. See [installation @@ -43,6 +45,18 @@ The integration tests can be run via: make tests ``` +Some tests require access to `rabbitmqctl` CLI. Use the environment variable +`RABBITMQ_RABBITMQCTL_PATH=/some/path/to/rabbitmqctl` to run those tests. + +If you have Docker available in your machine, you can run: + +```shell +make tests-docker +``` + +This target will start a RabbitMQ container, run the test suite with the environment +variable setup, and stop RabbitMQ container after a successful run. + All integration tests should use the `integrationConnection(...)` test helpers defined in `integration_test.go` to setup the integration environment and logging. diff --git a/Makefile b/Makefile index 7342731..69e9e2b 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,11 @@ fmt: ## Run go fmt against code tests: ## Run all tests and requires a running rabbitmq-server. Use GO_TEST_FLAGS to add extra flags to go test go test -race -v -tags integration $(GO_TEST_FLAGS) +.PHONY: tests-docker +tests-docker: rabbitmq-server + RABBITMQ_RABBITMQCTL_PATH="DOCKER:$(CONTAINER_NAME)" go test -race -v -tags integration $(GO_TEST_FLAGS) + $(MAKE) stop-rabbitmq-server + .PHONY: check check: golangci-lint run ./... diff --git a/connection.go b/connection.go index def2260..abe4b02 100644 --- a/connection.go +++ b/connection.go @@ -399,12 +399,47 @@ func (c *Connection) Close() error { ) } +// CloseDeadline requests and waits for the response to close this AMQP connection. +// +// Accepts a deadline for waiting the server response. The deadline is passed +// to the low-level connection i.e. network socket. +// +// Regardless of the error returned, the connection is considered closed, and it +// should not be used after calling this function. +// +// In the event of an I/O timeout, connection-closed listeners are NOT informed. +// +// After returning from this call, all resources associated with this connection, +// including the underlying io, Channels, Notify listeners and Channel consumers +// will also be closed. +func (c *Connection) CloseDeadline(deadline time.Time) error { + if c.IsClosed() { + return ErrClosed + } + + defer c.shutdown(nil) + + err := c.setDeadline(deadline) + if err != nil { + return err + } + + return c.call( + &connectionClose{ + ReplyCode: replySuccess, + ReplyText: "kthxbai", + }, + &connectionCloseOk{}, + ) +} + func (c *Connection) closeWith(err *Error) error { if c.IsClosed() { return ErrClosed } defer c.shutdown(err) + return c.call( &connectionClose{ ReplyCode: uint16(err.Code), @@ -420,6 +455,18 @@ func (c *Connection) IsClosed() bool { return atomic.LoadInt32(&c.closed) == 1 } +// setDeadline is a wrapper to type assert Connection.conn and set an I/O +// deadline in the underlying TCP connection socket, by calling +// net.Conn.SetDeadline(). It returns an error, in case the type assertion fails, +// although this should never happen. +func (c *Connection) setDeadline(t time.Time) error { + con, ok := c.conn.(net.Conn) + if !ok { + return errInvalidTypeAssertion + } + return con.SetDeadline(t) +} + func (c *Connection) send(f frame) error { if c.IsClosed() { return ErrClosed diff --git a/connection_test.go b/connection_test.go index 1f61fc5..7eb7c30 100644 --- a/connection_test.go +++ b/connection_test.go @@ -9,14 +9,20 @@ package amqp091 import ( + "context" "crypto/tls" "net" + "os" + "os/exec" "regexp" + "strings" "sync" "testing" "time" ) +const rabbitmqctlEnvKey = "RABBITMQ_RABBITMQCTL_PATH" + func TestRequiredServerLocale(t *testing.T) { conn := integrationConnection(t, "AMQP 0-9-1 required server locale") t.Cleanup(func() { conn.Close() }) @@ -332,3 +338,56 @@ func TestNewConnectionProperties_HasDefaultProperties(t *testing.T) { t.Fatalf("Version in NewConnectionProperties is not a valid semver value: %s", version) } } + +// Connection and channels should be closeable when a memory alarm is active. +// https://github.com/rabbitmq/amqp091-go/issues/178 +func TestConnection_Close_WhenMemoryAlarmIsActive(t *testing.T) { + err := rabbitmqctl(t, "set_vm_memory_high_watermark", "0.0001") + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + _ = rabbitmqctl(t, "set_vm_memory_high_watermark", "0.4") + conn, ch := integrationQueue(t, t.Name()) + integrationQueueDelete(t, ch, t.Name()) + _ = ch.Close() + _ = conn.Close() + }) + + conn, ch := integrationQueue(t, t.Name()) + + go func() { + // simulate a producer + // required to block the connection + _ = ch.PublishWithContext(context.Background(), "", t.Name(), false, false, Publishing{ + Body: []byte("this is a test"), + }) + }() + <-time.After(time.Second * 1) + + err = conn.CloseDeadline(time.Now().Add(time.Second * 2)) + if err == nil { + t.Fatal("expected error, got nil") + } + if !conn.IsClosed() { + t.Fatal("expected connection to be closed") + } +} + +func rabbitmqctl(t *testing.T, args ...string) error { + rabbitmqctlPath, found := os.LookupEnv(rabbitmqctlEnvKey) + if !found { + t.Skipf("variable for %s for rabbitmqctl not found, skipping", rabbitmqctlEnvKey) + } + + var cmd *exec.Cmd + if strings.HasPrefix(rabbitmqctlPath, "DOCKER:") { + containerName := strings.Split(rabbitmqctlPath, ":")[1] + cmd = exec.Command("docker", "exec", containerName, "rabbitmqctl") + cmd.Args = append(cmd.Args, args...) + } else { + cmd = exec.Command(rabbitmqctlPath, args...) + } + + return cmd.Run() +} diff --git a/integration_test.go b/integration_test.go index 89858d9..d9ec51f 100644 --- a/integration_test.go +++ b/integration_test.go @@ -2008,13 +2008,6 @@ func TestShouldNotWaitAfterConnectionClosedIssue44(t *testing.T) { * Support for integration tests */ -func loggedConnection(t *testing.T, conn *Connection, name string) *Connection { - if name != "" { - conn.conn = &logIO{t, name, conn.conn} - } - return conn -} - // Returns a connection to the AMQP if the AMQP_URL environment // variable is set and a connection can be established. func integrationConnection(t *testing.T, name string) *Connection { @@ -2023,7 +2016,7 @@ func integrationConnection(t *testing.T, name string) *Connection { t.Fatalf("cannot dial integration server. Is the rabbitmq-server service running? %s", err) return nil } - return loggedConnection(t, conn, name) + return conn } // Returns a connection, channel and declares a queue when the AMQP_URL is in the environment diff --git a/types.go b/types.go index 427eefb..e8d8986 100644 --- a/types.go +++ b/types.go @@ -63,6 +63,11 @@ var ( ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"} ) +// internal errors used inside the library +var ( + errInvalidTypeAssertion = &Error{Code: InternalError, Reason: "type assertion unsuccessful", Server: false, Recover: true} +) + // Error captures the code and reason a channel or connection has been closed // by the server. type Error struct {