日志

RabbitMQ消息积压的几种解决思路

 来源    2020-09-16    1  

在日常工作中使用RabbitMQ偶尔会遇不可预料的情况导致的消息积压,一般出现消息积压基本上分为几种情况:

  1. 消费者消费消息的速度赶不上生产速度,这总问题主要是业务逻辑没设计好消费者和生产者之间的平衡,需要改业务流程或逻辑已保证消费度跟上生产消息的速,譬如增加消费者的数量等。

  2. 消费者出现异常,导致一直无法接收新的消息,这种问题需要排查消费的逻辑是不是又问题,需要优化程序。

除了上面的者两种问题,还有一些其他情况会导致消息积压,譬如一些系统是无法预计成产消息的速度和频率,又或者消费者的速度已经被限制,不能通过加新的消费者来解决,譬如不同的系统间的API对接,对接那一方就做了请求频率的限制,或者对方系统承受不了太大的并发,还有一些系统如果是面对企业客户,譬如电商,物流,仓储等类似平台系统的客户的下单是没有规律的或者集中某一个时间段下单的,这种就不能简单的通过加消费者来解决,就需要分析具体业务来避免消息积压。

针对这种情况,我想到了4中解决思路:

  1. 拆分MQ,生产者一个MQ,消费者一个MQ,写一个程序监听生产者的MQ模拟消费速度(譬如线程休眠),然后发送到消费者的MQ,如果消息积压则只需要处理生产者的MQ的积压消息,不影响消费者MQ

  2. 拆分MQ,生产者一个MQ,消费者一个MQ,写一个程序监听生产者的MQ,定义一个全局静态变量记录上一次消费的时间,如果上一次时间和当前时间只差小于消费者的处理时间,则发送到一个延迟队列(可以使用死信队列实现)发送到消费者的MQ,如果消息积压则只需要处理生产者的MQ的积压消息,不影响消费者MQ

  3. 使用Redis的List或ZSET做接收消息缓存,写一个程序按照消费者处理时间定时从Redis取消息发送到MQ

  4. 设置消息过期时间,过期后转入死信队列,写一个程序处理死信消息(重新如队列或者即使处理或记录到数据库延后处理)

其中使用延时队列会相对来说逻辑简单,业务逻辑变更也不大,在RabbitMQ中,可使用死信来及延时队列插件rabbitmq_delayed_message_exchange两种方式实现延时队列。
使用插件可以在官网找到:https://www.rabbitmq.com/community-plugins.html

插件的安装及使用方式就不做介绍了,主要介绍下使用死信来实现延时队列,原理就是将消息发送到一个死信队列,并设置过期时间,过期后将死信转发到要处理的消息队列。
生产者相关代码:

/// <summary>
        /// 发送延时队列消息
        /// </summary>
        /// <param name="message"></param>
        /// <param name="queueName"></param>
        /// <param name="prefetchCount">默认20</param>
        public void SendDelayQueues(string message, string queueName,double delayMilliseconds,string beDeadLetterPrefix="beDeadLetter_")
        {
            #region 死信到期后转入的交换机及队列
            //死信转入新的队列的路由键(消费者使用的路由键)
            var routingKey = queueName;
            var exchangeName = queueName;
            //定义队列
            Channel.QueueDeclare(queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);
            //定义交换机
            Channel.ExchangeDeclare(exchange: exchangeName,
                type: "direct");
            //队列绑定到交换机
            Channel.QueueBind(queue: queueName,
                exchange: exchangeName,
                routingKey: routingKey);
            #endregion

            //将变成死信的队列名
            var beDeadLetterQueueName = beDeadLetterPrefix + queueName;
            //将变成死信的交换机名
            var beDeadLetterExchangeName = beDeadLetterPrefix + queueName;

            //定义一个有延迟的交换机来做死信(该消息不能有消费者,不然无法变成死信)
            Channel.ExchangeDeclare(exchange:beDeadLetterExchangeName ,
                type: "direct");
            
            //定义该延迟消息过期变成死信后转入的交换机(消费者需要绑定的交换机)
            //Channel.ExchangeDeclare(exchange: queueName,type: "direct");

            var dic = new Dictionary<string, object>();
            //dic.Add("x-expires", 30000);
            //dic.Add("x-message-ttl", 12000);//队列上消息过期时间,应小于队列过期时间  
            dic.Add("x-dead-letter-exchange", queueName);//变成死信后转向的交换机
            dic.Add("x-dead-letter-routing-key",routingKey);//变成死信后转向的路由键
            //定义将变成死信的队列
            Channel.QueueDeclare(queue: beDeadLetterQueueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: dic);

            //队列绑定到交换机
            Channel.QueueBind(queue: beDeadLetterQueueName,
                exchange: beDeadLetterExchangeName,
                routingKey: routingKey);

            //不要同时给一个消费者推送多于prefetchCount个消息, ushort prefetchCount = 20
            //Channel.BasicQos(prefetchSize: 0, prefetchCount: prefetchCount, global: false);
            var body = Encoding.UTF8.GetBytes(message);
            var properties = Channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.DeliveryMode = 2;//持久化消息
            //过期时间
            properties.Expiration = delayMilliseconds.ToString();
            Channel.BasicPublish(exchange: beDeadLetterExchangeName,
                routingKey: routingKey,
                basicProperties: properties,
                body: body);
        }

消费者相关代码:

/// <summary>
        /// 设置延迟队列接收的事件
        /// </summary>
        /// <param name="action"></param>
        /// <param name="queueName"></param>
        /// <param name="prefetchCount">默认1</param>
        /// <param name="autoAck"></param>
        /// <param name="consumerCount"></param>
        public void SetDelayQueuesReceivedAction(Action<string> action, string queueName, ushort prefetchCount = 1,
            bool autoAck = false, int consumerCount = 1)
        {
            if (prefetchCount < 1)
            {
                throw new Exception("consumerCount must be greater than 1 !");
            }

            var exchangeName = queueName;
            var routingKey = queueName;
            for (int i = 0; i < consumerCount; i++)
            {
                var Channel = Connection.CreateModel();
                //定义队列
                Channel.QueueDeclare(queue: queueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);
                //定义交换机
                Channel.ExchangeDeclare(exchange: exchangeName,
                    type: "direct");
                //队列绑定到交换机
                Channel.QueueBind(queue: queueName,
                    exchange: exchangeName,
                    routingKey: routingKey);
                //不要同时给一个消费者推送多于prefetchCount个消息
                Channel.BasicQos(prefetchSize: 0, prefetchCount: prefetchCount, global: false);
                ChannelList.Add(Channel);
                var consumer = new EventingBasicConsumer(Channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    //Console.WriteLine("处理消费者ConsumerTag:" + ea.ConsumerTag);
                    action(message);
                    //手动确认消息应答
                    Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                //autoACK自动消息应答设置为false
                Channel.BasicConsume(queue: queueName, autoAck: autoAck, consumer: consumer);
            }
        }

完整代码实现放到了Github:https://github.com/tanyongzheng/TZ.RabbitMQ

Rabbitmq消息积压清理
日志#!/bin/bash QUE=`rabbitmqctl list_queues messages_ready name durable|grep -v "^Listing" |g ...
1
mycat登录报错Host 'XXX' is blocked because of many connection errors的另一种解决思路
日志报错时机 使用了mycat,而不是单纯使用了mysql. 报错信息 ERROR 1129 (HY000): Host '1.23.22.18' is blocked because of many c ...
1
表单重复提交问题的三种解决思路
日志前端开发中接触的表单提交还是很多的,有时候如果不对提交事件进行处理的话会遇到重复多次提交. 最近开发遇到一个问题,找了挺久才找到原因解决-_-||,表单一直提交两次,以为是双击重复提交了,就在提交之后 ...
关于vuex页面刷新问题的两种解决思路和最终解决方案
日志vuex持久化:https://github.com/zw1371/vuex-persistence 1:设置state为null,然后在对应的getters里面添加sessionStorage控制, ...
1
序列化避免异常的一种解决思路
日志SerializationInfoEnumerator enumerator = info.GetEnumerator(); while (enumerator.MoveNext()) { if (e ...
1
分布式事务的一种解决思路
日志昨晚某技术群里大家热火的在讨论分布式事务的问题,想起了自己前几年由于技术太渣也犯过很多相关错误,现结合自己之前一次BUG案例由感而写此文,希望对看到文章的同学们多少有些帮助(如果发现错误之处,欢迎交流 ...
跨域请求的几种解决思路汇总
日志今天学习了下js的跨域请求,根据曾经用到过的jsonp跨域请求方式,将前辈们用到的以及学习到的原理.思路记录下来,仅供参考.1.通过script标签的方式指定src来实现跨域请求script标签的sr ...
解决RabbitMQ消息丢失问题和保证消息可靠性(一)
日志原文链接(作者一个人):https://juejin.im/post/5d468591f265da03b810427e  工作中经常用到消息中间件来解决系统间的解耦问题或者高并发消峰问题,但是消息的可 ...
1
RabbitMQ消息队列里积压很多消息
日志1.场景:上千万条消息在mq里积压了几个小时了还没解决 2.解决: 1)先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉2)新建一个topic,partition是原 ...
1
SQL SERVER中的两种常见死锁及解决思路
日志在sql server中,死锁都与一种锁有关,那就是排它锁(x锁).由于在同一时间对同一个数据库资源只能有一个数据库进程可以拥有排它锁.因此,一旦多个进程都需要获取某个或者同一个数据库资源的排它访问权 ...
Spring Boot 之 RabbitMQ 消息队列中间件的三种模式
日志开门见山(文末附有消息队列的几个基本概念) 1.直接模式( Direct)模式  直白的说就是一对一,生产者对应唯一的消费者(当然同一个消费者可以开启多个服务). 虽然使用了自带的交换器(Exchan ...
1
ADO.NET 使用DELETE语句批量删除操作,提示超时,删除失败,几种优化解决思路
日志 起因是如此简单的一句sql DELETE FROM tablename WHERE timekey=20150416 提示:Timeout 时间已到.在操作完成之前超时时间已过或服务器未响应. 提供 ...
1
c# – 如果没有订阅者,RabbitMQ消息会保持多长时间?
问答我正在使用MassTransit和RabbitMQ创建一个简单的发布者/订阅者. Publisher具有以下代码来初始化总线: /** create the bus */ var bus = Bus. ...
1
Python Kombu消费者未通知rabbitmq消息(queue.get确实有效)
问答如果我运行以下代码,则永远不会触发传递给consumer的回调(测试). 但是,如果我密切关注rabbitmq GUI,我确实会看到该消息被检索(但未被确认).所以看起来消费者正在收到消息,但不会将消 ...
2
java – RabbitMQ消息签名
问答我想使用RabbitMQ在多个应用程序之间进行通信,这些应用程序部署在不同的网络上并由不同的人员维护.作为消息(消费者)的接收者,我希望确信消息的发送者(生产者)是他声称的那个人.我能想到的最佳方法是 ...
1
delphi – 是否有一种解决方法可以使类操作符适用于内置类型
问答我可以使用类操作符来使用中间保持记录. 这样我就可以隐式转换内置类型了. program TestNewStringHelper; {$APPTYPE CONSOLE} uses System.Sys ...
2
rabbitmq – 消息代理与MOM(面向消息的中间件)
问答我有点混淆了消息代理之间的区别,例如RabbitMQ和面向消息的中间件.除了维基百科以外,我找不到很多信息.当搜索MOM时,我找到关于AMQP的信息,哪些状态是MOM的协议..这是什么意思?什么是MO ...
1
如何从rabbitmq消息中获取pika python中的basicproperties头字段?
问答def callback(ch, method, properties, body): prop = properties print prop #print prop[1] #print prop[ ...
1
一种解决简单(?)数组问题的算法
问答对于这个问题,速度非常重要.我画了一个很好的图像来更好地解释这个问题.算法需要计算矩形边缘是否在画布的范围内继续,边缘是否与另一个矩形相交? 我们知道: >画布的大小 >每个矩形的大小 & ...
1
c# – 使用Ninject和EasyNetQ / RabbitMQ消息处理程序
问答我正在尝试使用带有Ninject的EasyNetQ来记录消息. 我已经设法将Ninject设置为EasyNetQ DI(我认为),但是当一个消息来到没有无参数构造函数的处理程序时(例如我需要在那里绑定 ...
1