1. 简介 AbstractQueuedSynchronizer
(抽象队列同步器)简称AQS,是一个用来构建同步组件的基础框架,JDK中java.util.concurrent
这个包的许多同步组件都是基于AQS来构建的,如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock等。AQS运用了模板方法的设计模式,使我们只需重写AQS的tryAcquire
、tryRelease
等方法来快速构造一个同步组件,上面提到的同步组件都是用这种方式创建出来的。比如独占锁这种工具我们只需重写以下3个方法:
tryAcquire
:尝试获取同步状态
tryRelease
:尝试释放同步状态
isHeldExclusively
:是否为独占状态
2. 实现 类继承关系 AbstractOwnableSynchronizer
这个类提供了设置和获取独占同步状态的线程的方法
1 2 3 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io .Serializable {}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public abstract class AbstractOwnableSynchronizer implements java .io .Serializable { private static final long serialVersionUID = 3737899427754241961L ; protected AbstractOwnableSynchronizer () { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread (Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread () { return exclusiveOwnerThread; } }
等待队列 队列同步器的原理是维持一个同步状态和一个FIFO等待队列,获取到同步状态的线程会存在于队列头结点,而获取同步状态失败的线程会插入到队列尾部并且不断自旋检查是否可以获取同步状态,尝试获取失败后会判断是否需要阻塞,阻塞直到释放同步状态的线程按顺序唤醒阻塞线程直到获取到同步状态退出。
AQS等待队列
结点类型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private transient volatile Node head;private transient volatile Node tail;private volatile int state;protected final int getState () { return state; } protected final void setState (int newState) { state = newState; } protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); }
acquire方法 阻塞获取独占式同步状态。源码中的tryAcquire方法需要在子类中根据需求重写。
acquire过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
release方法 释放独占式同步状态。源码中的tryRelease方法需要在子类中根据需求重写。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
acquireInterruptibly方法 上面讲到的acquire
方法是阻塞式获取同步状态,不支持获取过程中响应中断抛出中断异常,如果需要支持阻塞获取同步状态过程中可以响应中断抛出中断异常可以使用acquireInterruptibly
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; 、、 if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } }
tryAcquireNanos方法 该方法通过设置超时时间nanosTimeout
,当在指定超时时间内未获取到同步状态将返回false,在获取过程中支持响应中断并抛出异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public final boolean tryAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } private boolean doAcquireNanos (int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L ) return false ; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return true ; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L ) return false ; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
最后更新时间:2018-04-26 22:58:59