SpringBoot——整合RocketMQ
一、RocketMQ 的前世今生
RocketMQ 是阿里巴巴开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务,是一个统一的消息引擎,轻量级的数据处理平台。起源于阿里巴巴 2001 年的五彩石项目, Notify 在这期间应运而生,用于交易核心消息的流转 。
2010 年, B2B 开始大规模使用 ActiveMQ 作为消息内核。
2011 年,随着阿里业务的快速发展,急需一款支持顺序消息,拥有海量消息堆积能力的消息中间件, MetaQ 1.0 因此诞生。
2012 年, MetaQ已经发展到了3.0版本,RocketMQ 正是基于 MetaQ 3.0 开发的分布式消息传递中间件,专为万亿级的消息处理而设计,具有高吞吐量,低延迟,海量积累和有序消息。它是阿里巴巴和众多大型互联网业务场景的双十一购物狂欢的现成工具。由于这些优点,它吸引了越来越多的应用程序进行访问。同年,阿里巴巴正式开源了 RocketMQ 的第一个版本。
2015 年,RocketMQ 见证了消息传递的多项重量级功能,包括交易消息、SQL过滤器、消息追溯、调度消息、多站点高可用性等,以满足阿里巴巴日益丰富的业务场景。它还取代了阿里巴巴自主研发的另一款 MQ 产品 Notify,成为阿里巴巴首选的消息中间件。
2016 年,RocketMQ 在阿里云上开发了首个全托管服务,帮助大量数字化转型企业构建现代应用,并开始体验大规模的云计算实践。同年,RocketMQ 被捐赠给Apache基金会,并加入孵化器项目,旨在在未来为更多的开发者服务。
2016 年,RocketMQ 荣获中国最受欢迎开源软件奖。
2017 年,从 Apache 基金会毕业后,RocketMQ被指定为顶级项目(TLP)。
2018 年,RocketMQ 荣获中国最受欢迎开源软件奖。
二、RocketMQ 的基本概念
2.1 消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
2.2 消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。
2.3 消息消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
2.4 主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ 进行消息订阅的基本单位。
2.5 代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
2.6 名字服务(Name Server)
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。
2.7 拉取式消费(Pull Consumer)
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
2.8 推动式消费(Push Consumer)
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
2.9 生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
2.10 消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
2.11 集群消费(Clustering)
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
2.12 广播消费(Broadcasting)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
2.13 普通顺序消息(Normal Ordered Message)
普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
2.14 严格顺序消息(Strictly Ordered Message)
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
2.15 消息(Message)
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
2.16 标签(Tag)
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
2.17 DefaultMQPushConsumer和DefaultMQPullConsumer
Push的方式是 Server端接收到消息后,主动把消息推送给 Client端,主动权在Server端,实时性高。用 Push方式主动推送有很多弊 端:首先是加大 Server 端的 工作量,进而影响 Server 的性能;其次,Client 的处理能力各不相同, Client 的状态不受 Server 控制,
Pull方式是 Client端循环地从 Server端拉取消息,主动权在 Client手里, 自己拉取到一定量消息后,处理妥当了再接着取。Pull 方式的问题是循环拉取 消息的间隔不好设定,间隔太短就处在一个 “忙等”的状态,浪费资源; Pull 的时间间隔太长 Server 端有消息到来时 有可能没有被及时处理。
三、SpringBoot整合RocketMQ
3.1 添加rocketmq-spring-boot-starter等相关依赖
org.apache.rocketmq
rocketmq-spring-boot-starter
2.2.2
3.2 添加配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
#必须指定group
group: test-group
3.3 同步消息
同步消息 API
//发送普通同步消息-Object
syncSend(String destination, Object payload);
//发送普通同步消息-Message
syncSend(String destination, Message> message);
//发送批量普通同步消息
syncSend(String destination, Collection messages);
//发送普通同步消息-Object,并设置发送超时时间
syncSend(String destination, Object payload, long timeout);
//发送普通同步消息-Message,并设置发送超时时间
syncSend(String destination, Message> message, long timeout);
//发送批量普通同步消息,并设置发送超时时间
syncSend(String destination, Collection messages, long timeout);
//发送普通同步延迟消息,并设置超时,这个下文会演示
syncSend(String destination, Message> message, long timeout, int delayLevel);
/**
* 普通发送
* @param topic 消息主题
* @param msg 消息体
* @param 消息泛型
*/
public void send(String topic, T msg) {
rocketMQTemplate.convertAndSend(topic + ":tag1", msg);
//rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(msg).build()); // 等价于上面一行
}
/**
* 发送带tag的消息,直接在topic后面加上":tag"
*
* @param topic 消息主题
* @param tag 消息tag
* @param msg 消息体
* @param 消息泛型
* @return
*/
public SendResult sendTagMsg(String topic, String tag, T msg) {
topic = topic + ":" + tag;
return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msg).build());
}
/**
* 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
* sendResult为返回的发送结果
*/
public SendResult sendMsg(String topic, T msg) {
Message message = MessageBuilder.withPayload(msg).build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
return sendResult;
}
这里存在两种消息体,一种是Object的,另一种是Message>的形式的,其实我们发送Object的时候,底层是有帮我们做转换的,其实和我们在上层调用。
MessageBuilder.withPayload("hello world test1").build()
3.4 异步消息
异步消息 API
//发送普通异步消息-Object
asyncSend(String destination, Object payload, SendCallback sendCallback);
//发送普通异步消息-Message
asyncSend(String destination, Message> message, SendCallback sendCallback);
//发送普通异步消息-Object,并设置发送超时时间
asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout);
//发送普通异步消息-Message,并设置发送超时时间
asyncSend(String destination, Message> message, SendCallback sendCallback, long timeout);
//发送普通异步延迟消息,并设置超时,这个下文会演示
asyncSend(String destination, Message> message, SendCallback sendCallback, long timeout,
int delayLevel);
/**
* 发送异步消息
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
* (适合对响应时间敏感的业务场景)
* @param topic 消息Topic
* @param msg 消息实体
*
*/
public void asyncSend(String topic, T msg) {
Message message = MessageBuilder.withPayload(msg).build();
asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("topic:{}消息---发送MQ成功---", topic);
}
@Override
public void onException(Throwable throwable) {
log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage());
}
});
}
/**
* 发送异步消息
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
* (适合对响应时间敏感的业务场景)
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
*/
public void asyncSend(String topic, Message> message, SendCallback sendCallback) {
rocketMQTemplate.asyncSend(topic, message, sendCallback);
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
* @param timeout 超时时间
*/
public void asyncSend(String topic, Message> message, SendCallback sendCallback, long timeout) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
}
3.5 单向消息
/**
* 单向消息
* 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
* 此方式发送消息的过程耗时非常短,一般在微秒级别
* 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
* @param topic 消息主题
* @param msg 消息体
* @param 消息泛型
*/
public void sendOneWayMsg(String topic, T msg) {
Message message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.sendOneWay(topic, message);
}
3.6 批量消息
/**
* 发送批量消息
*
* @param topic 消息主题
* @param msgList 消息体集合
* @param 消息泛型
* @return
*/
public SendResult asyncSendBatch(String topic, List msgList) {
List> messageList = msgList.stream()
.map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList());
return rocketMQTemplate.syncSend(topic, messageList);
}
3.7 延迟消息
同步延迟消息
/**
* 同步延迟消息
* rocketMQ的延迟消息发送其实是已发送就已经到broker端了,然后消费端会延迟收到消息。
* RocketMQ 目前只支持固定精度的定时消息。
* 固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 延迟的底层方法是用定时任务实现的。
* 发送延时消息(delayLevel的值就为0,因为不延时)
*
* @param topic 消息主题
* @param msg 消息体
* @param timeout 发送超时时间
* @param delayLevel 延迟级别 1到18
* @param 消息泛型
*/
public void sendDelay(String topic, T msg, long timeout, int delayLevel) {
Message message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.syncSend(topic, message, timeout, delayLevel);
}
异步延迟消息
/**
* 发送异步延迟消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
* @param timeout 超时时间
* @param delayLevel 延迟消息的级别
*/
public void asyncSendDelay(String topic, Message> message, SendCallback sendCallback, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
}
/**
* 发送异步延迟消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param timeout 超时时间
* @param delayLevel 延迟消息的级别
*/
public void asyncSendDelay(String topic, Message> message, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("topic:{}消息---发送MQ成功---", topic);
}
@Override
public void onException(Throwable throwable) {
log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage());
}
}, timeout, delayLevel);
}
3.8 顺序消息
RocketMQ顺序消息,这里使用rocketmq-spring-boot-starter发送顺序消息就比较方便了,不像使用rocket-client那样,需要手动获取RocketMQ中当前topic的队列个数然后再通过hashKey值,mqs.size()取模,得到一个索引值,这里底层都帮我们做好了处理!
/**
* 发送顺序消息
*
* @param topic 消息主题
* @param msg 消息体
* @param hashKey 确定消息发送到哪个队列中
* @param 消息泛型
*/
public void syncSendOrderly(String topic, T msg, String hashKey) {
Message message = MessageBuilder.withPayload(msg).build();
log.info("发送顺序消息,topic:{}, hashKey:{}", topic, hashKey);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}
/**
* 发送顺序消息
*
* @param topic 消息主题
* @param msg 消息体
* @param hashKey 确定消息发送到哪个队列中
* @param timeout 超时时间
*/
public void syncSendOrderly(String topic, T msg, String hashKey, long timeout) {
Message message = MessageBuilder.withPayload(msg).build();
log.info("发送顺序消息,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
}
3.9 消费者
/**
* @Author: huangyibo
* @Date: 2022/7/2 22:22
* @Description:
* topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
* selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
*/
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "Con_Group_One",
topic = "RLT_TEST_TOPIC",
selectorExpression = "tag1")
public class RocketMqConsumer implements RocketMQListener {
@Override
public void onMessage(String message) {
log.info("监听到消息:message:{}", message);
}
}
四、事务消息
事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。
RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:
Half(Prepare) Message——半消息(预处理消息)
半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。
Message Status Check——消息状态回查
由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。
执行流程:
- 1、应用模块遇到要发送事务消息的场景时,先发送prepare消息给MQ。
- 2、prepare消息发送成功后,应用模块执行数据库事务(本地事务)。
- 3、根据数据库事务执行的结果,再返回Commit或Rollback给MQ。
- 4、如果是Commit,MQ把消息下发给Consumer端,如果是Rollback,直接删掉prepare消息。
- 5、第3步的执行结果如果没响应,或是超时的,启动定时任务回查事务状态(最多重试15次,超过了默认丢弃此消息),处理结果同第4步。
- 6、MQ消费的成功机制由MQ自己保证。
具体实例:
通过rocketMQTemplate的sendMessageInTransaction方法发送事务消息
/**
* 发送事务消息
*
* @param txProducerGroup 事务消息的生产者组名称
* @param topic 事务消息主题
* @param tag 事务消息tag
* @param msg 事务消息体
* @param arg 事务消息监听器回查参数
* @param 事务消息泛型
*/
public void sendTransaction(String txProducerGroup, String topic, String tag, T msg, T arg){
if(!StringUtils.isEmpty(tag)){
topic = topic + ":" + tag;
}
String transactionId = UUID.randomUUID().toString();
Message message = MessageBuilder.withPayload(msg)
//header也有大用处
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("share_id", msg.getId())
.build();
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(txProducerGroup, topic, message, arg);
if(result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)
&& result.getSendStatus().equals(SendStatus.SEND_OK)){
log.info("事物消息发送成功");
}
log.info("事物消息发送结果:{}", result);
}
定义本地事务处理类,实现RocketMQLocalTransactionListener接口,以及加上@RocketMQTransactionListener注解,这个类似方法的调用是异步的;
executeLocalTransaction方法,当我们处理完业务后,可以根据业务处理情况,返回事务执行状态,有bollback, commit or unknown三种,分别是回滚事务,提交事务和未知;根据事务消息执行流程,如果返回bollback,则直接丢弃消息;如果是返回commit,则消费消息;如果是unknow,则继续等待,然后调用checkLocalTransaction方法,最多重试15次,超过了默认丢弃此消息;
checkLocalTransaction方法,是当MQ Server未得到MQ发送方应答,或者超时的情况,或者应答是unknown的情况,调用此方法进行检查确认,返回值和上面的方法一样;
/**
* @Author: huangyibo
* @Date: 2022/7/2 23:06
* @Description: 事物消息Producer事务监听器
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Autowired
private ShareService shareService;
@Autowired
private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
/**
* 发送prepare消息成功此方法被回调,该方法用于执行本地事务
* @param message 回传的消息,利用transactionId即可获取到该消息的唯一Id
* @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
* @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
MessageHeaders headers = message.getHeaders();
String transactionId = (String)headers.get(RocketMQHeaders.TRANSACTION_ID);
Integer shareId = Integer.parseInt((String)headers.get("share_id"));
try {
shareService.auditBYIdWithRocketMqLog(shareId,(ShareAuditDTO)auditDTO,transactionId);
//本地事物成功,执行commit
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事物执行异常,e={}",e);
//本地事物失败,执行rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
}
//mq回调检查本地事务执行情况
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
String transactionId = (String)headers.get(RocketMQHeaders.TRANSACTION_ID);
RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog
.builder().transactionId(transactionId).build());
if(rocketmqTransactionLog == null){
log.error("如果本地事物日志没有记录,transactionId={}",transactionId);
//本地事物失败,执行rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
//如果本地事物日志有记录,执行commit
return RocketMQLocalTransactionState.COMMIT;
}
}
事务消息消费者和普通消息一致
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "transaction-group",
topic = "transaction-str",
consumeMode = ConsumeMode.ORDERLY)
public class TransactionConsumer implements RocketMQListener {
@Override
public void onMessage(String message) {
log.info("监听到消息:message:{}", message);
}
}
完整消息发送api
@Component
@Slf4j
public class RocketMqProducer {
/**
* rocketmq模板注入
*/
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 普通发送
* @param topic 消息主题
* @param msg 消息体
* @param 消息泛型
*/
public void send(String topic, T msg) {
rocketMQTemplate.convertAndSend(topic + ":tag1", msg);
//rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(msg).build()); // 等价于上面一行
}
/**
* 发送带tag的消息,直接在topic后面加上":tag"
*
* @param topic 消息主题
* @param tag 消息tag
* @param msg 消息体
* @param 消息泛型
* @return
*/
public SendResult sendTagMsg(String topic, String tag, T msg) {
topic = topic + ":" + tag;
return rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msg).build());
}
/**
* 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
* sendResult为返回的发送结果
*/
public SendResult sendMsg(String topic, T msg) {
Message message = MessageBuilder.withPayload(msg).build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
return sendResult;
}
/**
* 发送异步消息
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
* (适合对响应时间敏感的业务场景)
* @param topic 消息Topic
* @param msg 消息实体
*
*/
public void asyncSend(String topic, T msg) {
Message message = MessageBuilder.withPayload(msg).build();
asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("topic:{}消息---发送MQ成功---", topic);
}
@Override
public void onException(Throwable throwable) {
log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage());
}
});
}
/**
* 发送异步消息
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
* (适合对响应时间敏感的业务场景)
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
*/
public void asyncSend(String topic, Message> message, SendCallback sendCallback) {
rocketMQTemplate.asyncSend(topic, message, sendCallback);
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
* @param timeout 超时时间
*/
public void asyncSend(String topic, Message> message, SendCallback sendCallback, long timeout) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
}
/**
* 同步延迟消息
* rocketMQ的延迟消息发送其实是已发送就已经到broker端了,然后消费端会延迟收到消息。
* RocketMQ 目前只支持固定精度的定时消息。
* 固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 延迟的底层方法是用定时任务实现的。
* 发送延时消息(delayLevel的值就为0,因为不延时)
*
* @param topic 消息主题
* @param msg 消息体
* @param timeout 发送超时时间
* @param delayLevel 延迟级别 1到18
* @param 消息泛型
*/
public void sendDelay(String topic, T msg, long timeout, int delayLevel) {
Message message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.syncSend(topic, message, timeout, delayLevel);
}
/**
* 发送异步延迟消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
* @param timeout 超时时间
* @param delayLevel 延迟消息的级别
*/
public void asyncSendDelay(String topic, Message> message, SendCallback sendCallback, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
}
/**
* 发送异步延迟消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param timeout 超时时间
* @param delayLevel 延迟消息的级别
*/
public void asyncSendDelay(String topic, Message> message, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("topic:{}消息---发送MQ成功---", topic);
}
@Override
public void onException(Throwable throwable) {
log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage());
}
}, timeout, delayLevel);
}
/**
* 单向消息
* 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
* 此方式发送消息的过程耗时非常短,一般在微秒级别
* 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
* @param topic 消息主题
* @param msg 消息体
* @param 消息泛型
*/
public void sendOneWayMsg(String topic, T msg) {
Message message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.sendOneWay(topic, message);
}
/**
* 发送顺序消息
*
* @param topic 消息主题
* @param msg 消息体
* @param hashKey 确定消息发送到哪个队列中
* @param 消息泛型
*/
public void syncSendOrderly(String topic, T msg, String hashKey) {
Message message = MessageBuilder.withPayload(msg).build();
log.info("发送顺序消息,topic:{}, hashKey:{}", topic, hashKey);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}
/**
* 发送顺序消息
*
* @param topic 消息主题
* @param msg 消息体
* @param hashKey 确定消息发送到哪个队列中
* @param timeout 超时时间
*/
public void syncSendOrderly(String topic, T msg, String hashKey, long timeout) {
Message message = MessageBuilder.withPayload(msg).build();
log.info("发送顺序消息,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
}
/**
* 发送批量消息
*
* @param topic 消息主题
* @param msgList 消息体集合
* @param 消息泛型
* @return
*/
public SendResult asyncSendBatch(String topic, List msgList) {
List> messageList = msgList.stream()
.map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList());
return rocketMQTemplate.syncSend(topic, messageList);
}
/**
* 发送事务消息
*
* @param txProducerGroup 事务消息的生产者组名称
* @param topic 事务消息主题
* @param tag 事务消息tag
* @param msg 事务消息体
* @param arg 事务消息监听器回查参数
* @param 事务消息泛型
*/
public void sendTransaction(String txProducerGroup, String topic, String tag, T msg, T arg){
if(!StringUtils.isEmpty(tag)){
topic = topic + ":" + tag;
}
String transactionId = UUID.randomUUID().toString();
Message message = MessageBuilder.withPayload(msg)
//header也有大用处
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("share_id", msg.getId())
.build();
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(txProducerGroup, topic, message, arg);
if(result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)
&& result.getSendStatus().equals(SendStatus.SEND_OK)){
log.info("事物消息发送成功");
}
log.info("事物消息发送结果:{}", result);
}
}
上面写的这几个消息发送方法,你应该注意到了: 前两个方法的参数 topic 和其它的不一样
其实这是 rocketmq 和 springboot 整合后设置 Tag 的方式(Tag:用于区分过滤同一主题下的不同业务类型的消息,非常实用)
在项目里往mq写入消息时,最好每条消息都带上tag,用于消费时根据业务过滤
在 rocketmq-spring-boot-starter 中,Tag 的设置方式: 在 topic后面加上 “:tagName”
另外,从上面的截图中可以看到“key”的设置方式,发送消息时在header中设置:
MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.KEYS, "key1")
总结:
- 1、实际运用中一些配置不要像我上面一样写在代码里,写在配置文件里或统一配置。
- 2、消息发送成功与失败可以根据sendResult判断,消息消费成功与否其实源码内部已做了处理,只要不出现异常,就是消费成功,如果你业务代码逻辑有问题那另说。
- 3、实际生产中还要注意重复消费问题,这里我提供一个方法:在数据库加一个去重表,给表里的一个字段如key添加唯一索引,消费前先入库,正常则往下执行你的业务逻辑,入库失败了表明该消息已消费过,不能往下走了。
- 4、其实rocketmq还有一个很重要的特性:事务,其它mq可是不支持的,利用事务可以做很多事,如跟钱相关的业务、分布式事务,不过事务的实现过程要麻烦点。
- 5、上面就是RocketMQ与Springboot的整合,整合了使用起来还是比较简单的
参考:
https://blog.csdn.net/CSDN877425287/article/details/121964142
https://blog.csdn.net/qq_36737803/article/details/112261352
https://blog.csdn.net/caoli201314/article/details/120248361
版权声明:
作者:zhangchen
链接:https://www.techfm.club/p/51844.html
来源:TechFM
文章版权归作者所有,未经允许请勿转载。
共有 0 条评论