前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、Redis分布式锁的实现

1.1、Redis锁问题和解决方案

1.1.1、 SETNX + EXPIRE 非原子性

看到这个命令的时候,分布式锁开发的时候SET命令还不支持NXPX,所以才想出这种办法来实现key过期,NXPX在2.6.12以后开始支持

加锁命令SETNX key value,当键不存在时,对键进行设置操作并返回成功,否则返回失败。KEY 是锁的唯一标识,一般按业务来决定命名。

锁超时EXPIRE key timeout, 设置 key 的超时时间,以保证即使锁没有被显式释放,锁也可以在一定时间后自动释放,避免资源被永远锁住。

解锁命令DEL key,通过删除键值对释放锁,以便其他线程可以通过 SETNX 命令来获取锁。

if (setnx(key, 1) == 1){
  expire(key, 30)
    try {
      //TODO 业务逻辑
    } finally {
      del(key)
    }
}

不能保证原子性原因:

如果 SETNX 成功,在设置锁超时时间时,服务器挂掉、重启或网络问题等,导致 EXPIRE 命令没有执行,锁没有设置超时时间变成死锁。

这种问题的根源就在于 setnxexpire 是两条指令而不是原子指令。如果这两条指令可 以一起执行就不会出现问题。也许你会想到用 Redis 事务来解决原子性问题。

但是这里不行,因为 expire 是依赖于 setnx 的执行结果的,如果 setnx 没抢到锁,expire 是不应该执行的。事务里没有 if- else 分支逻辑,事务的特点是一口气执行,要么全部执行要么一个都不执行。

image-20210428172407172

1.1.3、锁误解除:不直接删除key

现象:

1、如果线程 A 成功获取到了锁,并且设置了过期时间 30 秒,但线程 A 执行时间超过了 30 秒,锁过期自动释放

2、此时线程 B 获取到了锁;

3、随后由于线程A 执行完成,线程 A 使用 DEL 命令来释放锁

4、但此时线程 B 加的锁还没有执行完成,线程 A 实际释放的线程B 加的锁。

5、这时线程C3来直接就加锁成功

解决:

答:通过在 value 中设置当前线程加锁的标识,在删除之前验证·key 对应的 value 判断锁是否是当前线程持有。可生成一个 UUID 标识当前线程,使用 lua 脚本做验证标识和解锁操作,这样能保证只删除自己加的锁。

释放锁时先匹配 随机数UUID是否一致,然后再删除 key。但是匹配 value 和删除 key 不是一个原子操作,Redis 也 没有提供类似于 del if equals 这样的指令,这就需要使用 Lua 脚本来处理了,因为 Lua 脚本可 以保证连续多个指令的原子性执行

1.1.3、超时解锁导致并发

现象:

如果线程 A 成功获取锁并设置过期时间 30 秒,但线程 A 执行时间超过了 30 秒,锁过期自动释放,此时线程 B 获取到了锁,线程 A 和线程 B 并发执行。

解决

答:AB 两个线程发生并发显然是不被允许的,一般有两种方式解决该问题:

1、将过期时间设置足够长,确保代码逻辑在锁释放之前能够执行完成。

2、为获取锁的线程增加守护线程,为将要过期但未释放的锁增加有效时间。 (Redision 看门狗)

1.1.4、 不可重入

现象:

当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis 可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0 时释放锁。

代码解决:

Java 中使用 ThreadLocal 进行重入次数统计

private static ThreadLocal<Map<String, Integer>> LOCKERS = ThreadLocal.withInitial(HashMap::new);
// 加锁
public boolean lock(String key) {
    Map<String, Integer> lockers = LOCKERS.get();
    if (lockers.containsKey(key)) {
        lockers.put(key, lockers.get(key) + 1);
        return true;
    } else {
        if (SET key uuid NX EX 30) {
            lockers.put(key, 1);
            return true;
        }
    }
    return false;
}
// 解锁
public void unlock(String key) {
    Map<String, Integer> lockers = LOCKERS.get();
    if (lockers.getOrDefault(key, 0) == 1) {
        lockers.remove(key);
        //DEL key (解锁方式看后面)
    } else {
        lockers.put(key, lockers.get(key) - 1);
    }
}

1.1.5、不能阻塞等待锁释放:

原因:

一般情况下,上锁后执行都是立即返回的,不能一直阻塞等待锁释放

答案:

可以通过客户端轮询的方式解决该问题,当未获取到锁时,等待一段时间重新获取锁,直到成功获取锁或等待超时。这种方式比较消耗服务器资源,当并发量比较大时,会影响服务器的效率。

2、分布式锁的实现

2.1、Jedis上锁

public class RedisTool {

  private static final String LOCK_SUCCESS = "OK";
  private static final String SET_IF_NOT_EXIST = "NX";
  private static final String SET_WITH_EXPIRE_TIME = "PX";
  private static final Long RELEASE_SUCCESS = 1L;


  public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
    if (LOCK_SUCCESS.equals(result)) {
      return true;
    }
    return false;
  }


  public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    Object result = jedis.eval(script, Collections.singletonList(lockKey), 
                               Collections.singletonList(requestId));
    if (RELEASE_SUCCESS.equals(result)) {
      return true;
    }
    return false;
  }
}

2.1.1、上锁

第一个为key:我们使用key来当锁,因为key是唯一的。

第二个为value:我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value

原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。

第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;

第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。

第五个为time,与第四个参数相呼应,代表key的过期时间。

2.1.2、解锁

首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。那么为什么要使用Lua语言来实现呢?

答:因为要确保上述操作是原子性的。关于非原子性会带来什么问题 。

那么为什么执行eval()方法可以确保原子性

答:源于Redis的特性,eval命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到eval命令执行完成,Redis才会执行其他命令。

2.2、redisTemplate

package com.hlj.redis.lock.utils;

/**
 * redis锁,原地址https://gitee.com/itopener/springboot/tree/master
 */
public interface DistributedLock {


  public static final long TIMEOUT_MILLIS = 30000; //超时时间

  public static final int RETRY_TIMES = Integer.MAX_VALUE; //重试次数

  public static final long SLEEP_MILLIS = 500; //重试时 线程休眠次数

  public boolean lock(String key);

  public boolean lock(String key, int retryTimes);

  public boolean lock(String key, int retryTimes, long sleepMillis);

  public boolean lock(String key, long expire);

  public boolean lock(String key, long expire, int retryTimes);

  public boolean lock(String key, long expire, int retryTimes, long sleepMillis);

  /**  自定义requestId
     * requestId可以空null ,为null的时候,选择UUID
     * @param key
     * @param requestId
     * @param expire
     * @param retryTimes
     * @param sleepMillis
     * @return
     */
  // public boolean lock(String key, String requestId,long expire, int retryTimes, long sleepMillis);


  public boolean releaseLock(String key);
}




package com.hlj.redis.lock.utils;

/**
 * redis锁,原地址https://gitee.com/itopener/springboot/tree/master
 */
public abstract class AbstractDistributedLock implements DistributedLock {


  @Override
  public boolean lock(String key) {
    return lock(key, TIMEOUT_MILLIS, RETRY_TIMES, SLEEP_MILLIS);
  }

  @Override
  public boolean lock(String key, int retryTimes) {
    return lock(key, TIMEOUT_MILLIS, retryTimes, SLEEP_MILLIS);
  }

  @Override
  public boolean lock(String key, int retryTimes, long sleepMillis) {
    return lock(key, TIMEOUT_MILLIS, retryTimes, sleepMillis);
  }

  @Override
  public boolean lock(String key, long expire) {
    return lock(key, expire, RETRY_TIMES, SLEEP_MILLIS);
  }

  @Override
  public boolean lock(String key, long expire, int retryTimes) {
    return lock(key, expire, retryTimes, SLEEP_MILLIS);
  }

}


package com.hlj.redis.lock.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisCommands;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * redis锁,原地址https://gitee.com/itopener/springboot/tree/master
 */
@Service("redisDistributedLock")
public class RedisDistributedLock extends AbstractDistributedLock {

  private final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class);

  private RedisTemplate<Object, Object> redisTemplate;

  private ThreadLocal<String> lockFlag = new ThreadLocal<String>();

  public static final String UNLOCK_LUA;

  static {
    StringBuilder sb = new StringBuilder();
    sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
    sb.append("then ");
    sb.append("    return redis.call(\"del\",KEYS[1]) ");
    sb.append("else ");
    sb.append("    return 0 ");
    sb.append("end ");
    UNLOCK_LUA = sb.toString();
  }

  public RedisDistributedLock(RedisTemplate<Object, Object> redisTemplate) {
    super();
    this.redisTemplate = redisTemplate;
  }

  @Override
  public boolean lock(String key, long expire, int retryTimes, long sleepMillis) {
    boolean result = setRedis(key, expire);
    // 如果获取锁失败,按照传入的重试次数进行重试
    while ((!result) && retryTimes-- > 0) {
      try {
        logger.debug("lock failed, retrying..." + retryTimes);
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        return false;
      }
      result = setRedis(key, expire);
    }
    return result;
  }

  //    @Override 自定义requestId
  //    public boolean lock(String key, String requestId,long expire, int retryTimes, long sleepMillis) {
  //        if(StringUtils.isEmpty(requestId)){
  //            requestId = UUID.randomUUID().toString();
  //        }
  //        boolean result = setRedis(key, expire,requestId);
  //        // 如果获取锁失败,按照传入的重试次数进行重试
  //        while ((!result) && retryTimes-- > 0) {
  //            try {
  //                logger.debug("lock failed, retrying..." + retryTimes);
  //                Thread.sleep(sleepMillis);
  //            } catch (InterruptedException e) {
  //                return false;
  //            }
  //            if(StringUtils.isEmpty(requestId)){
  //                requestId = UUID.randomUUID().toString();
  //            }
  //            result = setRedis(key, expire,requestId);
  //        }
  //        return result;
  //    }

  private boolean setRedis(String key, long expire) {
    try {
      String result = redisTemplate.execute(new RedisCallback<String>() {
        @Override
        public String doInRedis(RedisConnection connection) throws DataAccessException {
          JedisCommands commands = (JedisCommands) connection.getNativeConnection();
          String uuid = UUID.randomUUID().toString();
          lockFlag.set(uuid);
          //PX millionSecond
          return commands.set(key, uuid, "NX", "PX", expire);
        }
      });
      return !StringUtils.isEmpty(result);
    } catch (Exception e) {
      logger.error("set redis occured an exception", e);
    }
    return false;
  }

  @Override
  public boolean releaseLock(String key) {
    // 释放锁的时候,有可能因为持锁之后方法执行时间大于锁的有效期,此时有可能已经被另外一个线程持有锁,所以不能直接删除
    try {
      List<String> keys = new ArrayList<String>();
      keys.add(key);
      List<String> args = new ArrayList<String>();
      args.add(lockFlag.get());

      // 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
      // spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,所以只能拿到原redis的connection来执行脚本

      Long result = redisTemplate.execute(new RedisCallback<Long>() {
        public Long doInRedis(RedisConnection connection) throws DataAccessException {
          Object nativeConnection = connection.getNativeConnection();
          // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
          // 集群模式
          if (nativeConnection instanceof JedisCluster) {
            return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
          }

          // 单机模式
          else if (nativeConnection instanceof Jedis) {
            return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
          }
          return 0L;
        }
      });

      return result != null && result > 0;
    } catch (Exception e) {
      logger.error("release lock occured an exception : key = {}", key, e);
    } finally {
      // 清除掉ThreadLocal中的数据,避免内存溢出
      lockFlag.remove();
    }
    return false;
  }

}


package com.hlj.redis.lock;

import com.hlj.redis.lock.utils.DistributedLock;
import com.hlj.redis.lock.utils.RedisTool;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import java.util.Date;

/**
 * @Desc:
 * @Author HealerJean
 * @Date 2018/9/13  下午12:04.
 */
@RequestMapping("redis/lock")
@Controller
public class LockController {


  //库存个数
  int goodsCount = 10;

  //卖出个数
  int saleCount = 0;

  /**
     * 缓存key-用户体力锁
     */
  public static final String TEST_LOCK = "test_lock:";

  @Resource
  private DistributedLock lock;

  @GetMapping("test")
  @ResponseBody
  public void lockRedis(){
    for (int i = 0; i < 1000; i++) {
      new Thread(() -> {
        try {Thread.sleep(2);} catch (InterruptedException e) {}
        if (lock.lock(TEST_LOCK , 3000l, 5, 500)) {
          if (goodsCount > 0) {
            goodsCount--;
            System.out.println("剩余库存:" + goodsCount + " 卖出个数" + ++saleCount);
          }
        }
        lock.releaseLock(TEST_LOCK);

      }).start();
    }
    try {
      Thread.sleep(3000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

  }
}


2.3、RedLock

产生背景:

分布式锁,当我们请求一个分布式锁的时候,成功了,但是这时候slave还没有复制我们的锁,master down了,我们的应用继续请求锁的时候,会从继任了master的原slave上申请,也会成功,即使这样,也可能出现问题。

支持单机,主从,哨兵,集群等模式

//单机
RedissonClient redisson = Redisson.create();
Config config = new Config();
config.useSingleServer().setAddress("myredisserver:6379");
RedissonClient redisson = Redisson.create(config);
 
 
//主从
Config config = new Config();
config.useMasterSlaveServers()
    .setMasterAddress("127.0.0.1:6379")
    .addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419")
    .addSlaveAddress("127.0.0.1:6399");
RedissonClient redisson = Redisson.create(config);
 
 
//哨兵
Config config = new Config();
config.useSentinelServers()
    .setMasterName("mymaster")
    .addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379")
    .addSentinelAddress("127.0.0.1:26319");
RedissonClient redisson = Redisson.create(config);
 
 
//集群
Config config = new Config();
config.useClusterServers()
    .setScanInterval(2000) // cluster state scan interval in milliseconds
    .addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001")
    .addNodeAddress("127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);

2.3.1、原理

Redis中的多个master实例,来获取锁,只有大多数实例获取到了锁,才算是获取成功。具体的红锁算法分为以下五步

1、获取当前的时间(单位是毫秒)

2、用相同的key和具有唯一性的valueN个节点上请求锁(LUA脚本)。这里获取锁的尝试时间要远远小于锁的超时时间,防止某个master Down了,我们还在不断的获取锁,而被阻塞过长的时间。

3、只有在大多数节点上获取到了锁(至少N/2+1),而且获取时间小于锁的超时时间的情况下,认为锁获取成功了。

4、如果锁获取成功了,那么这个锁的有效时间应该重新计算,它等于最初的锁的有效时间减去第3步计算出来的获取锁消耗的时间。

5、如果锁获取失败了,(可能由于获取到锁的Redis节点个数少于N/2+1,或者整个获取锁的过程消耗的时间超过了锁的最初有效时间),那么客户端应该立即向所有Redis节点发起 释放锁 的操作。。

2.3.2、代码实现

<!-- JDK 1.8+ compatible -->
<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.9.0</version>
</dependency> 
@Bean(name = "redisson")
public RedissonClient redissonClient() throws IOException {
    InputStream in = this.getClass().getResourceAsStream("/redisson.yaml");
    Config config = Config.fromYAML(in);
    return Redisson.create(config);
}

redisson.yaml


singleServerConfig:
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  reconnectionTimeout: 3000
  failedAttempts: 3
  password: 123456
  subscriptionsPerConnection: 5
  clientName: null
  address: "redis://healerjean:6603"
  subscriptionConnectionMinimumIdleSize: 1
  subscriptionConnectionPoolSize: 50
  connectionMinimumIdleSize: 32
  connectionPoolSize: 64
  database: 4
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.JsonJacksonCodec> {}
@Service
public class CacheServiceImpl implements CacheService {


    @Autowired
    private RedissonClient redisson;

    private final String SEQNO_FORMAT = "0000";
    private final String REDIS_PREFIX = REDIS_CSF + ":";
    private final String REDIS_LOCK_PREFIX = REDIS_LOCK + ":";


    /**
     * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
     * leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
     */
    @Override
    public boolean lock(String key, long timeout, TimeUnit timeUnit) {
        try {
            RLock lock = redisson.getLock(REDIS_PREFIX + REDIS_LOCK_PREFIX + key);
            return lock.tryLock(0, timeout, timeUnit);
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void unlock(String key) {
        RLock lock = redisson.getLock(REDIS_PREFIX + REDIS_LOCK_PREFIX + key);
        lock.unlock();
    }
}

2.3.3、核心源码

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1;
    if (leaseTime != -1) {
        newLeaseTime = unit.toMillis(waitTime)*2;
    }

    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    long lockWaitTime = calcLockWaitTime(remainTime);
    /**
     * 1. 允许加锁失败节点个数限制(N-(N/2+1))
     */
    int failedLocksLimit = failedLocksLimit();
    /**
     * 2. 遍历所有节点通过EVAL命令执行lua加锁
     */
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        /**
         *  3.对节点尝试加锁
         */
        try {
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            // 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            // 抛出异常表示获取锁失败
            lockAcquired = false;
        }

        if (lockAcquired) {
            /**
             *4. 如果获取到锁则添加到已获取锁集合中
             */
            acquiredLocks.add(lock);
        } else {
            /**
             * 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))
             * 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了
             * 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功
             */
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                break;
            }

            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == -1 && leaseTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit--;
            }
        }

        /**
         * 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false
         */
        if (remainTime != -1) {
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }

    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }

        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }

    /**
     * 7.如果逻辑正常执行完则认为最终申请锁成功,返回true
     */
    return true;
}

2.3.4、图解流程

线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。

线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。

image-20201218145554687

image-20201218152103903

2.3.5、解决问题

2.3.5.1、可重入加锁机制

1、Redis存储锁的数据类型是 Hash类型

2、Hash数据类型的key值包含了当前线程信息。

redissonLock.lock("redisson", 1);

image-20201218150320801

这里表面数据类型是Hash类型,Hash类型相当于我们java<key,<key1,value>> 类型,这里key是指 redisson’ 它的有效期还有9秒,我们再来看里们的key1值为078e44a3-5f95-4e24-b6aa-80684655a15a:45它的组成是: uuid + 当前线程的ID。后面的value是就和可重入加锁有关。

image-20201218150500553

  

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

  return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              // 首先分布式锁的KEY不能存在,如果确实不存在,
              // 那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间)
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 获取分布式锁的KEY的失效时间毫秒数
              "return redis.call('pttl', KEYS[1]);",
              // 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

假设前面获取锁时传的key是“abc”,假设调用的线程IDThread-1,假设成员变量UUID类型的id6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c

⬤ 那么KEYS[1]=abcARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

1、判断有没有一个叫“abc”的key

2、如果没有,则在其下设置一个字段为“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值为“1”的键值对 ,并设置它的过期时间

3、如果存在,则进一步判断“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,则其值加1,并重新设置过期时间

4、返回“abc”的生存时间(毫秒)

2.3.5.2、解决超时解锁导致并发的问题

1、Redisson 所有指令都通过 Lua 脚本执行,Redis 支持 Lua 脚本原子性执行。

2、Redisson 假如设置一个 Key 的默认过期时间为 30s,如果某个客户端持有一个锁超过了 30s 怎么办?

3、Redisson 中有一个 Watchdog 的概念,翻译过来就是看门狗,它会在你获取锁之后,每隔 10s 帮你把 Key 的超时时间设为 30s

这样的话,就算一直持有锁也不会出现 Key 过期了,其他线程获取到锁的问题了。

4、Redisson 的“看门狗”逻辑保证了没有死锁发生。(如果机器宕机了,看门狗也就没了。此时就不会延长 Key 的过期时间,到了 30s 之后就会自动过期了,其他线程可以获取到锁)

2.3.5.3、保证原子性操作

通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性

2.3.6、极端问题

问题1:虽然说RedLock算法可以解决单点Redis分布式锁的安全性问题,但如果集群中有节点发生崩溃重启,还是会锁的安全性有影响的。具体出现问题的场景如下: 假设一共有5个Redis节点:A,B,C, D, E。设想发生了如下的事件序列:

1、客户端1成功锁住了A, B, C获取锁成功(但DE没有锁住)

2、节点C崩溃重启了,但客户端1在C上加的锁没有持久化下来,丢失了

3、节点C重启后,客户端2锁住了C, D, E,获取锁成功

这样,客户端1和客户端2同时获得了锁(针对同一资源)。针对这样场景,解决方式也很简单,也就是让Redis崩溃后延迟重启,并且这个延迟时间大于锁的过期时间就好。这样等节点重启后,所有节点上的锁都已经失效了。也不存在以上出现2个客户端获取同一个资源的情况了。 但是这个也要受限于业务上的一个锁的过期时间

3、Redis分布式锁和Zk分布式锁区别

3.1、Redis实现分布式锁

另一方面使用 Redis 实现分布式锁在很多企业中非常常见,而且大部分情况下都不会遇到所谓的“极端复杂场景”,所以使用 Redis 作为分布式锁也不失为一种好的方案,最重要的一点是 Redis 的性能很高,可以支撑高并发的获取、释放锁操作。

1、Redis 的设计定位决定了它的数据并不是强一致性的,在某些极端情况下,可能会出现问题。锁的模型不够健壮

redis sentinel集群中,我们具有多台redis,他们之间有着主从的关系,例如一主二从。我们的set命令对应的数据写到主库,然后同步到从库。当我们申请一个锁的时候,对应就是一条命令 setnx mykey myvalue ,在redis sentinel集群中,这条命令先是落到了主库。假设这时主库down了,而这条数据还没来得及同步到从库,sentinel将从库中的一台选举为主库了。

这时,我们的新主库中并没有mykey这条数据,若此时另外一个client执行 setnx mykey hisvalue , 也会成功,即也能得到锁。这就意味着,此时有两个client获得了锁。这不是我们希望看到的,虽然这个情况发生的记录很小,只会在主从failover的时候才会发生,大多数情况下、大多数系统都可以容忍,但是不是所有的系统都能容忍这种瑕疵.,锁的安全性被打破了。

针对这个问题。Redis作者antirez提出了RedLock算法来解决这个问题,但是也会有重复锁上的问题,具体看上面的的RedLock分析。

2、Redis 分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能。

3、如果是 redis 获取锁的那个客户端 出现 bug 挂了,那么只能等待超时时间之后才能释放锁

3.2、Zk分布式锁

ZK 的模型是这样的:ZK 包含一系列的节点,叫做 Znode,就好像文件系统一样,每个 Znode 表示一个目录。

有序节点:假如当前有一个父节点为 /lock,我们可以在这个父节点下面创建子节点,ZK 提供了一个可选的有序特性。

例如我们可以创建子节点“/lock/node-”并且指明有序,那么 ZK 在生成子节点时会根据当前的子节点数量自动添加整数序号。

也就是说,如果是第一个创建的子节点,那么生成的子节点为 /lock/node-0000000000,下一个节点则为 /lock/node-0000000001,依次类推。

临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,ZK 会自动删除该节点。

事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,ZK 会通知客户端。

3.2.1、ZK 实现分布式锁的落地方案:

1、使用 ZK 的临时节点和有序节点,每个线程获取锁就是在 ZK 创建一个临时有序的节点,比如在 /lock/ 目录下。

2、创建节点成功后,获取 /lock 目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点。

3、如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。

4、如果当前线程创建的节点不是所有节点序号最小的节点,则对节点序号的前一个节点添加一个事件监听。

5、比如当前线程获取到的节点序号为 /lock/003,然后所有的节点列表为[/lock/001/lock/002/lock/003],则对 /lock/002 这个节点添加一个事件监听器。

如果锁释放了,会唤醒下一个序号的节点,然后重新执行第 3 步,判断是否自己的节点序号是最小。比如 /lock/001 释放了,/lock/002 监听到时间,此时节点集合为[/lock/002/lock/003],则 /lock/002 为最小序号节点,获取到锁。

3.2.2、总结

1、ZK 天生设计定位就是分布式协调,强一致性。锁的模型健壮、简单易用、适合做分布式锁。

2、如果获取不到锁,只需要添加一个监听器就可以了,不用一直轮询,性能消耗较小。

3、而 zk 的话,因为创建的是临时 znode只要客户端挂了,znode 就没了,此时就自动释放锁。

但是 ZK 也有其缺点:如果有较多的客户端频繁的申请加锁、释放锁,对于 ZK 集群的压力会比较大

ContactAuthor