1. What?
1.1 原子操作
前提:Java语言最终也是也汇编的方式执行,所以从汇编的角度理解对变量的赋值操作过程:读取 -> 更新 -> 写入,在多线程下,赋值操作是不安全的。
原子操作是指“不可中断的一个或一系列操作”,所以jdk提供ReentrantLock,在多线程的情况下,让线程并行操作改为高效的串行操作。
2. Why?
为了在多线程的情况下,每个线程做修改操作,按一定顺序执行,得到准确的字段值。
3. How?
3.1 测试
首先测试在多线程中的自增操作是否真正的自增
为了更清楚的看到效果,自增时线程堵塞10毫秒,发现在100个线程对同一个count自增时,线程执行无规律,最终没有达到预期的值100;
3.2 原子锁的使用
对自增代码块加锁后,线程执行有序,最终实现了在多线程的情况下的自增操作
3.3 源码分析
AbstractQueuedSynchronizer简称AQS
非公平锁加锁的调用链
ReentrantLock$lock -> ReentrantLock.Sync.NonfairSync$lock -> AQS$acquire -> AQS$tryAcquire && AQS$addWaiter && AQS$acquireQueued
3.3.1 原子锁的创建
原子锁的默认是非公平锁,大概过程就是利用AQS的CLH数据结构来存储CAS修改状态失败的线程,当获取锁资源的线程执行同步代码后,释放锁,会唤醒CLH中等待的线程,非公平锁与公平锁的不同点在源码中会讲到(@see NonfairSync$lock)
- ReentrantLock构造器
//新建原子锁
private final ReentrantLock lock = new ReentrantLock();
// ReentrantLock无参构造器源码如下,所以默认为非公平锁。
public ReentrantLock() {
sync = new NonfairSync();
}
// 有参构造器,公平锁创建:new ReentrantLock(true)
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
3.3.2 加锁
- NonfairSync$lock
与公平锁的不同之处多了如下代码中的步骤1,即非公平锁让后进入的线程先cas修改state,修改成功获取锁,不会按照申请锁的顺序执行
//非公平加锁逻辑
final void lock() {
// 1. cas操作,将state值由 0 改为 1
if (compareAndSetState(0, 1))
// 1.1 修改state成功,则获取锁资源,并设置当前线程为独占模式,即AQS的获取锁资源的线程是本线程
setExclusiveOwnerThread(Thread.currentThread());
// 2. 修改失败
else
// 2.1 若cas操作失败,则再次获取锁资源
acquire(1);//@See AQS$acquire
}
- AQS$acquire
public final void acquire(int arg) {
// 1. 尝试获取锁资源,若获取成功,则返回true,则不会执行后面的操作
if (!tryAcquire(arg) // @see Sync$nonfairTryAcquire
&&
// 1.1 若失败,则addWaiter 新建节点到队列中
// @see Sync$nonfairTryAcquire and AQS$addWaiter
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 1.2 中断当前线程
selfInterrupt();
}
- Sync$nonfairTryAcquire
tryAcquire最终调用nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
// 1. 获取当前线程
final Thread current = Thread.currentThread();
// 2. 获取 state 状态
int c = getState();
// 3. 若state==0,则cas操作,若成功,则当前线程独占
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 4. 若当前线程已是独占,则可重入锁(获取锁资源的线程可多次执行同步代码块),state自增
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 5. 失败,返回false
return false;
}
- AQS$addWaiter
添加尾节点过程:
过程 | 说明 |
---|---|
a. O <-p n-> O(tail) | 原队列 |
b. O <-p n-> O(tail) <- O(new) | 新建节点,并前置指向尾节点 |
c. O <-p n-> O <-p O(tail,new) | cas操作将新建的节点改为尾节点 |
d. O <-p n-> O <-p n-> O(tail,new) | 最后新建的节点(tail) 的后置指向 原尾节点 |
O:节点,O(tail):尾节点,O(new):新添加的节点,O(tail,new):新节点设置为尾节点,<-p n-> : 表示双向链表,p代表前置,n代表后置
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 1. 尾节点不为空
if (pred != null) {
// 1.1 则新建的节点的前置指向尾节点
node.prev = pred;
// 1.2 cas修改新建的节点为尾节点
if (compareAndSetTail(pred, node)) {
// 设置原来的尾节点的下一节点为当前节点(现尾节点)
pred.next = node;
return node;
}
}
// 2. 将节点插入队列,必要时进行初始化
// @see AQS$enq
enq(node);
return node;
}
- AQS$enq
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 1. 尾节点为空,初始化头节点,head赋值到tail
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
// 2. 若尾节点不为空
} else {
// 2.1 新建的节点的前置指向尾节点
node.prev = t;
// 2.2 修改新建节点为尾节点
if (compareAndSetTail(t, node)) {
// 成功后原尾节点的后置指向新节点(tail)
t.next = node;
return t;
}
// 2.3 cas操作失败自旋
}
}
}
- AQS$acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前置节点
final Node p = node.predecessor();
// 1. 若是头节点 并且 同步状态成功
if (p == head && tryAcquire(arg)) {
// 1.1 成功后,设置新建节点为头节点
setHead(node);
// 1.2 回收原头节点
p.next = null; // help GC
failed = false;
return interrupted;
}
// 2. 如果前驱节点不是头节点,或者同步状态失败了,则park当前线程并且state=-1,等待前驱节点unpark
if (shouldParkAfterFailedAcquire(p, node) &&
// 2.1 park当前线程并状态为 interrupted
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 3. 若失败 取消正在进行的获取尝试,唤起<0的后置节点
if (failed)
// @see AQS$cancelAcquire
cancelAcquire(node);
}
}
- AQS$cancelAcquire
private void cancelAcquire(Node node) {
// 1. Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// 2. Skip cancelled predecessors
Node pred = node.prev;
// 3. 删除前置节点为cancelled的
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// 4. If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
// 5. 如果不是尾节点,或cas修改尾节点失败
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
// 5.1 前置是头节点 并且(前置的状态是SIGNAL,或者前置的状态<=0,则修改状态为SIGNAL)并且前置的线程不为空
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
// 5.1.1 后置节点不为空并且状态<=0(SIGNAL)
if (next != null && next.waitStatus <= 0)
// 修改后置为前置?
compareAndSetNext(pred, predNext, next);
} else {
// 5.2 唤醒
// @see AQS$unparkSuccessor
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
- AQS$unparkSuccessor
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
// 1. 若状态为<0(SIGNAL),则cas修改状态为0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
// 2. 下一节点为空或者状态大于0
if (s == null || s.waitStatus > 0) {
s = null;
// 2.1 从尾节点遍历找到 <=0 的节点(SIGNAL),若不为空则唤起
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 3. 后置不为空,则唤起后置的线程
if (s != null)
LockSupport.unpark(s.thread);
}
3.3.2 解锁
- ReentrantLock$unlock
释放锁资源,即state自减,唤醒CLH中等待的线程
public void unlock() {
// sate减1操作
sync.release(1);
}
public final boolean release(int arg) {
// 1. 尝试释放锁
// @see Sync$tryRelease
if (tryRelease(arg)) {
Node h = head;
// 1.1 头节点不为空,且状态不为0时
if (h != null && h.waitStatus != 0)
// 唤起
// @see AQS$unparkSuccessor
unparkSuccessor(h);
return true;
}
return false;
}
- Sync$tryRelease
protected final boolean tryRelease(int releases) {
// 1. state自减
int c = getState() - releases;
// 2. 若当前线程不是AQS中的独占线程,抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 3. c等于0时,即释放锁成功,注:可重入锁state>1,即需要再次释放
if (c == 0) {
free = true;
// 3.1 释放独占线程
setExclusiveOwnerThread(null);
}
// 4. 赋值state
setState(c);
return free;
}
建议:原子锁的源码中涉及到双向链表的添加与删除操作,建议在阅读源码时在纸上画出流程,加强理解