Redis实现消息队列 - 简书

baacloud免费翻墙vpn注册使用

来源: Redis实现消息队列 – 简书

Redis实现轻量级的消息队列与消息中间件相比,没有高级特性也没有ACK保证,无法做到数据不重不漏,如果业务简单而且对消息的可靠性不是那么严格可以尝试使用。

Redis实现消息队列

列表类型

队列

Redis中列表List类型是按照插入顺序排序的字符串链表,和数据结构中的普通链表一样,可以在头部left和尾部right添加新的元素。插入时如果键不存在Redis将为该键创建一个新的链表。如果链表中所有元素均被删除,那么该键也会被删除。

Redis List

Redis的列表List可以包含的最大元素数量为4294967295,从元素插入和删除的效率来看,如果是在链表的两头插入或删除元素将是非常高效的操作。即使链表中已经存储了数百万条记录,该操作也能在常量时间内完成。然后需要说明的是,如果元素插入或删除操作是作用于链表中间,那将是非常低效的。

Redis中对列表List的操作命令中,L表示从左侧头部开始插入和弹出,R表示从右侧尾部开始插入和弹出。

Redis提供了两种方式来做消息队列,一种是生产消费模式,另一种是发布订阅模式。

生产消费模式

生产消费模式会让一个或多个客户端监听消息队列,一旦消息到达,消费者马上消费,谁先抢到算谁的。如果队列中没有消息,消费者会继续监听。

PUSH/POP

Redis数据结构的列表List提供了pushpup命令,遵循着先入先出FIFO的原则。使用push/pop方式的优点在于消息可以持久化,缺点是一条消息只能被一个消费者接收,消费者完全靠手速来获取,是一种比较简陋的消息队列。

Redis的队列list是有序的且可以重复的,作为消息队列使用时可使用rpush/lpush操作入队,使用lpop/rpop操作出队。当发布消息是执行lpush命令,将消息从列表左侧加入队列。消息接收方执行rpop命令从列表右侧弹出消息。

如果队列空了怎么办呢?

如果队列空了,消费者会陷入pop死循环,即使没有数据也不会停止。空轮询不但消耗消费者的CPU资源还会影响Redis的性能。傻瓜式的做法是让消费者的线程按照一定的时间间隔不停的循环和监控队列,虽然可行但显然会造成不必要的资源浪费,而且循环周期也很难确定。

对于消费者而言存在一个问题,需要不停的调用rpop查看列表中是否有待处理的消息。每调用一次都会发起一次连接,势必造成不必要的资源浪费。如果使用休眠的方式让消费者线程间隔一段时间再消费,但这样做也有两个问题:

  • 如果生产者速度大于消费者消费的速度,消息队列长度会一直增大,时间久了会占用大量内存空间。
  • 如果休眠时间过长,就无法处理一些时效性的消息。如果休眠时间过短也会在连接上造成比较大的开销。

LPOP返回一个元素给客户端时会从List中将该元素移除,这意味着该元素只存在于客户端的上下文中。如果客户端在处理这个返回元素的过程中崩溃了,这个元素就会永远的丢失掉。

LPUSH/BRPOP

LPUSH BRPOP

使用brpopblpop实现阻塞读取

由于需要一直调用rpop/lpop才可以实现不停的监听且消费消息,为解决这个问题,Redis提供了阻塞命令brpop/blpop。使用brpop会阻塞队列,而且每次只会弹出一个消息,如果没有消息则会阻塞。

Redis列表List支持带阻塞的命令,生产者从列表左侧lpush加入消息到队列,消费者使用brpop命令从列表右侧弹出消息并设置超时时间,如果列表中没有消息则一直阻塞直到超时。这样做的目的在于减小Redis的压力。

对于Redis来说提供了blpop/brpop阻塞读,阻塞读在队列没有数据时会立即进入休眠状态,一旦数据到来则立即被唤醒,消息的延迟几乎为零。需要注意的是如果线程一直阻塞在那里,连接就会被服务器主动断开来减少资源占用,这时blpop/brpop会抛出异常,所以编写消费段时需要注意异常的处理。

BRPOP key [key ...] timeout

BLPOP/BRPOP 列表阻塞式弹出

当给定列表内没有任何元素可供弹出时,连接将被BRPOP命令阻塞,直到等待超时或发现可弹出元素为止。当给定多个key参数时,按参数key的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。另外,BRPOP除了弹出元素的位置和BLPOP不同之处,其他表现一致。

列表的阻塞式弹出特点是如果列表中没有任务时,连接将会被阻塞。连接的阻塞存在一个超时时间,当超时时间设置为0时刻无限等待直到弹出消息。

Redis的PUSH/POP机制,利用Redis的列表list数据结构,生产者lpush消息,消费者brpop消息并设定超时时间以减少Redis压力。这种方案相对于发布订阅模式的好处是数据可靠性提高了,只有在Redis宕机且数据没有持久化的情况下会丢失数据。可以根据业务通过AOF和缩短持久化间隔来保证较高的可靠性,也可以通过多个客户端来提高消息速度。但相对于专业的消息队列中间件,发布订阅模式的状态过于简单(没有状态),而且没有ACK机制,消息取出后消费失败依赖于客户端记录日志或重新push到队列中。

Redis中实现生产者和消费者模型,可使用LPUSHRPOP来实现该功能。不过当列表为空时消费者就需要轮询来获取消息,这样会增加Redis的访问压力和消费者的CPU时间,另外很多访问也是无用的。为此Redis提供了阻塞式访问BRPOPBLPOP命令,消费者可以在获取数据时指定如果数据不存在阻塞的时间,如果在时限内获得数据则立即返回,如果超时还没有数据则返回NULL,可使用0表示一直阻塞。同时Redis会为所有阻塞的消费者以先后顺序排序。

使用Redis的列表来实现一个任务队列,开启两个程序,一个作为生产者使用LPUSH写队列,一个作为消费者使用RPOP读队列。由于消费者并不知道什么时候会有消息过来,所以消费者需要一直循环读取数据。两者的消息可以使用JSON进行封装协议传输。由于消费者在没有读到数据的情况下,会一直循环读取,对系统来说十分耗费资源,此时可利用Redis提供的阻塞读取命令BRPOP进行改进。使用BRPOP改进后,消费者不会一直循环读取,而是一直阻塞等到有消息过来时才读取。

发布订阅模式

发布订阅模式是一个或多个客户端订阅消息频道,只要发布者发布消息,所有订阅者都能收到消息,订阅者都是平等的。

PUB/SUB

Redis自带pub/sub机制即发布订阅模式,此模式中生产者producer和消费者consumer之间的关系是一对多的,也就是一条消息会被多个消费者所消费,当只有一个消费者时可视为一对一的消息队列。

发布订阅机制模型

首先发布者将消息发布到频道,客户端订阅频道后就能获得频道的消息。

发布订阅模式命令

  • psubscribe 订阅一个或多个符合给定模式的频道
  • publish 将消息发布到指定的频道
  • pubsub查看订阅与发布系统状态
  • pubsub channels pattern 列出当前的活跃频道
  • pubsub numsub channel-1 channel-n 获取给定频道的订阅者数量
  • pubsub numpat 获取订阅模式的数量
  • punsubscribe 指示客户端退订所有给定模式
  • subscribe 订阅给定的一个或多个频道的消息
  • unsubscribe 指示客户端退订给定的频道

实现

使用PHP+Redis实现消息队列

操作流程

  1. PHP接收请求和数据
  2. PHP将数据写入Redis队列(入队)
  3. Shell定时调用PHP读取队列数据并写入数据库(出队)

入队inqueue.php

$result = $redis->rpush("queue", json_encode($data));
if($result){
  echo "inqueue success";
}

出队 outqueue.php

#! /usr/bin/php
<?php
$result = $redis->lpop("queue");
if($result){
  $data = json_decode($result, true);
}

定时任务:process.sh

# 每分钟调用一次定时脚本
* * * * * /scripts/process.sh

定时脚本:process.sh

#! /bin/bash
#filename : process.sh
php /scripts/outqueue.php

出队采用死循环方式,感谢 行走平凡 的指正。

$ vim outqueue.php

while(true){
  $result = $redis->lpop("queue");
  if($result){
    $data = json_decode($result, true);
  }
}

作者:JunChow520
链接:https://www.jianshu.com/p/02a923fea175
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

赞(0) 打赏
分享到: 更多 (0)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏