Redis之_千帆竞发_分布式锁
前言
Github:https://github.com/HealerJean
1、Redis
分布式锁的实现
1.1、Redis
锁问题和解决方案
1.1.1、 SETNX
+ EXPIRE
非原子性
看到这个命令的时候,分布式锁开发的时候
SET
命令还不支持NX
、PX
,所以才想出这种办法来实现key
过期,NX
、PX
在2.6.12以后开始支持加锁命令:
SETNX key value
,当键不存在时,对键进行设置操作并返回成功,否则返回失败。KEY
是锁的唯一标识,一般按业务来决定命名。锁超时:
EXPIRE key timeout
, 设置 key 的超时时间,以保证即使锁没有被显式释放,锁也可以在一定时间后自动释放,避免资源被永远锁住。解锁命令:
DEL key
,通过删除键值对释放锁,以便其他线程可以通过 SETNX 命令来获取锁。
if (setnx(key, 1) == 1){
expire(key, 30)
try {
//TODO 业务逻辑
} finally {
del(key)
}
}
不能保证原子性原因:
如果
SETNX
成功,在设置锁超时时间时,服务器挂掉、重启或网络问题等,导致EXPIRE
命令没有执行,锁没有设置超时时间变成死锁。这种问题的根源就在于
setnx
和expire
是两条指令而不是原子指令。如果这两条指令可 以一起执行就不会出现问题。也许你会想到用Redis
事务来解决原子性问题。但是这里不行,因为
expire
是依赖于setnx
的执行结果的,如果setnx
没抢到锁,expire
是不应该执行的。事务里没有if- else
分支逻辑,事务的特点是一口气执行,要么全部执行要么一个都不执行。
1.1.3、锁误解除:不直接删除key
现象:
1、如果线程
A
成功获取到了锁,并且设置了过期时间30
秒,但线程 A 执行时间超过了30
秒,锁过期自动释放2、此时线程
B
获取到了锁;3、随后由于线程A 执行完成,线程 A 使用 DEL 命令来释放锁
4、但此时线程
B
加的锁还没有执行完成,线程A
实际释放的线程B
加的锁。5、这时线程
C3
来直接就加锁成功
解决:
答:通过在
value
中设置当前线程加锁的标识,在删除之前验证·key
对应的value
判断锁是否是当前线程持有。可生成一个UUID
标识当前线程,使用lua
脚本做验证标识和解锁操作,这样能保证只删除自己加的锁。释放锁时先匹配 随机数
UUID
是否一致,然后再删除key
。但是匹配value
和删除key
不是一个原子操作,Redis
也 没有提供类似于del if equals
这样的指令,这就需要使用Lua
脚本来处理了,因为Lua
脚本可 以保证连续多个指令的原子性执行
1.1.3、超时解锁导致并发
现象:
如果线程
A
成功获取锁并设置过期时间30
秒,但线程 A 执行时间超过了30
秒,锁过期自动释放,此时线程B
获取到了锁,线程A
和线程B
并发执行。解决
答:
A
、B
两个线程发生并发显然是不被允许的,一般有两种方式解决该问题:1、将过期时间设置足够长,确保代码逻辑在锁释放之前能够执行完成。
2、为获取锁的线程增加守护线程,为将要过期但未释放的锁增加有效时间。 (
Redision
看门狗)
1.1.4、 不可重入
现象:
当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。
Redis
可通过对锁进行重入计数,加锁时加1
,解锁时减1
,当计数归 0 时释放锁。
代码解决:
Java
中使用ThreadLocal
进行重入次数统计
private static ThreadLocal<Map<String, Integer>> LOCKERS = ThreadLocal.withInitial(HashMap::new);
// 加锁
public boolean lock(String key) {
Map<String, Integer> lockers = LOCKERS.get();
if (lockers.containsKey(key)) {
lockers.put(key, lockers.get(key) + 1);
return true;
} else {
if (SET key uuid NX EX 30) {
lockers.put(key, 1);
return true;
}
}
return false;
}
// 解锁
public void unlock(String key) {
Map<String, Integer> lockers = LOCKERS.get();
if (lockers.getOrDefault(key, 0) == 1) {
lockers.remove(key);
//DEL key (解锁方式看后面)
} else {
lockers.put(key, lockers.get(key) - 1);
}
}
1.1.5、不能阻塞等待锁释放:
原因:
一般情况下,上锁后执行都是立即返回的,不能一直阻塞等待锁释放
答案:
可以通过客户端轮询的方式解决该问题,当未获取到锁时,等待一段时间重新获取锁,直到成功获取锁或等待超时。这种方式比较消耗服务器资源,当并发量比较大时,会影响服务器的效率。
2、分布式锁的实现
2.1、Jedis
上锁
public class RedisTool {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey),
Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
2.1.1、上锁
第一个为
key
:我们使用key
来当锁,因为key
是唯一的。第二个为
value
:我们传的是requestId
,很多童鞋可能不明白,有key
作为锁不就够了吗,为什么还要用到value
?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给
value
赋值为requestId
,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId
可以使用UUID.randomUUID().toString()
方法生成。第三个为
nxxx
,这个参数我们填的是NX
,意思是SET IF NOT EXIST
,即当key
不存在时,我们进行set
操作;若key
已经存在,则不做任何操作;第四个为
expx
,这个参数我们传的是PX
,意思是我们要给这个key
加一个过期的设置,具体时间由第五个参数决定。第五个为
time
,与第四个参数相呼应,代表key
的过期时间。
2.1.2、解锁
首先获取锁对应的
valu
e值,检查是否与requestId
相等,如果相等则删除锁(解锁)。那么为什么要使用Lua
语言来实现呢?答:因为要确保上述操作是原子性的。关于非原子性会带来什么问题 。
那么为什么执行
eval()
方法可以确保原子性答:源于
Redis
的特性,eval
命令执行Lua
代码的时候,Lua
代码将被当成一个命令去执行,并且直到eval
命令执行完成,Redis
才会执行其他命令。
2.2、redisTemplate
锁
package com.hlj.redis.lock.utils;
/**
* redis锁,原地址https://gitee.com/itopener/springboot/tree/master
*/
public interface DistributedLock {
public static final long TIMEOUT_MILLIS = 30000; //超时时间
public static final int RETRY_TIMES = Integer.MAX_VALUE; //重试次数
public static final long SLEEP_MILLIS = 500; //重试时 线程休眠次数
public boolean lock(String key);
public boolean lock(String key, int retryTimes);
public boolean lock(String key, int retryTimes, long sleepMillis);
public boolean lock(String key, long expire);
public boolean lock(String key, long expire, int retryTimes);
public boolean lock(String key, long expire, int retryTimes, long sleepMillis);
/** 自定义requestId
* requestId可以空null ,为null的时候,选择UUID
* @param key
* @param requestId
* @param expire
* @param retryTimes
* @param sleepMillis
* @return
*/
// public boolean lock(String key, String requestId,long expire, int retryTimes, long sleepMillis);
public boolean releaseLock(String key);
}
package com.hlj.redis.lock.utils;
/**
* redis锁,原地址https://gitee.com/itopener/springboot/tree/master
*/
public abstract class AbstractDistributedLock implements DistributedLock {
@Override
public boolean lock(String key) {
return lock(key, TIMEOUT_MILLIS, RETRY_TIMES, SLEEP_MILLIS);
}
@Override
public boolean lock(String key, int retryTimes) {
return lock(key, TIMEOUT_MILLIS, retryTimes, SLEEP_MILLIS);
}
@Override
public boolean lock(String key, int retryTimes, long sleepMillis) {
return lock(key, TIMEOUT_MILLIS, retryTimes, sleepMillis);
}
@Override
public boolean lock(String key, long expire) {
return lock(key, expire, RETRY_TIMES, SLEEP_MILLIS);
}
@Override
public boolean lock(String key, long expire, int retryTimes) {
return lock(key, expire, retryTimes, SLEEP_MILLIS);
}
}
package com.hlj.redis.lock.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisCommands;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
* redis锁,原地址https://gitee.com/itopener/springboot/tree/master
*/
@Service("redisDistributedLock")
public class RedisDistributedLock extends AbstractDistributedLock {
private final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class);
private RedisTemplate<Object, Object> redisTemplate;
private ThreadLocal<String> lockFlag = new ThreadLocal<String>();
public static final String UNLOCK_LUA;
static {
StringBuilder sb = new StringBuilder();
sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
sb.append("then ");
sb.append(" return redis.call(\"del\",KEYS[1]) ");
sb.append("else ");
sb.append(" return 0 ");
sb.append("end ");
UNLOCK_LUA = sb.toString();
}
public RedisDistributedLock(RedisTemplate<Object, Object> redisTemplate) {
super();
this.redisTemplate = redisTemplate;
}
@Override
public boolean lock(String key, long expire, int retryTimes, long sleepMillis) {
boolean result = setRedis(key, expire);
// 如果获取锁失败,按照传入的重试次数进行重试
while ((!result) && retryTimes-- > 0) {
try {
logger.debug("lock failed, retrying..." + retryTimes);
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
return false;
}
result = setRedis(key, expire);
}
return result;
}
// @Override 自定义requestId
// public boolean lock(String key, String requestId,long expire, int retryTimes, long sleepMillis) {
// if(StringUtils.isEmpty(requestId)){
// requestId = UUID.randomUUID().toString();
// }
// boolean result = setRedis(key, expire,requestId);
// // 如果获取锁失败,按照传入的重试次数进行重试
// while ((!result) && retryTimes-- > 0) {
// try {
// logger.debug("lock failed, retrying..." + retryTimes);
// Thread.sleep(sleepMillis);
// } catch (InterruptedException e) {
// return false;
// }
// if(StringUtils.isEmpty(requestId)){
// requestId = UUID.randomUUID().toString();
// }
// result = setRedis(key, expire,requestId);
// }
// return result;
// }
private boolean setRedis(String key, long expire) {
try {
String result = redisTemplate.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
JedisCommands commands = (JedisCommands) connection.getNativeConnection();
String uuid = UUID.randomUUID().toString();
lockFlag.set(uuid);
//PX millionSecond
return commands.set(key, uuid, "NX", "PX", expire);
}
});
return !StringUtils.isEmpty(result);
} catch (Exception e) {
logger.error("set redis occured an exception", e);
}
return false;
}
@Override
public boolean releaseLock(String key) {
// 释放锁的时候,有可能因为持锁之后方法执行时间大于锁的有效期,此时有可能已经被另外一个线程持有锁,所以不能直接删除
try {
List<String> keys = new ArrayList<String>();
keys.add(key);
List<String> args = new ArrayList<String>();
args.add(lockFlag.get());
// 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
// spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,所以只能拿到原redis的connection来执行脚本
Long result = redisTemplate.execute(new RedisCallback<Long>() {
public Long doInRedis(RedisConnection connection) throws DataAccessException {
Object nativeConnection = connection.getNativeConnection();
// 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
// 集群模式
if (nativeConnection instanceof JedisCluster) {
return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
}
// 单机模式
else if (nativeConnection instanceof Jedis) {
return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
}
return 0L;
}
});
return result != null && result > 0;
} catch (Exception e) {
logger.error("release lock occured an exception : key = {}", key, e);
} finally {
// 清除掉ThreadLocal中的数据,避免内存溢出
lockFlag.remove();
}
return false;
}
}
package com.hlj.redis.lock;
import com.hlj.redis.lock.utils.DistributedLock;
import com.hlj.redis.lock.utils.RedisTool;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import java.util.Date;
/**
* @Desc:
* @Author HealerJean
* @Date 2018/9/13 下午12:04.
*/
@RequestMapping("redis/lock")
@Controller
public class LockController {
//库存个数
int goodsCount = 10;
//卖出个数
int saleCount = 0;
/**
* 缓存key-用户体力锁
*/
public static final String TEST_LOCK = "test_lock:";
@Resource
private DistributedLock lock;
@GetMapping("test")
@ResponseBody
public void lockRedis(){
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
try {Thread.sleep(2);} catch (InterruptedException e) {}
if (lock.lock(TEST_LOCK , 3000l, 5, 500)) {
if (goodsCount > 0) {
goodsCount--;
System.out.println("剩余库存:" + goodsCount + " 卖出个数" + ++saleCount);
}
}
lock.releaseLock(TEST_LOCK);
}).start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2.3、RedLock
产生背景:
分布式锁,当我们请求一个分布式锁的时候,成功了,但是这时候
slave
还没有复制我们的锁,master down
了,我们的应用继续请求锁的时候,会从继任了master
的原slave上
申请,也会成功,即使这样,也可能出现问题。支持单机,主从,哨兵,集群等模式
//单机
RedissonClient redisson = Redisson.create();
Config config = new Config();
config.useSingleServer().setAddress("myredisserver:6379");
RedissonClient redisson = Redisson.create(config);
//主从
Config config = new Config();
config.useMasterSlaveServers()
.setMasterAddress("127.0.0.1:6379")
.addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419")
.addSlaveAddress("127.0.0.1:6399");
RedissonClient redisson = Redisson.create(config);
//哨兵
Config config = new Config();
config.useSentinelServers()
.setMasterName("mymaster")
.addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379")
.addSentinelAddress("127.0.0.1:26319");
RedissonClient redisson = Redisson.create(config);
//集群
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000) // cluster state scan interval in milliseconds
.addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001")
.addNodeAddress("127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);
2.3.1、原理
用
Redis
中的多个master
实例,来获取锁,只有大多数实例获取到了锁,才算是获取成功。具体的红锁算法分为以下五步:1、获取当前的时间(单位是毫秒)
2、用相同的
key
和具有唯一性的value
在N
个节点上请求锁(LUA
脚本)。这里获取锁的尝试时间要远远小于锁的超时时间,防止某个master
Down
了,我们还在不断的获取锁,而被阻塞过长的时间。3、只有在大多数节点上获取到了锁(至少
N/2+1
),而且获取时间小于锁的超时时间的情况下,认为锁获取成功了。4、如果锁获取成功了,那么这个锁的有效时间应该重新计算,它等于最初的锁的有效时间减去第3步计算出来的获取锁消耗的时间。
5、如果锁获取失败了,(可能由于获取到锁的
Redis
节点个数少于N/2+1
,或者整个获取锁的过程消耗的时间超过了锁的最初有效时间),那么客户端应该立即向所有Redis
节点发起 释放锁 的操作。。
2.3.2、代码实现
<!-- JDK 1.8+ compatible -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.9.0</version>
</dependency>
@Bean(name = "redisson")
public RedissonClient redissonClient() throws IOException {
InputStream in = this.getClass().getResourceAsStream("/redisson.yaml");
Config config = Config.fromYAML(in);
return Redisson.create(config);
}
redisson.yaml
singleServerConfig:
idleConnectionTimeout: 10000
pingTimeout: 1000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
reconnectionTimeout: 3000
failedAttempts: 3
password: 123456
subscriptionsPerConnection: 5
clientName: null
address: "redis://healerjean:6603"
subscriptionConnectionMinimumIdleSize: 1
subscriptionConnectionPoolSize: 50
connectionMinimumIdleSize: 32
connectionPoolSize: 64
database: 4
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.JsonJacksonCodec> {}
@Service
public class CacheServiceImpl implements CacheService {
@Autowired
private RedissonClient redisson;
private final String SEQNO_FORMAT = "0000";
private final String REDIS_PREFIX = REDIS_CSF + ":";
private final String REDIS_LOCK_PREFIX = REDIS_LOCK + ":";
/**
* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
*/
@Override
public boolean lock(String key, long timeout, TimeUnit timeUnit) {
try {
RLock lock = redisson.getLock(REDIS_PREFIX + REDIS_LOCK_PREFIX + key);
return lock.tryLock(0, timeout, timeUnit);
} catch (Exception e) {
return false;
}
}
@Override
public void unlock(String key) {
RLock lock = redisson.getLock(REDIS_PREFIX + REDIS_LOCK_PREFIX + key);
lock.unlock();
}
}
2.3.3、核心源码
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = unit.toMillis(waitTime)*2;
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
/**
* 1. 允许加锁失败节点个数限制(N-(N/2+1))
*/
int failedLocksLimit = failedLocksLimit();
/**
* 2. 遍历所有节点通过EVAL命令执行lua加锁
*/
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
/**
* 3.对节点尝试加锁
*/
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
// 抛出异常表示获取锁失败
lockAcquired = false;
}
if (lockAcquired) {
/**
*4. 如果获取到锁则添加到已获取锁集合中
*/
acquiredLocks.add(lock);
} else {
/**
* 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))
* 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了
* 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功
*/
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
/**
* 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false
*/
if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
/**
* 7.如果逻辑正常执行完则认为最终申请锁成功,返回true
*/
return true;
}
2.3.4、图解流程
线程去获取锁,获取成功: 执行
lua
脚本,保存数据到redis
数据库。线程去获取锁,获取失败: 一直通过
while
循环尝试获取锁,获取成功后,执行lua
脚本,保存数据到redis
数据库。
2.3.5、解决问题
2.3.5.1、可重入加锁机制
1、
Redis
存储锁的数据类型是Hash
类型2、
Hash
数据类型的key
值包含了当前线程信息。
redissonLock.lock("redisson", 1);
这里表面数据类型是Hash
类型,Hash
类型相当于我们java
的 <key,<key1,value>>
类型,这里key
是指 redisson
’ 它的有效期还有9
秒,我们再来看里们的key1
值为078e44a3-5f95-4e24-b6aa-80684655a15a:45
它的组成是: uuid
+ 当前线程的ID
。后面的value
是就和可重入加锁有关。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 首先分布式锁的KEY不能存在,如果确实不存在,
// 那么执行hset命令(hset REDLOCK_KEY uuid+threadId 1),并通过pexpire设置失效时间(也是锁的租约时间)
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果分布式锁的KEY已经存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 获取分布式锁的KEY的失效时间毫秒数
"return redis.call('pttl', KEYS[1]);",
// 这三个参数分别对应KEYS[1],ARGV[1]和ARGV[2]
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
假设前面获取锁时传的key
是“abc”,假设调用的线程ID
是Thread-1
,假设成员变量UUID
类型的id
是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c
⬤ 那么KEYS[1]
=abc
,ARGV[2]
=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1
1、判断有没有一个叫“abc
”的key
2、如果没有,则在其下设置一个字段为“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1
”,值为“1
”的键值对 ,并设置它的过期时间
3、如果存在,则进一步判断“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1
”是否存在,若存在,则其值加1
,并重新设置过期时间
4、返回“abc
”的生存时间(毫秒)
2.3.5.2、解决超时解锁导致并发的问题
1、Redisson
所有指令都通过 Lua
脚本执行,Redis
支持 Lua
脚本原子性执行。
2、Redisson
假如设置一个 Key
的默认过期时间为 30s
,如果某个客户端持有一个锁超过了 30s
怎么办?
3、Redisson
中有一个 Watchdog
的概念,翻译过来就是看门狗,它会在你获取锁之后,每隔 10s
帮你把 Key
的超时时间设为 30s
。
这样的话,就算一直持有锁也不会出现 Key
过期了,其他线程获取到锁的问题了。
4、Redisson
的“看门狗”逻辑保证了没有死锁发生。(如果机器宕机了,看门狗也就没了。此时就不会延长 Key
的过期时间,到了 30s
之后就会自动过期了,其他线程可以获取到锁)
2.3.5.3、保证原子性操作
通过封装在
lua
脚本中发送给redis
,而且redis
是单线程的,这样就保证这段复杂业务逻辑执行的原子性
2.3.6、极端问题
问题1:虽然说RedLock
算法可以解决单点Redis
分布式锁的安全性问题,但如果集群中有节点发生崩溃重启,还是会锁的安全性有影响的。具体出现问题的场景如下: 假设一共有5个Redis
节点:A
,B
,C
, D
, E
。设想发生了如下的事件序列:
1、客户端1
成功锁住了A
, B
, C
,获取锁成功(但D
和E
没有锁住)
2、节点C
崩溃重启了,但客户端1在C
上加的锁没有持久化下来,丢失了
3、节点C
重启后,客户端2
锁住了C, D, E,获取锁成功
这样,客户端1和客户端2同时获得了锁(针对同一资源)。针对这样场景,解决方式也很简单,也就是让Redis
崩溃后延迟重启,并且这个延迟时间大于锁的过期时间就好。这样等节点重启后,所有节点上的锁都已经失效了。也不存在以上出现2个客户端获取同一个资源的情况了。 但是这个也要受限于业务上的一个锁的过期时间
2.3.7、Redlock
使用场景
如果你很在乎高可用性,希望挂了一台
redis
完全不受影响,那就应该考虑redlock
。不 过代价也是有的,需要更多的redis
实例,性能也下降了,代码上还需要引入额外的library
,运维上也需要特殊对待,这些都是需要考虑的成本,使用前请再三斟酌。
3、Redis
分布式锁和Zk
分布式锁区别
3.1、Redis
实现分布式锁
另一方面使用
Redis
实现分布式锁在很多企业中非常常见,而且大部分情况下都不会遇到所谓的“极端复杂场景”,所以使用 Redis 作为分布式锁也不失为一种好的方案,最重要的一点是Redis
的性能很高,可以支撑高并发的获取、释放锁操作。
1、Redis
的设计定位决定了它的数据并不是强一致性的,在某些极端情况下,可能会出现问题。锁的模型不够健壮。
在redis sentinel
集群中,我们具有多台redis
,他们之间有着主从的关系,例如一主二从。我们的se
t命令对应的数据写到主库,然后同步到从库。当我们申请一个锁的时候,对应就是一条命令 setnx mykey myvalue
,在redis sentinel
集群中,这条命令先是落到了主库。假设这时主库down
了,而这条数据还没来得及同步到从库,sentinel
将从库中的一台选举为主库了。
这时,我们的新主库中并没有mykey
这条数据,若此时另外一个client
执行 setnx mykey hisvalue
, 也会成功,即也能得到锁。这就意味着,此时有两个client
获得了锁。这不是我们希望看到的,虽然这个情况发生的记录很小,只会在主从failover
的时候才会发生,大多数情况下、大多数系统都可以容忍,但是不是所有的系统都能容忍这种瑕疵.,锁的安全性被打破了。
针对这个问题。Redis
作者antirez
提出了RedLock
算法来解决这个问题,但是也会有重复锁上的问题,具体看上面的的RedLock
分析。
2、Redis
分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能。
3、如果是 redis
获取锁的那个客户端 出现 bug
挂了,那么只能等待超时时间之后才能释放锁;
3.2、Zk
分布式锁
ZK
的模型是这样的:ZK
包含一系列的节点,叫做Znode
,就好像文件系统一样,每个Znode
表示一个目录。
有序节点:假如当前有一个父节点为 /lock
,我们可以在这个父节点下面创建子节点,ZK
提供了一个可选的有序特性。
例如我们可以创建子节点“/lock/node
-”并且指明有序,那么 ZK
在生成子节点时会根据当前的子节点数量自动添加整数序号。
也就是说,如果是第一个创建的子节点,那么生成的子节点为 /lock/node-0000000000,下一个节点则为 /lock/node-0000000001,依次类推。
临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,ZK
会自动删除该节点。
事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,ZK 会通知客户端。
3.2.1、ZK
实现分布式锁的落地方案:
1、使用 ZK
的临时节点和有序节点,每个线程获取锁就是在 ZK
创建一个临时有序的节点,比如在 /lock/
目录下。
2、创建节点成功后,获取 /lock
目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点。
3、如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。
4、如果当前线程创建的节点不是所有节点序号最小的节点,则对节点序号的前一个节点添加一个事件监听。
5、比如当前线程获取到的节点序号为 /lock/003
,然后所有的节点列表为[/lock/001
,/lock/002
,/lock/003
],则对 /lock/002
这个节点添加一个事件监听器。
如果锁释放了,会唤醒下一个序号的节点,然后重新执行第 3 步,判断是否自己的节点序号是最小。比如 /lock/001
释放了,/lock/002
监听到时间,此时节点集合为[/lock/002
,/lock/003
],则 /lock/002
为最小序号节点,获取到锁。
3.2.2、总结
1、ZK
天生设计定位就是分布式协调,强一致性。锁的模型健壮、简单易用、适合做分布式锁。
2、如果获取不到锁,只需要添加一个监听器就可以了,不用一直轮询,性能消耗较小。
3、而 zk
的话,因为创建的是临时 znode
,只要客户端挂了,znode
就没了,此时就自动释放锁。
但是 ZK
也有其缺点:如果有较多的客户端频繁的申请加锁、释放锁,对于 ZK
集群的压力会比较大。
4、LUA
脚本为什么是原子性的
“Atomicity of scripts
Redis uses the same Lua interpreter to run all the commands. Also Redis guarantees that a script is executed in an atomic way: no other script or Redis command will be executed while a script is being executed. This semantic is similar to the one of MULTI / EXEC. From the point of view of all the other clients the effects of a script are either still not visible or already completed.”
Redis
使用(支持)相同的Lua
解释器,来运行所有的命令。Redis
还保证脚本以原子方式执行:在执行脚本时,不会执行其他脚本或Redis
命令(我理解单线程的原因,而且类似于事务)。这个语义类似于
MULTI
(开启事务)/EXE
C(触发事务,一并执行事务中的所有命令)。从所有其他客户端的角度来看,脚本的效果要么仍然不可见,要么已经完成。但是一定要注意,语法不要有错,否则还是会有问题的