你好,我是蒋德钧。
现在的互联网应用基本上都是采用分布式系统架构进行设计的,而很多分布式系统必备的一个基础软件就是消息队列。
消息队列要能支持组件通信消息的快速读写,而Redis本身支持数据的高速访问,正好可以满足消息队列的读写性能需求。不过,除了性能,消息队列还有其他的要求,所以,很多人都很关心一个问题:“Redis适合做消息队列吗?”
其实,这个问题的背后,隐含着两方面的核心问题:
这节课,我们就来聊一聊消息队列的特征和Redis提供的消息队列方案。只有把这两方面的知识和实践经验串连起来,才能彻底理解基于Redis实现消息队列的技术实践。以后当你需要为分布式系统组件做消息队列选型时,就可以根据组件通信量和消息通信速度的要求,选择出适合的Redis消息队列方案了。
我们先来看下第一个问题:消息队列的消息读取有什么样的需求?
我先介绍一下消息队列存取消息的过程。在分布式系统中,当两个组件要基于消息队列进行通信时,一个组件会把要处理的数据以消息的形式传递给消息队列,然后,这个组件就可以继续执行其他操作了;远端的另一个组件从消息队列中把消息读取出来,再在本地进行处理。
为了方便你理解,我还是借助一个例子来解释一下。
假设组件1需要对采集到的数据进行求和计算,并写入数据库,但是,消息到达的速度很快,组件1没有办法及时地既做采集,又做计算,并且写入数据库。所以,我们可以使用基于消息队列的通信,让组件1把数据x和y保存为JSON格式的消息,再发到消息队列,这样它就可以继续接收新的数据了。组件2则异步地从消息队列中把数据读取出来,在服务器2上进行求和计算后,再写入数据库。这个过程如下图所示:
我们一般把消息队列中发送消息的组件称为生产者(例子中的组件1),把接收消息的组件称为消费者(例子中的组件2),下图展示了一个通用的消息队列的架构模型:
在使用消息队列时,消费者可以异步读取生产者消息,然后再进行处理。这样一来,即使生产者发送消息的速度远远超过了消费者处理消息的速度,生产者已经发送的消息也可以缓存在消息队列中,避免阻塞生产者,这是消息队列作为分布式组件通信的一大优势。
不过,消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性。
虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。
我们来看一个更新商品库存的场景。
假设生产者负责接收库存更新请求,消费者负责实际更新库存,现有库存量是10。生产者先后发送了消息1和消息2,消息1要把商品X的库存记录更新为5,消息2是把商品X库存更新为3。如果消息1和2在消息队列中无法保序,出现消息2早于消息1被处理的情况,那么,很显然,库存更新就出错了。这是业务应用无法接受的。
面对这种情况,你可能会想到一种解决方案:不要把更新后的库存量作为生产者发送的消息,而是把库存扣除值作为消息的内容。这样一来,消息1是扣减库存量5,消息2是扣减库存量2。如果消息1和消息2之间没有库存查询请求的话,即使消费者先处理消息2,再处理消息1,这个方案也能够保证最终的库存量是正确的,也就是库存量为3。
但是,我们还需要考虑这样一种情况:假如消费者收到了这样三条消息:消息1是扣减库存量5,消息2是读取库存量,消息3是扣减库存量2,此时,如果消费者先处理了消息3(把库存量扣减2),那么库存量就变成了8。然后,消费者处理了消息2,读取当前的库存量是8,这就会出现库存量查询不正确的情况。从业务应用层面看,消息1、2、3应该是顺序执行的,所以,消息2查询到的应该是扣减了5以后的库存量,而不是扣减了2以后的库存量。所以,用库存扣除值作为消息的方案,在消息中同时包含读写操作的场景下,会带来数据读取错误的问题。而且,这个方案还会面临一个问题,那就是重复消息处理。
消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。
还是以库存更新为例,假设消费者收到了一次消息1,要扣减库存量5,然后又收到了一次消息1,那么,如果消费者无法识别这两条消息实际是一条相同消息的话,就会执行两次扣减库存量5的操作,此时,库存量就不对了。这当然也是无法接受的。
另外,消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。
Redis的List和Streams两种数据类型,就可以满足消息队列的这三个需求。我们先来了解下基于List的消息队列实现方法。
List本身就是按先进先出的顺序对数据进行存取的,所以,如果使用List作为消息队列保存消息的话,就已经能满足消息保序的需求了。
具体来说,生产者可以使用LPUSH命令把要发送的消息依次写入List,而消费者则可以使用RPOP命令,从List的另一端按照消息的写入顺序,依次读取消息并进行处理。
如下图所示,生产者先用LPUSH写入了两条库存消息,分别是5和3,表示要把库存更新为5和3;消费者则用RPOP把两条消息依次读出,然后进行相应的处理。
不过,在消费者读取数据时,有一个潜在的性能风险点。
在生产者往List中写入数据时,List并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用RPOP命令(比如使用一个while(1)循环)。如果有新消息写入,RPOP命令就会返回结果,否则,RPOP命令返回空值,再继续循环。
所以,即使没有新消息写入List,消费者也要不停地调用RPOP命令,这就会导致消费者程序的CPU一直消耗在执行RPOP命令上,带来不必要的性能损失。
为了解决这个问题,Redis提供了BRPOP命令。BRPOP命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用RPOP命令相比,这种方式能节省CPU开销。
消息保序的问题解决了,接下来,我们还需要考虑解决重复消息处理的问题,这里其实有一个要求:消费者程序本身能对重复消息进行判断。
一方面,消息队列要能给每一个消息提供全局唯一的ID号;另一方面,消费者程序要把已经处理过的消息的ID号记录下来。
当收到一条消息后,消费者程序就可以对比收到的消息ID和记录的已处理过的消息ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。这种处理特性也称为幂等性,幂等性就是指,对于同一条消息,消费者收到一次的处理结果和收到多次的处理结果是一致的。
不过,List本身是不会为每个消息生成ID号的,所以,消息的全局唯一ID号就需要生产者程序在发送消息前自行生成。生成之后,我们在用LPUSH命令把消息插入List时,需要在消息中包含这个全局唯一ID。
例如,我们执行以下命令,就把一条全局ID为101030001、库存量为5的消息插入了消息队列:
LPUSH mq "101030001:stock:5"
(integer) 1
最后,我们再来看下,List类型是如何保证消息可靠性的。
当消费者程序从List中读取一条消息后,List就不会再留存这条消息了。所以,如果消费者程序在处理消息的过程出现了故障或宕机,就会导致消息没有处理完成,那么,消费者程序再次启动后,就没法再次从List中读取消息了。
为了留存消息,List类型提供了BRPOPLPUSH命令,这个命令的作用是让消费者程序从一个List中读取消息,同时,Redis会把这个消息再插入到另一个List(可以叫作备份List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份List中重新读取消息并进行处理了。
我画了一张示意图,展示了使用BRPOPLPUSH命令留存消息,以及消费者再次读取消息的过程,你可以看下。
生产者先用LPUSH把消息“5”“3”插入到消息队列mq中。消费者程序使用BRPOPLPUSH命令读取消息“5”,同时,消息“5”还会被Redis插入到mqback队列中。如果消费者程序处理消息“5”时宕机了,等它重启后,可以从mqback中再次读取消息“5”,继续处理。
好了,到这里,你可以看到,基于List类型,我们可以满足分布式组件对消息队列的三大需求。但是,在用List做消息队列时,我们还可能遇到过一个问题:生产者消息发送很快,而消费者处理消息的速度比较慢,这就导致List中的消息越积越多,给Redis的内存带来很大压力。
这个时候,我们希望启动多个消费者程序组成一个消费组,一起分担处理List中的消息。但是,List类型并不支持消费组的实现。那么,还有没有更合适的解决方案呢?这就要说到Redis从5.0版本开始提供的Streams数据类型了。
和List相比,Streams同样能够满足消息队列的三大需求。而且,它还支持消费组形式的消息读取。接下来,我们就来了解下Streams的使用方法。
Streams是Redis专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。
首先,我们来学习下Streams类型存取消息的操作XADD。
XADD命令可以往消息队列中插入新消息,消息的格式是键-值对形式。对于插入的每一条消息,Streams可以自动为其生成一个全局唯一的ID。
比如说,我们执行下面的命令,就可以往名称为mqstream的消息队列中插入一条消息,消息的键是repo,值是5。其中,消息队列名称后面的*
,表示让Redis为插入的数据自动生成一个全局唯一的ID,例如“1599203861727-0”。当然,我们也可以不用*
,直接在消息队列名称后自行设定一个ID号,只要保证这个ID号是全局唯一的就行。不过,相比自行设定ID号,使用*
会更加方便高效。
XADD mqstream * repo 5
"1599203861727-0"
可以看到,消息的全局唯一ID由两部分组成,第一部分“1599203861727”是数据插入时,以毫秒为单位计算的当前服务器时间,第二部分表示插入消息在当前毫秒内的消息序号,这是从0开始编号的。例如,“1599203861727-0”就表示在“1599203861727”毫秒内的第1条消息。
当消费者需要读取消息时,可以直接使用XREAD命令从消息队列中读取。
XREAD在读取消息时,可以指定一个消息ID,并从这个消息ID的下一条消息开始进行读取。
例如,我们可以执行下面的命令,从ID号为1599203861727-0的消息开始,读取后续的所有消息(示例中一共3条)。
XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
1) 1) "mqstream"
2) 1) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
2) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
3) 1) "1599274927910-0"
2) 1) "repo"
2) "1"
另外,消费者也可以在调用XRAED时设定block配置项,实现类似于BRPOP的阻塞读取操作。当消息队列中没有消息时,一旦设置了block配置项,XREAD就会阻塞,阻塞的时长可以在block配置项进行设置。
举个例子,我们来看一下下面的命令,其中,命令最后的“$”符号表示读取最新的消息,同时,我们设置了block 10000的配置项,10000的单位是毫秒,表明XREAD在读取最新消息时,如果没有消息到来,XREAD将阻塞10000毫秒(即10秒),然后再返回。下面命令中的XREAD执行后,消息队列mqstream中一直没有消息,所以,XREAD在10秒后返回空值(nil)。
XREAD block 10000 streams mqstream $
(nil)
(10.00s)
刚刚讲到的这些操作是List也支持的,接下来,我们再来学习下Streams特有的功能。
Streams本身可以使用XGROUP创建消费组,创建消费组之后,Streams可以使用XREADGROUP命令让消费组内的消费者读取消息,
例如,我们执行下面的命令,创建一个名为group1的消费组,这个消费组消费的消息队列是mqstream。
XGROUP create mqstream group1 0
OK
然后,我们再执行一段命令,让group1消费组里的消费者consumer1从mqstream中读取所有消息,其中,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。因为在consumer1读取消息前,group1中没有其他消费者读取过消息,所以,consumer1就得到mqstream消息队列中的所有消息了(一共4条)。
XREADGROUP group group1 consumer1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599203861727-0"
2) 1) "repo"
2) "5"
2) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
3) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
4) 1) "1599274927910-0"
2) 1) "repo"
2) "1"
需要注意的是,消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了。比如说,我们执行完刚才的XREADGROUP命令后,再执行下面的命令,让group1内的consumer2读取消息时,consumer2读到的就是空值,因为消息已经被consumer1读取完了,如下所示:
XREADGROUP group group1 consumer2 streams mqstream 0
1) 1) "mqstream"
2) (empty list or set)
使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。例如,我们执行下列命令,让group2中的consumer1、2、3各自读取一条消息。
XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599203861727-0"
2) 1) "repo"
2) "5"
XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams会自动使用内部队列(也称为PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用XACK命令通知Streams“消息已经处理完成”。如果消费者没有成功处理消息,它就不会给Streams发送XACK命令,消息仍然会留存。此时,消费者可以在重启后,用XPENDING命令查看已读取、但尚未确认处理完成的消息。
例如,我们来查看一下group2中各个消费者已读取、但尚未确认的消息个数。其中,XPENDING返回结果的第二、三行分别表示group2中所有消费者读取的消息最小ID和最大ID。
XPENDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"
3) 1) "consumer3"
2) "1"
如果我们还需要进一步查看某个消费者具体读取了哪些数据,可以执行下面的命令:
XPENDING mqstream group2 - + 10 consumer2
1) 1) "1599274912765-0"
2) "consumer2"
3) (integer) 513336
4) (integer) 1
可以看到,consumer2已读取的消息的ID是1599274912765-0。
一旦消息1599274912765-0被consumer2处理了,consumer2就可以使用XACK命令通知Streams,然后这条消息就会被删除。当我们再使用XPENDING命令查看时,就可以看到,consumer2已经没有已读取、但尚未确认处理的消息了。
XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)
现在,我们就知道了用Streams实现消息队列的方法,我还想再强调下,Streams是Redis 5.0专门针对消息队列场景设计的数据类型,如果你的Redis是5.0及5.0以后的版本,就可以考虑把Streams用作消息队列了。
这节课,我们学习了分布式系统组件使用消息队列时的三大需求:消息保序、重复消息处理和消息可靠性保证,这三大需求可以进一步转换为对消息队列的三大要求:消息数据有序存取,消息数据具有全局唯一编号,以及消息数据在消费完成后被删除。
我画了一张表格,汇总了用List和Streams实现消息队列的特点和区别。当然,在实践的过程中,你也可以根据新的积累,进一步补充和完善这张表。
其实,关于Redis是否适合做消息队列,业界一直是有争论的。很多人认为,要使用消息队列,就应该采用Kafka、RabbitMQ这些专门面向消息队列场景的软件,而Redis更加适合做缓存。
根据这些年做Redis研发工作的经验,我的看法是:Redis是一个非常轻量级的键值数据库,部署一个Redis实例就是启动一个进程,部署Redis集群,也就是部署多个Redis实例。而Kafka、RabbitMQ部署时,涉及额外的组件,例如Kafka的运行就需要再部署ZooKeeper。相比Redis来说,Kafka和RabbitMQ一般被认为是重量级的消息队列。
所以,关于是否用Redis做消息队列的问题,不能一概而论,我们需要考虑业务层面的数据体量,以及对性能、可靠性、可扩展性的需求。如果分布式系统中的组件消息通信量不大,那么,Redis只需要使用有限的内存空间就能满足消息存储的需求,而且,Redis的高性能特性能支持快速的消息读写,不失为消息队列的一个好的解决方案。
按照惯例,我给你提个小问题。如果一个生产者发送给消息队列的消息,需要被多个消费者进行读取和处理(例如,一个消息是一条从业务系统采集的数据,既要被消费者1读取进行实时计算,也要被消费者2读取并留存到分布式文件系统HDFS中,以便后续进行历史查询),你会使用Redis的什么数据类型来解决这个问题呢?
欢迎在留言区写下你的思考和答案,如果觉得今天的内容对你有所帮助,也欢迎你帮我分享给更多人。我们下节课见。
评论