RabbitMQ之隊列優先級


優先級隊列,顧名思義,具有更高優先級的隊列具有較高的優先權,優先級高的消息具備優先被消費的特權。
本文主要講解如何使用RabbitMQ實現隊列優先級。

可以通過RabbitMQ管理界面配置隊列的優先級屬性,如下圖的x-max-priority.
這里寫圖片描述
也可以通過代碼去實現,比如:

Map<String,Object> args = new HashMap<String,Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("queue_priority", true, false, false, args);

配置了隊列優先級的屬性之后,可以在管理頁面看到Pri的標記:
這里寫圖片描述

上面配置的是一個隊列queue的最大優先級。之后要在發送的消息中設置消息本身的優先級,如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes());

下面演示一段生產-消費的代碼。首先producer端先生產10個消息,第奇數個消息具備優先級,第偶數個消息就是默認的優先級(最低:0)。
生產端:

package com.vms.test.zzh.rabbitmq.priority;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
* Created by hidden on 2017/2/14.
*/

public class PriorityProducer {
public static final String ip = "10.198.197.73";
public static final int port = 5672;
public static final String username = "root";
public static final String password = "root";

public static void main(String[] arggs) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setPassword(password);
connectionFactory.setUsername(username);
connectionFactory.setPort(port);
connectionFactory.setHost(ip);

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

//create exchange
channel.exchangeDeclare("exchange_priority","direct",true);

//create queue with priority
Map<String,Object> args = new HashMap<String,Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("queue_priority", true, false, false, args);
channel.queueBind("queue_priority", "exchange_priority", "rk_priority");

//send message with priority
for(int i=0;i<10;i++) {
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
if(i%2!=0)
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange_priority","rk_priority",properties,("messages-"+i).getBytes());
}

channel.close();
connection.close();
}
}

消費端:

package com.vms.test.zzh.rabbitmq.priority;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* Created by hidden on 2017/2/14.
*/

public class PriorityConsumer {
public static final String ip = "10.198.197.73";
public static final int port = 5672;
public static final String username = "root";
public static final String password = "root";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setPassword(password);
connectionFactory.setUsername(username);
connectionFactory.setPort(port);
connectionFactory.setHost(ip);

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("queue_priority", false, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println(msg);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}

消費端運行結果:

messages-1
messages-3
messages-5
messages-7
messages-9
messages-0
messages-2
messages-4
messages-6
messages-8

查看運行結果可以驗證優先級隊列的作用。

當然,在消費端速度大於生產端速度,且broker中沒有消息堆積的話,對發送的消息設置優先級也沒什么實際意義,因為發送端剛發送完一條消息就被消費端消費了,那么就相當於broker至多只有一條消息,那么對於單條消息來說優先級是沒有什么意義的。


歡迎支持筆者新書:《RabbitMQ實戰指南》以及關注微信公眾號:Kafka技術專欄。
這里寫圖片描述


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com