曹工谈并发:Synchronized升级为重量级锁后,靠什么 API 来阻塞自己
2021-03-07 13:29
标签:exchange pat win 级别 cti amp 失败 tin private 因为想知道java中的关键字,对应的操作系统级别的api是啥,本来打算整理几个我知道的出来,但是,尴尬的是,我发现java里最重要的synchronized关键字,我就不知道它对应的api是什么。 在redis源码里,线程如果要进入一个同步区(只能单线程进入的代码块),会先获取一个互斥量,如果获取到了,则可以执行;否则,会阻塞在在这个互斥量上。 类型为 pthread_mutex_t。 使用互斥量前,要先初始化后,才能使用: pthread_mutex_init 这个函数是操作系统提供出来的api,不过应该是类unix系统才有这个。 shell中执行man pthread_mutex_init,可以看到: pthread_mutex_init 初始化参数mutex指定的互斥量,,使用attr中指定的属性。如果attr为空,使用默认参数。 成功初始化后,互斥量的状态变为已初始化、未锁定。 我们可以看下linux下执行man pthread_mutex_lock后,看到的帮助: 可以重点看下上面那句注释:调用pthread_mutex_lock,会导致参数mutext引用的互斥量被锁定;如果该互斥量早已被锁定,则调用线程将被阻塞。 如要了解更多互斥量,可以看看这篇文章,写的不错: 现在不用考虑各种优化,只考虑最终synchronized已经升级为重量级锁之后的表现,会使用前面的互斥量吗? 由于作者本身也是半桶水,搞了半天也没把jdk的源码调试环境搞起来,只能看看代码了,顺便结合网络上的一些文章,不过结论应该可靠。 大家先可以参考这两篇文章: JVM:锁实现(synchronized&JSR166)行为分析和相关源码 JVM源码分析之synchronized实现 我这里也简单列举一下整个过程,就从下面这里开始: 1处,前面都是偏向锁相关的东西,先跳过,进入slow_enter。 1处,mark->is_neutral(),判断对象头,是否是无锁状态,neutral本来是中立的意思,这里表示无锁。 2处,如果无锁,则调用lock的方法,lock本身是当前线程在栈内持有的对象,调用lock的set_displaced_header方法,参数为待加锁对象(堆里)的对象头,意思是,把待加锁对象的对象头,设置到线程的栈内变量里。 lock变量的class 类型如下: 结合这里的1、2处代码,上面那句,意思就是,把待加锁对象的对象头,存储到lock变量 _displaced_header属性。 3处,这里比较复杂。 这一句里面,cmpxchg_ptr,定义为: 这一句就是平时我们说的那种cas操作,表示,如果第二个参数,dest指向的值,和第三个参数,compare_value的值相等,则把第二个参数中的值,设为参数1的值。 重点来看,第二个参数,是什么鬼意思? Handle obj,说明obj是Handle类型, 那么,obj()的意思,应该就是,代码1处,应该是进行了操作符重载,所以会调用obj()方法,obj方法,请看2处,会返回 属性_handle,当然,这里对属性进行了解引用。 所以,基本的意思就是,返回_handle这个属性,执行的oop对象。 然后,再说说参数3,参数3就是mark。 这个mark,是在代码开头这样被赋值的。 那,我们看看obj的mark方法就行。(不知道为啥,在Handle类里,没找到这个方法,不知道为啥,难道是有什么特殊语法吗。。。),不过这个mark的意思,肯定就是对象里的对象头无误。 然后,第1个参数呢,就是lock,就是那个,如果上面的第二、三个参数相等,就将本参数,即,本线程,栈内对象lock的地址,设置到对象头中,表示,该对象已经被本线程加锁了。 4处,这里表示如果是当前线程重复进入: 5处,开始膨胀为重量级锁,并进入重量级锁的争夺 这里,会先通过调用ObjectSynchronizer::inflate(THREAD, obj()),来完成轻量级锁到重量级锁的升级。 这个注释就很清晰,升级轻锁为重锁,并且,会返回对象的monitor,即对应的重锁对象。 膨胀的过程,太复杂,看不懂(心累。。),有兴趣的可以看看这篇。 https://github.com/farmerjohngit/myblog/issues/15 膨胀后,返回了对应的monitor,然后进入其enter方法。 然后enter也是茫茫多的代码,根据网上博客,即: JVM:锁实现(synchronized&JSR166)行为分析和相关源码 JVM源码分析之synchronized实现 会进入以下方法: 这个里面,也是茫茫多的代码,而且更可怕的是,注释也多得很,快比代码多了。。 我们这里,重点看6处,阻塞自己,采用的方法为: 我们看看这个类: 有点意思,竟然有好几个ParkEvent类型的属性,第一个,看注释,就是用来,synchronized使用的; 第二个是Thread.sleep使用的,第三个是jdk自身的native方法用的 JVM:锁实现(synchronized&JSR166)行为分析和相关源码 大家可以再看下这篇,因为感觉写得不错。 这个类本身的属性,看得一知半解,但是它的父类,是这个。 这个PlatformEvent有意思的很,它是平台相关的。 可以看到,它有5个同名的类,分别在5个文件,分别是什么os_windows.hpp、os_linux.hpp、os_solaris.hpp,盲猜也知道,是不同操作系统下的实现。 我们看看linux下, 看到1处了吗,原来,阻塞自己还是用了pthread_mutex_t啊。 看看park怎么实现的: 1处,加锁; 2处,解锁。 所以,我们本文的答案找到了。 看看其他平台下呢? 其他平台就不一一截图了,除了windows,都是用的pthread_mutex_lock。 为了这个答案,花了一天时间,值得吗,有点不值得,时间花太长了,不过也值得,至少问题解决了。 不过,没把调试环境搭起来太惨了,各种头文件找不到,跳转都点不动,基本上都是全文搜索。。。 谢谢大家。 曹工谈并发:Synchronized升级为重量级锁后,靠什么 API 来阻塞自己 标签:exchange pat win 级别 cti amp 失败 tin private 原文地址:https://www.cnblogs.com/grey-wolf/p/12822729.html背景
redis中如何获取锁
互斥量类型定义:
// 定义互斥量
static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS];
互斥量初始化:
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
The pthread_mutex_init() function shall initialize the mutex referenced by mutex with attributes specified by attr. If attr is NULL, the default mutex
attributes are used; the effect shall be the same as passing the address of a default mutex attributes object. Upon successful initialization, the state of
the mutex becomes initialized and unlocked.
如何锁定、解锁互斥量
// 1
pthread_mutex_lock(&bio_mutex[type]);
// 2
pthread_mutex_unlock(&bio_mutex[type]);
#include
redis中线程使用互斥量的例子
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
// 1 锁定
pthread_mutex_lock(&bio_mutex[type]);
// 将新工作推入队列
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
pthread_cond_signal(&bio_condvar[type]);
// 2 解锁
pthread_mutex_unlock(&bio_mutex[type]);
}
Linux C 编程——多线程和互斥锁mutexjdk中synchronized,不考虑轻锁、偏向锁,最终有用到前面的互斥量吗
参考文章
简易流程梳理
// Fast Monitor Enter/Exit
// This the fast monitor enter. The interpreter and compiler use
// some assembly copies of this code. Make sure update those code
// if the following function is changed.
// The implementation is extremely sensitive to race condition. Be careful.
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
if (UseBiasedLocking) {
if (!SafepointSynchronize::is_at_safepoint()) {
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
// 1
slow_enter (obj, lock, THREAD) ;
}
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
// 1
if (mark->is_neutral()) {
// Anticipate successful CAS -- the ST of the displaced mark must
// be visible set_displaced_header(mark);
// 3
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
// Fall through to inflate() ...
} else
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don‘t relock with same BasicLock");
lock->set_displaced_header(NULL);
return;
}
// The object header will never be displaced to this lock,
// so it does not matter what the value is, except that it
// must be non-zero to avoid looking like a re-entrant lock,
// and must not look locked either.
lock->set_displaced_header(markOopDesc::unused_mark());
// 2
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
class BasicLock VALUE_OBJ_CLASS_SPEC {
friend class VMStructs;
private:
// 1
volatile markOop _displaced_header;
public:
markOop displaced_header() const { return _displaced_header; }
// 2
void set_displaced_header(markOop header) { _displaced_header = header; }
void print_on(outputStream* st) const;
// move a basic lock (used during deoptimization
void move_to(oop obj, BasicLock* dest);
static int displaced_header_offset_in_bytes() { return offset_of(BasicLock, _displaced_header); }
};
Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)
inline static intptr_t cmpxchg_ptr(intptr_t exchange_value, volatile intptr_t* dest, intptr_t compare_value);
class Handle VALUE_OBJ_CLASS_SPEC {
private:
oop* _handle;
protected:
// 2
oop obj() const { return *_handle; }
public:
//1
oop operator () () const { return obj(); }
Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)
markOop mark = obj->mark();
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don‘t relock with same BasicLock");
lock->set_displaced_header(NULL);
return;
}
// --接前面的代码
lock->set_displaced_header(markOopDesc::unused_mark());
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
// Inflate light weight monitor to heavy weight monitor
static ObjectMonitor* inflate(Thread * Self, oop obj);
ObjectMonitor::EnterI
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
assert (Self->is_Java_thread(), "invariant") ;
assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;
// 1 Try the lock - TATAS
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
DeferredInitialize () ;
//2 We try one round of spinning *before* enqueueing Self.
if (TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
//3 The Spin failed -- Enqueue and park the thread ...
//4 Enqueue "Self" on ObjectMonitor‘s _cxq
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
// 5 Push "Self" onto the front of the _cxq.
// Once on cxq/EntryList, Self stays on-queue until it acquires the lock.
// Note that spinning tends to reduce the rate at which threads
// enqueue and dequeue on EntryList|cxq.
ObjectWaiter * nxt ;
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}
TEVENT (Inflated enter - Contention) ;
int nWakeups = 0 ;
int RecheckInterval = 1 ;
for (;;) {
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
//6 park self
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
// Increase the RecheckInterval, but clamp the value.
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
// 7
Self->_ParkEvent->park() ;
}
if (TryLock(Self) > 0) break ;
...
}
...
return ;
}
// self的定义,类型为线程
Thread * Self = THREAD ;
...
Self->_ParkEvent->park() ;
thread.hpp
public:
volatile intptr_t _Stalled ;
volatile int _TypeTag ;
// 1
ParkEvent * _ParkEvent ; // for synchronized()
// 2
ParkEvent * _SleepEvent ; // for Thread.sleep
// 3
ParkEvent * _MutexEvent ; // for native internal Mutex/Monitor
// 4
ParkEvent * _MuxEvent ; // for low-level muxAcquire-muxRelease
ParkEvent是什么
class ParkEvent : public os::PlatformEvent
class PlatformEvent : public CHeapObj
void os::PlatformEvent::park() { // AKA "down()"
// Invariant: Only the thread associated with the Event/PlatformEvent
// may call park().
// TODO: assert that _Assoc != NULL or _Assoc == Self
int v ;
for (;;) {
v = _Event ;
if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
}
guarantee (v >= 0, "invariant") ;
if (v == 0) {
//1 Do this the hard way by blocking ...
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
guarantee (_nParked == 0, "invariant") ;
++ _nParked ;
while (_Event = 0, "invariant") ;
}
总结
文章标题:曹工谈并发:Synchronized升级为重量级锁后,靠什么 API 来阻塞自己
文章链接:http://soscw.com/index.php/essay/61357.html