你好,我是姚秋辰。
在平时网购的时候,你一定有过下单之后忘记付款的情况,等到再回过头想起要付款,发现订单已经被关闭了,很多网购流程里都有类似的“订单超时关闭”功能。相类似的功能还有“自动确认收货”,如果在一定时间内买家都没有点击确认收货按钮,那么系统会自动确认收货并且将订单款项打给卖家。
我举的这两个例子都有一个共同的特征,那就是业务逻辑会预设在未来的某一个时间点被触发。在早期我们经常会使用TTL+死信队列的方式来实现这种定时事件,通过设置一个正常的消息队列并使用TTL指定超时时间,如果队列中的消息超时了,它就会被DLX(死信交换机)转向死信队列。借助这种曲线救国的方式,你就可以通过MQ组件实现“定时消息”。
相比于TTL+DLX,RabbitMQ提供了一种更为优雅的方式来实现这类业务。在这节课中,我将带你使用RabbitMQ的延迟消息插件,实现延迟发放优惠券的场景。
那么首先,我们先来安装这个延迟消息插件吧。
你需要先打开RabbitMQ官网并进入到插件下载页面,在页面中定位到rabbitmq_delayed_message_exchange这个插件。
点击插件上的“Releases”链接,你可以看到适配不同RabbitMQ版本的延迟消息插件。我本地安装的的RabbitMQ版本是3.9.8,最新的延迟消息插件的版本是3.9.0,它可以适配3.9.X系列的RMQ组件,所以我建议你下载3.9.0版本对应的rabbitmq_delayed_message_exchange-3.9.0.ez安装包。
接下来,你需要把安装包的后缀名从.ez改成.zip,然后使用解压缩工具对安装包进行解压。再把解压后的文件复制到RabbitMQ安装路径下的plugins文件夹。以我的本地MAC环境为例,plugins目录位于/usr/local/Cellar/rabbitmq/3.9.8/plugins,你需要根据自己的操作系统和安装路径找到对应的目录。
然后,你需要执行下面这行rabbitmq-plugins命令,通过人工的方式启动rabbitmq_delayed_message_exchange插件。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
最后,你只需要重启一下RabbitMQ服务器,新安装的插件就可以生效了,接下来我们就可以通过代码落地延迟领劵业务了。
因为延迟消息队列和普通消息队列的类型不同,为了和之前的普通领券接口做个区分,我们今天要声明一个新的生产者和消费者,用来对接延迟消息队列。我先从生产者开始创建。
我们依然保持队形,将生产者方法写入CouponProducer这个类中,你可以参考一下下面的代码。
在这段代码中,有一个显而易见的不同之处,你会发现我没有直接将coupon对象传递给生产者,取而代之的是使用了MessageBuilder来构建消息对象,这样做的一个目的是传入一个特殊的header,那就是x-delay。它是延迟消息特有的参数,代表了你想让这个消息在Queue里延迟多久以后再被消费者处理,x-delay对应的单位是毫秒,我在代码中设置的延迟时间是10秒。
// 使用延迟消息发送
public void sendCouponInDelay(RequestCoupon coupon) {
log.info("sent: {}", coupon);
streamBridge.send(EventConstant.ADD_COUPON_DELAY_EVENT,
MessageBuilder.withPayload(coupon)
.setHeader("x-delay", 10 * 1000)
.build());
}
代码中的ADD_COUPON_DELAY_EVENT的值是addCouponDelay-out-0,它是我单独为延迟消息队列指定的function name。
接下来,我在CouponCustomerController类中声明了一个入口方法,用来对接生产者方法创建延迟消息。
@PostMapping("requestCouponDelayEvent")
public void requestCouponDelayedEvent(@Valid @RequestBody RequestCoupon request) {
couponProducer.sendCouponInDelay(request);
}
生产者到这里就创建完了,接下来是消费者。
在消费者这一端,延迟消息和普通消息的实现方式并没有任何不同,你可以把下面这段朴实无华的代码加入到CouponConsumer类中。
@Bean
public Consumer<RequestCoupon> addCouponDelay() {
return request -> {
log.info("received: {}", request);
customerService.requestCoupon(request);
};
}
你需要留意一下消费者的方法名称,一定要保证这里的方法名和配置文件中的function name保持完全的一致。
消费者创建完成之后,我们最后还需要对配置文件做一些修改。
这一步中我们需要做的就是把生产者和消费者添加到application.yml文件中,你可以参考下面这段代码。
spring:
cloud:
stream:
bindings:
# 延迟发券 - producer
addCouponDelay-out-0:
destination: request-coupon-delayed-topic
content-type: application/json
binder: my-rabbit
# 延迟发券 - Consumer
addCouponDelay-in-0:
destination: request-coupon-delayed-topic
content-type: application/json
# 消费组,同一个组内只能被消费一次
group: add-coupon-group
binder: my-rabbit
consumer:
# 如果最大尝试次数为1,即不重试
# 默认是做3次尝试
max-attempts: 1
function:
definition: addCoupon;deleteCoupon;addCouponDelay
rabbit:
bindings:
addCouponDelay-out-0:
producer:
delayed-exchange: true
addCouponDelay-in-0:
consumer:
delayed-exchange: true
在这段代码里有几个关键点,我需要提醒你一下。
第一个是function name的统一。在spring.cloud.stream.function.definition中我添加了addCouponDelay作为functiona name,它和Consumer方法中声明的method name是一致的。
第二个关键点是绑定生产者消费者Topic。你会发现我在生产者和消费者端的destination属性中声明了一个全新的Topic,request-coupon-delayed-topic,这样做是为了重新创建一个带有x-delay-message功能的交换机。
第三个关键点是声明延迟消息功能。在bindings节点下面声明的生产者和消费者配置项中,我设置了delayed-exchange=true,这是延迟队列最为关键的一个属性。如果没有设置,那么系统将会创建一个普通的交换机,而不是具有延迟消费功能的交换机。
实现延迟消息功能所需要的全部操作就完成了,你可以启动项目并尝试发送几个请求,来验证消息是否会延迟消费。
如果你登录到RabbitMQ控制台查看交换机信息,你会发现我们今天声明的延迟消息交换机(request-coupon-delayed-topic)和第29节课中声明的常规交换机(request-coupon-topic)之间的不同,延迟交换机的类型是x-delayed-message,并且带有DM功能标签,这代表当前交换机具备延迟消费功能。
到这里,我们就了解了如何搭建一个延迟消息的场景,下面让我来带你回顾下本节重点吧。
利用RabbitMQ搭建延迟消息的过程并不复杂,不过当项目中Topic多起来的时候,function name的配置很容易出错。当你和一个遵循“约定大于配置”的框架打交道的时候,经常会因为没有遵循一个不起眼的约定,导致功能不work,而且排查起来特别困难。可见事物总是相对的,约定大于配置的思想在提高开发效率的同时,也略微抬高了入门成本和异常排查的成本。
在使用rabbitMQ实现高并发业务场景的时候,我有几个经验跟你分享。
生产者和消费者只是一个消息队列最普通的玩法,每个消息队列都有自己丰富的功能库,比如RabbitMQ就提供了各种强大的插件。你能打开文档中的RabbitMQ插件页面,深入了解几个感兴趣的插件功能,然后在评论区和大家分享吗?
好啦,这节课就结束啦。欢迎你把这节课分享给更多对Spring Cloud感兴趣的朋友。我是姚秋辰,我们下节课再见!