前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、BlockingQueue

解释:队列不支持空元素

1.1、LinkedBlockingQueue

基于链表

LinkedBlockingQueue是一个线程安全的阻塞队列(使用了lock锁机制),实现了先进先出等特性,是作为生产者消费者的首选,可以指定容量,也可以不指定,不指定的话默认最大是Integer.MAX_VALUE

1.1.1、放入元素

1.1.1.1、#add(anObject): 能放就放不能放就生气了

Object添加到BlockingQueue里,添加成功返回true,如果BlockingQueue空间已满则抛出异常。

1.1.1.2、#offer(anObject): 能放就放,不能放拉倒,返回false

表示如果可能的话,将Object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。

  

1.1.1.3、#put(anObject: 我脾气好,我等还不行

Object加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里有空间再继续。

1.1.2、取出元素 

1.1.2.1、#poll(time): 按照一段时间取出队头

获取并移除此队列的头,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。

1.1.2.2、 #take():我脾气好,我等还不行
  

获取BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。

1.1.2.3、#clear(): 清除整个队列


从队列彻底移除所有元素。


1.1.2.4、#remove() 把头砍掉

方法直接删除队头的元素

1.1.2.5、#peek(): 查看队头,但是不删除

方法直接取出队头的元素,并不删除

1.3、PriorityBlockingQueue

1、优先队列特性

1、基于数组实现的,默认长度是 11,真正的无界队列,它在队列满的时候会进行扩容,而前面说的无界阻塞队列其实都有有界,只是界限太大可以忽略(最大值是 2147483647)

2、权重队列可以理解为它可以进行排序但是排序不是从小到大排或从大到小排,是基于数组的堆结构完全二叉堆,内部是平衡二叉树堆的实现堆又分为 大顶堆 和 小顶堆。大顶堆中第一个元素肯定是所有元素中最大的,小顶堆中第一个元素是所有元素中最小的,这里是小顶堆,)

3、出队方式和前面的也不同,是根据权重进行出队,和前面所说队列中那种先进先出或者先进后出方式不同。

PriorityBlockingQueue 真的是个神奇的队列,可以实现优先出队。最特别的是它只有一个锁,入队操作永远成功,而出队只有在空队列的时候才会进行线程阻塞。可以说有一定的应用场景吧,比如:有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。

二叉堆用数组表示的父子节点关系如下

leftNo = parentNo*2+1

rightNo = parentNo*2+2

parentNo = (nodeNo-1)/2

1.3.1、入队图解

入队元素:[10,20,5,1,21,30]

队列初始化时:

image-20201208105311217

这时,我们开始将元素 元素 10 入队,并用二叉树辅助理解:

image-20201208105322643

我们在将元素 20 入队:

image-20201208105337122

将元素 5 入队后发现父节点大于子节点,这时需要进行 上浮调整

image-20201208105400066

开始进行 上浮调整,将元素 10 和元素 5进行位置调换,结果如下:

image-20201208105413145

接着将元素 1 入队后发现父节点大于子节点,继续进行调整:

image-20201208105436929

第一次调整将元素 20 和元素 1 进行位置交换,交换完毕后结果如下:

image-20201208105451615

交换完毕后,我们发现父节点的元素值还是大于子节点,说明还需要进行一次交换,最后交换结果如下:

image-20201208105501667

接下来将元素 25 和 30 入队,结果如下:

image-20201208105514443

1.3.2、出队图解

队列的初始化状态如下:

image-20201208110053585

这时,我们需要从队列中取出第一个元素 1,元素 1 取出时会与队列中最后一个元素进行交换,并将最后一个元素置空。(实际上源码不是这么做的,源代码中是用变量来保存索引,直到全部 下沉调整 完成才进行替换)

image-20201208110110259

替换后,结果就如下图显示一样。我们发现父节点大于子节点了,所以还需要再一次进行替换操作。

image-20201208110120714

再一次替换后,将元素 30 下沉到下一个左边子节点,子节点上浮到原父节点位置。这就完成了下沉调整了

image-20201208110132617

1.3.3、涉及算法

1.3.3.1、10亿个数中找出最大的k个数

一个较好的方法:先拿出10000个建立小根堆,对于剩下的元素,如果大于堆顶元素的值,删除堆顶元素,再进行插入操作,否则直接跳过,这样知道所有元素遍历完,堆中的10000个就是最大的10000个。时间复杂度: m + (n-1)logm = O(nlogm)

方法1:局部淘汰法

第二种方法为局部淘汰法,该方法与排序方法类似,用一个容器保存前10000个数,然后将剩余的所有数字——与容器内的最小数字相比,如果所有后续的元素都比容器内的10000个数还小,那么容器内这个10000个数就是最大10000个数。如果某一后续元素比容器内最小数字大,则删掉容器内最小元素,并将该元素插入容器,最后遍历完这1亿个数,得到的结果容器中保存的数即为最终结果了。

此时的时间复杂度为O(n+m^2),其中m为容器的大小,即10000。

方法2:最小堆

第五种方法采用最小堆。首先读入前10000个数来创建大小为10000的最小堆,建堆的时间复杂度为O(mlogm)(m为数组的大小即为10000)

然后遍历后续的数字,并于堆顶(最小)数字进行比较。如果比最小的数小,则继续读取后续数字;如果比堆顶数字大,则替换堆顶元素并重新调整堆为最小堆。整个过程直至1亿个数全部遍历完为止。然后按照中序遍历的方式输出当前堆中的所有10000个数字。该算法的时间复杂度为O(nmlogm),空间复杂度是10000(常数)。

1.3.3.2、10亿个数中找出第k大个数

方法一:排序

可以使用排序算法对原数组进行排序,然后取出其下标为k的数即为第k大数。

方法:最小堆

根据原数组建小根堆,然后依次弹出k次堆顶,其第·k次弹出的数即为第k大数

1.3、ArrayBlockingQueue

根据 ArrayBlockingQueue 的名字我们都可以看出,它是一个队列,并且是一个基于数组的阻塞队列。

1、先进先出队列(队列头的是最先进队的元素;队列尾的是最后进队的元素)

2、有界队列(即初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作)

1.4、SynchronousQueue

不存储元素的阻塞队列 ,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态

2、延迟队列

什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

延时队列在项目中的应用还是比较多的,尤其像电商类平台:

1、订单成功后,在30分钟内没有支付,自动取消订单

2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。

3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存

4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

2.1、DelayQueue

JDK 中提供了一组实现延迟队列的API,位于Java.util.concurrent包下DelayQueue

DelayQueue是一个BlockingQueue(无界阻塞)队列,它本质就是封装了一个PriorityQueue(优先队列),PriorityQueue内部使用完全二叉堆(来实现队列元素排序,队列中可以放基本数据类型或自定义实体类,在存放基本数据类型时,优先队列中元素默认升序排列,自定义实体类就需要我们根据类属性值比较计算了。

我们在向DelayQueue队列中添加元素时,会给元素一个Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay时间才允许从队列中取出

先简单实现一下看看效果,添加三个order入队DelayQueue,分别设置订单在当前时间的5秒10秒15秒后取消。

image-20201208140919936

要实现DelayQueue延时队列,队中元素要implements Delayed 接口,这个接口里只有一个getDelay方法,用于设置延期时间。Order类中compareTo方法负责对队列中的元素进行排序。

2.1.1、实例代码

public class Order implements Delayed {


    /** 延迟时间  */
    private long delayTime;
    String name;

    public Order(String name, long delay, TimeUnit unit) {
        this.name = name;
        this.delayTime = System.currentTimeMillis() + (delay > 0 ? unit.toMillis(delay) : 0);
    }

    /** 用于设置延期时间 */
    @Override
    public long getDelay(TimeUnit unit) {
        return delayTime - System.currentTimeMillis();
    }

    /** 方法负责对队列中的元素进行排序 */
    @Override
    public int compareTo(Delayed o) {
        Order order = (Order) o;
        long diff = this.delayTime - order.delayTime;
        if (diff <= 0) {
            return -1;
        } else {
            return 1;
        }
    }

}





public class TestMain {

    @Test
    public void test() throws InterruptedException {
        Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
        Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
        Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
        DelayQueue<Order> delayQueue = new DelayQueue<>();
        delayQueue.put(Order1);
        delayQueue.put(Order2);
        delayQueue.put(Order3);

        System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        while (true){
            Order task = delayQueue.take();
            System.out.format("订单:task.name被取消, 取消时间:LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))", , );
        }
    }

}

2.1.1.1、控制台日志

订单延迟队列开始时间:2020-12-08 14:20:38
订单:{Order1}被取消, 取消时间:{2020-12-08 14:20:43}
订单:{Order2}被取消, 取消时间:{2020-12-08 14:20:48}
订单:{Order3}被取消, 取消时间:{2020-12-08 14:20:53}

2.2、Quartz 定时任务

Quartz一款非常经典任务调度框架,在RedisRabbitMQ还未广泛应用时,超时未支付取消订单功能都是由定时任务实现的。定时任务它有一定的周期性,可能很多单子已经超时,但还没到达触发执行的时间点,那么就会造成订单处理的不够及时。

2.3、Redis Sorted set

Redis的数据结构Zset,同样可以实现延迟队列的效果,主要利用它的score属性,redis通过score来为集合中的成员进行从小到大的排序。

image-20201208142447067

通过zadd命令向队列delayqueue 中添加元素,并设置score值表示元素过期的时间;向delayqueue 添加三个order1order2order3,分别是10秒20秒30秒后过期。

消费端轮询队列delayqueue, 将元素排序后取最小时间与当前时间比对,如小于当前时间代表已经过期移除key

/**
* 消费消息
 */
public void pollOrderQueue() {

    while (true) {
        Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);

        String value = ((Tuple) set.toArray()[0]).getElement();
        int score = (int) ((Tuple) set.toArray()[0]).getScore();

        Calendar cal = Calendar.getInstance();
        int nowSecond = (int) (cal.getTimeInMillis() / 1000);
        if (nowSecond >= score) {
            jedis.zrem(DELAY_QUEUE, value);
            System.out.println(sdf.format(new Date()) + " removed key:" + value);
        }

        if (jedis.zcard(DELAY_QUEUE) <= 0) {
            System.out.println(sdf.format(new Date()) + " zset empty ");
            return;
        }
        Thread.sleep(1000);
    }
}

2.4、Redis 过期回调

Rediskey过期回调事件,也能达到延迟队列的效果,简单来说我们开启监听key是否过期的事件,一旦key过期会触发一个callback事件。

修改redis.conf文件开启notify-keyspace-events Ex

@Configuration
public class RedisListenerConfig {
  @Bean
  RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
 
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    return container;
  }
}

编写Redis过期回调监听方法,必须继承KeyExpirationEventMessageListener ,有点类似于MQ的消息监听。

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expiredKey = message.toString();
        System.out.println("监听到key:" + expiredKey + "已过期");
    }
}

2.5、MQ延迟队列

public class Producer {

    /**
     * 队列的名称
     */
    public static final String QUEUE_NAME = "queue";

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMqConstant.USERNAME,
                ActiveMqConstant.PASSWORD,
                ActiveMqConstant.BROKER_URL);
        try {
            // 构造从工厂得到连接对象
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 获取操作连接,一个发送或接收消息的线程
            Session session = connection.createSession(
                    Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地;消息发送给谁.
            Destination destination = session.createQueue(QUEUE_NAME);

            // 根据目的地获取一个生产者
            MessageProducer producer = session.createProducer(destination);

            //构造消息
            //1 、创建TextMessage
            sendTextMessage(session, producer);

            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    private static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
            //延迟3秒
            Long delay = 3L * 1000L;
            //周期3秒
            Long period = 3L * 1000L;
            //重复5次 消费者受到的为 1(不重复的)  + 5 = 6
            int repeat = 5;
            TextMessage message = session.createTextMessage("MESSAGE消息");
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
            producer.send(message);
    }


}

2.6、时间轮

前边几种延时队列的实现方法相对简单,比较容易理解,时间轮算法就稍微有点抽象了。kafkanetty都有基于时间轮算法实现延时队列,下边主要实践Netty的延时队列讲一下时间轮是什么原理。

先来看一张时间轮的原理图,解读一下时间轮的几个基本概念

image-20201208143052436

wheel :时间轮,图中的圆盘可以看作是钟表的刻度。比如一圈round 长度为24秒,刻度数为 8,那么每一个刻度表示 3秒。那么时间精度就是 3秒。时间长度 / 刻度数值越大,精度越大。

当添加一个定时、延时任务A,假如会延迟25秒后才会执行,可时间轮一圈round 的长度才24秒,那么此时会根据时间轮长度和刻度得到一个圈数 round和对应的指针位置 index,也是就任务A会绕一圈指向0格子上,此时时间轮会记录该任务的roundindex信息。当round=0,index=0 ,指针指向0格子 任务A并不会执行,因为 round=0不满足要求。

所以每一个格子代表的是一些时间,比如1秒25秒 都会指向0格子上,而任务则放在每个格子对应的链表中,这点和HashMap的数据有些类似。

Netty构建延时队列主要用HashedWheelTimer,它底层数据结构依然是使用DelayedQueue,只是采用时间轮的算法来实现。

下面我们用Netty 简单实现延时队列,HashedWheelTimer构造函数比较多,解释一下各参数的含义。

/**
`ThreadFactory` :表示用于生成工作线程,一般采用线程池;      
`tickDuration`和`unit`:每格的时间间隔,默认100ms;     
`ticksPerWheel`:一圈下来有几格,默认512,而如果传入数值的不是2的N次方,则会调整为大于等于该参数的一个2的N次方数值,有利于优化`hash`值的计算。     
*/
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}

Timer:是HashedWheelTimer实现的父接口,仅定义了如何提交定时任务和如何停止整个定时机制

TimerTask:一个定时任务的实现接口,其中run方法包装了定时任务的逻辑。

Timeout:一个定时任务提交到Timer之后返回的句柄,通过这个句柄外部可以取消这个定时任务,并对定时任务的状态进行一些基本的判断。

public class NettyDelayQueue {

  public static void main(String[] args) {

    final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);

    //定时任务
    TimerTask task1 = new TimerTask() {
      public void run(Timeout timeout) throws Exception {
        System.out.println("order1 5s 后执行 ");
        timer.newTimeout(this, 5, TimeUnit.SECONDS);//结束时候再次注册
      }
    };
    timer.newTimeout(task1, 5, TimeUnit.SECONDS);
    
    TimerTask task2 = new TimerTask() {
      public void run(Timeout timeout) throws Exception {
        System.out.println("order2 10s 后执行");
        timer.newTimeout(this, 10, TimeUnit.SECONDS);//结束时候再注册
      }
    };
    timer.newTimeout(task2, 10, TimeUnit.SECONDS);

      
    //延迟任务
    timer.newTimeout(new TimerTask() {
      public void run(Timeout timeout) throws Exception {
        System.out.println("order3 15s 后执行一次");
      }
    }, 15, TimeUnit.SECONDS);

  }
}

从执行的结果看,order3order3延时任务只执行了一次,而order2order1为定时任务,按照不同的周期重复执行。

order1 5s 后执行
order2 10s 后执行
order3 15s 后执行一次
order1 5s 后执行
order2 10s 后执行

ContactAuthor