本教程假定RabbitMQ已经安装并运行(标准端口:5672,地址:localhost)。如果你使用了不同的host,port或credentials,那么请自行调整。
RabbitMQ是一个消息代理:它接收和转发消息。 你可以将其视为邮局:当你要发布的邮件放在邮箱中时,你可以确信Postman先生最终会将邮件发送给收件人。RabbitMQ就如邮箱,邮局和邮递员。
RabbitMQ和邮局之间的主要区别在于它不进行处理,而是接收,存储和转发二进制数据块 - 即是消息 。
RabbitMQ和消息传递使用一些术语。
发送消息的程序即是一个生产者:
队列是居住在RabbitMQ中的邮箱的名称。 虽然消息流经RabbitMQ和你的应用程序,但它们只能存储在队列之中。它本质上就是一个大的消息缓冲区,队列只受主机的内存和磁盘限制的限制。许多生产者可以发送消息到一个队列中,多个消费者从队列中接收数据。我们如何代表一个队列:
消费与接收有相似的含义。消费者其实就是一个等待接收消息的程序:
请注意,生产者,消费者和经纪人不需要在相同一个主机上。
(使用Java Client)
我们将用Java编写两个程序; 发送单个消息的生产者,以及接收消息并将其打印出来的消费者。我们将介绍Java API中的一些细节,开始一个“Hello World”。
在下图中,“P”是我们的生产者,“C”是我们的消费者。 中间的框是队列 - RabbitMQ代表消费者的消息缓冲区。
### Java客户端库
RabbitMQ提供多种协议。 本教程使用AMQP 0-9-1,这是一个开放,通用的消息传递协议。 RabbitMQ有许多不同的语言客户端。 我们将使用RabbitMQ提供的Java客户端。
下载客户端库及其依赖项(SLF4J API和SLF4J Simple)。 将这些文件复制到工作目录中,并沿着教程Java文件复制。
请注意SLF4J Simple对于教程是足够的,但是你应该使用像Logback这样的完整的日志库。
(RabbitMQ Java客户端也位于中央Maven仓库中,使用groupId com.rabbitmq和artifactId amqp-client。)
现在我们有了Java客户端及其依赖关系,我们可以编写一些代码。
命名消息生产者(发布者)Send
和消息消费者(接收者)Recv
。 生产者将连接到RabbitMQ,发送一条消息,然后退出。
首先,在Send.java中,我们需要引入一些类:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
设置queue的名字:
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException {
...
}
}
然后,我们创建一个连接到服务器:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
连接是socket连接,并为我们处理协议版本协商和认证等。在这里,我们连接到本地机器上的broker - 因为是本地主机。 如果我们想连接到另一台机器上的broker,只需在此指定其名称或IP地址即可。
接下来我们创建一个channel,这是完成大部分API的地方。
发送,我们必须声明发送的队列; 然后,我们可以发布消息到队列中:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
声明一个队列是幂等的 - 只有当它不存在时才会被创建。 消息内容是一个字节数组,所以你可以编码任何你需要的格式。
最后,我们关闭channel和connection;
channel.close();
connection.close();
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
如果这是你第一次使用RabbitMQ,并且你没有看到“已发送”消息,那么你可能会不知所措。也许broker没有足够的可用磁盘空间(默认情况下,它至少需要200MB空闲空间),因此拒绝接受消息。 检查代理日志文件以确认并在必要时减少限制。 配置文件文档将告诉你如何设置disk_free_limit。
对于生产者。 RabbitMQ推送消息给消费者,因此与发送单个消息的发送者不同,我们将继续运行以收听消息并将其打印出来。
在Recv.java
和Send引入几乎相同:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
DefaultConsumer实现Consumer接口的类,我们将使用它来缓存由服务器推送给我们的消息。
设置与生产者相同; 打开一个connection
和一个channel
,并声明我们要消费的队列。请注意,和发送的队列是同一个。
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}
请注意,我们也在这里声明队列。因为可能在生产者生产消息之前先启动消费者,所以我们希望确保队列存在,然后再尝试消费消息。
我们将告诉服务器将队列中的消息传递给我们。由于它是异步推送消息,因此我们以对象的形式提供回调,缓存消息直到准备好使用它们。通过DefaultConsumer子类完成。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
可以在类路径中仅使用RabbitMQ java客户端来编译这两个类:
javac -cp amqp-client-4.0.2.jar Send.java Recv.java
要运行它们,你需要rabbitmq-client.jar及其它的依赖关系类。在终端一个中,运行消费者:
java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Recv
然后,运行生产者:
java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Send
在Windows上,使用分号而不是冒号来分隔类路径中的项目。
消费者将通过RabbitMQ打印从生产者处获得的消息。 消费者持续运行,等待消息(使用Ctrl-C停止它)。
你可能想看看RabbitMQ有什么队列,有多少条消息。 你可以使用rabbitmqctl工具(管理员才行)执行此操作:
sudo rabbitmqctl list_queues
在Windows上,省略sudo:
rabbitmqctl.bat list_queues
下一节(第二部分)构建一个简单的工作队列。