AbstractQueuedSynchronizer 就是那个大名鼎鼎的 AQS,是java.util.concurrent包下同步器的核心。
CLH(Craig, Landin, and Hagersten)锁
使用队列的方式来解决n个线程来争夺m把锁的问题,每当一个新的线程需要获取锁,为其创建一个节点并放到队尾,如果该线程是队列中的第一个节点,则节点的locked设置成false,如果它不是队列的第一个节点,则它的节点的prev指向原来的队尾节点,并不断自旋查看prev指向节点的locked属性,如果该值变为false,表示轮到它来尝试获取锁了,如果获取成功并最终用完释放后,则将自己的locked设置成false,如果获取失败,locked值不变,还是true,并不断尝试获取锁。
也就是说,每个节点只需要关心前置节点的locked状态,可以发现CLH实现锁的获取是公平的。
Node节点
Node节点维护了双向节点和当前节点状态和线程引用。
static final class Node {volatile Node prev;volatile Node next;volatile Thread thread;volatile int waitStatus; // SIGNAL CANCELLED CONDITION PROPAGATECANCELLED,值为1,表示当前的线程被取消;SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;值为0,表示当前节点在sync队列中,等待着获取锁。}
FIFO队列
* +------+ prev +-----+ +-----+ * head | | <---- | | <---- | | tail * +------+ +-----+ +-----+
核心属性
- private transient volatile Node head; 等待队列首元素;
- private transient volatile Node tail; 等待队列尾元素;
- private volatile int state; 同步状态;
需要在锁定时,需要维护一个状态(int类型),而对状态的操作是原子和非阻塞的,通过同步器提供的对状态访问的方法对状态进行操纵,并基于Unsafe的原子操作来修改state的状态,compareAndSet来确保原子性的修改。
获取/设置当前的同步状态:
protected final int getState() { return state; }protected final void setState(int newState) { state = newState; }
对同步状态对修改采用原子操作:
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
入队操作,如果队列为空则实例化节点,否则插入队尾:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
- protected boolean tryAcquire(int arg) 排它的获取这个状态。这个方法的实现需要查询当前状态是否允许获取,然后再进行获取(使用compareAndSetState来做)状态。
- protected boolean tryRelease(int arg) 释放状态。
- protected int tryAcquireShared(int arg) 共享的模式下获取状态。
- protected boolean tryReleaseShared(int arg) 共享的模式下释放状态。
- protected boolean isHeldExclusively() 在排它模式下,状态是否被占用。
获取锁: 需要获取当前节点的前驱节点,并且头结点能够获取状态,代表能够占有锁。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
这个方法在非公平实现中,主要是通过AQS的state来检查和维护锁状态,如果state是0,说明没有线程占有这个锁,如果不为0并且锁的占有线程是当前线程,则是重入的情况,均可以获得锁并修改state值。
如果是首次获得锁,则设置锁占有线程为当前线程。
当然,如果前面两种情况都不满足,说明尝试获得锁失败,需要做前面段落所述的队列操作,创建一个等待结点并进入循环,循环中的park()调用挂起当前线程。
否则将当前线程挂起:
LockSupport.park最终把线程交给系统(Linux)内核进行阻塞。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
释放锁
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
如果修改state值成功,则找到队列中应该唤起的结点,对节点中的线程调用unpark()方法,恢复线程执行。