首页技术文章正文

RabbitMQ消息中间件常见问【黑马java培训】

更新时间:2019年07月26日 11时32分50秒 来源:黑马程序员论坛

RabbitMQ (有几种消息模式自己去看,这里不做解释)


1. 使用RabbitMQ的一些经验
1. consume时预取参数的大小对consume性能影响很大,具体可参见官方博客
2. 队列HA的代价非常高,特别是对带宽的占用,有限制的使用HA,且只提供两备份即可;
3. 磁盘也可能形成瓶颈,如果单台机器队列很多,确认只在必要时才使用duration,避免把磁盘跑满;
4. 队列的消息大量累积后,发送和消费速度都会受到影响,导致服务进一步恶化,采用的方法是,额外的脚本监控每个队列的消息数,超过限额会执行purge操作,简单粗暴但是有效的保证了服务稳定;
5. 限制单条消息大小,我们的限制是128k,消息队列只走消息,其他交由存储去做;
6. iptables适当的限制连接;

2. RabbitMQ队列超长导致QueueingConsumerJVM溢出

我们的服务器使用RabbitMQ作为消息中转的容器,一般RabbitMQ进程占用内存不过100M-200M,这些队列超长的RabbitMQ进程可以占用超过2G的内存。
显然消息队列的消费者出现了问题。开发查看日志发现作为该队列消费者的Java服务的日志也卡住了,重启服务后(这点做得不对,应该用jstatjstack进行排查,而不是直接重启)又很快卡住。
通过jstat发现JVM内存都耗尽了,之后进入无尽的Full GC,所以当然不会处理队列消息和输出日志信息了。
使用jmap导出这时候的Java堆栈,命令:jmap -dump:format=b,file=29389.hprof  29389。将得到的dump文件放到MATEclipse Memory Analyzer)里进行分析,发现很明显是QueueingConsumer持有大量对象导致JVM内存溢出
解决方案将basicQos设置为16后重启服务,队列终于开始消化了。用jstat 观察JVM内存使用情况,也没有再出现剧增溢出的现象。
      
总结:使用RabbitMQ的时候,一定要合理地设置QoS参数。RabbitMQ的默认做法其实是很脆弱的,容易导致雪崩。

3. RabbitMQ数据速率问题
在边读边写的情况下:速率只与网络带宽正相关,网络使用率最高能达到接近100%,并且数据使用率很高(90%以上)。
        在千兆网下,以500KB一条数据为例,读写速率均能达到200/s,约为100MB/s
在只写不读的情况下:写入速率瓶颈在于硬盘写入速度。

4. RabbitMQ数据存储路径变更到D盘方法
Windows环境下,在安装前设置环境变量:RABBITMQ_BASE=D:\RabbitMQ_Data

5. RabbitMQ磁盘写满重启后数据丢失问题
表现:磁盘写满后发送、读取程序均不能连接服务。
解决方法:将QueueExchange设置为Durable即不会发生数据丢失问题。
通过a.关闭服务;b.删除占位文件、erl_crash.dumpc.重启服务 三步操作后,磁盘会清理出10M左右空间,此时读取数据程序便可正常工作。
正确设计的架构,应确保RabbitMQ不会发生磁盘写满崩溃的情况。

6. RabbitMQ集群
在网络带宽占满的情况下,通过集群的方式解决吞吐量不足的问题需要多台效果才明显。
假设外设吞吐率为d/s,外设
RabbitMQ1发送的概率为r1
RabbitMQ2发送的概率为r2
RabbitMQ1需要向RabbitMQ2转发的概率为r3
RabbitMQ2需要向RabbitMQ1转发的概率为r3
那么RabbitMQ1进入的吞吐率为:(r1*d + r4*r2*d) /s 3d/4/s
RabbitMQ2进入的吞吐率为:(r2*d + r3*r1*d) /s 3d/4/s
这样的确比只使用一台RabbitMQ的吞吐率d/s要求低些。
NRabbitMQ的集群,每台的平均吞吐率为:(2N-1)d/(N*N) /sN=3时,平均吞吐率为5d/9/sN=4时,平均吞吐率为7d/16/s

解决方法:多台RabbitMQ服务器提供服务,在客户端以轮循方式访问服务,若1down掉则不使用此台的队列服务,服务器之间没有联系,这样NRabbitMQ的平均吞吐率为:1d/N /s。具体实现可以,专写一个用户收发RabbitMQ消息的jar/dll,在配置文件里填写RabbitMQ机器地址,使用轮循询问、收发的方式,提供给应用程序以黑盒方式调用。
下面提供了java版本的收发实现。

发送端sender.xml配置:
<!-- 处理器相关 -->
    <bean id="sender" class="demo.Sender">
        <property name="templates">
            <list>
                <ref bean="template1" />
            <!--     <ref bean="template2" /> -->
            </list>
        </property>
    </bean>
    <bean id="timeFlicker" class="demo.TimeFlicker">
        <property name="handlers">
            <list>
                <ref bean="sender" />
            </list>
        </property>
    </bean>
    <!-- 处理器相关 -->
    <!-- amqp配置 相关 -->
    <rabbit:connection-factory id="connectionFactory1"
        host="192.1.11.108" username="guest" password="guest" virtual-host="/" />
    <rabbit:connection-factory id="connectionFactory2"
        host="192.1.11.172" username="guest" password="guest" virtual-host="/" />
    <!-- amqp配置 相关 -->
    <!-- 发送相关 -->
    <rabbit:template id="template1" connection-factory="connectionFactory1"
        exchange="exchange" />
    <rabbit:template id="template2" connection-factory="connectionFactory2"
        exchange="exchange" />
    <!-- 发送相关 -->
                  说明:这里配置了两个RabbitMQ服务器,timeFlicker的目的是过一段时间把不能服务的RabbitMQ服务器重新添加到列表中,重试发送。
                  接收端receiver.xml配置:
<!-- amqp配置 相关 -->
    <rabbit:connection-factory id="connectionFactory1"
        host="192.1.11.108" username="guest" password="guest" virtual-host="/" />
    <rabbit:connection-factory id="connectionFactory2"
        host="192.1.11.172" username="guest" password="guest" virtual-host="/" />
    <!-- amqp配置 相关 -->
    <!-- 监听相关 -->
    <bean id="Recv1" class="demo.Recv1" />
    <rabbit:listener-container id="Listener1"
        connection-factory="connectionFactory1" prefetch="1" acknowledge="auto">
        <rabbit:listener ref="Recv1" method="listen"
            queue-names="queue1" />
    </rabbit:listener-container>
    <bean id="Recv2" class="demo.Recv2" />
    <rabbit:listener-container id="Listener"
        connection-factory="connectionFactory1" prefetch="1" acknowledge="auto">
        <rabbit:listener ref="Recv2" method="listen"
            queue-names="queue2" />
    </rabbit:listener-container>
    <!-- 监听相关 -->
MQ丢数据的问题,
主要是同步还是异步刷盘、断电是否导致的。只要send反馈正确,确保发送被接收,receive时有反馈后才会删除数据;同步刷盘,或异步刷盘不断电的,就不会丢失消息,
程序对于发送反馈异常的,要记录;MQ对于receive无反馈的,有重发机制,可能会有一条数据发送多次的情况,要在程序中剔除。

7. RabbitMQ 常用命令
1. centos安装epel yum 源   
# rpm -ivh  
http://dl.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
2. 安装erlang运行环境   
# yum install erlang
3. 安装rabbitmq server
# rpm --import
http://www.rabbitmq.com/rabbitmq-signing-key-public.asc  
# rpm -ivh  http://www.rabbitmq.com/releases ... -3.0.0-1.noarch.rpm
5. 打开server  
  # chkconfig rabbitmq-server on  
# rabbitmqctl status
会报异常:  
# rabbitmqctl status  
Status of node rabbit@devnote ...  
Error: unable to connect to node rabbit@devnote: nodedown   
DIAGNOSTICS
===========   
nodes in question: [rabbit@devnote]   
hosts, their running nodes and ports:
- devnote: [{rabbitmqctl24923,51045}]  
current node details:  
- node name: rabbitmqctl24923@devnote
- home dir: /var/lib/rabbitmq  
- cookie hash: TblHThacrBHJzl5Vt7Y4Ww==
执行命令:  
# /sbin/service rabbitmq-server stop
# /sbin/service rabbitmq-server start
# rabbitmqctl status 测试正确   
查看所有队列信息  # rabbitmqctl list_queues
关闭应用  # rabbitmqctl stop_app  
启动应用,和上述关闭命令配合使用,达到清空队列的目的
# rabbitmqctl start_app
清除所有队列  
# rabbitmqctl reset  
更多用法及参数,可以执行如下命令查看
# rabbitmqctl  
1)首先关闭rabbitmq: rabbitmqctl stop_app
(2)还原: rabbitmqctl reset
3)启动: rabbitmqctl start_app  
(3)添加用户: rabbitmqctl add_user root root  
(4)设置权限:rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
(6)查看用户: rabbitmqctl list_users
8. RabbitMQ(消息中间件)在工作中的应用场景有如下几种:
  1、跨系统的异步通信,所有需要异步交互的地方都可以使用消息队列。就像我们除了打电话(同步)以外,还需要发短信,发电子邮件(异步)的通讯方式。
  2、多个应用之间的耦合,由于消息是平台无关和语言无关的,而且语义上也不再是函数调用,因此更适合作为多个应用之间的松耦合的接口。基于消息队列的耦合,不需要发送方和接收方同时在线。在企业应用集成(EAI)中,文件传输,共享数据库,消息队列,远程过程调用都可以作为集成的方法。
  3、应用内的同步变异步,比如订单处理,就可以由前端应用将订单信息放到队列,后端应用从队列里依次获得消息处理,高峰时的大量订单可以积压在队列里慢慢处理掉。由于同步通常意味着阻塞,而大量线程的阻塞会降低计算机的性能。
  4、消息驱动的架构(EDA),系统分解为消息队列,和消息制造者和消息消费者,一个处理流程可以根据需要拆成多个阶段(Stage),阶段之间用队列连接起来,前一个阶段处理的结果放入队列,后一个阶段从队列中获取消息继续处理。
  5、应用需要更灵活的耦合方式,如发布订阅,比如可以指定路由规则。
  6、跨局域网,甚至跨城市的通讯(CDN行业),比如北京机房与广州机房的应用程序的通信。
  这里还有一种情况,同时有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能
9. RabbitMQ解释
1.Broker
简单来说就是消息队列服务器的实体。
2.Exchange
接收消息,转发消息到绑定的队列上,指定消息按什么规则,路由到哪个队列。
3.Queue
消息队列载体,用来存储消息,相同属性的 queue 可以重复定义,每个消息都会被投入到一个或多个队列。
4.Binding
绑定,它的作用就是把 Exchange Queue 按照路由规则绑定起来。
5.RoutingKey
路由关键字,Exchange 根据这个关键字进行消息投递。
6.Producter
消息生产者,产生消息的程序。
7.Consumer
消息消费者,接收消息的程序。
8.Channel
消息通道,在客户端的每个连接里可建立多个 Channel,每个 channel 代表一个会话。

推荐了解热门学科

java培训 Python人工智能 Web前端培训 PHP培训
区块链培训 影视制作培训 C++培训 产品经理培训
UI设计培训 新媒体培训 产品经理培训 Linux运维
大数据培训 智能机器人软件开发




传智播客是一家致力于培养高素质软件开发人才的科技公司“黑马程序员”是传智播客旗下高端IT教育品牌。自“黑马程序员”成立以来,教学研发团队一直致力于打造精品课程资源,不断在产、学、研3个层面创新自己的执教理念与教学方针,并集中“黑马程序员”的优势力量,针对性地出版了计算机系列教材50多册,制作教学视频数+套,发表各类技术文章数百篇。

传智播客从未停止思考

传智播客副总裁毕向东在2019IT培训行业变革大会提到,“传智播客意识到企业的用人需求已经从初级程序员升级到中高级程序员,具备多领域、多行业项目经验的人才成为企业用人的首选。”

中级程序员和初级程序员的差别在哪里?
项目经验。毕向东表示,“中级程序员和初级程序员最大的差别在于中级程序员比初级程序员多了三四年的工作经验,从而多出了更多的项目经验。“为此,传智播客研究院引进曾在知名IT企业如阿里、IBM就职的高级技术专家,集中研发面向中高级程序员的课程,用以满足企业用人需求,尽快补全IT行业所需的人才缺口。

何为中高级程序员课程?

传智播客进行了定义。中高级程序员课程,是在当前主流的初级程序员课程的基础上,增加多领域多行业的含金量项目,从技术的广度和深度上进行拓展“我们希望用5年的时间,打造上百个高含金量的项目,覆盖主流的32个行业。”传智播客课程研发总监于洋表示。




黑马程序员热门视频教程

Python入门教程完整版(懂中文就能学会) 零起点打开Java世界的大门
C++| 匠心之作 从0到1入门学编程 PHP|零基础入门开发者编程核心技术
Web前端入门教程_Web前端html+css+JavaScript 软件测试入门到精通


在线咨询 我要报名
和我们在线交谈!