RabbitMQ学习系列(一): 介绍 - 章为忠 - 博客园

mikel阅读(905)

来源: RabbitMQ学习系列(一): 介绍 – 章为忠 – 博客园

  1. 介绍

      RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue )协议的开源实现。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面都非常的优秀。是当前最主流的消息中间件之一。

      RabbitMQ的官网:http://www.rabbitmq.com

  2. AMQP

    AMQP,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,同样,消息使用者也不用知道发送者的存在。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

  3. 系统架构

       

  消息队列的使用过程大概如下:

    (1)客户端连接到消息队列服务器,打开一个channel。

    (2)客户端声明一个exchange,并设置相关属性。

    (3)客户端声明一个queue,并设置相关属性。

    (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。

    (5) 客户端投递消息到exchange。exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

    如下图所示:AMQP 里主要要说两个组件:Exchange 和 Queue

绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker ,

这部分是 RabbitMQ 实现的,而蓝色的则是客户端,通常有 Producer 和 Consumer 两种类型。

 

  4. 几个概念

    P: 为Producer,数据的发送方。

    C:为Consumer,数据的接收方。

    Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

    Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

    Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

    Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

    vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

    channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

   PS: RabbitMQ 的一些基本的概念,就介绍完了,虽然都是些概念,但是了解他的一些原理,还是很重要的,特别是exchange 和 路由的概念和作用。接下来会具体介绍他的安装和使用。

   查看RabbitMQ 系列其他文章,http://www.cnblogs.com/zhangweizhong/category/855479.html

我是如何用 Redis 做实时订阅推送的? - 简书

mikel阅读(1218)

来源: 我是如何用 Redis 做实时订阅推送的? – 简书

前阵子开发了公司领劵中心的项目,这个项目是以redis作为关键技术落地的。

先说一下领劵中心的项目吧,这个项目就类似京东app的领劵中心,当然图是截取京东的,公司的就不截了。。。

image

其中有一个功能叫做领劵的订阅推送。什么是领劵的订阅推送?就是用户订阅了该劵的推送,在可领取前的一分钟就要把提醒信息推送到用户的app中。本来这个订阅功能应该是消息中心那边做的,但他们说这个短时间内做不了。所以让我这个负责优惠劵的做了-.-!。具体方案就是到具体的推送时间点了,coupon系统调用消息中心的推送接口,把信息推送出去。

下面我们分析一下这个功能的业务情景。公司目前注册用户6000W+,是哪家就不要打听了。。。比如有一张无门槛的优惠劵下单立减20元,那么抢这张劵的人就会比较多,我们保守估计10W+,百万级别不好说。我们初定为20W万人,那么这20W条推送信息要在一分钟推送完成!并且一个用户是可以订阅多张劵的。所以我们知道了这个订阅功能的有两个突出的难点:

1、推送的实效性:推送慢了,用户会抱怨没有及时通知他们错过了开抢时机。

2、推送的体量大:爆款的神劵,人人都想抢!

然而推送体量又会影响到推送的实效性。这真是一个让人头疼的问题!

那就让我们把问题一个个解决掉吧!

推送的实效性的问题:当用户在领劵中心订阅了某个劵的领取提醒后,在后台就会生成一条用户的订阅提醒记录,里面记录了在哪个时间点给用户发送推送信息。所以问题就变成了系统如何快速实时选出哪些要推送的记录!

方案1:MQ的延迟投递。MQ虽然支持消息的延迟投递但尺度太大1s 5s 10s 30s 1m,用来做精确时间点投递不行!并且用户执行订阅之后又取消订阅的话,要把发出去的MQ消息delete掉这个操作有点头大,短时间内难以落地!并且用户可以取消之后再订阅,这又涉及到去重的问题。所以MQ的方案否掉。

方案2:传统定时任务。这个相对来说就简单一点,用定时任务是去db里面load用户的订阅提醒记录,从中选出当前可以推送的记录。但有句话说得好任何脱离实际业务的设计都是耍流氓~。下面我们就分析一下传统的定时任务到底适不适合我们的这个业务!

image

综上所述我们就知道了一般传统的定时任务存在以下缺点:

1、性能瓶颈。只有一台机在处理,在大体量数据面前力不从心!

2、实效性差。定时任务的频率不能太高,太高会业务数据库造成很大的压力!

3、单点故障。万一跑的那台机挂了,那整个业务不可用了-。- 这是一个很可怕的事情!

所以传统定时任务也不太适合这个业务。。。

那我们是不是就束手无策了呢?其实不是的! 我们只要对传统的定时任务做一个简单的改造!就可以把它变成可以同时多机跑,并且实效性可以精确到秒级,并且拒绝单点故障的定时任务集群!这其中就要借助我们的强大的redis了。

方案3:定时任务集群

首先我们要定义定时任务集群要解决的三个问题!

1、实效性要高

2、吞吐量要大

3、服务要稳定,不能有单点故障

下面是整个定时任务集群的架构图。

image

架构很简单:我们把用户的订阅推送记录存储到redis集群的sortedSet队列里面,并且以提醒用户提醒时间戳作为score值,然后在我们个每业务server里面起一个定时器频率是秒级,我的设定就是1s,然后经过负载均衡之后从某个队列里面获取要推送的用户记录进行推送。下面我们分析以下这个架构

1、性能:除去带宽等其它因素,基本与机器数成线性相关。机器数量越多吞吐量越大,机器数量少时相对的吞吐量就减少。

2、实效性:提高到了秒级,效果还可以接受。

3、单点故障?不存在的!除非redis集群或者所有server全挂了。。。。

这里解析一下为什么用redis?

第一redis 可以作为一个高性能的存储db,性能要比MySQL好很多,并且支持持久化,稳定性好。

第二redis SortedSet队列天然支持以时间作为条件排序,完美满足我们选出要推送的记录。

ok~既然方案已经有了那如何在一天时间内把这个方案落地呢?是的我设计出这个方案到基本编码完成,时间就是一天。。。因为时间太赶鸟。

首先我们以user_id作为key,然后mod队列数hash到redis SortedSet队列里面。为什么要这样呢,因为如果用户同时订阅了两张劵并且推送时间很近,这样的两条推送就可以合并成一条~,并且这样hash也相对均匀。下面是部分代码的截图:

image

然后要决定队列的数量,一般正常来说我们有多少台处理的服务器就定义多少条队列。因为队列太少,会造成队列竞争,太多可能会导致记录得不到及时处理。

然而最佳实践是队列数量应该是可动态配置化的,因为线上的集群机器数是会经常变的。大促的时候我们会加机器是不是,并且业务量增长了,机器数也是会增加是不是~。所以我是借用了淘宝的diamond进行队列数的动态配置。

image

我们每次从队列里面取多少条记录也是可以动态配置的

image

这样就可以随时根据实际的生产情况调整整个集群的吞吐量~。 所以我们的定时任务集群还是具有一个特性就是支持动态调整~。

最后一个关键组件就是负载均衡了。这个是非常重要的!因为这个做得不好就会可能导致多台机竞争同时处理一个队列,影响整个集群的效率!在时间很紧的情况下我就用了一个简单实用的利用redis一个自增key 然后 mod 队列数量算法。这样就很大程度上就保证不会有两台机器同时去竞争一条队列~.

image

最后我们算一下整个集群的吞吐量

10(机器数) * 2000(一次拉取数) = 20000。然后以MQ的形式把消息推送到消息中心,发MQ是异步的,算上其它处理0.5s。

其实发送20W的推送也就是10几s的事情。

ok~ 到这里我们整个定时任务集群就差不多基本落地好了。如果你问我后面还有什么可以完善的话那就是:

1、加监控, 集群怎么可以木有监控呢,万一出问题有任务堆积怎么办~

2、加上可视化界面。

3、最好有智能调度,增加任务优先级。优先级高的任务先运行嘛。

4、资源调度,万一机器数量不够,力不从心,优先保证重要任务执行。

目前项目已上前线,运行平稳~。


以上,便是今天的分享,希望大家喜欢。

作者:夜空_2cd3
链接:https://www.jianshu.com/p/7ad2d539f010
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

关于MQ的几件小事(七)如果让你设计一个MQ,你怎么设计 - 简书

mikel阅读(810)

来源: 关于MQ的几件小事(七)如果让你设计一个MQ,你怎么设计 – 简书

其实回答这类问题,说白了,起码不求你看过那技术的源码,起码你大概知道那个技术的基本原理,核心组成部分,基本架构构成,然后参照一些开源的技术把一个系统设计出来的思路说一下就好

比如说这个消息队列系统,我们来从以下几个角度来考虑一下

(1)首先这个mq得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下kafka的设计理念,broker -> topic -> partition,每个partition放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给topic增加partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?

(2)其次你得考虑一下这个mq的数据要不要落地磁盘吧?那肯定要了,落磁盘,才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是kafka的思路。

(3)其次你考虑一下你的mq的可用性啊?这个事儿,具体参考我们之前可用性那个环节讲解的kafka的高可用保障机制。多副本 -> leader & follower -> broker挂了重新选举leader即可对外服务。

(4)能不能支持数据0丢失啊?可以的,参考我们之前说的那个kafka数据零丢失方案

其实一个mq肯定是很复杂的,其实这是个开放题,就是看看你有没有从架构角度整体构思和设计的思维以及能力。

如果你还不清楚,请参考前面几篇
消息队列的用途、优缺点、技术选型
如何保证消息队列的高可用
如何保证消息不重复消费
如何防止数据队列数据丢失
如何保证消息按顺序执行
消息积压在消息队列里怎么办

作者:一条路上的咸鱼
链接:https://www.jianshu.com/p/08ef2219411f
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

关于MQ的几件小事(五)如何保证消息按顺序执行 - 简书

mikel阅读(984)

来源: 关于MQ的几件小事(五)如何保证消息按顺序执行 – 简书

1.为什么要保证顺序

消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。举例:
比如通过mySQL binlog进行两个数据库的数据同步,由于对数据库的数据操作是具有顺序性的,如果操作顺序搞反,就会造成不可估量的错误。比如数据库对一条数据依次进行了 插入->更新->删除操作,这个顺序必须是这样,如果在同步过程中,消息的顺序变成了 删除->插入->更新,那么原本应该被删除的数据,就没有被删除,造成数据的不一致问题。

2.出现顺序错乱的场景

(1)rabbitmq
①一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。

rabbitmq消息顺序错乱第一种情况示意图.png

②一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。

abbitmq消息顺序错乱第二种情况示意图.png

(2)kafka
①kafka一个topic,一个partition,一个consumer,但是consumer内部进行多线程消费,这样数据也会出现顺序错乱问题。

kafka消息顺序错乱第一种情况示意图.png

②具有顺序的数据写入到了不同的partition里面,不同的消费者去消费,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。

kafka消息顺序错乱第二种情况示意图..png

3.保证消息的消费顺序

(1)rabbitmq
①拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。

一个queue对应一个consumer

②或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理

一个queue对应一个consumer,采用多线程.png

(2)kafka
①确保同一个消息发送到同一个partition,一个topic,一个partition,一个consumer,内部单线程消费。

单线程保证顺序.png

②写N个内存queue,然后N个线程分别消费一个内存queue即可

多线程保证顺序.png

作者:一条路上的咸鱼
链接:https://www.jianshu.com/p/02fdcb9e8784
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

关于MQ的几件小事(一)消息队列的用途、优缺点、技术选型 - 简书

mikel阅读(841)

来源: 关于MQ的几件小事(一)消息队列的用途、优缺点、技术选型 – 简书

1.为什么使用消息队列?

(1)解耦:可以在多个系统之间进行解耦,将原本通过网络之间的调用的方式改为使用MQ进行消息的异步通讯,只要该操作不是需要同步的,就可以改为使用MQ进行不同系统之间的联系,这样项目之间不会存在耦合,系统之间不会产生太大的影响,就算一个系统挂了,也只是消息挤压在MQ里面没人进行消费而已,不会对其他的系统产生影响。

不使用MQ的情况.png
使用MQ进行解耦之后.png

(2)异步:加入一个操作设计到好几个步骤,这些步骤之间不需要同步完成,比如客户去创建了一个订单,还要去客户轨迹系统添加一条轨迹、去库存系统更新库存、去客户系统修改客户的状态等等。这样如果这个系统都直接进行调用,那么将会产生大量的时间,这样对于客户是无法接收的;并且像添加客户轨迹这种操作是不需要去同步操作的,如果使用MQ将客户创建订单时,将后面的轨迹、库存、状态等信息的更新全都放到MQ里面然后去异步操作,这样就可加快系统的访问速度,提供更好的客户体验。

不使用MQ情况.png
使用MQ进行异步之后.png

(3)削峰:一个系统访问流量有高峰时期,也有低峰时期,比如说,中午整点有一个抢购活动等等。比如系统平时流量并不高,一秒钟只有100多个并发请求,系统处理没有任何压力,一切风平浪静,到了某个抢购活动时间,系统并发访问了剧增,比如达到了每秒5000个并发请求,而我们的系统每秒只能处理2000个请求,那么由于流量太大,我们的系统、数据库可能就会崩溃。这时如果使用MQ进行流量削峰,将用户的大量消息直接放到MQ里面,然后我们的系统去按自己的最大消费能力去消费这些消息,就可以保证系统的稳定,只是可能要跟进业务逻辑,给用户返回特定页面或者稍后通过其他方式通知其结果。

使用MQ进行削峰.png

2.消息队列有什么优点和缺点?

优点:1、对结构复杂、设计系统多的操作进行解耦操作,降低系统的操作复杂度、降低系统的维护成本。
2、对一个可以进行异步操作的一些系统操作进行异步,减小操作的响应时间,提供更好的用户体验。
3、可对高流量进行削峰,保证系统的平稳运行。
缺点:1、系统可用性降低。比如在系统中引入MQ,那么万一MQ挂了怎么办呢?一般而言,引入的外部依赖越多,系统越
脆弱,每一个依赖出问题都会导致整个系统的崩溃。
2、系统复杂度提高。需要考虑MQ的各种情况,比如:消息的重复消费、消息丢失、保证消费顺序等等……
3、数据一致性问题。比如A系统已经给客户返回操作成功,这时候操作BC都成功了,操作D却失败了,导致数据不
一致。

3.kafka、activemq、rabbitmq、rocketmq都有什么优点和缺点啊?

特性 ActiveMQ RabbitMQ RocketMQ kafka
单机吞吐量 万级,吞吐量比RocketMQ和kafka要低一个数量级 万级,吞吐量比RocketMQ和kafka要低一个数量级 10万级,RocketMQ也是可以支撑高吞吐的一种MQ 10万级别,kafka最大优点就是吞吐量大,一般配合大数据类的系统来进行实时数据计算、日志采集等场景。
topic数量对吞吐量的影响 topic可以达到几百、几千个的级别,吞吐量会有小幅度的下降。这是RocketMQ的一大优势,可在同等数量机器下支撑大量的topic topic从几十个到几百个的时候,吞吐量会大幅下降。所以在同等机器数量下,kafka尽量保证topic数量不要过多。如果支撑大规模topic需要增加更多的机器
时效性 ms级 微秒级,这是rabbitmq的一大特点,延迟是最低的 ms级 延迟在ms级以内
可用性 高,基于主从架构实现可用性 高,基于主从架构实现可用性 非常高,分布式架构 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 经过参数优化配置,可以做到0丢失 经过参数配置,消息可以做到零丢失
功能支持 MQ领域的功能及其完备 基于erlang开发,所以并发性能极强,性能极好,延时低 MQ功能较为完备,分布式扩展性好 功能较为简单,主要支持加单MQ功能
优势 非常成熟,功能强大,在业内大量公司和项目中都有应用 erlang语言开发,性能极好、延时很低,吞吐量万级、MQ功能完备,管理界面非常好,社区活跃;互联网公司使用较多 接口简单易用,阿里出品有保障,吞吐量大,分布式扩展方便、社区比较活跃,支持大规模的topic、支持复杂的业务场景,可以基于源码进行定制开发 超高吞吐量,ms级的时延,极高的可用性和可靠性,分布式扩展方便
劣势 偶尔有较低概率丢失消息,社区活跃度不高 吞吐量较低,erlang语音开发不容易进行定制开发,集群动态扩展麻烦 接口不是按照标准JMS规范走的,有的系统迁移要修改大量的代码,技术有被抛弃的风险 有可能进行消息的重复消费
应用 主要用于解耦和异步,较少用在大规模吞吐的场景中 都有使用 用于大规模吞吐、复杂业务中 在大数据的实时计算和日志采集中被大规模使用,是业界的标准

综上所述,总结如下:
一般业务系统要引入MQ,最早大家都用ActiveMQ,但现在用的不多了。没有经过大规模吞吐场景的验证,社区也不活跃,不推荐再使用。
后来大家开始用rabbitMQ,但是它是使用erlang语言开发的,如果不精通erlang,对公司而言,几乎处于不可控的状态,单其是开源的,社区活跃度高,拥有比较稳定的支持。
现在越来越多的公司开始使用RocketMQ,但是要小心被抛弃的风险。如果公司有实力自己去维护开发,推荐使用。否则还是选择RabbitMQ。
如果实在大数据的实时计算、日志采集等领域,用kafka是业界标准。

所以,对于中小型公司,技术实力一般的,应该用rabbitmq,对于大公司,基础架构研发能力强大的,推荐使用RocketMQ。

作者:一条路上的咸鱼
链接:https://www.jianshu.com/p/fdd94be6037a
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

关于MQ的几件小事(二)如何保证消息队列的高可用 - 简书

mikel阅读(928)

来源: 关于MQ的几件小事(二)如何保证消息队列的高可用 – 简书

1.RabbitMQ的高可用

RabbitMQ基于主从模式实现高可用。RabbitMQ有三种模式:单机模式,普通集群模式,镜像集群模式。
(1)单机模式:
单机模式就是demo级别的,生产中不会有人使用。
(2)普通集群模式
普通集群模式就是在多台机器上启动多个rabbitmq实例,每个机器启动一个。但是创建的queue只会放在一个rabbitmq实例上面,但是其他的实例都同步了这个queue的元数据。在你消费的时候,如果连接到了另一个实例,他会从拥有queue的那个实例获取消息然后再返回给你。

普通集群模式示意图.png

这种方式并没有做到所谓消息的高可用,就是个普通的集群,这样还会导致要么消费者每次随机连接一个实例然后拉取数据,这样的话在实例之间会产生网络传输,增加系统开销,要么固定连接那个queue所在的实例消费,这样会导致单实例的性能瓶颈。

而且如果那个方queue的实例宕机了,会导致接下来其他实例都无法拉取数据;如果没有开启消息的持久化会丢失消息;就算开启了消息的持久化,消息不一定会丢,但是也要等这个实例恢复了,才可以继续拉取数据。
所以这个并没有提供高可用,这种方案只是提高了吞吐量,也就是让集群中多个节点来服务某个queue的读写操作。
(3)镜像集群模式
这种模式,才是rabbitmq提供是真正的高可用模式,跟普通集群不一样的是,你创建的queue,无论元数据还是queue里面是消息数据都存在多个实例当中,然后每次写消息到queue的时候,都会自动把消息到多个queue里进行消息同步。

镜像集群模式示意图.png

这种模式的好处在于,任何一台机器宕机了,其他的机器还可以使用。
坏处在于:1、性能消耗太大,所有机器都要进行消息的同步,导致网络压力和消耗很大。2、没有扩展性可言,如果有一个queue负载很重,就算加了机器,新增的机器还是包含了这个queue的所有数据,并没有办法扩展queue。
如何开启镜像集群模式:在控制台新增一个镜像集群模式的策略,指定的时候可以要求数据同步到所有节点,也可以要求同步到指定节点,然后在创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上面去了。 

2.kafka的高可用

(1)kafka的一个基本架构:多个broker组成,一个broker是一个节点;你创建一个topic,这个topic可以划分成多个partition,每个partition可以存在于不同的broker上面,每个partition存放一部分数据。这是天然的分布式消息队列。

实际上rabbitmq并不是分布式消息队列,他就是传统的消息队列,只不过提供了一些集群、HA的机制而已,因为无论如何配置,rabbitmq一个queue的数据就存放在一个节点里面,镜像集群下,也是每个节点都放这个queue的全部数据。

kafka在0.8以前是没有HA机制的,也就是说任何一个broker宕机了,那个broker上的partition就丢了,没法读也没法写,没有什么高可用可言。

kafka在0.8之后,提过了HA机制,也就是replica副本机制。每个partition的数据都会同步到其他机器上,形成自己的replica副本。然后所有的replica副本会选举一个leader出来,那么生产者消费者都和这个leader打交道,其他的replica就是follower。写的时候,leader会把数据同步到所有follower上面去,读的时候直接从leader上面读取即可。
为什么只能读写leader:因为要是你可以随意去读写每个follower,那么就要关心数据一致性问题,系统复杂度太高,容易出问题。kafka会均匀度讲一个partition的所有数据replica分布在不同的机器上,这样就可以提高容错性。
这样就是高可用了,因为如果某个broker宕机 了,没事儿,那个broker的partition在其他机器上有副本,如果这上面有某个partition的leader,那么此时会重新选举出一个现代leader出来,继续读写这个新的leader即可。

kafka高可用架构示意图.png

写消息: 写数据的时候,生产者就写leader,然后leader将数据落到磁盘上之后,接着其他follower自己主动从leader来pull数据。一旦所有follower同步好了数据,就会发送ack个leader,leader收到了所有的follower的ack之后,就会返回写成功的消息给消息生产者。(这只是一种模式,可以调整)。
读数据:消费数据的时候,只会从leader进行消费。但是只有一个消息已经被所有follower都同步成功返回ack的时候,这个消息才会被消费者读到。

作者:一条路上的咸鱼
链接:https://www.jianshu.com/p/ab64681beb17
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

关于MQ的几件小事(三)如何保证消息不重复消费 - 简书

mikel阅读(822)

来源: 关于MQ的几件小事(三)如何保证消息不重复消费 – 简书

1.幂等性

幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中。
在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。例如,“setTrue()”函数就是一个幂等函数,无论多次执行,其结果都是一样的.更复杂的操作幂等保证是利用唯一交易号(流水号)实现.

简单来说,幂等性就是一个数据或者一个请求,给你重复来了多次,你得确保对应的数据是不会改变的,不能出错。

2.出现重复消费场景

(1)首先,比如rabbitmq、rocketmq、kafka,都有可能会出现消息重复消费的问题。因为这个问题通常不是由mq来保证的,而是消费方自己来保证的。
(2)举例kafka来说明重复消费问题
kafka有一个叫做offset的概念,就是每个消息写进去,都有一个offset代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次就算重启,kafka就会让消费者从上次消费到的offset来继续消费。

但是万事总有例外,如果consumer消费了数据,还没来得及发送自己已经消费的消息的offset就挂了,那么重启之后就会收到重复的数据。

kafka重复消费示意图.png

3.保证幂等性(重复消费)

要保证消息的幂等性,这个要结合业务的类型来进行处理。下面提供几个思路供参考:
(1)、可在内存中维护一个set,只要从消息队列里面获取到一个消息,先查询这个消息在不在set里面,如果在表示已消费过,直接丢弃;如果不在,则在消费后将其加入set当中。
(2)、如何要写数据库,可以拿唯一键先去数据库查询一下,如果不存在在写,如果存在直接更新或者丢弃消息。
(3)、如果是写redis那没有问题,每次都是set,天然的幂等性。
(4)、让生产者发送消息时,每条消息加一个全局的唯一id,然后消费时,将该id保存到redis里面。消费时先去redis里面查一下有么有,没有再消费。
(5)、数据库操作可以设置唯一键,防止重复数据的插入,这样插入只会报错而不会插入重复数据。

作者:一条路上的咸鱼
链接:https://www.jianshu.com/p/172295e2e978
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

关于MQ的几件小事(四)如何保证消息不丢失 - 简书

mikel阅读(889)

来源: 关于MQ的几件小事(四)如何保证消息不丢失 – 简书

1.mq原则

数据不能多,也不能少,不能多是说消息不能重复消费,这个我们上一节已解决;不能少,就是说不能丢失数据。如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的。

2.丢失数据场景

丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了。下面从rabbitmq和kafka分别说一下,丢失数据的场景,
(1)rabbitmq
A:生产者弄丢了数据
生产者将数据发送到rabbitmq的时候,可能在传输过程中因为网络等问题而将数据弄丢了。
B:rabbitmq自己丢了数据
如果没有开启rabbitmq的持久化,那么rabbitmq一旦重启,那么数据就丢了。所依必须开启持久化将消息持久化到磁盘,这样就算rabbitmq挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,rabbitmq还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。
C:消费端弄丢了数据
主要是因为消费者消费时,刚消费到,还没有处理,结果消费者就挂了,这样你重启之后,rabbitmq就认为你已经消费过了,然后就丢了数据。

rabbitmq数据丢失示意图.png

(2)kafka
A:生产者弄丢了数据
生产者没有设置相应的策略,发送过程中丢失数据。
B:kafka弄丢了数据
比较常见的一个场景,就是kafka的某个broker宕机了,然后重新选举partition的leader时。如果此时follower还没来得及同步数据,leader就挂了,然后某个follower成为了leader,他就少了一部分数据。
C:消费者弄丢了数据
消费者消费到了这个数据,然后消费之自动提交了offset,让kafka知道你已经消费了这个消息,当你准备处理这个消息时,自己挂掉了,那么这条消息就丢了。 

kafka丢失数据示意图.png

3.如何防止消息丢失

(1)rabbitmq
A:生产者丢失消息
①:可以选择使用rabbitmq提供是事物功能,就是生产者在发送数据之前开启事物,然后发送消息,如果消息没有成功被rabbitmq接收到,那么生产者会受到异常报错,这时就可以回滚事物,然后尝试重新发送;如果收到了消息,那么就可以提交事物。

  channel.txSelect();//开启事物
  try{
      //发送消息
  }catch(Exection e){
      channel.txRollback()//回滚事物
      //重新提交
  }

缺点:rabbitmq事物已开启,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。

②:可以开启confirm模式。在生产者哪里设置开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如何写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

    //开启confirm
    channel.confirm();
    //发送成功回调
    public void ack(String messageId){
      
    }

    // 发送失败回调
    public void nack(String messageId){
        //重发该消息
    }

二者不同
事务机制是同步的,你提交了一个事物之后会阻塞住,但是confirm机制是异步的,发送消息之后可以接着发送下一个消息,然后rabbitmq会回调告知成功与否。
一般在生产者这块避免丢失,都是用confirm机制。
B:rabbitmq自己弄丢了数据
设置消息持久化到磁盘。设置持久化有两个步骤:
①创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里面的数据。
②发送消息的时候讲消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时rabbitmq就会将消息持久化到磁盘上。
必须要同时开启这两个才可以。

而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前rabbitmq挂了,数据丢了,生产者收不到ack回调也会进行消息重发。
C:消费者弄丢了数据
使用rabbitmq提供的ack机制,首先关闭rabbitmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。

(2)kafka
A:消费端弄丢了数据
关闭自动提交offset,在自己处理完毕之后手动提交offset,这样就不会丢失数据。
B:kafka弄丢了数据
一般要求设置4个参数来保证消息不丢失:
①给topic设置 replication.factor参数:这个值必须大于1,表示要求每个partition必须至少有2个副本。

②在kafka服务端设置min.isync.replicas参数:这个值必须大于1,表示 要求一个leader至少感知到有至少一个follower在跟自己保持联系正常同步数据,这样才能保证leader挂了之后还有一个follower。

③在生产者端设置acks=all:表示 要求每条每条数据,必须是写入所有replica副本之后,才能认为是写入成功了

④在生产者端设置retries=MAX(很大的一个值,表示无限重试):表示 这个是要求一旦写入事变,就无限重试
C:生产者弄丢了数据
如果按照上面设置了ack=all,则一定不会丢失数据,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

作者:一条路上的咸鱼
链接:https://www.jianshu.com/p/8ed16edc73e4
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

容器编排系统k8s之Ingress资源 - Linux-1874 - 博客园

mikel阅读(1289)

来源: 容器编排系统k8s之Ingress资源 – Linux-1874 – 博客园

前文我们了解了k8s上的service资源的相关话题,回顾请参考:https://www.cnblogs.com/qiuhom-1874/p/14161950.html;今天我们来了解下k8s上的Ingress资源的相关话题;

我们知道在k8s上service是用来解决Pod访问问题,它是通过kube-proxy在每个节点上创建iptables规则或ipvs规则,在用户请求某个pod时,用户的请求会被其service规则所捕获,从而实现访问对应pod;对于service来讲,用户请求直接在传输层就被捕获转发,效率很高效,但这同时也引入了一个新问题;比如我们运行的pod对外客户端访问需要https通信,如果使用service这种4层调度,那就意味着每个pod上我们要配置证书,这很显然不是我们想要做的;那有没有什么办法做到在用户访问pod对应的service时使用https,而对应pod里又不用https协议呢?答案是有的;比如我们可以使用nginx来做https会话卸载器;我们只需要在代理上配置证书即可;又比如我们在k8s上运行了各种各样的pod,这些pod的功能每个都不一样,有点是专门处理用户认证的,有点是专门处理站点主页的,有点专门处理支付的等等,而这些pod对外都是提供一个独有的url,那么这些pod需要怎么才能被集群外部访问到呢?我们知道对于一个站点来讲,如果后端有多个server同时提供一种服务,我们可以把这些同功能的server定义成一个组,然后使用nginx代理将不同功能url的访问代理到不同组上即可;这样一来解决了后端多server被负载访问的问题;那么对于k8s上这种同功能的pod怎么归并成一个组呢?用户访问不同url怎么调度到不同的组上呢?很显然要想实现这些功能,在k8s上应该有一个类似nginx一样的代理存在;这个代理就叫做ingress 控制器;ingress 控制器和k8s上的其他控制不一样,ingress控制器并不能直接运行为kube-controller-manager的一部分,它类似k8s集群上的coredns,需要在集群上单独部署,本质上就是一个pod,我们可以使用k8s上的ds或deploy控制器来创建它;ingress controller pod的作用主要是引入集群外部流量,并实时监控着apiserver上ingress资源的变动,并将其ingress中定义的规则转化为对应ingress控制器对应应用程序的专有配置,然后动态的重载或重启对应守护进程来使其配置文件生效;在k8s上ingress是一种标准资源,它本质上就是我们定义的基于dns名称(host)或url路径把请求转发至指定service资源的规则;简单讲ingress就是我们用来定义代理的配置所创建的资源;ingress控制器就是把对应ingress规则转换为对应ingress控制器中应用程序的专有配置,然后重启或重载对应配置文件使其生效的组件;

ingress和ingress controller pod的关系

提示:如上图所示,ingress就是ingress 控制器pod的代理规则;用户请求某个后端pod所提供的服务时,首先会通过ingress controller pod把流量引入到集群内部,然后ingress controller pod根据ingress定义的规则,把对应ingress规则转化为对应ingress controller pod实现的对应应用的配置(ingress controller 可以由任何具有七层反向代理功能的服务实现,比如nginx,haproxy等等)然后再适配用户请求,把对应请求反代到对应service上;而对于pod的选择上,ingress控制器可以基于对应service中的标签选择器,直接同pod直接通信,无须通过service对象api的再次转发,从而省去了用户请求到kube-proxy实现的代理开销(本质上ingress controller 也是运行为一个pod,和其他pod在同一网段中);

ingress controller部署

在k8s上ingress controller的实现有很多,比如基于nginx的,基于haproxy的等等,这里以nginx为例;

下载ingress-nginx包

1
wget https://github.com/kubernetes/ingress-nginx/archive/nginx-0.28.0.tar.gz

解压包,找到对应的部署清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[root@master01 ~]# ll
total 92144
-rw------- 1 root root 65586688 Dec  8 15:16 flannel-v0.13.1-rc1.tar
drwxr-xr-x 2 root root     4096 Dec 21 21:04 manifests
-rw-r--r-- 1 root root 28760559 Dec 21 21:02 nginx-0.28.0.tar.gz
[root@master01 ~]# tar xf nginx-0.28.0.tar.gz
[root@master01 ~]# ls
flannel-v0.13.1-rc1.tar  ingress-nginx-nginx-0.28.0  manifests  nginx-0.28.0.tar.gz
[root@master01 ~]# cd ingress-nginx-nginx-0.28.0/
[root@master01 ingress-nginx-nginx-0.28.0]# ls
build         code-of-conduct.md  docs    hack      labels.yaml  mkdocs.yml      README.md              SECURITY_CONTACTS  version
Changelog.md  CONTRIBUTING.md     go.mod  images    LICENSE      OWNERS          requirements-docs.txt  test
cmd           deploy              go.sum  internal  Makefile     OWNERS_ALIASES  rootfs                 vendor
[root@master01 ingress-nginx-nginx-0.28.0]# cd deploy/
[root@master01 deploy]# ls
aws        cloud-generic  grafana   prometheus  static                       with-validating-webhook.yaml.tpl
baremetal  cluster-wide   minikube  README.md   validating-webhook.yaml.tpl
[root@master01 deploy]# cd static/
[root@master01 static]# ls
configmap.yaml  mandatory.yaml  namespace.yaml  provider  rbac.yaml  with-rbac.yaml
[root@master01 static]# pwd
/root/ingress-nginx-nginx-0.28.0/deploy/static
[root@master01 static]#

提示:资源配置清单在ingress-nginx-nginx-0.28.0/deploy/static下,名为mandatory.yaml;

资源配置清单内容

复制代码
apiVersion: v1
kind: Namespace
metadata:
  name: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---

kind: ConfigMap
apiVersion: v1
metadata:
  name: nginx-configuration
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---
kind: ConfigMap
apiVersion: v1
metadata:
  name: tcp-services
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---
kind: ConfigMap
apiVersion: v1
metadata:
  name: udp-services
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nginx-ingress-serviceaccount
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: nginx-ingress-clusterrole
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
rules:
  - apiGroups:
      - ""
    resources:
      - configmaps
      - endpoints
      - nodes
      - pods
      - secrets
    verbs:
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - nodes
    verbs:
      - get
  - apiGroups:
      - ""
    resources:
      - services
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - events
    verbs:
      - create
      - patch
  - apiGroups:
      - "extensions"
      - "networking.k8s.io"
    resources:
      - ingresses
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - "extensions"
      - "networking.k8s.io"
    resources:
      - ingresses/status
    verbs:
      - update

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: Role
metadata:
  name: nginx-ingress-role
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
rules:
  - apiGroups:
      - ""
    resources:
      - configmaps
      - pods
      - secrets
      - namespaces
    verbs:
      - get
  - apiGroups:
      - ""
    resources:
      - configmaps
    resourceNames:
      # Defaults to "<election-id>-<ingress-class>"
      # Here: "<ingress-controller-leader>-<nginx>"
      # This has to be adapted if you change either parameter
      # when launching the nginx-ingress-controller.
      - "ingress-controller-leader-nginx"
    verbs:
      - get
      - update
  - apiGroups:
      - ""
    resources:
      - configmaps
    verbs:
      - create
  - apiGroups:
      - ""
    resources:
      - endpoints
    verbs:
      - get

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: RoleBinding
metadata:
  name: nginx-ingress-role-nisa-binding
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: nginx-ingress-role
subjects:
  - kind: ServiceAccount
    name: nginx-ingress-serviceaccount
    namespace: ingress-nginx

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: nginx-ingress-clusterrole-nisa-binding
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: nginx-ingress-clusterrole
subjects:
  - kind: ServiceAccount
    name: nginx-ingress-serviceaccount
    namespace: ingress-nginx

---

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-ingress-controller
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: ingress-nginx
      app.kubernetes.io/part-of: ingress-nginx
  template:
    metadata:
      labels:
        app.kubernetes.io/name: ingress-nginx
        app.kubernetes.io/part-of: ingress-nginx
      annotations:
        prometheus.io/port: "10254"
        prometheus.io/scrape: "true"
    spec:
      # wait up to five minutes for the drain of connections
      terminationGracePeriodSeconds: 300
      serviceAccountName: nginx-ingress-serviceaccount
      nodeSelector:
        kubernetes.io/os: linux
      containers:
        - name: nginx-ingress-controller
          image: quay.io/kubernetes-ingress-controller/nginx-ingress-controller:0.28.0
          args:
            - /nginx-ingress-controller
            - --configmap=$(POD_NAMESPACE)/nginx-configuration
            - --tcp-services-configmap=$(POD_NAMESPACE)/tcp-services
            - --udp-services-configmap=$(POD_NAMESPACE)/udp-services
            - --publish-service=$(POD_NAMESPACE)/ingress-nginx
            - --annotations-prefix=nginx.ingress.kubernetes.io
          securityContext:
            allowPrivilegeEscalation: true
            capabilities:
              drop:
                - ALL
              add:
                - NET_BIND_SERVICE
            # www-data -> 101
            runAsUser: 101
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: POD_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          ports:
            - name: http
              containerPort: 80
              protocol: TCP
            - name: https
              containerPort: 443
              protocol: TCP
          livenessProbe:
            failureThreshold: 3
            httpGet:
              path: /healthz
              port: 10254
              scheme: HTTP
            initialDelaySeconds: 10
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 10
          readinessProbe:
            failureThreshold: 3
            httpGet:
              path: /healthz
              port: 10254
              scheme: HTTP
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 10
          lifecycle:
            preStop:
              exec:
                command:
                  - /wait-shutdown

---

apiVersion: v1
kind: LimitRange
metadata:
  name: ingress-nginx
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
spec:
  limits:
  - default:
    min:
      memory: 90Mi
      cpu: 100m
    type: Container
复制代码

提示:以上清单主要定义了一个名称ingress-nginx的名称空间,在其名称空间下创建了几个configmap,最重要的是用deployment创建了一个ingress-nginx pod;

这里说一下,对于ingress-nginx控制器,它本质还是运行为一个pod,对于pod来说要想接入外部访问流量到集群内部来,有三种方式,一种是使用NodePort类型的service;第二种是使用ds或deploy控制器,在定义pod模板时使用hostPort把pod端口映射到宿主机方式;第三种是定义pod模板时使用hostNetwork,直接共享宿主机网络名称空间;如下所示

使用专有NodePort service来引入外部流量

提示:这种使用deploy控制管理ingress controller pod,如果在pod模板中没有暴露端口,则需要创建一个service资源来暴露ingress controller pod的端口来引入外部流量到集群内部;

使用ds控制器管理ingress controller pod在pod模板中使用hostPort方式暴露端口

提示:使用ds控制器能够保证每个节点上只运行一个ingress controller,所以我们可以把对应ingress controller pod端端口通过端口映射的方式映射到宿主机上的某一固定端口;

使用ds控制器在pod模板中使用hostNetwork方式共享宿主机网络名称空间

提示:共享宿主机网络名称空间,也必须使用ds控制器来确保对应每个节点上只能运行一个ingress controller pod,这样才能确保每个ingress controller pod能够正常把端口暴露出去,以供集群外部客户端访问;

选择上述其中一种方式暴露ingress controller pod的端口即可;如果选择使用ds控制器来暴露端口,我们就需要修改其对应资源配置清单中的pod模板,如下所示

使用ds控制器来管理ingress controller pod在pod模板中使用hostPort方式暴露端口

复制代码
apiVersion: v1
kind: Namespace
metadata:
  name: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---

kind: ConfigMap
apiVersion: v1
metadata:
  name: nginx-configuration
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---
kind: ConfigMap
apiVersion: v1
metadata:
  name: tcp-services
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---
kind: ConfigMap
apiVersion: v1
metadata:
  name: udp-services
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nginx-ingress-serviceaccount
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: nginx-ingress-clusterrole
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
rules:
  - apiGroups:
      - ""
    resources:
      - configmaps
      - endpoints
      - nodes
      - pods
      - secrets
    verbs:
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - nodes
    verbs:
      - get
  - apiGroups:
      - ""
    resources:
      - services
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - events
    verbs:
      - create
      - patch
  - apiGroups:
      - "extensions"
      - "networking.k8s.io"
    resources:
      - ingresses
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - "extensions"
      - "networking.k8s.io"
    resources:
      - ingresses/status
    verbs:
      - update

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: Role
metadata:
  name: nginx-ingress-role
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
rules:
  - apiGroups:
      - ""
    resources:
      - configmaps
      - pods
      - secrets
      - namespaces
    verbs:
      - get
  - apiGroups:
      - ""
    resources:
      - configmaps
    resourceNames:
      # Defaults to "<election-id>-<ingress-class>"
      # Here: "<ingress-controller-leader>-<nginx>"
      # This has to be adapted if you change either parameter
      # when launching the nginx-ingress-controller.
      - "ingress-controller-leader-nginx"
    verbs:
      - get
      - update
  - apiGroups:
      - ""
    resources:
      - configmaps
    verbs:
      - create
  - apiGroups:
      - ""
    resources:
      - endpoints
    verbs:
      - get

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: RoleBinding
metadata:
  name: nginx-ingress-role-nisa-binding
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: nginx-ingress-role
subjects:
  - kind: ServiceAccount
    name: nginx-ingress-serviceaccount
    namespace: ingress-nginx

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: nginx-ingress-clusterrole-nisa-binding
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: nginx-ingress-clusterrole
subjects:
  - kind: ServiceAccount
    name: nginx-ingress-serviceaccount
    namespace: ingress-nginx

---

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: nginx-ingress-controller
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: ingress-nginx
      app.kubernetes.io/part-of: ingress-nginx
  template:
    metadata:
      labels:
        app.kubernetes.io/name: ingress-nginx
        app.kubernetes.io/part-of: ingress-nginx
      annotations:
        prometheus.io/port: "10254"
        prometheus.io/scrape: "true"
    spec:
      # wait up to five minutes for the drain of connections
      terminationGracePeriodSeconds: 300
      serviceAccountName: nginx-ingress-serviceaccount
      nodeSelector:
        kubernetes.io/os: linux
      containers:
        - name: nginx-ingress-controller
          image: quay.io/kubernetes-ingress-controller/nginx-ingress-controller:0.28.0
          args:
            - /nginx-ingress-controller
            - --configmap=$(POD_NAMESPACE)/nginx-configuration
            - --tcp-services-configmap=$(POD_NAMESPACE)/tcp-services
            - --udp-services-configmap=$(POD_NAMESPACE)/udp-services
            - --publish-service=$(POD_NAMESPACE)/ingress-nginx
            - --annotations-prefix=nginx.ingress.kubernetes.io
          securityContext:
            allowPrivilegeEscalation: true
            capabilities:
              drop:
                - ALL
              add:
                - NET_BIND_SERVICE
            # www-data -> 101
            runAsUser: 101
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: POD_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          ports:
            - name: http
              containerPort: 80
              hostPort: 30080
              protocol: TCP
            - name: https
              containerPort: 443
              hostPort: 30443
              protocol: TCP
          livenessProbe:
            failureThreshold: 3
            httpGet:
              path: /healthz
              port: 10254
              scheme: HTTP
            initialDelaySeconds: 10
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 10
          readinessProbe:
            failureThreshold: 3
            httpGet:
              path: /healthz
              port: 10254
              scheme: HTTP
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 10
          lifecycle:
            preStop:
              exec:
                command:
                  - /wait-shutdown

---

apiVersion: v1
kind: LimitRange
metadata:
  name: ingress-nginx
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
spec:
  limits:
  - default:
    min:
      memory: 90Mi
      cpu: 100m
    type: Container
复制代码

提示:只需把对应控制器类型更改为DaemonSet,在pod模板中spec字段下把replicas去掉;在spec.template.spec.containers.ports字段中加上nodePort字段指定要把容器的端口映射到宿主机上某个端口;如果暴露的端口是非标准端口,在对应k8s集群外部我们还需要部署反代,比如使用nginx,haproxy,lvs;

使用ds控制器管理ingress controller pod在ds控制器资源配置中使用hostNetwork

复制代码
apiVersion: v1
kind: Namespace
metadata:
  name: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---

kind: ConfigMap
apiVersion: v1
metadata:
  name: nginx-configuration
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---
kind: ConfigMap
apiVersion: v1
metadata:
  name: tcp-services
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---
kind: ConfigMap
apiVersion: v1
metadata:
  name: udp-services
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nginx-ingress-serviceaccount
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: nginx-ingress-clusterrole
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
rules:
  - apiGroups:
      - ""
    resources:
      - configmaps
      - endpoints
      - nodes
      - pods
      - secrets
    verbs:
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - nodes
    verbs:
      - get
  - apiGroups:
      - ""
    resources:
      - services
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ""
    resources:
      - events
    verbs:
      - create
      - patch
  - apiGroups:
      - "extensions"
      - "networking.k8s.io"
    resources:
      - ingresses
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - "extensions"
      - "networking.k8s.io"
    resources:
      - ingresses/status
    verbs:
      - update

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: Role
metadata:
  name: nginx-ingress-role
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
rules:
  - apiGroups:
      - ""
    resources:
      - configmaps
      - pods
      - secrets
      - namespaces
    verbs:
      - get
  - apiGroups:
      - ""
    resources:
      - configmaps
    resourceNames:
      # Defaults to "<election-id>-<ingress-class>"
      # Here: "<ingress-controller-leader>-<nginx>"
      # This has to be adapted if you change either parameter
      # when launching the nginx-ingress-controller.
      - "ingress-controller-leader-nginx"
    verbs:
      - get
      - update
  - apiGroups:
      - ""
    resources:
      - configmaps
    verbs:
      - create
  - apiGroups:
      - ""
    resources:
      - endpoints
    verbs:
      - get

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: RoleBinding
metadata:
  name: nginx-ingress-role-nisa-binding
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: nginx-ingress-role
subjects:
  - kind: ServiceAccount
    name: nginx-ingress-serviceaccount
    namespace: ingress-nginx

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: nginx-ingress-clusterrole-nisa-binding
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: nginx-ingress-clusterrole
subjects:
  - kind: ServiceAccount
    name: nginx-ingress-serviceaccount
    namespace: ingress-nginx

---

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: nginx-ingress-controller
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: ingress-nginx
      app.kubernetes.io/part-of: ingress-nginx
  template:
    metadata:
      labels:
        app.kubernetes.io/name: ingress-nginx
        app.kubernetes.io/part-of: ingress-nginx
      annotations:
        prometheus.io/port: "10254"
        prometheus.io/scrape: "true"
    spec:
      # wait up to five minutes for the drain of connections
      terminationGracePeriodSeconds: 300
      serviceAccountName: nginx-ingress-serviceaccount
      nodeSelector:
        kubernetes.io/os: linux
      hostNetwork: true
      containers:
        - name: nginx-ingress-controller
          image: quay.io/kubernetes-ingress-controller/nginx-ingress-controller:0.28.0
          args:
            - /nginx-ingress-controller
            - --configmap=$(POD_NAMESPACE)/nginx-configuration
            - --tcp-services-configmap=$(POD_NAMESPACE)/tcp-services
            - --udp-services-configmap=$(POD_NAMESPACE)/udp-services
            - --publish-service=$(POD_NAMESPACE)/ingress-nginx
            - --annotations-prefix=nginx.ingress.kubernetes.io
          securityContext:
            allowPrivilegeEscalation: true
            capabilities:
              drop:
                - ALL
              add:
                - NET_BIND_SERVICE
            # www-data -> 101
            runAsUser: 101
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: POD_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          ports:
            - name: http
              containerPort: 80
              protocol: TCP
            - name: https
              containerPort: 443
              protocol: TCP
          livenessProbe:
            failureThreshold: 3
            httpGet:
              path: /healthz
              port: 10254
              scheme: HTTP
            initialDelaySeconds: 10
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 10
          readinessProbe:
            failureThreshold: 3
            httpGet:
              path: /healthz
              port: 10254
              scheme: HTTP
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 10
          lifecycle:
            preStop:
              exec:
                command:
                  - /wait-shutdown

---

apiVersion: v1
kind: LimitRange
metadata:
  name: ingress-nginx
  namespace: ingress-nginx
  labels:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
spec:
  limits:
  - default:
    min:
      memory: 90Mi
      cpu: 100m
    type: Container
复制代码

提示:把对应控制器类型更改外DaemonSet,在pod模板中spec字段下的replicas字段去掉;在spec.template.spec字段下加上hostNetwork: true即可;以上两种使用ds控制器管理ingress controller pod也可以使用node选择器,来筛选在某个节点上创建ingress controller pod;

使用deploy控制器管理ingress controller pod,就直接应用mandatory.yaml即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[root@master01 ~]# kubectl apply -f mandatory.yaml
namespace/ingress-nginx created
configmap/nginx-configuration created
configmap/tcp-services created
configmap/udp-services created
serviceaccount/nginx-ingress-serviceaccount created
Warning: rbac.authorization.k8s.io/v1beta1 ClusterRole is deprecated in v1.17+, unavailable in v1.22+; use rbac.authorization.k8s.io/v1 ClusterRole
clusterrole.rbac.authorization.k8s.io/nginx-ingress-clusterrole created
Warning: rbac.authorization.k8s.io/v1beta1 Role is deprecated in v1.17+, unavailable in v1.22+; use rbac.authorization.k8s.io/v1 Role
role.rbac.authorization.k8s.io/nginx-ingress-role created
Warning: rbac.authorization.k8s.io/v1beta1 RoleBinding is deprecated in v1.17+, unavailable in v1.22+; use rbac.authorization.k8s.io/v1 RoleBinding
rolebinding.rbac.authorization.k8s.io/nginx-ingress-role-nisa-binding created
Warning: rbac.authorization.k8s.io/v1beta1 ClusterRoleBinding is deprecated in v1.17+, unavailable in v1.22+; use rbac.authorization.k8s.io/v1 ClusterRoleBinding
clusterrolebinding.rbac.authorization.k8s.io/nginx-ingress-clusterrole-nisa-binding created
deployment.apps/nginx-ingress-controller created
limitrange/ingress-nginx created
[root@master01 ~]#

查看应用资源清单创建的资源对象

1
2
3
4
5
6
7
8
9
10
[root@master01 ~]# kubectl get all -n ingress-nginx
NAME                                            READY   STATUS    RESTARTS   AGE
pod/nginx-ingress-controller-5466cb8999-4lsjc   1/1     Running   0          80s
NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/nginx-ingress-controller   1/1     1            1           80s
NAME                                                  DESIRED   CURRENT   READY   AGE
replicaset.apps/nginx-ingress-controller-5466cb8999   1         1         1       80s
[root@master01 ~]#

提示:可以看到在ingress-nginx名称空间下创建了一个deploy控制器,对应控制器创建了一个nginx-ingress-controller控制器pod;但是此pod现在不能被外部客户端访问到,我们需要创建一个service来引入外部流量到此pod上;

查看pod标签

1
2
3
4
[root@master01 ~]# kubectl get pod -n ingress-nginx --show-labels
NAME                                        READY   STATUS    RESTARTS   AGE     LABELS
nginx-ingress-controller-5466cb8999-4lsjc   1/1     Running   0          4m38s   app.kubernetes.io/name=ingress-nginx,app.kubernetes.io/part-of=ingress-nginx,pod-template-hash=5466cb8999
[root@master01 ~]#

根据上述标签来写一个创建ingress-service资源的配置清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[root@master01 ~]# cat ingress-nginx-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: ingress-nginx-svc
  namespace: ingress-nginx
spec:
  type: NodePort
  ports:
    - port: 80
      name: http
      nodePort: 30080
    - port: 443
      name: https
      nodePort: 30443
  selector:
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/part-of: ingress-nginx
[root@master01 ~]#

提示:以上配置清单主要把满足对应标签选择器的pod关联起来;并把对应pod的80和443端口分别映射到对应主机上的30080和30443端口;

应用配置清单

1
2
3
4
5
6
[root@master01 ~]# kubectl apply -f ingress-nginx-service.yaml
service/ingress-nginx-svc created
[root@master01 ~]# kubectl get svc -n ingress-nginx
NAME                TYPE       CLUSTER-IP    EXTERNAL-IP   PORT(S)                      AGE
ingress-nginx-svc   NodePort   10.98.4.208   <none>        80:30080/TCP,443:30443/TCP   13s
[root@master01 ~]#

访问集群任意节点ip的30080和30443端口,看看是否访问到对应pod?

提示:30080是能够正常访问的,只是它显示404,是因为我们没有对应的主页;

访问30443端口

提示:30443是一个https端口,所以访问必须用https协议访问,这里提示访问页面有风险是因为浏览器不信任证书引起的,我们可以点击高级,信任证书即可;同样30443端口也是返回404,是因为没有主页的原因;两个端口能够正常访问,说明我们在k8s上部署的ingress-nginx controller就部署好了;

ingress资源的使用

在k8s上创建一个deploy控制器,让其管理2个 ikubernetes/myapp:v1镜像运行的pod,然后再创建一个对应的service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
[root@master01 manifests]# cat myapp-demo.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp
  namespace: default
spec:
  replicas: 2
  selector:
    matchLabels:
      app: myapp
      rel: stable
  template:
    metadata:
      namespace: default
      labels:
        app: myapp
        rel: stable
    spec:
      containers:
      - name: myapp
        image: ikubernetes/myapp:v1
---
apiVersion: v1
kind: Service
metadata:
  name: myapp
  namespace: default
spec:
  selector:
    app: myapp
    rel: stable
  ports:
  - name: http
    port: 80
    targetPort: 80
[root@master01 manifests]#

提示:一个清单中定义多个资源,需要用“—”来分割资源;

应用资源清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[root@master01 manifests]# kubectl apply -f myapp-demo.yaml
deployment.apps/myapp created
service/myapp created
[root@master01 manifests]# kubectl get pod -o wide
NAME                     READY   STATUS    RESTARTS   AGE   IP            NODE             NOMINATED NODE   READINESS GATES
myapp-6479b786f5-9d4mh   1/1     Running   0          11s   10.244.2.98   node02.k8s.org   <none>           <none>
myapp-6479b786f5-k252c   1/1     Running   0          11s   10.244.4.20   node04.k8s.org   <none>           <none>
[root@master01 manifests]# kubectl get svc
NAME         TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)   AGE
kubernetes   ClusterIP   10.96.0.1        <none>        443/TCP   4h52m
myapp        ClusterIP   10.105.208.218   <none>        80/TCP    21s
[root@master01 manifests]# kubectl describe svc myapp
Name:              myapp
Namespace:         default
Labels:            <none>
Annotations:       <none>
Selector:          app=myapp,rel=stable
Type:              ClusterIP
IP Families:       <none>
IP:                10.105.208.218
IPs:               10.105.208.218
Port:              http  80/TCP
TargetPort:        80/TCP
Endpoints:         10.244.2.98:80,10.244.4.20:80
Session Affinity:  None
Events:            <none>
[root@master01 manifests]#

创建ingress资源来反代以上资源

示例:创建ingress资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@master01 manifests]# cat ingress-myapp.yaml
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: ingress-myapp
  namespace: default
  annotations:
    kubernetes.io/ingress.class: "nginx"
spec:
  rules:
  - host: www.myapp.com
    http:
      paths:
      - path: /
        backend:
          serviceName: myapp
          servicePort: 80
[root@master01 manifests]#

提示:创建ingress资源apiVersion的值要写成extensions/v1beta1,kind为Ingress;对应metadata中的annotations的配置表示把ingress资源通知给那个类别的ingress controller,如果k8s集群上有多个类别的ingress controller时,这一项特别有用;在spec字段主要内嵌了三个字段,rules字段用来定义反代规则列表,其值为一个对象列表;其中rules字段里主要host和http字段;host用来指定虚拟主机的fqdn名称,如果不写表示匹配任意虚拟主机名称;http是用来定义指向后端的http选择器列表;其值为一个对象,里面只有一个paths字段,用于指定把请求映射到后端的某个路径;其值为一个对象列表;对应paths字段中可以定义path,用来指定映射后端的路径;backend用于指定后端pod的service,其值为一个对象;serviceName用于指定对应pod的service名称;servicePort用于指定后端服务的端口;以上配置表示把www.myapp.com这个虚拟主机的访问全部反代至服务名称为myapp端口为80的pod上;

应用配置清单

1
2
3
4
5
6
7
[root@master01 manifests]# kubectl apply -f ingress-myapp.yaml
Warning: extensions/v1beta1 Ingress is deprecated in v1.14+, unavailable in v1.22+; use networking.k8s.io/v1 Ingress
ingress.extensions/ingress-myapp created
[root@master01 manifests]# kubectl get ingress
NAME            CLASS    HOSTS           ADDRESS   PORTS   AGE
ingress-myapp   <none>   www.myapp.com             80      29s
[root@master01 manifests]#

查看ingress资源的详细信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@master01 manifests]# kubectl describe ingress ingress-myapp
Name:             ingress-myapp
Namespace:        default
Address:         
Default backend:  default-http-backend:80 (<error: endpoints "default-http-backend" not found>)
Rules:
  Host           Path  Backends
  ----           ----  --------
  www.myapp.com 
                 /   myapp:80 (10.244.2.98:80,10.244.4.20:80)
Annotations:     kubernetes.io/ingress.class: nginx
Events:
  Type    Reason  Age   From                      Message
  ----    ------  ----  ----                      -------
  Normal  CREATE  81s   nginx-ingress-controller  Ingress default/ingress-myapp
[root@master01 manifests]#

提示:可以看到对应满足service名称为myapp并且其端口为80的pod有两个;

进入ingress controller pod里,看看对应配置文件是否有www.myapp.com的配置?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@master01 manifests]# kubectl get pods -n ingress-nginx
NAME                                        READY   STATUS    RESTARTS   AGE
nginx-ingress-controller-5466cb8999-4lsjc   1/1     Running   0          78m
[root@master01 manifests]# kubectl exec -it -n ingress-nginx pod/nginx-ingress-controller-5466cb8999-4lsjc -- /bin/sh
/etc/nginx cd /etc/nginx/
/etc/nginx ls
fastcgi.conf            koi-utf                 modsecurity             owasp-modsecurity-crs   uwsgi_params.default
fastcgi.conf.default    koi-win                 modules                 scgi_params             win-utf
fastcgi_params          lua                     nginx.conf              scgi_params.default
fastcgi_params.default  mime.types              nginx.conf.default      template
geoip                   mime.types.default      opentracing.json        uwsgi_params
/etc/nginx grep "www.myapp.com" nginx.conf
        ## start server www.myapp.com
                server_name www.myapp.com ;
        ## end server www.myapp.com
/etc/nginx $

提示:可以看到在对应ingress-nginx 控制器pod中能够搜索到www.myapp.com的配置;说明我们定义的ingress资源已经被ingress-nginx controller 捕获;

用浏览器访问www.myapp.com看看是否能够访问到内容?

提示:使用www.myapp.com访问,需要确保对应域名能够正常解析到k8s集群任意一节点上;可以看到访问www.myapp.com:30080能够访问到对应pod内容;

删除ingress代理规则

1
2
3
4
5
6
[root@master01 manifests]# kubectl delete -f ingress-myapp.yaml
Warning: extensions/v1beta1 Ingress is deprecated in v1.14+, unavailable in v1.22+; use networking.k8s.io/v1 Ingress
ingress.extensions "ingress-myapp" deleted
[root@master01 manifests]# kubectl get ingress
No resources found in default namespace.
[root@master01 manifests]#

示例:配置基于url路径进行流量分发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[root@master01 manifests]# cat ingress-myapp1.yaml
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: ingress-myapp
  namespace: default
  annotations:
    kubernetes.io/ingress.class: "nginx"
    ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
  - host: www.myapp.com
    http:
      paths:
      - path: /bbs
        backend:
          serviceName: myapp
          servicePort: 80
      - path: /blog
        backend:
          serviceName: myapp
          servicePort: 80
[root@master01 manifests]#

提示:以上配置表示把www.myapp.com/bbs反代到service名称为myapp并且端口为80的pod上;把www.myapp.com/blog反代到ervice名称为myapp并且端口为80的pod上;我这里是因为k8s上只有这一种应用,生成环境中按照对应的业务逻辑来反代对应url到对应pod上即可;

应用配置清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[root@master01 manifests]# kubectl apply -f ingress-myapp1.yaml
Warning: extensions/v1beta1 Ingress is deprecated in v1.14+, unavailable in v1.22+; use networking.k8s.io/v1 Ingress
ingress.extensions/ingress-myapp created
[root@master01 manifests]# kubectl get ingress
NAME            CLASS    HOSTS           ADDRESS   PORTS   AGE
ingress-myapp   <none>   www.myapp.com             80      5s
[root@master01 manifests]# kubectl describe ingress ingress-myapp
Name:             ingress-myapp
Namespace:        default
Address:         
Default backend:  default-http-backend:80 (<error: endpoints "default-http-backend" not found>)
Rules:
  Host           Path  Backends
  ----           ----  --------
  www.myapp.com 
                 /bbs    myapp:80 (10.244.2.98:80,10.244.4.20:80)
                 /blog   myapp:80 (10.244.2.98:80,10.244.4.20:80)
Annotations:     ingress.kubernetes.io/rewrite-target: /
                 kubernetes.io/ingress.class: nginx
Events:
  Type    Reason  Age   From                      Message
  ----    ------  ----  ----                      -------
  Normal  CREATE  30s   nginx-ingress-controller  Ingress default/ingress-myapp
[root@master01 manifests]#

提示:可以看到对应ingress上就有两个url分别指向后端service名称为myapp端口为80的pod上;

访问对应url,看看是否访问到内容?

提示:这里访问不到内容的原因是对应pod内部并没有对应url的页面;

进入ingress controller pod内部,查看是否有对应配置?

提示:可以看到对应在ingress中定义的配置,都转为对应该ingress controller pod中的配置,说明我们定义基于url分发流量的ingress没有问题;

示例:定义ingress规则基于主机名称的虚拟主机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[root@master01 manifests]# cat ingress-myapp2.yaml
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: ingress-myapp
  namespace: default
  annotations:
    kubernetes.io/ingress.class: "nginx"
    ingress.kubernetes.io/rewrite-target: /
spec:
  rules:
  - host: www.myapp.com
    http:
      paths:
      - path:
        backend:
          serviceName: myapp
          servicePort: 80
  - host: blog.myapp.com
    http:
      paths:
      - path:
        backend:
          serviceName: myapp
          servicePort: 80
[root@master01 manifests]#

提示:以上配置表示把www.myapp.com这个虚拟主机名称的访问流量分发至service名称为myapp端口为80的pod上;把blog.myapp.com的流量分发至至service名称为myapp端口为80的pod上;生成环境按照对应的service名称来分发即可;

应用配置清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[root@master01 manifests]# kubectl apply -f ingress-myapp2.yaml
Warning: extensions/v1beta1 Ingress is deprecated in v1.14+, unavailable in v1.22+; use networking.k8s.io/v1 Ingress
ingress.extensions/ingress-myapp created
[root@master01 manifests]# kubectl get ingress
NAME            CLASS    HOSTS                          ADDRESS   PORTS   AGE
ingress-myapp   <none>   www.myapp.com,blog.myapp.com             80      16s
[root@master01 manifests]# kubectl describe ingress ingress-myapp
Name:             ingress-myapp
Namespace:        default
Address:         
Default backend:  default-http-backend:80 (<error: endpoints "default-http-backend" not found>)
Rules:
  Host            Path  Backends
  ----            ----  --------
  www.myapp.com  
                     myapp:80 (10.244.2.98:80,10.244.4.20:80)
  blog.myapp.com 
                     myapp:80 (10.244.2.98:80,10.244.4.20:80)
Annotations:      ingress.kubernetes.io/rewrite-target: /
                  kubernetes.io/ingress.class: nginx
Events:
  Type    Reason  Age   From                      Message
  ----    ------  ----  ----                      -------
  Normal  CREATE  32s   nginx-ingress-controller  Ingress default/ingress-myapp
[root@master01 manifests]#

验证配置信息

访问对应虚拟主机,看看是否能够访问对应pod?

提示:可以看到两个虚拟主机名称都可以正常访问到,对应也做了调度;

示例:创建tls类型的ingress资源

创建证书

1
2
3
4
5
6
7
[root@master01 manifests]# openssl genrsa -out tls.key 2048
Generating RSA private key, 2048 bit long modulus
.........................................+++
........+++
e is 65537 (0x10001)
[root@master01 manifests]# openssl req -x509 -key tls.key -out tls.crt -subj /C=CN/ST=SiChuan/L=GuangYuan/O=Test/CN=www.myapp.com -days 3650 
[root@master01 manifests]#

提示:以上两条命令创建了一个名为tls.key的私钥和一个自签名证书,其名为tls.crt;

创建Secret资源

1
2
3
4
5
6
7
[root@master01 manifests]# kubectl create secret tls www-myapp-com-ingress-secret --cert=./tls.crt --key=./tls.key
secret/www-myapp-com-ingress-secret created
[root@master01 manifests]# kubectl get secret
NAME                           TYPE                                  DATA   AGE
default-token-xvd4c            kubernetes.io/service-account-token   3      13d
www-myapp-com-ingress-secret   kubernetes.io/tls                     2      21s
[root@master01 manifests]#

提示:在ingress控制器上配置https主机时,不能直接使用私钥和证书文件,而是需要使用secret资源对象来传递相关数据;

定义tls类型ingress资源清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
[root@master01 manifests]# cat www-myapp-com-ingress-secret.yaml
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: ingress-myapp-tls
  namespace: default
  annotations:
    kubernetes.io/ingress.class: "nginx"
spec:
  tls:
  - hosts:
    - www.myapp.com
    secretName: www-myapp-com-ingress-secret
  rules:
  - host: www.myapp.com
    http:
      paths:
      - path: /
        backend:
          serviceName: myapp
          servicePort: 80
[root@master01 manifests]#

提示:定义tls类型ingress资源清单,需要在spec字段下用tls字段来指定对应主机名称,以及secret资源对象的名称;

应用资源清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[root@master01 manifests]# kubectl apply -f www-myapp-com-ingress-secret.yaml
Warning: extensions/v1beta1 Ingress is deprecated in v1.14+, unavailable in v1.22+; use networking.k8s.io/v1 Ingress
ingress.extensions/ingress-myapp-tls created
[root@master01 manifests]# kubectl get ingress
NAME                CLASS    HOSTS                          ADDRESS   PORTS     AGE
ingress-myapp       <none>   www.myapp.com,blog.myapp.com             80        31m
ingress-myapp-tls   <none>   www.myapp.com                            80, 443   8s
[root@master01 manifests]# kubectl describe ingress ingress-myapp-tls
Name:             ingress-myapp-tls
Namespace:        default
Address:         
Default backend:  default-http-backend:80 (<error: endpoints "default-http-backend" not found>)
TLS:
  www-myapp-com-ingress-secret terminates www.myapp.com
Rules:
  Host           Path  Backends
  ----           ----  --------
  www.myapp.com 
                 /   myapp:80 (10.244.2.98:80,10.244.4.20:80)
Annotations:     kubernetes.io/ingress.class: nginx
Events:
  Type    Reason  Age   From                      Message
  ----    ------  ----  ----                      -------
  Normal  CREATE  26s   nginx-ingress-controller  Ingress default/ingress-myapp-tls
[root@master01 manifests]#

验证:访问对应虚拟主机名称,看看对应的https端口是否能够正常访问到内容?

提示:可以看到使用https协议访问对应的30443端口能够正常访问到对应后端pod提供的内容;

C#使用RabbitMQ(转) - 廖先生 - 博客园

mikel阅读(885)

来源: C#使用RabbitMQ(转) – 廖先生 – 博客园

1. 说明

在企业应用系统领域,会面对不同系统之间的通信、集成与整合,尤其当面临异构系统时,这种分布式的调用与通信变得越发重要。其次,系统中一般会有很多对实时性要求不高的但是执行起来比较较耗时的地方,比如发送短信,邮件提醒,更新文章阅读计数,记录用户操作日志等等,如果实时处理的话,在用户访问量比较大的情况下,对系统压力比较大。

面对这些问题,我们一般会将这些请求,放在消息队列MQ中处理;异构系统之间使用消息进行通讯。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

消息传递相较文件传递与远程过程调用(RPC)而言,似乎更胜一筹,因为它具有更好的平台无关性,并能够很好地支持并发与异步调用。所以如果系统中出现了如下情况:

  • 对操作的实时性要求不高,而需要执行的任务极为耗时;
  • 存在异构系统间的整合;

一般的可以考虑引入消息队列。对于第一种情况,常常会选择消息队列来处理执行时间较长的任务。引入的消息队列就成了消息处理的缓冲区。消息队列引入的异步通信机制,使得发送方和接收方都不用等待对方返回成功消息,就可以继续执行下面的代码,从而提高了数据处理的能力。尤其是当访问量和数据流量较大的情况下,就可以结合消息队列与后台任务,通过避开高峰期对大数据进行处理,就可以有效降低数据库处理数据的负荷。

本文简单介绍在RabbitMQ这一消息代理工具,以及在.NET中如何使用RabbitMQ.

2. 搭建环境

2.1 安装Erlang语言运行环境

由于RabbitMQ使用Erlang语言编写,所以先安装Erlang语言运行环境。具体移步博客:windows配置Erlang环境

2.2 安装RabbitMQ服务端

地址 http://www.rabbitmq.com/

下载安装。

使RabbitMQ以Windows Service的方式在后台运行:打开cmd切换到sbin目录下执行

rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start

现在RabbitMQ的服务端已经启动起来了。

要查看和控制RabbitMQ服务端的状态,可以用rabbitmqctl这个脚本。

比如查看状态:

rabbitmqctl status

假如显示node没有连接上,需要到C:\Windows目录下,将.erlang.cookie文件,拷贝到用户目录下 C:\Users\{用户名},这是Erlang的Cookie文件,允许与Erlang进行交互。

使用命令查看用户:

rabbitmqctl list_users

RabbitMQ会为我们创建默认的用户名guest和密码guest,guest默认拥有RabbitMQ的所有权限。

一般的,我们需要新建一个我们自己的用户,设置密码,并授予权限,并将其设置为管理员,可以使用下面的命令来执行这一操作:

rabbitmqctl  add_user  JC JayChou   //创建用户JC密码为JayChou
rabbitmqctl  set_permissions  JC ".*"  ".*"  ".*"    //赋予JC读写所有消息队列的权限
rabbitmqctl  set_user_tags JC administrator    //分配用户组

修改JC密码为123:

rabbitmqctl change_password JC  123

删除用户JC:

rabbitmqctl delete_user  JC

也可以开启rabbitmq_management插件,在web界面查看和管理RabbitMQ服务

rabbitmq-plugins enable rabbitmq_management

 

2.3下载RabbitMQ的Client端dll

下载地址:http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/

本人下载了这个 rabbitmq-dotnet-client-3.6.6-dotnet-4.5.zip

解压,我们需要的是这个文件,以后会引用到vs的项目中:

3.使用

3.1在使用RabitMQ之前,先对几个概念做一下说明

 

RabbitMQ是一个消息代理。他从消息生产者(producers)那里接收消息,然后把消息送给消息消费者(consumer)在发送和接受之间,他能够根据设置的规则进行路由,缓存和持久化。

一般提到RabbitMQ和消息,都用到一些专有名词。

  • 生产(Producing)意思就是发送。发送消息的程序就是一个生产者(producer)。我们一般用”P”来表示:

producer

  • 队列(queue)就是邮箱的名称。消息通过你的应用程序和RabbitMQ进行传输,它们只能存储在队列(queue)中。 队列(queue)容量没有限制,你要存储多少消息都可以——基本上是一个无限的缓冲区。多个生产者(producers)能够把消息发送给同一个队列,同样,多个消费者(consumers)也能从同一个队列(queue)中获取数据。队列可以画成这样(图上是队列的名称):

queue

  • 消费(Consuming)和获取消息是一样的意思。一个消费者(consumer)就是一个等待获取消息的程序。我们把它画作”C”:

consumer

通常,消息生产者,消息消费者和消息代理不在同一台机器上。

3.2 Hello Word

下面来展示简单的RabbitMQ的使用:

rabbitmq hello world

3.2.1 首先创建名为ProjectSend的控制台项目,需要引用RabbitMQ.Client.dll。这个程序作为Producer生产者,用来发送数据:

复制代码
static void Main(string[] args)
    {
        var factory = new ConnectionFactory();
        factory.HostName = "localhost";//RabbitMQ服务在本地运行
        factory.UserName = "guest";//用户名
        factory.Password = "guest";//密码

        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("hello", false, false, false, null);//创建一个名称为hello的消息队列
                string message = "Hello World"; //传递的消息内容
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("", "hello", null, body); //开始传递
                Console.WriteLine("已发送: {0}", message);
          Console.ReadLine();
            }
        }
    }
复制代码

 

首先,需要创建一个ConnectionFactory,设置目标,由于是在本机,所以设置为localhost,如果RabbitMQ不在本机,只需要设置目标机器的IP地址或者机器名称即可,然后设置前面创建的用户名和密码。

紧接着要创建一个Channel,如果要发送消息,需要创建一个队列,然后将消息发布到这个队列中。在创建队列的时候,只有RabbitMQ上该队列不存在,才会去创建。消息是以二进制数组的形式传输的,所以如果消息是实体对象的话,需要序列化和然后转化为二进制数组。

现在客户端发送代码已经写好了,运行之后,消息会发布到RabbitMQ的消息队列中,现在需要编写服务端的代码连接到RabbitMQ上去获取这些消息。

3.2.2创建名为ProjectReceive的控制台项目,引用RabbitMQ.Client.dll。作为Consumer消费者,用来接收数据:

复制代码
static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "guest";
            factory.Password = "guest";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("hello", false, false, false, null);

                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicConsume("hello", false, consumer);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body); 
                        Console.WriteLine("已接收: {0}", message);   
                    };
                    Console.ReadLine(); 
                }
            }
        }
复制代码

和发送一样,首先需要定义连接,然后声明消息队列。要接收消息,需要定义一个Consume,然后在接收消息的事件中处理数据。

3.2.3 现在发送和接收的客户端都写好了,让我们编译执行起来

发送消息:

现在,名为hello的消息队列中,发送了一条消息。这条消息存储到了RabbitMQ的服务器上了。使用rabbitmqctl 的list_queues可以查看所有的消息队列,以及里面的消息个数,可以看到,目前Rabbitmq上只有一个消息队列,里面只有一条消息:

也可以在web管理界面查看此queue的相关信息:

 

接收消息:

既然消息已经被接收了,那我们再来看queue的内容:

可见,消息中的内容在接收之后已被删除了。

3.3 工作队列

前面的例子展示了如何在指定的消息队列发送和接收消息。

现在我们创建一个工作队列(work queue)来将一些耗时的任务分发给多个工作者(workers):

rabbitmq-work queue

工作队列(work queues, 又称任务队列Task Queues)的主要思想是为了避免立即执行并等待一些占用大量资源、时间的操作完成。而是把任务(Task)当作消息发送到队列中,稍后处理。一个运行在后台的工作者(worker)进程就会取出任务然后处理。当运行多个工作者(workers)时,任务会在它们之间共享。

这个在网络应用中非常有用,它可以在短暂的HTTP请求中处理一些复杂的任务。在一些实时性要求不太高的地方,我们可以处理完主要操作之后,以消息的方式来处理其他的不紧要的操作,比如写日志等等。

准备

在第一部分,发送了一个包含“Hello World!”的字符串消息。现在发送一些字符串,把这些字符串当作复杂的任务。这里使用time.sleep()函数来模拟耗时的任务。在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时1秒钟。比如”Hello…”就会耗时3秒钟。

对之前示例的send.cs做些简单的调整,以便可以发送随意的消息。这个程序会按照计划发送任务到我们的工作队列中。

复制代码
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);
            string message = GetMessage(args);
            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = 2;

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", "hello", properties, body);
            Console.WriteLine(" set {0}", message);
        }
    }

    Console.ReadKey();
}

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
复制代码

 

接着我们修改接收端,让他根据消息中的逗点的个数来Sleep对应的秒数:

复制代码
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume("hello", true, consumer);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);
                        
                Console.WriteLine("Received {0}", message);
                Console.WriteLine("Done");
            }
        }
    }
}
复制代码

 

轮询分发

使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。

现在,我们先启动两个接收端,等待接受消息,然后启动一个发送端开始发送消息。

Send message queue

在cmd条件下,发送了5条消息,每条消息后面的逗点表示该消息需要执行的时长,来模拟耗时的操作。

然后可以看到,两个接收端依次接收到了发出的消息:

receive message queue

默认,RabbitMQ会将每个消息按照顺序依次分发给下一个消费者。所以每个消费者接收到的消息个数大致是平均的。 这种消息分发的方式称之为轮询(round-robin)。

3.4 消息响应

当处理一个比较耗时得任务的时候,也许想知道消费者(consumers)是否运行到一半就挂掉。在当前的代码中,当RabbitMQ将消息发送给消费者(consumers)之后,马上就会将该消息从队列中移除。此时,如果把处理这个消息的工作者(worker)停掉,正在处理的这条消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。

我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望该消息会重新发送给其他的工作者(worker)。

为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)机制。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ才会释放并删除这条消息。

如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。

消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。

消息响应默认是开启的。在之前的例子中使用了no_ack=True标识把它关闭。是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。

复制代码
channel.BasicConsume("hello", false, consumer);

while (true)
{
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);

    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);

    Console.WriteLine("Received {0}", message);
    Console.WriteLine("Done");

    channel.BasicAck(ea.DeliveryTag, false);
}
复制代码

 

现在,可以保证,即使正在处理消息的工作者被停掉,这些消息也不会丢失,所有没有被应答的消息会被重新发送给其他工作者.

一个很常见的错误就是忘掉了BasicAck这个方法,这个错误很常见,但是后果很严重. 当客户端退出时,待处理的消息就会被重新分发,但是RabitMQ会消耗越来越多的内存,因为这些没有被应答的消息不能够被释放。调试这种case,可以使用rabbitmqct打印messages_unacknoledged字段。

rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

 

3.5 消息持久化

前面已经搞定了即使消费者down掉,任务也不会丢失,但是,如果RabbitMQ Server停掉了,那么这些消息还是会丢失。

当RabbitMQ Server 关闭或者崩溃,那么里面存储的队列和消息默认是不会保存下来的。如果要让RabbitMQ保存住消息,需要在两个地方同时设置:需要保证队列和消息都是持久化的。

首先,要保证RabbitMQ不会丢失队列,所以要做如下设置:

bool durable = true;
channel.QueueDeclare("hello", durable, false, false, null);

 

虽然在语法上是正确的,但是在目前阶段是不正确的,因为我们之前已经定义了一个非持久化的hello队列。RabbitMQ不允许我们使用不同的参数重新定义一个已经存在的同名队列,如果这样做就会报错。现在,定义另外一个不同名称的队列:

bool durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

 

queueDeclare 这个改动需要在发送端和接收端同时设置。

现在保证了task_queue这个消息队列即使在RabbitMQ Server重启之后,队列也不会丢失。 然后需要保证消息也是持久化的, 这可以通过设置IBasicProperties.SetPersistent 为true来实现:

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

 

需要注意的是,将消息设置为持久化并不能完全保证消息不丢失。虽然他告诉RabbitMQ将消息保存到磁盘上,但是在RabbitMQ接收到消息和将其保存到磁盘上这之间仍然有一个小的时间窗口。 RabbitMQ 可能只是将消息保存到了缓存中,并没有将其写入到磁盘上。持久化是不能够一定保证的,但是对于一个简单任务队列来说已经足够。如果需要消息队列持久化的强保证,可以使用publisher confirms

3.6 公平分发

你可能会注意到,消息的分发可能并没有如我们想要的那样公平分配。比如,对于两个工作者。当奇数个消息的任务比较重,但是偶数个消息任务比较轻时,奇数个工作者始终处理忙碌状态,而偶数个工作者始终处理空闲状态。但是RabbitMQ并不知道这些,他仍然会平均依次的分发消息。

为了改变这一状态,我们可以使用basicQos方法,设置perfetchCount=1 。这样就告诉RabbitMQ 不要在同一时间给一个工作者发送多于1个的消息,或者换句话说。在一个工作者还在处理消息,并且没有响应消息之前,不要给他分发新的消息。相反,将这条新的消息发送给下一个不那么忙碌的工作者。

channel.BasicQos(0, 1, false);

 

3.7 完整实例

现在将所有这些放在一起:

发送端代码如下:

复制代码
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
                   
            bool durable = true;
            channel.QueueDeclare("task_queue", durable, false, false, null);
                    
            string message = GetMessage(args);
            var properties = channel.CreateBasicProperties();
            properties.SetPersistent(true);
                  

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", "task_queue", properties, body);
            Console.WriteLine(" set {0}", message);
        }
    }

    Console.ReadKey();
}

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
复制代码

 

接收端代码如下:

复制代码
static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            bool durable = true;
            channel.QueueDeclare("task_queue", durable, false, false, null);
            channel.BasicQos(0, 1, false);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume("task_queue", false, consumer);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);

                Console.WriteLine("Received {0}", message);
                Console.WriteLine("Done");

                channel.BasicAck(ea.DeliveryTag, false);
            }
        }
    }
}
复制代码

 

4 管理界面

RabbitMQ管理界面,通过该界面可以查看RabbitMQ Server 当前的状态,该界面是以插件形式提供的,并且在安装RabbitMQ的时候已经自带了该插件。需要做的是在RabbitMQ控制台界面中启用该插件,命令如下:

rabbitmq-plugins enable rabbitmq_management

rabbitmq management

现在,在浏览器中输入 http://server-name:15672/ server-name换成机器地址或者域名,如果是本地的,直接用localhost(RabbitMQ 3.0之前版本端口号为55672)在输入之后,弹出登录界面,使用我们之前创建的用户登录。

RabbitMQ Web management .

在该界面上可以看到当前RabbitMQServer的所有状态。

5 总结

本文简单介绍了消息队列的相关概念,并介绍了RabbitMQ消息代理的基本原理以及在Windows 上如何安装RabbitMQ和在.NET中如何使用RabbitMQ。消息队列在构建分布式系统和提高系统的可扩展性和响应性方面有着很重要的作用,希望本文对您了解消息队列以及如何使用RabbitMQ有所帮助。