数据库

分布式锁常见的解决方案,你知道几种

时间:2010-12-5 17:23:32  作者:系统运维   来源:应用开发  查看:  评论:0
内容摘要:前言1、什么是分布式锁要介绍分布式锁,首先要知道与分布式锁相对应的是线程锁、进程锁。1.线程锁主要用来给方法、代码块加锁。当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在

前言

1、分布什么是式锁分布式锁

要介绍分布式锁,首先要知道与分布式锁相对应的解道种是线程锁、进程锁。决方

1.线程锁

主要用来给方法、案知代码块加锁。分布当某个方法或代码使用锁,式锁在同一时刻仅有一个线程执行该方法或该代码段。解道种线程锁只在同一JVM中有效果,决方因为线程锁的案知实现在根本上是依靠线程之间共享内存实现的,比如Synchronized、分布Lock等。式锁

2.进程锁

为了控制多个进程访问某个共享资源,解道种因为进程具有独立性,决方各个进程无法访问其他进程的案知资源,因此无法通过synchronized等线程锁实现进程锁。

3.分布式锁

当在分布式系统中,一个实例往往具有多个节点。这时候就面临多个进程对同一资源资源的访问。因此jvm级别的锁满足不了需求了。这个时候就需要用分布式锁控制访问的资源

2、分布式锁的特点

1、互斥性:任意时刻,亿华云只能有一个客户端获取锁,不能同时有两个客户端获取到锁。

2、安全性:锁只能被持有该锁的客户端删除,不能由其它客户端删除。

3、死锁:获取锁的客户端因为某些原因(如down机等)而未能释放锁,其它客户端再也无法获取到该锁。

4、容错:当部分节点(redis节点等)down机时,客户端仍然能够获取锁和释放锁。

3、常见分布式锁的解决方案

1、基于数据库

2、基于ZooKeeper

3、etcd

3、基于redis(推荐)

4、数据库

两种实现

唯一索引排它锁

1、基于表实现的分布式锁

CREATE TABLE `methodLock` (

`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 主键,

`method_name` varchar(64) NOT NULL DEFAULT COMMENT 锁定的方法名,

`node_info` varchar(64) NOT NULL DEFAULT COMMENT 结点信息/线程信息,

`count` int NOT NULL DEFAULT 0 COMMENT 锁的次数,实现可重入,

`desc` varchar(1024) NOT NULL DEFAULT 备注信息,

`update_time` timestamp NOT NULL DEFAULT now() ON UPDATE now() COMMENT 保存数据时间,自动生成,

PRIMARY KEY (`id`),

UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT=锁定中的方法;

`当我们想要锁住某个方法时,执行以下SQL:

insert into methodLock(method_name,node_info,desc) values (method_name,node_info,desc)

因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的源码库那个线程获得了该方法的锁,可以执行方法体内容。

当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:

delete from methodLock where method_name =method_name

2、借助数据库的排他锁

排他锁又称写锁、独占锁,如果事务T对数据A加上排他锁后,则其他事务不能再对A加任何类型的封锁。获准排他锁的事务既能读数据,又能修改数据在查询语句后面增加FOR UPDATE,MySQL 就会对查询结果中的每行都加排他锁,当没有其他线程对查询结果集中的任何一行使用排他锁时,可以成功申请排他锁,否则会被阻塞。

SELECT ... FOR UPDATE;

伪代码实现

//阻塞试获取锁

//事务保证原子性

@Transactional

public void lock(){

if(select * from methodLock where method_name=xxx for update ==>有数据){

//有数据,表示资源已经被加锁,需要判断是否是重入

if(current==resultNodeInfo){

//是自己加的锁,增加count,表示可重入

update methodLock set count=count+1 where method_name=xxx

return true;

}else{

return false;

}

}

insert into methodLock(method_name,desc) values (‘method_name’,‘desc’);

return true;

}

//非阻塞

public bool trylock(){

long endTimeout=System.currentTimeMills()+timeout;

while(true){

if(mysqlLock.lock()){

return true;

}

//判断是否超时,如果超时。云南idc服务商枷锁失败

if(endTimeout

return false;

}

}

}

//释放锁

@Transactional

public void unlock(){

if(select * from methodLock where method_name=xxx for update ==>有数据){

//有数据,表示资源已经被加锁,需要判断是否是重入

if(current==resultNodeInfo){

//判断是否是自己加锁,如果是自己的锁,需要解锁

if(count>1){

update count=count-1;

}else{

//锁没有重入

delete from methodLock where method_name =method_name

}

//是自己加的锁,增加count,表示可重入

update methodLock set count=count+1 where method_name=xxx

return true;

}else{

//不是自己的锁不释放

return false;

}

}else{

//没有数据,表示资源未被加锁,无需释放

return true;

}

}

3、缺点

依赖数据库的可用性,数据库挂掉,导致业务不可用锁没有失效时间,解锁一旦失败,锁记录会一直在数据库,其他线程则不能获取锁,或者启动定时任务循环遍历锁,长时间未被释放的,认定为超时,直接删除多个线程抢锁时,抢锁失败的线程会抛异常,如果需要再次获取锁,需要再次触发业务请求。或者需要自己实现CAS抢锁

5、借助redison来实现redis分布式锁

5.1、Redison分布式锁实现原理

6、SpringBoot整合Redisson

6.1、添加maven依赖

org.redisson

redisson

3.16.0

6.2、自定义配置类

单机模式为例

import org.redisson.Redisson;

import org.redisson.api.RedissonClient;

import org.redisson.config.Config;

import org.redisson.config.ReadMode;

import org.springframework.boot.autoconfigure.data.redis.RedisProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

@Configuration

public class RedissonConfig {

@Resource

private RedisProperties redisProperties;

@Bean(destroyMethod = "shutdown")

public RedissonClient redissonClient() {

Config config = new Config();

String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + "");

config.useSingleServer()

.setDatabase(0)

.setAddress(redisUrl)

.setPassword(redisProperties.getPassword());

config.setLockWatchdogTimeout(9000);

return Redisson.create(config);

}

}

7、分布式锁常见api说明

public interface RRLock {

//----------------------Lock接口方法-----------------------

/

**

* 加锁 锁的有效期默认30秒

*/

void lock();

/

**

* tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .

*/

boolean tryLock();

/

**

* tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,

* 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。

*

* @param time 等待时间

* @param unit 时间单位 小时、分、秒、毫秒等

*/

boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

/

**

* 解锁

*/

void unlock();

/

**

* 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过

* Thread.currentThread().interrupt(); 方法真正中断该线程

*/

void lockInterruptibly();

//----------------------RLock接口方法-----------------------

/

**

* 加锁 上面是默认30秒这里可以手动设置锁的有效时间

*

* @param leaseTime 锁有效时间

* @param unit 时间单位 小时、分、秒、毫秒等

*/

void lock(long leaseTime, TimeUnit unit);

/

**

* 这里比上面多一个参数,多添加一个锁的有效时间

*

* @param waitTime 等待时间

* @param leaseTime 锁有效时间

* @param unit 时间单位 小时、分、秒、毫秒等

*/

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

/

**

* 检验该锁是否被线程使用,如果被使用返回True

*/

boolean isLocked();

/

**

* 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有)

* 这个比上面那个实用

*/

boolean isHeldByCurrentThread();

/

**

* 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间

* @param leaseTime 锁有效时间

* @param unit 时间单位 小时、分、秒、毫秒等

*/

void lockInterruptibly(long leaseTime, TimeUnit unit);

}

8、如何使用

@Autowired

private RedissonClient redissonClient;

public String test(String id) {

RLock lock = redissonClient.getLock(id);

try {

// 获取锁等待时间2秒,锁过期时间30秒

// 当leaseTime参数值设置为:-1时,锁自动续约机制生效

boolean b = lock.tryLock(2, 30, TimeUnit.SECONDS);

if (b) {

log.info("获取锁成功,id={ }", id);

Thread.sleep(10000);

log.info("业务处理结束...");

} else {

log.warn("获取锁失败,id={ }", id);

}

} catch (InterruptedException e) {

log.error("获取锁异常", e);

} finally {

// 仅允许锁的持有者线程解锁

if (lock.isHeldByCurrentThread()) {

lock.unlock();

}

}

return id;

}

9、实战数据防重

9.1、先新建一个网关服务,项目结构如下,用来实现转发,负载均衡

9.2、新建t_person_test表

CREATE TABLE `t_person_test` (

`id` int(11) NOT NULL AUTO_INCREMENT,

`phone` varchar(30) DEFAULT NULL,

`num` int(11) DEFAULT NULL,

`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,

PRIMARY KEY (`id`),

UNIQUE KEY `t_person_test_id_uindex` (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=1900802051 DEFAULT CHARSET=utf8

9.3、新建一个名为redisson-demo的项目,项目结构如下

9.4、application.properties

server.port=8384

spring.application.name=demo

spring.datasource.url=jdbc:mysql://10.1x.xx.x:3306/attendance_saas?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&serverTimezone=GMT

spring.datasource.username=test

spring.datasource.password=Test123.

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

spring.datasource.type=com.alibaba.druid.pool.DruidDataSource

# 服务注册到nacos

spring.cloud.nacos.discovery.server-addr=10.x.x.2x0:8848

spring.cloud.nacos.discovery.namespace=12d825d5-165b-489c-beef-c72944cac9d2

spring.cloud.nacos.discovery.password=nacos

spring.cloud.nacos.discovery.username=nacos

# redis配置

spring.redis.port=6379

spring.redis.host=1x.x.xx.xx

9.5、对应的实体类

import com.baomidou.mybatisplus.annotation.TableName;

import lombok.Builder;

import lombok.Data;

@Data

@Builder

@TableName("t_person_test")

public class PersonTestEntity {

private Integer id;

private String phone;

private Integer num;

}

9.6、对应的mapper

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

import com.example.redissondemo.entity.PersonTestEntity;

import org.apache.ibatis.annotations.Param;

import org.apache.ibatis.annotations.Select;

import org.apache.ibatis.annotations.Update;

public interface PersonTestMapper extends BaseMapper{

@Update("update t_person_test set num=num-1 where phone=#{ phone}")

int updateNum(@Param("phone") String phone);

@Select("select * from t_person_test where phone=#{ phone}")

PersonTestEntity selectPersonByPhone(@Param("phone") String phone);

}

9.7、对应的控制器RedissonController

import com.example.redissondemo.entity.PersonTestEntity;

import com.example.redissondemo.mapper.PersonTestMapper;

import lombok.extern.slf4j.Slf4j;

import org.redisson.api.RLock;

import org.redisson.api.RedissonClient;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController

@Slf4j

public class RedissonController {

@Resource

private RedissonClient redissonClient;

@Resource

private PersonTestMapper personTestMapper;

/

**

* 常规插入代码代码-大多数人写的

* @return

*/

@PostMapping("/insert")

public int insert(){

String phone="15797638118";

PersonTestEntity build = PersonTestEntity.builder()

.num(10)

.phone(phone).build();

PersonTestEntity testEntity = personTestMapper.selectPersonByPhone(phone);

if(null!=testEntity){

throw new RuntimeException("数据已存在");

}

int insert = personTestMapper.insert(build);

return insert;

}

/

**

* 使用分布式锁的插入代码-保证数据防重

* @return

*/

@PostMapping("/insertLock")

public int insertLock(){

String phone="15797638118";

PersonTestEntity build = PersonTestEntity.builder()

.num(10)

.phone(phone).build();

RLock lock = redissonClient.getLock("insertKey");

int insert = 0;

try {

lock.lock();

PersonTestEntity testEntity = personTestMapper.selectPersonByPhone(phone);

if(null!=testEntity){

throw new RuntimeException("数据已存在");

}

insert = personTestMapper.insert(build);

} catch (RuntimeException e) {

log.error("insertLock方法抛出异常",e);

} finally {

if(lock.isHeldByCurrentThread()){

lock.unlock();

}

}

return insert;

}

/

**

* 常规的更新代码-大多数人写的-字段更新结果超出预期

* @return

*/

@PostMapping("/update")

public int update(){

int update = 0;

String phone="15797638118";

PersonTestEntity testEntity = personTestMapper.selectPersonByPhone(phone);

if(testEntity!=null&&testEntity.getNum()>0){

update = personTestMapper.updateNum(phone);

}else {

log.info("num已经等于0");

}

return update;

}

/

**

* 使用分布式锁的更新代码-保证数据在预期内相减

* @return

*/

@PostMapping("/updateLock")

public int updateLock(){

int insert = 0;

String phone="15797638118";

RLock lock = redissonClient.getLock("updateKey");

try {

lock.unlock();

PersonTestEntity testEntity = personTestMapper.selectPersonByPhone(phone);

if(testEntity!=null&&testEntity.getNum()>0){

insert = personTestMapper.updateNum(phone);

}else {

log.info("num已经等于0");

}

} catch (Exception e) {

log.error("updateLock方法抛出异常",e);

} finally {

if(lock.isHeldByCurrentThread()){

lock.unlock();

}

}

return insert;

}

}

9.8、将Redisson-demo项目启动两个实例,端口号8383,8384

9.9、jmter压力测试,新建一个线程组,指定线程数为10,循环数次为2。相当于20个线程

8、新建一个http请求,这个我们借助spring cloud gateway 来实现负载均衡,先测试插入接口

10、测试结果

10.1、insert接口先查询数据库内容吗,数据为空,

执行jmter的20个线程结果

查询到有10数据入库,显然不对,我们的代码期望不符合

10.2、insertLock接口

执行jmter的20个线程结果

数据库中只有一条数据,分布式锁生效

10.3update接口

我们将jemter接口换成/demo/update,线程数任然是20个,执行前的结果如果下,num字段为10

num字段被更新成-8,显然不符合预期

10.3updateLock接口

jmter执行前

jmter执行后,多次测试num字段符合预期

11、分布式锁与本地事物

问题代码

@Transactional

public void update(int id) {

boolean lock = redisLock.lock(id);

if (!lock) {

throw new RuntimeException("当前人数过多,请稍后再试");

}

/

*

业务代码在该区域

*/

redisLock.unlock(id);

}

@Transactional是spring的aop实现,会在update方法之前开启事务,之后再加锁,当锁住的代码执行完成后,再提交事务,因此锁住的代码块执行是在事务之内执行的,可以推断在代码块执行完时,事务还未提交,锁已经被释放,此时其他线程拿到锁之后进行锁住的代码块,读取的库存数据不是最新的。先拿到锁的线程修改的数据,可能被覆盖。

解决方案:

@Transactional一定要的分布式锁内,或者直接抛弃事物。

copyright © 2025 powered by 益强资讯全景  滇ICP备2023006006号-31sitemap