1. 简介

AbstractQueuedSynchronizer(抽象队列同步器)简称AQS,是一个用来构建同步组件的基础框架,JDK中java.util.concurrent这个包的许多同步组件都是基于AQS来构建的,如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock等。AQS运用了模板方法的设计模式,使我们只需重写AQS的tryAcquiretryRelease等方法来快速构造一个同步组件,上面提到的同步组件都是用这种方式创建出来的。比如独占锁这种工具我们只需重写以下3个方法:

  • tryAcquire:尝试获取同步状态
  • tryRelease:尝试释放同步状态
  • isHeldExclusively:是否为独占状态

2. 实现

类继承关系

AbstractOwnableSynchronizer这个类提供了设置和获取独占同步状态的线程的方法

1
2
3
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {

/** Use serial ID even though all fields transient. */
private static final long serialVersionUID = 3737899427754241961L;

protected AbstractOwnableSynchronizer() { }

private transient Thread exclusiveOwnerThread;

protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}

等待队列

队列同步器的原理是维持一个同步状态和一个FIFO等待队列,获取到同步状态的线程会存在于队列头结点,而获取同步状态失败的线程会插入到队列尾部并且不断自旋检查是否可以获取同步状态,尝试获取失败后会判断是否需要阻塞,阻塞直到释放同步状态的线程按顺序唤醒阻塞线程直到获取到同步状态退出。

AQS等待队列
AQS等待队列

结点类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
static final class Node {
// 结点位于共享模式
static final Node SHARED = new Node();
// 结点位于独占模式
static final Node EXCLUSIVE = null;

// 以下4个状态是同步队列中结点线程的等待状态

// 取消状态,表示线程取消获取同步状态,可能是因为中断或超时取消
static final int CANCELLED = 1;
// 需要唤醒等待状态为SIGNAL的后继结点
static final int SIGNAL = -1;
// 等待某个条件
static final int CONDITION = -2;
static final int PROPAGATE = -3;

// 等待状态,值为0或以上4个状态
volatile int waitStatus;
// 当前结点的前驱结点
volatile Node prev;
// 当前结点的后驱结点
volatile Node next;
// 被结点包装的线程
volatile Thread thread;

Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

// 获取前驱结点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 等待队列头结点
private transient volatile Node head;
// 等待队列尾结点
private transient volatile Node tail;
// 同步状态
private volatile int state;
// 获取同步状态
protected final int getState() {
return state;
}
// 设置同步状态
protected final void setState(int newState) {
state = newState;
}
// 以CAS的方式设置同步状态
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

acquire方法

阻塞获取独占式同步状态。源码中的tryAcquire方法需要在子类中根据需求重写。

acquire过程
acquire过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public final void acquire(int arg) {
// 调用tryAcquire方法尝试获取同步状态,获取成功直接返回,获取失败调用addWaiter方法将当前线程封装为独占模式的等待结点加入等待队列,接着调用acquireQueued方法自旋获取同步状态。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}


private Node addWaiter(Node mode) {
// 创建一个指定模式的包含当前线程的等待结点
Node node = new Node(Thread.currentThread(), mode);
// 当尾结点不为空,即队列不为空,使用CAS方法将新建结点添加到队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}

// 队列为空或在刚才入队时出现CAS冲突执行enq方法
enq(node);
// 返回插入的新结点
return node;
}

// 该方法使用循环重试保证node一定会入队成功
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 队列为空,先新建一个空结点作为头结点,使用CAS方式插入
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else { // 否则队列不为空使用CAS方法添加结点node到队尾,失败则重试
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前结点的前驱结点
final Node p = node.predecessor();
// 如果前驱结点为头结点表示当前结点为等待队列中的第一个有资格获取同步状态的结点
// 接着尝试获取同步状态
if (p == head && tryAcquire(arg)) {
// 设置头结点为当前结点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取同步状态失败调用shouldParkAfterFailedAcquire方法判断是否需要阻塞,
// 如果需要则调用parkAndCheckInterrupt方法阻塞线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱结点等待状态
int ws = pred.waitStatus;
// 如果前驱结点等待状态为SIGNAL则表示当前线程需要阻塞,直接返回true
if (ws == Node.SIGNAL)
return true;
// 如果前驱结点等待状态为CANCELLED,说明前驱结点取消获取锁,向前遍历链表,跳过等待状态为CANCELLED的结点直到找到一个等待状态不为CANCELLED的结点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 否则将前驱结点的等待状态使用CAS方式设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}


private final boolean parkAndCheckInterrupt() {
// 调用LockSupport.park方法阻塞当前线程,当释放同步状态的线程执行LockSupport.unpark方法唤醒当前线程或其他线程执行中断方法中断当前线程则会从LockSupport.park中返回
LockSupport.park(this);
// 返回中断状态并清除
return Thread.interrupted();
}

release方法

释放独占式同步状态。源码中的tryRelease方法需要在子类中根据需求重写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 如果头结点不为空,并且等待状态不为初始状态0则唤醒后继结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

// 唤醒后继结点
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
// 如果当前结点状态小于0则使用CAS方式将其修改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

// 获取后继结点
Node s = node.next;
// 如果后继结点为空或者取消获取同步状态则后续第一个等待状态不为CANCELLED的结点
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾结点开始向前遍历到node结点,遍历过程中记录最新的等待状态小于等于0的结点,该最新结点里的线程为需要唤醒的线程
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}

acquireInterruptibly方法

上面讲到的acquire方法是阻塞式获取同步状态,不支持获取过程中响应中断抛出中断异常,如果需要支持阻塞获取同步状态过程中可以响应中断抛出中断异常可以使用acquireInterruptibly方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 判断是否中断,是则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取同步状态,成功则返回,失败调用doAcquireInterruptibly方法
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

// 这个方法跟上面的acquireQueued方法基本一样,就是在被唤醒后检测到中断状态后会抛出中断异常
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 检测到中断,抛出中断异常
throw new InterruptedException();
}
} finally {
// 当failed为true时,表示检查到中断抛出异常,需要执行cancelAcquire方法取消获取同步状态
if (failed)
cancelAcquire(node);
}
}

private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null;

// 向前遍历,跳过等待状态为CANCELLED的结点,直到找到一个等待状态不为CANCELLED的结点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

Node predNext = pred.next;
//设置当前结点等待状态为CANCELLED
node.waitStatus = Node.CANCELLED;

// 如果node为尾结点则移除尾结点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 否则如果node前驱结点不为head结点
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
、、
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {// 如果node前驱结点为head
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

tryAcquireNanos方法

该方法通过设置超时时间nanosTimeout,当在指定超时时间内未获取到同步状态将返回false,在获取过程中支持响应中断并抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 判断是否中断,是则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取同步状态,失败则调用doAcquireNanos方法超时获取
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 计算截止时间deadline
final long deadline = System.nanoTime() + nanosTimeout;
// 创建独占模式下的结点并入队
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 当前驱结点为head则尝试获取同步状态
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 计算还有多长时间到达截止时间
nanosTimeout = deadline - System.nanoTime();
// 如果超时时间小于0则已到达截止时间,返回false
if (nanosTimeout <= 0L)
return false;
// 判断是否要阻塞,并且超时时间大于自旋超时时间的阈值则阻塞,因为小于spinForTimeoutThreshold没必要阻塞,因为唤醒的时间可能超过该阈值
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 检查到中断则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}