RocketMQ
概念
https://rocketmq.apache.org
RocketMQ由以下组件组成
- NameServer:提供轻量级broker路由服务
- Broker:实际处理消息存储,核心组件
- Producer:消息生产者集群
- Consumer:消费者集群
- Topic:区分消息种类,一个发送者可以发送消息给一个或多个Topic,一个消费者可以订阅一个或多个Topic消息
启动rocket服务首先需要启动nameserver
5.0引入proxy,客户端与proxy直接交互,proxy再与nameserver broker交互
搭建
安装
useradd rocket
passwd rocket
groupadd mq
usermod -g mq rocket
mkdir -p /home/rocket
chown rocket:mq /home/rocket
下载zip解压,把...bin-release放到rocket文件夹
添加环境变量
vi ~/.bash_profile
export ROCKETMQ_HOME=/home/rocket/rocketmq-all-5.1.1-bin-release
PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME...
export PATH
执行下面让环境变量生效
source ~/.bash_profile
启动
启动nameserver
vi runserver.sh
调整JVM内存
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:Meta...."
nohup sh bin/mqnamesrv &
启动broker
vi runbroker.sh
调整JVM内存
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
修改conf/broker.conf,最下面添加
autoCreateTopicEnable=true
启动
// nohup sh mqbroker &
nohup sh mqbroker -n 192.168.31.8:9876 --enable-proxy &
关闭
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
管理控制台
https://github.com/apache/rocketmq-dashboard
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
Docker
docker pull apacherocketmq/rocketmq-dashboard:latest
docker run -d --name rocketmq-dashboard -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.31.8:9876" -p 18080:8080 -t apacherocketmq/rocketmq-dashboard:latest
生产者
消息发送
- 正常发送:发送消息到broker还需等待broker返回信息,然后代码才能继续执行
- 异步发送:发送消息到broker后不管,继续执行代码。之后broker会回调生产者的方法,告诉生产者是否发送成功
正常发送 public static void main(String[] args) throws ClientException { // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。 String endpoint = "192.168.31.8:8080"; // 消息发送的目标Topic名称,需要提前创建。 String topic = "TestTopic"; // 权限 String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint); ClientConfiguration configuration = builder.build(); // 初始化Producer时需要设置通信配置以及预绑定的Topic。 Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build(); // 普通消息发送。 Message message = provider.newMessageBuilder() .setTopic(topic) // 设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") // 设置消息Tag,用于消费端根据指定Tag过滤消息。 .setTag("messageTag") // 消息体。 .setBody("messageBody".getBytes()) .build(); try { // 发送消息,需要关注发送结果,并捕获失败等异常。 SendReceipt sendReceipt = producer.send(message); producer.send(message); logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (ClientException e) { logger.error("Failed to send message", e); } // producer.close(); }
异步发送 private static final Logger logger = LoggerFactory.getLogger(AsyncProducerExample.class); public static void main(String[] args) throws ClientException, InterruptedException, IOException { // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。 String endpoint = "192.168.31.8:8080"; // 消息发送的目标Topic名称,需要提前创建。 String topic = "TestTopic"; // 权限 String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint); ClientConfiguration configuration = builder.build(); // 初始化Producer时需要设置通信配置以及预绑定的Topic。 Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build(); // 普通消息发送。 Message message = provider.newMessageBuilder() .setTopic(topic) // 设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("key2") // 设置消息Tag,用于消费端根据指定Tag过滤消息。 .setTag("tag2") // 消息体。 .setBody("async message".getBytes()) .build(); // 创建自定义线程池接收回调 final CompletableFuture<SendReceipt> future = producer.sendAsync(message); ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool(); future.whenCompleteAsync((sendReceipt, throwable) -> { if (null != throwable) { logger.error("Failed to send message", throwable); // Return early. return; } logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); }, sendCallbackExecutor); // Block to avoid exist of background threads. Thread.sleep(Long.MAX_VALUE); // Close the producer when you don't need it anymore. producer.close(); }
消费者
推模式(push):broker收到消息主动推送给消费者
拉模式(pull):消费者主动去broker拉消息
push
private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoints = "192.168.31.8:8080";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
// 订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 为消费者指定所属的消费者分组,Group需要提前创建。
String consumerGroup = "zzzGroup";
// 指定需要订阅哪个目标Topic,Topic需要提前创建。
String topic = "TestTopic";
// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者分组。
.setConsumerGroup(consumerGroup)
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器。
.setMessageListener(messageView -> {
// 处理消息并返回消费结果。
logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// 如果不需要再使用 PushConsumer,可关闭该实例。
// pushConsumer.close();
}
pull
private static final Logger log = LoggerFactory.getLogger(SimpleConsumerExample.class);
@SuppressWarnings({"resource", "InfiniteLoopStatement"})
public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// Credential provider is optional for client configuration.
String accessKey = "yourAccessKey";
String secretKey = "yourSecretKey";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
String endpoints = "foobar.com:8080";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String consumerGroup = "yourConsumerGroup";
Duration awaitDuration = Duration.ofSeconds(30);
String tag = "yourMessageTagA";
String topic = "yourTopic";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// In most case, you don't need to create too many consumers, singleton pattern is recommended.
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the consumer group name.
.setConsumerGroup(consumerGroup)
// set await duration for long-polling.
.setAwaitDuration(awaitDuration)
// Set the subscription for the consumer.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(15);
// Receive message, multi-threading is more recommended.
do {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
log.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
consumer.ack(message);
log.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
} while (true);
// Close the simple consumer when you don't need it anymore.
// consumer.close();
}
特性
顺序消息
创建topic
sh bin/mqadmin updatetopic -n localhost:9876 -t FIFOTopic -b localhost:10911 -a +message.type=FIFO
和普通消息发送相比,顺序消息发送必须要设置消息组。
Message message = provider.newMessageBuilder()
.setTopic("FIFOTopic")
// 设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("fifoKey")
// 设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("test")
//设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。
.setMessageGroup("fifoGroup001")
// 消息体。
.setBody("messageBody".getBytes())
.build();
SendReceipt sendReceipt = producer.send(message);
消费顺序消息时,需要确保当前消费者分组是顺序投递模式,否则仍然按并发乱序投递。
//消费示例一:使用PushConsumer消费顺序消息,只需要在消费监听器处理即可。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};
//消费示例二:使用SimpleConsumer消费顺序消息,主动获取消息进行消费处理并提交消费结果。
//需要注意的是,同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}
- 发送到同一队列
public static void main(String[] args) throws ClientException { try { DefaultMQProducer producer = new DefaultMQProducer("zzzFIFOGroup"); producer.setNamesrvAddr("192.168.31.8:9876"); producer.start(); for (int i = 0; i < 10; i++) { int orderId = i; for (int j = 0; j < 5; j++) { Message message = new Message("FIFOTopic", "order_" + orderId, "KEY" + orderId, ("order:" + orderId + " step:" + j).getBytes()); SendResult sendResult = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, Message message, Object o) { Integer id = (Integer) o; int index = id % list.size(); return list.get(index); } }, orderId); System.out.println(sendResult); } } } catch (Exception e) { e.printStackTrace(); } }
- 使用MessageListenerOrderly消费
public static void main(String[] args) throws ClientException, IOException, InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("zzzFIFOGroup"); consumer.setNamesrvAddr("192.168.31.8:9876"); consumer.subscribe("FIFOTopic", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { consumeOrderlyContext.setAutoCommit(true); for (MessageExt msg: list) { System.out.println("收到的消息:" + new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); Thread.sleep(Long.MAX_VALUE); }
过滤
- tag
String tag = "tagA || tagB"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // 设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
- sql
String sql = "(TAGS is not null and TAGS in ('tagA', 'tagB')) and (a is not null and a between 20 and 50)"; FilterExpression filterExpression = new FilterExpression(sql, FilterExpressionType.SQL92);
广播
消费模式分为广播模式,集群模式
rocket默认集群消费,同一个消费group下的多个consumer平摊消息队列中的消息
广播模式:同一个消费group下的多个消费者,每个都会消费消息队列中的全量消息
consumer.setMessageModel(MessageModel.BROADCASTING);
事务消息
- 事务监听器
public class TestTransactionListener implements TransactionListener { /** * description: 回调操作方法 * * 消息预提交成功就会触发该方法的执行,用于完成本地事务 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("预提交消息成功:" + msg); // 假设接收到TAGA的消息就表示扣款操作成功,TAGB的消息表示扣款失败, // TAGC表示扣款结果不清楚,需要执行消息回查 if (StringUtils.equals("TAGA", msg.getTags())) { return LocalTransactionState.COMMIT_MESSAGE; } else if (StringUtils.equals("TAGB", msg.getTags())) { return LocalTransactionState.ROLLBACK_MESSAGE; } else if (StringUtils.equals("TAGC", msg.getTags())) { return LocalTransactionState.UNKNOW; } return LocalTransactionState.UNKNOW; } /** * description: 消息回查方法 * 引发消息回查的原因最常见的有两个: * 1)回调操作返回UNKNWON * 2)TC没有接收到TM的最终全局事务确认指令 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("执行消息回查" + msg.getTags()); return LocalTransactionState.COMMIT_MESSAGE; } }
- 生产者
public class TransactionProducer { public static void main(String[] args) throws Exception { TransactionMQProducer producer = new TransactionMQProducer("transactionGroup"); producer.setNamesrvAddr("192.168.31.8:9876"); // 自定义线程池 ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); // 为生产者指定一个线程池 producer.setExecutorService(executorService); // 为生产者添加事务监听器 producer.setTransactionListener(new TestTransactionListener()); producer.start(); String[] tags = {"TAGA","TAGB","TAGC"}; for (int i = 0; i < 3; i++) { byte[] body = ("Hi," + i).getBytes(); Message msg = new Message("TransTopic", tags[i], body); // 发送事务消息 // 第二个参数用于指定在执行本地事务时要使用的业务参数 SendResult sendResult = producer.sendMessageInTransaction(msg,null); System.out.println("发送结果为:" + sendResult.getSendStatus()); } } }
- 消费者
public class TransactionConsumer { public static void main(String[] args) throws ClientException, IOException, InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transactionGroup"); consumer.setNamesrvAddr("192.168.31.8:9876"); consumer.subscribe("TransTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg: list) { System.out.println("收到的消息:" + new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); Thread.sleep(Long.MAX_VALUE); } }
其它
ACL鉴权
鉴权规则生效
vi broker.conf
aclEnable=true
配置鉴权规则
vi conf/plain_acl.yml(热加载,不需要重启)
plain_acl.yml配置:
globalWhiteRemoteAddresses:
- 10.11.192.111
- 192.168.31.*
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- TopicTest=PUB
groupPerms:
# the group should convert to retry topic
- oms_consumer_group=DENY
- accessKey: admin
secretKey: 12345678
whiteRemoteAddress:
# if it is admin, it could access all resources
admin: true
# RocketMQ有个bug,admin默认权限不对,所以下面默认给发布和订阅权限
defaultTopicPerm: PUB|SUB
defaultGroupPerm: PUB|SUB
- PUB是发布权限,SUB是订阅权限、也就是消费权限,按需配置,可以分发布用户和订阅用户,也可以一个用户拥有PUB|SUB权限
- 用户RocketMQ普通用户,分配的权限为只能发送TopicTest的消息,其他topic无权限发送;拒绝oms_consumer_group消费组的消息消费,其他消费组默认可消费。
- 用户admin是管理员用户,权限最大
客户端
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("username", "password"));
}
DefaultMQProducer producer = new DefaultMQProducer("testTopic", getAclRPCHook());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testTopic", getAclRPCHook(), new AllocateMessageQueueAveragely());
消息轨迹
vi broker.conf
traceTopicEnable=true
客户端使用消息轨迹
DefaultMQProducer producer = new DefaultMQProducer("zzzFIFOGroup", true);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("zzzFIFOGroup", true);
springboot
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
<!-- 如果和当前项目使用的spring版本还有springBoot版本不一致,建议排除-->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
</exclusions>
</dependency>
rocketmq:
name-server: 192.168.31.8:9876
producer:
group: springBootGroup
send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
#access-key: # Access Key
#secret-key: # Secret Key
enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启
- 生产者
@RequiredArgsConstructor @RestController public class ProducerController { private final String topic = "bootTopic"; private final RocketMQTemplate rocketMQTemplate; @RequestMapping("/sync") public String syncSend() { // 同步 Message<String> message = MessageBuilder.withPayload("sync message") .setHeader(RocketMQHeaders.KEYS, "key_sync") .setHeader(RocketMQHeaders.TAGS, "tagA") .build(); SendResult sendResult = rocketMQTemplate.syncSend(topic, message); System.out.printf("syncSend1 to topic %s sendResult=%s %n", topic, sendResult); return "OK"; } @RequestMapping("/async") public String asyncSend() { // 异步 Message<String> message = MessageBuilder.withPayload("async message") .setHeader(RocketMQHeaders.KEYS, "key_async") .setHeader(RocketMQHeaders.TAGS, "tagA") .build(); rocketMQTemplate.asyncSend(topic, message, new SendCallback() { @Override public void onSuccess(SendResult var1) { System.out.printf("async onSuccess SendResult=%s %n", var1); } @Override public void onException(Throwable var1) { System.out.printf("async onException Throwable=%s %n", var1); } }); return "OK"; } @RequestMapping("/batch") public String batchSend() { // 批量 List<Message> msgs = new ArrayList<Message>(); for (int i = 0; i < 10; i++) { msgs.add(MessageBuilder.withPayload("batch message#" + i) .setHeader(RocketMQHeaders.KEYS, "key_batch_" + i) .setHeader(RocketMQHeaders.TAGS, "tag" + i%3).build()); } SendResult result = rocketMQTemplate.syncSend(topic, msgs, 60000); System.out.printf("--- Batch messages send result :" + result); return "OK"; } @RequestMapping("/fifo") public String fifoSend() { for (int i = 0; i < 10; i++) { Message<String> message =MessageBuilder.withPayload("batch message#" + i) .setHeader(RocketMQHeaders.KEYS, "key_batch_" + i) .setHeader(RocketMQHeaders.TAGS, "tag" + i%3).build(); // 应该是会以hashKey进行取模,让需要保证顺序的消息坐落在同一个queue上, SendResult sr = rocketMQTemplate.syncSendOrderly(topic, message, "hashKey", 60000); } return "OK"; } @RequestMapping("/transaction") public String transactionSend() { String[] tags = {"TAGA","TAGB","TAGC"}; for (int i = 0; i < 3; i++) { Message<String> message =MessageBuilder.withPayload("hi! " + i) .setHeader(RocketMQHeaders.KEYS, "key_batch_" + i) .build(); String destination = "TransTopic" + ":" + tags[i]; SendResult sendResult = rocketMQTemplate.sendMessageInTransaction( destination, message, destination); System.out.println("发送结果为:" + sendResult.getSendStatus()); } return "OK"; } }
- 事务监听配置
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate") public class MyTransactionImpl implements RocketMQLocalTransactionListener { private ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>(); @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID); String destination = arg.toString(); localTrans.put(transId, msg); // 这个msg的实现类是GenericMessage,里面实现了toString方法 // 在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。 // 而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀 System.out.println("预提交消息成功:" + msg); // 转成RocketMQ的Message对象 org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg); String tags = message.getTags(); if(StringUtils.contains(tags,"TAGA")){ return RocketMQLocalTransactionState.COMMIT; }else if(StringUtils.contains(tags,"TAGB")){ return RocketMQLocalTransactionState.ROLLBACK; }else{ return RocketMQLocalTransactionState.UNKNOWN; } } // 延迟检查的时间间隔要有点奇怪。 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { System.out.println("会查" + msg); String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID).toString(); Message originalMessage = localTrans.get(transId); // 这里能够获取到自定义的transaction_id属性 System.out.println("执行消息回查" + originalMessage); // 获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性 String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString(); if(StringUtils.contains(tags,"TAGC")){ return RocketMQLocalTransactionState.COMMIT; }else if(StringUtils.contains(tags,"TAGD")){ return RocketMQLocalTransactionState.ROLLBACK; }else{ return RocketMQLocalTransactionState.UNKNOWN; } } }
- 消费者
@Component @RocketMQMessageListener(consumerGroup = "bootConsumer", topic = "bootTopic", consumeMode = ConsumeMode.CONCURRENTLY) public class TestConsumer implements RocketMQListener { @Override public void onMessage(Object o) { System.out.println(o); } }
- 额外的RocketMQTemplate
@ExtRocketMQTemplateConfiguration(nameServer = "${rocketmq.extNameServer}") public class ExtRocketMQTemplate extends RocketMQTemplate { @Resource(name="extRocketMQTemplate") private ExtRocketMQTemplate extRocketMQTemplate;