【原创】Java并发编程系列17 | 读写锁八讲(上)
2021-03-15 04:30
标签:getname after ali next 调度 接口 alt 准备 rgs 点击上方“java进阶架构师”,选择右上角“置顶公众号” 本文为何适原创并发编程系列第 17 篇,文末有本系列文章汇总。 在并发编程中解决线程安全的问题,通常使用的都是java提供的关键字synchronized或者重入锁ReentrantLock。它们都是独占式获取锁,也就是在同一时刻只有一个线程能够获取锁。 结果: ReentrantReadWriteLock与ReentrantLock一样,其锁主体依然是Sync,读写锁其实就是两个属性:readerLock、writerLock。 我们知道AQS.state使用来表示同步状态的。ReentrantLock中,state=0表示没有线程占用锁,state>0时state表示线程的重入次数。但是读写锁ReentrantReadWriteLock内部维护着两个锁,需要用state这一个变量维护多种状态,应该怎么办呢? 记录获取锁的线程 线程获取写锁后,和重入锁一样,将AQS.exclusiveOwnerThread置为当前线程。但是读锁是共享的,可以多个线程同时获取读锁,那么如何记录获取读锁的多个线程以及每个线程的重入情况呢? 线程与HoldCounter的存储结构如下图: 查看使用示例中代码rwl.readLock().lock()的实现 doAcquireShared(): doAcquireShared():尝试获取读锁,获取到锁返回1,获取不到返回-1。 readerShouldBlock(): readerShouldBlock():检查AQS队列中的情况,看是当前线程是否可以获取读锁,返回true表示当前不能获取读锁。 非公平锁NonfairSync: } tryAcquireShared()方法中因为CAS抢锁失败等原因没有获取到读锁的,fullTryAcquireShared()再次尝试获取读锁。此外,fullTryAcquireShared()还处理了读锁重入的情况。 doAcquireShared() 再回到最开始的acquireShared(),tryAcquireShared()抢锁成功,直接返回,执行同步代码;如果tryAcquireShared()抢锁失败,调用doAcquireShared()。 setHeadAndPropagate() 试想一种情况:当线程1持有写锁时,线程2、线程3、线程4、线程5...来获取读锁是获取不到的,只能排进同步队列。当线程1释放写锁时,唤醒线程2来获取锁。因为读锁是共享锁,当线程2获取到读锁时,线程3也应该被唤醒来获取读锁。 理解了上文读锁的获取过程,读锁的释放过程不看源码也应该可以分析出来: 大多数业务场景,都是读多写少的,采用互斥锁性能较差,所以提供了读写锁。读写锁允许共享资源在同一时刻可以被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞。 【原创】01|开篇获奖感言 之前,给大家发过三份Java面试宝典,这次新增了一份,目前总共是四份面试宝典,相信在跳槽前一个月按照面试宝典准备准备,基本没大问题。 看到这里,证明有所收获 【原创】Java并发编程系列17 | 读写锁八讲(上) 标签:getname after ali next 调度 接口 alt 准备 rgs 原文地址:https://blog.51cto.com/15009303/2552741收录于话题
#进阶架构师 | 并发编程专题
12个
20大进阶架构专题每日送达写在前面
通过以下几部分来分析Java提供的读写锁ReentrantReadWriteLock:
为什么需要读写锁
读写锁的使用Demo
ReentrantReadWriteLock类结构
记录读写锁状态
源码分析读锁的获取与释放
源码分析写锁的获取与释放
锁降级
读写锁应用
本文涉及到上下文联系较多,经常需要上下滑动查看,篇幅太多很不方便,而且文章太长阅读体验也不好,所以分成读写锁(上)和读写锁(下)两篇。本文是上篇,只写到“源码分析读锁的获取与释放”。1. 为什么需要读写锁
但是在大多数场景下,大部分时间都是读取共享资源,对共享资源的写操作很少。然而读服务不存在数据竞争问题,如果一个线程在读时禁止其他线程读势必会导致性能降低。
针对这种读多写少的情况,java还提供了另外一个实现Lock接口的ReentrantReadWriteLock(读写锁)。读写锁允许共享资源在同一时刻可以被多个读线程访问,但是在写线程访问时,所有的读线程和其他的写线程都会被阻塞。2. 使用Demo
直接上代码:
public class ReadWriteLockTest {
public static void main(String[] args) {
final Data data = new Data();
for (int i = 0; i
读锁线程-0 开始读取数据
读锁线程-1 开始读取数据
读锁线程-2 开始读取数据
读锁线程-0 读取数据完成 0
读锁线程-1 读取数据完成 0
读锁线程-2 读取数据完成 0
写锁线程-0 开始写数据
写锁线程-0 写数据完成 4306
...
写锁线程-1 开始写数据
写锁线程-1 写数据完成 9114
...
写锁线程-2 开始写数据
写锁线程-2 写数据完成 7709
Data类的共享数据data,get()方法上读锁读data,put()方法上写锁写data。启动3个线程读data,3个线程写data。
从结果可以看出,读锁是共享的,读锁的三个线程是同时读取共享数据data的;写锁是互斥的,写锁的三个线程是依次写共享数据data的。3. 类结构
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
// 属性
private final ReentrantReadWriteLock.ReadLock readerLock; // 读锁
private final ReentrantReadWriteLock.WriteLock writerLock; // 写锁
final Sync sync; // 锁的主体AQS
// 内部类
abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class FairSync extends Sync {}
static final class NonfairSync extends Sync {}
public static class ReadLock implements Lock, java.io.Serializable {}
public static class WriteLock implements Lock, java.io.Serializable {}
// 构造
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
}
一个ReentrantReadWriteLock对象都对应着读锁和写锁两个锁,而这两个锁是通过同一个sync(AQS)实现的。4. 记录读写锁状态
读写锁采用“按位切割使用”的方式,将state这个int变量分为高16位和低16位,高16位记录读锁状态,低16位记录写锁状态,并通过位运算来快速获取当前的读写锁状态。abstract static class Sync extends AbstractQueuedSynchronizer {
// 将state这个int变量分为高16位和低16位,高16位记录读锁状态,低16位记录写锁状态
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 >> SHARED_SHIFT; }
/**
* 获取写锁的状态,写锁的重入次数
* c & 0x0000FFFF,将高16位全部抹去,获得低16位
*/
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}
sycn中提供了一个HoldCounter类,类似计数器,用于记录一个线程读锁的重入次数。将HoldCounter通过ThreadLocal与线程绑定。
源码如下:abstract static class Sync extends AbstractQueuedSynchronizer {
// 这个嵌套类的实例用来记录每个线程持有的读锁数量(读锁重入)
static final class HoldCounter {
int count = 0;// 读锁重入次数
final long tid = getThreadId(Thread.currentThread());// 线程 id
}
// ThreadLocal 的子类
static final class ThreadLocalHoldCounter
extends ThreadLocal
注:属性cachedHoldCounter、firstReader、firstReaderHoldCount都是为了提高性能,目前不用太关注。
(ThreadLocal在之后的文章中会专门讲解)
5. 读锁获取
/**
* rwl.readLock().lock()-->ReadLock.lock()
*/
public void lock() {
sync.acquireShared(1);
}
/**
* ReadLock.lock()-->AQS.acquireShared(int)
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg)
首先来分析一下可以获取读锁的条件:
当前锁的状态
1)读锁写锁都没有被占用
2)只有读锁被占用
3)写锁被自己线程占用
简单总结,只有在其它线程持有写锁时,不能获取读锁,其它情况都可以去获取。
AQS队列中的情况,如果是公平锁,同步队列中有线程等锁时,当前线程是不可以先获取锁的,必须到队列中排队。
读锁的标志位只有16位,最多只能有2^16-1个线程获取读锁或重入
看源码:/**
* 尝试获取读锁,获取到锁返回1,获取不到返回-1
*/
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
/*
* 根据锁的状态判断可以获取读锁的情况:
* 1. 读锁写锁都没有被占用
* 2. 只有读锁被占用
* 3. 写锁被自己线程占用
* 总结一下,只有在其它线程持有写锁时,不能获取读锁,其它情况都可以去获取。
*/
if (exclusiveCount(c) != 0 && // 写锁被占用
getExclusiveOwnerThread() != current) // 持有写锁的不是当前线程
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() && // 检查AQS队列中的情况,看是当前线程是否可以获取读锁,下文有详细讲解。
r
分别看下公平锁和非公平锁的实现。
公平锁FairSync:
对于公平锁来说,如果队列中还有线程在等锁,就不允许新来的线程获得锁,必须进入队列排队。
hasQueuedPredecessors()方法在重入锁的文章中分析过,判断同步队列中是否还有等锁的线程,如果有其他线程等锁,返回true当前线程不能获取读锁。
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
对于非公平锁来说,原本是不需要关心队列中的情况,有机会直接尝试抢锁就好了,这里问什么会限制获取锁呢?
这里给写锁定义了更高的优先级,如果队列中第一个等锁的线程请求的是写锁,那么当前线程就不能跟那个马上就要获取写锁的线程抢,这样做很好的避免了写锁饥饿。
/**
* 队列中第一个等锁的线程请求的是写锁时,返回true,当前线程不能获取读锁
*/
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
// 返回true-队列中第一个等锁的线程请求的是写锁
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() && // head后继节点线程请求写锁
s.thread != null;
fullTryAcquireShared()/**
* 再次尝试获取读锁
*/
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {// 注意这里是循环
int c = getState();
if (exclusiveCount(c) != 0) {
// 仍然是先检查锁状态:在其它线程持有写锁时,不能获取读锁,返回-1
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
/*
* exclusiveCount(c) == 0 写锁没有被占用
* readerShouldBlock() == true,AQS同步队列中的线程在等锁,当前线程不能抢读锁
* 既然当前线程不能抢读锁,为什么没有直接返回呢?
* 因为这里还有一种情况是可以获取读锁的,那就是读锁重入。
* 以下代码就是检查如果不是重入的话,return -1,不能继续往下获取锁。
*/
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// CAS修改读锁标志位,修改成功表示获取到读锁;CAS失败,则进入下一次for循环继续CAS抢锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
/*
* 到这里已经获取到读锁了
* 以下是修改记录获取读锁的线程和重入次数,以及缓存firstReader和cachedHoldCounter
*/
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
doAcquireShared()应该比较熟悉了吧,类似AQS那篇中分析过acquireQueued():
将当前线程构成节点node
如果node是head的后继节点就可以继续尝试抢锁
如果node不是head的后继节点,将node加入队列的队尾,并将当前线程阻塞,等待node的前节点获取、释放锁之后唤醒node再次抢锁。
node抢到读锁之后执行setHeadAndPropagate()方法,setHeadAndPropagate()是获取读锁的特殊之处,下文分析。public final void acquireShared(int arg) {
if (tryAcquireShared(arg) = 0) {// r>0表示抢锁成功
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断node前驱节点状态,将当前线程阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate()方法就是在一个线程获取读锁之后,唤醒它之后排队获取读锁的线程的。该方法可以保证线程2获取读锁后,唤醒线程3获取读锁,线程3获取读锁后,唤醒线程4获取读锁,直到遇到后继节点是要获取写锁时才结束。private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);// 因为node获取到锁了,所以设置node为head
if (propagate > 0 || h == null || h.waitStatus
6. 读锁释放
处理firstReader、cachedHoldCounter、readHolds获取读锁线程及读锁重入次数。
修改读锁标志位state的高16位。
释放读锁之后,如果队列中还有线程等锁,唤醒同步队列head后继节点等待写锁的线程。
这里为什么是写锁?因为线程持有读锁时会把它之后要获取读锁的线程全部唤醒直到遇到写锁。
使用示例中释放读锁代码 rwl.readLock().unlock()
/**
* rwl.readLock().unlock()-->ReadLock.unlock()
*/
public void unlock() {
sync.releaseShared(1);
}
/**
* sync.releaseShared(1)-->AQS.releaseShared(int)
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {// 当前线程释放读锁,下文介绍
/*
* 到这里,已经没有任何线程占用锁,调用doReleaseShared()唤醒之后获取写锁的线程
* 如果同步队列中还有线程在排队,head后继节点的线程一定是要获取写锁,因为线程持有读锁时会把它之后要获取读锁的线程全部唤醒
*/
doReleaseShared();// 唤醒head后继节点获取锁
return true;
}
return false;
}
/**
* 释放读锁
* 当前线程释放读锁之后,没有线程占用锁,返回true
*/
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 处理firstReader、cachedHoldCounter、readHolds获取读锁线程及读锁重入次数
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count
总结
一个ReentrantReadWriteLock对象都对应着读锁和写锁两个锁,而这两个锁是通过同一个sync(AQS)实现的。
读写锁采用“按位切割使用”的方式,将state这个int变量分为高16位和低16位,高16位记录读锁状态,低16位记录写锁状态。
读锁获取时,需要判断当时的写锁没有被其他线程占用即可,锁处于的其他状态都可以获取读锁。
参考资料
《Java 并发编程之美》
《Java 并发编程实战》
《Java 并发编程的艺术》并发系列文章汇总
【原创】02|并发编程三大核心问题
【原创】03|重排序-可见性和有序性问题根源
【原创】04|Java 内存模型详解
【原创】05|深入理解 volatile
【原创】06|你不知道的 final
【原创】07|synchronized 原理
【原创】08|synchronized 锁优化
【原创】09|基础干货
【原创】10|线程状态
【原创】11|线程调度
【原创】13|LockSupport
【原创】14|AQS 源码分析
【原创】15|重入锁 ReentrantLock
【原创】16|公平锁与非公平锁
《java面试宝典5.0》(初中级)
《350道Java面试题:整理自100+公司》(中高级)
《资深java面试宝典-视频版》(资深)
《Java[BAT]面试必备》(资深)
分别适用于初中级,中高级,资深级工程师的面试复习。
内容包含java基础、javaweb、mysql性能优化、JVM、锁、百万并发、消息队列,高性能缓存、反射、Spring全家桶原理、微服务、Zookeeper、数据结构、限流熔断降级等等。
获取方式:点“在看”,V信关注上述单号并回复 【面试】即可领取,更多精彩陆续奉上。
必须点个在看支持呀,喵