分布式锁的实现方式和优缺点&Java代码实现
2021-01-29 20:19
标签:st3 connect 目录 end mysq cat cep unlock 开始 1、基于数据库 它的实现方式很简单,就是在数据库中创建一个lock表,申请锁就是向表中插入一行唯一关键字,数据库能够保证只有一个请求执行成功,也就是说这个请求申请到了锁,其他请求会报错说明没有申请到锁。释放锁就是在数据库中删除这一行数据。 分布式锁接口定义: 建表SQL: Java代码实现: 2、基于缓存(redis) 使用redis做分布式锁,主要是因为redis作为高速缓存,它的存储速度非常快。另外redis可对记录设置过期时间,防止系统崩溃锁无法自动释放。 Java实现: 3、基于zookeeper 4、锁测试 分布式锁的实现方式和优缺点&Java代码实现 标签:st3 connect 目录 end mysq cat cep unlock 开始 原文地址:https://www.cnblogs.com/zhi-leaf/p/12821202.html
package com.zhi.util;
/**
 * Contract for a distributed lock.
 *
 * <p>A lock is identified by a key; the request id passed when acquiring must be
 * presented again when releasing.
 *
 * @author 张远志
 * @since 2020-05-03 11:38:27
 */
public interface DistributeLock {

    /**
     * Requests the lock.
     *
     * @param key       lock key
     * @param requestId request id, used for verification so that the lock is not
     *                  released by mistake
     * @return whether the lock was acquired
     */
    boolean lock(String key, String requestId);

    /**
     * Requests the lock with an automatic release time.
     *
     * @param key       lock key
     * @param requestId request id, used for verification so that the lock is not
     *                  released by mistake
     * @param second    automatic release time, so the lock does not stay held forever
     *                  after a system crash (the name suggests the unit is seconds)
     * @return whether the lock was acquired
     */
    boolean lock(String key, String requestId, Integer second);

    /**
     * Releases the lock.
     *
     * @param key       lock key
     * @param requestId request id
     * @return whether the lock was released
     */
    boolean unlock(String key, String requestId);
}
-- Lock table: one row per held lock.
--   id      : lock key; being the primary key, only one insert per key can succeed
--   content : value stored with the lock (DatabaseLock writes the request id here)
CREATE TABLE `dis_lock` (
  `id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `content` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '分布式锁' ROW_FORMAT = Dynamic;
package com.zhi.util;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

import javax.sql.DataSource;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * Distributed lock backed by a database table; the {@code dis_lock} table must be
 * created beforehand.
 *
 * <p>Acquiring inserts a row whose primary key is the lock key; releasing deletes the
 * row that matches both the key and the request id.
 *
 * @author 张远志
 * @since 2020-05-03 12:05:22
 */
public class DatabaseLock implements DistributeLock {
    private static final Logger logger = LogManager.getLogger(DatabaseLock.class);

    /** SQLState class code that the SQL standard assigns to integrity-constraint violations. */
    private static final String INTEGRITY_VIOLATION_CLASS = "23";

    private static final String INSERT = "insert into dis_lock(id, content) values (?, ?)";
    private static final String DELETE = "delete from dis_lock where id=? and content=?";

    private final DataSource dataSource;

    /**
     * @param dataSource source of connections to the database holding {@code dis_lock}
     */
    public DatabaseLock(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Tries to acquire the lock by inserting a row for {@code key}.
     *
     * @param key       lock key, stored in the {@code id} column
     * @param requestId owner token, stored in the {@code content} column
     * @return {@code true} if the row was inserted; {@code false} if the insert was
     *         rejected or any other error occurred (this method never throws)
     */
    @Override
    public boolean lock(String key, String requestId) {
        try (Connection connection = dataSource.getConnection();
                PreparedStatement statement = connection.prepareStatement(INSERT)) {
            statement.setString(1, key);
            statement.setString(2, requestId);
            statement.executeUpdate();
            return true;
        } catch (SQLException e) {
            // A constraint violation is the expected "somebody else holds the lock"
            // outcome; anything else (connectivity, missing table, ...) is a real
            // failure and must not disappear silently.
            if (!isIntegrityViolation(e)) {
                logger.error("申请锁出错", e);
            }
            return false;
        } catch (RuntimeException e) {
            logger.error("申请锁出错", e);
            return false;
        }
    }

    /**
     * Expiry is not supported by this implementation.
     *
     * @param key       lock key
     * @param requestId owner token
     * @param second    must be {@code null}
     * @return result of {@link #lock(String, String)} when {@code second} is {@code null}
     * @throws UnsupportedOperationException if {@code second} is not {@code null}
     */
    @Override
    public boolean lock(String key, String requestId, Integer second) {
        if (second != null) {
            throw new UnsupportedOperationException("not support expire time set");
        }
        return lock(key, requestId);
    }

    /**
     * Releases the lock by deleting the row that matches both key and request id.
     *
     * @param key       lock key
     * @param requestId owner token that was used to acquire the lock
     * @return {@code true} if a row was deleted; {@code false} if no row matched or an
     *         error occurred (the error is logged)
     */
    @Override
    public boolean unlock(String key, String requestId) {
        try (Connection connection = dataSource.getConnection();
                PreparedStatement statement = connection.prepareStatement(DELETE)) {
            statement.setString(1, key);
            statement.setString(2, requestId);
            return statement.executeUpdate() > 0;
        } catch (Exception e) {
            logger.error("释放锁出错", e);
            return false;
        }
    }

    /** Tells whether the exception reports an integrity-constraint violation (e.g. duplicate key). */
    private static boolean isIntegrityViolation(SQLException e) {
        if (e instanceof SQLIntegrityConstraintViolationException) {
            return true;
        }
        // Some drivers report a plain SQLException and only set the SQLState.
        String state = e.getSQLState();
        return state != null && state.startsWith(INTEGRITY_VIOLATION_CLASS);
    }
}
package com.zhi.util;
import java.util.Collections;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;
/**
 * Distributed lock implemented on Redis.
 *
 * <p>Acquiring is a {@code SET} with the NX option (and EX when an expiry is given);
 * releasing runs a Lua script that deletes the key only when its value equals the
 * caller's request id.
 *
 * @author 张远志
 * @since 2020-05-03 11:45:56
 */
public class RedisLock implements DistributeLock {
    /** Reply that marks a successful acquire. */
    private static final String LOCK_SUCCESS = "OK";

    /** Script result that marks a successful release. */
    private static final Long RELEASE_SUCCESS = 1L;

    /**
     * Compare-and-delete script: the value check and the delete are sent as one
     * {@code EVAL} call instead of two separate commands.
     */
    private static final String UNLOCK_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

    private final JedisPool jedisPool;

    /**
     * @param jedisPool pool from which a connection is borrowed for every operation
     */
    public RedisLock(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    /**
     * Tries to acquire the lock without an expiry.
     *
     * @param key       lock key
     * @param requestId owner token stored as the key's value
     * @return whether the lock was acquired
     */
    @Override
    public boolean lock(String key, String requestId) {
        return lock(key, requestId, null);
    }

    /**
     * Tries to acquire the lock, optionally with an expiry.
     *
     * @param key       lock key
     * @param requestId owner token stored as the key's value
     * @param second    expiry in seconds, or {@code null} for no expiry
     * @return whether the lock was acquired
     */
    @Override
    public boolean lock(String key, String requestId, Integer second) {
        try (Jedis jedis = jedisPool.getResource()) {
            SetParams params = new SetParams();
            params.nx(); // write only if the key does not exist yet
            if (second != null) {
                params.ex(second); // expiry, in seconds
            }
            String reply = jedis.set(key, requestId, params);
            return LOCK_SUCCESS.equals(reply);
        }
    }

    /**
     * Releases the lock if it is still held with the given request id.
     *
     * @param key       lock key
     * @param requestId owner token that was used to acquire the lock
     * @return whether the key was deleted
     */
    @Override
    public boolean unlock(String key, String requestId) {
        try (Jedis jedis = jedisPool.getResource()) {
            Object result = jedis.eval(UNLOCK_SCRIPT, Collections.singletonList(key),
                    Collections.singletonList(requestId));
            return RELEASE_SUCCESS.equals(result);
        }
    }
}
package com.zhi.util;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;
/**
 * Distributed lock implemented on ZooKeeper.
 *
 * <p>A lock is a persistent znode whose data is the owner's request id. Because the
 * node is persistent it is not removed when the owner's session ends; it stays until
 * {@link #unlock(String, String)} deletes it.
 *
 * @author 张远志
 * @since 2020-05-03 12:04:33
 */
public class ZookeeperLock implements DistributeLock {
    /**
     * Locks would normally live in a dedicated directory; here a simple prefix is used.
     */
    private static final String PREFIX = "/lock_";

    /** Fully qualified so that no additional import is required. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    private final ZooKeeper zooKeeper;

    /**
     * @param zooKeeper client used for all lock operations
     */
    public ZookeeperLock(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    /**
     * Tries to acquire the lock by creating the znode for {@code key}.
     *
     * @param key       lock key, appended to the path prefix
     * @param requestId owner token stored as the node's data
     * @return {@code true} if the node was created; {@code false} if it already exists
     *         or any error occurred (this method never throws)
     */
    @Override
    public boolean lock(String key, String requestId) {
        try {
            String path = getPath(key);
            if (zooKeeper.exists(path, false) != null) { // node exists, lock is taken
                return false;
            }
            // CreateMode.PERSISTENT: the node is stored persistently. A concurrent
            // creator that slipped in after the check above makes create() fail,
            // which is reported as "not acquired" below.
            zooKeeper.create(path, requestId.getBytes(UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            return true;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Expiry is not supported by this implementation.
     *
     * @param key       lock key
     * @param requestId owner token
     * @param second    must be {@code null}
     * @return result of {@link #lock(String, String)} when {@code second} is {@code null}
     * @throws UnsupportedOperationException if {@code second} is not {@code null}
     */
    @Override
    public boolean lock(String key, String requestId, Integer second) {
        if (second != null) {
            throw new UnsupportedOperationException("not support expire time set");
        }
        return lock(key, requestId);
    }

    /**
     * Releases the lock if the node's data equals {@code requestId}.
     *
     * @param key       lock key
     * @param requestId owner token that was used to acquire the lock
     * @return {@code true} if the node was deleted; {@code false} if the caller is not
     *         the owner or any error occurred (this method never throws)
     */
    @Override
    public boolean unlock(String key, String requestId) {
        try {
            String path = getPath(key);
            Stat stat = new Stat();
            byte[] data = zooKeeper.getData(path, false, stat);
            String owner = new String(data, UTF_8);
            if (!requestId.equals(owner)) { // only the owner may release the lock
                return false;
            }
            // Delete exactly the version that was read above.
            zooKeeper.delete(path, stat.getVersion());
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            return false;
        }
    }

    /** Builds the znode path for a lock key. */
    private String getPath(String key) {
        return PREFIX + key;
    }
}
package com.zhi.test;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.TestMethodOrder;
import com.zhi.util.DatabaseLock;
import com.zhi.util.DistributeLock;
import com.zhi.util.RedisLock;
import com.zhi.util.ZookeeperLock;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* 分布式锁测试
*
* @author 张远志
* @since 2020年5月3日13:10:43
*
*/
@TestInstance(Lifecycle.PER_CLASS)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class LockTest {
private final Logger logger = LogManager.getLogger(this.getClass());
private static final String key = "test";
@Order(1)
@Test
public void test1() {
logger.info("开始测试数据库(MySQL)分布式锁-----------------------------------------------");
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUsername("root");
dataSource.setPassword("abc123");
dataSource.setUrl("jdbc:mysql://192.168.59.131:3306/zhi_test");
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
doJob(new DatabaseLock(dataSource));
}
@Order(2)
@Test
public void test2() {
logger.info("开始测试Redis分布式锁-----------------------------------------------");
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(5);
config.setTestOnBorrow(false);
JedisPool jedisPool = new JedisPool(config, "192.168.59.131", 6379);
doJob(new RedisLock(jedisPool));
}
@Order(3)
@Test
public void test3() {
logger.info("开始测试Zookeeper分布式锁-----------------------------------------------");
ZooKeeper zooKeeper = null;
try {
zooKeeper = new ZooKeeper("192.168.59.131:2181", 60000, new Watcher() {
public void process(WatchedEvent event) {
logger.info("事件类型:{},路径:{}", event.getType(), event.getPath());
}
});
doJob(new ZookeeperLock(zooKeeper));
} catch (Exception e) {
} finally {
try {
zooKeeper.close();
} catch (Exception e) {
}
}
}
private void doJob(DistributeLock locker) {
int jobCount = 100;
CountDownLatch latch = new CountDownLatch(jobCount);
for (int i = 0; i ) {
new Worker(i, locker, latch).start();
}
try {
latch.await();
} catch (InterruptedException e) {
}
}
class Worker extends Thread {
private DistributeLock locker;
private CountDownLatch latch;
public Worker(int tNo, DistributeLock locker, CountDownLatch latch) {
super("线程" + StringUtils.leftPad(String.valueOf(tNo), 3, "0"));
this.locker = locker;
this.latch = latch;
}
@Override
public void run() {
LockTest.doSleep(new Random().nextInt(20) * 1000);
long startTime = System.currentTimeMillis();
if (locker.lock(key, getName())) {
logger.info("{}申请到锁,申请动作耗时{}毫秒", getName(), System.currentTimeMillis() - startTime);
LockTest.doSleep(1000);
boolean flag = locker.unlock(key, getName());
logger.info("{}释放锁{}", getName(), flag ? "成功" : "失败");
}
latch.countDown();
}
}
public static void doSleep(long millis) {
try {
Thread.sleep(millis);
} catch (Exception e) {
}
}
}