RabbitMQ


RabbitMQ

消息队列

消息中间件,一个使用队列来通信的组件。

功能

异步处理

服务解耦

流量控制

Docker安装

docker pull rabbitmq:3.8.3-management
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v /home/rabbit_docker:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.8.3-management
-d 后台运行容器;
--name 指定容器名;
-p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
-v 映射目录或文件;
--hostname  主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
-e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;
RABBITMQ_DEFAULT_PASS:默认用户名的密码)

Windows下RabbitMQ安装及配置

安装Erlang

下载地址:https://www.erlang.org/downloads
设置环境变量,新建ERLANG_HOME 指向安装目录
修改环境变量path,增加Erlang变量至path,%ERLANG_HOME%\bin;
cmd输入erl,查看是否更改成功

安装rabbitmq

下载地址:http://www.rabbitmq.com/download.html
启动rabbitmq
浏览器访问http://localhost:15672,查看是否启动成功

输入账户密码进入后台管理界面,默认账户密码都是guest
后台管理界面可以手动创建虚拟host,创建用户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等

概念

名词

  • Broker:接收分发消息,就是rabbitmq服务端
  • Virtual host:出于多租户和安全考虑,分组,类似mysql中的database概念
  • Connection:生产者,消费者和broker直接的tcp连接
  • Channel:Channel是在Connection内部建立逻辑连接,减少建立TCP连接的开销,通常每个线程建立一个channel,channel之间完全隔离
  • Exchange:消息到达broker中首先到达交换机,根据分发规则,匹配routing key,分发消息到queue中
  • Queue:队列,存放消息地方,被消费者取走,FIFO
  • Binding:exchange和queue之间的虚拟连接,Bingding信息被保存到exchange中的查询表中,用于消息分发依据

工作模式

简单模式

生产者 -> MQ <- 消费者
一个生产者对应一个消费者

work queues工作队列模式

一个生产者对应多个消费者,多个消费者共同消费同一个队列中的消息

发布订阅模式

引入Fanout广播交换机,接收消息,处理消息分发给队列,把消息分发给所有绑定到交换机的队列

路由模式

Direct定向交换机,把消息交给符合指定routing key的队列,对消息做筛选

Topics主题模式

Topic交换机,把消息交给符合routing pattern路由模式的队列,路由key一般有一个或多个单词组成,单词之间用.隔开。#匹配多个单词:item.#(item.one.two, item.one) *匹配一个单词:(item.one)

RPC远程调用模式

rabbitmq代码实现

消息推送和接收流程

简单模式

<dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>5.4.3</version>
   </dependency>
public class RabbitUtils {
    private static ConnectionFactory connectionFactory = new ConnectionFactory();
    static {
        connectionFactory.setHost("192.168.31.8");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("zwq");
        connectionFactory.setPassword("zwqzwq");
        connectionFactory.setVirtualHost("test");
    }
    public static Connection getConnection() {
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
        } catch (Exception e) {
            throw new RuntimeException();
        }
        return connection;
    }
}
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取TCP长连接
        Connection connection = RabbitUtils.getConnection();
        // 创建通道,相当于TCP中虚拟连接
        Channel channel = connection.createChannel();
        // 创建队列
        // 1队列名称 2是否持久化,MQ退出消息就丢失 3是否私有化,false代表所有消费者都可以访问,true代表第一次拥有它的消费者才能一直使用
        // 4是否自动删除,false代表连接停掉后不自动删除这个队列 5其它额外参数
        channel.queueDeclare("helloWordQueue1", false, false, false, null);
        String message = "this is message 1";
        // 1交换机,简单模式不需要交换机 2队列名称 3额外设置信息 4消息(字节数组)
        channel.basicPublish("", "helloWordQueue1", null, message.getBytes());
        channel.close();
        connection.close();
        System.out.println("数据发送完毕");
    }
}
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取TCP长连接
        Connection connection = RabbitUtils.getConnection();
        // 创建通道,相当于TCP中虚拟连接
        Channel channel = connection.createChannel();
        // 创建队列
        // 1队列名称 2是否持久化,MQ退出消息就丢失 3是否私有化,false代表所有消费者都可以访问,true代表第一次拥有它的消费者才能一直使用
        // 4是否自动删除,false代表连接停掉后不自动删除这个队列 5其它额外参数
        channel.queueDeclare("helloWordQueue1", false, false, false, null);
        // 获取数据 1队列名称 2是否自动确认收到消息,false代表手动编程确认消息(MQ推荐手动) 3要传入DefaultConsumer实现类
        channel.basicConsume("helloWordQueue1", false, new Reciver(channel));
    }
}

class Reciver extends DefaultConsumer {
    private Channel channel;
    // 重写构造方法把通道传入
    public Reciver(Channel channel) {
        super(channel);
        this.channel = channel;
    }
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body);
        System.out.println("消息:" + message);
        System.out.println("消息tagId:" + envelope.getDeliveryTag());
        // 确认消息,false代表只确认当前消息,true代表签收该消费者所有未签收的消息
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

工作队列模式

// 生产者
public class OrderSystem {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        // 创建通道,相当于TCP中虚拟连接
        Channel channel = connection.createChannel();
        // 创建队列
        channel.queueDeclare("WorkQueue1", false, false, false, null);
        String message = "phone:14523123,name:zzz,message:hello ";
        for (int i = 0; i < 100; i++) {
            String send = message + i;
            // 1交换机,工作队列模式不需要交换机 2队列名称 3额外设置信息 4消息(字节数组)
            channel.basicPublish("", "WorkQueue1", null, send.getBytes());
        }
        System.out.println("数据发送完毕");
        channel.close();
        connection.close();
    }
}
// 这里写多个消费者去消费
public class SMSSystem1 {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取TCP长连接
        Connection connection = RabbitUtils.getConnection();
        // 创建通道,相当于TCP中虚拟连接
        Channel channel = connection.createChannel();
        // 创建队列
        channel.queueDeclare("WorkQueue1", false, false, false, null);
        // 不加这个MQ是平均分配,加了之后消费者处理(确认消息)完一个后继续处理
        channel.basicQos(1);
        // 获取数据 1队列名称 2是否自动确认收到消息,false代表手动编程确认消息(MQ推荐手动) 3要传入DefaultConsumer实现类
        channel.basicConsume("WorkQueue1", false, new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                Thread.sleep(10);
                System.out.println("短信系统1   -> 消费消息:" + message);
                // 确认消息,false代表只确认当前消息,true代表签收该消费者所有未签收的消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

发布订阅模式

  • 使用fanout交换机
    public class PubSystem {
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitUtils.getConnection();
            // 创建通道,相当于TCP中虚拟连接
            Channel channel = connection.createChannel();
            String send = "发布订阅模式信息";
            // 1交换机 2队列名称 3额外设置信息 4消息(字节数组)
            // 需要在rabbit创建pubExchange这个交换机
            channel.basicPublish("pubExchange", "", null, send.getBytes());
            channel.close();
            connection.close();
        }
    }
    public class Sub1 {
        public static void main(String[] args) throws IOException {
            // 获取TCP长连接
            Connection connection = RabbitUtils.getConnection();
            // 创建通道,相当于TCP中虚拟连接
            Channel channel = connection.createChannel();
            // 创建队列
            channel.queueDeclare("subQueue1", false, false, false, null);
            // 绑定交换机 1队列名字 2交换机名字 3路由key,发布订阅模式用不到
            channel.queueBind("subQueue1", "pubExchange", "");
    
            // 不加这个MQ是平均分配,加了之后消费者处理(确认消息)完一个后继续处理
            channel.basicQos(1);
            // 获取数据 1队列名称 2是否自动确认收到消息,false代表手动编程确认消息(MQ推荐手动) 3要传入DefaultConsumer实现类
            channel.basicConsume("subQueue1", false, new DefaultConsumer(channel) {
                @SneakyThrows
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body);
                    Thread.sleep(10);
                    System.out.println("订阅1   -> 消费消息:" + message);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
    }

路由模式

  • 使用direct交换机
    public class RoutingSystem {
        public static void main(String[] args) throws IOException, TimeoutException {
            Map map = new LinkedHashMap<String, String>();
            map.put("chongqing", "重庆天气预报");
            map.put("hunan", "湖南天气预报");
            map.put("beijing", "北京天气预报");
            map.put("xinjiang", "新疆天气预报");
            Connection connection = RabbitUtils.getConnection();
            // 创建通道,相当于TCP中虚拟连接
            Channel channel = connection.createChannel();
            Iterator<Map.Entry<String, String>> itr = map.entrySet().iterator();
            while (itr.hasNext()) {
                Map.Entry<String, String> temp = itr.next();
                // 1交换机 2路由key 3额外设置信息 4消息(字节数组)
                // 消息带有标记
                channel.basicPublish("routingExchange", temp.getKey(), null, temp.getValue().getBytes());
            }
            channel.close();
            connection.close();
        }
    }
    public class RoutingConsumer1 {
        public static void main(String[] args) throws IOException {
            // 获取TCP长连接
            Connection connection = RabbitUtils.getConnection();
            // 创建通道,相当于TCP中虚拟连接
            Channel channel = connection.createChannel();
            // 创建队列
            channel.queueDeclare("routingQueue1", false, false, false, null);
            // 绑定交换机 1队列名字 2交换机名字 3路由key,发布订阅模式用不到
            channel.queueBind("routingQueue1", "routingExchange", "chongqing");
            // 不加这个MQ是平均分配,加了之后消费者处理(确认消息)完一个后继续处理
            channel.basicQos(1);
            // 获取数据 1队列名称 2是否自动确认收到消息,false代表手动编程确认消息(MQ推荐手动) 3要传入DefaultConsumer实现类
            channel.basicConsume("routingQueue1", false, new DefaultConsumer(channel) {
                @SneakyThrows
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body);
                    Thread.sleep(10);
                    System.out.println("路由1   -> 消费chongqing消息:" + message);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
    }
    public class RoutingConsumer2 {
        public static void main(String[] args) throws IOException {
            // 获取TCP长连接
            Connection connection = RabbitUtils.getConnection();
            // 创建通道,相当于TCP中虚拟连接
            Channel channel = connection.createChannel();
            // 创建队列
            channel.queueDeclare("routingQueue2", false, false, false, null);
            // 绑定交换机 1队列名字 2交换机名字 3路由key,发布订阅模式用不到
            channel.queueBind("routingQueue2", "routingExchange", "chongqing");
            channel.queueBind("routingQueue2", "routingExchange", "beijing");
            channel.queueBind("routingQueue2", "routingExchange", "hunan");
            channel.queueBind("routingQueue2", "routingExchange", "none");
            // 不加这个MQ是平均分配,加了之后消费者处理(确认消息)完一个后继续处理
            channel.basicQos(1);
            // 获取数据 1队列名称 2是否自动确认收到消息,false代表手动编程确认消息(MQ推荐手动) 3要传入DefaultConsumer实现类
            channel.basicConsume("routingQueue2", false, new DefaultConsumer(channel) {
                @SneakyThrows
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body);
                    Thread.sleep(10);
                    System.out.println("路由2   -> 消费重庆北京湖南none消息:" + message);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
    }

通配符模式

  • 使用topic交换机
    public class TopicSystem {
        public static void main(String[] args) throws IOException, TimeoutException {
            Map map = new LinkedHashMap<String, String>();
            map.put("aa.chongqing.now", "重庆天气预报");
            map.put("cc.hunan.future", "湖南天气预报");
            map.put("aa.beijing.future", "北京天气预报");
            map.put("cc.xinjiang.now", "新疆天气预报");
            Connection connection = RabbitUtils.getConnection();
            // 创建通道,相当于TCP中虚拟连接
            Channel channel = connection.createChannel();
            Iterator<Map.Entry<String, String>> itr = map.entrySet().iterator();
            while (itr.hasNext()) {
                Map.Entry<String, String> temp = itr.next();
                // 1交换机 2路由key 3额外设置信息 4消息(字节数组)
                // 消息带有标记
                channel.basicPublish("topicExchange", temp.getKey(), null, temp.getValue().getBytes());
            }
            channel.close();
            connection.close();
        }
    }
    public class TopicConsumer1 {
        public static void main(String[] args) throws IOException {
            // 获取TCP长连接
            Connection connection = RabbitUtils.getConnection();
            // 创建通道,相当于TCP中虚拟连接
            Channel channel = connection.createChannel();
            // 创建队列
            channel.queueDeclare("topicQueue1", false, false, false, null);
            // 绑定交换机 1队列名字 2交换机名字 3路由key,发布订阅模式用不到
            channel.queueBind("topicQueue1", "topicExchange", "*.*.now");
            // 不加这个MQ是平均分配,加了之后消费者处理(确认消息)完一个后继续处理
            channel.basicQos(1);
            // 获取数据 1队列名称 2是否自动确认收到消息,false代表手动编程确认消息(MQ推荐手动) 3要传入DefaultConsumer实现类
            channel.basicConsume("topicQueue1", false, new DefaultConsumer(channel) {
                @SneakyThrows
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body);
                    Thread.sleep(10);
                    System.out.println("通配符1   -> 消费*.*.now消息:" + message);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
    }
    public class TopicConsumer2 {
        public static void main(String[] args) throws IOException {
            // 获取TCP长连接
            Connection connection = RabbitUtils.getConnection();
            // 创建通道,相当于TCP中虚拟连接
            Channel channel = connection.createChannel();
            // 创建队列
            channel.queueDeclare("topicQueue2", false, false, false, null);
            // 绑定交换机 1队列名字 2交换机名字 3路由key,发布订阅模式用不到
            channel.queueBind("topicQueue2", "topicExchange", "cc.#");
            // 不加这个MQ是平均分配,加了之后消费者处理(确认消息)完一个后继续处理
            channel.basicQos(1);
            // 获取数据 1队列名称 2是否自动确认收到消息,false代表手动编程确认消息(MQ推荐手动) 3要传入DefaultConsumer实现类
            channel.basicConsume("topicQueue2", false, new DefaultConsumer(channel) {
                @SneakyThrows
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body);
                    Thread.sleep(10);
                    System.out.println("通配符2   -> 消费cc.#消息:" + message);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
    }

消息确认机制

rabbitmq提供监听器来接收消息投递的状态,消息确认涉及两种状态Confirm Return(代表生产者和broker之间的投递关系,与消费者是否确认消息无关)

  • confirm:代表生产者将消息送到broker有两种状态:1ack:代表broker已经接收消息 2nack:broker拒收消息(队列满,限流,IO异常)
  • return:消息被broker正常接收ack后,但没有队列消费,消息会回退给生产者
    public class TopicSystem {
        public static void main(String[] args) throws IOException, TimeoutException {
            Map map = new LinkedHashMap<String, String>();
            map.put("aa.chongqing.now", "重庆天气预报");
            map.put("cc.hunan.future", "湖南天气预报");
            map.put("aa.beijing.future", "北京天气预报");
            map.put("cc.xinjiang.now", "新疆天气预报");
            Connection connection = RabbitUtils.getConnection();
            // 创建通道,相当于TCP中虚拟连接
            Channel channel = connection.createChannel();
            // 开启监听模式
            channel.confirmSelect();
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    // l:tag,b:接收的数据是否为批量接收
                    System.out.println("消息已被接收:tag:" + l);
                }
                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    System.out.println("消息已被broker拒收,tag:" + l);
                }
            });
            channel.addReturnListener(new ReturnCallback() {
                @Override
                public void handle(Return r) {
                    System.out.println("Return编码:" + r.getReplyCode() + "描述:" + r.getReplyText());
                    System.out.println("交换机:" + r.getExchange() + "路由key:" + r.getRoutingKey());
                    System.out.println("return主题:" + new String(r.getBody()));
                }
            });
            Iterator<Map.Entry<String, String>> itr = map.entrySet().iterator();
            while (itr.hasNext()) {
                Map.Entry<String, String> temp = itr.next();
                // 1交换机 2路由key 3额外设置信息 4消息(字节数组)
                // 消息带有标记
                channel.basicPublish("topicExchange", temp.getKey(), true, null, temp.getValue().getBytes());
            }
        }
    }

springboot

配置

<!--rabbitmq springboot starter-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
  #配置rabbitMq 服务器
  rabbitmq:
    host: 192.168.31.8
    port: 5672
    username: zzz
    password: zzzzzz
    virtual-host: test

生产者

  • 配置
    @Configuration
    public class RabbitConfig {
        /**
         * 交换机名字
         */
        public static final String EXCHANGE_NAME = "boot_topic_exchange";
        /**
         * 队列名字
         */
        public static final String QUEUE_NAME = "boot_queue";
    
        /**
         * description: 声明交换机
         */
        @Bean("bootExchange")
        public Exchange bootExchange() {
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        }
    
        /**
         * description: 声明队列
         */
        @Bean("bootQueue")
        public Queue bootQueue() {
            return QueueBuilder.durable(QUEUE_NAME).build();
        }
    
        /**
         * description: 队列与交换机绑定
         */
        @Bean
        public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
        }
    }
  • 发送消息
    @SpringBootTest
    @RunWith(SpringRunner.class)
    class RabbitProducerApplicationTests {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @Test
        void send() {
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "boot.zzz", "springboot rabbit message");
        }
    }

消费者

@Component
public class MyRabbitListener {
    /**
     * description: 信息监听,监听哪个队列
     */
    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(Message message) {
        System.out.println(message);
    }

}

保证消息可靠

  1. 保证broker高可用,搭集群
  2. exchange要持久化
  3. queue要持久化
  4. message要持久化
  5. 生产者confirm
  6. 消费者ack
  • 生产者
    spring:
      #配置rabbitMq 服务器
      rabbitmq:
        publisher-confirm-type: correlated # 消息从生产者发送到交换机后触发回调方法
    @SpringBootTest
    @RunWith(SpringRunner.class)
    class RabbitProducerApplicationTests {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @Test
        void send() throws InterruptedException {
            // 定义回调,消息无法到达交换机时会回调
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                /**
                 * description: correlationData相关配置信息 b是否接收成功 s失败原因
                 */
                @Override
                public void confirm(CorrelationData correlationData, boolean b, String s) {
                    if (b) {
                        System.out.println(correlationData);
                        System.out.println("发送成功");
                        // 接收成功
                    } else {
                        System.out.println(correlationData);
                        System.out.println("发送失败:" + s);
                        // 失败,重新发送
                    }
                }
            });
            // 设置处理失败消息模式,true:消息到不了队列,会返回给生产者
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                @Override
                public void returnedMessage(ReturnedMessage returnedMessage) {
                    System.out.println("没有队列绑定");
                    System.out.println("Return编码:" + returnedMessage.getReplyCode() + "描述:" + returnedMessage.getReplyText());
                    System.out.println("交换机:" + returnedMessage.getExchange() + "路由key:" + returnedMessage.getRoutingKey());
                    System.out.println("return主题:" + new String(returnedMessage.getMessage().getBody()));
                }
            });
            rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, "boot.zzz", "springboot rabbit message");
            Thread.sleep(1000);
        }
    }
  • 消费者
    @Configuration
    public class RabbitConfig {
    
        @Resource
        SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory;
    
        @Bean
        public SimpleMessageListenerContainer SimpleRabbitListenerContainerFactory() {
            SimpleMessageListenerContainer simpleMessageListenerContainer =
                    simpleRabbitListenerContainerFactory.createListenerContainer();
            simpleMessageListenerContainer.setQueueNames("boot_queue");
            // 可以动态修改接收消息模式
            // simpleMessageListenerContainer.removeQueueNames("boot_queue.*.a");
            // 设置多个并发消费者一起消费,并支持运行时动态修改
            simpleMessageListenerContainer.setConcurrentConsumers(1);
            // 设置最大消费者5
            simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
            //设置是否重回队列
            // simpleMessageListenerContainer.setDefaultRequeueRejected(false);
            // 设置手动签收
            simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            //设置监听外露
            simpleMessageListenerContainer.setExposeListenerChannel(true);
            // 将实现了ChannelAwareMessageListener接口的MyConsumer类的实例传进setMessageListener中
            simpleMessageListenerContainer.setMessageListener(new AckListener());
            return simpleMessageListenerContainer;
    
        }
    }
    @Component
    public class AckListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            // 消息ID
            long tag = message.getMessageProperties().getDeliveryTag();
            try {
                System.out.println(message);
    			// 这里报错消息重回队列
                int i = 1 / 0;
                // 确认消息,false代表只确认当前消息,true代表签收该消费者所有未签收的消息
                channel.basicAck(tag, true);
                // 可以利用redis存储重试次数,次数上限停止重试,再把消息缓存起来
            } catch (Exception e) {
                System.out.println("异常");
                // 最后一个参数:true:重回队列,false:消息丢弃
                channel.basicNack(tag, true, true);
            }
        }
    }

消费限流

消费端模式一定为手动确认模式AcknowledgeMode.MANUAL

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 2 # 消费端每个线程每次拉去2条数据,直到确认消费完毕才拉去下2条

延迟队列

消息进入队列不会立即消费,到达指定时间后消费
场景:下单30min未支付,取消订单,回滚库存
场景:新用户注册7天,发送短信
实现:定时器,延迟队列
rabbitmq是实现延迟队列需要:TTL + 死信队列

TTL

存活时间/过期时间
消息到达存活时间还没有被消费,就会自动清除
rabbitmq可以对消息设置存活时间,也可以对queue设置存活时间

/**
    * ttl交换机
    */
   @Bean("ttlExchange")
   public Exchange ttlExchange() {
       return ExchangeBuilder.topicExchange("ttl_exchange").durable(true).build();
   }
   /**
    * 配置队列的时候顺带上ttl()方法 其内部对MQ设置了参数"x-message-ttl"
    * 单位是毫秒
    */
   @Bean("ttlQueue")
   public Queue ttlQueue(){
       return QueueBuilder.durable("ttl_queue").ttl(10000).build();
   }
   /**
    * description: 队列与交换机绑定
    */
   @Bean
   public Binding bindTtlQueueExchange(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
   }
// 这些消息只能存活10S
for (int i = 0; i < 100; i++) {
    rabbitTemplate.convertAndSend("ttl_exchange", "ttl.zzz", "ttl rabbit message");
}

死信队列

死信队列DLX:rabbitmq叫死信交换机:当消息成为dead message后,可以被重新发送到另一个交换机,这个交换机就是死信交换机
如果配置了死信队列,之前会被丢弃的信息会放到 -> 死信交换机
成为死信三种情况

  1. 队列消息长度达到限制
  2. 消费者拒收消息,并且不重回队列
  3. 原队列存在消息过期设置,消息到达超时时间未被消费
  • 配置死信队列
    /**
        * 死信队列
        */
       @Bean("deadQueue")
       public Queue deadLetterQueue(){
           return QueueBuilder.durable("dead_queue").build();
       }
    
       /**
        * 死信交换机
        */
       @Bean("deadExchange")
       public DirectExchange deadLetterExchange(){
           return ExchangeBuilder.directExchange("dead_exchange").durable(true).build();
       }
    
       /**
        * 死信队列绑定
        */
       @Bean
       public Binding bindDeadQueueExchange(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") Exchange exchange) {
           return BindingBuilder.bind(queue).to(exchange).with("dead.#").noargs();
       }
  • 配置TTL队列,需要把队列绑定到死信交换机
    /**
        * ttl交换机
        */
       @Bean("ttlExchange")
       public Exchange ttlExchange() {
           return ExchangeBuilder.topicExchange("ttl_exchange").durable(true).build();
       }
    
       /**
        * ttl队列,配置队列的时候顺带上ttl()方法 其内部对MQ设置了参数"x-message-ttl"
        */
       @Bean("ttlQueue")
       public Queue ttlQueue(){
           HashMap<String, Object> map = new HashMap<>(2);
           // 配置当前队列绑定的死信交换器
           map.put("x-dead-letter-exchange", "dead_exchange");
           // 配置当前队列的死信队列路由key,如果不设置默认为当前队列的路由key
           map.put("x-dead-letter-routing-key", "dead.#");
           return QueueBuilder.durable("ttl_queue").ttl(10000).withArguments(map).build();
       }
    
       /**
        * description: 队列与交换机绑定
        */
       @Bean
       public Binding bindTtlQueueExchange(@Qualifier("ttlQueue") Queue queue, @Qualifier("ttlExchange") Exchange exchange) {
           return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
       }

实现延迟队列

TTL队列绑定死信交换机,监听死信队列即可
TTL过期时间就是延迟时间

@Component
public class MyRabbitListener {
    /**
     * description: 信息监听,监听哪个队列
     */
    @RabbitListener(queues = "dead_queue")
    public void ListenerQueue(Message message) {
        System.out.println("DEAD_MESSAGE:" + message);
    }
}

消息积压

消费者宕机,消费者能力不足
解决:上线更多消费者,把消息取出放到数据库,然后慢慢处理

消息幂等性保障

消费多条相同的消息,得到与消费该消息一次相同的结果
可以使用乐观锁机制实现,更新数据时判断version

集群

rabbit集群

  1. 启动
    docker run -d --name rabbit1 -p 5672:5672 -p 15672:15672 -v /home/rabbit_docker/node1:/var/lib/rabbitmq --hostname myRabbit1 -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.8.3-management
    docker run -d --name rabbit2 -p 5673:5672 -p 15673:15672 -v /home/rabbit_docker/node2:/var/lib/rabbitmq --hostname myRabbit2 -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' --link rabbit1:myRabbit1 rabbitmq:3.8.3-management
  2. 加入集群节点
    进入节点2把它加入节点1
    node2:
    docker exec -it rabbit2 bash
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl join_cluster --ram rabbit@myRabbit1
    rabbitmqctl start_app
    exit
    如果需要移除节点
    rabbitmqctl forget_cluster_node rabbit@myRabbit2

HAProxy负载均衡

yum install haproxy
vim /etc/haproxy/haproxy.cfg
systemctl start haproxy

haproxy.cfg:

global
    log         127.0.0.1 local2
    chroot      /var/lib/haproxy
    pidfile     /var/run/haproxy.pid
    maxconn     4000
    user        haproxy
	group       haproxy
    daemon
    # turn on stats unix socket
    stats socket /var/lib/haproxy/stats
#---------------------------------------------------------------------
# common defaults that all the 'listen' and 'backend' sections will
# use if not designated in their block
#---------------------------------------------------------------------
defaults
    mode                    http
    log                     global
    option                  httplog
    option                  dontlognull
    option http-server-close
    option forwardfor       except 127.0.0.0/8
    option                  redispatch
    retries                 3
    timeout http-request    10s
    timeout queue           1m
    timeout connect         10s
    timeout client          1m
    timeout server          1m
    timeout http-keep-alive 10s
    timeout check           10s
    maxconn                 3000

listen rabbitmq_cluster
    bind 0.0.0.0:5680 #通过5680对两个节点映射
	option tcplog #记录tcp连接状态和时间
    mode tcp #配置TCP模式
	option clitcpka #开启tcp keep alive长连接模式
	timeout connect 1s # haproxy与mq建立连接超时时间
	timeout client 10s # 客户端与haproxy最大空闲时间
    balance roundrobin #简单的轮询
    # rabbitmq集群节点配置
    # inter 每隔五秒对mq集群做健康检查,2次正确证明服务器可用,2次失败证明服务器不可用,并且配置主备机制
    server rabbit1 192.168.31.8:5672 check inter 5000 rise 2 fall 2
    server rabbit2 192.168.31.8:5673 check inter 5000 rise 2 fall 2
#配置haproxy web监控,查看统计信息
listen http_front
    bind 0.0.0.0:1080
    stats refresh 30s
	stats uri /haproxy_stats
	stats auth admin:admin

如果启动不了can not bind …socket

绑定非本机的IP需要在sysctl.conf文件中添加配置: net.ipv4.ip_nonlocal_bind=1
vi /etc/sysctl.conf
sysctl -p

如果还是不行可能是selinux的问题

关闭selinux
setenforce 0

监控地址:
http://192.168.31.8:1080/haproxy_stats
集群地址:
192.168.31.8:5680


  目录