NetCore EasyNetQ 高级使用 RabbitMq_坚定的走-CSDN博客

baacloud免费翻墙vpn注册使用

来源: NetCore EasyNetQ 高级使用 RabbitMq_坚定的走-CSDN博客

一、消息队列

消息队列作为分布式系统中的重要组件,常用的有MSMQ,RabbitMq,Kafa,ActiveMQ,RocketMQ。至于各种消息队列的优缺点比较,在这里就不做扩展了,网上资源很多。

更多内容可参考 消息队列及常见消息队列介绍。我在这里选用的是RabbitMq。

官网地址:http://www.rabbitmq.com

安装和配置:Windows下RabbitMq安装及配置

二、RabbitMq简单介绍

RabbitMQ是一款基于AMQP(高级消息队列协议),由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、   Erlang等。在RabbitMq中首先要弄清楚的概念是 交换机、队列、绑定。基本的消息通讯步骤就是首先定义ExChange,然后定义队列,然后绑定交换机和队列。

需要明确的一点儿是,发布者在发送消息是,并不是把消息直接发送到队列中,而是发送到Exchang,然后由交互机根据定义的消息匹配规则,在将消息发送到队列中。

Exchange有四种消息消息分发规则:direct,topic,fanout,header。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。

详细的概念介绍推荐查看:消息队列之RabbitMq

三、EasyNetQ使用

Easynetq是一个简单易用的Rabbitmq Net客户端。同时支持 NetFramework和NetCore。GitHub地址。它是针对RabbitMq Net客户端的进一步封装。关于EasyNetQ的简单使用推荐教程:EasyNetQ的介绍

本文主要介绍基于EasyNeq的高级API的使用。EasyNetQ的作者在核心的IBus接口中尽量避免暴露AMQP中的交换机、队列、绑定这些概念,使用者即使不去了解这些概念,也能完成消息的发送接收。这相当简洁,但某些情况下,基于应用场景的需要,我们需要自定义交换机、队列、绑定这些信息,EasyNetQ允许你这么做,这些都是通过IAdvanceBus接口实现。

3.1 项目装备

这里为了演示,首先新建一个项目,包括一个发布者,两个接收者,一个公共的类库

安装EasyNetQ: NuGet>Install-Package EasyNetQ

3.2 简单封装

在Common项目里面是针对Easynetq的使用封装,主要目录如下

 

在RabbitMq文件夹下,是针对消息发送接收的简单封装。

首先来看下RabbitMqManage,主要的发送和订阅操作都在这个类中。其中ISend接口定义了发送消息的规范,SendMessageManage是ISend的实现。IMessageConsume接口定义订阅规范。

MesArg 和PushMsg分别是订阅和发送需用到的参数类。RabbitMQManage是暴露在外的操作类。

首先看发送的代码

  1. public enum SendEnum
  2. {
  3. 订阅模式 = 1,
  4. 推送模式 = 2,
  5. 主题路由模式 = 3
  6. }
  7. public class PushMsg
  8. {
  9. /// <summary>
  10. /// 发送的数据
  11. /// </summary>
  12. public object sendMsg { get; set; }
  13. /// <summary>
  14. /// 消息推送的模式
  15. /// 现在支持:订阅模式,推送模式,主题路由模式
  16. /// </summary>
  17. public SendEnum sendEnum { get; set; }
  18. /// <summary>
  19. /// 管道名称
  20. /// </summary>
  21. public string exchangeName { get; set; }
  22. /// <summary>
  23. /// 路由名称
  24. /// </summary>
  25. public string routeName { get; set; }
  26. }
  27. internal interface ISend
  28. {
  29. Task SendMsgAsync(PushMsg pushMsg, IBus bus);
  30. void SendMsg(PushMsg pushMsg, IBus bus);
  31. }
  32. internal class SendMessageMange : ISend
  33. {
  34. public async Task SendMsgAsync(PushMsg pushMsg, IBus bus)
  35. {
  36. //一对一推送
  37. var message = new Message<object>(pushMsg.sendMsg);
  38. IExchange ex = null;
  39. //判断推送模式
  40. if (pushMsg.sendEnum == SendEnum.推送模式)
  41. {
  42. ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Direct);
  43. }
  44. if (pushMsg.sendEnum == SendEnum.订阅模式)
  45. {
  46. //广播订阅模式
  47. ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Fanout);
  48. }
  49. if (pushMsg.sendEnum == SendEnum.主题路由模式)
  50. {
  51. //主题路由模式
  52. ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Topic);
  53. }
  54. await bus.Advanced.PublishAsync(ex, pushMsg.routeName.ToSafeString(“”), false, message)
  55. .ContinueWith(task =>
  56. {
  57. if (!task.IsCompleted && task.IsFaulted)//消息投递失败
  58. {
  59. //记录投递失败的消息信息
  60. }
  61. });
  62. }
  63. public void SendMsg(PushMsg pushMsg, IBus bus)
  64. {
  65. //一对一推送
  66. var message = new Message<object>(pushMsg.sendMsg);
  67. IExchange ex = null;
  68. //判断推送模式
  69. if (pushMsg.sendEnum == SendEnum.推送模式)
  70. {
  71. ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Direct);
  72. }
  73. if (pushMsg.sendEnum == SendEnum.订阅模式)
  74. {
  75. //广播订阅模式
  76. ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Fanout);
  77. }
  78. if (pushMsg.sendEnum == SendEnum.主题路由模式)
  79. {
  80. //主题路由模式
  81. ex = bus.Advanced.ExchangeDeclare(pushMsg.exchangeName, ExchangeType.Topic);
  82. }
  83. bus.Advanced.Publish(ex, pushMsg.routeName.ToSafeString(“”), false, message);
  84. }
  85. }

在EasyNetQ中对于异步发送消息的时候,消息是否送达Broker只需要查看异步发送方法最终执行成功还是失败,成功就表示消息送达,如果失败可以将失败后的消息存入数据库中,然后用后台线程轮询

数据库表,将失败后的消息进行重新 发送。这种方式还可以进一步变成消息表,就是先将要发送的消息存入消息表中,然后后台线程轮询消息表来进行消息发送。一般这种方式被广泛用于分布式事务中,

将本地数据库操作和消息表写入放入同一个本地事务中,来保证消息发送和本地数据操作的同步成功,因为我的系统中,分布式事务的涉及很少,所以就没这样去做,只是简单的在异步发送的时候监控下

是否发送失败,然后针对失败的消息做一个重新发送的机制。这里,推荐大佬的NetCore分布式事务解决方案 CAP GitHub地址

 接着看一下消息订阅接收涉及的代码

  1. public class MesArgs
  2. {
  3. /// <summary>
  4. /// 消息推送的模式
  5. /// 现在支持:订阅模式,推送模式,主题路由模式
  6. /// </summary>
  7. public SendEnum sendEnum { get; set; }
  8. /// <summary>
  9. /// 管道名称
  10. /// </summary>
  11. public string exchangeName { get; set; }
  12. /// <summary>
  13. /// 对列名称
  14. /// </summary>
  15. public string rabbitQueeName { get; set; }
  16. /// <summary>
  17. /// 路由名称
  18. /// </summary>
  19. public string routeName { get; set; }
  20. }
  21. public interface IMessageConsume
  22. {
  23. void Consume(string message);
  24. }

在订阅中我定义了一个接口,最终业务代码中,所有的消息订阅类,都需要继续此接口

最后,我们来看下对外使用的操作类

  1. public class RabbitMQManage
  2. {
  3. private volatile static IBus bus = null;
  4. private static readonly object lockHelper = new object();
  5. /// <summary>
  6. /// 创建服务总线
  7. /// </summary>
  8. /// <param name=”config”></param>
  9. /// <returns></returns>
  10. public static IBus CreateEventBus()
  11. {
  12. //获取RabbitMq的连接地址
  13. //SystemJsonConfigManage 是我简单封装的一个json操作类,用于针对json文件的读写操作
  14. var config = SystemJsonConfigManage.GetInstance().AppSettings[“MeessageService”];
  15. if (string.IsNullOrEmpty(config))
  16. throw new Exception(“消息地址未配置”);
  17. if (bus == null && !string.IsNullOrEmpty(config))
  18. {
  19. lock (lockHelper)
  20. {
  21. if (bus == null)
  22. bus = RabbitHutch.CreateBus(config);
  23. }
  24. }
  25. return bus;
  26. }
  27. /// <summary>
  28. /// 释放服务总线
  29. /// </summary>
  30. public static void DisposeBus()
  31. {
  32. bus?.Dispose();
  33. }
  34. /// <summary>
  35. /// 消息同步投递
  36. /// </summary>
  37. /// <param name=”pushMsg”></param>
  38. /// <returns></returns>
  39. public static bool PushMessage(PushMsg pushMsg)
  40. {
  41. bool b = true;
  42. try
  43. {
  44. if (bus == null)
  45. CreateEventBus();
  46. new SendMessageMange().SendMsg(pushMsg, bus);
  47. b = true;
  48. }
  49. catch (Exception ex)
  50. {
  51. b = false;
  52. }
  53. return b;
  54. }
  55. /// <summary>
  56. /// 消息异步投递
  57. /// </summary>
  58. /// <param name=”pushMsg”></param>
  59. public static async Task PushMessageAsync(PushMsg pushMsg)
  60. {
  61. try
  62. {
  63. if (bus == null)
  64. CreateEventBus();
  65. await new SendMessageMange().SendMsgAsync(pushMsg, bus);
  66. }
  67. catch (Exception ex)
  68. {
  69. throw ex;
  70. }
  71. }
  72. /// <summary>
  73. /// 消息订阅
  74. /// </summary>
  75. public static void Subscribe<TConsum>(MesArgs args)
  76. where TConsum : IMessageConsume,new()
  77. {
  78. if (bus == null)
  79. CreateEventBus();
  80. if (string.IsNullOrEmpty(args.exchangeName))
  81. return;
  82. Expression<Action<TConsum>> methodCall;
  83. IExchange ex = null;
  84. //判断推送模式
  85. if (args.sendEnum == SendEnum.推送模式)
  86. {
  87. ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Direct);
  88. }
  89. if (args.sendEnum == SendEnum.订阅模式)
  90. {
  91. //广播订阅模式
  92. ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Fanout);
  93. }
  94. if (args.sendEnum == SendEnum.主题路由模式)
  95. {
  96. //主题路由模式
  97. ex = bus.Advanced.ExchangeDeclare(args.exchangeName, ExchangeType.Topic);
  98. }
  99. IQueue qu;
  100. if (string.IsNullOrEmpty(args.rabbitQueeName))
  101. {
  102. qu = bus.Advanced.QueueDeclare();
  103. }
  104. else
  105. qu = bus.Advanced.QueueDeclare(args.rabbitQueeName);
  106. bus.Advanced.Bind(ex, qu, args.routeName.ToSafeString(“”));
  107. bus.Advanced.Consume(qu, (body, properties, info) => Task.Factory.StartNew(() =>
  108. {
  109. try
  110. {
  111. lock (lockHelper)
  112. {
  113. var message = Encoding.UTF8.GetString(body);
  114. //处理消息
  115. methodCall = job => job.Consume(message);
  116. methodCall.Compile()(new TConsum());
  117. }
  118. }
  119. catch (Exception e)
  120. {
  121. throw e;
  122. }
  123. }));
  124. }
  125. }

这里面主要封装了消息的发送和订阅,以及IBus单例的创建。在后续的消息发送和订阅主要就通过此处来实现。我们看到一开始的类目结构中还有一个RaExMessageHandleJob类,这个类就是一个后台

循环任务,用来监测数据库中是否保存了发送失败的消息,如果有,则将消息取出,尝试重新发送。在此就不做多的介绍,大家可以根据自己的实际需求来实现。

3.3 发布者

现在来看一下消息发布者的代码

主要的发送代码都在Send类中,其中appsettings.json里面配置了Rabbitmq的连接地址,TestDto只是一个为了方便演示的参数类。

下面看一下Program里面的代码

很简单的一个发送消息调用。

然后来看一下Send类中的代码

  1. public class Send
  2. {
  3. /// <summary>
  4. /// 发送消息
  5. /// </summary>
  6. public static void SendMessage()
  7. {
  8. //需要注意一点儿,如果发送的时候,在该管道下找不到相匹配的队列框架将默认丢弃该消息
  9. //推送模式
  10. //推送模式下,需指定管道名称和路由键值名称
  11. //消息只会被发送到和指定路由键值完全匹配的队列中
  12. var directdto = new PushMsg()
  13. {
  14. sendMsg = new TestDto()
  15. {
  16. Var1 = “这是推送模式”
  17. },
  18. exchangeName = “message.directdemo”,
  19. routeName= “routekey”,
  20. sendEnum =SendEnum.推送模式
  21. };
  22. //同步发送 ,返回true或fasle true 发送成功,消息已存储到Rabbitmq中,false表示发送失败
  23. var b= RabbitMQManage.PushMessage(directdto);
  24. //异步发送,如果失败,失败的消息会被写入数据库,会有后台线程轮询数据库进行重新发送
  25. //RabbitMQManage.PushMessageAsync(directlist);
  26. //订阅模式
  27. //订阅模式只需要指定管道名称
  28. //消息会被发送到该管道下的所有队列中
  29. var fanoutdto = new PushMsg()
  30. {
  31. sendMsg = new TestDto()
  32. {
  33. Var1 = “这是订阅模式”
  34. },
  35. exchangeName = “message.fanoutdemo”,
  36. sendEnum = SendEnum.订阅模式
  37. };
  38. //同步发送
  39. var fb = RabbitMQManage.PushMessage(fanoutdto);
  40. //异步发送
  41. //RabbitMQManage.PushMessageAsync(fanoutdto);
  42. //主题路由模式
  43. //路由模式下需指定 管道名称和路由值
  44. //消息会被发送到该管道下,和路由值匹配的队列中去
  45. var routedto = new PushMsg()
  46. {
  47. sendMsg = new TestDto()
  48. {
  49. Var1 = “这是主题路由模式1”,
  50. },
  51. exchangeName = “message.topicdemo”,
  52. routeName=“a.log”,
  53. sendEnum=SendEnum.主题路由模式
  54. };
  55. var routedto2 = new PushMsg()
  56. {
  57. sendMsg = new TestDto()
  58. {
  59. Var1 = “这是主题路由模式2”,
  60. },
  61. exchangeName = “message.topicdemo”,
  62. routeName = “a.log.a.b”,
  63. sendEnum = SendEnum.主题路由模式
  64. };
  65. //同步发送
  66. var rb = RabbitMQManage.PushMessage(routedto);
  67. var rb2 = RabbitMQManage.PushMessage(routedto2);
  68. //异步发送
  69. //RabbitMQManage.PushMessageAsync(routedto);
  70. }
  71. }

3.4 消费者

首先来看下消费者端的目录结构

 

其中appsettings.json中配置Rabbitmq的连接信息,Program中只是简单调用消息订阅

主要的消息订阅代码都在MessageManage文件夹下,MessageManService用于定义消息订阅类型

  1. public class MessageManService
  2. {
  3. public static void Subsribe()
  4. {
  5. Task.Run(() =>
  6. {
  7. //概念 一个管道下面可以绑定多个队列。
  8. //发送消息 是指将消息发送到管道中,然后由rabbitmq根据发送规则在将消息具体的转发到对应到管道下面的队列中
  9. //消费消息 是指消费者(即服务)从管道下面的队列中获取消息
  10. //同一个队列 可以有多个消费者(即不同的服务,都可以连接到同一个队列去获取消息)
  11. //但注意 当一个队列有多个消费者的时候,消息会被依次分发到不同的消费者中。比如第一条消息给第一个消费者,第二条消息给第二个消费者(框架内部有一个公平分发的机制)
  12. //推送模式时 需指定管道名称和路由值
  13. //队列名称可自己指定
  14. //注意 ,管道名称和路由名称一定要和发送方的管道名称和路由名称一致
  15. //无论这个管道下面挂靠有多少个队列,只有路由名称和此处指定的路由名称完全一致的队列,才会收到这条消息。
  16. var dirarg = new MesArgs()
  17. {
  18. sendEnum = SendEnum.推送模式,
  19. exchangeName = “message.directdemo”,
  20. rabbitQueeName = “meesage.directmessagequene”,
  21. routeName = “routekey”
  22. };
  23. RabbitMQManage.Subscribe<DirectMessageConsume>(dirarg);
  24. //订阅模式时需指定管道名称,并且管道名称要和发送方管道名称一致
  25. //队列名称可自己指定
  26. //所有这个管道下面的队列,都将收到该条消息
  27. var fanoutrg = new MesArgs()
  28. {
  29. sendEnum = SendEnum.订阅模式,
  30. exchangeName = “message.fanoutdemo”,
  31. rabbitQueeName = “meesage.fanoutmessagequene”
  32. };
  33. RabbitMQManage.Subscribe<FanoutMessageConsume>(fanoutrg);
  34. //路由模式时需指定管道名称,路由关键字并且管道名称,路由关键字要和发送方的一致
  35. //队列名称可自己指定
  36. //消息将被发送到管道下面的能匹配路由关键字的队列中
  37. //也就是说 路由模式时,有多少队列能收到消息,取决于该队列的路由关键字是否匹配,只要匹配就能收到消息
  38. //符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词
  39. var topicrg = new MesArgs()
  40. {
  41. sendEnum = SendEnum.主题路由模式,
  42. exchangeName = “message.topicdemo”,
  43. rabbitQueeName = “message.topicmessagequene”,
  44. routeName = “#.log.#”
  45. };
  46. RabbitMQManage.Subscribe<TopicMessageConsume>(topicrg);
  47. });
  48. }
  49. }

Consume文件夹下主要定义了消息的业务处理

  1. //推送模式过来的消息
  2. public class DirectMessageConsume : IMessageConsume
  3. {
  4. //消息的处理方法中最好不要进行try catch操作
  5. //如果发送异常,EasyNetQ会自动将消息放入错误队列中
  6. //如果在Consume方法体中捕获了异常并且没有抛出,会默认消息处理成功
  7. //消息的幂等性需业务方自行处理,也就是说同一条消息可能会接收到两次
  8. //(比如说第一次正在处理消息的时候服务挂掉,服务重启后这条消息又会重新推送过来)
  9. public void Consume(string message)
  10. {
  11. var dto = JsonConvert.DeserializeObject<TestDto>(message);
  12. Console.WriteLine(dto.Var1 + “;” + dto.Var2 + “;” + dto.Var3);
  13. }
  14. }
  15. //广播模式过来的消息
  16. public class FanoutMessageConsume : IMessageConsume
  17. {
  18. //消息的处理方法中最好不要进行try catch操作
  19. //如果发送异常,EasyNetQ会自动将消息放入错误队列中
  20. //如果在Consume方法体中捕获了异常并且没有抛出,会默认消息处理成功
  21. //消息的幂等性需业务方自行处理,也就是说同一条消息可能会接收到两次
  22. //(比如说第一次正在处理消息的时候服务挂掉,服务重启后这条消息又会重新推送过来)
  23. public void Consume(string message)
  24. {
  25. var dto = JsonConvert.DeserializeObject<TestDto>(message);
  26. Console.WriteLine(dto.Var1 + “;” + dto.Var2 + “;” + dto.Var3);
  27. }
  28. }
  29. //主题路由模式过来的消息
  30. public class TopicMessageConsume : IMessageConsume
  31. {
  32. //消息的处理方法中最好不要进行try catch操作
  33. //如果发送异常,EasyNetQ会自动将消息放入错误队列中
  34. //如果在Consume方法体中捕获了异常并且没有抛出,会默认消息处理成功
  35. //消息的幂等性需业务方自行处理,也就是说同一条消息可能会接收到两次
  36. //(比如说第一次正在处理消息的时候服务挂掉,服务重启后这条消息又会重新推送过来)
  37. public void Consume(string message)
  38. {
  39. var dto = JsonConvert.DeserializeObject<TestDto>(message);
  40. Console.WriteLine(dto.Var1 + “;” + dto.Var2 + “;” + dto.Var3);
  41. }
  42. }

可以看到,所有的类都集成自我们定义的接口IMessageConsume。

四、总结

在EasyNetQ中如果需要消费者确认功能,则需要在Rabbitmq的连接配置中设置publisherConfirms=true,这将会开启自动确认。在使用高级api定义交换机和队列时可以自己定义多种参数,比如消息是否持久化,消息最大长度等等,具体大家可以去看官方文档,上面有详细介绍。Easynetq会自动去捕获消费异常的消息并将其放入到错误队列中,而且官方提供了重新发送错误队列中消息的方法,当然你也可以自己去监视错误列队,对异常消息进行处理。EasyNetQ里面作者针对消息的发布确认和消费确认都做了封装。在EasyNetQ中发布消息的时候如果选用的同步发送,只要没有抛出异常,我们就可以认为任务消息已经正确到达Broker,而异步发送的话需要我们自己去监视Task是否成功 。如果开启了自动确认,并不需要我们在消息处理的方法体中手动返回ack信息,只要消息被 正确处理就会自动ack。虽然RabbitMq中也有事务消息,但由于性能比较差,并不推荐使用。其实,只要我们能明确消息是否发布成功和消费成功,就将会很容易在这个基础上扩展出分布式事务的处理。

赞(0) 打赏
分享到: 更多 (0)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏