前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、3种过期策略

1.1、定时删除

在设置key的过期时间的同时,为该key创建一个定时器,让定时器在key的过期时间来临时,对key进行删除

优点:保证内存被尽快释放

缺点: 定时器的创建耗时,若为每一个设置过期时间的key创建一个定时器(将会有大量的定时器产生),性能影响严重


1.2、懒汉式删除

含义:key过期的时候不删除,每次通过key获取值的时候去检查是否过期,若过期,则删除,返回null。

优点:删除操作只发生在通过key取值的时候发生,而且只删除当前key,所以对CPU时间的占用是比较少的

缺点:若大量的key在超出超时时间后,很久一段时间内,都没有被获取过,那么可能发生内存泄露(无用的垃圾占用了大量的内存)

1.3、定期删除

含义:每隔一段时间执行一次删除过期key操作

优点:

1、减少懒汉式删除”的缺点,定期就会删除过期 key

2、减少定时删除缺点,不需要每个key都设置一个定时器

缺点:

1、在 CPU 时间友好方面,不如”懒汉式删除”,但是比定时删除好

2、在内存友好方面,不如”定时删除”,但是比懒汉式好

2、Redis采用的过期策略

2.1、为实现过期策略做了什么?

2.1.1、过期的 key 集合

redis 会将每个设置了过期时间的 key 放入到一个独立的字典中,定时遍历这个 字典来删除到期的 key

除了定时遍历之外,它还会使用惰性策略来删除过期的 key,所谓 惰性策略就是在客户端访问这个 key 的时候,rediskey 的过期时间进行检查,如果过期 了就立即删除。定时删除是集中处理,惰性删除是零散处理。

2.1.2、定期扫描策略

难点:合理设置删除操作的**执行时长(默认 25ms)和频率(默认100ms) **

Redis 默认会每秒进行十次过期扫描,过期扫描不会遍历过期字典中所有的 key,而是采用了一种简单的贪心策略。

1、从过期字典中随机 20key;

2、删除这 20key 中已经过期的 key;

3、如果过期的 key 比率超过 1/4,那就重复步骤 1;

2.1.3、从库的过期策略

主库同步一个del命令到从库

因为指令同步是异步进行的,所以主库过期的 keydel 指令没有及时同步到从库的 话,会出现主从数据的不一致,主库没有的数据在从库里还存在

2.2、问题和回答

问题1:设想一个大型的 Redis 实例中所有的 key 在同一时间过期了,会出现怎样的结果?

答案:毫无疑问,Redis 会持续扫描过期字典 (循环多次),直到过期字典中过期的 key 变得稀疏,才会停止 (循环次数明显下降)。

问题2:那又怎么样,有什么问题吗?

答案: 如下

1、Redis单线程,定时任务频繁执行,导致线上读写请求出现明显的卡顿现象。

2、内存管理器需要频繁回收内存页,这也会产生一定的 CPU 消耗。

问题3:怎么解决这种大量过期的 key 呢?

答案:给 key 过期时间设置 一个随机范围,而不能全部在同一时间过期。

问题4:采用定期删除+惰性删除就没其他问题了么?

答案:如果定期删除没删除key。然后你也没即时去请求key,也就是说惰性删除也没生效。这样,redis的内存会越来越高。那么就应该采用一种淘汰策略

3、优胜劣汰 —— LRU

3.1、淘汰策略

如果没有设置 expire 的key, 不满足先决条件(prerequisites); 那么 volatile-lru, volatile-random 和 volatile-ttl 策略的行为, 和 noeviction(不删除) 基本上一致。

参数 说明
noeviction 当内存不足以容纳新写入数据时,新写入操作会报错。应该没人用吧
allkeys-random 当内存不足以容纳新写入数据时,在键空间中,随机移除某个key。应该也没人用吧,你不删最少使用Key,去随机删干嘛。
allkeys-lru 当内存不足以容纳新写入数据时,在键空间中,移除最近最少使用的key。推荐使用,目前项目在用这种LRU算法
volatile-lru 当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,移除最近最少使用的 key。这种情况一般是把redis既当缓存,又做持久化存储的时候才用。正常情况下不推荐
volatile-random 当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,随机移除某个key。依然不推荐
volatile-ttl 当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,有更早过期时间的key优先移除。不推荐

3.2、LUR算法

Least Recently Used, 最近最久未使用法,它是按照一个非常著名的计算机操作系统基础理论得来的:最近使用的页面数据会在未来一段时期内仍然被使用,已经很久没有使用的页面很有可能在未来较长的一段时间内仍然不会被使用

1、基于这个思想,会存在一种缓存淘汰机制,每次从内存中找到最久未使用的数据然后置换出来,从而存入新的数据!**它的主要衡量指标是使用的时间,附加指标是使用的次数。 **

2、在计算机中大量使用了这个机制,它的合理性在于优先筛选热点数据,所谓热点数据,就是最近最多使用的数据!因为,利用LRU我们可以解决很多实际开发中的问题,并且很符合业务场景

算法实现:

实现 LRU 算法除了需要 key/value 字典外,还需要附加一个链表,链表中的元素按照 一定的顺序进行排列。

1、当空间满的时候,会踢掉链表尾部的元素。当字典的某个元素被访问时,它在链表中的位置会被移动到表头。所以链表的元素排列顺序就是元素最近被访问的时间顺序。

2、位于链表尾部的元素就是不被重用的元素,所以会被踢掉。位于表头的元素就是最近刚刚被人用过的元素,所以暂时不会被踢。

3.2.1、利用双向链表实现

双向链表有一个特点就是它的链表是双路的,我们定义好头节点和尾节点,然后利用先进先出(FIFO),最近被放入的数据会最早被获取。其中主要涉及到添加、访问、修改、删除操作。

添加:如果是新元素,直接放在链表头上面,其他的元素顺序往下移动;

访问:在头节点的可以不用管,如果是在中间位置或者尾巴,就要将数据移动到头节点;

修改:修改原值之后,再将数据移动到头部;

删除:直接删除,其他元素顺序移动;

public class Node {
    //键
    Object key;
    //值
    Object value;
    //上一个节点
    Node pre;
    //下一个节点
    Node next;

    public Node(Object key, Object value) {
        this.key = key;
        this.value = value;
    }
}


public class LRU<K, V> {
    private int capcity;//总容量
    private HashMap<K, Node> caches;//所有的node节点
    private Node first;//头节点
    private Node last;//尾节点

    public LRU(int size) {
        this.capcity = size;
        caches = new HashMap<>(size);
    }

    /**
     * 放入元素
     * @param key
     * @param value
     */
    public void put(K key, V value) {
        //1、从缓存中取
        Node node = caches.get(key);
        //2、如果新元素
        if (node == null) {
            //如果超过元素容纳量
            if (caches.size() >= capcity) {
                //移除最后一个节点
                caches.remove(last.key);
                removeLast();
            }
            //创建新节点
            node = new Node(key,value);
        }else{
            //已经存在的元素覆盖旧值
            node.value = value;
        }

        //把元素移动到首部
        moveToHead(node);
        caches.put(key, node);
    }

    /**
     * 通过key获取元素
     * @param key
     * @return
     */
    public Object get(K key) {
        Node node = caches.get(key);
        if (node == null) {
            return null;
        }
        //把访问的节点移动到首部
        moveToHead(node);
        return node.value;
    }

    /**
     * 根据key移除节点
     * @param key
     * @return
     */
    public Object remove(K key) {
        Node node = caches.get(key);
        if (node != null) {
            if (node.pre != null) {
                node.pre.next = node.next;
            }
            if (node.next != null) {
                node.next.pre = node.pre;
            }
            if (node == first) {
                first = node.next;
            }
            if (last  == node) {
                last = node.pre;
            }
        }
        return caches.remove(key);
    }

    /**
     * 清除所有节点
     */
    public void clear() {
        first = null;
        last = null;
        caches.clear();
    }

    /**
     * 把当前节点移动到首部
     * @param node
     */
    private void moveToHead(Node node) {
        //1、如果是第一个节点,则返回即可,不需要变化
        if (first == node) {
            return;
        }
        //2、如果是最后一个节点,则重置最后一个节点
        if (last == node) {
            last = last.pre;
        }

        //3、如果是中间节点,则改变他下一个节点的指针pre指针
        if (node.next != null) {
            node.next.pre = node.pre;
        }
        //3、如果是中间节点,则改变他上一个节点的指针next指针
        if (node.pre != null) {
            node.pre.next = node.next;
        }

        //如果首位节点都是null,则 当前节点就是first节点
        if (first == null || last == null) {
            first = last = node;
            return;
        }

        //此时讲节点设置为第一个节点
        node.next = first;
        first.pre = node;
        first = node;
        first.pre = null;
    }

    /**
     * 移除最后一个节点
     */
    private void removeLast() {
        if (last != null) {
            last = last.pre;
            //如果last等于null,说过该节点是第一个元素
            if (last == null) {
                first = null;
            } else {
                //如果不是则,讲last的nex设置null
                last.next = null;
            }
        }
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        Node node = first;
        while (node != null) {
            sb.append(String.format("%s:%s ", node.key, node.value));
            node = node.next;
        }
        return sb.toString();
    }


    public static void main(String[] args) {
        LRU<Integer, String> lru = new LRU<Integer, String>(5);
        lru.put(1, "a");
        lru.put(2, "b");
        lru.put(3, "c");
        lru.put(4,"d");
        lru.put(5,"e");
        System.out.println("原始链表为:"+lru.toString());

        lru.get(4);
        System.out.println("获取key为4的元素之后的链表:"+lru.toString());

        lru.put(6,"f");
        System.out.println("新添加一个key为6之后的链表:"+lru.toString());

        lru.remove(3);
        System.out.println("移除key=3的之后的链表:"+lru.toString());
    }
}

3.2.2、使用LinkedList实现

public class LRU<K, V> {
  private int capcity;//总容量
  private HashMap<K, Node> caches;//所有的node节点
  LinkedList<Node> linkedList = new LinkedList<>();
  public LRU(int capcity) {
    this.capcity = capcity;
    caches = new HashMap<>(capcity);
  }

  /**
     * 放入元素
     * @param key
     * @param value
     */
  public void put(K key, V value) {
    //1、从缓存中取
    Node node = caches.get(key);
    //2、如果新元素
    if (node == null) {
      //如果超过元素容纳量
      if (caches.size() >= capcity) {
        //移除最后一个节点
        caches.remove(linkedList.getLast().key);
        linkedList.removeLast();
      }
      //创建新节点
      node = new Node(key,value);
    }else{
      //已经存在的元素,先删除
      linkedList.remove(node);
      node.value = value;
    }
    //把元素移动到首部
    linkedList.addFirst(node);
    caches.put(key, node);
  }

  /**
     * 通过key获取元素
     * @param key
     * @return
     */
  public Object get(K key) {
    Node node = caches.get(key);
    if (node == null) {
      return null;
    }
    linkedList.remove(node);
    linkedList.addFirst(node);
    return node.value;
  }

  /**
     * 根据key移除节点
     * @param key
     * @return
     */
  public Object remove(K key) {
    Node node = caches.get(key);
    if (node != null) {
      linkedList.remove(node);
    }
    return caches.remove(key);
  }

  /**
     * 清除所有节点
     */
  public void clear() {
    caches.clear();
    linkedList.clear();
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    if (!linkedList.isEmpty()){
      linkedList.stream().forEach(node ->{
        sb.append(String.format("%s:%s ", node.key, node.value));
      });
    }
    return sb.toString();
  }


  public static void main(String[] args) {
    LRU<Integer, String> lru = new LRU<Integer, String>(5);
    lru.put(1, "a");
    lru.put(2, "b");
    lru.put(3, "c");
    lru.put(4,"d");
    lru.put(5,"e");
    System.out.println("原始链表为:"+lru.toString());

    lru.get(4);
    System.out.println("获取key为4的元素之后的链表:"+lru.toString());

    lru.put(6,"f");
    System.out.println("新添加一个key为6之后的链表:"+lru.toString());

    lru.remove(3);
    System.out.println("移除key=3的之后的链表:"+lru.toString());
  }
}

class Node {
  //键
  Object key;
  //值
  Object value;
  public Node(Object key, Object value) {
    this.key = key;
    this.value = value;
  }
}

3.2.3、重写LinkedHashMap

LinkedHashMap默认的构造参数是默认插入顺序的,就是说你插入的是什么顺序,读出来的就是什么顺序,但是也有访问顺序,就是说你访问了一个key,这个key就跑到了最后面

accessOrder:设置为false,表示不是访问顺序而是插入顺序存储的,这也是默认值,表示LinkedHashMap中存储的顺序是按照调用put方法插入的顺序进行排序的。LinkedHashMap也提供了可以设置accessOrder的构造方法,如下

accessOrde:设置为 true 表示让 linkedHashMap 按照访问顺序来进行排序,最近访问的放在头部,最老访问的放在尾部。

class LRU<K, V> extends LinkedHashMap<K, V> {

    private final int CACHE_SIZE;

    /**
     * true 表示让 linkedHashMap 按照访问顺序来进行排序,最近访问的放在头部,最老访问的放在尾部。
     * cacheSize / 0.75 来源于 0.75 * capacity = cacheSize
     * Math.ceil()“向上取整”, 即小数部分直接舍去,因为舍去了小数部分所以 + 1
     *
     * capacity =  (int) Math.ceil(cacheSize / 0.75) + 1,
     */
    public LRU(int cacheSize) {
        super((int) Math.ceil(cacheSize / 0.75) + 1, 0.75f, true);
        CACHE_SIZE = cacheSize;
    }

    /**
     * 当 map中的数据量大于指定的缓存个数的时候,就自动删除最老的数据。
     */
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return super.size() > CACHE_SIZE;
    }

}

4、平波缓进 —— 懒惰删除

一直以来我们认为 Redis 是单线程的,单线程为 Redis 带来了代码的简洁性和丰富多样 的数据结构。不过 Redis 内部实际上并不是只有一个主线程,它还有几个异步线程专门用来 处理一些耗时的操作。

4.2、Redis 为什么要懒惰删除(lazy free)

删除指令 del 会直接释放对象的内存,大部分情况下,这个指令非常快,没有明显延迟。不过如果删除的 key 是一个非常大的对象,比如一个包含了千万元素的 hash,那么删除操作就会导致内存会一次性回收,单线程卡顿。

问题1:怎么解决删除大key的卡顿问题?

答案:Redis 为了解决这个卡顿问题,在 4.0 版本引入了 unlink 指令,它能对删除操作进行懒处理,丢给后台线程来异步回收内存。

\> unlink key 
OK

问题2:多线程开发的时候,有线程安全问题吧?

答案: 如下

如果有多线程的开发经验,你肯定会担心这里的线程安全问题,会不会出现多个线程同时并发修改数据结构的情况存在。

关于这点,我打个比方。可以将整个 Redis 内存里面所有有效的数据想象成一棵大树。 当 unlink 指令发出时,它只是把大树中的一个树枝别断了,然后扔到旁边的火堆里焚烧

树枝离开大树的一瞬间,它就再也无法被主线程中的其它指令访问到了,因为主 线程只会沿着这颗大树来访问。

4.2、flush

Redis 提供了 flushdbflushall 指令,用来清空数据库,这也是极其缓慢的操作。Redis 4.0 同样给这两个指令也带来了异步化,在指令后面增加 async 参数就可以将整棵大树连根拔起,扔给后台线程慢慢焚烧。

> flushall async 
OK

4.3、异步队列

主线程将对象的引用从「大树」中摘除后,会将这个 key 的内存回收操作包装成一个任务,塞进异步任务队列,后台线程会从这个异步队列中取任务。任务队列被主线程和异步线程同时操作,所以必须是一个线程安全的队列

注意:不是所有的 unlink 操作都会延后处理,如果对应 key 所占用的内存很小,延后处理就 没有必要了,这时候 Redis 会将对应的 key 内存立即回收,跟 del 指令一样。

image-20210608191741230

4.4、AOF Sync 也很慢

Redis 需要每秒一次(可配置)同步 AOF 日志到磁盘,确保消息尽量不丢失,需要调用 sync 函数

这个操作会比较耗时,会导致主线程的效率下降,所以 Redis 也将这个操作移到 异步线程来完成。执行 AOF Sync 操作的线程是一个独立的异步线程,和前面的懒惰删除线程不是一个线程,同样它也有一个属于自己的任务队列,队列里只用来存放 AOF Sync 任 务。

ContactAuthor