一、摘要
在《深入剖析Java关键字之synchronized(原理篇)》中,我们从使用和原理上面分析了synchronized关键字,我们知道,synchronized是通过monitor监视器锁来实现;这一篇我们将更多的从字节码以及JVM的源码层面来分析synchronized关键字的实现。
二、synchronized的字节码分析
在《Java class文件结构(规范篇)》中我们知道由synchronized修饰的方法编译成字节码的时候方法的access_flags会有ACC_SYNCHRONIZED标识,我们把《深入剖析Java关键字之synchronized(原理篇)》中SynchronizedMethod的class文件使用javap -v SynchronizedMethod.class
命令来查看之后如下:
在方法的描述上面果然有ACC_SYNCHRONIZED;那么,如果反编译代码块,会得到什么结果呢?我们使用
javap -v SynchronizedCodeBlock.class
命令来查看SynchronizedCodeBlock的字节码,如下:我们可以看到increase方法中有一个monitorenter和两个monitorexit;无论ACC_SYNCHRONIZED模式还是monitorenter、monitorexit模式,本质上都是对一个对象的监视器(monitor)进行获取,而这个获取的过程是排他的,也就是同一个时刻只能有一个线程获得同步块对象的监视器, ACC_SYNCHRONIZED只是通过隐式的方式实现。在 《深入剖析Java关键字之synchronized(原理篇)》文章中,有提到对象监视器:
synchronized关键字经过编译之后,会在同步块的前后分别形成monitorenter和monitorexit这两个字节码指令。当我们的JVM把字节码加载到内存的时候,会对这两个指令进行解析。这两个字节码都需要一个Object类型的参数来指明要锁定和解锁的对象。如果Java程序中的synchronized明确指定了对象参数,那么这个对象就是加锁和解锁的对象;如果没有明确指定,那就根据synchronized修饰的是实例方法还是类方法,获取对应的对象实例或Class对象来作为锁对象
接下来,我们来看下JVM规范中关于同步的描述:
Java虚拟机可以支持方法级的同步和方法内一段指令序列的同步,这两种同步结构都是使用同步锁(monitor)来支持的。
方法级的同步是隐式的,即无需通过字节码指令来控制,它实现在方法调用和返回操作之中。虚拟机可以从方法常量池中的方法表结构(method_info structure)中的 ACC_SYNCHRONIZED访问标志是否设置,如果设置了,执行线程将先持有同步锁,然后执行方法,最后在方法完成(无论是正常完成还是非正常完成)时释放同步锁。在方法执行期间,执行线程持有了同步锁,其他任何线程都无法再获得同一个锁。如果一个同步方法执行期间抛出了异常,并且在方法内部无法处理此异常,那这个同步方法所持有的锁将在异常抛到同步方法之外时自动释放。
指令序列的同步通常用来表示Java语言中的synchronized块,Java虚拟机的指令集中有monitorenter和monitorexit两个指令来支持这种synchronized关键字的语义。正确实现synchronized关键字需要编译器与Java虚拟机两者协作支持。
结构化锁定(structure locking)是指在方法调用期间每一个同步锁退出都与前面的同步锁进入相匹配的情形。因为无法保证所有提交给Java虚拟机执行的代码都满足结构化锁定,所以允许Java虚拟机允许(但不强制要求)通过以下两条规则来保证结构化锁定成立。假设T代表一个线程,M代表一个同步锁,那么:
1. T在方法执行时持有同步锁M的次数必须与T在方法执行(包括正常和非正常完成)时释放同步锁M的次数相等。
2. 在方法调用过程中,任何时刻都不会出现线程T释放同步锁M的次数比T持有同步锁M次数多的情况。
请注意,在调用同步方法时也认为自动持有和释放锁的过程是在方法调用期间发生。
从JVM规范中,我们也可以看出来Java的同步机制是通过monitor来实现的,那么什么是monitor呢?JVM源码又是怎么实现的呢?
三、monitor源码分析
在分析monitor的源码之前,我们需要了解oop
,oopDesc
,markOop
等相关概念,在《深入剖析Java关键字之synchronized(原理篇)》这篇文章中,我们知道了synchronized的同步锁实际上是存储在对象头中,这个对象头是一个Java对象在内存中布局的一部分。Java中的每一个Object在JVM内部都会有一个native的C++对象oop/oopDesc
与之对应。在hotspot源码 oop.hpp
中oopDesc
的定义如下:
class oopDesc {
friend class VMStructs;
friend class JVMCIVMStructs;
private:
volatile markOop _mark;
union _metadata {
Klass* _klass;
narrowKlass _compressed_klass;
} _metadata;
其中 markOop
就是我们所说的Mark Word,用于存储锁的标识。 hotspot源码 markOop.hpp
文件代码片段:
class markOopDesc: public oopDesc {
private:
// Conversion
uintptr_t value() const { return (uintptr_t) this; }
public:
// Constants
enum { age_bits = 4,
lock_bits = 2,
biased_lock_bits = 1,
max_hash_bits = BitsPerWord - age_bits - lock_bits - biased_lock_bits,
hash_bits = max_hash_bits > 31 ? 31 : max_hash_bits,
cms_bits = LP64_ONLY(1) NOT_LP64(0),
epoch_bits = 2
};
......
}
markOopDesc
继承自oopDesc
,并且扩展了自己的monitor
方法,这个方法返回一个ObjectMonitor
指针对象,在hotspot虚拟机中,采用ObjectMonitor
类来实现monitor
:
bool has_monitor() const {
return ((value() & monitor_value) != 0);
}
ObjectMonitor* monitor() const {
assert(has_monitor(), "check");
// Use xor instead of &~ to provide one extra tag-bit check.
return (ObjectMonitor*) (value() ^ monitor_value);
}
在ObjectMonitor.hpp
中,可以看到ObjectMonitor
的定义:
class ObjectMonitor {
public:
enum {
OM_OK, // no error
OM_SYSTEM_ERROR, // operating system error
OM_ILLEGAL_MONITOR_STATE, // IllegalMonitorStateException
OM_INTERRUPTED, // Thread.interrupt()
OM_TIMED_OUT // Object.wait() timed out
};
private:
friend class ObjectSynchronizer;
friend class ObjectWaiter;
friend class VMStructs;
volatile markOop _header; // displaced object header word - mark
void* volatile _object; // backward object pointer - strong root
public:
ObjectMonitor* FreeNext; // Free list linkage
private:
DEFINE_PAD_MINUS_SIZE(0, DEFAULT_CACHE_LINE_SIZE,
sizeof(volatile markOop) + sizeof(void * volatile) +
sizeof(ObjectMonitor *));
protected: // protected for JvmtiRawMonitor
void * volatile _owner; // pointer to owning thread OR BasicLock
volatile jlong _previous_owner_tid; // thread id of the previous owner of the monitor
volatile intptr_t _recursions; // recursion count, 0 for first entry
ObjectWaiter * volatile _EntryList; // Threads blocked on entry or reentry.
// The list is actually composed of WaitNodes,
// acting as proxies for Threads.
private:
ObjectWaiter * volatile _cxq; // LL of recently-arrived threads blocked on entry.
Thread * volatile _succ; // Heir presumptive thread - used for futile wakeup throttling
Thread * volatile _Responsible;
volatile int _Spinner; // for exit->spinner handoff optimization
volatile int _SpinDuration;
volatile jint _count; // reference count to prevent reclamation/deflation
// at stop-the-world time. See deflate_idle_monitors().
// _count is approximately |_WaitSet| + |_EntryList|
protected:
ObjectWaiter * volatile _WaitSet; // LL of threads wait()ing on the monitor
volatile jint _waiters; // number of waiting threads
private:
volatile int _WaitSetLock; // protects Wait Queue - simple spinlock
public:
static void Initialize();
// Only perform a PerfData operation if the PerfData object has been
// allocated and if the PerfDataManager has not freed the PerfData
// objects which can happen at normal VM shutdown.
//
#define OM_PERFDATA_OP(f, op_str) \
do { \
if (ObjectMonitor::_sync_ ## f != NULL && \
PerfDataManager::has_PerfData()) { \
ObjectMonitor::_sync_ ## f->op_str; \
} \
} while (0)
static PerfCounter * _sync_ContendedLockAttempts;
static PerfCounter * _sync_FutileWakeups;
static PerfCounter * _sync_Parks;
static PerfCounter * _sync_Notifications;
static PerfCounter * _sync_Inflations;
static PerfCounter * _sync_Deflations;
static PerfLongVariable * _sync_MonExtant;
static int Knob_ExitRelease;
static int Knob_InlineNotify;
static int Knob_Verbose;
static int Knob_VerifyInUse;
static int Knob_VerifyMatch;
static int Knob_SpinLimit;
void* operator new (size_t size) throw();
void* operator new[] (size_t size) throw();
void operator delete(void* p);
void operator delete[] (void *p);
// TODO-FIXME: the "offset" routines should return a type of off_t instead of int ...
// ByteSize would also be an appropriate type.
static int header_offset_in_bytes() { return offset_of(ObjectMonitor, _header); }
static int object_offset_in_bytes() { return offset_of(ObjectMonitor, _object); }
static int owner_offset_in_bytes() { return offset_of(ObjectMonitor, _owner); }
static int count_offset_in_bytes() { return offset_of(ObjectMonitor, _count); }
static int recursions_offset_in_bytes() { return offset_of(ObjectMonitor, _recursions); }
static int cxq_offset_in_bytes() { return offset_of(ObjectMonitor, _cxq); }
static int succ_offset_in_bytes() { return offset_of(ObjectMonitor, _succ); }
static int EntryList_offset_in_bytes() { return offset_of(ObjectMonitor, _EntryList); }
#define OM_OFFSET_NO_MONITOR_VALUE_TAG(f) \
((ObjectMonitor::f ## _offset_in_bytes()) - markOopDesc::monitor_value)
markOop header() const;
volatile markOop* header_addr();
void set_header(markOop hdr);
intptr_t is_busy() const {
// TODO-FIXME: merge _count and _waiters.
// TODO-FIXME: assert _owner == null implies _recursions = 0
// TODO-FIXME: assert _WaitSet != null implies _count > 0
return _count|_waiters|intptr_t(_owner)|intptr_t(_cxq)|intptr_t(_EntryList);
}
intptr_t is_entered(Thread* current) const;
void* owner() const;
void set_owner(void* owner);
jint waiters() const;
jint count() const;
void set_count(jint count);
jint contentions() const;
intptr_t recursions() const { return _recursions; }
// JVM/TI GetObjectMonitorUsage() needs this:
ObjectWaiter* first_waiter() { return _WaitSet; }
ObjectWaiter* next_waiter(ObjectWaiter* o) { return o->_next; }
Thread* thread_of_waiter(ObjectWaiter* o) { return o->_thread; }
protected:
// We don't typically expect or want the ctors or dtors to run.
// normal ObjectMonitors are type-stable and immortal.
ObjectMonitor() { ::memset((void *)this, 0, sizeof(*this)); }
~ObjectMonitor() {
// TODO: Add asserts ...
// _cxq == 0 _succ == NULL _owner == NULL _waiters == 0
// _count == 0 _EntryList == NULL etc
}
......
#endif // SHARE_VM_RUNTIME_OBJECTMONITOR_HPP
所以同步块的实现使用 monitorenter和 monitorexit指令,而同步方法是依靠方法修饰符上的flag ACC_SYNCHRONIZED来完成。其本质是对一个对象监视器(monitor)进行获取,这个获取过程是排他的,也就是同一个时刻只能有一个线程获得由synchronized所保护对象的监视器。所谓的监视器,实际上可以理解为一个同步工具,它是由Java对象进行描述的。在Hotspot中,是通过ObjectMonitor来实现,每个对象中都会内置一个ObjectMonitor对象。如图所示:
四、synchronized源码分析
从 monitorenter
和 monitorexit
这两个指令来开始阅读源码,JVM将字节码加载到内存以后,会对这两个指令进行解释执行, monitorenter
, monitorexit
的指令解析是通过 interpreterRuntime.cpp
中的两个方法实现:
// Synchronization
static void monitorenter(JavaThread* thread, BasicObjectLock* elem);
static void monitorexit (JavaThread* thread, BasicObjectLock* elem);
//JavaThread 当前获取锁的线程
//BasicObjectLock 基础对象锁
我们基于monitorenter
为入口,沿着偏向锁 -> 轻量级锁 -> 重量级锁的路径来分析synchronized的实现过程,继续看interpreterRuntime.cpp
源码:
//------------------------------------------------------------------------------------------------------------------------
// Synchronization
//
// The interpreter's synchronization code is factored out so that it can
// be shared by method invocation and synchronized blocks.
//%note synchronization_3
//%note monitor_1
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
if (PrintBiasedLockingStatistics) {
Atomic::inc(BiasedLocking::slow_path_entry_count_addr());
}
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
if (UseBiasedLocking) {
// Retry fast entry if bias is revoked to avoid unnecessary inflation
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
"must be NULL or an object");
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
UseBiasedLocking
是在JVM启动的时候,是否启动偏向锁的标识::
- 如果支持偏向锁,则执行
ObjectSynchronizer::fast_enter
的逻辑 - 如果不支持偏向锁,则执行
ObjectSynchronizer::slow_enter
逻辑,绕过偏向锁,直接进入轻量级锁
ObjectSynchronizer::fast_enter
的实现在 synchronizer.cpp
文件中,代码如下:
// -----------------------------------------------------------------------------
// Fast Monitor Enter/Exit
// This the fast monitor enter. The interpreter and compiler use
// some assembly copies of this code. Make sure update those code
// if the following function is changed. The implementation is
// extremely sensitive to race condition. Be careful.
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock,
bool attempt_rebias, TRAPS) {
if (UseBiasedLocking) { //判断是否开启了偏向锁
if (!SafepointSynchronize::is_at_safepoint()) { //如果不处于全局安全点
//通过`revoke_and_rebias`这个函数尝试获取偏向锁
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) { //如果是撤销与重偏向直接返回
return;
}
} else { //如果在安全点,撤销偏向锁
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
slow_enter(obj, lock, THREAD);
}
fast_enter
方法的主要流程做一个简单的解释:
- 再次检查偏向锁是否开启
- 当处于不安全点时,通过
revoke_and_rebias
尝试获取偏向锁,如果成功则直接返回,如果失败则进入轻量级锁获取过程 -
revoke_and_rebias
这个偏向锁的获取逻辑在biasedLocking.cpp
中 - 如果偏向锁未开启,则进入
slow_enter
获取轻量级锁的流程
偏向锁的获取
BiasedLocking::revoke_and_rebias
是用来获取当前偏向锁的状态(可能是偏向锁撤销后重新偏向)。这个方法的逻辑在 biasedLocking.cpp
中
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
// We can revoke the biases of anonymously-biased objects
// efficiently enough that we should not cause these revocations to
// update the heuristics because doing so may cause unwanted bulk
// revocations (which are expensive) to occur.
markOop mark = obj->mark(); //获取锁对象的对象头
//判断mark是否为可偏向状态,即mark的偏向锁标志位为1,锁标志位为 01,线程id为null
if (mark->is_biased_anonymously() && !attempt_rebias) {
// We are probably trying to revoke the bias of this object due to
// an identity hash code computation. Try to revoke the bias
// without a safepoint. This is possible if we can successfully
// compare-and-exchange an unbiased header into the mark word of
// the object, meaning that no other thread has raced to acquire
// the bias of the object.
//这个分支是进行对象的hashCode计算时会进入,在一个非全局安全点进行偏向锁撤销
markOop biased_value = mark;
//创建一个非偏向的markword
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = obj->cas_set_mark(unbiased_prototype, mark);
if (res_mark == biased_value) { //如果CAS成功,返回偏向锁撤销状态
return BIAS_REVOKED;
}
} else if (mark->has_bias_pattern()) {//如果锁对象为可偏向状态(biased_lock:1, lock:01,不管线程id是否为空),尝试重新偏向
Klass* k = obj->klass();
markOop prototype_header = k->prototype_header();
if (!prototype_header->has_bias_pattern()) { //如果已经有线程对锁对象进行了全局锁定,则取消偏向锁操作
// This object has a stale bias from before the bulk revocation
// for this data type occurred. It's pointless to update the
// heuristics at this point so simply update the header with a
// CAS. If we fail this race, the object's bias has been revoked
// by another thread so we simply return and let the caller deal
// with it.
markOop biased_value = mark;
//CAS 更新对象头markword为非偏向锁
markOop res_mark = obj->cas_set_mark(prototype_header, mark);
assert(!obj->mark()->has_bias_pattern(), "even if we raced, should still be revoked");
return BIAS_REVOKED; //返回偏向锁撤销状态
} else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
// The epoch of this biasing has expired indicating that the
// object is effectively unbiased. Depending on whether we need
// to rebias or revoke the bias of this object we can do it
// efficiently enough with a CAS that we shouldn't update the
// heuristics. This is normally done in the assembly code but we
// can reach this point due to various points in the runtime
// needing to revoke biases.
//如果偏向锁过期,则进入当前分支
if (attempt_rebias) { //如果允许尝试获取偏向锁
assert(THREAD->is_Java_thread(), "");
markOop biased_value = mark;
markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
//通过CAS 操作, 将本线程的 ThreadID 、时间错、分代年龄尝试写入对象头中
markOop res_mark = obj->cas_set_mark(rebiased_prototype, mark);
if (res_mark == biased_value) { //CAS成功,则返回撤销和重新偏向状态
return BIAS_REVOKED_AND_REBIASED;
}
} else { //不尝试获取偏向锁,则取消偏向锁
//通过CAS操作更新分代年龄
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = obj->cas_set_mark(unbiased_prototype, mark);
if (res_mark == biased_value) { //如果CAS操作成功,返回偏向锁撤销状态
return BIAS_REVOKED;
}
}
}
}
偏向锁的撤销
当到达一个全局安全点时,这时会根据偏向锁的状态来判断是否需要撤销偏向锁,调用 revoke_at_safepoint方法,这个方法也是在 biasedLocking.cpp中定义的:
void BiasedLocking::revoke_at_safepoint(Handle h_obj) {
assert(SafepointSynchronize::is_at_safepoint(), "must only be called while at safepoint");
oop obj = h_obj()
//更新撤销偏向锁计数,并返回偏向锁撤销次数和偏向次数
HeuristicsResult heuristics = update_heuristics(obj, false);
if (heuristics == HR_SINGLE_REVOKE) { //可偏向且未达到批量处理的阈值(下面会单独解释)
revoke_bias(obj, false, false, NULL, NULL);
} else if ((heuristics == HR_BULK_REBIAS) ||
(heuristics == HR_BULK_REVOKE)) { //如果是多次撤销或者多次偏向
//批量撤销
bulk_revoke_or_rebias_at_safepoint(obj, (heuristics == HR_BULK_REBIAS), false, NULL);
}
clean_up_cached_monitor_info();
}
偏向锁的释放,需要等待全局安全点(在这个时间点上没有正在执行的字节码),首先暂停拥有偏向锁的线程,然后检查持有偏向锁的线程是否还活着,如果线程不处于活动状态,则将对象头设置成无锁状态。如果线程仍然活着,则会升级为轻量级锁,遍历偏向对象的所记录。栈帧中的锁记录和对象头的Mark Word要么重新偏向其他线程,要么恢复到无锁,或者标记对象不适合作为偏向锁。最后唤醒暂停的线程。
JVM内部为每个类维护了一个偏向锁
revoke
计数器,对偏向锁撤销进行计数,当这个值达到指定阈值时,JVM会认为这个类的偏向锁有问题,需要重新偏向(rebias
),对所有属于这个类的对象进行重偏向的操作成为 批量重偏向(bulk rebias)。在做bulk rebias
时,会对这个类的epoch的值做递增,这个epoch
会存储在对象头中的epoch
字段。在判断这个对象是否获得偏向锁的条件是:markword
的biased_lock:1、lock:01、threadid
和当前线程id相等、epoch字段和所属类的epoch值相同,如果epoch的值不一样,要么就是撤销偏向锁、要么就是rebias
; 如果这个类的revoke
计数器的值继续增加到一个阈值,那么jvm会认为这个类不适合偏向锁,就需要进行bulk revoke
操作
轻量级锁的获取
轻量级锁的获取,是调用 ::slow_enter
方法,该方法同样位于 synchronizer.cpp
文件中
// -----------------------------------------------------------------------------
// Interpreter/Compiler Slow Case
// This routine is used to handle interpreter/compiler slow case
// We don't need to use fast path here, because it must have been
// failed in the interpreter/compiler code.
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
if (mark->is_neutral()) { //如果当前是无锁状态, markword的biase_lock:0,lock:01
// Anticipate successful CAS -- the ST of the displaced mark must
// be visible <= the ST performed by the CAS.
//直接把mark保存到BasicLock对象的_displaced_header字段
lock->set_displaced_header(mark);
//通过CAS将mark word更新为指向BasicLock对象的指针,更新成功表示获得了轻量级锁
if (mark == obj()->cas_set_mark((markOop) lock, mark)) {
TEVENT(slow_enter: release stacklock);
return;
}
//如果markword处于加锁状态、且markword中的ptr指针指向当前线程的栈帧,表示为重入操作,不需要争抢锁
// Fall through to inflate() ...
} else if (mark->has_locker() &&
THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
lock->set_displaced_header(NULL);
return;
}
// The object header will never be displaced to this lock,
// so it does not matter what the value is, except that it
// must be non-zero to avoid looking like a re-entrant lock,
// and must not look locked either.
//代码执行到这里,说明有多个线程竞争轻量级锁,轻量级锁通过`inflate`进行膨胀升级为重量级锁
lock->set_displaced_header(markOopDesc::unused_mark());
ObjectSynchronizer::inflate(THREAD,
obj(),
inflate_cause_monitor_enter)->enter(THREAD);
}
轻量级锁的获取逻辑如下:
mark->is_neutral()
方法,is_neutral
这个方法是在markOop.hpp
中定义,如果biased_lock:0
且lock:01
表示无锁状态如果
mark
处于无锁状态,则进入步骤(3),否则执行步骤(5)把
mark
保存到BasicLock
对象的displacedheader
字段通过
CAS
尝试将markword
更新为指向BasicLock
对象的指针,如果更新成功,表示竞争到锁,则执行同步代码,否则执行步骤(5)如果当前
mark
处于加锁状态,且mark
中的ptr
指针指向当前线程的栈帧,则执行同步代码,否则说明有多个线程竞争轻量级锁,轻量级锁需要膨胀升级为重量级锁
轻量级锁的释放
轻量级锁的释放是通过interpreterRuntime.cpp
文件中的monitorexit
调用:
//%note monitor_1
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
Handle h_obj(thread, elem->obj());
assert(Universe::heap()->is_in_reserved_or_null(h_obj()),
"must be NULL or an object");
if (elem == NULL || h_obj()->is_unlocked()) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
// Free entry. This must be done here, since a pending exception might be installed on
// exit. If it is not cleared, the exception handling code will try to unlock the monitor again.
elem->set_obj(NULL);
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
这段代码中主要是通过synchronizer.cpp
文件中 ObjectSynchronizer::slow_exit
来执行
// This routine is used to handle interpreter/compiler slow case
// We don't need to use fast path here, because it must have
// failed in the interpreter/compiler code. Simply use the heavy
// weight monitor should be ok, unless someone find otherwise.
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
fast_exit(object, lock, THREAD);
}
ObjectSynchronizer::fast_exit
的代码如下:
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
markOop mark = object->mark(); //获取线程栈帧中锁记录(LockRecord)中的markword
// We cannot check for Biased Locking if we are racing an inflation.
assert(mark == markOopDesc::INFLATING() ||
!mark->has_bias_pattern(), "should not see bias pattern here");
markOop dhw = lock->displaced_header();
if (dhw == NULL) {
// If the displaced header is NULL, then this exit matches up with
// a recursive enter. No real work to do here except for diagnostics.
#ifndef PRODUCT
if (mark != markOopDesc::INFLATING()) {
// Only do diagnostics if we are not racing an inflation. Simply
// exiting a recursive enter of a Java Monitor that is being
// inflated is safe; see the has_monitor() comment below.
assert(!mark->is_neutral(), "invariant");
assert(!mark->has_locker() ||
THREAD->is_lock_owned((address)mark->locker()), "invariant");
if (mark->has_monitor()) {
// The BasicLock's displaced_header is marked as a recursive
// enter and we have an inflated Java Monitor (ObjectMonitor).
// This is a special case where the Java Monitor was inflated
// after this thread entered the stack-lock recursively. When a
// Java Monitor is inflated, we cannot safely walk the Java
// Monitor owner's stack and update the BasicLocks because a
// Java Monitor can be asynchronously inflated by a thread that
// does not own the Java Monitor.
ObjectMonitor * m = mark->monitor();
assert(((oop)(m->object()))->mark() == mark, "invariant");
assert(m->is_entered(THREAD), "invariant");
}
}
#endif
return;
}
if (mark == (markOop) lock) {
// If the object is stack-locked by the current thread, try to
// swing the displaced header from the BasicLock back to the mark.
assert(dhw->is_neutral(), "invariant");
//通过CAS尝试将Displaced Mark Word替换回对象头,如果成功,表示锁释放成功。
if (object->cas_set_mark(dhw, mark) == mark) {
TEVENT(fast_exit: release stack-lock);
return;
}
}
//锁膨胀,调用重量级锁的释放锁方法
// We have to take the slow-path of possible inflation and then exit.
ObjectSynchronizer::inflate(THREAD,
object,
inflate_cause_vm_internal)->exit(true, THREAD);
}
轻量级锁的释放,就是将当前线程栈帧中锁记录空间中的Mark Word替换到锁对象的对象头中,如果成功表示锁释放成功。否则,锁膨胀成重量级锁,实现重量级锁的释放锁逻辑
锁膨胀的过程分析
重量级锁是通过对象内部的监视器(monitor)来实现,而monitor的本质是依赖操作系统底层的MutexLock实现的。我们先来看锁的膨胀过程,从前面的分析中已经知道了所膨胀的过程是通过 ObjectSynchronizer::inflate
方法实现的,代码如下:
ObjectMonitor* ObjectSynchronizer::inflate(Thread * Self,
oop object,
const InflateCause cause) {
// Inflate mutates the heap ...
// Relaxing assertion for bug 6320749.
assert(Universe::verify_in_progress() ||
!SafepointSynchronize::is_at_safepoint(), "invariant");
EventJavaMonitorInflate event;
for (;;) { //自旋
const markOop mark = object->mark();
assert(!mark->has_bias_pattern(), "invariant");
// The mark can be in one of the following states:
// * Inflated - just return
// * Stack-locked - coerce it to inflated
// * INFLATING - busy wait for conversion to complete
// * Neutral - aggressively inflate the object.
// * BIASED - Illegal. We should never see this
// CASE: inflated
if (mark->has_monitor()) { //has_monitor是markOop.hpp中的方法,如果为true表示当前锁已经是重量级锁了
ObjectMonitor * inf = mark->monitor(); //获得重量级锁的对象监视器直接返回
assert(inf->header()->is_neutral(), "invariant");
assert(oopDesc::equals((oop) inf->object(), object), "invariant");
assert(ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
return inf;
}
// CASE: inflation in progress - inflating over a stack-lock.
// Some other thread is converting from stack-locked to inflated.
// Only that thread can complete inflation -- other threads must wait.
// The INFLATING value is transient.
// Currently, we spin/yield/park and poll the markword, waiting for inflation to finish.
// We could always eliminate polling by parking the thread on some auxiliary list.
if (mark == markOopDesc::INFLATING()) { //膨胀等待,表示存在线程正在膨胀,通过continue进行下一轮的膨胀
TEVENT(Inflate: spin while INFLATING);
ReadStableMark(object);
continue;
}
// CASE: stack-locked
// Could be stack-locked either by this thread or by some other thread.
if (mark->has_locker()) { //表示当前锁为轻量级锁,以下是轻量级锁的膨胀逻辑
ObjectMonitor * m = omAlloc(Self); //获取一个可用的ObjectMonitor
// Optimistically prepare the objectmonitor - anticipate successful CAS
// We do this before the CAS in order to minimize the length of time
// in which INFLATING appears in the mark.
m->Recycle();
m->_Responsible = NULL;
m->_recursions = 0;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit; // Consider: maintain by type/class
/**将object->mark_addr()和mark比较,如果这两个值相等,则将object->mark_addr()
改成markOopDesc::INFLATING(),相等返回是mark,不相等返回的是object->mark_addr()**/
markOop cmp = object->cas_set_mark(markOopDesc::INFLATING(), mark);
if (cmp != mark) { // CAS失败
omRelease(Self, m, true); //释放监视器
continue; // Interference -- just retry
}
// fetch the displaced mark from the owner's stack.
// The owner can't die or unwind past the lock while our INFLATING
// object is in the mark. Furthermore the owner can't complete
// an unlock on the object, either.
markOop dmw = mark->displaced_mark_helper();
assert(dmw->is_neutral(), "invariant");
//CAS成功以后,设置ObjectMonitor相关属性
// Setup monitor fields to proper values -- prepare the monitor
m->set_header(dmw);
// Optimization: if the mark->locker stack address is associated
// with this thread we could simply set m->_owner = Self.
// Note that a thread can inflate an object
// that it has stack-locked -- as might happen in wait() -- directly
// with CAS. That is, we can avoid the xchg-NULL .... ST idiom.
m->set_owner(mark->locker());
m->set_object(object);
// TODO-FIXME: assert BasicLock->dhw != 0.
// Must preserve store ordering. The monitor state must
// be stable at the time of publishing the monitor address.
guarantee(object->mark() == markOopDesc::INFLATING(), "invariant");
object->release_set_mark(markOopDesc::encode(m));
// Hopefully the performance counters are allocated on distinct cache lines
// to avoid false sharing on MP systems ...
OM_PERFDATA_OP(Inflations, inc());
TEVENT(Inflate: overwrite stacklock);
if (log_is_enabled(Debug, monitorinflation)) {
if (object->is_instance()) {
ResourceMark rm;
log_debug(monitorinflation)("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
p2i(object), p2i(object->mark()),
object->klass()->external_name());
}
}
if (event.should_commit()) {
post_monitor_inflate_event(&event, object, cause);
}
return m; //返回ObjectMonitor
}
//如果是无锁状态
assert(mark->is_neutral(), "invariant");
ObjectMonitor * m = omAlloc(Self);//获取一个可用的ObjectMonitor
// prepare m for installation - set monitor to initial state
//设置ObjectMonitor相关属性
m->Recycle();
m->set_header(mark);
m->set_owner(NULL);
m->set_object(object);
m->_recursions = 0;
m->_Responsible = NULL;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit; // consider: keep metastats by type/class
/**将object->mark_addr()和mark比较,如果这两个值相等,则将object->mark_addr()
改成markOopDesc::encode(m),相等返回是mark,不相等返回的是object->mark_addr()**/
if (object->cas_set_mark(markOopDesc::encode(m), mark) != mark) {
//CAS失败,说明出现了锁竞争,则释放监视器重行竞争锁
m->set_object(NULL);
m->set_owner(NULL);
m->Recycle();
omRelease(Self, m, true);
m = NULL;
continue;
// interference - the markword changed - just retry.
// The state-transitions are one-way, so there's no chance of
// live-lock -- "Inflated" is an absorbing state.
}
// Hopefully the performance counters are allocated on distinct
// cache lines to avoid false sharing on MP systems ...
OM_PERFDATA_OP(Inflations, inc());
TEVENT(Inflate: overwrite neutral);
if (log_is_enabled(Debug, monitorinflation)) {
if (object->is_instance()) {
ResourceMark rm;
log_debug(monitorinflation)("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
p2i(object), p2i(object->mark()),
object->klass()->external_name());
}
}
if (event.should_commit()) {
post_monitor_inflate_event(&event, object, cause);
}
return m; //返回ObjectMonitor对象
}
}
整个锁膨胀的过程是通过自旋来完成的,具体的实现逻辑简答总结以下几点:
-
mark->has_monitor()
判断如果当前锁对象为重量级锁,也就是lock:10
,则执行(2),否则执行(3);
-
- 通过
mark->monitor
获得重量级锁的对象监视器ObjectMonitor
并返回,锁膨胀过程结束;
- 通过
- 如果当前锁处于
INFLATING
,说明有其他线程在执行锁膨胀,那么当前线程通过自旋等待其他线程锁膨胀完成;
- 如果当前锁处于
- 如果当前是轻量级锁状态
mark->has_locker()
,则进行锁膨胀。首先,通过omAlloc
方法获得一个可用的;ObjectMonitor
,并设置初始数据;然后通过CAS
将对象头设置为markOopDesc:INFLATING
,表示当前锁正在膨胀,如果CAS
失败,继续自旋;
- 如果当前是轻量级锁状态
- 如果是无锁状态,逻辑类似第4步骤
锁膨胀的过程实际上是获得一个
ObjectMonitor
对象监视器,而真正抢占锁的逻辑,在ObjectMonitor::enter
方法里面;
重量级锁的竞争
重量级锁的竞争,在 ObjectMonitor::enter
方法中,代码文件在 objectMonitor.cpp
重量级锁的代码就不一一分析了,简单说一下下面这段代码主要做的几件事:
- 通过
CAS
将monitor
的_owner
字段设置为当前线程,如果设置成功,则直接返回; - 如果之前的
_owner
指向的是当前的线程,说明是重入,执行_recursions++
增加重入次数; - 如果当前线程获取监视器锁成功,将
_recursions
设置为1,_owner设
置为当前线程; - 如果获取锁失败,则等待锁释放;
void ObjectMonitor::enter(TRAPS) {
// The following code is ordered to check the most common cases first
// and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
Thread * const Self = THREAD;
void * cur = Atomic::cmpxchg(Self, &_owner, (void*)NULL);
if (cur == NULL) { //CAS成功
// Either ASSERT _recursions == 0 or explicitly set _recursions = 0.
assert(_recursions == 0, "invariant");
assert(_owner == Self, "invariant");
return;
}
if (cur == Self) {
// TODO-FIXME: check for integer overflow! BUGID 6557169.
_recursions++;
return;
}
if (Self->is_lock_owned ((address)cur)) {
assert(_recursions == 0, "internal state error");
_recursions = 1;
// Commute owner from a thread-specific on-stack BasicLockObject address to
// a full-fledged "Thread *".
_owner = Self;
return;
}
// We've encountered genuine contention.
assert(Self->_Stalled == 0, "invariant");
Self->_Stalled = intptr_t(this);
// Try one round of spinning *before* enqueueing Self
// and before going through the awkward and expensive state
// transitions. The following spin is strictly optional ...
// Note that if we acquire the monitor from an initial spin
// we forgo posting JVMTI events and firing DTRACE probes.
if (Knob_SpinEarly && TrySpin (Self) > 0) {
assert(_owner == Self, "invariant");
assert(_recursions == 0, "invariant");
assert(((oop)(object()))->mark() == markOopDesc::encode(this), "invariant");
Self->_Stalled = 0;
return;
}
assert(_owner != Self, "invariant");
assert(_succ != Self, "invariant");
assert(Self->is_Java_thread(), "invariant");
JavaThread * jt = (JavaThread *) Self;
assert(!SafepointSynchronize::is_at_safepoint(), "invariant");
assert(jt->thread_state() != _thread_blocked, "invariant");
assert(this->object() != NULL, "invariant");
assert(_count >= 0, "invariant");
// Prevent deflation at STW-time. See deflate_idle_monitors() and is_busy().
// Ensure the object-monitor relationship remains stable while there's contention.
Atomic::inc(&_count);
JFR_ONLY(JfrConditionalFlushWithStacktrace<EventJavaMonitorEnter> flush(jt);)
EventJavaMonitorEnter event;
if (event.should_commit()) {
event.set_monitorClass(((oop)this->object())->klass());
event.set_address((uintptr_t)(this->object_addr()));
}
{ // Change java thread status to indicate blocked on monitor enter.
JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this);
Self->set_current_pending_monitor(this);
DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt);
if (JvmtiExport::should_post_monitor_contended_enter()) {
JvmtiExport::post_monitor_contended_enter(jt, this);
// The current thread does not yet own the monitor and does not
// yet appear on any queues that would get it made the successor.
// This means that the JVMTI_EVENT_MONITOR_CONTENDED_ENTER event
// handler cannot accidentally consume an unpark() meant for the
// ParkEvent associated with this ObjectMonitor.
}
OSThreadContendState osts(Self->osthread());
ThreadBlockInVM tbivm(jt);
// TODO-FIXME: change the following for(;;) loop to straight-line code.
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()
EnterI(THREAD);
if (!ExitSuspendEquivalent(jt)) break;
// We have acquired the contended monitor, but while we were
// waiting another thread suspended us. We don't want to enter
// the monitor while suspended because that would surprise the
// thread that suspended us.
//
_recursions = 0;
_succ = NULL;
exit(false, Self);
jt->java_suspend_self();
}
Self->set_current_pending_monitor(NULL);
}
Atomic::dec(&_count);
assert(_count >= 0, "invariant");
Self->_Stalled = 0;
// Must either set _recursions = 0 or ASSERT _recursions == 0.
assert(_recursions == 0, "invariant");
assert(_owner == Self, "invariant");
assert(_succ != Self, "invariant");
assert(((oop)(object()))->mark() == markOopDesc::encode(this), "invariant");
DTRACE_MONITOR_PROBE(contended__entered, this, object(), jt);
if (JvmtiExport::should_post_monitor_contended_entered()) {
JvmtiExport::post_monitor_contended_entered(jt, this);
// The current thread already owns the monitor and is not going to
// call park() for the remainder of the monitor enter protocol. So
// it doesn't matter if the JVMTI_EVENT_MONITOR_CONTENDED_ENTERED
// event handler consumed an unpark() issued by the thread that
// just exited the monitor.
}
if (event.should_commit()) {
event.set_previousOwner((uintptr_t)_previous_owner_tid);
event.commit();
}
OM_PERFDATA_OP(ContendedLockAttempts, inc());
}
如果获取锁失败,则需要通过自旋的方式等待锁释放,自旋执行的方法是 ObjectMonitor::EnterI
,部分代码如下
- 将当前线程封装成
ObjectWaiter
对象node
,状态设置成TS_CXQ
; - 通过自旋操作将
node
节点push到_cxq
队列; -
node
节点添加到_cxq
队列之后,继续通过自旋尝试获取锁,如果在指定的阈值范围内没有获得锁,则通过park
将当前线程挂起,等待被唤醒;
void ObjectMonitor::EnterI(TRAPS) {
Thread * const Self = THREAD;
assert(Self->is_Java_thread(), "invariant");
assert(((JavaThread *) Self)->thread_state() == _thread_blocked, "invariant");
// Try the lock - TATAS
if (TryLock (Self) > 0) {
assert(_succ != Self, "invariant");
assert(_owner == Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
DeferredInitialize();
// We try one round of spinning *before* enqueueing Self.
//
// If the _owner is ready but OFFPROC we could use a YieldTo()
// operation to donate the remainder of this thread's quantum
// to the owner. This has subtle but beneficial affinity
// effects.
if (TrySpin (Self) > 0) {
assert(_owner == Self, "invariant");
assert(_succ != Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
// The Spin failed -- Enqueue and park the thread ...
assert(_succ != Self, "invariant");
assert(_owner != Self, "invariant");
assert(_Responsible != Self, "invariant");
// Enqueue "Self" on ObjectMonitor's _cxq.
//
// Node acts as a proxy for Self.
// As an aside, if were to ever rewrite the synchronization code mostly
// in Java, WaitNodes, ObjectMonitors, and Events would become 1st-class
// Java objects. This would avoid awkward lifecycle and liveness issues,
// as well as eliminate a subset of ABA issues.
// TODO: eliminate ObjectWaiter and enqueue either Threads or Events.
ObjectWaiter node(Self);
Self->_ParkEvent->reset();
node._prev = (ObjectWaiter *) 0xBAD;
node.TState = ObjectWaiter::TS_CXQ;
// Push "Self" onto the front of the _cxq.
// Once on cxq/EntryList, Self stays on-queue until it acquires the lock.
// Note that spinning tends to reduce the rate at which threads
// enqueue and dequeue on EntryList|cxq.
ObjectWaiter * nxt;
for (;;) {
node._next = nxt = _cxq;
if (Atomic::cmpxchg(&node, &_cxq, nxt) == nxt) break;
// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert(_succ != Self, "invariant");
assert(_owner == Self, "invariant");
assert(_Responsible != Self, "invariant");
return;
}
}
if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
// Try to assume the role of responsible thread for the monitor.
// CONSIDER: ST vs CAS vs { if (Responsible==null) Responsible=Self }
Atomic::replace_if_null(Self, &_Responsible);
}
// The lock might have been released while this thread was occupied queueing
// itself onto _cxq. To close the race and avoid "stranding" and
// progress-liveness failure we must resample-retry _owner before parking.
// Note the Dekker/Lamport duality: ST cxq; MEMBAR; LD Owner.
// In this case the ST-MEMBAR is accomplished with CAS().
//
// TODO: Defer all thread state transitions until park-time.
// Since state transitions are heavy and inefficient we'd like
// to defer the state transitions until absolutely necessary,
// and in doing so avoid some transitions ...
TEVENT(Inflated enter - Contention);
int nWakeups = 0;
int recheckInterval = 1;
//node节点添加到_cxq队列之后,继续通过自旋尝试获取锁,如果在指定的阈值范围内没有获得锁,则通过park将当前线程挂起,等待被唤醒
for (;;) {
if (TryLock(Self) > 0) break;
assert(_owner != Self, "invariant");
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::replace_if_null(Self, &_Responsible);
}
// park self
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT(Inflated enter - park TIMED);
Self->_ParkEvent->park((jlong) recheckInterval);
// Increase the recheckInterval, but clamp the value.
recheckInterval *= 8;
if (recheckInterval > MAX_RECHECK_INTERVAL) {
recheckInterval = MAX_RECHECK_INTERVAL;
}
} else {
TEVENT(Inflated enter - park UNTIMED);
Self->_ParkEvent->park();
}
if (TryLock(Self) > 0) break;
// The lock is still contested.
// Keep a tally of the # of futile wakeups.
// Note that the counter is not protected by a lock or updated by atomics.
// That is by design - we trade "lossy" counters which are exposed to
// races during updates for a lower probe effect.
TEVENT(Inflated enter - Futile wakeup);
// This PerfData object can be used in parallel with a safepoint.
// See the work around in PerfDataManager::destroy().
OM_PERFDATA_OP(FutileWakeups, inc());
++nWakeups;
// Assuming this is not a spurious wakeup we'll normally find _succ == Self.
// We can defer clearing _succ until after the spin completes
// TrySpin() must tolerate being called with _succ == Self.
// Try yet another round of adaptive spinning.
if ((Knob_SpinAfterFutile & 1) && TrySpin(Self) > 0) break;
// We can find that we were unpark()ed and redesignated _succ while
// we were spinning. That's harmless. If we iterate and call park(),
// park() will consume the event and return immediately and we'll
// just spin again. This pattern can repeat, leaving _succ to simply
// spin on a CPU. Enable Knob_ResetEvent to clear pending unparks().
// Alternately, we can sample fired() here, and if set, forgo spinning
// in the next iteration.
if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
Self->_ParkEvent->reset();
OrderAccess::fence();
}
if (_succ == Self) _succ = NULL;
// Invariant: after clearing _succ a thread *must* retry _owner before parking.
OrderAccess::fence();
}
// Egress :
// Self has acquired the lock -- Unlink Self from the cxq or EntryList.
// Normally we'll find Self on the EntryList .
// From the perspective of the lock owner (this thread), the
// EntryList is stable and cxq is prepend-only.
// The head of cxq is volatile but the interior is stable.
// In addition, Self.TState is stable.
assert(_owner == Self, "invariant");
assert(object() != NULL, "invariant");
// I'd like to write:
// guarantee (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
// but as we're at a safepoint that's not safe.
UnlinkAfterAcquire(Self, &node);
if (_succ == Self) _succ = NULL;
assert(_succ != Self, "invariant");
if (_Responsible == Self) {
_Responsible = NULL;
OrderAccess::fence(); // Dekker pivot-point
}
if (SyncFlags & 8) {
OrderAccess::fence();
}
return;
}
TryLock(self)
的代码是在 ObjectMonitor::TryLock
定义的,代码的实现如下:
代码的实现原理很简单,通过自旋,
CAS
设置monitor
的_owner
字段为当前线程,如果成功,表示获取到了锁,如果失败,则继续被挂起。
// Caveat: TryLock() is not necessarily serializing if it returns failure.
// Callers must compensate as needed.
int ObjectMonitor::TryLock(Thread * Self) {
void * own = _owner;
if (own != NULL) return 0;
if (Atomic::replace_if_null(Self, &_owner)) {
// Either guarantee _recursions == 0 or set _recursions = 0.
assert(_recursions == 0, "invariant");
assert(_owner == Self, "invariant");
return 1;
}
// The lock had been free momentarily, but we lost the race to the lock.
// Interference -- the CAS failed.
// We can either return -1 or retry.
// Retry doesn't make as much sense because the lock was just acquired.
return -1;
}
重量级锁的释放
重量级锁的释放是通过 ObjectMonitor::exit
来实现的,释放以后会通知被阻塞的线程去竞争锁:
- 判断当前锁对象中的owner没有指向当前线程,如果owner指向的BasicLock在当前线程栈上,那么将_owner指向当前线程;
- 如果当前锁对象中的_owner指向当前线程,则判断当前线程重入锁的次数,如果不为0,继续执行ObjectMonitor::exit(),直到重入锁次数为0为止;
- 释放当前锁,并根据QMode的模式判断,是将_cxq中挂起的线程唤醒,还是其他操作;
void ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * const Self = THREAD;
if (THREAD != _owner) { //如果当前锁对象中的_owner没有指向当前线程
//如果_owner指向的BasicLock在当前线程栈上,那么将_owner指向当前线程
if (THREAD->is_lock_owned((address) _owner)) {
// Transmute _owner from a BasicLock pointer to a Thread address.
// We don't need to hold _mutex for this transition.
// Non-null to Non-null is safe as long as all readers can
// tolerate either flavor.
assert(_recursions == 0, "invariant");
_owner = THREAD;
_recursions = 0;
} else {
// Apparent unbalanced locking ...
// Naively we'd like to throw IllegalMonitorStateException.
// As a practical matter we can neither allocate nor throw an
// exception as ::exit() can be called from leaf routines.
// see x86_32.ad Fast_Unlock() and the I1 and I2 properties.
// Upon deeper reflection, however, in a properly run JVM the only
// way we should encounter this situation is in the presence of
// unbalanced JNI locking. TODO: CheckJNICalls.
// See also: CR4414101
TEVENT(Exit - Throw IMSX);
assert(false, "Non-balanced monitor enter/exit! Likely JNI locking");
return;
}
}
//如果当前,线程重入锁的次数,不为0,那么就重新走ObjectMonitor::exit,直到重入锁次数为0为止
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
TEVENT(Inflated exit - recursive);
return;
}
// Invariant: after setting Responsible=null an thread must execute
// a MEMBAR or other serializing instruction before fetching EntryList|cxq.
if ((SyncFlags & 4) == 0) {
_Responsible = NULL;
}
#if INCLUDE_JFR
// get the owner's thread id for the MonitorEnter event
// if it is enabled and the thread isn't suspended
if (not_suspended && EventJavaMonitorEnter::is_enabled()) {
_previous_owner_tid = JFR_THREAD_ID(Self);
}
#endif
for (;;) {
assert(THREAD == _owner, "invariant");
if (Knob_ExitPolicy == 0) {
OrderAccess::release_store(&_owner, (void*)NULL); // drop the lock
OrderAccess::storeload(); // See if we need to wake a successor
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
TEVENT(Inflated exit - simple egress);
return;
}
TEVENT(Inflated exit - complex egress);
if (!Atomic::replace_if_null(THREAD, &_owner)) {
return;
}
TEVENT(Exit - Reacquired);
} else {
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
OrderAccess::release_store(&_owner, (void*)NULL); // drop the lock
OrderAccess::storeload();
// Ratify the previously observed values.
if (_cxq == NULL || _succ != NULL) {
TEVENT(Inflated exit - simple egress);
return;
}
if (!Atomic::replace_if_null(THREAD, &_owner)) {
TEVENT(Inflated exit - reacquired succeeded);
return;
}
TEVENT(Inflated exit - reacquired failed);
} else {
TEVENT(Inflated exit - complex egress);
}
}
guarantee(_owner == THREAD, "invariant");
ObjectWaiter * w = NULL;
int QMode = Knob_QMode;
//根据QMode的模式判断,
//如果QMode == 2则直接从_cxq挂起的线程中唤醒
if (QMode == 2 && _cxq != NULL) {
// QMode == 2 : cxq has precedence over EntryList.
// Try to directly wake a successor from the cxq.
// If successful, the successor will need to unlink itself from cxq.
w = _cxq;
assert(w != NULL, "invariant");
assert(w->TState == ObjectWaiter::TS_CXQ, "Invariant");
ExitEpilog(Self, w);
return;
}
if (QMode == 3 && _cxq != NULL) {
// Aggressively drain cxq into EntryList at the first opportunity.
// This policy ensure that recently-run threads live at the head of EntryList.
// Drain _cxq into EntryList - bulk transfer.
// First, detach _cxq.
// The following loop is tantamount to: w = swap(&cxq, NULL)
w = _cxq;
for (;;) {
assert(w != NULL, "Invariant");
ObjectWaiter * u = Atomic::cmpxchg((ObjectWaiter*)NULL, &_cxq, w);
if (u == w) break;
w = u;
}
assert(w != NULL, "invariant");
ObjectWaiter * q = NULL;
ObjectWaiter * p;
for (p = w; p != NULL; p = p->_next) {
guarantee(p->TState == ObjectWaiter::TS_CXQ, "Invariant");
p->TState = ObjectWaiter::TS_ENTER;
p->_prev = q;
q = p;
}
// Append the RATs to the EntryList
// TODO: organize EntryList as a CDLL so we can locate the tail in constant-time.
ObjectWaiter * Tail;
for (Tail = _EntryList; Tail != NULL && Tail->_next != NULL;
Tail = Tail->_next)
/* empty */;
if (Tail == NULL) {
_EntryList = w;
} else {
Tail->_next = w;
w->_prev = Tail;
}
// Fall thru into code that tries to wake a successor from EntryList
}
if (QMode == 4 && _cxq != NULL) {
// Aggressively drain cxq into EntryList at the first opportunity.
// This policy ensure that recently-run threads live at the head of EntryList.
// Drain _cxq into EntryList - bulk transfer.
// First, detach _cxq.
// The following loop is tantamount to: w = swap(&cxq, NULL)
w = _cxq;
for (;;) {
assert(w != NULL, "Invariant");
ObjectWaiter * u = Atomic::cmpxchg((ObjectWaiter*)NULL, &_cxq, w);
if (u == w) break;
w = u;
}
assert(w != NULL, "invariant");
ObjectWaiter * q = NULL;
ObjectWaiter * p;
for (p = w; p != NULL; p = p->_next) {
guarantee(p->TState == ObjectWaiter::TS_CXQ, "Invariant");
p->TState = ObjectWaiter::TS_ENTER;
p->_prev = q;
q = p;
}
// Prepend the RATs to the EntryList
if (_EntryList != NULL) {
q->_next = _EntryList;
_EntryList->_prev = q;
}
_EntryList = w;
// Fall thru into code that tries to wake a successor from EntryList
}
w = _EntryList;
if (w != NULL) {
assert(w->TState == ObjectWaiter::TS_ENTER, "invariant");
ExitEpilog(Self, w);
return;
}
// If we find that both _cxq and EntryList are null then just
// re-run the exit protocol from the top.
w = _cxq;
if (w == NULL) continue;
// Drain _cxq into EntryList - bulk transfer.
// First, detach _cxq.
// The following loop is tantamount to: w = swap(&cxq, NULL)
for (;;) {
assert(w != NULL, "Invariant");
ObjectWaiter * u = Atomic::cmpxchg((ObjectWaiter*)NULL, &_cxq, w);
if (u == w) break;
w = u;
}
TEVENT(Inflated exit - drain cxq into EntryList);
assert(w != NULL, "invariant");
assert(_EntryList == NULL, "invariant");
// Convert the LIFO SLL anchored by _cxq into a DLL.
// The list reorganization step operates in O(LENGTH(w)) time.
// It's critical that this step operate quickly as
// "Self" still holds the outer-lock, restricting parallelism
// and effectively lengthening the critical section.
// Invariant: s chases t chases u.
// TODO-FIXME: consider changing EntryList from a DLL to a CDLL so
// we have faster access to the tail.
if (QMode == 1) {
// QMode == 1 : drain cxq to EntryList, reversing order
// We also reverse the order of the list.
ObjectWaiter * s = NULL;
ObjectWaiter * t = w;
ObjectWaiter * u = NULL;
while (t != NULL) {
guarantee(t->TState == ObjectWaiter::TS_CXQ, "invariant");
t->TState = ObjectWaiter::TS_ENTER;
u = t->_next;
t->_prev = u;
t->_next = s;
s = t;
t = u;
}
_EntryList = s;
assert(s != NULL, "invariant");
} else {
// QMode == 0 or QMode == 2
_EntryList = w;
ObjectWaiter * q = NULL;
ObjectWaiter * p;
for (p = w; p != NULL; p = p->_next) {
guarantee(p->TState == ObjectWaiter::TS_CXQ, "Invariant");
p->TState = ObjectWaiter::TS_ENTER;
p->_prev = q;
q = p;
}
}
// In 1-0 mode we need: ST EntryList; MEMBAR #storestore; ST _owner = NULL
// The MEMBAR is satisfied by the release_store() operation in ExitEpilog().
// See if we can abdicate to a spinner instead of waking a thread.
// A primary goal of the implementation is to reduce the
// context-switch rate.
if (_succ != NULL) continue;
w = _EntryList;
if (w != NULL) {
guarantee(w->TState == ObjectWaiter::TS_ENTER, "invariant");
ExitEpilog(Self, w);
return;
}
}
}
根据不同的策略(由QMode
指定),从cxq
或EntryList
中获取头节点,通过ObjectMonitor::ExitEpilog
方法唤醒该节点封装的线程,唤醒操作最终由unpark
完成:
void ObjectMonitor::ExitEpilog(Thread * Self, ObjectWaiter * Wakee) {
assert(_owner == Self, "invariant");
// Exit protocol:
// 1. ST _succ = wakee
// 2. membar #loadstore|#storestore;
// 2. ST _owner = NULL
// 3. unpark(wakee)
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL;
ParkEvent * Trigger = Wakee->_event;
// Hygiene -- once we've set _owner = NULL we can't safely dereference Wakee again.
// The thread associated with Wakee may have grabbed the lock and "Wakee" may be
// out-of-scope (non-extant).
Wakee = NULL;
// Drop the lock
OrderAccess::release_store(&_owner, (void*)NULL);
OrderAccess::fence(); // ST _owner vs LD in unpark()
if (SafepointMechanism::poll(Self)) {
TEVENT(unpark before SAFEPOINT);
}
DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
Trigger->unpark(); //unpark唤醒线程
// Maintain stats and report events to JVMTI
OM_PERFDATA_OP(Parks, inc());
}