From 0764ff48e926bfd2812c5097adc10a145c7876bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aitor=20P=C3=A9rez=20Cedres?= Date: Tue, 3 Jan 2023 16:14:44 +0000 Subject: [PATCH] Add example producer using DeferredConfirm MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Aitor Pérez Cedres Start tracking outstanding confirms Producer example keeps a window of outstanding confirms Update example with name conventions Following Go naming conventions. Change Dial function to use DialConfig instead, since we are creating a Config struct. Signed-off-by: Aitor Pérez Cedres Better naming, better way to check confirms when exiting confirm handler The producer example should be one that handles confirms Always check exit at start of confirm handler --- .../{simple-consumer => consumer}/consumer.go | 0 _examples/producer/producer.go | 274 ++++++++++++++++++ _examples/simple-producer/producer.go | 174 ----------- 3 files changed, 274 insertions(+), 174 deletions(-) rename _examples/{simple-consumer => consumer}/consumer.go (100%) create mode 100644 _examples/producer/producer.go delete mode 100644 _examples/simple-producer/producer.go diff --git a/_examples/simple-consumer/consumer.go b/_examples/consumer/consumer.go similarity index 100% rename from _examples/simple-consumer/consumer.go rename to _examples/consumer/consumer.go diff --git a/_examples/producer/producer.go b/_examples/producer/producer.go new file mode 100644 index 0000000..c92c6fa --- /dev/null +++ b/_examples/producer/producer.go @@ -0,0 +1,274 @@ +// This example declares a durable exchange, and publishes one messages to that +// exchange. This example allows up to 8 outstanding publisher confirmations +// before blocking publishing. +package main + +import ( + "context" + "flag" + amqp "github.com/rabbitmq/amqp091-go" + "log" + "os" + "os/signal" + "syscall" + "time" +) + +var ( + uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI") + exchange = flag.String("exchange", "test-exchange", "Durable AMQP exchange name") + exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom") + queue = flag.String("queue", "test-queue", "Ephemeral AMQP queue name") + routingKey = flag.String("key", "test-key", "AMQP routing key") + body = flag.String("body", "foobar", "Body of message") + continuous = flag.Bool("continuous", false, "Keep publishing messages at a 1msg/sec rate") + WarnLog = log.New(os.Stderr, "[WARNING] ", log.LstdFlags|log.Lmsgprefix) + ErrLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags|log.Lmsgprefix) + Log = log.New(os.Stdout, "[INFO] ", log.LstdFlags|log.Lmsgprefix) +) + +func init() { + flag.Parse() +} + +func main() { + exitCh := make(chan struct{}) + confirmsCh := make(chan *amqp.DeferredConfirmation) + confirmsDoneCh := make(chan struct{}) + // Note: this is a buffered channel so that indicating OK to + // publish does not block the confirm handler + publishOkCh := make(chan struct{}, 1) + + setupCloseHandler(exitCh) + + startConfirmHandler(publishOkCh, confirmsCh, confirmsDoneCh, exitCh) + + publish(context.Background(), publishOkCh, confirmsCh, confirmsDoneCh, exitCh) +} + +func setupCloseHandler(exitCh chan struct{}) { + c := make(chan os.Signal, 2) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + Log.Printf("close handler: Ctrl+C pressed in Terminal") + close(exitCh) + }() +} + +func publish(ctx context.Context, publishOkCh <-chan struct{}, confirmsCh chan<- *amqp.DeferredConfirmation, confirmsDoneCh <-chan struct{}, exitCh chan struct{}) { + config := amqp.Config{ + Vhost: "/", + Properties: amqp.NewConnectionProperties(), + } + config.Properties.SetClientConnectionName("producer-with-confirms") + + Log.Printf("producer: dialing %s", *uri) + conn, err := amqp.DialConfig(*uri, config) + if err != nil { + ErrLog.Fatalf("producer: error in dial: %s", err) + } + defer conn.Close() + + Log.Println("producer: got Connection, getting Channel") + channel, err := conn.Channel() + if err != nil { + ErrLog.Fatalf("error getting a channel: %s", err) + } + defer channel.Close() + + Log.Printf("producer: declaring exchange") + if err := channel.ExchangeDeclare( + *exchange, // name + *exchangeType, // type + true, // durable + false, // auto-delete + false, // internal + false, // noWait + nil, // arguments + ); err != nil { + ErrLog.Fatalf("producer: Exchange Declare: %s", err) + } + + Log.Printf("producer: declaring queue '%s'", *queue) + queue, err := channel.QueueDeclare( + *queue, // name of the queue + true, // durable + false, // delete when unused + false, // exclusive + false, // noWait + nil, // arguments + ) + if err == nil { + Log.Printf("producer: declared queue (%q %d messages, %d consumers), binding to Exchange (key %q)", + queue.Name, queue.Messages, queue.Consumers, *routingKey) + } else { + ErrLog.Fatalf("producer: Queue Declare: %s", err) + } + + Log.Printf("producer: declaring binding") + if err := channel.QueueBind(queue.Name, *routingKey, *exchange, false, nil); err != nil { + ErrLog.Fatalf("producer: Queue Bind: %s", err) + } + + // Reliable publisher confirms require confirm.select support from the + // connection. + Log.Printf("producer: enabling publisher confirms.") + if err := channel.Confirm(false); err != nil { + ErrLog.Fatalf("producer: channel could not be put into confirm mode: %s", err) + } + + for { + canPublish := false + Log.Println("producer: waiting on the OK to publish...") + for { + select { + case <-confirmsDoneCh: + Log.Println("producer: stopping, all confirms seen") + return + case <-publishOkCh: + Log.Println("producer: got the OK to publish") + canPublish = true + break + case <-time.After(time.Second): + WarnLog.Println("producer: still waiting on the OK to publish...") + continue + } + if canPublish { + break + } + } + + Log.Printf("producer: publishing %dB body (%q)", len(*body), *body) + dConfirmation, err := channel.PublishWithDeferredConfirmWithContext( + ctx, + *exchange, + *routingKey, + true, + false, + amqp.Publishing{ + Headers: amqp.Table{}, + ContentType: "text/plain", + ContentEncoding: "", + DeliveryMode: amqp.Persistent, + Priority: 0, + AppId: "sequential-producer", + Body: []byte(*body), + }, + ) + if err != nil { + ErrLog.Fatalf("producer: error in publish: %s", err) + } + + select { + case <-confirmsDoneCh: + Log.Println("producer: stopping, all confirms seen") + return + case confirmsCh <- dConfirmation: + Log.Println("producer: delivered deferred confirm to handler") + break + } + + select { + case <-confirmsDoneCh: + Log.Println("producer: stopping, all confirms seen") + return + case <-time.After(time.Millisecond * 250): + if *continuous { + continue + } else { + Log.Println("producer: initiating stop") + close(exitCh) + select { + case <-confirmsDoneCh: + Log.Println("producer: stopping, all confirms seen") + return + case <-time.After(time.Second * 10): + WarnLog.Println("producer: may be stopping with outstanding confirmations") + return + } + } + } + } +} + +func startConfirmHandler(publishOkCh chan<- struct{}, confirmsCh <-chan *amqp.DeferredConfirmation, confirmsDoneCh chan struct{}, exitCh <-chan struct{}) { + go func() { + confirms := make(map[uint64]*amqp.DeferredConfirmation) + + for { + select { + case <-exitCh: + exitConfirmHandler(confirms, confirmsDoneCh) + return + default: + break + } + + outstandingConfirmationCount := len(confirms) + + // Note: 8 is arbitrary, you may wish to allow more outstanding confirms before blocking publish + if outstandingConfirmationCount <= 8 { + select { + case publishOkCh <- struct{}{}: + Log.Println("confirm handler: sent OK to publish") + case <-time.After(time.Second * 5): + WarnLog.Println("confirm handler: timeout indicating OK to publish (this should never happen!)") + } + } else { + WarnLog.Printf("confirm handler: waiting on %d outstanding confirmations, blocking publish", outstandingConfirmationCount) + } + + select { + case confirmation := <-confirmsCh: + dtag := confirmation.DeliveryTag + confirms[dtag] = confirmation + case <-exitCh: + exitConfirmHandler(confirms, confirmsDoneCh) + return + } + + checkConfirmations(confirms) + } + }() +} + +func exitConfirmHandler(confirms map[uint64]*amqp.DeferredConfirmation, confirmsDoneCh chan struct{}) { + Log.Println("confirm handler: exit requested") + waitConfirmations(confirms) + close(confirmsDoneCh) + Log.Println("confirm handler: exiting") +} + +func checkConfirmations(confirms map[uint64]*amqp.DeferredConfirmation) { + Log.Printf("confirm handler: checking %d outstanding confirmations", len(confirms)) + for k, v := range confirms { + if v.Acked() { + Log.Printf("confirm handler: confirmed delivery with tag: %d", k) + delete(confirms, k) + } + } +} + +func waitConfirmations(confirms map[uint64]*amqp.DeferredConfirmation) { + Log.Printf("confirm handler: waiting on %d outstanding confirmations", len(confirms)) + + checkConfirmations(confirms) + + for k, v := range confirms { + select { + case <-v.Done(): + Log.Printf("confirm handler: confirmed delivery with tag: %d", k) + delete(confirms, k) + case <-time.After(time.Second): + WarnLog.Printf("confirm handler: did not receive confirmation for tag %d", k) + } + } + + outstandingConfirmationCount := len(confirms) + if outstandingConfirmationCount > 0 { + ErrLog.Printf("confirm handler: exiting with %d outstanding confirmations", outstandingConfirmationCount) + } else { + Log.Println("confirm handler: done waiting on outstanding confirmations") + } +} diff --git a/_examples/simple-producer/producer.go b/_examples/simple-producer/producer.go deleted file mode 100644 index 86a003e..0000000 --- a/_examples/simple-producer/producer.go +++ /dev/null @@ -1,174 +0,0 @@ -// This example declares a durable Exchange, and publishes a single message to -// that Exchange with a given routing key. -package main - -import ( - "context" - "flag" - "fmt" - "log" - "os" - "os/signal" - "syscall" - "time" - - amqp "github.com/rabbitmq/amqp091-go" -) - -var ( - uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI") - exchangeName = flag.String("exchange", "test-exchange", "Durable AMQP exchange name") - exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom") - routingKey = flag.String("key", "test-key", "AMQP routing key") - body = flag.String("body", "foobar", "Body of message") - reliable = flag.Bool("reliable", true, "Wait for the publisher confirmation before exiting") - continuous = flag.Bool("continuous", false, "Keep publishing messages at a 1msg/sec rate") - ErrLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags|log.Lmsgprefix) - Log = log.New(os.Stdout, "[INFO] ", log.LstdFlags|log.Lmsgprefix) -) - -func init() { - flag.Parse() -} - -func main() { - done := make(chan bool) - - SetupCloseHandler(done) - - if err := publish(done, *uri, *exchangeName, *exchangeType, *routingKey, *body, *reliable); err != nil { - ErrLog.Fatalf("%s", err) - } -} - -func SetupCloseHandler(done chan bool) { - c := make(chan os.Signal, 2) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - <-c - done <- true - Log.Printf("Ctrl+C pressed in Terminal") - }() -} - -func publish(done chan bool, amqpURI, exchange, exchangeType, routingKey, body string, reliable bool) error { - // This function dials, connects, declares, publishes, and tears down, - // all in one go. In a real service, you probably want to maintain a - // long-lived connection as state, and publish against that. - config := amqp.Config{Properties: amqp.NewConnectionProperties()} - config.Properties.SetClientConnectionName("sample-producer") - Log.Printf("dialing %q", amqpURI) - connection, err := amqp.DialConfig(amqpURI, config) - if err != nil { - return fmt.Errorf("Dial: %s", err) - } - defer connection.Close() - - Log.Printf("got Connection, getting Channel") - channel, err := connection.Channel() - if err != nil { - return fmt.Errorf("Channel: %s", err) - } - - Log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange) - if err := channel.ExchangeDeclare( - exchange, // name - exchangeType, // type - true, // durable - false, // auto-deleted - false, // internal - false, // noWait - nil, // arguments - ); err != nil { - return fmt.Errorf("Exchange Declare: %s", err) - } - - var publishes chan uint64 = nil - var confirms chan amqp.Confirmation = nil - - // Reliable publisher confirms require confirm.select support from the - // connection. - if reliable { - Log.Printf("enabling publisher confirms.") - if err := channel.Confirm(false); err != nil { - return fmt.Errorf("Channel could not be put into confirm mode: %s", err) - } - // We'll allow for a few outstanding publisher confirms - publishes = make(chan uint64, 8) - confirms = channel.NotifyPublish(make(chan amqp.Confirmation, 1)) - - go confirmHandler(done, publishes, confirms) - } - - Log.Println("declared Exchange, publishing messages") - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - for { - seqNo := channel.GetNextPublishSeqNo() - Log.Printf("publishing %dB body (%q)", len(body), body) - - if err := channel.PublishWithContext(ctx, - exchange, // publish to an exchange - routingKey, // routing to 0 or more queues - false, // mandatory - false, // immediate - amqp.Publishing{ - Headers: amqp.Table{}, - ContentType: "text/plain", - ContentEncoding: "", - Body: []byte(body), - DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent - Priority: 0, // 0-9 - // a bunch of application/implementation-specific fields - }, - ); err != nil { - return fmt.Errorf("Exchange Publish: %s", err) - } - - Log.Printf("published %dB OK", len(body)) - if reliable { - publishes <- seqNo - } - - if *continuous { - select { - case <-done: - Log.Println("producer is stopping") - return nil - case <-time.After(time.Second): - continue - } - } else { - break - } - } - - return nil -} - -func confirmHandler(done chan bool, publishes chan uint64, confirms chan amqp.Confirmation) { - m := make(map[uint64]bool) - for { - select { - case <-done: - Log.Println("confirmHandler is stopping") - return - case publishSeqNo := <-publishes: - Log.Printf("waiting for confirmation of %d", publishSeqNo) - m[publishSeqNo] = false - case confirmed := <-confirms: - if confirmed.DeliveryTag > 0 { - if confirmed.Ack { - Log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag) - } else { - ErrLog.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag) - } - delete(m, confirmed.DeliveryTag) - } - } - if len(m) > 1 { - Log.Printf("outstanding confirmations: %d", len(m)) - } - } -}