澳门新葡萄京娱乐场 9

澳门新葡萄京娱乐场Java 线程同步原理探析

现如今,服务器性能日益增长,并发(concurrency)编程已经“深入人心”,但由于冯诺依式计算机“指令存储,顺序执行”的特性,使得编写跨越时间维度的并发程序异常困难,所以现代编程语言都对并发编程提供了一定程度的支持,像
Golang
里面的 Goroutines、Clojure
里面的 STM(Software Transactional
Memory)、Erlang
里面的 Actor。

澳门新葡萄京娱乐场 1
LockSupport在JDK源码中描述为:构建锁和其他同步类的基本线程阻塞原语,构建更高级别的同步工具集。LockSupport提供的park/unpark从线程的粒度上进行阻塞和唤醒,park/unpark模型真正解耦了线程之间的同步,线程之间不再需要一个Object或者其它变量来存储状态。

本文涵盖的知识点包括:

Java 对于并发编程的解决方案是多线程(Multi-threaded programming),而且
Java 中的线程 与 native
线程一一对应,多线程也是早期操作系统支持并发的方案之一(其他方案:多进程、IO多路复用)。

本文从阻塞唤醒的语义入手,解释LockSupport的内在机制和注意点,最后与Object的wait和notify做对比,包括以下内容:

Lock和synchronized的对比AbstractQueuedSynchronizer的实现原理ReentrantLock的实现原理LockSupport的使用

本文着重介绍 Java
中线程同步的原理、实现机制,更侧重操作系统层面,部分原理参考 openjdk
源码。阅读本文需要对
CyclicBarrier、CountDownLatch 有基本的使用经验。

  • 阻塞和唤醒的语义
  • 许可机制
  • 底层实现
  • 用法
  • 与Object的wait和notify区别

Lock和synchronized的对比

在Lock接口出现之前,Java应用程序只能依靠synchronized关键字来实现同步锁的功能,在JDK1.5以后,增加了JUC(java.util.concurrent)的并发包且提供了Lock接口用来实现锁的功能。Lock和synchronized的对比:Ø
从层次上,一个是类、一个是关键字Ø
从使用上,lock具备更大的灵活性,可以显式地控制锁的释放和获取;
而synchronized的锁的释放是被动的,当出现异常或者同步代码块执行完以后,才会释放锁Ø
lock可以判断锁的状态,而synchronized无法做到;lock可以实现公平锁、非公平锁,
而synchronized只有非公平锁。

澳门新葡萄京娱乐场,JUC

在 Java 1.5
版本中,引入 JUC 并发编程辅助包,很大程度上降低了并发编程的门槛,JUC
里面主要包括:

  • 线程调度的 Executors
  • 缓冲任务的 Queues
  • 超时相关的 TimeUnit
  • 并发集合(如 ConcurrentHashMap)
  • 线程同步类(Synchronizers,如 CountDownLatch )

个人认为其中最重要也是最核心的是线程同步这一块,因为并发编程的难点就在于如何保证「共享区域(专业术语:临界区,Critical
Section)的访问时序问题」。

阻塞的语义

阻塞是线程在满足某种条件之前暂时停止运行,而被动释放出CPU资源。进入该状态的线程不会主动进入线程队列等待CPU资源,而需要等待满足条件后被唤醒,才能让该线程重新进入到线程队列中排队等待CPU资源。一般造成线程阻塞的原因有:等待获取一个已经被其他线程持有的排他锁、等待某一操作结束、等待某一个时间段。线程阻塞后会被挂起,此时会处于BLOCKEDWAITINGTIMED_WAITING。线程阻塞后会让出CPU资源,这是为避免在自旋上浪费过多的CPU资源,是忙等待(busy
wait)的一种优化。

AbstractQueuedSynchronizer

Lock之所以能实现线程安全的锁,主要的核心是AQS(AbstractQueuedSynchronizer),AbstractQueuedSynchronizer提供了一个FIFO队列,可以看做是一个用来实现锁以及其他需要同步功能的框架。这里简称该类为AQS。AQS的使用依靠继承来完成,子类通过继承自AQS并实现所需的方法来管理同步状态。例如常见的ReentrantLock,CountDownLatch。

从使用上来说,AQS的功能可以分为两种:EXCLUSIVE独占模式和SHARED共享模式。独占模式下,每次只能有一个线程持有锁,比如ReentrantLock就是以独占方式实现的互斥锁。共享模式下,允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLock中的读锁。

AQS依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。Node的主要属性如下

static final class Node { int waitStatus; //表示节点的状态,包含cancelled;condition 表示节点在等待condition也就是在condition队列中 Node prev; //前继节点 Node next; //后继节点 Node nextWaiter; //存储在condition队列中的后继节点 Thread thread; //当前线程}

AQS类底层的数据结构是使用双向链表,是队列的一种实现。包括一个head节点和一个tail节点,分别表示头结点和尾节点,其中头结点不存储Thread,仅保存next结点的引用。

澳门新葡萄京娱乐场 2

AQS提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node
expect,Nodeupdate),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

澳门新葡萄京娱乐场 3AQS遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。澳门新葡萄京娱乐场 4

AQS中,除了本身的链表结构以外,还有一个很关键的功能,就是CAS,这个是保证在多线程并发的情况下保证线程安全的前提下去把线程加入到AQS中的方法,可以简单理解为乐观锁。

private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update);}

这个方法里面,首先用到了unsafe类(Unsafe类是在sun.misc包下,不属于Java标准。但是很多Java的基础类库,包括一些被广泛使用的高性能开发库都是基于Unsafe类开发的,比如Netty、Hadoop、Kafka等;Unsafe可认为是Java中留下的后门,提供了一些低层次操作,如直接内存访问、线程调度等)。

AbstractQueuedSynchronizer

JUC 提供的同步类主要有如下几种:

  • Semaphore is a classic concurrency tool.
  • CountDownLatch is a very simple yet very common utility for
    blocking until a given number of signals, events, or conditions
    hold.
  • CyclicBarrier is a resettable multiway synchronization point
    useful in some styles of parallel programming.
  • Phaser provides a more flexible form of barrier that may be used
    to control phased computation among multiple threads.
  • An Exchanger allows two threads to exchange objects at a
    rendezvous(约会) point, and is useful in several pipeline designs.

通过阅读其源码可以发现,其实现都基于 AbstractQueuedSynchronizer 这个抽象类(一般简写
AQS),正如其 javadoc 开头所说:

Provides a framework for implementing blocking locks and related
synchronizers (semaphores, events, etc) that rely on
first-in-first-out (FIFO) wait queues. This class is designed to be a
useful basis for most kinds of synchronizers that rely on a single
atomic int value to represent state.

也就是说,AQS 通过维护内部的 FIFO 队列和具备原子更新的整型 state
这两个属性来实现各种锁机制,包括:是否公平,是否可重入,是否共享,是否可中断(interrupt),并在这基础上,提供了更方便实用的同步类,也就是一开始提及的
Latch、Barrier 等。

这里暂时不去介绍 AQS 实现细节与如何基于 AQS
实现各种同步类(挖个坑),感兴趣的可以移步美团的一篇文章《不可不说的Java“锁”事》 第六部分“独享锁
VS 共享锁”。

在学习 Java
线程同步这一块时,对我来说困扰最大的是「线程唤醒」,试想一个已经
wait/sleep/block 的线程,是如何响应 interrupt 的呢?当调用 Object.wait()
或 lock.lock() 时,JVM 究竟做了什么事情能够在调用 Object.notify 或
lock.unlock 时重新激活相应线程?

带着上面的问题,我们从源码中寻找答案。

许可机制

LockSupport通过“许可”(permit)机制,使用park/unpark实现线程的阻塞和唤醒。许可是指允许线程继续执行,是线程执行的开关,当开关关闭时,线程会阻塞,当开关打开时,线程会立即执行。

park意指线程在获取许可之前会暂停执行(阻塞在获取许可)。这里的“许可”与线程相关联,类似二元信号量,不可叠加且一个线程只能有一个。有些文章描述“许可”是一次性的,例如当线程A调用park消耗掉一个“许可”(最多只有一个“许可”),在未调用unpark释放出线程A的该“许可”之前,线程A再次调用park时会阻塞在获取“许可”。下文引自Understanding
JVM Thread
States

there can be only one permit per thread, when thread consumes the
permit, it disappears.

出于线程调度的目的,调用park时会阻塞直到许可可用时。如果许可可用,调用park就会立即返回。当前线程就会阻塞,直到调用
unpark 方法,释放出许可。由于许可是默认被占用的,当前线程在启动后调用
park 的话就获取不到许可,因此就进入阻塞状态。

ReentrantLock的实现原理分析

之所以叫重入锁是因为同一个线程如果已经获得了锁,那么后续该线程调用lock方法时不需要再次获取锁,也就是不会阻塞;重入锁提供了两种实现,一种是非公平的重入锁,另一种是公平的重入锁。怎么理解公平和非公平呢?如果在绝对时间上,先对锁进行获取的请求一定先被满足获得锁,那么这个锁就是公平锁,反之,就是不公平的。简单来说公平锁就是等待时间最长的线程最优先获取锁。非公平锁的实现流程时序图

澳门新葡萄京娱乐场 51、调用ReentrantLock.lock,这个是获取锁的入口,调用了sync.lock。
sync是一个实现了AQS的抽象类,这个类的主要作用是用来实现同步控制的,并且sync有两个实现,一个是NonfairSync、另一个是FailSync。2、先分析非公平锁的情况NonfairSync.lock

final void lock() { if (compareAndSetState //这是跟公平锁的主要区别,一上来就试探锁是否空闲,如果可以插队,则设置获得锁的线程为当前线程//exclusiveOwnerThread属性是AQS从父类AbstractOwnableSynchronizer中继承的属性,用来保存当前占用同步状态的线程 setExclusiveOwnerThread(Thread.currentThread; else acquire; //尝试去获取锁}

compareAndSetState通过cas算法去改变state的值,而这个states是AQS中存在一个变量,对于ReentrantLock来说,如果state=0表示无锁状态、如果state>0表示有锁状态。由于ReentrantLock是可重入锁,所以持有锁的线程可以多次加锁,经过判断加锁线程就是当前持有锁的线程时(即exclusiveOwnerThread==Thread.currentThread,每次加锁都会将state的值+1,state等于几,就代表当前持有锁的线程加了几次锁;解锁时每解一次锁就会将state减1,state减到0后,锁就被释放掉,这时其它线程可以加锁。3、AbstractQueuedSynchronizer.acquire如果CAS操作未能成功,说明state已经不为0,此时继续acquire操作,acquire是AQS中的方法
当多个线程同时进入这个方法时,首先通过cas去修改state的状态,如果修改成功表示竞争锁成功,竞争失败的,tryAcquire会返回false。

public final void acquire { if (!tryAcquire && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}

4、NonfairSync.tryAcquirenofairTryAcquire这里可以看非公平锁的含义,即获取锁并不会严格根据争用锁的先后顺序决定。

final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //获取当前的状态,前面讲过,默认情况下是0表示无锁状态 if  { if (compareAndSetState(0, acquires)) { //通过cas来改变state状态的值,如果更新成功,表示获取锁成功, 这个操作外部方法lock()就做过一次,这里再做只是为了再尝试一次,尽量以最简单的方式获取锁。 setExclusiveOwnerThread; return true; } } else if (current == getExclusiveOwnerThread {//如果当前线程等于获取锁的线程,表示重入,直接累加重入次数 int nextc = c + acquires; if (nextc < 0) // overflow 如果这个状态值越界,抛出异常;如果没有越界,则设置后返回true throw new Error("Maximum lock count exceeded"); setState; return true; }如果状态不为0,且当前线程不是owner,则返回false。 return false; //获取锁失败,返回false}

5、addWaiter当前锁如果已经被其他线程锁持有,那么当前线程来去请求锁的时候,会进入这个方法,这个方法主要是把当前线程封装成node,添加到AQS的链表中。6、acquireQueuedaddWaiter返回了插入的节点,作为acquireQueued方法的入参,这个方法主要用于争抢锁。

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for  { final Node p = node.predecessor();// 获取prev节点,若为null即刻抛出NullPointException if (p == head && tryAcquire {// 如果前驱为head才有资格进行锁的抢夺 setHead; // 获取锁成功后就不需要再进行同步操作了,获取锁成功的线程作为新的head节点//凡是head节点,head.thread与head.prev永远为null, 但是head.next不为null p.next = null; // help GC failed = false; //获取锁成功 return interrupted; }//如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程 if (shouldParkAfterFailedAcquire && parkAndCheckInterrupt// 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标志 interrupted = true; } } finally { if  // 如果抛出异常则取消锁的获取,进行出队(sync queue)操作 cancelAcquire; }}

原来的head节点释放锁以后,会从队列中移除,原来head节点的next节点会成为head节点.

澳门新葡萄京娱乐场 6

Java 如何实现堵塞、通知

底层实现

LockSupport是使用Unsafe的park实现的,HotSpot
Parker用condition和mutex维护了一个_counter变量,park时,变量_counter置为0,unpark时,变量_counter置为1。park操作检查该值是否为1,为1直接返回;不为1,则阻塞。

公平锁和非公平锁的区别

锁的公平性是相对于获取锁的顺序而言的,如果是一个公平锁,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。
在上面分析的例子来说,只要CAS设置同步状态成功,则表示当前线程获取了锁,而公平锁则不一样,差异点有两个:FairSync.tryAcquire:非公平锁在获取锁的时候,会先通过CAS进行抢占,而公平锁则不会

final void** lock() { acquire;}

FairSync.tryAcquire:这个方法与nonfairTryAcquire(int
acquires)比较,不同的地方在于判断条件多了hasQueuedPredecessors()方法,也就是加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

protected final boolean* tryAcquire(int acquires) { final Thread current = Thread.currentThread*(); int c = getState(); if  { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread; return true; } } else if (current == getExclusiveOwnerThread { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState; return true; } return false;}

wait/notify

public final native void wait(long timeout) throws InterruptedException;
public final native void notify();

在 JDK 源码中,上述两个方法均用 native 实现(即 cpp 代码),追踪相关代码

// java.base/share/native/libjava/Object.c
static JNINativeMethod methods[] = {
    {"hashCode",    "()I",                    (void *)&JVM_IHashCode},
    {"wait",        "(J)V",                   (void *)&JVM_MonitorWait},
    {"notify",      "()V",                    (void *)&JVM_MonitorNotify},
    {"notifyAll",   "()V",                    (void *)&JVM_MonitorNotifyAll},
    {"clone",       "()Ljava/lang/Object;",   (void *)&JVM_Clone},
};

通过上面的 cpp 代码,我们大概能猜出 JVM 是使用 monitor 来实现的
wait/notify 机制,至于这里的 monitor 是何种机制,这里暂时跳过,接着看
lock 相关实现

用法

看到一个关于park/unpark通俗易懂的的例子,代码如下:

 1 public static void main(String[] args) throws InterruptedException {
 2   Thread threadA = new Thread(new Runnable() {
 3     @Override
 4     public void run() {
 5       System.out.println("周末了我在打游戏");
 6       LockSupport.park();
 7       System.out.println("陪女朋友逛逛街");
 8     }
 9   });
10   threadA.start();
11   Thread.sleep(3000);
12   System.out.println("女朋友准备要喊男朋友逛街");
13   LockSupport.unpark(threadA);
14 }

在第6行park执行操作时,线程尝试获取许可,由于线程threadA在启动后默认已经获取了许可,park必须等待许可释放后才可以执行。当主线程调用unpark方法释放threadA的许可,threadA才可以继续执行第7行。 

LockSupport

LockSupport类是JDK1.6引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:

public native void unpark(Thread jthread); public native void park(boolean isAbsolute, long time); 

LockSupport中的park() 和 unpark()
的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到“Thread.suspend
和 Thread.resume所可能引发的死锁”问题。因为park() 和
unpark()有许可的存在。

1、
pack时:如果许可存在,那么将这个许可使用掉,并且立即返回。如果许可不存在,那么挂起当前线程。2、
unpack时:设置线程许可为可用。如果线程当前已经被pack挂起,那么这个线程将会被唤醒。如果线程当前没有被挂起,那么下次调用pack不会挂起线程。注意,unpark函数可以先于park调用。比如线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现已经有“许可”了,那么它会马上再继续运行。

在使用LockSupport之前,我们对线程做同步,只能使用wait和notify,但是wait和notify其实不是很灵活,并且耦合性很高,调用notify必须要确保某个线程处于wait状态,而park/unpark模型真正解耦了线程之间的同步,先后顺序没有没有直接关联,同时线程之间不再需要一个Object或者其它变量来存储状态,不再需要关心对方的状态。

park/unpark以线程作为方法的参数,支持指定线程唤醒。wait/notify针对的是对象,通知实际上是不知道唤醒具体哪个线程的,要不随机唤醒一个线程要不唤醒所有的(notifyAll)。

lock/unlock

LockSupport 是用来实现堵塞语义模型的基础辅助类,主要有两个方法:park 与
unpark。(在英文中,park 除了“公园”含义外,还有“停车”的意思)

// LockSupport.java
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
// Unsafe.java
    /**
     * Unblocks the given thread blocked on {@code park}, or, if it is
     * not blocked, causes the subsequent call to {@code park} not to
     * block.  Note: this operation is "unsafe" solely because the
     * caller must somehow ensure that the thread has not been
     * destroyed. Nothing special is usually required to ensure this
     * when called from Java (in which there will ordinarily be a live
     * reference to the thread) but this is not nearly-automatically
     * so when calling from native code.
     *
     * @param thread the thread to unpark.
     */
    @HotSpotIntrinsicCandidate
    public native void unpark(Object thread);

    /**
     * Blocks current thread, returning when a balancing
     * {@code unpark} occurs, or a balancing {@code unpark} has
     * already occurred, or the thread is interrupted, or, if not
     * absolute and time is not zero, the given time nanoseconds have
     * elapsed, or if absolute, the given deadline in milliseconds
     * since Epoch has passed, or spuriously (i.e., returning for no
     * "reason"). Note: This operation is in the Unsafe class only
     * because {@code unpark} is, so it would be strange to place it
     * elsewhere.
     */
    @HotSpotIntrinsicCandidate
    public native void park(boolean isAbsolute, long time);

// hotspot/share/prims/unsafe.cpp
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
  HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
  EventThreadPark event;

  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
  if (event.should_commit()) {
    post_thread_park_event(&event, thread->current_park_blocker(), time);
  }
  HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
} UNSAFE_END

通过上述 unsafe.cpp 可以看到每个 thread 都会有一个 Parker
对象,所以我们需要查看 parker 对象的定义

// hotspot/share/runtime/park.hpp
class Parker : public os::PlatformParker
...
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  void park(bool isAbsolute, jlong time);
  void unpark();

// hotspot/os/posix/os_posix.hpp
class PlatformParker : public CHeapObj<mtInternal> {
 protected:
  enum {
    REL_INDEX = 0,
    ABS_INDEX = 1
  };
  int _cur_index;  // which cond is in use: -1, 0, 1
  pthread_mutex_t _mutex[1];
  pthread_cond_t  _cond[2]; // one for relative times and one for absolute
  ...
};

看到这里大概就能知道 park
是使用 pthread_mutex_t 与 pthread_cond_t 实现。好了,到目前为止,就引出了
Java 中与堵塞相关的实现,不难想象,都是依赖底层操作系统的功能。

与wait和notify区别

park/unpark与wati/notify都提供阻塞唤醒的功能,用做线程间同步,不过两者
的粒度不同,park/unpark作用在线程上,而wait/notify作用在对象上,二者没有交集。Object的wait/notify使用前必须获取对象的监视器,而park/unpark不需要。

写作不易,痛并快乐着;理解可能存在偏差,句句斟酌推敲;抵制抄袭,践行原创技术之路。如果本文能对您有所帮助,实为荣幸,我是葛一凡。

原文
澳门新葡萄京娱乐场 7微信公众号

OS 支持的同步原语

参考

  1. Java的LockSupport.park()实现分析
  2. java线程阻塞中断和LockSupport的常见问题
  3. 多线程之Java线程阻塞与唤醒
  4. Understanding JVM Thread
    States
  5. java并发包系列—LockSupport
  6. Java并发包源码学习之AQS框架(三)LockSupport和interrupt

Semaphore

并发编程领域的先锋人物 Edsger Dijkstra(没错,也是最短路径算法的作者)在
1965 年首次提出了信号量( Semaphores)
这一概念来解决线程同步的问题。信号量是一种特殊的变量类型,为非负整数,只有两个特殊操作PV:

  • P(s) 如果 s!=0,将 s-1;否则将当前线程挂起,直到 s 变为非零
  • V(s) 将 s+1,如果有线程堵塞在 P 操作等待 s 变成非零,那么 V
    操作会重启这些线程中的任意一个

注:Dijkstra 为荷兰人,名字 P 和 V 来源于荷兰单词
Proberen(测试)和Verhogen(增加),为方便理解,后文会用 Wait 与 Signal
来表示。

struct semaphore {
     int val;
     thread_list waiting;  // List of threads waiting for semaphore
}
wait(semaphore Sem):    // Wait until > 0 then decrement
  // 这里用的是 while 而不是 if
  // 这是因为在 wait 过程中,其他线程还可能继续调用 wait
  while (Sem.val <= 0) {
    add this thread to Sem.waiting;
    block(this thread);
  }
  Sem.val = Sem.val - 1;
return;

signal(semaphore Sem):// Increment value and wake up next thread
     Sem.val = Sem.val + 1;
     if (Sem.waiting is nonempty) {
         remove a thread T from Sem.waiting;
         wakeup(T);
     }

有两点注意事项:

  1. wait 中的「测试和减 1 操作」,signal 中的「加 1
    操作」需要保证原子性。一般来说是使用硬件支持的 read-modify-write
    原语,比如
    test-and-set/fetch-and-add/compare-and-swap,除了硬件支持外,还可以用 busy
    wait 的软件方式来模拟。
  2. signal
    中没有定义重新启动的线程顺序,也即多个线程在等待同一信号量时,无法预测重启哪一个线程

使用场景

信号量为控制并发程序的执行提供了强有力工具,这里列举两个场景:

互斥

信号量提供了了一种很方便的方法来保证对共享变量的互斥访问,基本思想是

将每个共享变量(或一组相关的共享变量)与一个信号量 s
(初始化为1)联系起来,然后用 wait/signal 操作将相应的临界区包围起来。

二元信号量也被称为互斥锁(mutex,mutual exclusve, 也称为 binary
semaphore),wait 操作相当于加锁,signal 相当于解锁。
一个被用作一组可用资源的计数器的信号量称为计数信号量(counting
semaphore)

调度共享资源

除了互斥外,信号量的另一个重要作用是调度对共享资源的访问,比较经典的案例是生产者消费者,伪代码如下:

emptySem = N
fullSem = 0
// Producer
while(whatever) {
    locally generate item
    wait(emptySem)
    fill empty buffer with item
    signal(fullSem)
}
// Consumer
while(whatever) {
    wait(fullSem)
    get item from full buffer
    signal(emptySem)
    use item
}

POSIX 实现

POSIX
标准中有定义信号量相关的逻辑,在 semaphore.h 中,为
sem_t 类型,相关 API:

// Intialize: 
sem_init(&theSem, 0, initialVal);
// Wait: 
sem_wait(&theSem);
// Signal: 
sem_post(&theSem);
// Get the current value of the semaphore:       
sem_getvalue(&theSem, &result);

信号量主要有两个缺点:

  • Lack of structure,在设计大型系统时,很难保证 wait/signal
    能以正确的顺序成对出现,顺序与成对缺一不可,否则就会出现死锁!
  • Global visiblity,一旦程序出现死锁,整个程序都需要去检查

解决上述两个缺点的新方案是监控器(monitor)。

Monitors

C. A. R. Hoare 在 1974
年的论文 Monitors: an operating system structuring
concept 首次提出了「监控器」概念,它提供了对信号量互斥和调度能力的更高级别的抽象,使用起来更加方便,一般形式如下:

monitor1 . . . monitorM
process1 . . . processN

我们可以认为监控器是这么一个对象:

  • 所有访问同一监控器的线程通过条件变量(condition variables)间接通信
  • 某一个时刻,只能有一个线程访问监控器

Condition variables

上面提到监控器通过条件变量(简写
cv)来协调线程间的通信,那么条件变量是什么呢?它其实是一个 FIFO
的队列,用来保存那些因等待某些条件成立而被堵塞的线程,对于一个条件变量 c
来说,会关联一个断言(assertion) P。线程在等待 P
成立的过程中,该线程不会锁住该监控器,这样其他线程就能够进入监控器,修改监控器状态;在
P 成立时,其他线程会通知堵塞的线程,因此条件变量上主要有三个操作:

  1. wait(cv, m) 等待 cv 成立,m 表示与监控器关联的一 mutex 锁
  2. signal(cv) 也称为 notify(cv) 用来通知 cv
    成立,这时会唤醒等待的线程中的一个执行。根据唤醒策略,监控器分为两类:Hoare
    vs. Mesa,后面会介绍
  3. broadcast(cv) 也称为 notifyAll(cv) 唤醒所有等待 cv 成立的线程
POSIX 实现

在 pthreads 中,条件变量的类型是 pthread_cond_t,主要有如下几个方法:

// initialize
pthread_cond_init() 
pthread_cond_wait(&theCV, &someLock);
pthread_cond_signal(&theCV);
pthread_cond_broadcast(&theCV);

使用方式

在 pthreads 中,所有使用条件变量的地方都必须用一个 mutex
锁起来,这是为什么呢?看下面一个例子:

pthread_mutex_t myLock;
pthread_cond_t myCV;
int count = 0;

// Thread A
pthread_mutex_lock(&myLock);
while(count < 0) {
    pthread_cond_wait(&myCV, &myLock);
}
pthread_mutex_unlock(&myLock);

// Thread B

pthread_mutex_lock(&myLock);
count ++;
while(count == 10) {
    pthread_cond_signal(&myCV);
}
pthread_mutex_unlock(&myLock);

如果没有锁,那么

  • 线程 A 可能会在其他线程将 count 赋值为10后继续等待
  • 线程 B 无法保证加一操作与测试 count 是否为零 的原子性

这里的关键点是,在进行条件变量的 wait
时,会释放该锁,以保证其他线程能够将之唤醒。不过需要注意的是,在线程 B
通知(signal) myCV 时,线程 A 无法立刻恢复执行,这是因为 myLock
这个锁还被线程 B 持有,只有在线程 B unlock(&myLock) 后,线程 A
才可恢复。总结一下:

  1. wait 时会释放锁
  2. signal 会唤醒等待同一 cv 的线程
  3. 被唤醒的线程需要重新获取锁,然后才能从 wait 中返回

Hoare vs. Mesa 监控器语义

在上面条件变量中,我们提到 signal 在调用时,会去唤醒等待同一 cv
的线程,根据唤醒策略的不同,监控器也分为两类:

  • Hoare 监控器(1974),最早的监控器实现,在调用 signal
    后,会立刻运行等待的线程,这时调用 signal
    的线程会被堵塞(因为锁被等待线程占有了)
  • Mesa 监控器(Xerox PARC, 1980),signal
    会把等待的线程重新放回到监控的 ready 队列中,同时调用 signal
    的线程继续执行。这种方式是现如今 pthreads/Java/C# 采用的

这两类监控器的关键区别在于等待线程被唤醒时,需要重新检查 P 是否成立。

澳门新葡萄京娱乐场 8

监控器工作示意图

上图表示蓝色的线程在调用监控器的 get 方式时,数据为空,因此开始等待
emptyFull 条件;紧接着,红色线程调用监控器的 set 方法改变 emptyFull
条件,这时

  • 按照 Hoare 思路,蓝色线程会立刻执行,并且红色线程堵塞
  • 按照 Mesa
    思路,红色线程会继续执行,蓝色线程会重新与绿色线程竞争与监控器关联的锁

Java 中的监控器

在 Java 中,每个对象都是一个监控器(因此具备一个 lock 与 cv),调用对象
o 的 synchronized 方法 m 时,会首先去获取 o 的锁,除此之外,还可以调用 o
的 wait/notify/notify 方法进行并发控制

Big Picture

澳门新葡萄京娱乐场 9

操作系统并发相关 API 概括图

Interruptible

通过介绍操作系统支持的同步原语,我们知道了 park/unpark、wait/notify
其实就是利用信号量( pthread_mutex_t)、条件变量( pthread_cond_t)实现的,其实监控器也可以用信号量来实现。在查看
AQS 中,发现有这么一个属性:

/**
 * The number of nanoseconds for which it is faster to spin
 * rather than to use timed park. A rough estimate suffices
 * to improve responsiveness with very short timeouts.
 */
static final long spinForTimeoutThreshold = 1000L;

也就是说,在小于 1000 纳秒时,await 条件变量 P
时,会使用一个循环来代替条件变量的堵塞与唤醒,这是由于堵塞与唤醒本身的操作开销可能就远大于
await 的 timeout。相关代码:

// AQS 的 doAcquireNanos 方法节选
for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return true;
    }
    nanosTimeout = deadline - System.nanoTime();
    if (nanosTimeout <= 0L)
        return false;
    if (shouldParkAfterFailedAcquire(p, node) &&
        nanosTimeout > spinForTimeoutThreshold)
        LockSupport.parkNanos(this, nanosTimeout);
    if (Thread.interrupted())
        throw new InterruptedException();
}

在 JUC 提供的高级同步类中,acquire 对应 park,release 对应
unpark,interrupt 其实就是个布尔的 flag 位,在 unpark 被唤醒时,检查该
flag ,如果为 true,则会抛出我们熟悉的 InterruptedException。

Selector.select() 响应中断异常的逻辑有些特别,因为对于这类堵塞 IO
操作来说,没有条件变量的堵塞唤醒机制,我们可以再看下 Thread.interrupt
的实现

public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

OpenJDK 使用了这么一个技巧来实现堵塞 IO
的中断唤醒:在一个线程被堵塞时,会关联一个 Interruptible 对象。
对于 Selector 来说,在开始时,会关联这么一个Interruptible
对象:

protected final void begin() {
    if (interruptor == null) {
        interruptor = new Interruptible() {
                public void interrupt(Thread target) {
                    synchronized (closeLock) {
                        if (closed)
                            return;
                        closed = true;
                        interrupted = target;
                        try {
                            AbstractInterruptibleChannel.this.implCloseChannel();
                        } catch (IOException x) { }
                    }
                }};
    }
    blockedOn(interruptor);
    Thread me = Thread.currentThread();
    if (me.isInterrupted())
        interruptor.interrupt(me);
}

当调用 interrupt 方式时,会关闭该
channel,这样就会关闭掉这个堵塞线程,可见为了实现这个功能,代价也是比较大的。LockSupport.park
中采用了类似技巧。

总结

也许基于多线程的并发编程不是最好的(可能是最复杂的,Clojure 大法好
:-),但却是最悠久的。
即便我们自己不去写往往也需要阅读别人的多线程代码,而且能够写出“正确”(who
knows?)的多线程程序往往也是区分 senior 与 junior 程序员的标志,希望这篇文章能帮助大家理解
Java 是如何实现线程控制,有疑问欢迎留言指出,谢谢!

参考

  • Java的LockSupport.park()实现分析
  • 课件 COMP3151/9151 Foundations of Concurrency Lecture 6 –
    Semaphores, Monitors, POSIX Threads,
    Java
  • 课件 
  • 课件 
  • Mutexes and Semaphores
    Demystified

发表评论

电子邮件地址不会被公开。 必填项已用*标注