
Consumer Tag Reuse Race Condition #297

Closed
lance-menard opened this issue Sep 29, 2022 · 5 comments · Fixed by #299

lance-menard commented Sep 29, 2022

There appears to be a race condition in the ChannelWrapper constructor which can cause RabbitMQ to disconnect the channel with a 530 "attempt to reuse consumer tag" error when calling consume(). This is caused by several factors:

  1. _onConnect is called inside the ChannelWrapper constructor here. Since _onConnect is async, and constructors must be synchronous, this creates a separate flow of execution disconnected from the context which called the constructor.
  2. The consume function only checks if this._channel is set, and does not check this._settingUp, as (for example) _shouldPublish does. this._channel is set before the setup promise chain is run, and that promise chain includes calling this._reconnectConsumer for each registered consumer. As a result, if a consumer is registered via this.consume between the time this._channel is set and the time the consumers are reconnected, this._channel.consume will be called twice for the same consumer. This can easily happen if the setup function performs multiple operations in RabbitMQ or otherwise takes a while to complete.
  3. This only became apparent after this change, which generates a consumer tag if one isn't provided. Prior to this change, the server would have generated two unique consumer tags, which I believe would simply have attached two identical consumers to the same queue, something that would likely slip under the radar in most setups. It may, however, have caused other issues when those consumers were canceled.
  4. The issue can be worked around by using waitForConnect between constructing the channel wrapper and calling consume, but that comes with the obvious downside of blocking the flow of execution until RabbitMQ is connected and the full channel setup is complete.
  5. The effects of this aren't catastrophic at a small scale: the connection manager does its job and reconnects the channel, at which point the consume logic works as expected. But when the connection is shared across many consumers, this causes a lot of overhead, because setup functions are rerun repeatedly as new channels are created and hit this error.
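The interleaving described in factors 1 and 2 can be modeled without a broker. The sketch below is a simplified, hypothetical model, not the library's actual ChannelWrapper: `FakeChannel` rejects a reused consumer tag roughly the way RabbitMQ's 530 error does, and `RacyChannelWrapper` assigns `_channel` before its setup function and consumer reconnection finish, while its `consume()` checks only `_channel`.

```javascript
// Hypothetical model of the race; NOT the library's real ChannelWrapper.
// FakeChannel stands in for an amqplib channel and rejects a reused consumer tag,
// roughly like RabbitMQ's 530 "attempt to reuse consumer tag" error.
class FakeChannel {
  constructor() {
    this.tags = new Set();
  }

  async consume(queue, onMessage, { consumerTag }) {
    if (this.tags.has(consumerTag)) {
      throw new Error(`530: attempt to reuse consumer tag ${consumerTag}`);
    }
    this.tags.add(consumerTag);
    return { consumerTag };
  }
}

class RacyChannelWrapper {
  constructor(setup) {
    this._setup = setup;
    this._channel = undefined;
    this._consumers = [];
    this._nextTag = 0;
    // Factor 1: an async flow started from a synchronous constructor.
    this.ready = this._onConnect();
  }

  async _onConnect() {
    // _channel is assigned BEFORE setup and consumer reconnection have finished.
    this._channel = new FakeChannel();
    await this._setup(this._channel);
    for (const c of this._consumers) {
      await this._channel.consume(c.queue, c.onMessage, { consumerTag: c.tag });
    }
  }

  async consume(queue, onMessage) {
    // The tag is generated client-side, as described in factor 3.
    const consumer = { queue, onMessage, tag: `tag-${this._nextTag++}` };
    this._consumers.push(consumer);
    // Factor 2: only _channel is checked, not whether setup is still running.
    if (this._channel) {
      await this._channel.consume(queue, onMessage, { consumerTag: consumer.tag });
    }
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function demo() {
  const wrapper = new RacyChannelWrapper(() => sleep(50)); // slow setup function
  await sleep(10); // _channel is set, but setup is still running
  await wrapper.consume("q", () => {}); // registers tag-0 immediately
  try {
    await wrapper.ready; // setup finishes and registers tag-0 a second time
    return "no error";
  } catch (err) {
    return err.message;
  }
}

demo().then((result) => console.log(result)); // 530: attempt to reuse consumer tag tag-0
```

With server-generated tags the second registration would have succeeded silently, which matches the duplicate-consumer behavior suspected in factor 3.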

Let me know if you'd like any further information. If I have time, I may put in a PR addressing factor 2 (which seems like the easiest approach to addressing the issue), but I can't guarantee that will happen any time soon.

I've put together a reproduction for the issue (which is how I noodled through everything above) which I'll include here:

```javascript
const { connect } = require("amqp-connection-manager");

const QUEUE_NAME = "connection-manager-consumer-tag-test";

const setupFn = async (channel) => {
  console.debug("Starting setup...");
  await channel.assertQueue(QUEUE_NAME);
  await new Promise((resolve) => setTimeout(resolve, 2000));
  console.debug("Setup complete.");
};

const main = async () => {
  // Create a connection.
  const connection = await connect({
    protocol: "amqp",
    hostname: "localhost",
    port: 5672,
    username: "guest",
    password: "guest",
    vhost: "/",
  });

  // Bind some event handlers to help understand what's going on.
  connection.on("connect", () => console.debug("Connected to rabbitmq."));

  connection.on("connectFailed", (error) =>
    console.warn("RabbitMQ connection failed.", error)
  );

  // The "disconnect" event is emitted with an object of the form { err }.
  connection.on("disconnect", ({ err }) =>
    console.warn(`Disconnected from rabbitmq. Error: ${err.message}`)
  );

  // This race condition only occurs if the connection manager is already connected
  // when you create the channel, since that's the condition in which the constructor
  // calls _onConnect.
  await connection.connect();

  // Create a new channel.
  const channelWrapper = await connection.createChannel({
    setup: setupFn,
  });

  // Bind a few more event handlers to help understand what's going on.
  channelWrapper.on("error", (error) =>
    console.warn(`Channel error.  Error: ${error.message}`)
  );

  channelWrapper.on("close", () => console.debug("Channel closed."));

  // Adding a waitForConnect here resolves the issue, since it ensures the initial _onConnect call
  // has finished before consuming messages.
  // await channelWrapper.waitForConnect();

  // We need to wait a short period of time to ensure the channel has been created via
  // createChannel/createConfirmChannel and assigned to this._channel, but the setup functions have not completed yet.
  await new Promise((resolve) => setTimeout(resolve, 1000));

  console.debug("Consuming messages...");

  // This call registers the consumer while setup is still running, so the underlying channel's
  // consume() is called twice for the same consumer: once here, and again when setup reconnects the
  // registered consumers. Since the ChannelWrapper class now generates a consumer tag internally, this
  // causes RabbitMQ to throw a 530 error for attempting to reuse a consumer tag. Before the change to
  // generate consumer tags internally, this presumably would have bound two consumers with the same
  // handler to the same queue, which likely would have flown under the radar.
  await channelWrapper.consume(QUEUE_NAME, async () => {
    // The message handler body isn't relevant here.
  });
};

main().catch((error) => {
  console.error(error);
  process.exit(-1);
});
```

Thank you!

luddd3 added a commit to luddd3/node-amqp-connection-manager that referenced this issue Sep 30, 2022
Fixes jwalton#297

If consume is called before the setup function is completed, the
same consumer can be registered twice and cause a precondition
error.
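The commit message above describes the same window. Below is a minimal sketch of the guard proposed in factor 2, assuming a boolean `_settingUp` flag that `consume()` checks alongside `_channel`. It is a hypothetical model with a fake channel (`CountingChannel` and `GuardedChannelWrapper` are illustrative names), not the code from the actual fix commit: while setup is running, `consume()` only records the consumer, and the setup chain registers it exactly once.

```javascript
// Hypothetical sketch of the factor-2 guard; not the code from the real fix commit.
// CountingChannel counts how many times each consumer tag is registered.
class CountingChannel {
  constructor() {
    this.registrations = new Map(); // consumerTag -> number of consume() calls
  }

  async consume(queue, onMessage, { consumerTag }) {
    this.registrations.set(consumerTag, (this.registrations.get(consumerTag) ?? 0) + 1);
    return { consumerTag };
  }
}

class GuardedChannelWrapper {
  constructor(setup) {
    this._setup = setup;
    this._channel = undefined;
    this._settingUp = false;
    this._consumers = [];
    this._nextTag = 0;
    this.ready = this._onConnect();
  }

  async _onConnect() {
    this._channel = new CountingChannel();
    this._settingUp = true;
    try {
      await this._setup(this._channel);
      // for...of re-reads the array length on each step, so consumers pushed
      // while this loop is awaiting are registered here as well.
      for (const c of this._consumers) {
        await this._channel.consume(c.queue, c.onMessage, { consumerTag: c.tag });
      }
    } finally {
      // Runs synchronously after the loop's last check, so no consume() call
      // can slip in between "loop finished" and "setup no longer running".
      this._settingUp = false;
    }
  }

  async consume(queue, onMessage) {
    const consumer = { queue, onMessage, tag: `tag-${this._nextTag++}` };
    this._consumers.push(consumer);
    // Consume directly only once setup is done; otherwise the setup chain
    // above picks the consumer up from this._consumers.
    if (this._channel && !this._settingUp) {
      await this._channel.consume(queue, onMessage, { consumerTag: consumer.tag });
    }
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function demo() {
  const wrapper = new GuardedChannelWrapper(() => sleep(50)); // slow setup function
  await sleep(10);
  await wrapper.consume("q", () => {}); // during setup: only recorded
  await wrapper.ready;
  await wrapper.consume("q", () => {}); // after setup: consumed directly
  return Object.fromEntries(wrapper._channel.registrations);
}

// Each consumer tag ends up registered exactly once.
demo().then((counts) => console.log(counts));
```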
jwalton (Owner) commented Sep 30, 2022

Thanks for the detailed bug report!

github-actions bot pushed a commit that referenced this issue Sep 30, 2022
## [4.1.7](v4.1.6...v4.1.7) (2022-09-30)

### Bug Fixes

* consumer registered twice during setup ([1ca216a](1ca216a)), closes [#297](#297)
jwalton (Owner) commented Sep 30, 2022

🎉 This issue has been resolved in version 4.1.7 🎉

The release is available on:

Your semantic-release bot 📦🚀

@lance-menard (Author)

Thank you for the quick fix!

@TalFaitlov

Thanks for the fix!

I was getting the "unknown delivery tag" error on version 4.1.6 during HPA scale-up/down and when manually force-closing connections from the RabbitMQ management console.
After upgrading to v4.1.7, I was not able to reproduce the issue.

But what if we ack a message that was being processed during a reconnection?
https://github.com/jwalton/node-amqp-connection-manager/blob/master/src/ChannelWrapper.ts#L825 only protects against acking while the channel is reconnecting.
Is there anything protecting against acking an old message on the new channel?

@jwalton I hope commenting on a closed issue is OK; this is the most relevant issue and "quorum".
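One way to think about the question above: in AMQP 0-9-1, delivery tags are scoped to the channel that delivered the message, so an ack for a message from the previous channel cannot succeed on the new one, which is what "unknown delivery tag" reports. The sketch below assumes the application can observe each channel replacement and routes its acks through a guard; `StaleAckGuard`, `attach`, `wrap`, and the fake channels are hypothetical illustrations, not part of amqp-connection-manager. Only `msg.fields.deliveryTag` mirrors the shape of amqplib messages.

```javascript
// Hypothetical sketch (not an amqp-connection-manager API): remember which channel
// "generation" delivered each message and skip acks for messages from an older one,
// since delivery tags are only valid on the channel that issued them.
class StaleAckGuard {
  constructor() {
    this.generation = 0;
    this.channel = null;
    this.deliveredOn = new WeakMap(); // message -> generation it was delivered on
  }

  // Call whenever a (re)connected channel replaces the previous one.
  attach(channel) {
    this.channel = channel;
    this.generation += 1;
  }

  // Wrap the consumer callback so each message is stamped with the current generation.
  wrap(onMessage) {
    return (msg) => {
      this.deliveredOn.set(msg, this.generation);
      return onMessage(msg);
    };
  }

  // Returns true if the ack was sent, false if it was dropped as stale.
  ack(msg) {
    if (this.deliveredOn.get(msg) !== this.generation) {
      // The old channel is gone and RabbitMQ requeues its unacked messages,
      // so acking here would only trigger "unknown delivery tag".
      return false;
    }
    this.channel.ack(msg);
    return true;
  }
}

// Fake channel for the demo: records the delivery tags it was asked to ack.
const makeFakeChannel = () => ({
  acked: [],
  ack(msg) {
    this.acked.push(msg.fields.deliveryTag);
  },
});

const guard = new StaleAckGuard();
const first = makeFakeChannel();
guard.attach(first);
const handler = guard.wrap(() => {});

const oldMsg = { fields: { deliveryTag: 1368 } };
handler(oldMsg); // delivered on the first channel and still being processed...

const second = makeFakeChannel(); // ...when the connection drops and reconnects
guard.attach(second);
const newMsg = { fields: { deliveryTag: 1 } };
handler(newMsg);

console.log(guard.ack(oldMsg), guard.ack(newMsg)); // false true
console.log(second.acked); // [ 1 ]
```

Dropping the stale ack trades the channel error for a redelivery, so the message handler would still need to tolerate processing the same message twice.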


aki-ks commented Oct 16, 2024

We are experiencing the same issues as @TalFaitlov describes on version 4.1.14, was his comment noticed?

We consume messages with a single consumer on a channel that continuously receives about 20 messages/s.
If the connection dies (e.g. because the Node.js event loop is blocked for a while and heartbeats are therefore not sent), the client reconnects and then seems to try to acknowledge messages. The server rejects these acknowledgements and closes the channel again. This reconnect-and-kick loop repeats several times:

```
2024-10-15 12:01:37.615774+00:00 [info] <0.157825925.0> accepting AMQP connection <0.157825925.0> (10.0.1.222:43282 -> 10.0.2.161:5672)
2024-10-15 12:01:37.743676+00:00 [info] <0.157825925.0> connection <0.157825925.0> (10.0.1.222:43282 -> 10.0.2.161:5672): user 'my-user' authenticated and granted access to vhost 'my-queue'
2024-10-15 12:02:47.712296+00:00 [error] <0.157825925.0> closing AMQP connection <0.157825925.0> (10.0.1.222:43282 -> 10.0.2.161:5672):
2024-10-15 12:02:47.712296+00:00 [error] <0.157825925.0> missed heartbeats from client, timeout: 5s
2024-10-15 12:03:10.112264+00:00 [info] <0.157847679.0> accepting AMQP connection <0.157847679.0> (10.0.1.222:40302 -> 10.0.2.161:5672)
2024-10-15 12:03:10.161293+00:00 [info] <0.157847679.0> connection <0.157847679.0> (10.0.1.222:40302 -> 10.0.2.161:5672): user 'my-user' authenticated and granted access to vhost 'my-queue'
2024-10-15 12:03:10.299430+00:00 [error] <0.157847709.0> Channel error on connection <0.157847679.0> (10.0.1.222:40302 -> 10.0.2.161:5672, vhost: 'my-queue', user: 'my-user'), channel 1:
2024-10-15 12:03:10.299430+00:00 [error] <0.157847709.0> operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 1368
2024-10-15 12:03:10.312699+00:00 [warning] <0.157847679.0> closing AMQP connection <0.157847679.0> (10.0.1.222:40302 -> 10.0.2.161:5672, vhost: 'my-queue', user: 'my-user'):
2024-10-15 12:03:10.312699+00:00 [warning] <0.157847679.0> client unexpectedly closed TCP connection
2024-10-15 12:03:41.258421+00:00 [info] <0.157854795.0> accepting AMQP connection <0.157854795.0> (10.0.1.222:58264 -> 10.0.2.161:5672)
2024-10-15 12:03:41.303497+00:00 [info] <0.157854795.0> connection <0.157854795.0> (10.0.1.222:58264 -> 10.0.2.161:5672): user 'my-user' authenticated and granted access to vhost 'my-queue'
2024-10-15 12:03:41.392559+00:00 [error] <0.157854799.0> Channel error on connection <0.157854795.0> (10.0.1.222:58264 -> 10.0.2.161:5672, vhost: 'my-queue', user: 'my-user'), channel 1:
2024-10-15 12:03:41.392559+00:00 [error] <0.157854799.0> operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 10
2024-10-15 12:03:41.401717+00:00 [warning] <0.157854795.0> closing AMQP connection <0.157854795.0> (10.0.1.222:58264 -> 10.0.2.161:5672, vhost: 'my-queue', user: 'my-user'):
2024-10-15 12:03:41.401717+00:00 [warning] <0.157854795.0> client unexpectedly closed TCP connection
2024-10-15 12:03:56.684204+00:00 [info] <0.157858323.0> accepting AMQP connection <0.157858323.0> (10.0.1.222:48298 -> 10.0.2.161:5672)
2024-10-15 12:03:56.727542+00:00 [info] <0.157858323.0> connection <0.157858323.0> (10.0.1.222:48298 -> 10.0.2.161:5672): user 'my-user' authenticated and granted access to vhost 'my-queue'
2024-10-15 12:03:56.798493+00:00 [error] <0.157858319.0> Channel error on connection <0.157858323.0> (10.0.1.222:48298 -> 10.0.2.161:5672, vhost: 'my-queue', user: 'my-user'), channel 1:
2024-10-15 12:03:56.798493+00:00 [error] <0.157858319.0> operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 13
2024-10-15 12:03:56.807095+00:00 [warning] <0.157858323.0> closing AMQP connection <0.157858323.0> (10.0.1.222:48298 -> 10.0.2.161:5672, vhost: 'my-queue', user: 'my-user'):
2024-10-15 12:03:56.807095+00:00 [warning] <0.157858323.0> client unexpectedly closed TCP connection
...
```
