前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、单线程Redis为什么那么快

1、纯内存操作

2、单线程操作,避免了资源的竞争

3、采用了非阻塞I/O多路复用机制

2、如何应对缓存穿透和缓存雪崩问题

2.1、缓存穿透,

即黑客故意去请求缓存中不存在的数据,导致所有的请求都怼到数据库上,从而数据库连接异常。

2.1.1、方案1、使用互斥锁排队

业界比价普遍的一种做法,即根据key获取value值为空时,锁上,从数据库中load数据后再释放锁。若其它线程获取锁失败,则等待一段时间后重试。

这里要注意,分布式环境中要使用分布式锁,单机的话用普通的锁(synchronized、Lock)就够了。(因为这样大量查询无效key访问不是很多,如果大量查询的肯定放到缓存里面去了)


public String getWithLock(String key, Jedis jedis, String lockKey, String uniqueId, long expireTime) {
    // 通过key获取value
    String value = redisService.get(key);
    if (StringUtil.isEmpty(value)) {
        // 分布式锁,详细可以参考https://blog.csdn.net/fanrenxiang/article/details/79803037
        //封装的tryDistributedLock包括setnx和expire两个功能,在低版本的redis中不支持
        try {
            boolean locked = redisService.tryDistributedLock(jedis, lockKey, uniqueId, expireTime);
            if (locked) {
                value = userService.getById(key);
                redisService.set(key, value);
                redisService.del(lockKey);
                return value;
            } else {
                // 其它线程进来了没获取到锁便等待50ms后重试
                Thread.sleep(50);
                getWithLock(key, jedis, lockKey, uniqueId, expireTime);
            }
        } catch (Exception e) {
            log.error("getWithLock exception=" + e);
            return value;
        } finally {
            redisService.releaseDistributedLock(jedis, lockKey, uniqueId);
        }
    }
    return value;


2.1.2、布隆过滤器

bloomfilter就类似于一个hash set,用于快速判某个元素是否存在于集合中,其典型的应用场景就是快速判断一个key是否存在于某容器,不存在就直接返回。布隆过滤器的关键就在于hash算法和容器大小,下面先来简单的实现下看看效果,我这里用guava实现的布隆过滤器:

使用定时任务将数据库值放到布隆过滤器中,如果布隆过滤器有的话,则返回,没有的话就是没有



<dependencies>  
    <dependency>  
        <groupId>com.google.guava</groupId>  
        <artifactId>guava</artifactId>  
        <version>23.0</version>  
    </dependency>  
</dependencies> 

public class BloomFilterTest {
 
    private static final int capacity = 1000000;
    private static final int key = 999998;
 
    private static BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), capacity);
 
    static {
        for (int i = 0; i < capacity; i++) {
            bloomFilter.put(i);
        }
    }
 
    public static void main(String[] args) {
        /*返回计算机最精确的时间,单位微妙*/
        long start = System.nanoTime();
 
        if (bloomFilter.mightContain(key)) {
            System.out.println("成功过滤到" + key);
        }
        long end = System.nanoTime();
        System.out.println("布隆过滤器消耗时间:" + (end - start));
        int sum = 0;
        for (int i = capacity + 20000; i < capacity + 30000; i++) {
            if (bloomFilter.mightContain(i)) {
                sum = sum + 1;
            }
        }
        System.out.println("错判率为:" + sum);
    }
}



成功过滤到999998
布隆过滤器消耗时间:215518
错判率为:318

可以看到100w个数据中只消耗了约0.2毫秒就匹配到了key速度足够快然后模拟了1w个不存在于布隆过滤器中的key匹配错误率为318/10000也就是说出错率大概为3%跟踪下BloomFilter的源码发现默认的容错率就是0.03

public static <T> BloomFilter<T> create(Funnel<T> funnel, int expectedInsertions /* n */) {
  return create(funnel, expectedInsertions, 0.03); // FYI, for 3%, we always get 5 hash functions
}

3.2、缓存雪崩

即缓存同一时间大面积的失效,这个时候又来了一波请求,结果请求都怼到数据库上,从而导致数据库连接异常。

方案1、也是像解决缓存穿透一样加锁排队,实现同上;

方案2、建立备份缓存,缓存A和缓存B,A设置超时时间,B不设值超时时间,先从A读缓存,A没有读B,并且异步启动一个更新线程,更新A缓存和B缓存;

方案3、设置缓存超时时间的时候加上一个随机的时间长度,比如这个缓存key的超时时间是固定的5分钟加上随机的2分钟,酱紫可从一定程度上避免雪崩问题;

4、Redis过期策略

redis设置过期时间:
 (除了字符串自己独有设置过期时间的方法外,其他方法都需要依靠expire方法来设置时间)

1、expire key time(以秒为单位)–这是最常用的方式

2、setex(String key, int seconds, String value)–字符串独有的方式

如果没有设置时间,那缓存就是永不过期 -1

如果设置了过期时间,之后又想让缓存永不过期,使用persist key

4.1、过期策略

4.1.1、定时删除

含义:在设置key的过期时间的同时,为该key创建一个定时器,让定时器在key的过期时间来临时,对key进行删除

优点:保证内存被尽快释放

缺点: 定时器的创建耗时,若为每一个设置过期时间的key创建一个定时器(将会有大量的定时器产生),性能影响严重


4.1.2、懒汉式删除

含义:key过期的时候不删除,每次通过key获取值的时候去检查是否过期,若过期,则删除,返回null。

优点:删除操作只发生在通过key取值的时候发生,而且只删除当前key,所以对CPU时间的占用是比较少的,而且此时的删除是已经到了非做不可的地步(如果此时还不删除的话,我们就会获取到了已经过期的key了)

缺点:若大量的key在超出超时时间后,很久一段时间内,都没有被获取过,那么可能发生内存泄露(无用的垃圾占用了大量的内存)

4.1.3、定期删除

含义:每隔一段时间执行一次删除过期key操作

优点:定期删除过期key–处理”懒汉式删除”的缺点,通过限制删除操作的时长和频率,来减少删除操作对CPU时间的占用–处理”定时删除”的缺点

缺点:在内存友好方面,不如”定时删除”(会造成一定的物理内存占用,但是没有懒汉式那么占用内存)

   在CPU时间友好方面,不如”懒汉式删除”(会定期的去进行比较和删除操作,cpu方面不如懒汉式,但是比定时好)

难点:合理设置删除操作的执行时长(每次删除执行多长时间)和执行频率(每隔多长时间做一次删除,默认每隔100ms检查)(这个要根据服务器运行情况来定了),每次执行时间太长,或者执行频率太高对cpu都是一种压力。

方法:

1、遍历每个数据库(就是redis.conf中配置的"database"数量,默认为16)

2、redis默认每隔100ms检查,是否有过期的key,有过期key则删除。需要说明的是,redis不是每个100ms将所有的key检查一次    

3、检查当前库中的指定个数个key(默认是每个库检查20个key,注意相当于该循环执行20次,循环体是下边的描述)    
   如果当前库中没有一个key设置了过期时间,直接执行下一个库的遍历,随机获取一个设置了过期时间的key,检查该key是否过期,如果过期,删除key,判断定期删除操作是否已经达到指定时长,若已经达到,直接退出定期删除。每次进行定期删除操作执行之后,    


4、对于定期删除,在程序中有一个全局变量current_db来记录下一个将要遍历的库,假设有16个库,我们这一次定期删除遍历了10个,那此时的current_db就是11,下一次定期删除就从第11个库开始遍历,假设current_db等于15了,那么之后遍历就再从0号库开始(此时current_db==0)

4.2、Redis采用的过期策略

  懒汉式删除+定期删除


4.2.1、采用定期删除+惰性删除就没其他问题了么?

内存淘汰机制:如果定期删除没删除key。然后你也没即时去请求key,也就是说惰性删除也没生效。这样,redis的内存会越来越高。那么就应该采用

# maxmemory-policy volatile-lru

该配置就是配内存淘汰策略的(什么,你没配过?好好反省一下自己)
1)noeviction:当内存不足以容纳新写入数据时,新写入操作会报错。应该没人用吧。
2)allkeys-lru:当内存不足以容纳新写入数据时,在键空间中,移除最近最少使用的key。推荐使用,目前项目在用这种。
3)allkeys-random:当内存不足以容纳新写入数据时,在键空间中,随机移除某个key。应该也没人用吧,你不删最少使用Key,去随机删。
4)volatile-lru:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,移除最近最少使用的key。这种情况一般是把redis既当缓存,又做持久化存储的时候才用。不推荐
5)volatile-random:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,随机移除某个key。依然不推荐
6)volatile-ttl:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,有更早过期时间的key优先移除。不推荐
ps:如果没有设置 expire 的key, 不满足先决条件(prerequisites); 那么 volatile-lru, volatile-random 和 volatile-ttl 策略的行为, 和 noeviction(不删除) 基本上一致。

4.3、Redis内存淘汰机制:LRU算法

Least Recently Used,最近最久未使用法,它是按照一个非常著名的计算机操作系统基础理论得来的:最近使用的页面数据会在未来一段时期内仍然被使用,已经很久没有使用的页面很有可能在未来较长的一段时间内仍然不会被使用

基于这个思想,会存在一种缓存淘汰机制,每次从内存中找到最久未使用的数据然后置换出来,从而存入新的数据!它的主要衡量指标是使用的时间,附加指标是使用的次数。在计算机中大量使用了这个机制,它的合理性在于优先筛选热点数据,所谓热点数据,就是最近最多使用的数据!因为,利用LRU我们可以解决很多实际开发中的问题,并且很符合业务场景

案例:小王在A公司上班,有一天产品提出了一个需求:“咱们系统的用户啊,每天活跃的就那么多,有太多的僵尸用户,根本不登录,你能不能考虑做一个筛选机制把这些用户刨出去,并且给活跃的用户做一个排名,我们可以设计出一些奖励活动,提升咱们的用户粘性,咱们只需要关注些活跃的用户就行了。

小王连忙点头,说可以啊,然而心里犯起嘀咕来了:这简单,按照常规思路,给用户添加一个最近活跃时间长度和登录次数,然后按照这两个数据计算他们的活跃度,最后直接排序就行了。嘿嘿,简直完美!不过!用户表字段已经很多了,又要加两个字段,然后还得遍历所有的数据排序?这样查询效率是不是会受影响啊?并且公司的服务器上次就蹦过一次,差点没忙出命来才调好。有没有更优雅的一种方式呢?小王面朝天空45°,陷入了无限的思考中…..

当小王看到LRU的时候,瞬间感觉抓住了救命稻草,这个算法不是就完全契合产品的需求吗?只要把用户数据按照LRU去筛选,利用数据结构完成的事情,完全减少了自己存储、添加字段判断、排序的过程,这样对于提高服务器性能肯定有很大的帮助,岂不美哉!

4.3.1、利用双向链表实现

双向链表有一个特点就是它的链表是双路的,我们定义好头节点和尾节点,然后利用先进先出(FIFO),最近被放入的数据会最早被获取。其中主要涉及到添加、访问、修改、删除操作。

添加:如果是新元素,直接放在链表头上面,其他的元素顺序往下移动;

访问:在头节点的可以不用管,如果是在中间位置或者尾巴,就要将数据移动到头节点;

修改:修改原值之后,再将数据移动到头部;

删除:直接删除,其他元素顺序移动;

public class Node {
    //键
    Object key;
    //值
    Object value;
    //上一个节点
    Node pre;
    //下一个节点
    Node next;

    public Node(Object key, Object value) {
        this.key = key;
        this.value = value;
    }
}


public class LRU<K, V> {
    private int capcity;//总容量
    private HashMap<K, Node> caches;//所有的node节点
    private Node first;//头节点
    private Node last;//尾节点

    public LRU(int size) {
        this.capcity = size;
        caches = new HashMap<>(size);
    }

    /**
     * 放入元素
     * @param key
     * @param value
     */
    public void put(K key, V value) {
        //1、从缓存中取
        Node node = caches.get(key);
        //2、如果新元素
        if (node == null) {
            //如果超过元素容纳量
            if (caches.size() >= capcity) {
                //移除最后一个节点
                caches.remove(last.key);
                removeLast();
            }
            //创建新节点
            node = new Node(key,value);
        }else{
            //已经存在的元素覆盖旧值
            node.value = value;
        }

        //把元素移动到首部
        moveToHead(node);
        caches.put(key, node);
    }

    /**
     * 通过key获取元素
     * @param key
     * @return
     */
    public Object get(K key) {
        Node node = caches.get(key);
        if (node == null) {
            return null;
        }
        //把访问的节点移动到首部
        moveToHead(node);
        return node.value;
    }

    /**
     * 根据key移除节点
     * @param key
     * @return
     */
    public Object remove(K key) {
        Node node = caches.get(key);
        if (node != null) {
            if (node.pre != null) {
                node.pre.next = node.next;
            }
            if (node.next != null) {
                node.next.pre = node.pre;
            }
            if (node == first) {
                first = node.next;
            }
            if (last  == node) {
                last = node.pre;
            }
        }
        return caches.remove(key);
    }

    /**
     * 清除所有节点
     */
    public void clear() {
        first = null;
        last = null;
        caches.clear();
    }

    /**
     * 把当前节点移动到首部
     * @param node
     */
    private void moveToHead(Node node) {
        //1、如果是第一个节点,则返回即可,不需要变化
        if (first == node) {
            return;
        }
        //2、如果是最后一个节点,则重置最后一个节点
        if (last == node) {
            last = last.pre;
        }

        //3、如果是中间节点,则改变他下一个节点的指针pre指针
        if (node.next != null) {
            node.next.pre = node.pre;
        }
        //3、如果是中间节点,则改变他上一个节点的指针next指针
        if (node.pre != null) {
            node.pre.next = node.next;
        }

        //如果首位节点都是null,则 当前节点就是first节点
        if (first == null || last == null) {
            first = last = node;
            return;
        }

        //此时讲节点设置为第一个节点
        node.next = first;
        first.pre = node;
        first = node;
        first.pre = null;
    }

    /**
     * 移除最后一个节点
     */
    private void removeLast() {
        if (last != null) {
            last = last.pre;
            //如果last等于null,说过该节点是第一个元素
            if (last == null) {
                first = null;
            } else {
                //如果不是则,讲last的nex设置null
                last.next = null;
            }
        }
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        Node node = first;
        while (node != null) {
            sb.append(String.format("%s:%s ", node.key, node.value));
            node = node.next;
        }
        return sb.toString();
    }


    public static void main(String[] args) {
        LRU<Integer, String> lru = new LRU<Integer, String>(5);
        lru.put(1, "a");
        lru.put(2, "b");
        lru.put(3, "c");
        lru.put(4,"d");
        lru.put(5,"e");
        System.out.println("原始链表为:"+lru.toString());

        lru.get(4);
        System.out.println("获取key为4的元素之后的链表:"+lru.toString());

        lru.put(6,"f");
        System.out.println("新添加一个key为6之后的链表:"+lru.toString());

        lru.remove(3);
        System.out.println("移除key=3的之后的链表:"+lru.toString());
    }
}

4.3.2、使用LinkedList实现

public class LRU<K, V> {
    private int capcity;//总容量
    private HashMap<K, Node> caches;//所有的node节点
    LinkedList<Node> linkedList = new LinkedList<>();
    public LRU(int capcity) {
        this.capcity = capcity;
        caches = new HashMap<>(capcity);
    }

    /**
     * 放入元素
     * @param key
     * @param value
     */
    public void put(K key, V value) {
        //1、从缓存中取
        Node node = caches.get(key);
        //2、如果新元素
        if (node == null) {
            //如果超过元素容纳量
            if (caches.size() >= capcity) {
                //移除最后一个节点
                caches.remove(linkedList.getLast().key);
                linkedList.removeLast();
            }
            //创建新节点
            node = new Node(key,value);
        }else{
            //已经存在的元素,先删除
            linkedList.remove(node);
            node.value = value;
        }
        //把元素移动到首部
        linkedList.addFirst(node);
        caches.put(key, node);
    }

    /**
     * 通过key获取元素
     * @param key
     * @return
     */
    public Object get(K key) {
        Node node = caches.get(key);
        if (node == null) {
            return null;
        }
        linkedList.remove(node);
        linkedList.addFirst(node);
        return node.value;
    }

    /**
     * 根据key移除节点
     * @param key
     * @return
     */
    public Object remove(K key) {
        Node node = caches.get(key);
        if (node != null) {
           linkedList.remove(node);
        }
        return caches.remove(key);
    }

    /**
     * 清除所有节点
     */
    public void clear() {
        caches.clear();
        linkedList.clear();
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        if (!linkedList.isEmpty()){
            linkedList.stream().forEach(node ->{
                sb.append(String.format("%s:%s ", node.key, node.value));
            });
        }
        return sb.toString();
    }


    public static void main(String[] args) {
        LRU<Integer, String> lru = new LRU<Integer, String>(5);
        lru.put(1, "a");
        lru.put(2, "b");
        lru.put(3, "c");
        lru.put(4,"d");
        lru.put(5,"e");
        System.out.println("原始链表为:"+lru.toString());

        lru.get(4);
        System.out.println("获取key为4的元素之后的链表:"+lru.toString());

        lru.put(6,"f");
        System.out.println("新添加一个key为6之后的链表:"+lru.toString());

        lru.remove(3);
        System.out.println("移除key=3的之后的链表:"+lru.toString());
    }
}

class Node {
    //键
    Object key;
    //值
    Object value;
    public Node(Object key, Object value) {
        this.key = key;
        this.value = value;
    }
}

4.3.3、重写LinkedHashMap

LinkedHashMap默认的构造参数是默认 插入顺序的,就是说你插入的是什么顺序,读出来的就是什么顺序,但是也有访问顺序,就是说你访问了一个key,这个key就跑到了最后面

accessOrder:设置为false,表示不是访问顺序而是插入顺序存储的,这也是默认值,表示LinkedHashMap中存储的顺序是按照调用put方法插入的顺序进行排序的。LinkedHashMap也提供了可以设置accessOrder的构造方法,如下

accessOrde:设置为 true 表示让 linkedHashMap 按照访问顺序来进行排序,最近访问的放在头部,最老访问的放在尾部。

class LRU<K, V> extends LinkedHashMap<K, V> {

    private final int CACHE_SIZE;

    /**
     * true 表示让 linkedHashMap 按照访问顺序来进行排序,最近访问的放在头部,最老访问的放在尾部。
     * cacheSize / 0.75 来源于 0.75 * capacity = cacheSize
     * Math.ceil()“向上取整”, 即小数部分直接舍去,因为舍去了小数部分所以 + 1
     *
     * capacity =  (int) Math.ceil(cacheSize / 0.75) + 1,
     */
    public LRU(int cacheSize) {
        super((int) Math.ceil(cacheSize / 0.75) + 1, 0.75f, true);
        CACHE_SIZE = cacheSize;
    }

    /**
     * 当 map中的数据量大于指定的缓存个数的时候,就自动删除最老的数据。
     */
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return super.size() > CACHE_SIZE;
    }

}

5、Redis分布式锁的实现

5.1、Redis锁问题和解决方案

5.1.1、 SETNX + EXPIRE 非原子性

看到这个命令的时候,分布式锁开发的时候SET命令还不支持NX、PX,所以才想出这种办法来实现key过期,NX、PX在2.6.12以后开始支持

加锁命令:SETNX key value,当键不存在时,对键进行设置操作并返回成功,否则返回失败。KEY 是锁的唯一标识,一般按业务来决定命名。

锁超时:EXPIRE key timeout, 设置 key 的超时时间,以保证即使锁没有被显式释放,锁也可以在一定时间后自动释放,避免资源被永远锁住。

解锁命令:DEL key,通过删除键值对释放锁,以便其他线程可以通过 SETNX 命令来获取锁。

if (setnx(key, 1) == 1){
    expire(key, 30)
        try {
            //TODO 业务逻辑
        } finally {
            del(key)
        }
}

不能保证原子性原因:

如果 SETNX 成功,在设置锁超时时间后,服务器挂掉、重启或网络问题等,导致 EXPIRE 命令没有执行,锁没有设置超时时间变成死锁。

image-20201211173718441

5.1.3、锁误解除:不直接删除key

原因:

如果线程 A 成功获取到了锁,并且设置了过期时间 30 秒,但线程 A 执行时间超过了 30 秒,锁过期自动释放,此时线程 B 获取到了锁;随后 A 执行完成,线程 A 使用 DEL 命令来释放锁,但此时线程 B 加的锁还没有执行完成,线程 A 实际释放的线程 B 加的锁。这时C3来直接就加锁成功

答案:

答:通过在 value 中设置当前线程加锁的标识,在删除之前验证 key 对应的 value 判断锁是否是当前线程持有。可生成一个 UUID 标识当前线程,使用 lua 脚本做验证标识和解锁操作。

4.1.3、超时解锁导致并发

原因:

如果线程 A 成功获取锁并设置过期时间 30 秒,但线程 A 执行时间超过了 30 秒,锁过期自动释放,此时线程 B 获取到了锁,线程 A 和线程 B 并发执行。

答案

答:A、B 两个线程发生并发显然是不被允许的,一般有两种方式解决该问题:

1、将过期时间设置足够长,确保代码逻辑在锁释放之前能够执行完成。

2、为获取锁的线程增加守护线程,为将要过期但未释放的锁增加有效时间。 (Redision 看门狗)

5.1.4、 不可重入

原因:

当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis 可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0 时释放锁。

答案:

Java 中使用 ThreadLocal 进行重入次数统计

private static ThreadLocal<Map<String, Integer>> LOCKERS = ThreadLocal.withInitial(HashMap::new);
// 加锁
public boolean lock(String key) {
    Map<String, Integer> lockers = LOCKERS.get();
    if (lockers.containsKey(key)) {
        lockers.put(key, lockers.get(key) + 1);
        return true;
    } else {
        if (SET key uuid NX EX 30) {
            lockers.put(key, 1);
            return true;
        }
    }
    return false;
}
// 解锁
public void unlock(String key) {
    Map<String, Integer> lockers = LOCKERS.get();
    if (lockers.getOrDefault(key, 0) == 1) {
        lockers.remove(key);
        //DEL key (解锁方式看后面)
    } else {
        lockers.put(key, lockers.get(key) - 1);
    }
}

5.1.5、不能阻塞等待锁释放:

原因:

一般情况下,上锁后执行都是立即返回的,不能一直阻塞等待锁释放

答案:

可以通过客户端轮询的方式解决该问题,当未获取到锁时,等待一段时间重新获取锁,直到成功获取锁或等待超时。这种方式比较消耗服务器资源,当并发量比较大时,会影响服务器的效率。

5.2、分布式锁的实现

5.2.1、Jedis上锁

public class RedisTool {

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";
    private static final Long RELEASE_SUCCESS = 1L;


    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }


    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }
}

5.2.1.1、上锁

第一个为key,我们使用key来当锁,因为key是唯一的。

第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。

第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;

第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。

第五个为time,与第四个参数相呼应,代表key的过期时间。

5.2.1.2、解锁

首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。那么为什么要使用Lua语言来实现呢?

答:因为要确保上述操作是原子性的。关于非原子性会带来什么问题 。

那么为什么执行eval()方法可以确保原子性

答:源于Redis的特性,eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。

5.2.2、redisTemplate锁

package com.hlj.redis.lock.utils;

/**
 * redis锁,原地址https://gitee.com/itopener/springboot/tree/master
 */
public interface DistributedLock {


    public static final long TIMEOUT_MILLIS = 30000; //超时时间

    public static final int RETRY_TIMES = Integer.MAX_VALUE; //重试次数

    public static final long SLEEP_MILLIS = 500; //重试时 线程休眠次数

    public boolean lock(String key);

    public boolean lock(String key, int retryTimes);

    public boolean lock(String key, int retryTimes, long sleepMillis);

    public boolean lock(String key, long expire);

    public boolean lock(String key, long expire, int retryTimes);

    public boolean lock(String key, long expire, int retryTimes, long sleepMillis);

      /**  自定义requestId
     * requestId可以空null ,为null的时候,选择UUID
     * @param key
     * @param requestId
     * @param expire
     * @param retryTimes
     * @param sleepMillis
     * @return
     */
// public boolean lock(String key, String requestId,long expire, int retryTimes, long sleepMillis);

    
    public boolean releaseLock(String key);
}




package com.hlj.redis.lock.utils;

/**
 * redis锁,原地址https://gitee.com/itopener/springboot/tree/master
 */
public abstract class AbstractDistributedLock implements DistributedLock {


    @Override
    public boolean lock(String key) {
        return lock(key, TIMEOUT_MILLIS, RETRY_TIMES, SLEEP_MILLIS);
    }

    @Override
    public boolean lock(String key, int retryTimes) {
        return lock(key, TIMEOUT_MILLIS, retryTimes, SLEEP_MILLIS);
    }

    @Override
    public boolean lock(String key, int retryTimes, long sleepMillis) {
        return lock(key, TIMEOUT_MILLIS, retryTimes, sleepMillis);
    }

    @Override
    public boolean lock(String key, long expire) {
        return lock(key, expire, RETRY_TIMES, SLEEP_MILLIS);
    }

    @Override
    public boolean lock(String key, long expire, int retryTimes) {
        return lock(key, expire, retryTimes, SLEEP_MILLIS);
    }

}


package com.hlj.redis.lock.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisCommands;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * redis锁,原地址https://gitee.com/itopener/springboot/tree/master
 */
@Service("redisDistributedLock")
public class RedisDistributedLock extends AbstractDistributedLock {

    private final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class);

    private RedisTemplate<Object, Object> redisTemplate;

    private ThreadLocal<String> lockFlag = new ThreadLocal<String>();

    public static final String UNLOCK_LUA;

    static {
        StringBuilder sb = new StringBuilder();
        sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
        sb.append("then ");
        sb.append("    return redis.call(\"del\",KEYS[1]) ");
        sb.append("else ");
        sb.append("    return 0 ");
        sb.append("end ");
        UNLOCK_LUA = sb.toString();
    }

    public RedisDistributedLock(RedisTemplate<Object, Object> redisTemplate) {
        super();
        this.redisTemplate = redisTemplate;
    }

    @Override
    public boolean lock(String key, long expire, int retryTimes, long sleepMillis) {
        boolean result = setRedis(key, expire);
        // 如果获取锁失败,按照传入的重试次数进行重试
        while ((!result) && retryTimes-- > 0) {
            try {
                logger.debug("lock failed, retrying..." + retryTimes);
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                return false;
            }
            result = setRedis(key, expire);
        }
        return result;
    }
    
 //    @Override 自定义requestId
//    public boolean lock(String key, String requestId,long expire, int retryTimes, long sleepMillis) {
//        if(StringUtils.isEmpty(requestId)){
//            requestId = UUID.randomUUID().toString();
//        }
//        boolean result = setRedis(key, expire,requestId);
//        // 如果获取锁失败,按照传入的重试次数进行重试
//        while ((!result) && retryTimes-- > 0) {
//            try {
//                logger.debug("lock failed, retrying..." + retryTimes);
//                Thread.sleep(sleepMillis);
//            } catch (InterruptedException e) {
//                return false;
//            }
//            if(StringUtils.isEmpty(requestId)){
//                requestId = UUID.randomUUID().toString();
//            }
//            result = setRedis(key, expire,requestId);
//        }
//        return result;
//    }

    private boolean setRedis(String key, long expire) {
        try {
            String result = redisTemplate.execute(new RedisCallback<String>() {
                @Override
                public String doInRedis(RedisConnection connection) throws DataAccessException {
                    JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                    String uuid = UUID.randomUUID().toString();
                    lockFlag.set(uuid);
                    //PX millionSecond
                    return commands.set(key, uuid, "NX", "PX", expire);
                }
            });
            return !StringUtils.isEmpty(result);
        } catch (Exception e) {
            logger.error("set redis occured an exception", e);
        }
        return false;
    }

    @Override
    public boolean releaseLock(String key) {
        // 释放锁的时候,有可能因为持锁之后方法执行时间大于锁的有效期,此时有可能已经被另外一个线程持有锁,所以不能直接删除
        try {
            List<String> keys = new ArrayList<String>();
            keys.add(key);
            List<String> args = new ArrayList<String>();
            args.add(lockFlag.get());

            // 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
            // spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,所以只能拿到原redis的connection来执行脚本

            Long result = redisTemplate.execute(new RedisCallback<Long>() {
                public Long doInRedis(RedisConnection connection) throws DataAccessException {
                    Object nativeConnection = connection.getNativeConnection();
                    // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
                    // 集群模式
                    if (nativeConnection instanceof JedisCluster) {
                        return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    }

                    // 单机模式
                    else if (nativeConnection instanceof Jedis) {
                        return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    }
                    return 0L;
                }
            });

            return result != null && result > 0;
        } catch (Exception e) {
            logger.error("release lock occured an exception : key = {}", key, e);
        } finally {
            // 清除掉ThreadLocal中的数据,避免内存溢出
            lockFlag.remove();
        }
        return false;
    }

}


package com.hlj.redis.lock;

import com.hlj.redis.lock.utils.DistributedLock;
import com.hlj.redis.lock.utils.RedisTool;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import java.util.Date;

/**
 * @Desc:
 * @Author HealerJean
 * @Date 2018/9/13  下午12:04.
 */
@RequestMapping("redis/lock")
@Controller
public class LockController {


    //库存个数
     int goodsCount = 10;

    //卖出个数
     int saleCount = 0;

    /**
     * 缓存key-用户体力锁
     */
    public static final String TEST_LOCK = "test_lock:";

    @Resource
    private DistributedLock lock;

    @GetMapping("test")
    @ResponseBody
    public void lockRedis(){
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> {
                try {Thread.sleep(2);} catch (InterruptedException e) {}
                if (lock.lock(TEST_LOCK , 3000l, 5, 500)) {
                    if (goodsCount > 0) {
                        goodsCount--;
                        System.out.println("剩余库存:" + goodsCount + " 卖出个数" + ++saleCount);
                    }
                }
                lock.releaseLock(TEST_LOCK);

            }).start();
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}


5.2.3、RedLock

产生背景:

分布式锁,当我们请求一个分布式锁的时候,成功了,但是这时候slave还没有复制我们的锁,masterDown了,我们的应用继续请求锁的时候,会从继任了master的原slave上申请,也会成功,即使这样,也可能出现问题。

支持单机,主从,哨兵,集群等模式

//单机
RedissonClient redisson = Redisson.create();
Config config = new Config();
config.useSingleServer().setAddress("myredisserver:6379");
RedissonClient redisson = Redisson.create(config);
 
 
//主从
Config config = new Config();
config.useMasterSlaveServers()
    .setMasterAddress("127.0.0.1:6379")
    .addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419")
    .addSlaveAddress("127.0.0.1:6399");
RedissonClient redisson = Redisson.create(config);
 
 
//哨兵
Config config = new Config();
config.useSentinelServers()
    .setMasterName("mymaster")
    .addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379")
    .addSentinelAddress("127.0.0.1:26319");
RedissonClient redisson = Redisson.create(config);
 
 
//集群
Config config = new Config();
config.useClusterServers()
    .setScanInterval(2000) // cluster state scan interval in milliseconds
    .addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001")
    .addNodeAddress("127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);

5.2.3.1、原理

用Redis中的多个master实例,来获取锁,只有大多数实例获取到了锁,才算是获取成功。具体的红锁算法分为以下五步:

1、获取当前的时间(单位是毫秒)。

2、用相同的key和具有唯一性的value在N个节点上请求锁(LUA脚本)。这里获取锁的尝试时间要远远小于锁的超时时间,防止某个masterDown了,我们还在不断的获取锁,而被阻塞过长的时间。

3、只有在大多数节点上获取到了锁(至少N/2+1),而且获取时间小于锁的超时时间的情况下,认为锁获取成功了。

4、如果锁获取成功了,锁的超时时间就是最初的锁超时时间减去获取锁的总耗时时间。

5、如果锁获取失败了,不管是因为获取成功的节点的数目没有过半,还是因为获取锁的耗时超过了锁的超时时间,都会将已经设置了key的master上的key删除。

5.2.3.2、代码实现

<!-- JDK 1.8+ compatible -->
<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.9.0</version>
</dependency> 
@Bean(name = "redisson")
public RedissonClient redissonClient() throws IOException {
    InputStream in = this.getClass().getResourceAsStream("/redisson.yaml");
    Config config = Config.fromYAML(in);
    return Redisson.create(config);
}

redisson.yaml


singleServerConfig:
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  reconnectionTimeout: 3000
  failedAttempts: 3
  password: 123456
  subscriptionsPerConnection: 5
  clientName: null
  address: "redis://healerjean:6603"
  subscriptionConnectionMinimumIdleSize: 1
  subscriptionConnectionPoolSize: 50
  connectionMinimumIdleSize: 32
  connectionPoolSize: 64
  database: 4
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.JsonJacksonCodec> {}
@Service
public class CacheServiceImpl implements CacheService {


    @Autowired
    private RedissonClient redisson;

    private final String SEQNO_FORMAT = "0000";
    private final String REDIS_PREFIX = REDIS_CSF + ":";
    private final String REDIS_LOCK_PREFIX = REDIS_LOCK + ":";


    /**
     * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
     * leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
     */
    @Override
    public boolean lock(String key, long timeout, TimeUnit timeUnit) {
        try {
            RLock lock = redisson.getLock(REDIS_PREFIX + REDIS_LOCK_PREFIX + key);
            return lock.tryLock(0, timeout, timeUnit);
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void unlock(String key) {
        RLock lock = redisson.getLock(REDIS_PREFIX + REDIS_LOCK_PREFIX + key);
        lock.unlock();
    }
}

5.2.3.3、核心源码

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1;
    if (leaseTime != -1) {
        newLeaseTime = unit.toMillis(waitTime)*2;
    }

    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    long lockWaitTime = calcLockWaitTime(remainTime);
    /**
     * 1. 允许加锁失败节点个数限制(N-(N/2+1))
     */
    int failedLocksLimit = failedLocksLimit();
    /**
     * 2. 遍历所有节点通过EVAL命令执行lua加锁
     */
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        /**
         *  3.对节点尝试加锁
         */
        try {
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            // 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            // 抛出异常表示获取锁失败
            lockAcquired = false;
        }

        if (lockAcquired) {
            /**
             *4. 如果获取到锁则添加到已获取锁集合中
             */
            acquiredLocks.add(lock);
        } else {
            /**
             * 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))
             * 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了
             * 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功
             */
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                break;
            }

            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == -1 && leaseTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit--;
            }
        }

        /**
         * 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false
         */
        if (remainTime != -1) {
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }

    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }

        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }

    /**
     * 7.如果逻辑正常执行完则认为最终申请锁成功,返回true
     */
    return true;
}

5.2.3.4、图解流程

线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。

线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。

image-20201218145554687

image-20201218152103903

5.2.3.5、解决问题

5.2.3.5.1、可重入加锁机制

1、Redis存储锁的数据类型是 Hash类型

2、Hash数据类型的key值包含了当前线程信息。

redissonLock.lock("redisson", 1);

image-20201218150320801

这里表面数据类型是Hash类型,Hash类型相当于我们java的 <key,<key1,value>> 类型,这里key是指 ‘redisson’ 它的有效期还有9秒,我们再来看里们的key1值为078e44a3-5f95-4e24-b6aa-80684655a15a:45它的组成是: uuid + 当前线程的ID。后面的value是就和可重入加锁有关。

image-20201218150500553

  

commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                    
假设前面获取锁时传的name是“abc”,假设调用的线程ID是Thread-1,假设成员变量UUID类型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c

那么KEYS[1]=abc,ARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1                   
                    

1、判断有没有一个叫“abc”的key

2、如果没有,则在其下设置一个字段为“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值为“1”的键值对 ,并设置它的过期时间

3、如果存在,则进一步判断“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,则其值加1,并重新设置过期时间

4、返回“abc”的生存时间(毫秒)

这里用的数据结构是hash,hash的结构是: key 字段1 值1 字段2 值2 。。。

用在锁这个场景下,key就表示锁的名称,也可以理解为临界资源,字段就表示当前获得锁的线程

所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段,如果没有则不能获得锁,如果有,则相当于重入,字段值加1(次数)

5.2.3.5.2、解决超时解锁导致并发的问题

1、Redisson 所有指令都通过 Lua 脚本执行,Redis 支持 Lua 脚本原子性执行。

2、Redisson 设置一个 Key 的默认过期时间为 30s,如果某个客户端持有一个锁超过了 30s 怎么办?

3、Redisson 中有一个 Watchdog 的概念,翻译过来就是看门狗,它会在你获取锁之后,每隔 10s 帮你把 Key 的超时时间设为 30s。

这样的话,就算一直持有锁也不会出现 Key 过期了,其他线程获取到锁的问题了。

4、Redisson 的“看门狗”逻辑保证了没有死锁发生。(如果机器宕机了,看门狗也就没了。此时就不会延长 Key 的过期时间,到了 30s 之后就会自动过期了,其他线程可以获取到锁)

5.2.3.5.3、保证原子性操作

通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性

5.2.3.6、极端问题

问题1:虽然说RedLock算法可以解决单点Redis分布式锁的安全性问题,但如果集群中有节点发生崩溃重启,还是会锁的安全性有影响的。具体出现问题的场景如下: 假设一共有5个Redis节点:A, B, C, D, E。设想发生了如下的事件序列:

1、客户端1成功锁住了A, B, C,获取锁成功(但D和E没有锁住)

2、节点C崩溃重启了,但客户端1在C上加的锁没有持久化下来,丢失了

3、节点C重启后,客户端2锁住了C, D, E,获取锁成功

这样,客户端1和客户端2同时获得了锁(针对同一资源)。针对这样场景,解决方式也很简单,也就是让Redis崩溃后延迟重启,并且这个延迟时间大于锁的过期时间就好。这样等节点重启后,所有节点上的锁都已经失效了。也不存在以上出现2个客户端获取同一个资源的情况了。 但是这个也要受限于业务上的一个锁的过期时间

6、Redis分布式锁和Zk分布式锁区别

6.1、Redis实现分布式锁

1、Redis 的设计定位决定了它的数据并不是强一致性的,在某些极端情况下,可能会出现问题。锁的模型不够健壮。

在redis sentinel集群中,我们具有多台redis,他们之间有着主从的关系,例如一主二从。我们的set命令对应的数据写到主库,然后同步到从库。当我们申请一个锁的时候,对应就是一条命令 setnx mykey myvalue ,在redis sentinel集群中,这条命令先是落到了主库。假设这时主库down了,而这条数据还没来得及同步到从库,sentinel将从库中的一台选举为主库了。这时,我们的新主库中并没有mykey这条数据,若此时另外一个client执行 setnx mykey hisvalue , 也会成功,即也能得到锁。这就意味着,此时有两个client获得了锁。这不是我们希望看到的,虽然这个情况发生的记录很小,只会在主从failover的时候才会发生,大多数情况下、大多数系统都可以容忍,但是不是所有的系统都能容忍这种瑕疵.,锁的安全性被打破了。针对这个问题。Redis作者antirez提出了RedLock算法来解决这个问题,但是也会有重复锁上的问题,具体看上面的的RedLock分析。

2、Redis 分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能。

3、如果是 redis 获取锁的那个客户端 出现 bug 挂了,那么只能等待超时时间之后才能释放锁

另一方面使用 Redis 实现分布式锁在很多企业中非常常见,而且大部分情况下都不会遇到所谓的“极端复杂场景”,所以使用 Redis 作为分布式锁也不失为一种好的方案,最重要的一点是 Redis 的性能很高,可以支撑高并发的获取、释放锁操作。

6.2、Zk分布式锁

ZK 的模型是这样的:ZK 包含一系列的节点,叫做 Znode,就好像文件系统一样,每个 Znode 表示一个目录。

然后 Znode 有一些特性:

有序节点:假如当前有一个父节点为 /lock,我们可以在这个父节点下面创建子节点,ZK 提供了一个可选的有序特性。

例如我们可以创建子节点“/lock/node-”并且指明有序,那么 ZK 在生成子节点时会根据当前的子节点数量自动添加整数序号。

也就是说,如果是第一个创建的子节点,那么生成的子节点为 /lock/node-0000000000,下一个节点则为 /lock/node-0000000001,依次类推。

临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,ZK 会自动删除该节点。

事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,ZK 会通知客户端。

6.2.1、ZK 实现分布式锁的落地方案:

1、使用 ZK 的临时节点和有序节点,每个线程获取锁就是在 ZK 创建一个临时有序的节点,比如在 /lock/ 目录下。

2、创建节点成功后,获取 /lock 目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点。

3、如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。

4、如果当前线程创建的节点不是所有节点序号最小的节点,则对节点序号的前一个节点添加一个事件监听。

5、比如当前线程获取到的节点序号为 /lock/003,然后所有的节点列表为[/lock/001,/lock/002,/lock/003],则对 /lock/002 这个节点添加一个事件监听器。

如果锁释放了,会唤醒下一个序号的节点,然后重新执行第 3 步,判断是否自己的节点序号是最小。比如 /lock/001 释放了,/lock/002 监听到时间,此时节点集合为[/lock/002,/lock/003],则 /lock/002 为最小序号节点,获取到锁。

6.2.2、总结

1、ZK 天生设计定位就是分布式协调,强一致性。锁的模型健壮、简单易用、适合做分布式锁。

2、如果获取不到锁,只需要添加一个监听器就可以了,不用一直轮询,性能消耗较小。

3、而 zk 的话,因为创建的是临时 znode,只要客户端挂了,znode 就没了,此时就自动释放锁。

但是 ZK 也有其缺点:如果有较多的客户端频繁的申请加锁、释放锁,对于 ZK 集群的压力会比较大。

6、Redis 主从复制、哨兵和集群原理与区别

哨兵(Sentinel):可以管理多个Redis服务器,它提供了监控,提醒以及自动的故障转移的功能。

复制(Replication):则是负责让一个Redis服务器可以配备多个备份的服务器。

6.1、哨兵

哨兵是Redis集群架构中非常重要的一个组件,哨兵的出现主要是解决了主从复制出现故障时需要人为干预的问题。

6.1.1、Redis哨兵主要功能

(1)集群监控:负责监控Redis master和slave进程是否正常工作

(2)消息通知:如果某个Redis实例有故障,那么哨兵负责发送消息作为报警通知给管理员

(3)故障转移:如果master node挂掉了,会自动转移到slave node上

(4)配置中心:如果故障转移发生了,通知client客户端新的master地址

image-20201221173352445

6.1.2、Redis哨兵的高可用

原理:当主节点出现故障时,由Redis Sentinel自动完成故障发现和转移,并通知应用方,实现高可用性

1、哨兵机制建立了多个哨兵节点(进程),共同监控数据节点的运行状况。

2、同时哨兵节点之间也互相通信,交换对主从节点的监控状况。

3、每隔1秒每个哨兵会向整个集群:Master主服务器+Slave从服务器+其他Sentinel(哨兵)进程,发送一次ping命令做一次心跳检测。

这个就是哨兵用来判断节点是否正常的重要依据,涉及两个新的概念:主观下线和客观下线

主观下线:一个哨兵节点判定主节点down掉是主观下线。

客观下线:只有半数哨兵节点都主观判定主节点down掉,此时多个哨兵节点交换主观判定结果,才会判定主节点客观下线。

基本上哪个哨兵节点最先判断出这个主节点客观下线,就会在各个哨兵节点中发起投票机制Raft算法(选举算法),最终被投为领导者的哨兵节点完成主从自动化切换的过程。

6.2、Redis复制

Redis为了解决单点数据库问题,会把数据复制多个副本部署到其他节点上,通过复制,实现Redis的高可用性,实现对数据的冗余备份,保证数据和服务的高度可靠性。

①从数据库向主数据库发送sync(数据同步)命令。

②主数据库接收同步命令后,会保存快照,创建一个RDB文件。

③当主数据库执行完保持快照后,会向从数据库发送RDB文件,而从数据库会接收并载入该文件。

④主数据库将缓冲区的所有写命令发给从服务器执行。

⑤以上处理完之后,之后主数据库每执行一个写命令,都会将被执行的写命令发送给从数据库。

注意:在Redis2.8之后,主从断开重连后会根据断开之前最新的命令偏移量进行增量复制

image-20201221173752786

6.3、区别

1. 主从复制模式读写分离,备份数据、负载均衡,一个Master可以有多个Slaves。

2. 哨兵sentinel:为了高可用,监控,自动转移,哨兵发现主服务器挂了后,就会从slave中重新选举一个主服务器。

3. 集群:为了解决单机Redis容量有限的问题,将数据按一定的规则分配到多台机器,内存/QPS不受限于单机,可受益于分布式集群高扩展性。

ContactAuthor