RabbitMQ教程 - Work Queues

半兽人 发表于: 2017-11-13   最后更新时间: 2018-05-08 10:57:07  
{{totalSubscript}} 订阅, 3,207 游览

工作队列

(使用Java Client)

screenshot

在第一个教程中,我们编写了发送和接收消息的程序。在这一个中,我们将创建一个工作队列(Work Queue),用于在多个workers之间分配消费的任务。

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,必须等待完成。相反,我们特意推迟完成任务。我们把一个任务封装成一个消息并发送给一个队列。 在后台运行的工作进程将最终执行任务。当你运行许个worker时,任务将在它们之间共享。

这个概念在web应用程序中特别有用,在短的HTTP请求窗口中不可能处理复杂的任务。

预备知识

在本教程的前一部分,我们发送了一个包含“Hello World!”的消息。 现在我们将发送复杂任务的字符串。 我们没有真正的任务,比如图像被重新调整大小或者PDF文件被渲染,所以我们需要假装我们很忙 - 通过使用Thread.sleep()函数来伪装它。我们将把字符串中的点数作为复杂度。 每一个点将占到“工作”的一秒钟。例如,Hello ...描述的假任务将需要三秒钟的时间。

我们稍微修改前面例子中的Send.java代码,以允许从命令行发送任意消息。 这个程序会把任务安排到我们的工作队列中,所以我们把它命名为NewTask.java:

String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

从命令行参数获取消息:

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

我们的老Recv.java程序也需要做一些改变:它需要伪造消息体中每个点的第二个工作。它将处理交付的消息并执行任务,所以我们称之为Worker.java:

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
    }
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

我们假冒的任务来模拟执行时间:

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

按照教程1(工作目录中的jar文件和环境变量CP)编译它们:

javac -cp $CP NewTask.java Worker.java

循环调度

使用任务队列的优点之一是能够很地并行工作。如果我们积压工作,我们可以增加更多的work,这样可以轻松扩展。

首先,我们尝试同时运行两个工作者实例。它们都会从队列中得到消息,但究竟是如何? 让我们来看看。

您需要打开三个控制台。 两个将运行工人程序。 这些控制台将是我们的两个消费者 - C1和C2。

# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

在第三个,我们将发布新的任务。 一旦你开始了消费者,你可以发布一些消息:

# shell 3
java -cp $CP NewTask
# => First message.
java -cp $CP NewTask
# => Second message..
java -cp $CP NewTask
# => Third message...
java -cp $CP NewTask
# => Fourth message....
java -cp $CP NewTask
# => Fifth message.....

让我们看看交给我们work的消息:

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ将按顺序将每条消息发送给下一个consumer。 平均而言,每个消费者将获得相同数量的消息。 这种分发消息的方式称为循环法(round-robin)。试试三个或更多的worker。

消息确认

完成任务可能需要几秒钟的时间。你可能会想知道如果其中一个消费者开始一个长时间的任务,并且只是部分完成而故障了。在我们当前的代码中,一旦RabbitMQ向客户端发送了消息,则立即将其标记为删除。在这种情况下,如果你kill了一个worker,我们将失去这条消息。我们也将失去所有派发给这个woker但尚未处理的消息。

当然,我们不想丢失任何任务。如果一名worker死亡,我们希望将任务交付给另一名worker。

为了确保消息永不丢失,RabbitMQ支持消息确认。消费者发回确认(告知),告诉RabbitMQ已经收到,处理了这个消息,RabbitMQ可以自由删除它了。

如果消费者故障(其通道关闭,连接关闭或TCP连接丢失),RabbitMQ将认为该消息未被完全处理,并将重新排队。 如果有其他消费者也运行则,则会迅速的重新发送给其他消费者。这样,即使worker偶尔故障,也可以确保没有任何信息丢失。

没有任何消息超时; 当消费者挂掉时,RabbitMQ将重新传递消息。 即使处理消息需要很长的时间也没关系。

手动消息确认默认打开。在前面的例子中,我们明确地通过autoAck=true关闭了手动。 现在是时候把这个设置为false了,一旦我们完成了一项任务,并且从worker发送一个消息应答确认。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

使用这段代码,我们可以确定,即使在处理消息的时候使用CTRL+C来杀死一个worker,也不会丢失任何东西。worker挂后不就..所有未确认的消息将被重新发送。

忘记确认

忘记basicAck是一个很常见的错误。这是个容易犯的错误,但后果是严重的。当你的客户退出时,消息将被重新传递,但是RabbitMQ将会使用越来越多的内存,因为它不能释放任何未被确认消费的消息。

为了调试这种错误,你可以使用rabbitmqctl打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上,去掉sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经学会了如何确保即使消费者挂,消息也不会丢失。但是,如果RabbitMQ服务器停止,我们的消息仍然会丢失。

当RabbitMQ退出或崩溃时,它会丢掉队列和消息。需要做两件事来确保消息不会丢失:我们需要将队列和消息标记为durable(持久)

首先,我们需要确保RabbitMQ永远不会失去队列。为了做到这一点,我们需要生命它是durable(持久)的:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

虽然这个命令本身是正确的,但是在我们现在的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,该队列不是durable的。 RabbitMQ不允许使用不同的参数重新定义一个现有的队列,并且会向任何尝试这样做的程序返回一个错误。 但有一个快速的解决方法 - 让我们声明一个不同名称的队列,例如task_queue

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

这个queueDeclare需要应用于生产者和消费者的代码中。

此时我们确信,即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久的 - 通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

注意消息持久性

将消息标记为“永久”并不能完全保证消息不会丢失。 尽管它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接收到消息并且还没有保存消息时,仍然有一个很短的时间窗口。此外,RabbitMQ不会为每个消息执行fsync(2) - 它可能只是保存到缓存中,并没有真正写入磁盘。 持久性保证不强,但对于我们简单的任务队列已经足够了。如果你需要更强大的保证,那么你可以使用publisher confirms

公平分发

你可能已经注意到调度仍然不能按照我们的要求工作。例如在有两个worker的情况下,当所有的奇数的消息都很重,偶数消息很轻时,一个worker就会一直很忙,另一个worker几乎没有工作。那么,RabbitMQ是不知道的,仍然均匀地发送消息。

发生这种情况是因为RabbitMQ只在消息进入队列时发送消息。它没有考虑消费者未确认消息的数量。它只是盲目地把第n条消息分发给第n个消费者。

为了解决这个,我们可以使用basicQos,设置prefetchCount = 1。这就告诉RabbitMQ一次不能给一个worker多个消息。 或者换句话说,不要向worker发送新消息,直到处理并确认了前一个消息。相反,它会将其分派给下一个"还不忙"的worker。

 int prefetchCount = 1;
 channel.basicQos(prefetchCount);

关于队列大小的说明

如果所有的woker都很忙,你的queue就可能被填满。你需要关注一下,也可以增加更多的worker,或者有其他的策略。

放一起

NewTask.java类的最终代码:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }      
  //...
}

(NewTask.java源码)

Worker.java:

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
      if (ch == '.') {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}

(Worker.java源码)

使用消息确认和prefetchCount,你可以设置到工作队列中去。即使RabbitMQ重新启动,耐用性选项也能让消息不丢失。

有关Channel方法和MessageProperties的更多信息,可以在线浏览JavaDocs

更新于 2018-05-08

查看RabbitMq更多相关的文章或提一个关于RabbitMq的问题,也可以与我们一起分享文章