在第一个教程中,我们编写了发送和接收消息的程序。在这一个中,我们将创建一个工作队列(Work Queue),用于在多个workers
之间分配消费的任务。
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,必须等待完成。相反,我们特意推迟完成任务。我们把一个任务封装成一个消息并发送给一个队列。 在后台运行的工作进程将最终执行任务。当你运行许个worker时,任务将在它们之间共享。
这个概念在web应用程序中特别有用,在短的HTTP请求窗口中不可能处理复杂的任务。
在本教程的前一部分,我们发送了一个包含“Hello World!”的消息。 现在我们将发送复杂任务的字符串。 我们没有真正的任务,比如图像被重新调整大小或者PDF文件被渲染,所以我们需要假装我们很忙 - 通过使用Thread.sleep()
函数来伪装它。我们将把字符串中的点数作为复杂度。 每一个点将占到“工作”的一秒钟。例如,Hello ...
描述的假任务将需要三秒钟的时间。
我们稍微修改前面例子中的Send.java代码,以允许从命令行发送任意消息。 这个程序会把任务安排到我们的工作队列中,所以我们把它命名为NewTask.java:
String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
从命令行参数获取消息:
private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
我们的老Recv.java
程序也需要做一些改变:它需要伪造消息体中每个点的第二个工作。它将处理交付的消息并执行任务,所以我们称之为Worker.java:
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
我们假冒的任务来模拟执行时间:
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
按照教程1(工作目录中的jar文件和环境变量CP)编译它们:
javac -cp $CP NewTask.java Worker.java
使用任务队列的优点之一是能够很地并行工作。如果我们积压工作,我们可以增加更多的work,这样可以轻松扩展。
首先,我们尝试同时运行两个工作者实例。它们都会从队列中得到消息,但究竟是如何? 让我们来看看。
您需要打开三个控制台。 两个将运行工人程序。 这些控制台将是我们的两个消费者 - C1和C2。
# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
在第三个,我们将发布新的任务。 一旦你开始了消费者,你可以发布一些消息:
# shell 3
java -cp $CP NewTask
# => First message.
java -cp $CP NewTask
# => Second message..
java -cp $CP NewTask
# => Third message...
java -cp $CP NewTask
# => Fourth message....
java -cp $CP NewTask
# => Fifth message.....
让我们看看交给我们work的消息:
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ将按顺序将每条消息发送给下一个consumer。 平均而言,每个消费者将获得相同数量的消息。 这种分发消息的方式称为循环法(round-robin)。试试三个或更多的worker。
完成任务可能需要几秒钟的时间。你可能会想知道如果其中一个消费者开始一个长时间的任务,并且只是部分完成而故障了。在我们当前的代码中,一旦RabbitMQ向客户端发送了消息,则立即将其标记为删除。在这种情况下,如果你kill了一个worker,我们将失去这条消息。我们也将失去所有派发给这个woker但尚未处理的消息。
当然,我们不想丢失任何任务。如果一名worker死亡,我们希望将任务交付给另一名worker。
为了确保消息永不丢失,RabbitMQ支持消息确认。消费者发回确认(告知),告诉RabbitMQ已经收到,处理了这个消息,RabbitMQ可以自由删除它了。
如果消费者故障(其通道关闭,连接关闭或TCP连接丢失),RabbitMQ将认为该消息未被完全处理,并将重新排队。 如果有其他消费者也运行则,则会迅速的重新发送给其他消费者。这样,即使worker偶尔故障,也可以确保没有任何信息丢失。
没有任何消息超时; 当消费者挂掉时,RabbitMQ将重新传递消息。 即使处理消息需要很长的时间也没关系。
手动消息确认默认打开。在前面的例子中,我们明确地通过autoAck=true
关闭了手动。 现在是时候把这个设置为false了,一旦我们完成了一项任务,并且从worker发送一个消息应答确认。
channel.basicQos(1); // accept only one unack-ed message at a time (see below)
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
使用这段代码,我们可以确定,即使在处理消息的时候使用CTRL+C
来杀死一个worker,也不会丢失任何东西。worker挂后不就..所有未确认的消息将被重新发送。
忘记确认
忘记
basicAck
是一个很常见的错误。这是个容易犯的错误,但后果是严重的。当你的客户退出时,消息将被重新传递,但是RabbitMQ将会使用越来越多的内存,因为它不能释放任何未被确认消费的消息。为了调试这种错误,你可以使用rabbitmqctl打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上,去掉sudo:rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
我们已经学会了如何确保即使消费者挂,消息也不会丢失。但是,如果RabbitMQ服务器停止,我们的消息仍然会丢失。
当RabbitMQ退出或崩溃时,它会丢掉队列和消息。需要做两件事来确保消息不会丢失:我们需要将队列和消息标记为durable(持久)
。
首先,我们需要确保RabbitMQ永远不会失去队列。为了做到这一点,我们需要生命它是durable(持久)
的:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
虽然这个命令本身是正确的,但是在我们现在的设置中不起作用。这是因为我们已经定义了一个名为hello
的队列,该队列不是durable的。 RabbitMQ不允许使用不同的参数重新定义一个现有的队列,并且会向任何尝试这样做的程序返回一个错误。 但有一个快速的解决方法 - 让我们声明一个不同名称的队列,例如task_queue
:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
这个queueDeclare需要应用于生产者和消费者的代码中。
此时我们确信,即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久的 - 通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN
。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
注意消息持久性
将消息标记为“永久”并不能完全保证消息不会丢失。 尽管它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接收到消息并且还没有保存消息时,仍然有一个很短的时间窗口。此外,RabbitMQ不会为每个消息执行
fsync(2)
- 它可能只是保存到缓存中,并没有真正写入磁盘。 持久性保证不强,但对于我们简单的任务队列已经足够了。如果你需要更强大的保证,那么你可以使用publisher confirms
。
你可能已经注意到调度仍然不能按照我们的要求工作。例如在有两个worker的情况下,当所有的奇数的消息都很重,偶数消息很轻时,一个worker就会一直很忙,另一个worker几乎没有工作。那么,RabbitMQ是不知道的,仍然均匀地发送消息。
发生这种情况是因为RabbitMQ只在消息进入队列时发送消息。它没有考虑消费者未确认消息的数量。它只是盲目地把第n条消息分发给第n个消费者。
为了解决这个,我们可以使用basicQos
,设置prefetchCount = 1。这就告诉RabbitMQ一次不能给一个worker多个消息。 或者换句话说,不要向worker发送新消息,直到处理并确认了前一个消息。相反,它会将其分派给下一个"还不忙"的worker。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
关于队列大小的说明
如果所有的woker都很忙,你的queue就可能被填满。你需要关注一下,也可以增加更多的worker,或者有其他的策略。
NewTask.java类的最终代码:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = getMessage(argv);
channel.basicPublish( "", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
//...
}
Worker.java:
import com.rabbitmq.client.*;
import java.io.IOException;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
使用消息确认和prefetchCount
,你可以设置到工作队列中去。即使RabbitMQ重新启动,耐用性选项也能让消息不丢失。
有关Channel方法和MessageProperties的更多信息,可以在线浏览JavaDocs。