From fdcaa26aab45209772f266b297907ad51ff4e7a1 Mon Sep 17 00:00:00 2001 From: Mahmud Ridwan Date: Tue, 23 Jan 2024 20:58:42 +0600 Subject: [PATCH 1/3] Use correct mutex --- channel.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/channel.go b/channel.go index 09ce37e..96ebd0b 100644 --- a/channel.go +++ b/channel.go @@ -1826,8 +1826,8 @@ func (ch *Channel) Reject(tag uint64, requeue bool) error { // GetNextPublishSeqNo returns the sequence number of the next message to be // published, when in confirm mode. func (ch *Channel) GetNextPublishSeqNo() uint64 { - ch.confirms.m.Lock() - defer ch.confirms.m.Unlock() + ch.confirms.publishedMut.Lock() + defer ch.confirms.publishedMut.Unlock() return ch.confirms.published + 1 } From 936f704f88301b2888b62c4938edd620cef09a7f Mon Sep 17 00:00:00 2001 From: Mahmud Ridwan Date: Tue, 23 Jan 2024 21:21:13 +0600 Subject: [PATCH 2/3] Add test --- integration_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/integration_test.go b/integration_test.go index f92d788..80ab7b3 100644 --- a/integration_test.go +++ b/integration_test.go @@ -2025,6 +2025,57 @@ func TestIntegrationGetNextPublishSeqNo(t *testing.T) { } } +func TestIntegrationGetNextPublishSeqNoRace(t *testing.T) { + if c := integrationConnection(t, "GetNextPublishSeqNoRace"); c != nil { + defer c.Close() + + ch, err := c.Channel() + if err != nil { + t.Fatalf("channel: %v", err) + } + + if err = ch.Confirm(false); err != nil { + t.Fatalf("could not confirm") + } + + ex := "test-get-next-pub" + if err = ch.ExchangeDeclare(ex, "direct", false, false, false, false, nil); err != nil { + t.Fatalf("cannot declare %v: got: %v", ex, err) + } + + n := ch.GetNextPublishSeqNo() + if n != 1 { + t.Fatalf("wrong next publish seqence number before any publish, expected: %d, got: %d", 1, n) + } + + wg := sync.WaitGroup{} + + wg.Add(2) + + go func() { + defer wg.Done() + n := ch.GetNextPublishSeqNo() + if n <= 0 { + t.Fatalf("wrong next publish seqence number, expected: > %d, got: %d", 0, n) + } + }() + + go func() { + defer wg.Done() + if err := ch.PublishWithContext(context.TODO(), "test-get-next-pub-seq", "", false, false, Publishing{}); err != nil { + t.Fatalf("publish error: %v", err) + } + }() + + wg.Wait() + + n = ch.GetNextPublishSeqNo() + if n != 2 { + t.Fatalf("wrong next publish seqence number after 15 publishing, expected: %d, got: %d", 2, n) + } + } +} + // https://github.com/rabbitmq/amqp091-go/pull/44 func TestShouldNotWaitAfterConnectionClosedIssue44(t *testing.T) { conn := integrationConnection(t, "TestShouldNotWaitAfterConnectionClosedIssue44") From 977c4d20faa7ec9d287741e1016ba760d8ef57d0 Mon Sep 17 00:00:00 2001 From: Mahmud Ridwan Date: Tue, 23 Jan 2024 23:28:32 +0600 Subject: [PATCH 3/3] Fix lint issues --- integration_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/integration_test.go b/integration_test.go index 80ab7b3..50c6507 100644 --- a/integration_test.go +++ b/integration_test.go @@ -2049,25 +2049,27 @@ func TestIntegrationGetNextPublishSeqNoRace(t *testing.T) { } wg := sync.WaitGroup{} + fail := false wg.Add(2) go func() { defer wg.Done() - n := ch.GetNextPublishSeqNo() - if n <= 0 { - t.Fatalf("wrong next publish seqence number, expected: > %d, got: %d", 0, n) - } + _ = ch.GetNextPublishSeqNo() }() go func() { defer wg.Done() if err := ch.PublishWithContext(context.TODO(), "test-get-next-pub-seq", "", false, false, Publishing{}); err != nil { - t.Fatalf("publish error: %v", err) + t.Logf("publish error: %v", err) + fail = true } }() wg.Wait() + if fail { + t.FailNow() + } n = ch.GetNextPublishSeqNo() if n != 2 {