哎呀,我老大写Bug啦——记一次MessageQueue的优化

发表于:2019-7-11 10:16

字体: | 上一篇 | 下一篇 | 我要投稿

 作者:EminemJK(山治先生)    来源:博客园

  MessageQueue,顾名思义消息队列,在系统开发中也是用的比较多的一个中间件吧。我们这里主要用它来做日志管理和订单管理的,记得老老大(恩,是的,就是老老大,因为他已经跳槽了)还在的时候,当时也是为了赶项目进度,他也参与开发了,那时候我才刚刚入职,他负责写后端这块,我来了就把他手上的任务接过来了,(接着接着……就辞职了)。
  之后我们的开发仍然有条不紊的开发着,直到今年的一月份吧,才上线开始运行,然后就出现了常规状态,上线之后就开始爆炸,这个页面打不开呀,那个内容没东西呀,第三方登录问题呀,支付问题呀,临时再改需求呀……(该来的都来了),加班、debug、测试、再debug……,然后经过几天的修复,终于完成了跟自己电脑一样稳定的运行,组员们都美滋滋的,今晚加个鸡腿才行。
                                                                                      
  都说祸不单行,古人是不会骗我们的,Bug怎么会修得完呢?天真,要是Bug能修得完还要我们来干啥,好景不长,果然,过了一周之后,组员突然群里叫喳喳。
  what is it ? 
   
  来了,今天的主角登场了,我也要开始加班了。
  RabbitMQ
  这个是今天要说的东西,基础概念什么的不是今天要说的重点,重点是:
   
  RabbitMQ内存暴涨!使得整个服务器濒临瘫痪,远程登录服务器都差点挤不进去的状态,别看截图目前才1.3G,吃个午饭回来,就2.3G了,可怕不可怕?咋回事?
  老板喊你回来加班啦
  先不管了,线上优先解决,手动先Reset回收资源以释放空间,这个只是临时的办法,然后检查一下rabbitMQ的配置有没有问题,路径在
   C:\Users\Administrator\AppData\Roaming\RabbitMQ 
  完全是默认的配置,完全ojbk啊,那到底咋回事?继续检查,想想不如从项目开始吧,然后查看项目中的代码,都是从来自【MessageLib】的组件调用

  好了,叫我老老大要这个组件的代码,他把git的地址就发给我,我把项目down下来,
  这个封装的组件内容不多,主要的文件一目了然,其实就是用到这个两个组件来进行的二次封装来调用
  主要的代码是在【MessageQueue.cs】文件里,展示一下当时的代码情况:
   using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Text;
  using System.Threading.Tasks;
  using MessageLib.ClassBean;
  using EasyNetQ;
  using System.Threading;
  namespace MessageLib
  {
      public static class MessageQueue
      {
          public static IBus bus = MQBusBuilder.CreateMessageBus();
          //消息队列
          private static Queue<Item> NoticQueue = new Queue<Item>(5000);
          //日志队列
          private static Queue<Item> LogQueue = new Queue<Item>(5000);
          //队列数目发布数量
          private static int max_count_to_pulish = 1000;
          /// <summary>
          /// 可供外部使用的消息入列操作
          /// </summary>
          public static void push(Item item)
          {
              if (item.type == ItemType.notic)
              {
                  NoticQueue.Enqueue(item);
              }
              if (item.type == ItemType.log)
              {
                  LogQueue.Enqueue(item);
              }
          }
          /// <summary>
          /// 监听后需要调用的发布接口
          /// </summary>
          private static void Pulish(object source, System.Timers.ElapsedEventArgs e)
          {
              if (NoticQueue.Count > 0 || LogQueue.Count > 0)
              {
                  if (bus == null || !bus.IsConnected)
                  {
                      bus = MQBusBuilder.CreateMessageBus();
                  }
                  if (bus.IsConnected)
                  {
                      Send(ItemType.notic);
                      Send(ItemType.log);
                  }
              }
          }
          /// <summary>
          /// 程序自运行并开始监听
          /// </summary>
          public static void Run()
          {
              System.Timers.Timer timer = new System.Timers.Timer();
              timer.Interval = 1000;
              timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到达时间的时候执行事件;    
              timer.AutoReset = true;//设置是执行一次(false)还是一直执行(true);    
              timer.Enabled = true;//是否执行System.Timers.Timer.Elapsed事件;    
          }
          /// <summary>
          /// 启动线程异步调用
          /// </summary>
          /// <param name="channelType"></param>
          private static void Send(string channelType)
          {
              Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));
              thread.IsBackground = true;
              thread.Start(channelType);
          }
          /// <summary>
          /// 调用发布日志及提醒两个接口
          /// </summary>
          /// <param name="channel"></param>
          private static void PublishAction(object channel)
          {
              PublisLog();
              PublisNotic();
          }
          /// <summary>
          /// 日志消息发送至RabbitMQ指定exchange、Queue
          /// </summary>
          private static void PublisLog()
          {
              string channelName = ItemType.log;
              try
              {
                  var routingKey = channelName;
                  var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
                  var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}",channelName), "direct");
                  var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);
                  while (LogQueue.Count > 0)
                  {
                      Item item = LogQueue.Dequeue();
                      if (item != null)
                      {
                          var properties = new MessageProperties();
                          var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
                          Message.Properties.AppId = item.appid;
                          bus.Advanced.Publish(exchange, routingKey, false, Message);
                      }
                  }
              }
              catch (Exception ex)
              {
                  throw ex;
              }
          }
          /// <summary>
          /// 提醒消息发送至RabbitMQ指定exchange、Queue
          /// </summary>
          private static void PublisNotic()
          {
              string channelName = ItemType.notic;
              var routingKey = channelName;
              var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
              var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct");
              var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);
              while(NoticQueue.Count > 0)
              {
                  Item item = NoticQueue.Dequeue();
                  if (item != null)
                  {
                      var properties = new MessageProperties();
                      var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
                      Message.Properties.AppId = item.appid;
                      bus.Advanced.Publish(exchange, routingKey, false, Message);
                  }
              }
          }
      }
  }
   然后我就发现了这一段代码!
 
    /// <summary>
          /// 程序自运行并开始监听
          /// </summary>
          public static void Run()
          {
              System.Timers.Timer timer = new System.Timers.Timer();
              timer.Interval = 1000;
              timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到达时间的时候执行事件;    
              timer.AutoReset = true;//设置是执行一次(false)还是一直执行(true);    
              timer.Enabled = true;//是否执行System.Timers.Timer.Elapsed事件;    
          }

    /// <summary>
          /// 启动线程异步调用
          /// </summary>
          /// <param name="channelType"></param>
          private static void Send(string channelType)
          {
              Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));
              thread.IsBackground = true;
              thread.Start(channelType);
          }

   老老大写Bug了,当Run()起来之后,队列中【NoticQueue】有内容,就开始推送消息,发送消息Send(),每来一次推送new一个线程并设置为后台线程,然后发送消息。好了,明白了,这里的线程很混乱,因为线程操作不当,new了N多个频道,并且没有主动回收,这也难怪内存暴涨呢。并且要是Run()调用多次,后果更加不堪设想。
  加班改起来
  开始动手吧,业务主要推送有普通消息、错误消息和通知消息,那么将队列与线程组装一起,新增一个类QueueTask.cs:
   using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Text;
  using System.Threading;
  using System.Threading.Tasks;
  using MessageLib.Core;
  using MessageLib.Core.ClassBean;
  using EasyNetQ;
  using EasyNetQ.Topology;
  using System.Linq.Expressions;
  namespace MessageLib.Core
  {
      public class QueueTask
      {
          private Queue<Item> QueueData = new Queue<Item>(5000);
          //队列数目发布数量
          private int max_count_to_pulish = 1000;
          public  bool isRunning = false;
          private string itemType = ItemType.info;
          private string MessageRouter = ItemType.info;
          public QueueTask(string itemType,string MessageRouter)
          {
              this.itemType = itemType;
              this.MessageRouter = MessageRouter;
          }
          /// <summary>
          /// 可供外部使用的消息入列操作
          /// </summary>
          public void Push(Item item, IBus IBus)
          {
              QueueData.Enqueue(item);
              if (!isRunning)
                  Run(IBus);
          }
          public void Run(IBus IBus)
          {
              if (!isRunning)
              {
                  Timer timerNotic = new Timer(PulishMsg, IBus, 1000, 1000);
                  isRunning = true;
              }
          }
          private void PulishMsg(object state)
          {
              IBus IBus = state as IBus;
              if (QueueData.Count > 0)
              {
                  PublisMsg(itemType, IBus);
              }
          }
          private void PublisMsg(object channel, IBus BusInstance)
          {
              try
              {
                  string channelName = channel as string;
                  if (QueueData.Count > 0)
                  {
                      var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
                      var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), ExchangeType.Direct);
                      var binding = BusInstance.Advanced.Bind(exchange, mqqueue, mqqueue.Name);
                      while (QueueData.Count > 0)
                      {
                          Item item = QueueData.Dequeue();
                          if (item != null)
                          {
                              var properties = new MessageProperties();
                              var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
                              Message.Properties.AppId = item.appid;
                              BusInstance.Advanced.Publish(exchange, mqqueue.Name, false, Message);
                          }
                      }
                  }
              }
              catch (Exception ex)
              {
                  Console.WriteLine("PublisMsg error:" + ex.Message);
              }
          } 
          public void Read<T>(IBus BusInstance,Action<Item> dealAction) where T : Item
          {
              try
              {
                  string channelName = itemType;
                  var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
                  var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), ExchangeType.Direct);
                  var binding = BusInstance.Advanced.Bind(exchange, mqqueue, mqqueue.Name);
                  var Consume = BusInstance.Advanced.Consume(mqqueue, registration => Task.Run(() =>
                  {
                      registration.Add<string>((message, info) => 
                      {
                          Item data = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(message.Body);
                          dealAction(data);
                      });
                  }));
              }
              catch (Exception ex)
              {
                  Console.WriteLine("Read error:" + ex.Message);
              }
          }
      }
  }
   然后,在MessageQueue.cs修改为单例模式:
   public static class MessageQueue
  {
  /*Install-Package EasyNetQ-dotnet-core -Version 2.0.2-radicalgeek-netc0001 -Pre*/
  private static IBus bus = null;
  public static bool isRunning = false;
  //消息队列
  private static QueueTask NoticQueue = null;
  //日志队列
  private static QueueTask LogQueue = null;
  //自定义
  private static QueueTask InfoQueue = null;
  #region 同步锁
  private static readonly object obj = new object();
  #endregion
  public static void Init(string Connection, string routeKey)
  {
  if (NoticQueue == null)
  NoticQueue = new QueueTask(ItemType.notic, ItemType.notic);
  if (LogQueue == null)
  LogQueue = new QueueTask(ItemType.error, ItemType.error);
  if (InfoQueue == null)
  InfoQueue = new QueueTask(ItemType.info, routeKey);
  if (string.IsNullOrEmpty(MQBusBuilder.Connnection))
  MQBusBuilder.Connnection = Connection;
  }
  public static IBus BusInstance
  {
  get
  {
  if (bus == null)
  {
  lock (obj)
  {
  if (bus == null|| !bus.IsConnected)
  {
  bus = MQBusBuilder.CreateMessageBus();
  }
  }
  }
  return bus;
  }
  }
  /// <summary>
  /// 可供外部使用的消息入列操作
  /// </summary>
  public static void PushAndRun(Item item)
  {
  if (string.IsNullOrWhiteSpace(MQBusBuilder.Connnection) || BusInstance == null)
  return;
  if (item.type == ItemType.notic)
  {
  NoticQueue.Push(item, BusInstance);
  }
  if (item.type == ItemType.error)
  {
  LogQueue.Push(item, BusInstance);
  }
  if (item.type == ItemType.info)
  {
  InfoQueue.Push(item, BusInstance);
  }
  }
  public static void Read(string itemType, Action<Item> dealAction)
  {
  if (itemType == ItemType.notic)
  {
  NoticQueue.Read<NoticItem>(BusInstance, dealAction);
  }
  if (itemType == ItemType.error)
  {
  LogQueue.Read<ErrorItem>(BusInstance, dealAction);
  }
  if (itemType == ItemType.info)
  {
  InfoQueue.Read<Message>(BusInstance, dealAction);
  }
  }
  }
  每次推送消息的时候,每个QueueTask就自己维护自己的线程和队列了,当调用推送之后,就开始运作起来。恩,应该没问题了。然后就发布nuget,再更新项目,然后发布。观察一段时间,恩,完美。
   
  事件二
  事情过后,B端开始搞起来了,然后涉及到订单系统,跟老大(不是老老大,老老大那时候已经跑了)商量之后确定使用消息队列来做订单的事件的拓展,然后就直接美滋滋的调用好之前写的了,没想到啊,这次是线程暴涨!因为订单是从B端推送过来的,B端肯定没事,订单后台订阅消息之后,读取过程中出现的线程增多,然后看看之前写的Read()方法,感觉没啥问题啊,每运行完一次,就多了一个线程,这个神奇了啊,那么源代码撸起来。
  翻来覆去,看到这个Consume方法,继承的是IDisposable接口,得勒,知道咋回事了。
  Consume.Dispose(); 多个消费者的情况下,用完请记得主动释放啊。
  这回真的可以浪了。
  总结
  遇到问题,冷静下来,耐得了寂寞才行。线上的问题优先解决,然后再慢慢Debug,解决不了,看源码,再解决不了,降级处理,欢迎共同探讨。同时也感谢一下技术群里的兄弟给的一些建议,并帮忙查找资料,还好EasyNetQ是开源了,不然也打算说先不用了,毕竟一开始没什么用户量,所以没必要整那么麻烦,加班加点的弄这个问题。不过最终都完美的解决了,心里还是挺美滋滋的,程序猿随之而来的成就感。
  别看我们在工位上默不作声,我们可能在拯救世界呢!老板,该加工资啦!
  补充
  鉴于大伙私信我想看看原来的bug修复后的情况,毕竟是公司代码不适合完全开源,我单独把例子源码做过修改的发布出来,思路都差不多的,对比一下文章中原来的有问题的代码就可以了吧。因为都已经修复掉了,修改后的在这里:https://github.com/EminemJK/MessageLib.Core

     上文内容不用于商业目的,如涉及知识产权问题,请权利人联系博为峰小编(021-64471599-8017),我们将立即处理
《2023软件测试行业现状调查报告》独家发布~

关注51Testing

联系我们

快捷面板 站点地图 联系我们 广告服务 关于我们 站长统计 发展历程

法律顾问:上海兰迪律师事务所 项棋律师
版权所有 上海博为峰软件技术股份有限公司 Copyright©51testing.com 2003-2024
投诉及意见反馈:webmaster@51testing.com; 业务联系:service@51testing.com 021-64471599-8017

沪ICP备05003035号

沪公网安备 31010102002173号