RabbitMQ学习系列(五): RPC 远程过程调用 - 章为忠 - 博客园

mikel阅读(785)

来源: RabbitMQ学习系列(五): RPC 远程过程调用 – 章为忠 – 博客园

前面讲过一些RabbitMQ的安装和用法,也说了说RabbitMQ在一般的业务场景下如何使用。不知道的可以看我前面的博客,http://www.cnblogs.com/zhangweizhong/category/855479.html

不过,最近有朋友问我,RabbitMQ RPC 是干嘛的,有什么用。

其实,RabbitMQ RPC 就是通过消息队列(Message Queue)来实现rpc的功能,就是,客户端向服务端发送定义好的Queue消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue。

1.RabbitMQ RPC的特点

  • Message Queue把所有的请求消息存储起来,然后处理,和客户端解耦。
  • Message Queue引入新的结点,系统的可靠性会受Message Queue结点的影响。
  • Message Queue是异步单向的消息。发送消息设计成是不需要等待消息处理的完成。

所以对于有同步返回需求,Message Queue是个不错的方向。

2.普通PRC的特点

  • 同步调用,对于要等待返回结果/处理结果的场景,RPC是可以非常自然直觉的使用方式。当然RPC也可以是异步调用。
  • 由于等待结果,客户端会有线程消耗。

如果以异步RPC的方式使用,客户端线程消耗可以去掉。但不能做到像消息一样暂存消息请求,压力会直接传导到服务端。

3.适用场合说明

  • 希望同步得到结果的场合,RPC合适。
  • 希望使用简单,则RPC;RPC操作基于接口,使用简单,使用方式模拟本地调用。异步的方式编程比较复杂。
  • 不希望客户端受限于服务端的速度等,可以使用Message Queue。

4.RabbitMQ RPC工作流程:

 

基本概念:

Callback queue 回调队列客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。

Correlation id 关联标识客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

流程说明

  • 当客户端启动的时候,它创建一个匿名独享的回调队列。
  • 在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
  • 将请求发送到一个 rpc_queue 队列中。
  • 服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
  • 客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用

 5.完整代码:

  1. 创建两个控制台程序,作为RPC Server和RPC Client, 引用 RabbitMQ.Client,

  2. RPC Server

复制代码
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "rpc_queue",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
                channel.BasicQos(0, 1, false);
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queue: "rpc_queue",
                                     noAck: false,
                                     consumer: consumer);
                Console.WriteLine(" [x] Awaiting RPC requests");

                while (true)
                {
                    string response = null;
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        int n = int.Parse(message);
                        Console.WriteLine(" [.] fib({0})", message);
                        response = fib(n).ToString();
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(" [.] " + e.Message);
                        response = "";
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish(exchange: "",
                                             routingKey: props.ReplyTo,
                                             basicProperties: replyProps,
                                             body: responseBytes);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                                         multiple: false);
                    }
                }
            }
        }

        /// <summary>
        /// Assumes only valid positive integer input.
        /// Don't expect this one to work for big numbers,
        /// and it's probably the slowest recursive implementation possible.
        /// </summary>
        private static int fib(int n)
        {
            if (n == 0 || n == 1)
            {
                return n;
            }

            Thread.Sleep(1000 * 10);

            return n;
        }
    }
复制代码

 

3. RPC Client

复制代码
    class Program
    {
        static void Main(string[] args)
        {
            for (int i = 0; i < 10; i++)
            {
                Stopwatch watch = new Stopwatch();

                watch.Start();

                var rpcClient = new RPCClient();

                Console.WriteLine(string.Format(" [x] Requesting fib({0})", i));

                var response = rpcClient.Call(i.ToString());

                Console.WriteLine(" [.] Got '{0}'", response);

                rpcClient.Close();

                watch.Stop();

                Console.WriteLine(string.Format(" [x] Requesting complete {0} ,cost {1} ms", i, watch.Elapsed.TotalMilliseconds));
            }

            Console.WriteLine(" complete!!!! ");


            Console.ReadLine();
        }
    }

    class RPCClient
    {
        private IConnection connection;
        private IModel channel;
        private string replyQueueName;
        private QueueingBasicConsumer consumer;

        public RPCClient()
        {
            var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue: replyQueueName,
                                 noAck: true,
                                 consumer: consumer);
        }

        public string Call(string message)
        {
            var corrId = Guid.NewGuid().ToString();
            var props = channel.CreateBasicProperties();
            props.ReplyTo = replyQueueName;
            props.CorrelationId = corrId;

            var messageBytes = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "",
                                 routingKey: "rpc_queue",
                                 basicProperties: props,
                                 body: messageBytes);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                if (ea.BasicProperties.CorrelationId == corrId)
                {
                    return Encoding.UTF8.GetString(ea.Body);
                }
            }
        }

        public void Close()
        {
            connection.Close();
        }
    }
复制代码

4.分别运行Server和Client

 

6.最后

1.参照RabbitMQ官方教程的RPC,地址:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

2.本文源代码下载,http://files.cnblogs.com/files/zhangweizhong/Weiz.RabbitMQ.RPC.rar

3.博客原地址:http://fpeach.com/post/2016/12/01/RabbitMQ%E5%AD%A6%E4%B9%A0%E7%B3%BB%E5%88%97%EF%BC%88%E4%BA%94%EF%BC%89-RPC-%E8%BF%9C%E7%A8%8B%E8%BF%87%E7%A8%8B%E8%B0%83%E7%94%A8.aspx

RabbitMQ学习系列(四): 几种Exchange 模式 - 章为忠 - 博客园

mikel阅读(963)

来源: RabbitMQ学习系列(四): 几种Exchange 模式 – 章为忠 – 博客园

上一篇,讲了RabbitMQ的具体用法,可以看看这篇文章:RabbitMQ学习系列(三): C# 如何使用 RabbitMQ。今天说些理论的东西,Exchange 的几种模式。

 

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

 

RabbitMQ提供了四种Exchange模式:fanout,direct,topic,header 。 header模式在实际使用中较少,本文只对前三种模式进行比较。

 

一. Fanout Exchange

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。

Fanout Exchange  不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。

所以,Fanout Exchange 转发消息是最快的。

复制代码
     /// <summary>
        /// 生产者
        /// </summary>
        /// <param name="change"></param>
        private static void ProducerMessage(MyMessage msg)
        {
            var advancedBus = CreateAdvancedBus();

            if (advancedBus.IsConnected)
            {
                var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Fanout);

                advancedBus.Publish(exchange, "", false, new Message<MyMessage>(msg));
            }
            else
            {
                Console.WriteLine("Can't connect");
            }

        }

        /// <summary>
        /// 消费者
        /// </summary>
        private static void ConsumeMessage()
        {
            var advancedBus = CreateAdvancedBus();
            var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Fanout);

            var queue = advancedBus.QueueDeclare("user.notice.wangwu");
            advancedBus.Bind(exchange, queue, "user.notice.wangwu");
            advancedBus.Consume(queue, registration =>
            {
                registration.Add<MyMessage>((message, info) => { Console.WriteLine("Body: {0}", message.Body); });
            });
        }
复制代码

 

复制代码
public static IAdvancedBus CreateAdvancedBus()
{
    // 消息服务器连接字符串
    string connString = "host=dev.corp.wingoht.com:5672;virtualHost=cd;username=ishowfun;password=123456";
    if (connString == null || connString == string.Empty)
    {
        throw new Exception("messageserver connection string is missing or empty");
    }

    return RabbitHutch.CreateBus(connString).Advanced;
}
复制代码

 

 

  二. Direct Exchange

所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。

Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

复制代码
     /// <summary>
        /// 生产者
        /// </summary>
        /// <param name="change"></param>
        private static void ProducerMessage(MyMessage msg)
        {
            var advancedBus = CreateAdvancedBus();

            if (advancedBus.IsConnected)
            {
                var queue = advancedBus.QueueDeclare("user.notice.zhangsan");

                advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, new Message<MyMessage>(msg));
            }
            else
            {
                Console.WriteLine("Can't connect");
            }

        }

        /// <summary>
        /// 消费者
        /// </summary>
        private static void ConsumeMessage()
        {
            var advancedBus = CreateAdvancedBus();

            var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Direct);

            var queue = advancedBus.QueueDeclare("user.notice.lisi");

            advancedBus.Bind(exchange, queue, "user.notice.lisi");

            advancedBus.Consume(queue, registration =>
            {
                registration.Add<MyMessage>((message, info) =>
                {
                    Console.WriteLine("Body: {0}", message.Body);
                });
            });
        }
复制代码

 

三. Topic Exchange

所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,

Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”。

所以,Topic Exchange 使用非常灵活。

复制代码
     /// <summary>
        /// 生产者
        /// </summary>
        /// <param name="change"></param>
        private static void ProducerMessage(MyMessage msg)
        {
            //// 创建消息bus
            IBus bus = CreateBus();

            try
            {
                bus.Publish(msg, x => x.WithTopic(msg.MessageRouter));
            }
            catch (EasyNetQException ex)
            {
                //处理连接消息服务器异常 
            }

            bus.Dispose();//与数据库connection类似,使用后记得销毁bus对象
        }

        /// <summary>
        /// 消费者
        /// </summary>
        private static void ConsumeMessage(MyMessage msg)
        {
            //// 创建消息bus
            IBus bus = CreateBus();

            try
            {
                bus.Subscribe<MyMessage>(msg.MessageRouter, message => Console.WriteLine(msg.MessageBody), x => x.WithTopic("user.notice.#"));
            }
            catch (EasyNetQException ex)
            {
                //处理连接消息服务器异常 
            }
        }
复制代码

 

这个是RabbitMQ 的实际使用的几个场景,熟悉了这个,基本上rabbitmq 也就了解了。http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

 

至此,Rabbitmq 几种Exchange 模式已经介绍完了,实际使用过程中,我们会更具不同的场景,来使用不同的exchange 模式。

查看RabbitMQ 系列其他文章,http://www.cnblogs.com/zhangweizhong/category/855479.html

RabbitMQ学习系列(三): C# 如何使用 RabbitMQ - 章为忠 - 博客园

mikel阅读(1513)

来源: RabbitMQ学习系列(三): C# 如何使用 RabbitMQ – 章为忠 – 博客园

上一篇已经讲了Rabbitmq如何在Windows平台安装,还不了解如何安装的朋友,请看我前面几篇文章:RabbitMQ学习系列一:windows下安装RabbitMQ服务 , 今天就来聊聊 C# 实际开发的过程中,怎么调用 用RabbitMQ。

 一、客户端

RabbitMQ.Client 是rabbitmq 官方提供的的客户端,net 版本地址 :http://www.rabbitmq.com/dotnet.html

EasyNetQ 是基于RabbitMQ.Client 基础上封装的开源客户端。使用非常方便。地址:http://easynetq.com/ 。 本篇所使用示例代码下载地址:  demo示例下载 。

RabbitMQ 还有很多其他客户端API,都非常的好用。我们在一边,一直用的都是 EasyNetQ,所以这里的 demo 只介绍 EasyNetQ 客户端实现。其他的客户端,大家自己去研究吧。

 

二、项目结构

说明:前面我们提到过,RabbitMQ由 Producer(生成者) 和 Consumer(消费者) 两部分组成。Weiz.Consumer 就是Consumer(消费者),Weiz. Producer 为 Producer(生成者),Weiz.MQ 为消息队列的通用处理类库。

三、项目搭建

1. Weiz.MQ 项目,消息队列的通用处理类库,用于正在的订阅和发布消息。

1. 通过nuget安装项目EasyNetQ 相关组件, (略)

2. 增加BusBuilder.cs管道创建类,主要负责链接Rabbitmq。

复制代码
using System;
using System.Configuration;
using EasyNetQ;

namespace Weiz.MQ
{
    /// <summary>
    /// 消息服务器连接器
    /// </summary>
    public class BusBuilder
    {
        public static IBus CreateMessageBus()
        {
            // 消息服务器连接字符串
            // var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"];
            string connString = "host=192.168.98.107:5672;virtualHost=OrderQueue;username=zhangweizhong;password=weizhong1988";
            if (connString == null || connString == string.Empty)
            {
                throw new Exception("messageserver connection string is missing or empty");
            }
            
            return RabbitHutch.CreateBus(connString);
        }
    }
}
复制代码

 

3. 增加IProcessMessage类,定义了一个消息方法,用于消息传递

复制代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Weiz.MQ
{
    public interface IProcessMessage
    {
        void ProcessMsg(Message msg);
    }
}
复制代码

 

4. 增加Message类,定义了消息传递的实体属性字段等信息

复制代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Weiz.MQ
{
    public class Message
    {
        public string MessageID { get; set; }
        
        public string MessageTitle { get; set; }

        public string MessageBody { get; set; }

        public string MessageRouter { get; set; }
    }
}
复制代码

 

5. 增加MQHelper类,用于正在的订阅和发布消息。

 

复制代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Configuration;

using EasyNetQ;

namespace Weiz.MQ
{
    public class MQHelper
    {
        /// <summary>
        /// 发送消息
        /// </summary>
        public static void Publish(Message msg)
        {
            //// 创建消息bus
            IBus bus = BusBuilder.CreateMessageBus();

            try
            {
                bus.Publish(msg, x => x.WithTopic(msg.MessageRouter));
            }
            catch (EasyNetQException ex)
            {
                //处理连接消息服务器异常 
            }

            bus.Dispose();//与数据库connection类似,使用后记得销毁bus对象
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <param name="msg"></param>
        public static void Subscribe(Message msg, IProcessMessage ipro)
        {
            //// 创建消息bus
            IBus bus = BusBuilder.CreateMessageBus();

            try
            {
                bus.Subscribe<Message>(msg.MessageRouter, message => ipro.ProcessMsg(message), x => x.WithTopic(msg.MessageRouter));

            }
            catch (EasyNetQException ex)
            {
                //处理连接消息服务器异常 
            }
        }
    }
}
复制代码

 

 

    2. RabbitMQ由 Producer(生成者)

1. 创建一个aspx 页面,增加如下代码

复制代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Web.UI;
using System.Web.UI.WebControls;

using Weiz.MQ;

namespace Weiz.Producer
{
    public partial class TestMQ : System.Web.UI.Page
    {
        protected void Page_Load(object sender, EventArgs e)
        {

        }

        protected void Button1_Click(object sender, EventArgs e)
        {
            Message msg = new Message();
            msg.MessageID = "1";
            msg.MessageBody = DateTime.Now.ToString();
            msg.MessageTitle = "1";
            msg.MessageRouter = "pcm.notice.zhangsan";
            MQHelper.Publish(msg);

        }
    }
}
复制代码

 

3. Weiz.Consumer 就是Consumer(消费者)

1 . 新增OrderProcessMessage.cs

复制代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;

namespace Weiz.Consumer
{
    public class OrderProcessMessage:MQ.IProcessMessage
    {
        public void ProcessMsg(MQ.Message msg)
        {
            Console.WriteLine(msg.MessageBody);
        }
    }
}
复制代码

2. Program 增加如下代码

复制代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Weiz.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            OrderProcessMessage order = new OrderProcessMessage();
            MQ.Message msg = new MQ.Message();
            msg.MessageID = "1";
            msg.MessageRouter = "pcm.notice.zhangsan";

            MQ.MQHelper.Subscribe(msg, order);
        }
    }
}
复制代码

 

  四、运行

1. 启动 Weiz.Consumer (消费者),启动消费者,会自动在RabbitMQ 服务器上创建相关的exchange 和 queue 。

 

 

Consumer 消费者,使用的是Subscribe (订阅)的模式,所以,Weiz.Consumer客户端启动后,会自动创建connection,生成相关的exchange 和queue。

2. 启动Weiz. Producer 里的TestMQ.aspx 页面,往队列里面写一条消息。订阅的消费者立马就能拿到这条消息。

 

 

至此,C#向Rabbitmq消息队列发送消息已经简单完成。

查看RabbitMQ 系列其他文章,http://www.cnblogs.com/zhangweizhong/category/855479.html

RabbitMQ学习系列(二): RabbitMQ安装与配置 - 章为忠 - 博客园

mikel阅读(964)

来源: RabbitMQ学习系列(二): RabbitMQ安装与配置 – 章为忠 – 博客园

  上一篇,简单介绍了RabbitMQ的情况还有一些相关的概念,这一篇,会讲讲 RabbitMQ安装与配置。

  1.安装

    Rabbit MQ 是建立在强大的Erlang OTP平台上,因此安装RabbitMQ之前要先安装Erlang。

erlang:http://www.erlang.org/download.html

rabbitmq:http://www.rabbitmq.com/download.html

注意:

1.现在先别装最新的 3.6.3 ,本人在安装完最新的版本,queue 队列有问题,降到了 3.6.2 就解决了。

2.默认安装的Rabbit MQ 监听端口是:5672

  2.配置

1. 安装完以后erlang需要手动设置ERLANG_HOME 的系统变量。

输入:set ERLANG_HOME=C:\Program Files\erl8.0

 

2.激活Rabbit MQ’s Management Plugin

使用Rabbit MQ 管理插件,可以更好的可视化方式查看Rabbit MQ 服务器实例的状态,你可以在命令行中使用下面的命令激活。

输入:rabbitmq-plugins.bat  enable  rabbitmq_management

 

同时,我们也使用rabbitmqctl控制台命令(位于 rabbitmq_server-3.6.3\sbin>)来创建用户,密码,绑定权限等。

3.创建管理用户

输入:rabbitmqctl.bat add_user zhangweizhong weizhong1988

 

4. 设置管理员

输入:rabbitmqctl.bat set_user_tags zhangweizhong administrator

 

5.设置权限

输入:rabbitmqctl.bat set_permissions -p / zhangweizhong “.*” “.*” “.*”

 

6. 其他命令

a. 查询用户: rabbitmqctl.bat list_users

b. 查询vhosts: rabbitmqctl.bat list_vhosts

c. 启动RabbitMQ服务: net stop RabbitMQ && net start RabbitMQ

 

以上这些,账号、vhost、权限、作用域等基本就设置完了。

 

  3Rabbit MQ管理后台

使用浏览器打开http://localhost:15672 访问Rabbit Mq的管理控制台,使用刚才创建的账号登陆系统即可。

Rabbit MQ 管理后台,可以更好的可视化方式查看RabbitMQ服务器实例的状态。

4. 创建vhosts

1. 创建vhosts,  在admin页面,点击右侧Virtual Hosts ,

 

2. 将刚创建的OrderQueue分配给相关用户。

3. 其他创建exchange ,queue 大家自己在后台创建吧,这里不再赘述。

 

好了,RabbitMQ安装与配置就写到这里,后续写C# 程序如何进行连同 rabbitmq 进行 发布、订阅等消息队列操作demo。

查看RabbitMQ 系列其他文章,http://www.cnblogs.com/zhangweizhong/category/855479.html

RabbitMQ学习系列(一): 介绍 - 章为忠 - 博客园

mikel阅读(915)

来源: RabbitMQ学习系列(一): 介绍 – 章为忠 – 博客园

  1. 介绍

      RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue )协议的开源实现。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面都非常的优秀。是当前最主流的消息中间件之一。

      RabbitMQ的官网:http://www.rabbitmq.com

  2. AMQP

    AMQP,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,同样,消息使用者也不用知道发送者的存在。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

  3. 系统架构

       

  消息队列的使用过程大概如下:

    (1)客户端连接到消息队列服务器,打开一个channel。

    (2)客户端声明一个exchange,并设置相关属性。

    (3)客户端声明一个queue,并设置相关属性。

    (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。

    (5) 客户端投递消息到exchange。exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

    如下图所示:AMQP 里主要要说两个组件:Exchange 和 Queue

绿色的 X 就是 Exchange ,红色的是 Queue ,这两者都在 Server 端,又称作 Broker ,

这部分是 RabbitMQ 实现的,而蓝色的则是客户端,通常有 Producer 和 Consumer 两种类型。

 

  4. 几个概念

    P: 为Producer,数据的发送方。

    C:为Consumer,数据的接收方。

    Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

    Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

    Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

    Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

    vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

    channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

   PS: RabbitMQ 的一些基本的概念,就介绍完了,虽然都是些概念,但是了解他的一些原理,还是很重要的,特别是exchange 和 路由的概念和作用。接下来会具体介绍他的安装和使用。

   查看RabbitMQ 系列其他文章,http://www.cnblogs.com/zhangweizhong/category/855479.html

我是如何用 Redis 做实时订阅推送的? - 简书

mikel阅读(1246)

来源: 我是如何用 Redis 做实时订阅推送的? – 简书

前阵子开发了公司领劵中心的项目,这个项目是以redis作为关键技术落地的。

先说一下领劵中心的项目吧,这个项目就类似京东app的领劵中心,当然图是截取京东的,公司的就不截了。。。

image

其中有一个功能叫做领劵的订阅推送。什么是领劵的订阅推送?就是用户订阅了该劵的推送,在可领取前的一分钟就要把提醒信息推送到用户的app中。本来这个订阅功能应该是消息中心那边做的,但他们说这个短时间内做不了。所以让我这个负责优惠劵的做了-.-!。具体方案就是到具体的推送时间点了,coupon系统调用消息中心的推送接口,把信息推送出去。

下面我们分析一下这个功能的业务情景。公司目前注册用户6000W+,是哪家就不要打听了。。。比如有一张无门槛的优惠劵下单立减20元,那么抢这张劵的人就会比较多,我们保守估计10W+,百万级别不好说。我们初定为20W万人,那么这20W条推送信息要在一分钟推送完成!并且一个用户是可以订阅多张劵的。所以我们知道了这个订阅功能的有两个突出的难点:

1、推送的实效性:推送慢了,用户会抱怨没有及时通知他们错过了开抢时机。

2、推送的体量大:爆款的神劵,人人都想抢!

然而推送体量又会影响到推送的实效性。这真是一个让人头疼的问题!

那就让我们把问题一个个解决掉吧!

推送的实效性的问题:当用户在领劵中心订阅了某个劵的领取提醒后,在后台就会生成一条用户的订阅提醒记录,里面记录了在哪个时间点给用户发送推送信息。所以问题就变成了系统如何快速实时选出哪些要推送的记录!

方案1:MQ的延迟投递。MQ虽然支持消息的延迟投递但尺度太大1s 5s 10s 30s 1m,用来做精确时间点投递不行!并且用户执行订阅之后又取消订阅的话,要把发出去的MQ消息delete掉这个操作有点头大,短时间内难以落地!并且用户可以取消之后再订阅,这又涉及到去重的问题。所以MQ的方案否掉。

方案2:传统定时任务。这个相对来说就简单一点,用定时任务是去db里面load用户的订阅提醒记录,从中选出当前可以推送的记录。但有句话说得好任何脱离实际业务的设计都是耍流氓~。下面我们就分析一下传统的定时任务到底适不适合我们的这个业务!

image

综上所述我们就知道了一般传统的定时任务存在以下缺点:

1、性能瓶颈。只有一台机在处理,在大体量数据面前力不从心!

2、实效性差。定时任务的频率不能太高,太高会业务数据库造成很大的压力!

3、单点故障。万一跑的那台机挂了,那整个业务不可用了-。- 这是一个很可怕的事情!

所以传统定时任务也不太适合这个业务。。。

那我们是不是就束手无策了呢?其实不是的! 我们只要对传统的定时任务做一个简单的改造!就可以把它变成可以同时多机跑,并且实效性可以精确到秒级,并且拒绝单点故障的定时任务集群!这其中就要借助我们的强大的redis了。

方案3:定时任务集群

首先我们要定义定时任务集群要解决的三个问题!

1、实效性要高

2、吞吐量要大

3、服务要稳定,不能有单点故障

下面是整个定时任务集群的架构图。

image

架构很简单:我们把用户的订阅推送记录存储到redis集群的sortedSet队列里面,并且以提醒用户提醒时间戳作为score值,然后在我们个每业务server里面起一个定时器频率是秒级,我的设定就是1s,然后经过负载均衡之后从某个队列里面获取要推送的用户记录进行推送。下面我们分析以下这个架构

1、性能:除去带宽等其它因素,基本与机器数成线性相关。机器数量越多吞吐量越大,机器数量少时相对的吞吐量就减少。

2、实效性:提高到了秒级,效果还可以接受。

3、单点故障?不存在的!除非redis集群或者所有server全挂了。。。。

这里解析一下为什么用redis?

第一redis 可以作为一个高性能的存储db,性能要比MySQL好很多,并且支持持久化,稳定性好。

第二redis SortedSet队列天然支持以时间作为条件排序,完美满足我们选出要推送的记录。

ok~既然方案已经有了那如何在一天时间内把这个方案落地呢?是的我设计出这个方案到基本编码完成,时间就是一天。。。因为时间太赶鸟。

首先我们以user_id作为key,然后mod队列数hash到redis SortedSet队列里面。为什么要这样呢,因为如果用户同时订阅了两张劵并且推送时间很近,这样的两条推送就可以合并成一条~,并且这样hash也相对均匀。下面是部分代码的截图:

image

然后要决定队列的数量,一般正常来说我们有多少台处理的服务器就定义多少条队列。因为队列太少,会造成队列竞争,太多可能会导致记录得不到及时处理。

然而最佳实践是队列数量应该是可动态配置化的,因为线上的集群机器数是会经常变的。大促的时候我们会加机器是不是,并且业务量增长了,机器数也是会增加是不是~。所以我是借用了淘宝的diamond进行队列数的动态配置。

image

我们每次从队列里面取多少条记录也是可以动态配置的

image

这样就可以随时根据实际的生产情况调整整个集群的吞吐量~。 所以我们的定时任务集群还是具有一个特性就是支持动态调整~。

最后一个关键组件就是负载均衡了。这个是非常重要的!因为这个做得不好就会可能导致多台机竞争同时处理一个队列,影响整个集群的效率!在时间很紧的情况下我就用了一个简单实用的利用redis一个自增key 然后 mod 队列数量算法。这样就很大程度上就保证不会有两台机器同时去竞争一条队列~.

image

最后我们算一下整个集群的吞吐量

10(机器数) * 2000(一次拉取数) = 20000。然后以MQ的形式把消息推送到消息中心,发MQ是异步的,算上其它处理0.5s。

其实发送20W的推送也就是10几s的事情。

ok~ 到这里我们整个定时任务集群就差不多基本落地好了。如果你问我后面还有什么可以完善的话那就是:

1、加监控, 集群怎么可以木有监控呢,万一出问题有任务堆积怎么办~

2、加上可视化界面。

3、最好有智能调度,增加任务优先级。优先级高的任务先运行嘛。

4、资源调度,万一机器数量不够,力不从心,优先保证重要任务执行。

目前项目已上前线,运行平稳~。


以上,便是今天的分享,希望大家喜欢。

作者:夜空_2cd3
链接:https://www.jianshu.com/p/7ad2d539f010
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

关于MQ的几件小事(七)如果让你设计一个MQ,你怎么设计 - 简书

mikel阅读(822)

来源: 关于MQ的几件小事(七)如果让你设计一个MQ,你怎么设计 – 简书

其实回答这类问题,说白了,起码不求你看过那技术的源码,起码你大概知道那个技术的基本原理,核心组成部分,基本架构构成,然后参照一些开源的技术把一个系统设计出来的思路说一下就好

比如说这个消息队列系统,我们来从以下几个角度来考虑一下

(1)首先这个mq得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下kafka的设计理念,broker -> topic -> partition,每个partition放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给topic增加partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?

(2)其次你得考虑一下这个mq的数据要不要落地磁盘吧?那肯定要了,落磁盘,才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是kafka的思路。

(3)其次你考虑一下你的mq的可用性啊?这个事儿,具体参考我们之前可用性那个环节讲解的kafka的高可用保障机制。多副本 -> leader & follower -> broker挂了重新选举leader即可对外服务。

(4)能不能支持数据0丢失啊?可以的,参考我们之前说的那个kafka数据零丢失方案

其实一个mq肯定是很复杂的,其实这是个开放题,就是看看你有没有从架构角度整体构思和设计的思维以及能力。

如果你还不清楚,请参考前面几篇
消息队列的用途、优缺点、技术选型
如何保证消息队列的高可用
如何保证消息不重复消费
如何防止数据队列数据丢失
如何保证消息按顺序执行
消息积压在消息队列里怎么办

作者:一条路上的咸鱼
链接:https://www.jianshu.com/p/08ef2219411f
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

关于MQ的几件小事(五)如何保证消息按顺序执行 - 简书

mikel阅读(993)

来源: 关于MQ的几件小事(五)如何保证消息按顺序执行 – 简书

1.为什么要保证顺序

消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。举例:
比如通过mySQL binlog进行两个数据库的数据同步,由于对数据库的数据操作是具有顺序性的,如果操作顺序搞反,就会造成不可估量的错误。比如数据库对一条数据依次进行了 插入->更新->删除操作,这个顺序必须是这样,如果在同步过程中,消息的顺序变成了 删除->插入->更新,那么原本应该被删除的数据,就没有被删除,造成数据的不一致问题。

2.出现顺序错乱的场景

(1)rabbitmq
①一个queue,有多个consumer去消费,这样就会造成顺序的错误,consumer从MQ里面读取数据是有序的,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。

rabbitmq消息顺序错乱第一种情况示意图.png

②一个queue对应一个consumer,但是consumer里面进行了多线程消费,这样也会造成消息消费顺序错误。

abbitmq消息顺序错乱第二种情况示意图.png

(2)kafka
①kafka一个topic,一个partition,一个consumer,但是consumer内部进行多线程消费,这样数据也会出现顺序错乱问题。

kafka消息顺序错乱第一种情况示意图.png

②具有顺序的数据写入到了不同的partition里面,不同的消费者去消费,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。

kafka消息顺序错乱第二种情况示意图..png

3.保证消息的消费顺序

(1)rabbitmq
①拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。

一个queue对应一个consumer

②或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理

一个queue对应一个consumer,采用多线程.png

(2)kafka
①确保同一个消息发送到同一个partition,一个topic,一个partition,一个consumer,内部单线程消费。

单线程保证顺序.png

②写N个内存queue,然后N个线程分别消费一个内存queue即可

多线程保证顺序.png

作者:一条路上的咸鱼
链接:https://www.jianshu.com/p/02fdcb9e8784
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

关于MQ的几件小事(一)消息队列的用途、优缺点、技术选型 - 简书

mikel阅读(856)

来源: 关于MQ的几件小事(一)消息队列的用途、优缺点、技术选型 – 简书

1.为什么使用消息队列?

(1)解耦:可以在多个系统之间进行解耦,将原本通过网络之间的调用的方式改为使用MQ进行消息的异步通讯,只要该操作不是需要同步的,就可以改为使用MQ进行不同系统之间的联系,这样项目之间不会存在耦合,系统之间不会产生太大的影响,就算一个系统挂了,也只是消息挤压在MQ里面没人进行消费而已,不会对其他的系统产生影响。

不使用MQ的情况.png
使用MQ进行解耦之后.png

(2)异步:加入一个操作设计到好几个步骤,这些步骤之间不需要同步完成,比如客户去创建了一个订单,还要去客户轨迹系统添加一条轨迹、去库存系统更新库存、去客户系统修改客户的状态等等。这样如果这个系统都直接进行调用,那么将会产生大量的时间,这样对于客户是无法接收的;并且像添加客户轨迹这种操作是不需要去同步操作的,如果使用MQ将客户创建订单时,将后面的轨迹、库存、状态等信息的更新全都放到MQ里面然后去异步操作,这样就可加快系统的访问速度,提供更好的客户体验。

不使用MQ情况.png
使用MQ进行异步之后.png

(3)削峰:一个系统访问流量有高峰时期,也有低峰时期,比如说,中午整点有一个抢购活动等等。比如系统平时流量并不高,一秒钟只有100多个并发请求,系统处理没有任何压力,一切风平浪静,到了某个抢购活动时间,系统并发访问了剧增,比如达到了每秒5000个并发请求,而我们的系统每秒只能处理2000个请求,那么由于流量太大,我们的系统、数据库可能就会崩溃。这时如果使用MQ进行流量削峰,将用户的大量消息直接放到MQ里面,然后我们的系统去按自己的最大消费能力去消费这些消息,就可以保证系统的稳定,只是可能要跟进业务逻辑,给用户返回特定页面或者稍后通过其他方式通知其结果。

使用MQ进行削峰.png

2.消息队列有什么优点和缺点?

优点:1、对结构复杂、设计系统多的操作进行解耦操作,降低系统的操作复杂度、降低系统的维护成本。
2、对一个可以进行异步操作的一些系统操作进行异步,减小操作的响应时间,提供更好的用户体验。
3、可对高流量进行削峰,保证系统的平稳运行。
缺点:1、系统可用性降低。比如在系统中引入MQ,那么万一MQ挂了怎么办呢?一般而言,引入的外部依赖越多,系统越
脆弱,每一个依赖出问题都会导致整个系统的崩溃。
2、系统复杂度提高。需要考虑MQ的各种情况,比如:消息的重复消费、消息丢失、保证消费顺序等等……
3、数据一致性问题。比如A系统已经给客户返回操作成功,这时候操作BC都成功了,操作D却失败了,导致数据不
一致。

3.kafka、activemq、rabbitmq、rocketmq都有什么优点和缺点啊?

特性 ActiveMQ RabbitMQ RocketMQ kafka
单机吞吐量 万级,吞吐量比RocketMQ和kafka要低一个数量级 万级,吞吐量比RocketMQ和kafka要低一个数量级 10万级,RocketMQ也是可以支撑高吞吐的一种MQ 10万级别,kafka最大优点就是吞吐量大,一般配合大数据类的系统来进行实时数据计算、日志采集等场景。
topic数量对吞吐量的影响 topic可以达到几百、几千个的级别,吞吐量会有小幅度的下降。这是RocketMQ的一大优势,可在同等数量机器下支撑大量的topic topic从几十个到几百个的时候,吞吐量会大幅下降。所以在同等机器数量下,kafka尽量保证topic数量不要过多。如果支撑大规模topic需要增加更多的机器
时效性 ms级 微秒级,这是rabbitmq的一大特点,延迟是最低的 ms级 延迟在ms级以内
可用性 高,基于主从架构实现可用性 高,基于主从架构实现可用性 非常高,分布式架构 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 经过参数优化配置,可以做到0丢失 经过参数配置,消息可以做到零丢失
功能支持 MQ领域的功能及其完备 基于erlang开发,所以并发性能极强,性能极好,延时低 MQ功能较为完备,分布式扩展性好 功能较为简单,主要支持加单MQ功能
优势 非常成熟,功能强大,在业内大量公司和项目中都有应用 erlang语言开发,性能极好、延时很低,吞吐量万级、MQ功能完备,管理界面非常好,社区活跃;互联网公司使用较多 接口简单易用,阿里出品有保障,吞吐量大,分布式扩展方便、社区比较活跃,支持大规模的topic、支持复杂的业务场景,可以基于源码进行定制开发 超高吞吐量,ms级的时延,极高的可用性和可靠性,分布式扩展方便
劣势 偶尔有较低概率丢失消息,社区活跃度不高 吞吐量较低,erlang语音开发不容易进行定制开发,集群动态扩展麻烦 接口不是按照标准JMS规范走的,有的系统迁移要修改大量的代码,技术有被抛弃的风险 有可能进行消息的重复消费
应用 主要用于解耦和异步,较少用在大规模吞吐的场景中 都有使用 用于大规模吞吐、复杂业务中 在大数据的实时计算和日志采集中被大规模使用,是业界的标准

综上所述,总结如下:
一般业务系统要引入MQ,最早大家都用ActiveMQ,但现在用的不多了。没有经过大规模吞吐场景的验证,社区也不活跃,不推荐再使用。
后来大家开始用rabbitMQ,但是它是使用erlang语言开发的,如果不精通erlang,对公司而言,几乎处于不可控的状态,单其是开源的,社区活跃度高,拥有比较稳定的支持。
现在越来越多的公司开始使用RocketMQ,但是要小心被抛弃的风险。如果公司有实力自己去维护开发,推荐使用。否则还是选择RabbitMQ。
如果实在大数据的实时计算、日志采集等领域,用kafka是业界标准。

所以,对于中小型公司,技术实力一般的,应该用rabbitmq,对于大公司,基础架构研发能力强大的,推荐使用RocketMQ。

作者:一条路上的咸鱼
链接:https://www.jianshu.com/p/fdd94be6037a
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

关于MQ的几件小事(二)如何保证消息队列的高可用 - 简书

mikel阅读(941)

来源: 关于MQ的几件小事(二)如何保证消息队列的高可用 – 简书

1.RabbitMQ的高可用

RabbitMQ基于主从模式实现高可用。RabbitMQ有三种模式:单机模式,普通集群模式,镜像集群模式。
(1)单机模式:
单机模式就是demo级别的,生产中不会有人使用。
(2)普通集群模式
普通集群模式就是在多台机器上启动多个rabbitmq实例,每个机器启动一个。但是创建的queue只会放在一个rabbitmq实例上面,但是其他的实例都同步了这个queue的元数据。在你消费的时候,如果连接到了另一个实例,他会从拥有queue的那个实例获取消息然后再返回给你。

普通集群模式示意图.png

这种方式并没有做到所谓消息的高可用,就是个普通的集群,这样还会导致要么消费者每次随机连接一个实例然后拉取数据,这样的话在实例之间会产生网络传输,增加系统开销,要么固定连接那个queue所在的实例消费,这样会导致单实例的性能瓶颈。

而且如果那个方queue的实例宕机了,会导致接下来其他实例都无法拉取数据;如果没有开启消息的持久化会丢失消息;就算开启了消息的持久化,消息不一定会丢,但是也要等这个实例恢复了,才可以继续拉取数据。
所以这个并没有提供高可用,这种方案只是提高了吞吐量,也就是让集群中多个节点来服务某个queue的读写操作。
(3)镜像集群模式
这种模式,才是rabbitmq提供是真正的高可用模式,跟普通集群不一样的是,你创建的queue,无论元数据还是queue里面是消息数据都存在多个实例当中,然后每次写消息到queue的时候,都会自动把消息到多个queue里进行消息同步。

镜像集群模式示意图.png

这种模式的好处在于,任何一台机器宕机了,其他的机器还可以使用。
坏处在于:1、性能消耗太大,所有机器都要进行消息的同步,导致网络压力和消耗很大。2、没有扩展性可言,如果有一个queue负载很重,就算加了机器,新增的机器还是包含了这个queue的所有数据,并没有办法扩展queue。
如何开启镜像集群模式:在控制台新增一个镜像集群模式的策略,指定的时候可以要求数据同步到所有节点,也可以要求同步到指定节点,然后在创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上面去了。 

2.kafka的高可用

(1)kafka的一个基本架构:多个broker组成,一个broker是一个节点;你创建一个topic,这个topic可以划分成多个partition,每个partition可以存在于不同的broker上面,每个partition存放一部分数据。这是天然的分布式消息队列。

实际上rabbitmq并不是分布式消息队列,他就是传统的消息队列,只不过提供了一些集群、HA的机制而已,因为无论如何配置,rabbitmq一个queue的数据就存放在一个节点里面,镜像集群下,也是每个节点都放这个queue的全部数据。

kafka在0.8以前是没有HA机制的,也就是说任何一个broker宕机了,那个broker上的partition就丢了,没法读也没法写,没有什么高可用可言。

kafka在0.8之后,提过了HA机制,也就是replica副本机制。每个partition的数据都会同步到其他机器上,形成自己的replica副本。然后所有的replica副本会选举一个leader出来,那么生产者消费者都和这个leader打交道,其他的replica就是follower。写的时候,leader会把数据同步到所有follower上面去,读的时候直接从leader上面读取即可。
为什么只能读写leader:因为要是你可以随意去读写每个follower,那么就要关心数据一致性问题,系统复杂度太高,容易出问题。kafka会均匀度讲一个partition的所有数据replica分布在不同的机器上,这样就可以提高容错性。
这样就是高可用了,因为如果某个broker宕机 了,没事儿,那个broker的partition在其他机器上有副本,如果这上面有某个partition的leader,那么此时会重新选举出一个现代leader出来,继续读写这个新的leader即可。

kafka高可用架构示意图.png

写消息: 写数据的时候,生产者就写leader,然后leader将数据落到磁盘上之后,接着其他follower自己主动从leader来pull数据。一旦所有follower同步好了数据,就会发送ack个leader,leader收到了所有的follower的ack之后,就会返回写成功的消息给消息生产者。(这只是一种模式,可以调整)。
读数据:消费数据的时候,只会从leader进行消费。但是只有一个消息已经被所有follower都同步成功返回ack的时候,这个消息才会被消费者读到。

作者:一条路上的咸鱼
链接:https://www.jianshu.com/p/ab64681beb17
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。