一、描述
在单机环境下,想要某个资源同一时间只能一个人修改,加个synchronized就可以。但现在很多网站都是分布式部署的,就没有这么简单了,分布式场景中的数据一致性问题是很重要的。
分布式的CAP理论:一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)
最多只能同时满足两项。绝大多数会牺牲强一致性来换取系统的高可用性,往往只保证“最终一致性”,最终时间在用户可以接受的范围内即可。
分布式锁至少要确保锁的实现同时满足以下三个条件:
1.安全属性: 相互排斥。在任意一个时刻,只有一个客户端持有锁,加锁和解锁都必须是同一个客户端。
2.活性A: 无死锁。即便持有锁的客户端崩溃或者网络故障,锁仍然可以被获取。
3.活性B: 容错。只要大部分Redis节点都活着,客户端就可以获取和释放锁。
二、用Redis实现
参考了网上一些大牛的写法,第一版代码实现如下,环境是Redis单机情况下。
加锁
会先判断有没有锁,没有就获取并设置有效时间,有就不操作,核心只有一行代码,保证了原子性,要么成功要么失败。
public static boolean addlock(Jedis jedis,String lockk,String lockv){
String result = jedis.set(lockk, lockv, "NX", "PX", 5000);
jedis.close();
if ("OK".equals(result)){
return true;
}
return false;
}
解锁
会先获取锁,然后对比内容,是我的锁就解锁,不是就不操作,核心只有一行代码,保证了原子性,要么成功要么失败。
public static boolean dellock(Jedis jedis,String lockk,String lockv){
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 'Fail' end";
Object result = jedis.eval(script, 1, lockk,lockv);
jedis.close();
if (!"Fail".equals(result)){
return true;
}
return false;
}
测试代码
new Thread(new Runnable() {
@Override
public void run() {
while (true){
if (addlock(RedisPool.getJedis(),"l","a")){
System.out.println(Thread.currentThread() + "业务a" + is++);
dellock(RedisPool.getJedis(),"l","a");
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
while (true){
if (addlock(RedisPool.getJedis(),"l","b")){
System.out.println(Thread.currentThread() + "业务b" + is++);
dellock(RedisPool.getJedis(),"l","b");
}
}
}
}).start();
两个进程按顺序在输出,没有乱序的现象,但这样写存在一个问题,当我的业务所需要的时间超过锁的时间时,就会出现两个进程同时进行一个业务
最终代码
给执行业务的进程分配一个守护进程,定时给锁重置时间,关闭守护进程的时候释放掉锁
//加锁和解锁
public static Thread addlock(Jedis jedis,String lockk,String lockv){
String result = jedis.set(lockk, lockv, "NX", "PX", 5000);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
Thread.sleep(4000);
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('expire',KEYS[1],5) else return 'Fail' end";
jedis.eval(script, 1, lockk,lockv);
} catch (Exception e) {
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 'Fail' end";
jedis.eval(script, 1, lockk,lockv);
jedis.close();
System.out.println("释放锁");
break;
}
}
}
});
if ("OK".equals(result)){
System.out.println("添加锁");
thread.start();
return thread;
}
return null;
}
测试代码
new Thread(new Runnable() {
@Override
public void run() {
while (true){
Thread thread = addlock(RedisPool.getJedis(),"l","a");
if (thread != null){
System.out.println(Thread.currentThread() + "业务a" + is++);
thread.interrupt();
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
while (true){
Thread thread = addlock(RedisPool.getJedis(),"l","b");
if (thread != null){
System.out.println(Thread.currentThread() + "业务b" + is++);
thread.interrupt();
}
}
}
}).start();
三、用Redisson实现
既然有轮子就直接用好了,单Redis节点环境
导入所需jar包
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.25</version>
</dependency>
Redisson配置类
private Config config = new Config();
private RedissonClient redissonClient = null;
private void init() {
try {
config.useSingleServer().setAddress("redis://192.168.137.8:6379");
redissonClient = Redisson.create(config);
} catch (Exception e) {
e.printStackTrace();
}
}
public RedissonManager() {
init();
}
public RedissonClient getRedisson() {
return redissonClient;
}
测试类
public static Integer is = 0;
public static RedissonManager redissonManager = new RedissonManager();
public static void main(String[] args) {
RLock rlock = redissonManager.getRedisson().getLock("Lock");
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
boolean islock = false;
try {
if (islock = rlock.tryLock(0, 50, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread() + "业务a" + is++);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (islock){
rlock.unlock();
}
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
boolean islock = false;
try {
if (islock = rlock.tryLock(0, 50, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread() + "业务b" + is++);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (islock){
rlock.unlock();
}
}
}
}
}).start();
}