RabbitMQ
docker run \ -e RABBITMQ_DEFAULT_USER=cRabbitMQ \ -e RABBITMQ_DEFAULT_PASS=cRABBIT.. \ --name mq \ --hostname hchenp \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:latest
|

常见消息模型
基本消息队列(BasicQueue)

依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
配置文件
spring:
rabbitmq: host: 198.168.238.3 port: 5672 username: itcast password: 123321 virtual-host: /
|
public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("47.115.222.113"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null);
String message = "hello, rabbitmq!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】");
channel.close(); connection.close(); } }
|
public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("47.115.222.113"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("itcast"); factory.setPassword("123321"); Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null);
channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("进入方法"); String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); } }
|
SpringAMQP

应用间消息通信的一种协议,与语言和平台无关。
Spring AMQP 不是 RabbitMQ 的一部分,而是一个用于与 RabbitMQ 集成的 Spring 框架扩展。这使得在 Spring 应用程序中使用 RabbitMQ 变得更加简单和方便。== 简化RabbitMQ客户端
依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
发布-生产者
spring: rabbitmq: host: 47.115.222.113 port: 5672 username: itcast password: 123321 virtual-host: / publisher-confirm-type: none publisher-returns: true
|
@Autowired private RabbitTemplate rabbitTemplate;
|
String queueName = "simple.queue"; String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
|
rabbitTemplate.convertAndSend(交换机,绑定Key,消息);
|
消费者
spring: rabbitmq: host: 47.115.222.113 port: 5672 username: itcast password: 123321 virtual-host: / listener: simple: prefetch: 1 acknowledge-mode: auto
|
当消费者声明一个还没有被创建的队列时,RabbitMQ会自动创建这个队列,以便它可以开始接收消息。这是RabbitMQ的默认行为,可以确保消费者在发送消息之前声明队列,而不必担心队列是否存在。
@Configuration public class FanoutConfig {
@Bean public Queue simpleQueue(){ return new Queue("simple.queue"); }
}
@RabbitListener(queues = "simple.queue")
|
@RabbitListener(queuesToDeclare = @Queue(name = "simple.queue")) public void listenSimpleQueue(String msg){ System.out.println("消费者接收到simple.queue的消息:【"+msg+"】"); }
@RabbitListener(queuesToDeclare = @Queue( name = "lazy.queue", durable = "true", //持久化 arguments = @Argument(name = "x-queue-mode",value = "lazy") ))
|
- 监听消息组件
- @RabbitListener(queues= “队列名称”)
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg){ System.out.println("消费者接收到simple.queue的消息:【"+msg+"】"); } @RabbitListener(queuesToDeclare = @Queue(name = "simple.queue")) public void listenSimpleQueue(String msg){ System.out.println("消费者接收到simple.queue的消息:【"+msg+"】"); } }
|
工作消息队列(WorkQueue模型 1:1)

spring: rabbitmq: listener: simple: prefetch: 1
|
- 一个队列绑定多个消费者
- 能者多劳
- 如果不设置分配,会被单一消费者全部拿去慢慢消费
@RabbitListener(queuesToDeclare = @Queue(name = "simple.queue")) public void listenWorkQueue1(String msg) throws InterruptedException { Thread.sleep(20); }
@RabbitListener(queuesToDeclare = @Queue(name = "simple.queue")) public void listenWorkQueue2(String msg) throws InterruptedException { Thread.sleep(200); }
|
交换机类型
- Direct Exchange(直连型交换机):这是最简单的交换机类型。它将消息路由到与消息指定的路由键完全匹配的队列。如果路由键匹配,消息将被发送到相应的队列。
- Fanout Exchange(扇出交换机):扇出交换机将消息发送到与它绑定的所有队列。它忽略消息的路由键。这意味着无论什么路由键,消息都会被发送到与扇出交换机绑定的所有队列。
- Topic Exchange(主题交换机):主题交换机将消息路由到与消息的路由键匹配的队列,路由键可以包含通配符(*和#)。这使得主题交换机能够进行更灵活的消息路由。
- Headers Exchange(头交换机):头交换机根据消息的头部属性进行路由。它会根据消息的头部属性与队列绑定的参数进行匹配。
如果不指定交换机类型,默认的交换机类型是 “Direct Exchange”,并且会将消息路由到与消息指定的路由键完全匹配的队列。
发布订阅模型(交互机 1:n)

发布订阅-FanoutExchange
声明交换机 FanoutExchange
声明队列 Queue
声明交换机和队列的绑定关系 Binding
**即所有的消息都复制分配给 全部队列 **
可bean 声明 or 注解声明
@Configuration public class FanoutConfig { @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); }
@Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); }
@Bean public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } @Bean public Binding errorBinding(Queue errQueue,DirectExchange errExchange){ return BindingBuilder.bind(errQueue).to(errExchange).with("error"); } }
@RabbitListener(queues = "fanout.queue1")
|
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout.queue1"), exchange = @Exchange(name = "itcast.fanout",type = ExchangeTypes.Fanout), //名字、交换机类型 )) public void listenDirectQueue1(String msg){}
|
在绑定交换机的时候,想设置队列类型
@RabbitListener(bindings = @QueueBinding( value = @Queue( name = "lazy.queue", durable = "true", //持久化 arguments = @Argument(name = "x-queue-mode",value = "lazy") ), exchange = @Exchange(name = "lazy.change",type = ExchangeTypes.Fanout), key = {"lazy"} ))
|
发布
- 现在发布消息给 交换机 而不是队列
- .convertAndSend(交换机名称,队列的绑定key,消息)
String exchangeName = "itcast.fanout";
String message = "hello, every one!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
|
@RabbitListener(queuesToDeclare = @Queue(name = "fanout.queue1")) public void listenFanoutQueue1(String msg) throws InterruptedException { }
@RabbitListener(queuesToDeclare = @Queue(name = "fanout.queue1")) public void listenFanoutQueue2(String msg) throws InterruptedException { }
|
路由-DirectExchange
Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

订阅者 注解声明 队列 和其bindingKey
@RabbitListener(bindings = @QueueBinding( //队列绑定 value = @Queue(name = "direct.queue1"), //队列名称 exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //名字、交换机类型 key = {"red","blue"} //绑定key ))
|
- 指定 routingKey :只有对应的队列才能获取
String exchangeName = "itcast.direct"; String message = "hello, blue!"; rabbitTemplate.convertAndSend(exchangeName, "blue", message);
|
@RabbitListener(bindings = @QueueBinding( //队列绑定 value = @Queue(name = "direct.queue1"), //队列名称 exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //名字、类型 key = {"red","blue"} //绑定key )) public void listenDirectQueue1(String msg){ }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //名字、交换机类型 key = {"red","yellow"} )) public void listenDirectQueue2(String msg){}
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue3"), exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //名字、交换机类型 key = {"pink"} )) public void listenDirectQueue3(String msg){}
|
主题 Topic Exchange

@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic" ,type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){}
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic" ,type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){}
|
String exchangeName = "itcast.topic";
String message = "cccccccccccccccc!";
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
|
消息转换器
- 传的消息为对象的情况下,需要序列化,RabbitMQ默认字节
- 发送=接收
1、依赖
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
|
2、配置类
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }
|
3、使用
在 convertAndSend 外部定义 message的属性 会发生转换异常,没有实现消息转换
定义消息的属性在:
String str = "hello"; rabbitTemplate.convertAndSend(exchange, "", str, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); message.getMessageProperties().setExpiration("5000"); return message; } });
|
Map<String,Object> msg = new HashMap<>(); msg.put("name","chen"); msg.put("age",21);
rabbitTemplate.convertAndSend("object.queue",msg);
Student student = new Student("辰呀",12); rabbitTemplate.convertAndSend(queue,student);
|
@Bean public Queue objectQueue() { return new Queue("object.queue"); }
|
@RabbitListener(queues = "object.queue") public void objectQueue(Map<String,String> msg){ System.err.println("消费者接收到的对象:"+msg); }
@RabbitListener(queuesToDeclare = @Queue( name = "lazy.queue", durable = "true", arguments = @Argument(name = "x-queue-mode",value = "lazy") )) public void listenLazyQueue(Student student){ System.err.println("惰性队列接收到的消息:"+student); }
|
生产者可靠
连接重连

消息确认
监听消息是否成功传递



spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true
|
confirm模式
每个发消息都需要自定义指定,回调获取
消息从生产者到达交换机

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable throwable) { System.err.println("消息回调失败"+throwable); }
@Override public void onSuccess(CorrelationData.Confirm result) { System.err.println("收到confirm callback回调"); if (result.isAck()){ System.err.println("消息发送成功 收到ack"); }else { System.err.println("发送消息失败,收到 nack,reason:"+result.getReason()); } } });
String message = "这是confirm模式的消息"; String exhcang = "exchange_confirmC"; rabbitTemplate.convertAndSend(exhcang,"confirm",message,correlationData);
|
return模式
统一指定,容器加载完自动实现
路由失败返回
消息从交换机到达队列

@Component public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println(String.format("消息发送失败,应答码: %d, 原因: %s, 交换机: %s, 路由键: %s, 消息: %s", i, s, s1, s2, message)); } }); } }
|
基本到达路由都返回ACK

包装生产者发送消息可靠?
1.配置生产者重连机制,在连接mq网络波动情况下会重试进行重新连接,避免网络波动导致消息的发送失败
2.其他原因导致失败,可以使用生产者确认机制,发送消息到mq的时候,mq会返回ack,失败的话会返回anck,根据返回值解决问题
3.以上操作都会增加系统的负担和额外资源开销,在平常情况下一般不推荐开启确认机制,在要求消息可靠性高的情况下开启
数据持久化

交换机持久化 【Spring默认开启】
队列持久化【Spring默认开启】
消息持久化【Spring默认开启】
控制台默认临时

Message message = MessageBuilder .withBody("hello" .getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) .build();
|
Lazy Queue


@RabbitListener(bindings = @QueueBinding( value = @Queue( name = "lazyC.queue", durable = "true", arguments = @Argument(name = "x-queue-mode",value = "lazy") ), //声明 队列和类型arguments exchange = @Exchange(name = "lazy.change",type = ExchangeTypes.DIRECT), //绑定 交换机 key = {"lazy"} //设置 路由键 )) public void listenLazyQueue(Student student){ System.err.println("惰性队列接收到的消息:"+student); }
|

消费者可靠
确认机制
spring默认实现了发送回执


spring: rabbitmq: listener: simple: prefetch: 1 acknowledge-mode: auto
|
- 如果在监听消息方法中,抛出了异常,则继续监听获取数据,直到无异常
throw new RuntimeException("模拟异常"); throw new MessageConversionException("转换异常");
|
消息失败策略

spring: rabbitmq: listener: simple: prefetch: 1 retry: enabled: true initial-interval: 10000ms multiplier: 1 max-attempts: 3 stateless: true
|
防止达到设定重试次数后 删除 消息
设置专门接收错误消息的交换机,在监听组件失败达到设定重试次数后转发投递给错误交换机


- 配置类定义 交换机 、队列、绑定关系、失败处理策略
- 声明队列、交换机、绑定关系可以使用注解实现,但失败策略要注册为bean
@Configuration @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",name = "enabled",havingValue = "true") public class Error {
@Bean public DirectExchange errExchange(){ return new DirectExchange("erroe.direct"); }
@Bean public Queue errQueue(){ return new Queue("error.queue"); }
@Bean public Binding errorBinding(Queue errQueue,DirectExchange errExchange){ return BindingBuilder.bind(errQueue).to(errExchange).with("error"); }
@Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate,"erroe.direct","error"); }
}
|

业务幂等性

方案一:消息设置ID

@Bean public MessageConverter messageConverter(){ Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); jackson2JsonMessageConverter.setCreateMessageIds(true); return jackson2JsonMessageConverter; }
|
方案二:查询消息是否消费
- 先查询消息是否消费,消费的前提(业务状态已经得到更新)


延迟消息

死信交换机
- 用途;【延迟消息】:设置消息过期时间,队列不设置消费者,时间到变成死信,则投递到死信交换机,进入死信队列,死信队列有消费者,则实现延迟获取消息

@Configuration public class Direct {
@Bean public Queue directQueue(){ Queue queue = new Queue("simpleDirect.queue"); queue.addArgument("x-dead-letter-exchange","dlx.direct"); return queue; }
@Bean public FanoutExchange directExchange(){ return new FanoutExchange("simple.direct"); }
@Bean public Binding directBinding(Queue directQueue,FanoutExchange directExchange){ return BindingBuilder.bind(directQueue).to(directExchange); } }
|
@RabbitListener(bindings = @QueueBinding( value = @Queue( name = "dxl.queue" ), exchange = @Exchange(name = "dlx.direct",type = ExchangeTypes.FANOUT) ))
public void directQueue(String str){ System.err.println("延迟接收到的消息:"+str); }
|
String exchange = "simple.direct"; String str = "hello";
for (int i = 0; i < 5; i++) { rabbitTemplate.convertAndSend(exchange, "", str, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("20000"); return message; } }); }
|
延迟消息插件
投递到交换机后得到设定的延迟时间在投递到队列


取消订单案例
- 把30分钟拆分各段小时间,每次读取一个小段时间,时间段全部用完==30分钟 ==》结束


生产者
@Data public class MultiDelayMessage<T> {
private T data;
private List<Long> delayMillis;
public MultiDelayMessage(T data, List<Long> delayMillis) { this.data = data; this.delayMillis = delayMillis; } public static <T> MultiDelayMessage<T> of(T data, Long ... delayMillis){ return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis)); }
public Long removeNextDelay(){ return delayMillis.remove(0); }
public boolean hasNextDelay(){ return !delayMillis.isEmpty(); } }
|
public interface MqConstants { String DELAY_EXCHANGE = "trade.delay.topic"; String DELAY_ORDER_QUEUE = "trade.order.delay.queue"; String DELAY_ORDER_ROUTING_KEY = "order.query"; }
|
不提取 MessagePostProcessor
MultiDelayMessage<Long> msg = MultiDelayMessage.of(order.getId(), 10000L, 10000L, 10000L, 15000L, 15000L, 30000L, 30000L); rabbitTemplate.convertAndSend( MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(msg.removeNextDelay().intValue()); return null; } } );
|
- 提取
MessagePostProcessor
消息处理器
@RequiredArgsConstructor public class DelayMessageProcessor implements MessagePostProcessor {
private final int delay;
@Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(delay); return message; } }
|
MultiDelayMessage<Long> msg = MultiDelayMessage.of(order.getId(), 10000L, 10000L, 10000L, 15000L, 15000L, 30000L, 30000L); rabbitTemplate.convertAndSend( MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg, new DelayMessageProcessor(msg.removeNextDelay().intValue()) );
|
消费者
- 重新发送该消息(更改延迟时间):重新发送到指定的交换机和对应的队列,设置延迟时间
@Component @RequiredArgsConstructor public class OrderStatusCheckListener {
private final IOrderService orderService; private final RabbitTemplate rabbitTemplate;
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = MqConstants.DELAY_ORDER_QUEUE, durable = "true"), exchange = @Exchange(value = MqConstants.DELAY_EXCHANGE, delayed = "true", type = ExchangeTypes.TOPIC), key = MqConstants.DELAY_ORDER_ROUTING_KEY )) public void listenOrderDelayMessage(MultiDelayMessage<Long> msg) { if (msg.hasNextDelay()) { Long nextDelay = msg.removeNextDelay(); rabbitTemplate.convertAndSend( MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg, new DelayMessageProcessor(nextDelay.intValue())); return; } } }
|