From 3b21050e3fd57e3f316d71297d07bd6272f6e251 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Fri, 25 Mar 2022 11:21:02 +0100 Subject: [PATCH 1/4] Some unit tests in connection were showing a race condition (see #31): These ones were the ones testing Open scenarios. The issue is that Open and Close, rwc.Open and rwc.Close can at the same time write on: c.allocator = newAllocator(1, c.Config.ChannelMax) connection.go line 444 and connection.go line 849 while shutdown is protected by the structure mutex m, OpenComplete() is not causing the race. While it's not clear if the library should protect this eventuality, the tests are testing the Open function, so I think the close can be put in the main thread avoiding the race and not affecting the test validity --- client_test.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/client_test.go b/client_test.go index 748ff14..9c561d1 100644 --- a/client_test.go +++ b/client_test.go @@ -218,7 +218,7 @@ func TestDefaultClientProperties(t *testing.T) { go func() { srv.connectionOpen() - rwc.Close() + }() if c, err := Open(rwc, defaultConfig()); err != nil { @@ -229,13 +229,14 @@ func TestDefaultClientProperties(t *testing.T) { t.Errorf("expected product %s got: %s", want, got) } - if want, got := defaultVersion, srv.start.ClientProperties["version"]; want != got { + if want, got := buildVersion, srv.start.ClientProperties["version"]; want != got { t.Errorf("expected version %s got: %s", want, got) } if want, got := defaultLocale, srv.start.Locale; want != got { t.Errorf("expected locale %s got: %s", want, got) } + rwc.Close() } func TestCustomClientProperties(t *testing.T) { @@ -249,7 +250,7 @@ func TestCustomClientProperties(t *testing.T) { go func() { srv.connectionOpen() - rwc.Close() + }() if c, err := Open(rwc, config); err != nil { @@ -263,18 +264,21 @@ func TestCustomClientProperties(t *testing.T) { if want, got := config.Properties["version"], srv.start.ClientProperties["version"]; want != got { t.Errorf("expected version %s got: %s", want, got) } + + rwc.Close() } func TestOpen(t *testing.T) { rwc, srv := newSession(t) go func() { srv.connectionOpen() - rwc.Close() + }() if c, err := Open(rwc, defaultConfig()); err != nil { t.Fatalf("could not create connection: %v (%s)", c, err) } + rwc.Close() } func TestChannelOpen(t *testing.T) { @@ -326,7 +330,7 @@ func TestOpenAMQPlainAuth(t *testing.T) { srv.recv(0, &connectionOpen{}) srv.send(0, &connectionOpenOk{}) - rwc.Close() + auth <- table }() @@ -340,6 +344,7 @@ func TestOpenAMQPlainAuth(t *testing.T) { if table["PASSWORD"] != defaultPassword { t.Fatalf("unexpected password: want: %s, got: %s", defaultPassword, table["PASSWORD"]) } + rwc.Close() } func TestOpenFailedCredentials(t *testing.T) { From f781c65b0fac4bd1347941b3f1f5ca56ba91c937 Mon Sep 17 00:00:00 2001 From: Daniele Date: Fri, 18 Mar 2022 13:53:47 +0100 Subject: [PATCH 2/4] adding integration test for issue 11 (#50) Co-authored-by: Daniele Palaia --- integration_test.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/integration_test.go b/integration_test.go index 2fcdd11..70ad414 100644 --- a/integration_test.go +++ b/integration_test.go @@ -3,6 +3,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +//go:build integration // +build integration package amqp091 @@ -1909,6 +1910,31 @@ func assertConsumeBody(t *testing.T, messages <-chan Delivery, want []byte) (msg return msg } +// https://github.com/rabbitmq/amqp091-go/issues/11 +func TestShouldNotWaitAfterConnectionClosedNewChannelCreatedIssue11(t *testing.T) { + conn := integrationConnection(t, "TestShouldNotWaitAfterConnectionClosedNewChannelCreatedIssue11") + ch, err := conn.Channel() + if err != nil { + t.Fatalf("channel error: %v", err) + } + + conn.NotifyClose(make(chan *Error, 1)) + + _, err = ch.PublishWithDeferredConfirm("issue11", "issue11", false, false, Publishing{Body: []byte("abc")}) + if err != nil { + t.Fatalf("PublishWithDeferredConfirm error: %v", err) + } + + ch.Close() + conn.Close() + + ch, err = conn.Channel() + if err == nil { + t.Fatalf("Opening a channel from a closed connection should not block but returning an error %v", err) + } + +} + // Pulls out the CRC and verifies the remaining content against the CRC func assertMessageCrc32(t *testing.T, msg []byte, assert string) { size := binary.BigEndian.Uint32(msg[:4]) From 9bfc51b6dda5c27990d3a6bf749df000476eacc9 Mon Sep 17 00:00:00 2001 From: Daniele Date: Thu, 24 Mar 2022 15:57:07 +0100 Subject: [PATCH 3/4] changing defaultVersion to buildVersion and create a simple change_version file to manage versioning (#54) Co-authored-by: Daniele Palaia --- change_version.sh | 4 ++++ connection.go | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) create mode 100755 change_version.sh diff --git a/change_version.sh b/change_version.sh new file mode 100755 index 0000000..a51fbb0 --- /dev/null +++ b/change_version.sh @@ -0,0 +1,4 @@ +#/bin/bash +echo $1 > VERSION +sed -i -e "s/.*buildVersion = \"*.*/buildVersion = \"$1\"/" ./connection.go +go fmt ./... diff --git a/connection.go b/connection.go index dc3d066..f4e706e 100644 --- a/connection.go +++ b/connection.go @@ -24,7 +24,7 @@ const ( defaultHeartbeat = 10 * time.Second defaultConnectionTimeout = 30 * time.Second defaultProduct = "Amqp 0.9.1 Client" - defaultVersion = "β" + buildVersion = "1.3.0" platform = "golang" // Safer default that makes channel leaks a lot easier to spot // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593. @@ -759,7 +759,7 @@ func (c *Connection) openTune(config Config, auth Authentication) error { if len(config.Properties) == 0 { config.Properties = Table{ "product": defaultProduct, - "version": defaultVersion, + "version": buildVersion, "platform": platform, } } From 6d321497bc1d41c809641b847201d49d6ceaaf55 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Fri, 25 Mar 2022 14:22:49 +0100 Subject: [PATCH 4/4] using t.Cleanup() to close the connection --- client_test.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/client_test.go b/client_test.go index 9c561d1..19e7ce0 100644 --- a/client_test.go +++ b/client_test.go @@ -236,7 +236,7 @@ func TestDefaultClientProperties(t *testing.T) { if want, got := defaultLocale, srv.start.Locale; want != got { t.Errorf("expected locale %s got: %s", want, got) } - rwc.Close() + t.Cleanup(func() { rwc.Close() }) } func TestCustomClientProperties(t *testing.T) { @@ -265,7 +265,7 @@ func TestCustomClientProperties(t *testing.T) { t.Errorf("expected version %s got: %s", want, got) } - rwc.Close() + t.Cleanup(func() { rwc.Close() }) } func TestOpen(t *testing.T) { @@ -278,7 +278,7 @@ func TestOpen(t *testing.T) { if c, err := Open(rwc, defaultConfig()); err != nil { t.Fatalf("could not create connection: %v (%s)", c, err) } - rwc.Close() + t.Cleanup(func() { rwc.Close() }) } func TestChannelOpen(t *testing.T) { @@ -288,7 +288,6 @@ func TestChannelOpen(t *testing.T) { srv.connectionOpen() srv.channelOpen(1) - rwc.Close() }() c, err := Open(rwc, defaultConfig()) @@ -300,6 +299,8 @@ func TestChannelOpen(t *testing.T) { if err != nil { t.Fatalf("could not open channel: %v (%s)", ch, err) } + + t.Cleanup(func() { rwc.Close() }) } func TestOpenFailedSASLUnsupportedMechanisms(t *testing.T) { @@ -314,6 +315,7 @@ func TestOpenFailedSASLUnsupportedMechanisms(t *testing.T) { if err != ErrSASL { t.Fatalf("expected ErrSASL got: %+v on %+v", err, c) } + t.Cleanup(func() { rwc.Close() }) } func TestOpenAMQPlainAuth(t *testing.T) { @@ -344,7 +346,7 @@ func TestOpenAMQPlainAuth(t *testing.T) { if table["PASSWORD"] != defaultPassword { t.Fatalf("unexpected password: want: %s, got: %s", defaultPassword, table["PASSWORD"]) } - rwc.Close() + t.Cleanup(func() { rwc.Close() }) } func TestOpenFailedCredentials(t *testing.T) { @@ -361,6 +363,7 @@ func TestOpenFailedCredentials(t *testing.T) { if err != ErrCredentials { t.Fatalf("expected ErrCredentials got: %+v on %+v", err, c) } + } func TestOpenFailedVhost(t *testing.T) {