RabbitMQ系列(七)--批量消息和延时消息 - Diamond-Shine - 博客园

来源: RabbitMQ系列(七)–批量消息和延时消息 – Diamond-Shine – 博客园

批量消息发送模式

批量消息是指把消息放到一个集合统一进行提交,这种方案设计思路是希望消息在一个会话里,比如放到ThreadLocal里的集合,拥有相同

的会话ID,带有这次提交信息的size等属性,最重要的是吧这一批消息进行合并。对于channel就是发送一次消息。这种方式也是希望消费端在消

费的时候,可以进行批量化的消费,针对一个原子业务的操作进行处理,但是不保证可靠性,需要进行补偿机制。

图例:

伪代码思路:

复制代码
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BatchMessage {
    private long sessionId;
    private List<String> messageHolder = new ArrayList<>();    //用来保存message的集合
    private int listSize;    //集合的条数
}
复制代码

 

把message放到ThreadLocal里面使用,这些批量的消息需要同一个sessionId,如果要入库,只是保存sessionId对应的消息集合,而不是每条消息

步骤:

1、首先业务数据入库

2、将批量消息对应的BatchMessage入库,状态为发送中

3、发送message到Broker

4、返回confirm确认

5、修改状态为消费成功

6、。。。。后面不讲了,和之前博客思路一样

延迟消息发送模式

使用场景:

1、在电商平台买到的商品签收后,不点击确认支付,系统自动在一定时间进行支付操作

2、自动超时作废的场景,你的优惠券/红包也有使用时限,也可以用延迟消息机制

实现:

1、DLX和TTL:Consumer订阅DLX,Message发送到原Queue,设置TTL为30分钟。TTL到期,消息发送到DLX,然后被Consumer消费,就可以实现延迟队列

2、rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能

安装、启用插件

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 30*60*1000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

 

赞(0) 打赏
分享到: 更多 (0)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏