队列
前言
Github:https://github.com/HealerJean
1、BlockingQueue
解释:队列不支持空元素
1.1、LinkedBlockingQueue
基于链表
LinkedBlockingQueue
是一个线程安全的阻塞队列(使用了lock
锁机制),实现了先进先出等特性,是作为生产者消费者的首选,可以指定容量,也可以不指定,不指定的话默认最大是Integer.MAX_VALUE
1.1.1、放入元素
1.1.1.1、#add(anObject)
: 能放就放不能放就生气了
把
Object
添加到BlockingQueue
里,添加成功返回true,如果BlockingQueue空间已满则抛出异常。
1.1.1.2、#offer(anObject)
: 能放就放,不能放拉倒,返回false
表示如果可能的话,将
Object
加到BlockingQueue
里,即如果BlockingQueue
可以容纳,则返回true,否则返回false。
1.1.1.3、#put(anObject
: 我脾气好,我等还不行
把
Object
加到BlockingQueue
里,如果BlockingQueue
没有空间,则调用此方法的线程被阻断直到BlockingQueue
里有空间再继续。
1.1.2、取出元素
1.1.2.1、#poll(time)
: 按照一段时间取出队头
获取并移除此队列的头,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。
1.1.2.2、 #take()
:我脾气好,我等还不行
获取
BlockingQueue
里排在首位的对象,若BlockingQueue
为空,阻断进入等待状态直到BlockingQueue
有新的对象被加入为止。
1.1.2.3、#clear()
: 清除整个队列
从队列彻底移除所有元素。
1.1.2.4、#remove()
把头砍掉
方法直接删除队头的元素
1.1.2.5、#peek()
: 查看队头,但是不删除
方法直接取出队头的元素,并不删除
1.3、PriorityBlockingQueue
1、优先队列特性
1、基于数组实现的,默认长度是 11,真正的无界队列,它在队列满的时候会进行扩容,而前面说的无界阻塞队列其实都有有界,只是界限太大可以忽略(最大值是 2147483647)
2、权重队列,可以理解为它可以进行排序,但是排序不是从小到大排或从大到小排,是基于数组的堆结构
完全二叉堆
,内部是平衡二叉树堆的实现(堆又分为 大顶堆 和 小顶堆。大顶堆中第一个元素肯定是所有元素中最大的,小顶堆中第一个元素是所有元素中最小的,这里是小顶堆,)3、出队方式和前面的也不同,是根据权重进行出队,和前面所说队列中那种先进先出或者先进后出方式不同。
PriorityBlockingQueue
真的是个神奇的队列,可以实现优先出队。最特别的是它只有一个锁,入队操作永远成功,而出队只有在空队列的时候才会进行线程阻塞。可以说有一定的应用场景吧,比如:有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。
二叉堆用数组表示的父子节点关系如下
leftNo = parentNo*2+1
rightNo = parentNo*2+2
parentNo = (nodeNo-1)/2
1.3.1、入队图解
入队元素:[10,20,5,1,21,30]
队列初始化时:
这时,我们开始将元素 元素 10 入队,并用二叉树辅助理解:
我们在将元素 20 入队:
将元素 5 入队后发现父节点大于子节点,这时需要进行 上浮调整
开始进行 上浮调整,将元素 10 和元素 5进行位置调换,结果如下:
接着将元素 1 入队后发现父节点大于子节点,继续进行调整:
第一次调整将元素 20 和元素 1 进行位置交换,交换完毕后结果如下:
交换完毕后,我们发现父节点的元素值还是大于子节点,说明还需要进行一次交换,最后交换结果如下:
接下来将元素 25 和 30 入队,结果如下:
1.3.2、出队图解
队列的初始化状态如下:
这时,我们需要从队列中取出第一个元素 1,元素 1 取出时会与队列中最后一个元素进行交换,并将最后一个元素置空。(实际上源码不是这么做的,源代码中是用变量来保存索引,直到全部 下沉调整 完成才进行替换)
替换后,结果就如下图显示一样。我们发现父节点大于子节点了,所以还需要再一次进行替换操作。
再一次替换后,将元素 30 下沉到下一个左边子节点,子节点上浮到原父节点位置。这就完成了下沉调整了。
1.3.3、涉及算法
1.3.3.1、10亿个数中找出最大的k个数
一个较好的方法:先拿出10000个建立小根堆,对于剩下的元素,如果大于堆顶元素的值,删除堆顶元素,再进行插入操作,否则直接跳过,这样知道所有元素遍历完,堆中的10000个就是最大的10000个。时间复杂度: m + (n-1)logm = O(nlogm)
方法1:局部淘汰法
第二种方法为局部淘汰法,该方法与排序方法类似,用一个容器保存前10000个数,然后将剩余的所有数字——与容器内的最小数字相比,如果所有后续的元素都比容器内的10000个数还小,那么容器内这个10000个数就是最大10000个数。如果某一后续元素比容器内最小数字大,则删掉容器内最小元素,并将该元素插入容器,最后遍历完这1亿个数,得到的结果容器中保存的数即为最终结果了。
此时的时间复杂度为O(n+m^2),其中m为容器的大小,即10000。
方法2:最小堆
第五种方法采用最小堆。首先读入前10000个数来创建大小为10000的最小堆,建堆的时间复杂度为O(mlogm)(m为数组的大小即为10000)
然后遍历后续的数字,并于堆顶(最小)数字进行比较。如果比最小的数小,则继续读取后续数字;如果比堆顶数字大,则替换堆顶元素并重新调整堆为最小堆。整个过程直至1亿个数全部遍历完为止。然后按照中序遍历的方式输出当前堆中的所有10000个数字。该算法的时间复杂度为O(nmlogm),空间复杂度是10000(常数)。
1.3.3.2、10亿个数中找出第k大个数
方法一:排序
可以使用排序算法对原数组进行排序,然后取出其下标为
k
的数即为第k
大数。
方法:最小堆
根据原数组建小根堆,然后依次弹出
k
次堆顶,其第·k
次弹出的数即为第k
大数
1.3、ArrayBlockingQueue
根据
ArrayBlockingQueue
的名字我们都可以看出,它是一个队列,并且是一个基于数组的阻塞队列。1、先进先出队列(队列头的是最先进队的元素;队列尾的是最后进队的元素)
2、有界队列(即初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作)
1.4、SynchronousQueue
不存储元素的阻塞队列 ,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
2、延迟队列
什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
延时队列在项目中的应用还是比较多的,尤其像电商类平台:
1、订单成功后,在30分钟内没有支付,自动取消订单
2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。
3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等
2.1、DelayQueue
JDK
中提供了一组实现延迟队列的API
,位于Java.util.concurrent
包下DelayQueue
。
DelayQueue
是一个BlockingQueue
(无界阻塞)队列,它本质就是封装了一个PriorityQueue
(优先队列),PriorityQueue
内部使用完全二叉堆
(来实现队列元素排序,队列中可以放基本数据类型或自定义实体类,在存放基本数据类型时,优先队列中元素默认升序排列,自定义实体类就需要我们根据类属性值比较计算了。我们在向
DelayQueue
队列中添加元素时,会给元素一个Delay
(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay
时间才允许从队列中取出。
先简单实现一下看看效果,添加三个order
入队DelayQueue
,分别设置订单在当前时间的5秒
、10秒
、15秒
后取消。
要实现DelayQueue
延时队列,队中元素要implements
Delayed
接口,这个接口里只有一个getDelay
方法,用于设置延期时间。Order
类中compareTo
方法负责对队列中的元素进行排序。
2.1.1、实例代码
public class Order implements Delayed {
/** 延迟时间 */
private long delayTime;
String name;
public Order(String name, long delay, TimeUnit unit) {
this.name = name;
this.delayTime = System.currentTimeMillis() + (delay > 0 ? unit.toMillis(delay) : 0);
}
/** 用于设置延期时间 */
@Override
public long getDelay(TimeUnit unit) {
return delayTime - System.currentTimeMillis();
}
/** 方法负责对队列中的元素进行排序 */
@Override
public int compareTo(Delayed o) {
Order order = (Order) o;
long diff = this.delayTime - order.delayTime;
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}
public class TestMain {
@Test
public void test() throws InterruptedException {
Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
DelayQueue<Order> delayQueue = new DelayQueue<>();
delayQueue.put(Order1);
delayQueue.put(Order2);
delayQueue.put(Order3);
System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
while (true){
Order task = delayQueue.take();
System.out.format("订单:task.name被取消, 取消时间:LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))", , );
}
}
}
2.1.1.1、控制台日志
订单延迟队列开始时间:2020-12-08 14:20:38
订单:{Order1}被取消, 取消时间:{2020-12-08 14:20:43}
订单:{Order2}被取消, 取消时间:{2020-12-08 14:20:48}
订单:{Order3}被取消, 取消时间:{2020-12-08 14:20:53}
2.2、Quartz
定时任务
Quartz
一款非常经典任务调度框架,在Redis
、RabbitMQ
还未广泛应用时,超时未支付取消订单功能都是由定时任务实现的。定时任务它有一定的周期性,可能很多单子已经超时,但还没到达触发执行的时间点,那么就会造成订单处理的不够及时。
2.3、Redis Sorted set
Redis
的数据结构Zset
,同样可以实现延迟队列的效果,主要利用它的score
属性,redis
通过score
来为集合中的成员进行从小到大的排序。
通过 zadd
命令向队列delayqueue
中添加元素,并设置score
值表示元素过期的时间;向delayqueue
添加三个order1
、order2
、order3
,分别是10秒
、20秒
、30秒
后过期。
消费端轮询队列delayqueue
, 将元素排序后取最小时间与当前时间比对,如小于当前时间代表已经过期移除key
。
/**
* 消费消息
*/
public void pollOrderQueue() {
while (true) {
Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);
String value = ((Tuple) set.toArray()[0]).getElement();
int score = (int) ((Tuple) set.toArray()[0]).getScore();
Calendar cal = Calendar.getInstance();
int nowSecond = (int) (cal.getTimeInMillis() / 1000);
if (nowSecond >= score) {
jedis.zrem(DELAY_QUEUE, value);
System.out.println(sdf.format(new Date()) + " removed key:" + value);
}
if (jedis.zcard(DELAY_QUEUE) <= 0) {
System.out.println(sdf.format(new Date()) + " zset empty ");
return;
}
Thread.sleep(1000);
}
}
2.4、Redis
过期回调
Redis
的key
过期回调事件,也能达到延迟队列的效果,简单来说我们开启监听key是否过期的事件,一旦key过期会触发一个callback
事件。修改
redis.conf
文件开启notify-keyspace-events Ex
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
编写 Redis
过期回调监听方法,必须继承 KeyExpirationEventMessageListener
,有点类似于 MQ
的消息监听。
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
System.out.println("监听到key:" + expiredKey + "已过期");
}
}
2.5、MQ
延迟队列
public class Producer {
/**
* 队列的名称
*/
public static final String QUEUE_NAME = "queue";
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMqConstant.USERNAME,
ActiveMqConstant.PASSWORD,
ActiveMqConstant.BROKER_URL);
try {
// 构造从工厂得到连接对象
Connection connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接,一个发送或接收消息的线程
Session session = connection.createSession(
Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 消息的目的地;消息发送给谁.
Destination destination = session.createQueue(QUEUE_NAME);
// 根据目的地获取一个生产者
MessageProducer producer = session.createProducer(destination);
//构造消息
//1 、创建TextMessage
sendTextMessage(session, producer);
session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
//延迟3秒
Long delay = 3L * 1000L;
//周期3秒
Long period = 3L * 1000L;
//重复5次 消费者受到的为 1(不重复的) + 5 = 6
int repeat = 5;
TextMessage message = session.createTextMessage("MESSAGE消息");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message);
}
}
2.6、时间轮
前边几种延时队列的实现方法相对简单,比较容易理解,时间轮算法就稍微有点抽象了。
kafka
、netty
都有基于时间轮算法实现延时队列,下边主要实践Netty
的延时队列讲一下时间轮是什么原理。先来看一张时间轮的原理图,解读一下时间轮的几个基本概念
wheel
:时间轮,图中的圆盘可以看作是钟表的刻度。比如一圈 round
长度为24秒
,刻度数为 8
,那么每一个刻度表示 3秒
。那么时间精度就是 3秒
。时间长度 / 刻度数值越大,精度越大。
当添加一个定时、延时任务A
,假如会延迟25秒
后才会执行,可时间轮一圈round
的长度才24秒
,那么此时会根据时间轮长度和刻度得到一个圈数 round
和对应的指针位置 index
,也是就 任务A
会绕一圈指向 0
格子上,此时时间轮会记录该任务的round
和 index
信息。当 round = 0
,index=0
,指针指向0
格子 任务A
并不会执行,因为 round = 0
不满足要求。
所以每一个格子代表的是一些时间,比如1秒
和25秒
都会指向 0
格子上,而任务则放在每个格子对应的链表中,这点和HashMap
的数据有些类似。
Netty
构建延时队列主要用 HashedWheelTimer
,它底层数据结构依然是使用 DelayedQueue
,只是采用时间轮的算法来实现。
下面我们用Netty
简单实现延时队列,HashedWheelTimer
构造函数比较多,解释一下各参数的含义。
/**
`ThreadFactory` :表示用于生成工作线程,一般采用线程池;
`tickDuration`和`unit`:每格的时间间隔,默认100ms;
`ticksPerWheel`:一圈下来有几格,默认512,而如果传入数值的不是2的N次方,则会调整为大于等于该参数的一个2的N次方数值,有利于优化`hash`值的计算。
*/
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
Timer
:是HashedWheelTimer
实现的父接口,仅定义了如何提交定时任务和如何停止整个定时机制。
TimerTask
:一个定时任务的实现接口,其中 run
方法包装了定时任务的逻辑。
Timeout
:一个定时任务提交到Timer
之后返回的句柄,通过这个句柄外部可以取消这个定时任务,并对定时任务的状态进行一些基本的判断。
public class NettyDelayQueue {
public static void main(String[] args) {
final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);
//定时任务
TimerTask task1 = new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order1 5s 后执行 ");
timer.newTimeout(this, 5, TimeUnit.SECONDS);//结束时候再次注册
}
};
timer.newTimeout(task1, 5, TimeUnit.SECONDS);
TimerTask task2 = new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order2 10s 后执行");
timer.newTimeout(this, 10, TimeUnit.SECONDS);//结束时候再注册
}
};
timer.newTimeout(task2, 10, TimeUnit.SECONDS);
//延迟任务
timer.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order3 15s 后执行一次");
}
}, 15, TimeUnit.SECONDS);
}
}
从执行的结果看,order3
、order3
延时任务只执行了一次,而order2
、order1
为定时任务,按照不同的周期重复执行。
order1 5s 后执行
order2 10s 后执行
order3 15s 后执行一次
order1 5s 后执行
order2 10s 后执行