Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a CloseDeadline function to Connection #181

Merged
merged 5 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,6 @@ jobs:
go-version: ${{ matrix.go-version }}
check-latest: true
- name: Tests
env:
RABBITMQ_RABBITMQCTL_PATH: DOCKER:${{ job.services.rabbitmq.id }}
run: make check-fmt tests
18 changes: 16 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ Here is the recommended workflow:
1. Run Static Checks
1. Run integration tests (see below)
1. **Implement tests**
1. Implement fixs
1. Commit your changes (`git commit -am 'Add some feature'`)
1. Implement fixes
1. Commit your changes. Use a [good, descriptive, commit message][good-commit].
1. Push to a branch (`git push -u origin my-new-feature`)
1. Submit a pull request

[good-commit]: https://cbea.ms/git-commit/

## Running Static Checks

golangci-lint must be installed to run the static checks. See [installation
Expand Down Expand Up @@ -43,6 +45,18 @@ The integration tests can be run via:
make tests
```

Some tests require access to `rabbitmqctl` CLI. Use the environment variable
`RABBITMQ_RABBITMQCTL_PATH=/some/path/to/rabbitmqctl` to run those tests.

If you have Docker available in your machine, you can run:

```shell
make tests-docker
```

This target will start a RabbitMQ container, run the test suite with the environment
variable setup, and stop RabbitMQ container after a successful run.

All integration tests should use the `integrationConnection(...)` test
helpers defined in `integration_test.go` to setup the integration environment
and logging.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ fmt: ## Run go fmt against code
tests: ## Run all tests and requires a running rabbitmq-server. Use GO_TEST_FLAGS to add extra flags to go test
go test -race -v -tags integration $(GO_TEST_FLAGS)

.PHONY: tests-docker
tests-docker: rabbitmq-server
RABBITMQ_RABBITMQCTL_PATH="DOCKER:$(CONTAINER_NAME)" go test -race -v -tags integration $(GO_TEST_FLAGS)
$(MAKE) stop-rabbitmq-server

.PHONY: check
check:
golangci-lint run ./...
Expand Down
47 changes: 47 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,47 @@ func (c *Connection) Close() error {
)
}

// CloseDeadline requests and waits for the response to close this AMQP connection.
//
// Accepts a deadline for waiting the server response. The deadline is passed
// to the low-level connection i.e. network socket.
//
// Regardless of the error returned, the connection is considered closed, and it
// should not be used after calling this function.
//
// In the event of an I/O timeout, connection-closed listeners are NOT informed.
//
// After returning from this call, all resources associated with this connection,
// including the underlying io, Channels, Notify listeners and Channel consumers
// will also be closed.
func (c *Connection) CloseDeadline(deadline time.Time) error {
if c.IsClosed() {
return ErrClosed
}

defer c.shutdown(nil)

err := c.setDeadline(deadline)
if err != nil {
return err
}

return c.call(
&connectionClose{
ReplyCode: replySuccess,
ReplyText: "kthxbai",
},
&connectionCloseOk{},
)
}

func (c *Connection) closeWith(err *Error) error {
if c.IsClosed() {
return ErrClosed
}

defer c.shutdown(err)

return c.call(
&connectionClose{
ReplyCode: uint16(err.Code),
Expand All @@ -420,6 +455,18 @@ func (c *Connection) IsClosed() bool {
return atomic.LoadInt32(&c.closed) == 1
}

// setDeadline is a wrapper to type assert Connection.conn and set an I/O
// deadline in the underlying TCP connection socket, by calling
// net.Conn.SetDeadline(). It returns an error, in case the type assertion fails,
// although this should never happen.
func (c *Connection) setDeadline(t time.Time) error {
con, ok := c.conn.(net.Conn)
if !ok {
return errInvalidTypeAssertion
}
return con.SetDeadline(t)
}

func (c *Connection) send(f frame) error {
if c.IsClosed() {
return ErrClosed
Expand Down
59 changes: 59 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,20 @@
package amqp091

import (
"context"
"crypto/tls"
"net"
"os"
"os/exec"
"regexp"
"strings"
"sync"
"testing"
"time"
)

const rabbitmqctlEnvKey = "RABBITMQ_RABBITMQCTL_PATH"

func TestRequiredServerLocale(t *testing.T) {
conn := integrationConnection(t, "AMQP 0-9-1 required server locale")
t.Cleanup(func() { conn.Close() })
Expand Down Expand Up @@ -332,3 +338,56 @@ func TestNewConnectionProperties_HasDefaultProperties(t *testing.T) {
t.Fatalf("Version in NewConnectionProperties is not a valid semver value: %s", version)
}
}

// Connection and channels should be closeable when a memory alarm is active.
// https://github.com/rabbitmq/amqp091-go/issues/178
func TestConnection_Close_WhenMemoryAlarmIsActive(t *testing.T) {
err := rabbitmqctl(t, "set_vm_memory_high_watermark", "0.0001")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = rabbitmqctl(t, "set_vm_memory_high_watermark", "0.4")
conn, ch := integrationQueue(t, t.Name())
integrationQueueDelete(t, ch, t.Name())
_ = ch.Close()
_ = conn.Close()
})

conn, ch := integrationQueue(t, t.Name())

go func() {
// simulate a producer
// required to block the connection
_ = ch.PublishWithContext(context.Background(), "", t.Name(), false, false, Publishing{
Body: []byte("this is a test"),
})
}()
<-time.After(time.Second * 1)

err = conn.CloseDeadline(time.Now().Add(time.Second * 2))
if err == nil {
t.Fatal("expected error, got nil")
}
if !conn.IsClosed() {
t.Fatal("expected connection to be closed")
}
}

func rabbitmqctl(t *testing.T, args ...string) error {
rabbitmqctlPath, found := os.LookupEnv(rabbitmqctlEnvKey)
if !found {
t.Skipf("variable for %s for rabbitmqctl not found, skipping", rabbitmqctlEnvKey)
}

var cmd *exec.Cmd
if strings.HasPrefix(rabbitmqctlPath, "DOCKER:") {
containerName := strings.Split(rabbitmqctlPath, ":")[1]
cmd = exec.Command("docker", "exec", containerName, "rabbitmqctl")
cmd.Args = append(cmd.Args, args...)
} else {
cmd = exec.Command(rabbitmqctlPath, args...)
}

return cmd.Run()
}
9 changes: 1 addition & 8 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2008,13 +2008,6 @@ func TestShouldNotWaitAfterConnectionClosedIssue44(t *testing.T) {
* Support for integration tests
*/

func loggedConnection(t *testing.T, conn *Connection, name string) *Connection {
if name != "" {
conn.conn = &logIO{t, name, conn.conn}
}
return conn
}

// Returns a connection to the AMQP if the AMQP_URL environment
// variable is set and a connection can be established.
func integrationConnection(t *testing.T, name string) *Connection {
Expand All @@ -2023,7 +2016,7 @@ func integrationConnection(t *testing.T, name string) *Connection {
t.Fatalf("cannot dial integration server. Is the rabbitmq-server service running? %s", err)
return nil
}
return loggedConnection(t, conn, name)
return conn
}

// Returns a connection, channel and declares a queue when the AMQP_URL is in the environment
Expand Down
5 changes: 5 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ var (
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
)

// internal errors used inside the library
var (
errInvalidTypeAssertion = &Error{Code: InternalError, Reason: "type assertion unsuccessful", Server: false, Recover: true}
)

// Error captures the code and reason a channel or connection has been closed
// by the server.
type Error struct {
Expand Down