前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、回顾消息队列笔记之使用场景:

消息发布者:只管将消息发布到消息队列中而不需要管谁来获取。

消息使用者:只管从消息队列中取消息,而不管是谁发布的。这样发布者和使用者都不需要知道对方的存在

1.1、使用场景

1.1.1、流量削峰

比如:秒杀。

场景:如果是秒杀,顾名思义,就是非常快速,很多了来抢了,这个时候流量会猛增。而且很多请求都需要服务器处理逻辑, 为了防止服务器挂掉,我们在加入消息队列

为什么这么做就能缓解呢,多加几个服务器不就好了。

多加几个服务器固然很好,但是上亿的访问,上亿的线程,怎么缓解,难道要一直买服务器吗(而且用户每个线程都要进行处理),哈,相信没有公司愿意为了偶尔的秒杀买想象不到的服务器。所以呢,消息队列就能派上用场了

​ 1、用户请求来了后,服务器接收了,先写入消息队列。如果说超过消息队列的长度或者超过我们这次秒杀活动的最大数量,就抛弃用户或者告诉用户已经秒杀完毕了

2、消息队列中已经有了用户的请求信息,我们可以慢慢处理

1564480177803

1.1.2、解耦

开启一个业务节点端进行处理消息,

举例:签到后,送积分 。从业务上来说,签到后我们一定要保证用户的签到正常,而积分的赠送我们可以延迟给用户传递,但是最终我们一定要保证签到后给积分,只是延迟而已。

某一时刻积分服务不正常。不会影响到用户签到,专业积分服务即使报错了,我们后台可以自动重试,保证积分顺利送出,而不出现异常

1564479935871

### 1.1.3、日志记录

相信没有人认为日志是不重要的,在我们已知的日志框架中,有可能会以为写入日志时的某些故障而导致业务系统访问阻塞,请求延迟,所以我们可以构建一个日志系统,用来提供分析数据

1.1.4、事务一致性

举个例子,我正在开发的小米供应链金融,我们有用户的金额信息,我们也有个业务系统是账户系统,它里面也有余额信息,涉及到钱,我们就必须保证二者是一致的, 那么这个时候我们就可以使用消息队列来进行处理

2、消息队列功能特点以及解决的问题

1566902171300

Producer 消息生产者,负责产生和发送消息到消息处理中心

Broker 消息处理中心,负责消息接收、存储、转发

Consumer 消息消费者,负责从消息处理中心获取消息,并进行处理

生产环境中,对于消息队列的要求可不止基本的消息发送、暂存、接收,在不同的业务中,需要消息队列产品能解决 消息堆积、消息持久化、消息重复、严格有序等各种问题

2.1、消息堆积

最开始介绍过,消息队列的生产者和消费者是分开处理消息的系统,所以我们无法预知两者处理消息速度的快慢,如果一段时间内消息消费者处理消息的速度太慢,必然会导致消息堆积,

因此有时候需要给消息队列设一个阈值,超过的不再放入消息处理中心,以防止系统资源耗尽,导致机器挂掉,整个消息队列不可用

2.1.1、消息过期

默认情况下,activemq永远不会过期,如果业务场景需要,可以指定过期时间,如果下面这个设置为0

MessageProducer producer = session.createProducer(destination);
producer.setTimeToLive(1000L); //每一条都设置过期时间


//单独设置过期时间
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive);

2.1.2、死信队列 (阅读删除积压已久的缓冲区)

2.1.3、慢消费者策略设置

如果慢速消费者最后一个ACK距离现在的时间间隔超过阀maxTimeSinceLastAck,则中断慢速消费者。

Broker将会启动一个后台线程用来检测所有的慢速消费者, Broker端一旦发现slow consumer,就将它注册到慢速消费者列表中,此后将有额外的线程扫描并关闭它们(定期关闭它们)

其中abortConnection表示,是否关闭底层的transport

默认为false,此时将会通过transport向client端发送一个指令(其中包括consumerId),当client端(Session)接收之后,将会调用consumer.close()方法;

如果此值为true,将会导致底层的transport链接被关闭,这是很粗暴的办法,不过如果client端多个consumer共享一个connection的话,会导致所有的consumer被关闭,那就等老板开你把

  • AbortSlowConsumerStrategy, 如果慢速消费者最后一个ACK距离现在的时间间隔超过阀值,则中断慢速消费者
<slowConsumerStrategy>    
    <abortSlowConsumerStrategy abortConnection="false"  maxTimeSinceLastAck="30000"/><!-- 30秒滞后 -->    
</slowConsumerStrategy>  

2.2、消息持久化

有时候我们 需要在合适的时机传递给消费者,内存肯定是不可以的,因为一旦机器挂掉消息就会丢失,所以我们可以放到本地文件,分布式文件系统,数据库系统中。这样就可以持久化了

2.3、可靠投递( 消息丢失,分布式那一篇文章解决这个问题)

这个去年面试官问过我,当时回答的不是很好,可靠投递是不允许存在消息丢失的情况,从消息的整个生命周期来看,消息丢失我们可以分析下存在于那个阶段

1、从生产者到消息处理中心

2、消息处理中心持久化消息

3、从消息处理中心到消费者

2.4、消息重复

在接收到消息后先持久化到数据库,然后放到消息队列,再发送给消费者,当消息发送失败或者不知道是否发送成功时,消息的状态是待发送,定时任务不停读取待发送的消息,然后发送消息,最终保证消息不会丢失,

但是这样有一个问题,虽然超时了,但是消费者毕竟接收到了。而我这里因为超时却记录到了。这就是消息重复,怎么处理呢,一个编号搞定

2.5、严格有序

比如,淘宝购物,每一笔订单必须经过创建订单,支付完成,已发货,已收货,订单完成等环节,如果说每个环节都是消息的话,我们必须按照顺序消费消息,否则在业务上是不合理的,我们不允许顺序有误

2.6、集群

大型系统中,系统一般是集群的,集群可以让消费者和生产者在某个节点崩溃的情况下继续运行

2.7、消息中间件

上面介绍了很多,那什么是消息中间件 ?

消息中间件关注数据的发送和接收,利用高效,可高的异步消息传递机制集成分布式系统。不是直接给最终用户使用的,不能直接给用户带来价值的软件称为中间件,

3、Queue 和 Topic

JMS中定义了两种消息模型

点对点(point to point, queue):不可重复消费,默认是持久化的

发布/订阅(publish/subscribe,topic) : 重复消费,默认是非持久的(要求消费者必须在线)

这里重复消费的意思是,可以多次读取,也就是同一个消息被多个消费者读取。而不是消息签收

3.1、Queue

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息,消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。

Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。当消费者不存在时,消息会一直保存,直到有消费消费

3.2、Topic

消息生产者发布将消息发布到topic中同时有多个消息消费者订阅消费该消息和点对点方式不同发布到topic的消息会被所有订阅者消费

非持久化当生产者发布消息不管是否有消费者都不会保存消息
持久化会保存消息等待消息超时删除

ContactAuthor