RocketMQ


RocketMQ

概念

https://rocketmq.apache.org

RocketMQ由以下组件组成

  • NameServer:提供轻量级broker路由服务
  • Broker:实际处理消息存储,核心组件
  • Producer:消息生产者集群
  • Consumer:消费者集群
  • Topic:区分消息种类,一个发送者可以发送消息给一个或多个Topic,一个消费者可以订阅一个或多个Topic消息
    启动rocket服务首先需要启动nameserver

5.0引入proxy,客户端与proxy直接交互,proxy再与nameserver broker交互

搭建

安装

useradd rocket
passwd rocket
groupadd mq
usermod -g mq rocket
mkdir -p /home/rocket
chown rocket:mq /home/rocket
下载zip解压,把...bin-release放到rocket文件夹
添加环境变量
vi ~/.bash_profile
export ROCKETMQ_HOME=/home/rocket/rocketmq-all-5.1.1-bin-release
PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME...
export PATH
执行下面让环境变量生效
source ~/.bash_profile

启动

启动nameserver
vi runserver.sh
调整JVM内存
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:Meta...."
nohup sh bin/mqnamesrv &
启动broker
vi runbroker.sh
调整JVM内存
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
修改conf/broker.conf,最下面添加
autoCreateTopicEnable=true
启动
// nohup sh mqbroker &
nohup sh mqbroker -n 192.168.31.8:9876 --enable-proxy &

关闭

sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

管理控制台

https://github.com/apache/rocketmq-dashboard

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
Docker
docker pull apacherocketmq/rocketmq-dashboard:latest
docker run -d --name rocketmq-dashboard -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.31.8:9876" -p 18080:8080 -t apacherocketmq/rocketmq-dashboard:latest

http://192.168.31.8:18080

生产者

消息发送

  • 正常发送:发送消息到broker还需等待broker返回信息,然后代码才能继续执行
  • 异步发送:发送消息到broker后不管,继续执行代码。之后broker会回调生产者的方法,告诉生产者是否发送成功
    正常发送
    public static void main(String[] args) throws ClientException {
            // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
            String endpoint = "192.168.31.8:8080";
            // 消息发送的目标Topic名称,需要提前创建。
            String topic = "TestTopic";
            // 权限
            String accessKey = "yourAccessKey";
            String secretKey = "yourSecretKey";
            SessionCredentialsProvider sessionCredentialsProvider =
                    new StaticSessionCredentialsProvider(accessKey, secretKey);
            ClientServiceProvider provider = ClientServiceProvider.loadService();
            ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
            ClientConfiguration configuration = builder.build();
            // 初始化Producer时需要设置通信配置以及预绑定的Topic。
            Producer producer = provider.newProducerBuilder()
                    .setTopics(topic)
                    .setClientConfiguration(configuration)
                    .build();
            // 普通消息发送。
            Message message = provider.newMessageBuilder()
                    .setTopic(topic)
                    // 设置消息索引键,可根据关键字精确查找某条消息。
                    .setKeys("messageKey")
                    // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                    .setTag("messageTag")
                    // 消息体。
                    .setBody("messageBody".getBytes())
                    .build();
            try {
                // 发送消息,需要关注发送结果,并捕获失败等异常。
                SendReceipt sendReceipt = producer.send(message);
                producer.send(message);
                logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
            } catch (ClientException e) {
                logger.error("Failed to send message", e);
            }
            // producer.close();
        }
    异步发送
    private static final Logger logger = LoggerFactory.getLogger(AsyncProducerExample.class);
        public static void main(String[] args) throws ClientException, InterruptedException, IOException {
            // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
            String endpoint = "192.168.31.8:8080";
            // 消息发送的目标Topic名称,需要提前创建。
            String topic = "TestTopic";
            // 权限
            String accessKey = "yourAccessKey";
            String secretKey = "yourSecretKey";
            SessionCredentialsProvider sessionCredentialsProvider =
                    new StaticSessionCredentialsProvider(accessKey, secretKey);
            ClientServiceProvider provider = ClientServiceProvider.loadService();
            ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
            ClientConfiguration configuration = builder.build();
            // 初始化Producer时需要设置通信配置以及预绑定的Topic。
            Producer producer = provider.newProducerBuilder()
                    .setTopics(topic)
                    .setClientConfiguration(configuration)
                    .build();
            // 普通消息发送。
            Message message = provider.newMessageBuilder()
                    .setTopic(topic)
                    // 设置消息索引键,可根据关键字精确查找某条消息。
                    .setKeys("key2")
                    // 设置消息Tag,用于消费端根据指定Tag过滤消息。
                    .setTag("tag2")
                    // 消息体。
                    .setBody("async message".getBytes())
                    .build();
            // 创建自定义线程池接收回调
            final CompletableFuture<SendReceipt> future = producer.sendAsync(message);
            ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
            future.whenCompleteAsync((sendReceipt, throwable) -> {
                if (null != throwable) {
                    logger.error("Failed to send message", throwable);
                    // Return early.
                    return;
                }
                logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
            }, sendCallbackExecutor);
            // Block to avoid exist of background threads.
            Thread.sleep(Long.MAX_VALUE);
            // Close the producer when you don't need it anymore.
            producer.close();
        }

消费者

推模式(push):broker收到消息主动推送给消费者
拉模式(pull):消费者主动去broker拉消息

push
	private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        String endpoints = "192.168.31.8:8080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        // 订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // 为消费者指定所属的消费者分组,Group需要提前创建。
        String consumerGroup = "zzzGroup";
        // 指定需要订阅哪个目标Topic,Topic需要提前创建。
        String topic = "TestTopic";
        // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者分组。
                .setConsumerGroup(consumerGroup)
                // 设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 设置消费监听器。
                .setMessageListener(messageView -> {
                    // 处理消息并返回消费结果。
                    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        // 如果不需要再使用 PushConsumer,可关闭该实例。
        // pushConsumer.close();
    }
pull
private static final Logger log = LoggerFactory.getLogger(SimpleConsumerExample.class);
    @SuppressWarnings({"resource", "InfiniteLoopStatement"})
    public static void main(String[] args) throws ClientException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // Credential provider is optional for client configuration.
        String accessKey = "yourAccessKey";
        String secretKey = "yourSecretKey";
        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);

        String endpoints = "foobar.com:8080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();
        String consumerGroup = "yourConsumerGroup";
        Duration awaitDuration = Duration.ofSeconds(30);
        String tag = "yourMessageTagA";
        String topic = "yourTopic";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // In most case, you don't need to create too many consumers, singleton pattern is recommended.
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set the consumer group name.
            .setConsumerGroup(consumerGroup)
            // set await duration for long-polling.
            .setAwaitDuration(awaitDuration)
            // Set the subscription for the consumer.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .build();
        // Max message num for each long polling.
        int maxMessageNum = 16;
        // Set message invisible duration after it is received.
        Duration invisibleDuration = Duration.ofSeconds(15);
        // Receive message, multi-threading is more recommended.
        do {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            log.info("Received {} message(s)", messages.size());
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    consumer.ack(message);
                    log.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        } while (true);
        // Close the simple consumer when you don't need it anymore.
        // consumer.close();
    }

特性

顺序消息

创建topic

sh bin/mqadmin updatetopic -n localhost:9876 -t FIFOTopic -b localhost:10911  -a +message.type=FIFO

和普通消息发送相比,顺序消息发送必须要设置消息组。

Message message = provider.newMessageBuilder()
    .setTopic("FIFOTopic")
    // 设置消息索引键,可根据关键字精确查找某条消息。
    .setKeys("fifoKey")
    // 设置消息Tag,用于消费端根据指定Tag过滤消息。
    .setTag("test")
    //设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。
    .setMessageGroup("fifoGroup001")
    // 消息体。
    .setBody("messageBody".getBytes())
    .build();
SendReceipt sendReceipt = producer.send(message);

消费顺序消息时,需要确保当前消费者分组是顺序投递模式,否则仍然按并发乱序投递。

//消费示例一:使用PushConsumer消费顺序消息,只需要在消费监听器处理即可。
MessageListener messageListener = new MessageListener() {
    @Override
    public ConsumeResult consume(MessageView messageView) {
        System.out.println(messageView);
        //根据消费结果返回状态。
        return ConsumeResult.SUCCESS;
    }
};
//消费示例二:使用SimpleConsumer消费顺序消息,主动获取消息进行消费处理并提交消费结果。
//需要注意的是,同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。
List<MessageView> messageViewList = null;
try {
    messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
    messageViewList.forEach(messageView -> {
        System.out.println(messageView);
        //消费处理完成后,需要主动调用ACK提交消费结果。
        try {
            simpleConsumer.ack(messageView);
        } catch (ClientException e) {
            e.printStackTrace();
        }
    });
} catch (ClientException e) {
    //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
    e.printStackTrace();
}
  • 发送到同一队列
    public static void main(String[] args) throws ClientException {
            try {
                DefaultMQProducer producer = new DefaultMQProducer("zzzFIFOGroup");
                producer.setNamesrvAddr("192.168.31.8:9876");
                producer.start();
                for (int i = 0; i < 10; i++) {
                    int orderId = i;
                    for (int j = 0; j < 5; j++) {
                        Message message = new Message("FIFOTopic", "order_" + orderId, "KEY" + orderId,
                                ("order:" + orderId + " step:" + j).getBytes());
                        SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                            @Override
                            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                                Integer id = (Integer) o;
                                int index = id % list.size();
                                return list.get(index);
                            }
                        }, orderId);
                        System.out.println(sendResult);
                    }
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
  • 使用MessageListenerOrderly消费
    public static void main(String[] args) throws ClientException, IOException, InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("zzzFIFOGroup");
            consumer.setNamesrvAddr("192.168.31.8:9876");
            consumer.subscribe("FIFOTopic", "*");
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                    consumeOrderlyContext.setAutoCommit(true);
                    for (MessageExt msg: list) {
                        System.out.println("收到的消息:" + new String(msg.getBody()));
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
            Thread.sleep(Long.MAX_VALUE);
        }

过滤

  • tag
    String tag = "tagA || tagB";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // 设置预绑定的订阅关系。
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
  • sql
    String sql = "(TAGS is not null and TAGS in ('tagA', 'tagB')) and (a is not null and a between 20 and 50)";
    FilterExpression filterExpression = new FilterExpression(sql, FilterExpressionType.SQL92);

广播

消费模式分为广播模式,集群模式
rocket默认集群消费,同一个消费group下的多个consumer平摊消息队列中的消息
广播模式:同一个消费group下的多个消费者,每个都会消费消息队列中的全量消息

consumer.setMessageModel(MessageModel.BROADCASTING);

事务消息

  • 事务监听器
    public class TestTransactionListener implements TransactionListener {
         /**
          * description: 回调操作方法
          *
          * 消息预提交成功就会触发该方法的执行,用于完成本地事务
          */
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            System.out.println("预提交消息成功:" + msg);
            // 假设接收到TAGA的消息就表示扣款操作成功,TAGB的消息表示扣款失败,
            // TAGC表示扣款结果不清楚,需要执行消息回查
            if (StringUtils.equals("TAGA", msg.getTags())) {
                return LocalTransactionState.COMMIT_MESSAGE;
            } else if (StringUtils.equals("TAGB", msg.getTags())) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            } else if (StringUtils.equals("TAGC", msg.getTags())) {
                return LocalTransactionState.UNKNOW;
            }
            return LocalTransactionState.UNKNOW;
        }
    
        /**
         * description: 消息回查方法
         * 引发消息回查的原因最常见的有两个:
         * 1)回调操作返回UNKNWON
         * 2)TC没有接收到TM的最终全局事务确认指令
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            System.out.println("执行消息回查" + msg.getTags());
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
  • 生产者
    public class TransactionProducer {
        public static void main(String[] args) throws Exception {
            TransactionMQProducer producer = new TransactionMQProducer("transactionGroup");
            producer.setNamesrvAddr("192.168.31.8:9876");
            // 自定义线程池
            ExecutorService executorService = new ThreadPoolExecutor(2, 5,
                    100, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r);
                            thread.setName("client-transaction-msg-check-thread");
                            return thread;
                        }
                    });
            // 为生产者指定一个线程池
            producer.setExecutorService(executorService);
            // 为生产者添加事务监听器
            producer.setTransactionListener(new TestTransactionListener());
            producer.start();
            String[] tags = {"TAGA","TAGB","TAGC"};
            for (int i = 0; i < 3; i++) {
                byte[] body = ("Hi," + i).getBytes();
                Message msg = new Message("TransTopic", tags[i], body);
                // 发送事务消息
                // 第二个参数用于指定在执行本地事务时要使用的业务参数
                SendResult sendResult = producer.sendMessageInTransaction(msg,null);
                System.out.println("发送结果为:" + sendResult.getSendStatus());
            }
        }
    }
  • 消费者
    public class TransactionConsumer {
        public static void main(String[] args) throws ClientException, IOException, InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transactionGroup");
            consumer.setNamesrvAddr("192.168.31.8:9876");
            consumer.subscribe("TransTopic", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (MessageExt msg: list) {
                        System.out.println("收到的消息:" + new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            Thread.sleep(Long.MAX_VALUE);
        }
    }

其它

ACL鉴权

鉴权规则生效
vi broker.conf
aclEnable=true
配置鉴权规则
vi conf/plain_acl.yml(热加载,不需要重启)

plain_acl.yml配置:

globalWhiteRemoteAddresses:
- 10.11.192.111
- 192.168.31.*

accounts:
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  topicPerms:
  - TopicTest=PUB
  groupPerms:
  # the group should convert to retry topic
  - oms_consumer_group=DENY

- accessKey: admin
  secretKey: 12345678
  whiteRemoteAddress:
  # if it is admin, it could access all resources
  admin: true
  # RocketMQ有个bug,admin默认权限不对,所以下面默认给发布和订阅权限
  defaultTopicPerm: PUB|SUB
  defaultGroupPerm: PUB|SUB
  • PUB是发布权限,SUB是订阅权限、也就是消费权限,按需配置,可以分发布用户和订阅用户,也可以一个用户拥有PUB|SUB权限
  • 用户RocketMQ普通用户,分配的权限为只能发送TopicTest的消息,其他topic无权限发送;拒绝oms_consumer_group消费组的消息消费,其他消费组默认可消费。
  • 用户admin是管理员用户,权限最大

客户端

static RPCHook getAclRPCHook() {
    return new AclClientRPCHook(new SessionCredentials("username", "password"));
}
DefaultMQProducer producer = new DefaultMQProducer("testTopic", getAclRPCHook());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testTopic", getAclRPCHook(), new AllocateMessageQueueAveragely());

消息轨迹

vi broker.conf
traceTopicEnable=true
客户端使用消息轨迹
DefaultMQProducer producer = new DefaultMQProducer("zzzFIFOGroup", true);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("zzzFIFOGroup", true);

springboot

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
    <!-- 如果和当前项目使用的spring版本还有springBoot版本不一致,建议排除-->
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
        </exclusion>
    </exclusions>
</dependency>
rocketmq:
  name-server: 192.168.31.8:9876
  producer:
    group: springBootGroup
    send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
    compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
    retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
    #access-key: # Access Key
    #secret-key: # Secret Key
    enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启
  • 生产者
    @RequiredArgsConstructor
    @RestController
    public class ProducerController {
        private final String topic = "bootTopic";
        private final RocketMQTemplate rocketMQTemplate;
    
        @RequestMapping("/sync")
        public String syncSend() {
            // 同步
            Message<String> message = MessageBuilder.withPayload("sync message")
                    .setHeader(RocketMQHeaders.KEYS, "key_sync")
                    .setHeader(RocketMQHeaders.TAGS, "tagA")
                    .build();
            SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
            System.out.printf("syncSend1 to topic %s sendResult=%s %n", topic, sendResult);
            return "OK";
        }
    
        @RequestMapping("/async")
        public String asyncSend() {
            // 异步
            Message<String> message = MessageBuilder.withPayload("async message")
                    .setHeader(RocketMQHeaders.KEYS, "key_async")
                    .setHeader(RocketMQHeaders.TAGS, "tagA")
                    .build();
            rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
                @Override
                public void onSuccess(SendResult var1) {
                    System.out.printf("async onSuccess SendResult=%s %n", var1);
                }
                @Override
                public void onException(Throwable var1) {
                    System.out.printf("async onException Throwable=%s %n", var1);
                }
            });
            return "OK";
        }
    
        @RequestMapping("/batch")
        public String batchSend() {
            // 批量
            List<Message> msgs = new ArrayList<Message>();
            for (int i = 0; i < 10; i++) {
                msgs.add(MessageBuilder.withPayload("batch message#" + i)
                        .setHeader(RocketMQHeaders.KEYS, "key_batch_" + i)
                        .setHeader(RocketMQHeaders.TAGS, "tag" + i%3).build());
            }
            SendResult result = rocketMQTemplate.syncSend(topic, msgs, 60000);
            System.out.printf("--- Batch messages send result :" + result);
            return "OK";
        }
    
        @RequestMapping("/fifo")
        public String fifoSend() {
            for (int i = 0; i < 10; i++) {
                Message<String> message =MessageBuilder.withPayload("batch message#" + i)
                        .setHeader(RocketMQHeaders.KEYS, "key_batch_" + i)
                        .setHeader(RocketMQHeaders.TAGS, "tag" + i%3).build();
                // 应该是会以hashKey进行取模,让需要保证顺序的消息坐落在同一个queue上,
                SendResult sr = rocketMQTemplate.syncSendOrderly(topic, message, "hashKey", 60000);
            }
            return "OK";
        }
    
        @RequestMapping("/transaction")
        public String transactionSend() {
            String[] tags = {"TAGA","TAGB","TAGC"};
            for (int i = 0; i < 3; i++) {
                Message<String> message =MessageBuilder.withPayload("hi! " + i)
                        .setHeader(RocketMQHeaders.KEYS, "key_batch_" + i)
                        .build();
                String destination = "TransTopic" + ":" + tags[i];
                SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
                        destination, message, destination);
                System.out.println("发送结果为:" + sendResult.getSendStatus());
            }
            return "OK";
        }
    }
  • 事务监听配置
    @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
    public class MyTransactionImpl implements RocketMQLocalTransactionListener {
    
        private ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();
    
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID);
            String destination = arg.toString();
            localTrans.put(transId, msg);
            // 这个msg的实现类是GenericMessage,里面实现了toString方法
            // 在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
            // 而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
            System.out.println("预提交消息成功:" + msg);
            // 转成RocketMQ的Message对象
            org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
            String tags = message.getTags();
            if(StringUtils.contains(tags,"TAGA")){
                return RocketMQLocalTransactionState.COMMIT;
            }else if(StringUtils.contains(tags,"TAGB")){
                return RocketMQLocalTransactionState.ROLLBACK;
            }else{
                return RocketMQLocalTransactionState.UNKNOWN;
            }
        }
        // 延迟检查的时间间隔要有点奇怪。
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            System.out.println("会查" + msg);
            String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID).toString();
            Message originalMessage = localTrans.get(transId);
            // 这里能够获取到自定义的transaction_id属性
            System.out.println("执行消息回查" + originalMessage);
            // 获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
            String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString();
            if(StringUtils.contains(tags,"TAGC")){
                return RocketMQLocalTransactionState.COMMIT;
            }else if(StringUtils.contains(tags,"TAGD")){
                return RocketMQLocalTransactionState.ROLLBACK;
            }else{
                return RocketMQLocalTransactionState.UNKNOWN;
            }
        }
    }
  • 消费者
    @Component
    @RocketMQMessageListener(consumerGroup = "bootConsumer", topic = "bootTopic", consumeMode = ConsumeMode.CONCURRENTLY)
    public class TestConsumer implements RocketMQListener {
        @Override
        public void onMessage(Object o) {
            System.out.println(o);
        }
    }
  • 额外的RocketMQTemplate
    @ExtRocketMQTemplateConfiguration(nameServer = "${rocketmq.extNameServer}")
    public class ExtRocketMQTemplate extends RocketMQTemplate {
    
    @Resource(name="extRocketMQTemplate")
    private ExtRocketMQTemplate extRocketMQTemplate;

  目录