在第一个教程中,我们编写了从命名队列发送和接收消息的程序。在本教程中,我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务。
工作队列(又名:任务队列)背后的主要理念是避免立即执行资源密集型任务而不得不等待其完成。相反,我们将任务安排在稍后完成。我们将任务封装为消息,并将其发送到队列。在后台运行的 Worker 进程会弹出任务并最终执行作业。当运行多个 Worker 时,任务将在它们之间共享。
这一概念在网络应用程序中尤其有用,因为在短时间的 HTTP 请求窗口中不可能处理复杂的任务。
在本教程的上一部分,我们发送了一条包含 "Hello World!"的信息。现在我们将发送代表复杂任务的字符串。我们没有现实世界中的任务,比如需要调整大小的图片或需要渲染的 pdf 文件,因此我们可以使用 setTimeout 方法假装我们很忙。我们将以字符串中的点数作为其复杂度;每个点代表一秒钟的 "工作"。例如,"你好...... "描述的假任务需要三秒钟。
我们将对上一个示例中的 send.js 代码稍作修改,以允许从命令行发送任意信息。这个程序将为我们的工作队列安排任务,因此我们将其命名为 new_task.js:
var queue = "task_queue";
var msg = process.argv.slice(2).join(" ") || "Hello World!";
channel.assertQueue(queue, {
durable: true,
});
channel.sendToQueue(queue, Buffer.from(msg), {
persistent: true,
});
console.log(" [x] Sent '%s'", msg);
我们原来的 receive.js 脚本也需要做一些改动:它需要为消息正文中的每一个点伪造一秒钟的工作时间。它将从队列中弹出消息并执行任务,因此我们称之为 worker.js:
var queue = "task_queue";
// This makes sure the queue is declared before attempting to consume from it
channel.assertQueue(queue, {
durable: true,
});
channel.consume(
queue,
function(msg) {
var secs = msg.content.toString().split(".").length - 1;
console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
}, secs * 1000);
},
{
// automatic acknowledgment mode,
// see /docs/confirms for details
noAck: true,
}
);
请注意,我们的假任务模拟的是执行时间。 按照教程一运行它们:
# shell 1
node worker.js
# shell 2
node new_task.js
使用任务队列的优势之一是可以轻松实现工作并行化。如果我们积压了大量工作,只需添加更多工人,就能轻松实现扩展。
首先,让我们尝试同时运行两个 worker.js 脚本。它们都会从队列中获取信息,但具体是如何获取的呢?让我们来看看。
您需要打开三个控制台。其中两个将运行 worker.js 脚本。这两个控制台就是我们的两个消费者 - C1 和 C2。
# shell 1
node worker.js
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
node worker.js
# => [*] Waiting for messages. To exit press CTRL+C
在第三项中,我们将发布新任务。启动消费者后,您可以发布一些信息:
# shell 3
./new_task.js First message.
./new_task.js Second message..
./new_task.js Third message...
./new_task.js Fourth message....
./new_task.js Fifth message.....
让我们看看我们的工人都得到了什么:
# shell 1
./worker.js
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
./worker.js
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ 会将每条消息按顺序发送给下一个消费者。平均而言,每个消费者将收到相同数量的消息。这种分配消息的方式称为轮循。请使用三个或更多 Worker 试试看。
完成一个任务可能需要几秒钟,您可能会想,如果消费者启动了一个较长的任务,但在完成之前就终止了,会发生什么情况?在我们当前的代码中,一旦 RabbitMQ 将消息传递给消费者,它就会立即将其标记为删除。在这种情况下,如果终止 Worker,就会丢失它刚刚处理的消息。同时也会丢失已发送到此特定 Worker 但尚未处理的消息。
但我们不想失去任何任务。如果一名工人死亡,我们希望将任务交付给另一名工人。
为了确保消息不会丢失,RabbitMQ 支持消息确认。消费者会发送回执 (ack),告诉 RabbitMQ 已收到并处理了特定消息,RabbitMQ 可以删除该消息。
如果消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失)而未发送应答,RabbitMQ 将了解消息未被完全处理,并将其重新排队。如果有其他消费者同时在线,它将迅速将消息重新传递给另一个消费者。这样,您就可以确保不会丢失任何消息,即使工作者偶尔死机。
消费者确认交付时会有一个超时(默认为 30 分钟)。这有助于检测出从不确认交付的错误(卡住)消费者。您可以按照交付确认超时中的说明增加超时时间。
在前面的示例中,我们已经关闭了消费者手动确认功能。现在是时候使用 {noAck: false} 选项打开它们,并在完成任务后从 Worker 发送适当的确认信息了。
channel.consume(
queue,
function(msg) {
var secs = msg.content.toString().split(".").length - 1;
console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
channel.ack(msg);
}, secs * 1000);
},
{
// manual acknowledgment mode,
// see /docs/confirms for details
noAck: false,
}
);
使用这段代码,可以确保即使在处理报文时使用 CTRL+C 终止 Worker,也不会丢失任何信息。在 Worker 终止后不久,所有未确认的消息都会重新发送。
确认必须在接收传送的同一信道上发送。试图使用不同的通道进行确认将导致通道级协议异常。有关确认的更多信息,请参阅文档指南。
被遗忘的消息确认
漏掉 "ack "是一个常见错误。这是一个很容易犯的错误,但后果却很严重。当您的客户端退出时,消息将被重新交付(这可能看起来像随机重新交付),但 RabbitMQ 将占用越来越多的内存,因为它将无法释放任何未确认的消息。
为了调试这种错误,您可以使用 rabbitmqctl 打印 messages_unacknowledged 字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在 Windows 系统中,去掉 sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是,如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。
首先,我们需要确保队列能在 RabbitMQ 节点重启后继续运行。为此,我们需要将其声明为持久:
channel.assertQueue("hello", { durable: true });
虽然这条命令本身是正确的,但在我们目前的设置中却行不通。这是因为我们已经定义了一个名为 hello 的队列,而这个队列并不持久。RabbitMQ 不允许您使用不同的参数重新定义现有队列,任何试图这样做的程序都会返回错误。但有一个快速的解决方法--让我们用不同的名称声明一个队列,例如 task_queue:
channel.assertQueue("task_queue", { durable: true });
durable选项的更改需要同时应用于生产者和消费者代码。
至此,我们确信即使 RabbitMQ 重新启动,task_queue 队列也不会丢失。现在,我们需要将消息标记为持久性消息。
- 通过使用persistent选项 Channel.sendToQueue 来实现。
channel.sendToQueue(queue, Buffer.from(msg), {persistent: true});
关于信息持久性的说明
将消息标记为持久并不能完全保证消息不会丢失。虽然它会告诉 RabbitMQ 将消息保存到磁盘,但当 RabbitMQ 已接受消息但尚未保存消息时,仍会有一个很短的时间窗口。此外,RabbitMQ 不会对每条消息都执行 fsync(2) -- 消息可能只是保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们简单的任务队列来说绰绰有余。如果需要更强的保证,可以使用发布者确认。
您可能已经注意到,调度工作仍然不能完全按照我们的要求进行。例如,在有两个 Worker 的情况下,当所有奇数消息都很重,而偶数消息都很轻时,一个 Worker 将一直处于忙碌状态,而另一个 Worker 几乎不做任何工作。但是,RabbitMQ 对此一无所知,它仍然会均匀地分派消息。
出现这种情况是因为 RabbitMQ 只是在消息进入队列时分派消息。它不会查看消费者未确认消息的数量。它只是盲目地将每 n 条消息分派给第 n 个消费者。
为了避免这种情况,我们可以使用值为 1 的预取方法。这将告诉 RabbitMQ 不要一次向 Worker 发送多于一条消息。或者换句话说,在处理并确认前一条消息之前,不要向 Worker 发送新消息。相反,它会将消息分派给下一个不忙的 Worker。
channel.prefetch(1);
队列大小注意事项 如果所有工人都很忙,您的队列就会爆满。您需要密切关注这一情况,也许可以增加工人或采取其他策略。
new_task.js 的最终代码:
#!/usr/bin/env node
var amqp = require("amqplib/callback_api");
amqp.connect("amqp://localhost", function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var queue = "task_queue";
var msg = process.argv.slice(2).join(" ") || "Hello World!";
channel.assertQueue(queue, {
durable: true,
});
channel.sendToQueue(queue, Buffer.from(msg), {
persistent: true,
});
console.log(" [x] Sent '%s'", msg);
});
setTimeout(function() {
connection.close();
process.exit(0);
}, 500);
});
worker.js 的最终代码:
#!/usr/bin/env node
var amqp = require("amqplib/callback_api");
amqp.connect("amqp://localhost", function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var queue = "task_queue";
channel.assertQueue(queue, {
durable: true,
});
channel.prefetch(1);
console.log(
" [*] Waiting for messages in %s. To exit press CTRL+C",
queue
);
channel.consume(
queue,
function(msg) {
var secs = msg.content.toString().split(".").length - 1;
console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
channel.ack(msg);
}, secs * 1000);
},
{
// manual acknowledgment mode,
// see /docs/confirms for details
noAck: false,
}
);
});
});
使用消息确认和 prefetch
可以建立一个工作队列。durability
选项可让任务在重新启动 RabbitMQ 后仍然存活。
有关通道方法和消息属性的更多信息,请浏览 amqplib 文档。