RabbitMQ

#导入镜像
docker pull rabbitmq
#运行MQ容器
docker run \
-e RABBITMQ_DEFAULT_USER=cRabbitMQ \
-e RABBITMQ_DEFAULT_PASS=cRABBIT.. \
--name mq \
--hostname hchenp \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:latest

image-20230901235256798



常见消息模型


基本消息队列(BasicQueue)

image-20230902000212209


依赖

<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

spring:

rabbitmq:
host: 198.168.238.3
port: 5672
username: itcast
password: 123321
virtual-host: /
  • 发布消息Test
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("47.115.222.113");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();

// 2.创建通道Channel
Channel channel = connection.createChannel();

// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);

// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");

// 5.关闭通道和连接
channel.close();
connection.close();
}
}


  • 接收消息Test
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("47.115.222.113");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();

// 2.创建通道Channel
Channel channel = connection.createChannel();

// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);

// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
System.out.println("进入方法");
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}





SpringAMQP

image-20231012190207601

应用间消息通信的一种协议,与语言和平台无关。

Spring AMQP 不是 RabbitMQ 的一部分,而是一个用于与 RabbitMQ 集成的 Spring 框架扩展。这使得在 Spring 应用程序中使用 RabbitMQ 变得更加简单和方便。== 简化RabbitMQ客户端


依赖

  • 发布和消费都需要amqp依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


发布-生产者

  • 添加 mq 连接信息
# 发布者

spring:
rabbitmq:
host: 47.115.222.113 # rabbitMQ的ip地址
port: 5672 # 端口
username: itcast # 用户名
password: 123321 # 密码
virtual-host: / # 虚拟主机

publisher-confirm-type: none #返回回执消息 [关闭none,同步simple,异步correlated]
publisher-returns: true #是否开启路由失败返回 [false]


  • 使用模板
@Autowired
private RabbitTemplate rabbitTemplate;

  • .convertAndSend(队列名称,消息)
String queueName = "simple.queue";      //队列名称
String message = "hello, spring amqp!"; //队列消息

rabbitTemplate.convertAndSend(queueName, message);
rabbitTemplate.convertAndSend(交换机,绑定Key,消息);


消费者

  • 配置文件
spring:
rabbitmq:
host: 47.115.222.113 # rabbitMQ的ip地址
port: 5672 # 端口
username: itcast
password: 123321
virtual-host: /

listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
acknowledge-mode: auto #确认机制:none,关闭ack,manual,手动ack,auto:自动ack

  • 声明队列【配置类】

当消费者声明一个还没有被创建的队列时,RabbitMQ会自动创建这个队列,以便它可以开始接收消息。这是RabbitMQ的默认行为,可以确保消费者在发送消息之前声明队列,而不必担心队列是否存在。

//【方式一】 bean 声明 队列

@Configuration
public class FanoutConfig {

// 声明队列 Queue : simple.queue
@Bean
public Queue simpleQueue(){
return new Queue("simple.queue");
}

}

//使用
@RabbitListener(queues = "simple.queue") //如果没有设置声明队列bean,而该队列不存在,会报错
// 【方式二】 方法上注解声明 队列和交换机
@RabbitListener(queuesToDeclare = @Queue(name = "simple.queue")) //无需设置声明队列bean
public void listenSimpleQueue(String msg){
System.out.println("消费者接收到simple.queue的消息:【"+msg+"】");
}

//设置队列类型
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true", //持久化
arguments = @Argument(name = "x-queue-mode",value = "lazy")
))

  • 监听消息组件
  • @RabbitListener(queues= “队列名称”)
@Component
public class SpringRabbitListener {

//【方式一】 bean 已经声明 队列
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
System.out.println("消费者接收到simple.queue的消息:【"+msg+"】");
}

//【方式二】 方法上注解声明 队列
@RabbitListener(queuesToDeclare = @Queue(name = "simple.queue"))
public void listenSimpleQueue(String msg){
System.out.println("消费者接收到simple.queue的消息:【"+msg+"】");
}
}

  • 启动项目即可进行消费


工作消息队列(WorkQueue模型 1:1)

image-20230902152037876

  • 配置设置
spring:
rabbitmq:

listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
  • 一个队列绑定多个消费者
  • 能者多劳
  • 如果不设置分配,会被单一消费者全部拿去慢慢消费
   //@RabbitListener(queues = "simple.queue")
@RabbitListener(queuesToDeclare = @Queue(name = "simple.queue"))
public void listenWorkQueue1(String msg) throws InterruptedException {
Thread.sleep(20);
}

//@RabbitListener(queues = "simple.queue")
@RabbitListener(queuesToDeclare = @Queue(name = "simple.queue"))
public void listenWorkQueue2(String msg) throws InterruptedException {
Thread.sleep(200);
}


交换机类型

  1. Direct Exchange(直连型交换机):这是最简单的交换机类型。它将消息路由到与消息指定的路由键完全匹配的队列。如果路由键匹配,消息将被发送到相应的队列。
  2. Fanout Exchange(扇出交换机):扇出交换机将消息发送到与它绑定的所有队列。它忽略消息的路由键。这意味着无论什么路由键,消息都会被发送到与扇出交换机绑定的所有队列。
  3. Topic Exchange(主题交换机):主题交换机将消息路由到与消息的路由键匹配的队列,路由键可以包含通配符(*和#)。这使得主题交换机能够进行更灵活的消息路由。
  4. Headers Exchange(头交换机):头交换机根据消息的头部属性进行路由。它会根据消息的头部属性与队列绑定的参数进行匹配。

如果不指定交换机类型,默认的交换机类型是 “Direct Exchange”,并且会将消息路由到与消息指定的路由键完全匹配的队列。



发布订阅模型(交互机 1:n)

image-20230902154354719


发布订阅-FanoutExchange

  • 交换机、队列的声明

  • 声明交换机 FanoutExchange

  • 声明队列 Queue

  • 声明交换机和队列的绑定关系 Binding

  • **即所有的消息都复制分配给 全部队列 **

可bean 声明 or 注解声明

//【方式一】 bean 声明 交换机 和 绑定队列

@Configuration
public class FanoutConfig {
// 声明交换机 FanoutExchange类型 : itcast.fanout
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}

// 声明队列 Queue : fanout.queue1
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}

// 绑定队列1到交换机 Binding FanoutExchange类型交换机:不需要 路由键
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}

// DirectExchange:需要 路由键 使用 with 指定路由键
@Bean
public Binding errorBinding(Queue errQueue,DirectExchange errExchange){
return BindingBuilder.bind(errQueue).to(errExchange).with("error");
}

}

//使用
@RabbitListener(queues = "fanout.queue1") //如果没有设置声明队列bean,而该队列不存在,会报错

//【方式二】 方法上 声明 交换机 和 声明队列并绑定
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue1"),
exchange = @Exchange(name = "itcast.fanout",type = ExchangeTypes.Fanout), //名字、交换机类型
))
public void listenDirectQueue1(String msg){}

在绑定交换机的时候,想设置队列类型

    @RabbitListener(bindings = @QueueBinding(
value = @Queue(
name = "lazy.queue",
durable = "true", //持久化
arguments = @Argument(name = "x-queue-mode",value = "lazy")
),
exchange = @Exchange(name = "lazy.change",type = ExchangeTypes.Fanout),
key = {"lazy"}
))

  • 发布

    • 现在发布消息给 交换机 而不是队列
    • .convertAndSend(交换机名称,队列的绑定key,消息)
// 交换机名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, every one!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "", message);

  • 订阅

//@RabbitListener(queues = "fanout.queue1")
@RabbitListener(queuesToDeclare = @Queue(name = "fanout.queue1"))
public void listenFanoutQueue1(String msg) throws InterruptedException {
//消费者1
}


//@RabbitListener(queues = "fanout.queue1")
@RabbitListener(queuesToDeclare = @Queue(name = "fanout.queue1"))
public void listenFanoutQueue2(String msg) throws InterruptedException {
//消费者2
}



路由-DirectExchange

Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

  • 即不全部复制分配给全部队列,选择性复制分配

image-20230902162332635

订阅者 注解声明 队列 和其bindingKey

@RabbitListener(bindings = @QueueBinding(		//队列绑定
value = @Queue(name = "direct.queue1"), //队列名称
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //名字、交换机类型
key = {"red","blue"} //绑定key
))

  • 发布

  • 指定 routingKey :只有对应的队列才能获取
// 交换机名称
String exchangeName = "itcast.direct";
// 消息
String message = "hello, blue!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "blue", message);

  • 订阅

@RabbitListener(bindings = @QueueBinding(		//队列绑定
value = @Queue(name = "direct.queue1"), //队列名称
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //名字、类型
key = {"red","blue"} //绑定key
))
public void listenDirectQueue1(String msg){ }

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //名字、交换机类型
key = {"red","yellow"}
))
public void listenDirectQueue2(String msg){}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue3"),
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //名字、交换机类型
key = {"pink"}
))
public void listenDirectQueue3(String msg){}


主题 Topic Exchange

image-20230902165124451


  • 订阅
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic" ,type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){}


@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic" ,type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){}

  • 发布
// 交换机名称
String exchangeName = "itcast.topic";
// 消息
String message = "cccccccccccccccc!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);





消息转换器

  • 传的消息为对象的情况下,需要序列化,RabbitMQ默认字节
  • 发送=接收

1、依赖

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>


2、配置类

@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}


3、使用

  • 发送

在 convertAndSend 外部定义 message的属性发生转换异常,没有实现消息转换

定义消息的属性在:

String str = "hello";
rabbitTemplate.convertAndSend(exchange, "", str, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); //设置非临时
message.getMessageProperties().setExpiration("5000"); //设置过期时间
return message;
}
});

 Map<String,Object> msg = new HashMap<>();
msg.put("name","chen");
msg.put("age",21);
// 发送消息
rabbitTemplate.convertAndSend("object.queue",msg);

Student student = new Student("辰呀",12);
rabbitTemplate.convertAndSend(queue,student);

  • 接收
//声明队列  -- 配置类
@Bean
public Queue objectQueue() {
return new Queue("object.queue");
}
@RabbitListener(queues = "object.queue")
public void objectQueue(Map<String,String> msg){
System.err.println("消费者接收到的对象:"+msg);
}


@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void listenLazyQueue(Student student){
System.err.println("惰性队列接收到的消息:"+student);
}





生产者可靠



连接重连

  • 重连过程是 阻塞式

image-20231012164538217





消息确认

监听消息是否成功传递

image-20231012165133017

image-20231012160917285


  • 同步到达消息回调
  • 异步到达

image-20231012165306963

#生产者
spring:
rabbitmq:
publisher-confirm-type: correlated #返回回执消息 [关闭none,同步simple,异步correlated]
publisher-returns: true #是否开启路由失败返回 [false]

confirm模式

  • 每个发消息都需要自定义指定,回调获取

  • 消息从生产者到达交换机

image-20231012170205902

//创建CorrelationData
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());//唯一性
correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
//java错误
public void onFailure(Throwable throwable) {
System.err.println("消息回调失败"+throwable);
}

@Override
public void onSuccess(CorrelationData.Confirm result) {
System.err.println("收到confirm callback回调");
if (result.isAck()){
//消息发送成功
System.err.println("消息发送成功 收到ack");
}else {
System.err.println("发送消息失败,收到 nack,reason:"+result.getReason());
}
}
});

String message = "这是confirm模式的消息";
String exhcang = "exchange_confirmC";
rabbitTemplate.convertAndSend(exhcang,"confirm",message,correlationData);
  • 交换机错误
  • rountkey错误触发


return模式

  • 统一指定,容器加载完自动实现

  • 路由失败返回

  • 消息从交换机到达队列

image-20231012165742553

@Component
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

//配置回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* @param message 未能被路由的消息
* @param i 响应代码
* @param s 响应文本
* @param s1 交换机名称
* @param s2 路由键
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println(String.format("消息发送失败,应答码: %d, 原因: %s, 交换机: %s, 路由键: %s, 消息: %s", i, s, s1, s2, message));
}
});
}
}


  • 总结

基本到达路由都返回ACK

image-20231012193719413

包装生产者发送消息可靠?

1.配置生产者重连机制,在连接mq网络波动情况下会重试进行重新连接,避免网络波动导致消息的发送失败

2.其他原因导致失败,可以使用生产者确认机制,发送消息到mq的时候,mq会返回ack,失败的话会返回anck,根据返回值解决问题

3.以上操作都会增加系统的负担和额外资源开销,在平常情况下一般不推荐开启确认机制,在要求消息可靠性高的情况下开启






数据持久化

image-20231012194617444



  • 交换机持久化 【Spring默认开启】

  • 队列持久化【Spring默认开启】

  • 消息持久化【Spring默认开启】

  • 控制台默认临时

image-20231012194858521

//设置临时消息
Message message = MessageBuilder
.withBody("hello" .getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //设置临时
.build();





Lazy Queue

image-20231012201253029


image-20231012201429054


@RabbitListener(bindings = 
@QueueBinding(
value = @Queue(
name = "lazyC.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode",value = "lazy")
), //声明 队列和类型arguments
exchange = @Exchange(name = "lazy.change",type = ExchangeTypes.DIRECT), //绑定 交换机
key = {"lazy"} //设置 路由键
))
public void listenLazyQueue(Student student){
System.err.println("惰性队列接收到的消息:"+student);
}


  • 总结

image-20231012210308001






消费者可靠



确认机制

spring默认实现了发送回执

image-20231012210539637

image-20231012210813727


spring:
rabbitmq:
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
acknowledge-mode: auto #确认机制:none,关闭ack,manual,手动ack,auto:自动ack 自动模式会导致无限重试

  • 如果在监听消息方法中,抛出了异常,则继续监听获取数据,直到无异常
throw  new RuntimeException("模拟异常");
throw new MessageConversionException("转换异常");




消息失败策略

image-20231012213240043


spring:
rabbitmq:
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
#acknowledge-mode: auto #确认机制:自动模式会导致无限重试
retry:
enabled: true #开启消费者重试机制
initial-interval: 10000ms #初始的失败等待时长1秒
multiplier: 1 #下次失败的等待时长倍数
max-attempts: 3 #最大重试次数
stateless: true #是否无事务,有 fasle 无 true


防止达到设定重试次数后 删除 消息

设置专门接收错误消息的交换机,在监听组件失败达到设定重试次数后转发投递给错误交换机

image-20231012213640854


image-20231012213807124

  • 配置类定义 交换机 、队列、绑定关系、失败处理策略
  • 声明队列、交换机、绑定关系可以使用注解实现,但失败策略要注册为bean
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",name = "enabled",havingValue = "true")
public class Error {

@Bean
public DirectExchange errExchange(){
return new DirectExchange("erroe.direct");
}

@Bean
public Queue errQueue(){
return new Queue("error.queue");
}

@Bean
public Binding errorBinding(Queue errQueue,DirectExchange errExchange){
return BindingBuilder.bind(errQueue).to(errExchange).with("error");
}

//失败策略bean
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"erroe.direct","error");
}

}



image-20231012221429144





业务幂等性

  • 幂等性:无论执行多少次,对状态的影响一致

  • 保证非幂等业务幂等性:无论多少次相同重复的消息,只消费一次

image-20231012230143599



方案一:消息设置ID

image-20231012232924743

  • 在消息转换器中开启携带唯一ID、
@Bean
public MessageConverter messageConverter(){
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}


方案二:查询消息是否消费

  • 先查询消息是否消费,消费的前提(业务状态已经得到更新)

image-20231012233642220



image-20231012235147941






延迟消息

image-20231012235713739



死信交换机

  • 用途;【延迟消息】:设置消息过期时间,队列不设置消费者,时间到变成死信,则投递到死信交换机,进入死信队列,死信队列有消费者,则实现延迟获取消息

image-20231013003711034


  • 设置没有消费者的交换机、队列、绑定关系
//定义无消费者的队列
@Configuration
public class Direct {

@Bean
public Queue directQueue(){
Queue queue = new Queue("simpleDirect.queue");
//死信后转投递交换机 dlx.direct
queue.addArgument("x-dead-letter-exchange","dlx.direct");
return queue;
}

@Bean
public FanoutExchange directExchange(){
return new FanoutExchange("simple.direct");
}

@Bean
public Binding directBinding(Queue directQueue,FanoutExchange directExchange){
return BindingBuilder.bind(directQueue).to(directExchange);
}
}

  • 声明死信交换机和队列,并监听
@RabbitListener(bindings = @QueueBinding(
value = @Queue(
name = "dxl.queue"
),
exchange = @Exchange(name = "dlx.direct",type = ExchangeTypes.FANOUT)
))
//@RabbitListener(queues = "dxl.queue")
public void directQueue(String str){
System.err.println("延迟接收到的消息:"+str);
}


  • 测试超时
String exchange = "simple.direct";
String str = "hello";

for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend(exchange, "", str, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("20000"); //设置过期时间
return message;
}
});
}




延迟消息插件

投递到交换机后得到设定的延迟时间在投递到队列

  • 消费者

image-20231013142033319



  • 生产者

image-20231013142142221





取消订单案例

  • 把30分钟拆分各段小时间,每次读取一个小段时间,时间段全部用完==30分钟 ==》结束
  • image-20231013143412015

image-20231013143143994



生产者

  • 设置延迟时间划分
//消息体
@Data
public class MultiDelayMessage<T> {
/**
* 消息体
*/
private T data;
/**
* 记录延迟时间的集合
*/
private List<Long> delayMillis;

public MultiDelayMessage(T data, List<Long> delayMillis) {
this.data = data;
this.delayMillis = delayMillis;
}
public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){
return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));
}

/**
* 获取并移除下一个延迟时间
* @return 队列中的第一个延迟时间
*/
public Long removeNextDelay(){
return delayMillis.remove(0);
}

/**
* 是否还有下一个延迟时间
*/
public boolean hasNextDelay(){
return !delayMillis.isEmpty();
}
}


  • 定义常量类:交换机与队列
public interface MqConstants {
String DELAY_EXCHANGE = "trade.delay.topic";
String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
String DELAY_ORDER_ROUTING_KEY = "order.query";
}

  • 创建订单发生消息

不提取 MessagePostProcessor

MultiDelayMessage<Long> msg = MultiDelayMessage.of(order.getId(), 10000L, 10000L, 10000L, 15000L, 15000L, 30000L, 30000L);
rabbitTemplate.convertAndSend(
MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(msg.removeNextDelay().intValue());
return null;
}
}
);

  • 提取 MessagePostProcessor 消息处理器
@RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {

private final int delay;

@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(delay);
return message;
}
}
MultiDelayMessage<Long> msg = MultiDelayMessage.of(order.getId(), 10000L, 10000L, 10000L, 15000L, 15000L, 30000L, 30000L);
rabbitTemplate.convertAndSend(
MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,
new DelayMessageProcessor(msg.removeNextDelay().intValue())
);




消费者

  • 重新发送该消息(更改延迟时间):重新发送到指定的交换机和对应的队列,设置延迟时间
@Component
@RequiredArgsConstructor
public class OrderStatusCheckListener {

private final IOrderService orderService;
private final RabbitTemplate rabbitTemplate;

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
exchange = @Exchange(value = MqConstants.DELAY_EXCHANGE, delayed = "true", type = ExchangeTypes.TOPIC),
key = MqConstants.DELAY_ORDER_ROUTING_KEY
))
public void listenOrderDelayMessage(MultiDelayMessage<Long> msg) {
// 1.查询订单状态
// 2.判断是否已经支付
// TODO 3.去支付服务查询真正的支付状态
// 3.1.已支付,标记订单状态为已支付

// 4.判断是否存在延迟时间
if (msg.hasNextDelay()) {
// 4.1.存在,重发延迟消息
Long nextDelay = msg.removeNextDelay();
rabbitTemplate.convertAndSend(
MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY,
msg, new DelayMessageProcessor(nextDelay.intValue()));
return;
}

// 5.不存在,取消订单
}
}