Skip to content

Redis分布式锁

1.问题描述

​ 随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

分布式锁主流的实现方案:

  1. 基于数据库实现分布式锁
  2. 基于缓存(Redis等)
  3. 基于Zookeeper

每一种分布式锁解决方案都有各自的优缺点:

  1. 性能:redis最高
  2. 可靠性:zookeeper最高

这里,我们就基于redis实现分布式锁。

2.解决方案:redis实现分布式锁

1.思路分析

setnx:只有建不存在时,才对建进行设置操作。设置成功返回true设置失败返回false

image-20220823073830682

1.当多个客户端同时请求时,都会先去获取锁 setnx

2.获取成功,则执行业务逻辑,执行完成之后删除锁(del(“lock”))已完成锁的释放。

3.其他客户端等待重试,比如线程等待几秒后再次尝试获取锁

2.代码实现

1.创建SpringBoot工程,引入相关依赖

xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.llp</groupId>
	<artifactId>Springboot-redis</artifactId>
	<version>1.0-SNAPSHOT</version>
	<!--导入springboot父工程-规定写法-->
	<parent>
		<artifactId>spring-boot-starter-parent</artifactId>
		<groupId>org.springframework.boot</groupId>
		<version>2.5.3</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!--引入redis-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>
		<!--redis连接池-->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-pool2</artifactId>
		</dependency>
	</dependencies>

</project>

2.application.yaml, redis集群配置

yaml
spring:
  application:
    name: spring-boot-redis
  redis:
  #redis集群配置,这里是在一台主机上模拟
    cluster:
      nodes:
          - 192.168.79.201:6379
          - 192.168.79.201:6380
          - 192.168.79.201:6381
          - 192.168.79.201:6390
          - 192.168.79.201:6391
          - 192.168.79.201:6389
    connect-timeout: 6000

3.redis序列化配置

java
@EnableCaching
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setConnectionFactory(factory);
        //key序列化方式
        template.setKeySerializer(redisSerializer);
        //value序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //value hashmap序列化
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        return template;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        //解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置序列化(解决乱码的问题),过期时间600秒
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofSeconds(600))
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                .disableCachingNullValues();
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }

}

4.创建测试类,实现分布式锁

java
@RestController
public class RedisTestController {

    @Autowired
    private RedisTemplate redisTemplate;

    @GetMapping("/testLock")
    public void testLock(){
        //1获取锁,setnx 如果key存在值返回false设置值失败,如果key不存在则设置值成功返回true
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
        //2获取锁成功、查询num的值
        if(lock){

            /**********业务逻辑start**********/
            //从缓存中获取num值
            Object value = redisTemplate.opsForValue().get("num");
            //如果值为空则直接返回
            if(ObjectUtils.isEmpty(value)){
                return;
            }
            //如果有值则转成int
            int num = Integer.parseInt(value+"");
            //把redis的num加1
            redisTemplate.opsForValue().set("num", ++num);
            /**********业务逻辑end**********/

            //释放锁
            redisTemplate.delete("lock");
        }else{
            try {
                //如果获取锁失败,则等待三秒再次尝试获取锁
                Thread.sleep(3000);
                testLock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

3.存在的问题

问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放。

以上面的代码为例,当redis缓存中不存在num时,value为空那么程序就不会取调用 redisTemplate.delete("lock");删除锁,进而导致锁一直得不到释放。当别的请求打进来 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");始终返回false,无法正常执行业务逻辑。

java
/**********业务逻辑start**********/
            //从缓存中获取num值
            Object value = redisTemplate.opsForValue().get("num");
            //如果值为空则直接返回
            if(ObjectUtils.isEmpty(value)){
                return;
            }
            //如果有值则转成int
            int num = Integer.parseInt(value+"");
            //把redis的num加1
            redisTemplate.opsForValue().set("num", ++num);
            /**********业务逻辑end**********/

解决:设置过期时间,自动释放锁。

4.优化之设置锁的过期时间

设置过期时间有两种方式:

  1. 首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)
  2. 在setnx时指定过期时间(推荐)

image-20220823083657873

设置过期时间:

压力测试肯定也没有问题。自行测试

**问题:**可能会释放其他服务器的锁。

**场景:**如果业务逻辑的执行时间是7s。执行流程如下

  1. index1业务逻辑没执行完,3秒后锁被自动释放。
  2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。
  3. index3获取到锁,执行业务逻辑
  4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。

最终等于没锁的情况。

**解决:**setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁

java
@RestController
public class RedisTestController {

    @Autowired
    private RedisTemplate redisTemplate;

    @GetMapping("/testLock")
    public void testLock(){
        //1获取锁,setnx 如果key存在值返回false设置值失败,如果key不存在则设置值成功返回true 
        //设置过期时间的长短根据业务执行的时间而定
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS);
        //2获取锁成功、查询num的值
        if(lock){

            /**********业务逻辑start**********/
            //从缓存中获取num值
            Object value = redisTemplate.opsForValue().get("num");
            //如果值为空则直接返回
            if(ObjectUtils.isEmpty(value)){
                return;
            }
            //如果有值则转成int
            int num = Integer.parseInt(value+"");
            //把redis的num加1
            redisTemplate.opsForValue().set("num", ++num);
            /**********业务逻辑end**********/

            //释放锁
            redisTemplate.delete("lock");
        }else{
            try {
                //如果获取锁失败,则等待三秒再次尝试获取锁
                Thread.sleep(3000);
                testLock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

5.优化之UUID防误删

image-20220823084416381

java
@RestController
public class RedisTestController {

    @Autowired
    private RedisTemplate redisTemplate;

    @GetMapping("/testLock")
    public void testLock(){
        String uuid = UUID.randomUUID().toString();
        //1获取锁,setnx 如果key存在值返回false设置值失败,如果key不存在则设置值成功返回true
        //设置过期时间的长短根据业务执行的时间而定
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS);
        //2获取锁成功、查询num的值
        if(lock){

            /**********业务逻辑start**********/
            //从缓存中获取num值
            Object value = redisTemplate.opsForValue().get("num");
            //如果值为空则直接返回
            if(ObjectUtils.isEmpty(value)){
                return;
            }
            //如果有值则转成int
            int num = Integer.parseInt(value+"");
            //把redis的num加1
            redisTemplate.opsForValue().set("num", ++num);
            /**********业务逻辑end**********/
            //释放各自的锁
            if(uuid.equals((String) redisTemplate.opsForValue().get("lock"))){
				this.redisTemplate.delete("lock")
            }
        }else{
            try {
                //如果获取锁失败,则等待三秒再次尝试获取锁
                Thread.sleep(3000);
                testLock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

问题:删除操作缺乏原子性。

场景:

  1. index1执行删除时,查询到的lock值确实和uuid相等

uuid=v1

set(lock,uuid);

java
if(uuid.equals((String) redisTemplate.opsForValue().get("lock"))){
            this.redisTemplate.delete("lock");
        }
  1. index1执行删除前,lock刚好过期时间已到,被redis自动释放

在redis中没有了lock,没有了锁。 this.redisTemplate.delete("lock");

  1. index2获取了lock

index2线程获取到了cpu的资源,开始执行方法

uuid=v2

set(lock,uuid);

  1. index1执行删除,此时会把index2的lock删除

index1 因为已经在方法中了,所以不需要重新上锁。index1有执行的权限。index1已经比较完成了,这个时候,开始执行 this.redisTemplate.delete("lock");删除的index2的锁!

6.优化之LUA脚本保证删除的原子性

java
@GetMapping("/testLock")
public void testLock(){
    String uuid = UUID.randomUUID().toString();
    //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
    String skuId = "25"; // 访问skuId 为25号的商品 100008348542
    String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
    //1获取锁,setnx 如果key存在值返回false设置值失败,如果key不存在则设置值成功返回true
    //设置过期时间的长短根据业务执行的时间而定
    Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid,3, TimeUnit.SECONDS);
    //2获取锁成功、查询num的值
    if(lock){

        /**********业务逻辑start**********/
        //从缓存中获取num值
        Object value = redisTemplate.opsForValue().get("num");
        //如果值为空则直接返回
        if(ObjectUtils.isEmpty(value)){
            return;
        }
        //如果有值则转成int
        int num = Integer.parseInt(value+"");
        //把redis的num加1
        redisTemplate.opsForValue().set("num", ++num);
        /**********业务逻辑end**********/
        /*使用lua脚本来锁*/
        // 定义lua 脚本
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        // 使用redis执行lua执行
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        // 设置一下返回值类型 为Long
        // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
        // 那么返回字符串与0 会有发生错误。
        redisScript.setResultType(Long.class);
        // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
        redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
    }else{
        try {
            //如果获取锁失败,则等待三秒再次尝试获取锁
            Thread.sleep(3000);
            testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

7.总结

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件

- 互斥性。在任意时刻,只有一个客户端能持有锁。

- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

- 加锁和解锁必须具有原子性。