LockSupport及HotSpot层Parker::park/unpark分析

1.官方文档

Basic thread blocking primitives for creating locks and other 
synchronization classes.

This class associates, with each thread that uses it, a permit (in the 
sense of the Semaphore class). A call to park will return immediately 
if the permit is available, consuming it in the process; otherwise it may 
block. A call to unpark makes the permit available, if it was not 
already available. (Unlike with Semaphores though, permits do not 
accumulate. There is at most one.)

Methods park and unpark provide efficient means of blocking and 
unblocking threads that do not encounter the problems that cause the 
deprecated methods Thread.suspend and Thread.resume to be 
unusable for such purposes: Races between one thread invoking park 
and another thread trying to unpark it will preserve liveness, due to 
the permit. Additionally, park will return if the caller's thread was 
interrupted, and timeout versions are supported. The park method 
may also return at any other time, for "no reason", so in general must 
be invoked within a loop that rechecks conditions upon return. In this 
sense park serves as an optimization of a "busy wait" that does not 
waste as much time spinning, but must be paired with an unpark to be 
effective.

The three forms of park each also support a blocker object parameter. 
This object is recorded while the thread is blocked to permit 
monitoring and diagnostic tools to identify the reasons that threads 
are blocked. (Such tools may access blockers using method 
getBlocker(Thread).) The use of these forms rather than the original 
forms without this parameter is strongly encouraged. The normal 
argument to supply as a blocker within a lock implementation is this.

These methods are designed to be used as tools for creating higher-
level synchronization utilities, and are not in themselves useful for 
most concurrency control applications. The park method is designed 
for use only in constructions of the form:

 
 while (!canProceed()) { ... LockSupport.park(this); }

where neither canProceed nor any other actions prior to the call to 
park entail locking or blocking. Because only one permit is associated 
with each thread, any intermediary uses of park could interfere with its 
intended effects.

Sample Usage. Here is a sketch of a first-in-first-out non-reentrant 
lock class:

用于创建锁和其他同步类的基本线程阻塞原语。

该类关联一个许可证(参考Semaphore类)。如果许可证可用,park将立即返回,并在此过程中消耗;否则可能会阻塞。如果许可证不可用,则unpark会使许可证可用。 (与信号量不同,许可证不会累积。最多只有一个。)

方法park和unpark提供了阻塞和解除阻塞线程的有效方法,并且没有遇到导致方法Thread.suspend和Thread.resume弃用的问题:一个调用park的线程和另一个尝试unpark的线程之间的竞争由于许可证将保持活动。此外,如果调用者的线程被中断,并且支持超时版本,则park将返回。 park方法也可以在任何其他时间返回,“无理由”,因此通常必须在循环内重复检查条件。在这个意义上,park可以作为“忙碌等待”的优化,不会浪费太多时间自旋,但必须与unpark配对才能有效。

三种形式的park也支持阻塞对象参数。在线程被阻止时记录此对象,以允许监视和诊断工具识别线程被阻止的原因。 (这些工具可以使用方法getBlocker(Thread)访问blockers。)强烈建议使用这些形式而不是没有此参数的原始形式。在锁实现中作为blocker提供的正常参数是this。

这些方法旨在用作创建更高级别同步实用程序的工具,并且对于大多数并发控制应用程序本身并不有用。 park方法仅用于以下形式的结构:

 while (!canProceed()) { ... LockSupport.park(this); }

在调用park之前的canProceed和其他任何行动都不需要锁定或阻塞。因为每个线程只有一个许可证,所以任何在此期间使用park都可能会干扰其预期的效果。

示例用法。以下是FIFO非重入锁的结构草图:

 class FIFOMutex {
   private final AtomicBoolean locked = new AtomicBoolean(false);
   private final Queue<Thread> waiters
     = new ConcurrentLinkedQueue<Thread>();

   public void lock() {
     boolean wasInterrupted = false;
     Thread current = Thread.currentThread();
     waiters.add(current);

     // Block while not first in queue or cannot acquire lock
     while (waiters.peek() != current ||
            !locked.compareAndSet(false, true)) {
       LockSupport.park(this);
       if (Thread.interrupted()) // ignore interrupts while waiting
         wasInterrupted = true;
     }

     waiters.remove();
     if (wasInterrupted)          // reassert interrupt status on exit
       current.interrupt();
   }

   public void unlock() {
     locked.set(false);
     LockSupport.unpark(waiters.peek());
   }
 }

2.park/unpark

2.1 不带blocker的原始版本

    public static void park() {
        UNSAFE.park(false, 0L);
    }
Disables the current thread for thread scheduling purposes 
unless the permit is available.

If the permit is available then it is consumed and the call returns 
immediately; otherwise the current thread becomes disabled for 
thread scheduling purposes and lies dormant until one of three 
things happens:
* Some other thread invokes unpark with the current thread as 
  the target; 
* Some other thread interrupts the current thread; 
* The call spuriously (that is, for no reason) returns.

This method does not report which of these caused the method 
to return. Callers should re-check the conditions which caused 
the thread to park in the first place. Callers may also determine, 
for example, the interrupt status of the thread upon return.

除非许可证可用,否则禁用当前线程进行线程调度。

如果许可证可用,那么它被消耗并且调用立即返回; 否则当前线程禁止进行线程调度,并且在发生以下三种情况之一之前处于休眠状态:

  • 其他一些线程以当前线程作为目标调用unpark;
  • 其他一些线程中断当前线程;
  • 虚假唤醒(即无缘无故)返回。

此方法不会报告这些方法中的哪一个导致返回。调用者应该首先重新检查导致线程park的条件。 例如,调用者还可以在返回时确定线程的中断状态。

    public static void parkNanos(long nanos) {
        if (nanos > 0)
            UNSAFE.park(false, nanos);
    }

最多休眠nanos时间,在park基础上加一条:

  • 如果定时已到,也会返回
    public static void parkUntil(long deadline) {
        UNSAFE.park(true, deadline);
    }

2.2 带有blocker的版本(推荐使用)

blocker:the synchronization object responsible for this thread parking

    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, nanos);
            setBlocker(t, null);
        }
    }
    public static void parkUntil(Object blocker, long deadline) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(true, deadline);
        setBlocker(t, null);
    }

2.3 unpark

    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
Makes available the permit for the given thread, if it was not 
already available. If the thread was blocked on park then it will 
unblock. Otherwise, its next call to park is guaranteed not to 
block. This operation is not guaranteed to have any effect at all 
if the given thread has not been started.

如果给定线程尚不可用,则为其提供许可。 如果线程在park时阻塞,则它将解锁。 否则,它下一次调用park保证不会阻塞。 如果给定的线程尚未启动,则无法保证此操作有任何效果。

3.Unsafe.park/unpark调用HotSpot层Parker::park/unpark

wz@wz-All-Series:~/openjdk/jdk8u-src$ grep -rn "Unsafe" ./ |grep -v "build" | grep park
./hotspot/src/share/vm/prims/unsafe.cpp:1265:UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
./hotspot/src/share/vm/prims/unsafe.cpp:1266:  UnsafeWrapper("Unsafe_Unpark");

在./hotspot/src/share/vm/prims/unsafe.cpp中有:

UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
  UnsafeWrapper("Unsafe_Unpark");
  Parker* p = NULL;
  if (jthread != NULL) {
    oop java_thread = JNIHandles::resolve_non_null(jthread);
    if (java_thread != NULL) {
      jlong lp = java_lang_Thread::park_event(java_thread);
      if (lp != 0) {
        // This cast is OK even though the jlong might have been read
        // non-atomically on 32bit systems, since there, one word will
        // always be zero anyway and the value set is always the same
        p = (Parker*)addr_from_java(lp);
      } else {
        // Grab lock if apparently null or using older version of library
        MutexLocker mu(Threads_lock);
        java_thread = JNIHandles::resolve_non_null(jthread);
        if (java_thread != NULL) {
          JavaThread* thr = java_lang_Thread::thread(java_thread);
          if (thr != NULL) {
            p = thr->parker();
            if (p != NULL) { // Bind to Java thread for next time.
              java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
            }
          }
        }
      }
    }
  }
  if (p != NULL) {
#ifndef USDT2
    HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
    HOTSPOT_THREAD_UNPARK(
                          (uintptr_t) p);
#endif /* USDT2 */
    p->unpark();
  }
UNSAFE_END

最后调用的Parker::unpark方法。

同理对于park也一样:

UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
  UnsafeWrapper("Unsafe_Park");
  EventThreadPark event;
#ifndef USDT2
  HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
   HOTSPOT_THREAD_PARK_BEGIN(
                             (uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
  HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
  HOTSPOT_THREAD_PARK_END(
                          (uintptr_t) thread->parker());
#endif /* USDT2 */
  if (event.should_commit()) {
    oop obj = thread->current_park_blocker();
    event.set_klass((obj != NULL) ? obj->klass() : NULL);
    event.set_timeout(time);
    event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
    event.commit();
  }
UNSAFE_END
wz@wz-All-Series:~/openjdk/jdk8u-src$ grep -rn "class Parker" . 
./hotspot/src/share/vm/runtime/park.hpp:48:class Parker : public os::PlatformParker {

class Parker : public os::PlatformParker {
private:
  volatile int _counter ;   //计数
  Parker * FreeNext ;      //指向下一个Parker
  JavaThread * AssociatedWith ; // 指向parker所属的线程。
 
public:
  Parker() : PlatformParker() {
    _counter       = 0 ;    //初始化为0
    FreeNext       = NULL ;
    AssociatedWith = NULL ;
  }
protected:
  ~Parker() { ShouldNotReachHere(); }
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  void park(bool isAbsolute, jlong time);
  void unpark();
 
  // Lifecycle operators
  static Parker * Allocate (JavaThread * t) ;
  static void Release (Parker * e) ;
private:
  static Parker * volatile FreeList ;
  static volatile int ListLock ;
 
};

class PlatformParker : public CHeapObj<mtInternal> {
  protected:
    enum {
        REL_INDEX = 0,
        ABS_INDEX = 1
    };
    int _cur_index;  // 条件变量数组下标,which cond is in use: -1, 0, 1
    pthread_mutex_t _mutex [1] ;  //pthread互斥锁
    pthread_cond_t  _cond  [2] ; // pthread条件变量数组,一个用于相对时间,一个用于绝对时间。
 
  public:       // TODO-FIXME: make dtor private
    ~PlatformParker() { guarantee (0, "invariant") ; }
 
  public:
    PlatformParker() {
      int status;
      status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
      assert_status(status == 0, status, "cond_init rel");
      status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
      assert_status(status == 0, status, "cond_init abs");
      status = pthread_mutex_init (_mutex, NULL);
      assert_status(status == 0, status, "mutex_init");
      _cur_index = -1; // mark as unused
    }
};

可知park和unpark在linux平台是借助于pthread_mutex_t和pthread_cond_t实现的。

3.1 park

public native void park(boolean var1, long var2);

查看底层源码,位于http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/da3a1f729b2b/src/os/linux/vm/os_linux.cpp


void Parker::park(bool isAbsolute, jlong time) {
  // Ideally we'd do something useful while spinning, such
  // as calling unpackTime().

  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  if (Atomic::xchg(0, &_counter) > 0) return;

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  // Optional optimization -- avoid state transitions if there's an interrupt pending.
  // Check interrupt before trying to wait
  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  // Next, demultiplex/decode time arguments
  timespec absTime;
  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
    return;
  }
  if (time > 0) {
    unpackTime(&absTime, isAbsolute, time);
  }


  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex.  If safepoints are pending both the
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt);

  // Don't wait if cannot get lock since interference arises from
  // unblocking.  Also. check interrupt before trying wait
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  int status ;
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    OrderAccess::fence();
    return;
  }

#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.
  // (This allows a debugger to break into the running thread.)
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif

  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
  } else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
    }
  }
  _cur_index = -1;
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");

#ifdef ASSERT
  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif

  _counter = 0 ;
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant") ;
  // Paranoia to ensure our locked and lock-free paths interact
  // correctly with each other and Java-level accesses.
  OrderAccess::fence();

  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}

park的流程如下:

  • step1.如果有许可可用,则将_counter原子地设置为0,并直接返回。 xchg返回的是旧的_counter;否则将没有许可可用。
  • step2.获取当前线程,如果当前线程设置了中断标志,则直接返回,因此如果在park前调用了interrupt就会直接返回。
  • step3.获取定时时间,安全点;如果中断或获取_mutex失败,则直接返回
  • step4.如果_counter大于0,说明unpark已经调用完成并且将_counter置为1。所以只需将_counter置为0,解锁返回。
  • step5.对于time = 0,pthread_cond_wait (&_cond[_cur_index], _mutex) 直接挂起;
    对于定时的,挂起指定的时间status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    ------------------------以下为线程被唤醒后操作------------------------------
  • step6.将_counter设置为0,解锁_mutex

3.2 unpark

public native void unpark(Object var1);

查看底层源码,位置为http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/da3a1f729b2b/src/os/linux/vm/os_linux.cpp

void Parker::unpark() {
  int s, status ;
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant") ;
  s = _counter;
  _counter = 1;
  if (s < 1) {
    // thread might be parked
    if (_cur_index != -1) {
      // thread is definitely parked
      if (WorkAroundNPTLTimedWaitHang) {
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
      } else {
        // must capture correct index before unlocking
        int index = _cur_index;
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
        status = pthread_cond_signal (&_cond[index]);
        assert (status == 0, "invariant");
      }
    } else {
      pthread_mutex_unlock(_mutex);
      assert (status == 0, "invariant") ;
    }
  } else {
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
  }
}
  • step1.对_mutex加锁,并将_counter置为1。
  • step2.如果之前的_counter为0则说明调用了park或者为初始状态(此时为0且没有调用park)。
  • step2-1.当前parker对应的线程挂起了。因为_cur_index初始化为-1,且线程唤醒后也会重置为-1。
    调用pthread_cond_signal (&_cond[_cur_index])
    调用pthread_mutex_unlock(_mutex)
  • step2-2.没有线程在等待条件变量,则直接解锁
    pthread_mutex_unlock(_mutex);
  • step3.如果之前的_counter为1,则说明线程调用了一次或多次unpark但是没调用park,则直接解锁。

3.3 总结

  • park和unpark和核心就是_counter、 _cur_index、 _mutex和_cond。
    通过加锁_mutex对counter进行操作;
    通过_cond对线程进行挂起和唤醒操作。
  • park和unpark之间的调用先后顺序。unpark可在park之前,也可在park之后。
  • 在调用park的时候如果counter是0则会去执行挂起的流程,否则返回,在挂起恢复后再将counter置为0。
  • 在unpark的时候如果counter是0则会执行唤醒的流程,否则不执行唤醒流程,并且不管什么情况始终将counter置为1。
  • 注意在park里,调用pthread_cond_wait时,并没有用while来判断,所以posix condition里的"Spurious wakeup"一样会传递到上层Java的代码里(因为条件需要Java层才能提供)。这也就是为什么Java dos里提到需要注意虚假唤醒的情况。

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335