22、The Advanced API 高级API - 困兽斗 - 博客园

来源: 22、The Advanced API 高级API – 困兽斗 – 博客园

EasyNetQ的使命是为RabbitMQ消息传递提供最简单的API。核心IBus接口有意避免暴露AMQP概念:如交换器、绑定、队列。相反,EasyNetQ实现一个默认基于消息的class type的“交换器+绑定+队列”拓扑结构。

有些场景下,需要能配置自定义的“交换器+绑定+队列”拓扑。EasyNetQ的The Advanced API 就可以提供这些功能。这个高级API对AMQP标准有很好的理解。

高级API是通过IAdvancedBus接口提供的,你可以通过IBus的Advanced属性获得一个IAdvancedBus接口的实例。

var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced;

 

一、如何声明交换器

要声明一个交换器(在RabbitMQ上),你可以使用EasyNetQ.IAdvancedBus接口的ExchangeDeclare方法,方法原型:

复制代码
IExchange ExchangeDeclare(
    string name, 
    string type, 
    bool passive = false, 
    bool durable = true, 
    bool autoDelete = false, 
    bool @internal = false, 
    string alternateExchange = null, 
    bool delayed = false);
复制代码

形参的含义如下:

复制代码
name:                欲创建的交换器名The name of the exchange you want to create
type:                欲创建交换器类型,必须是AMQP标准里定义的类型,你可以通过ExchangeType类的静态属性安全地指定它。
passive:            指定为true时,如果该名字的交换器之前不存在,不会创建它,而是抛出异常。(默认 false)
durable:            交换器是否可持久。(默认 true)
autoDelete:            当最后一个队列解绑定后,该交换器是否自动删除。(默认 false)
internal:            指定为true时,该交换器不能直接被发布者使用,而只能被其他普通交换器绑定使用。(默认 false)
alternateExchange:    替代交换器名。如果无法路由消息,就将消息路由到该交换器。
delayed:            指定为true时,声明一个x-delayed-type交换器,用于路由延迟消息。
复制代码

小例子:

复制代码
//创建一个直接交换器
var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Direct);

//创建一个主题交换器
var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Topic);

//创建一个扇出交换器
var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Fanout);
复制代码

获取RabbitMQ默认的交换器:

var exchange = Exchange.GetDefault();

 

 

二、如何声明队列

要声明一个消息队列(在RabbitMQ上),你可以使用EasyNetQ.IAdvancedBus接口的QueueDeclare方法,方法原型:

复制代码
IQueue QueueDeclare(
    string name, 
    bool passive = false, 
    bool durable = true, 
    bool exclusive = false, 
    bool autoDelete = false,
    int? perQueueMessageTtl  = null, 
    int? expires = null,
    byte? maxPriority = null,
    string deadLetterExchange = null, 
    string deadLetterRoutingKey = null,
    int? maxLength = null,
    int? maxLengthBytes = null);
复制代码

形参含义如下:

复制代码
name:                      队列名
passive:                   如果该队列之前不存在,不创建它,而是抛出异常。(默认 false)
durable:                   队列是否可持久。(默认 true)
exclusive:                 是否当前连接专用。(默认 false)
autoDelete:                是否自动删除队列,一旦所有消费者断开连接。(默认 false)
perQueueMessageTtl:        在被丢弃之前,消息应该在队列中保留多长时间(毫秒)。(默认 null,即不设置)
expires:                   在自动删除队列之前,该队列应该保持未使用状态多长时间(毫秒)。(默认 null,即不设置)
maxPriority:               指定队列应该支持的最大消息优先级。
deadLetterExchange:        指定在被RabbitMQ服务器自动删除之前,交换的名称是否保持未占用状态。
deadLetterRoutingKey:      如果设置了,将使用指定的路由键路由消息,如果没有设置,消息将使用它们最初发布的同一路由键进行路由。
maxLength:                 队列中能够存放的ready消息的最大数量。 一旦超限,为了给新来消息腾位置,队首消息将被丢弃或者成为死信。
maxLengthBytes:            队列最大字节数。 一旦超限,为了给新来消息腾位置,队首消息将被丢弃或者成为死信。
复制代码

请注意RabbitMQ对待上面两个maxLength的行为,它们并不像人们想象那样。有人可能以为超限后RabbitMQ会拒绝接收(生产者)更多的消息,然而RabbitMQ文档指出一旦超限,队首消息将被丢弃或者成为死信,要为新来的消息腾地方。

 

小例子:

// 声明一个持久化队列
var queue = advancedBus.QueueDeclare("my_queue");

// declare a queue with message TTL of 10 seconds:
var queue = advancedBus.QueueDeclare("my_queue", perQueueMessageTtl:10000);

 

要声明一个“未命名”的独占队列,(实际上由RabbitMQ为之产生一个队列名),请调用QueueDeclare() 无参重载方法:

var queue = advancedBus.QueueDeclare();

请注意,EasyNetQ的自动消费者重连接逻辑不能用于“独占队列”。

 

 

三、绑定

你可以像这样把一个队列绑定到一个交换器:

var queue = advancedBus.QueueDeclare("my.queue");    //队列
var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);    //主题交换器
var binding = advancedBus.Bind(exchange, queue, "A.*");//绑定,指定路由键

 

要指定一个队列和一个交换之间的多个绑定,只需多次调用Bind方法:

var queue = advancedBus.QueueDeclare("my.queue");   //声明队列
var exchange = advancedBus.ExchangeDeclare("my.exchange", ExchangeType.Topic);  //声明交换器

advancedBus.Bind(exchange, queue, "A.B");   //绑定, 主题设置为 A.B
advancedBus.Bind(exchange, queue, "A.C");   //绑定,主题设置为 A.C

 

你也可以把一个交换器绑定到另一个交换器上,穿成串

复制代码
var sourceExchange = advancedBus.ExchangeDeclare("my.exchange.1", ExchangeType.Topic);       //源交换器
var destinationExchange = advancedBus.ExchangeDeclare("my.exchange.2", ExchangeType.Topic);  //目标交换器
var queue = advancedBus.QueueDeclare("my.queue");        //声明队列

advancedBus.Bind(sourceExchange, destinationExchange, "A.*");   //把源交换器绑定到目标交换器
advancedBus.Bind(destinationExchange, queue, "A.C");            //把目标交换器绑定到队列
复制代码

注意上面穿成串后,目标交换器收到A主题和 *(任一个字母)的主题消息;而队列只能收到A和C的主题消息。

 

 

四、发布

高级发布方法允许指定你要把消息发布到哪个交换器上,它还允许访问消息的AMQP标准的basic属性。

高级API要求将你的消息封装到Message类对象中

var myMessage = new MyMessage {Text = "Hello from the publisher"};
var message = new Message<MyMessage>(myMessage);

Message类使你可以访问AMQP的basic属性,例如:

message.Properties.AppId = "my_app_id";
message.Properties.ReplyTo = "my_reply_queue";

最后你只要调用Publish方法发布你的消息,在下例我们发布到默认交换器

bus.Publish(Exchange.GetDefault(), queueName, false, false, message);

一个重载Publish方法允许你绕过EasyNetQ的消息序列化,直接创建你自己的字节数组作为消息。

var properties = new MessageProperties();
var body = Encoding.UTF8.GetBytes("Hello World!");
bus.Publish(Exchange.GetDefault(), queueName, false, false, properties, body);

 

 

五、消费

使用IAdvancedBus接口的Consume方法,就可以消费队列中的消息。

IDisposable Consume<T>(IQueue queue, Func<IMessage<T>, MessageReceivedInfo, Task> onMessage) where T : class;

onMessage 委托是你要提供的消息处理方法。

正如上面的“发布”那一节所描述的,IMessage可以让你访问消息和它的MessageProperties。而这里MessageRecivedInfo提供了关于消息被消费的上下文的额外信息:

复制代码
public class MessageReceivedInfo
{
    public string ConsumerTag { get; set; }
    public ulong DeliverTag { get; set; }
    public bool Redelivered { get; set; }
    public string Exchange { get; set; }
    public string RoutingKey { get; set; }         
}
复制代码

onMessage委托返回一个Task,该任务允许你编写非阻塞的异步处理程序。

该消费方法返回一个IDisposable接口实例,调用该实例的Dispose方法,可以撤销该消费者。

如果你仅仅需要同步处理消息,你可以调用同步的Consume重载方法:

IDisposable Consume<T>(IQueue queue, Action<IMessage<T>, MessageReceivedInfo> onMessage) where T : class;

 

如果要绕过EasyNetQ的消息序列化,调用下面的Consume重载方法,提供一个字节数组(作为消息):

void Consume(IQueue queue, Func<Byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);

在下面示例中,我们正在消费队列“myqueue”中的原始字节数组(即消息):

复制代码
var queue = advancedBus.QueueDeclare("my_queue");   //声明队列
advancedBus.Consume(queue, (body, properties, info) => Task.Factory.StartNew(() =>
    {
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine("Got message: '{0}'", message);
    }));
复制代码

你可以调用另一个重载的Consume方法,让单个消费者可选地注册多个处理委托:

IDisposable Consume(IQueue queue, Action<IHandlerRegistration> addHandlers);

IHandlerRegistration 接口如下所示:

复制代码
public interface IHandlerRegistration
{
    /// <summary>
    /// 添加一个异步处理委托Add an asynchronous handler
    /// </summary>
    /// <typeparam name="T">消息类型The message type</typeparam>
    /// <param name="handler">处理委托The handler</param>
    /// <returns></returns>
    IHandlerRegistration Add<T>(Func<IMessage<T>, MessageReceivedInfo, Task> handler)
        where T : class;

    /// <summary>
    /// 添加一个同步处理委托Add a synchronous handler
    /// </summary>
    /// <typeparam name="T">消息类型The message type</typeparam>
    /// <param name="handler">处理委托The handler</param>
    /// <returns></returns>
    IHandlerRegistration Add<T>(Action<IMessage<T>, MessageReceivedInfo> handler)
        where T : class;

    /// <summary>
    /// 如果设置为true,如果没有适合的处理委托,将会抛出异常。
    /// 设置为false,返回一个无操作(什么也不做)的委托。(默认 true)
    /// Set to true if the handler collection should throw an EasyNetQException when no
    /// matching handler is found, or false if it should return a noop handler.
    /// Default is true.
    /// </summary>
    bool ThrowOnNoMatchingHandler { get; set; }
}
复制代码

在下面例子中,我们注册了两个不同的处理委托:一个处理MyMessage类型消息,另一个处理MyOtherMessage类型消息:

复制代码
bus.Advanced.Consume(queue, x => x
        .Add<MyMessage>((message, info) => 
            { 
                Console.WriteLine("Got MyMessage {0}", message.Body.Text);
                countdownEvent.Signal();
            })
        .Add<MyOtherMessage>((message, info) =>
            {
                Console.WriteLine("Got MyOtherMessage {0}", message.Body.Text);
                countdownEvent.Signal();
            })
    );
复制代码

更多信息请参阅这篇博客文章:

http://mikehadlow.blogspot.co.uk/2013/11/easynetq-multiple-handlers-per-consumer.html

 

 

6、从队列获取单条消息

要从队列中获得单条消息,请使用IAdvancedBus.Get() 方法:

IBasicGetResult<T> Get<T>(IQueue queue) where T : class;

AMQP文档说:“该方法使用同步对话直接访问队列中的消息,该对话是为特定类型的应用程序设计的,而同步功能比性能更重要。”

不要在循环中调用Get方法访问消息队列。在一般场景中,我想你一定只喜欢使用Consume方法。

IBasicGetResult接口如下所示:

复制代码
/// <summary>
/// AdvancedBus.Get 方法获取的结果
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IBasicGetResult<T> where T : class
{
    /// <summary>
    /// 消息是否可用。True if a message is availabe, false if not.
    /// </summary>
    bool MessageAvailable { get; }

    /// <summary>
    /// The message retreived from the queue. 
    /// This property will throw a MessageNotAvailableException if no message
    /// was available. You should check the MessageAvailable property before
    /// attempting to access it.你应该先检查MessageAvailable属性值,再读取该属性值。
    /// </summary>
    IMessage<T> Message { get; }
}
复制代码

注意:在读取Message属性前,你应该总是先检查MessageAvailable属性值是否为true才行(避免抛出异常),如下例所示:

复制代码
var queue = advancedBus.QueueDeclare("get_test");    //声明队列
advancedBus.Publish(Exchange.GetDefault(), "get_test", false, false,
    new Message<MyMessage>(new MyMessage{ Text = "Oh! Hello!" }));    //发布消息

var getResult = advancedBus.Get<MyMessage>(queue);   //获取单条消息

if (getResult.MessageAvailable)   //如果消息可用
{
    Console.Out.WriteLine("Got message: {0}", getResult.Message.Body.Text);
}
else
{
    Console.Out.WriteLine("Failed to get message!");
}
复制代码

要访问二进制消息,请使用非泛型的Get方法:

IBasicGetResult Get(IQueue queue);

非泛型的IBasicGetResult接口定义如下:

复制代码
public interface IBasicGetResult
{
    byte[] Body { get; }
    MessageProperties Properties { get; }
    MessageReceivedInfo Info { get; }
}
复制代码

 

 

7、消息类型必须匹配

EasyNetQ 高级API要求订阅者只接收泛型类型参数指定的类型的消息。在上例中,只接收类型MyMessage类型的消息。

但是,EasyNetQ不担保你发布错误类型的消息给订阅者。比如:我可以很容易地设置一个 “交换器-绑定-队列”拓扑来发布NotMyMessage类型的消息,而用上面的处理程序接收它。

如果接收到错误类型的消息,EasyNetQ会抛出EasyNetQInvalidMessageTypeException 异常,如下:

EasyNetQ.EasyNetQInvalidMessageTypeException: Message type is incorrect. Expected 'EasyNetQ_Tests_MyMessage:EasyNetQ_Tests', but was 'EasyNetQ_Tests_MyOtherMessage:EasyNetQ_Tests'
   at EasyNetQ.RabbitAdvancedBus.CheckMessageType[TMessage](MessageProperties properties) in D:\Source\EasyNetQ\Source\EasyNetQ\RabbitAdvancedBus.cs:line 217
   at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass1`1.<Subscribe>b__0(Byte[] body, MessageProperties properties, MessageReceivedInfo messageRecievedInfo) in D:\Source\EasyNetQ\Source\EasyNetQ\RabbitAdvancedBus.cs:line 131
   at EasyNetQ.RabbitAdvancedBus.<>c__DisplayClass6.<Subscribe>b__5(String consumerTag, UInt64 deliveryTag, Boolean redelivered, String exchange, String routingKey, IBasicProperties properties, Byte[] body) in D:\Source\EasyNetQ\Source\EasyNetQ\RabbitAdvancedBus.cs:line 176
   at EasyNetQ.QueueingConsumerFactory.HandleMessageDelivery(BasicDeliverEventArgs basicDeliverEventArgs) in D:\Source\EasyNetQ\Source\EasyNetQ\QueueingConsumerFactory.cs:line 85

 

8、事件

当通过RabbitHutch方法实例化一个IBus接口实例时,您可以指定一个AdvancedBusEventHandlers委托。

这个类包含一个事件处理委托属性,用于在 IAdvancedBus中的每个事件,提供了在bus实例化之前指定事件处理程序的方法。

不需要使用它,因为一旦创建了bus,仍然可以添加事件处理程序。

但是,你想要抓到 RabbitAdvancedBus.的首次已连接事件,你必须使创建 AdvancedBusEventHandlers 委托,注册已连接事件Connected

这是因为bus在其构造函数返回前只尝试连接一次。如果连接成功,会触发RabbitAdvancedBus.OnConnected事件。

var bus = RabbitHutch.CreateBus("host=localhost", new AdvancedBusEventHandlers(connected: (s, e) =>
{
      var advancedBus = (IAdvancedBus)s;
      Console.WriteLine(advancedBus.IsConnected); // This will print true.
}));

 

赞(0) 打赏
分享到: 更多 (0)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏