RabbitMQ教程 - “Hello world!”

预备知识

本教程假定RabbitMQ已经安装并运行(标准端口:5672,地址:localhost)。如果你使用了不同的host,port或credentials,那么请自行调整。

入门

RabbitMQ是一个消息代理:它接收和转发消息。 你可以将其视为邮局:当你要发布的邮件放在邮箱中时,你可以确信Postman先生最终会将邮件发送给收件人。RabbitMQ就如邮箱,邮局和邮递员。

RabbitMQ和邮局之间的主要区别在于它不进行处理,而是接收,存储和转发二进制数据块 - 即是消息

RabbitMQ和消息传递使用一些术语。

发送消息的程序即是一个生产者:

screenshot

队列是居住在RabbitMQ中的邮箱的名称。 虽然消息流经RabbitMQ和你的应用程序,但它们只能存储在队列之中。它本质上就是一个大的消息缓冲区,队列只受主机的内存和磁盘限制的限制。许多生产者可以发送消息到一个队列中,多个消费者从队列中接收数据。我们如何代表一个队列:

screenshot

消费与接收有相似的含义。消费者其实就是一个等待接收消息的程序:

screenshot
请注意,生产者,消费者和经纪人不需要在相同一个主机上。

"Hello World"

(使用Java Client)

我们将用Java编写两个程序; 发送单个消息的生产者,以及接收消息并将其打印出来的消费者。我们将介绍Java API中的一些细节,开始一个“Hello World”。

在下图中,“P”是我们的生产者,“C”是我们的消费者。 中间的框是队列 - RabbitMQ代表消费者的消息缓冲区。
screenshot

### Java客户端库

RabbitMQ提供多种协议。 本教程使用AMQP 0-9-1,这是一个开放,通用的消息传递协议。 RabbitMQ有许多不同的语言客户端。 我们将使用RabbitMQ提供的Java客户端。

下载客户端库及其依赖项(SLF4J API和SLF4J Simple)。 将这些文件复制到工作目录中,并沿着教程Java文件复制。

请注意SLF4J Simple对于教程是足够的,但是你应该使用像Logback这样的完整的日志库。

(RabbitMQ Java客户端也位于中央Maven仓库中,使用groupId com.rabbitmq和artifactId amqp-client。)

现在我们有了Java客户端及其依赖关系,我们可以编写一些代码。

发送

screenshot

命名消息生产者(发布者)Send和消息消费者(接收者)Recv。 生产者将连接到RabbitMQ,发送一条消息,然后退出。

首先,在Send.java中,我们需要引入一些类:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

设置queue的名字:

public class Send {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
}

然后,我们创建一个连接到服务器:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

连接是socket连接,并为我们处理协议版本协商和认证等。在这里,我们连接到本地机器上的broker - 因为是本地主机。 如果我们想连接到另一台机器上的broker,只需在此指定其名称或IP地址即可。

接下来我们创建一个channel,这是完成大部分API的地方。

发送,我们必须声明发送的队列; 然后,我们可以发布消息到队列中:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

声明一个队列是幂等的 - 只有当它不存在时才会被创建。 消息内容是一个字节数组,所以你可以编码任何你需要的格式。

最后,我们关闭channel和connection;

channel.close();
connection.close();

这是整个Send.java类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
    channel.close();
    connection.close();
  }
}

发送不起作用!

如果这是你第一次使用RabbitMQ,并且你没有看到“已发送”消息,那么你可能会不知所措。也许broker没有足够的可用磁盘空间(默认情况下,它至少需要200MB空闲空间),因此拒绝接受消息。 检查代理日志文件以确认并在必要时减少限制。 配置文件文档将告诉你如何设置disk_free_limit。

接收

对于生产者。 RabbitMQ推送消息给消费者,因此与发送单个消息的发送者不同,我们将继续运行以收听消息并将其打印出来。

screenshot

Recv.java和Send引入几乎相同:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

DefaultConsumer实现Consumer接口的类,我们将使用它来缓存由服务器推送给我们的消息。

设置与生产者相同; 打开一个connection和一个channel,并声明我们要消费的队列。请注意,和发送的队列是同一个。

public class Recv {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    }
}

请注意,我们也在这里声明队列。因为可能在生产者生产消息之前先启动消费者,所以我们希望确保队列存在,然后再尝试消费消息。

我们将告诉服务器将队列中的消息传递给我们。由于它是异步推送消息,因此我们以对象的形式提供回调,缓存消息直到准备好使用它们。通过DefaultConsumer子类完成。

Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body)
      throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
  }
};


channel.basicConsume(QUEUE_NAME, true, consumer);

这是整个Recv.java类


import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
          throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(QUEUE_NAME, true, consumer);
  }
}

一起运行

可以在类路径中仅使用RabbitMQ java客户端来编译这两个类:

javac -cp amqp-client-4.0.2.jar Send.java Recv.java

要运行它们,你需要rabbitmq-client.jar及其它的依赖关系类。在终端一个中,运行消费者:

java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Recv

然后,运行生产者:

java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Send

在Windows上,使用分号而不是冒号来分隔类路径中的项目。

消费者将通过RabbitMQ打印从生产者处获得的消息。 消费者持续运行,等待消息(使用Ctrl-C停止它)。

监听队列

你可能想看看RabbitMQ有什么队列,有多少条消息。 你可以使用rabbitmqctl工具(管理员才行)执行此操作:

sudo rabbitmqctl list_queues

在Windows上,省略sudo:

rabbitmqctl.bat list_queues

下一节(第二部分)构建一个简单的工作队列。






发表于: 1年前   最后更新时间: 6月前   游览量:820
上一条: RabbitMQ之消息确认机制
下一条: RabbitMQ教程 - Work Queues

评论…


  • 评论…
    • in this conversation