3.3ReentrantLock源码分析
ReentrantLock类图
我们看一下重入锁ReentrantLock类关系图,它是实现了Lock接口的类。NonfairSync和FairSync都继承自抽象类Sync,在ReentrantLock中有非公平锁NonfairSync和公平锁FairSync的实现。
在重入锁ReentrantLock类关系图中,我们可以看到NonfairSync和FairSync都继承自抽象类Sync,而Sync类继承自抽象类AbstractQueuedSynchronizer(简称AQS)。如果我们看过JUC的源代码,会发现不仅重入锁用到了AQS, JUC 中绝大部分的同步工具也都是基于AQS构建的。那AQS是什么作用呢?
3.4 AQS简介
AQS(全称AbstractQueuedSynchronizer)即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等)。AQS是JUC并发包中的核心基础组件,其本身是一个抽象类。理论上还是利用管程实现的,在AQS中,有一个volatile修饰的state,获取锁的时候,会读写state的值,解锁的时候,也会读写state的值。所以AQS就拥有了volatile的happens-before规则。加锁与解锁的效果上与synchronized是相同的。
由类图可以看到,AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。
Node中的thread变量用来存放进入AQS队列里面的线程,Node节点内部:
prev记录当前节点的前驱节点
next 记录当前节点的后继节点
SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的
EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的
waitStatus 记录当前线程等待状态,可以为①CANCELLED (线程被取消了)、②SIGNAL(线程需要被唤醒)、③CONDITION(线程在CONDITION条件队列里面等待)、④PROPAGATE(释放共享资源时需要通知其他节点);
在AQS中维持了一个单一的状态信息state,对于ReentrantLock的实现来说,state 可以用来表示当前线程获取锁的可重入次数;AQS继承自AbstractOwnableSynchronizer,其中exclusiveOwnerThread变量表示当前共享资源的持有线程。
3.5 AQS实现原理
AQS是一个同步队列,内部使用一个FIFO的双向链表,管理线程同步时的所有被阻塞线程。双向链表这种数据结构,它的每个数据节点中都有两个指针,分别指向直接后继节点和直接前驱节点。所以,从双向链表中的任意一个节点开始,都可以很方便地访问它的前驱节点和后继节点。
我们看下面的AQS的数据结构,AQS有两个节点head,tail分别是头节点和尾节点指针,默认为null。AQS中的内部静态类Node为链表节点,AQS会在线程获取锁失败后,线程会被阻塞并被封装成Node加入到AQS队列中;当获取锁的线程释放锁后,会从AQS队列中的唤醒一个线程(节点)。
场景01-线程抢夺锁失败时,AQS队列的变化
AQS的head,tail分别代表同步队列头节点和尾节点指针,默认为null。
当第一个线程抢夺锁失败,同步队列会先初始化,随后线程会被封装成Node节点追加到AQS队列中。假设当前独占锁的的线程为ThreadA,抢占锁失败的线程为ThreadB。
(1)同步队列初始化,首先会在队列中添加一个空Node,这个节点中的thread=null,代表当前获取锁成功的线程。随后,AQS的head和tail会同时指向这个节点。
(2)接下来将ThreadB封装成Node节点,追加到AQS队列。设置新节点的prev指向AQS队尾节点;将队尾节点的next指向新节点;最后将AQS尾节点指针指向新节点。此时AQS变化,如下图:
当下一个线程抢夺锁失败时,重复上面步骤即可。将线程封装成Node,追加到AQS队列。假设此次抢占锁失败的线程为ThreadC,此时AQS变化,如下图:
场景02-线程被唤醒时,AQS队列的变化
ReentrantLock唤醒阻塞线程时,会按照FIFO的原则从AQS中head头部开始唤醒首个节点中线程。
head节点表示当前获取锁成功的线程ThreadA节点。
当ThreadA释放锁时,它会唤醒后继节点线程ThreadB,ThreadB开始尝试获得锁,如果ThreadB获得锁成功,会将自己设置为AQS的头节点。ThreadB获取锁成功后,AQS变化如下:
head指针指向ThreadB节点。
将原来头节点的next指向Null,从AQS中删除。
将ThreadB节点的prev指向Null,设置节点的thread=null。
上面是线程在竞争锁时,线程被阻塞和被唤醒时AQS同步队列的基本实现过程。
3.6 ReentrantLock源码分析:锁的获取
研究任何框架或工具都就要一个入口,我们以重入锁为切入点来理解AQS的作用及实现。下面我们深入ReentrantLock源码来分析AQS是如何实现线程同步的。
AQS其实使用了一种典型的设计模式:模板方法。我们如果查看AQS的源码可以看到,AQS为一个抽象类,AQS中大多数方法都是final或private的,也就是说AQS并不希望用户覆盖或直接使用这些方法,而是只能重写AQS规定的部分方法。
//AQS内部维护这一个双向链表,AQS主要属性
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient volatile Node head;//头节点指针
private transient volatile Node tail;//尾节点指针
private volatile int state;//同步状态,0无锁;大于0,有锁,state的值代表重
入次数。
//AQS链表节点结构
static final class Node {
static final Node SHARED = new Node();//共享模式
static final Node EXCLUSIVE = null;//独占模式
/**
*等待状态:取消。表明线程已取消争抢锁并从队列中删除。
*取消动作:获取锁超时或者被其他线程中断。
*/
static final int CANCELLED = 1;
/**
*等待状态:通知。表明线程为竞争锁的候选者。
*只要持有锁的线程释放锁,会通知该线程。
*/
static final int SIGNAL = -1;
/**
*等待状态:条件等待
*表明线程当前线程在condition队列中。
*/
static final int CONDITION = -2;
/**
*等待状态:传播。
*用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机
制。
*在一个节点成为头节点之前,是不会跃迁为此状态的
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;//直接前驱节点指针
volatile Node next;//直接后继节点指针
volatile Thread thread;//线程
Node nextWaiter;//condition队列中的后继节点
final boolean isShared() {//是否是共享
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {//默认构造器
}
//在重入锁中用于addWaiter方法中,用于将阻塞的线程封装成一个Node
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {//用于Condition中
this.waitStatus = waitStatus;
this.thread = thread;
}
}
}
我们以重入锁中相对简单的公平锁为例,以获取锁的 lock 方法为入口,一直深入到AQS,来分析多线程是如何同步获取锁的。
获取锁时源码的调用过程,时序图如下:
第一步:ReentrantLock.lock()
ReentrantLock获取锁调用了 lock 方法,我们看下该方法的内部:调用了sync.lock()。
public void lock() {
sync.lock();
}
sync是Sync类的一个实例,Sync类实际上是ReentrantLock的抽象静态内部类,它集成了AQS来实现重入锁的具体业务逻辑。AQS是一个同步队列,实现了线程的阻塞和唤醒,没有实现具体的业务功能。在不同的同步场景中,需要用户继承AQS来实现对应的功能。
我们查看ReentrantLock源码,可以看到,Sync有两个实现类公平锁FairSync和非公平锁NoFairSync。
重入锁实例化时,根据参数fair为属性sync创建对应锁的实例。以公平锁为例,调用sync.lock事实上调用的是FairSync的lock方法。
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
第二步:FairSync.lock()
我们看下该方法的内部,执行了方法acquire(1),acquire为AQS中的final方法,用于竞争锁。
final void lock() {
acquire(1);
}
第三步:AQS.acquire(1)
线程进入AQS中的acquire方法,arg=1。
这个方法逻辑:先尝试抢占锁,抢占成功,直接返回;
抢占失败,将线程封装成Node节点追加到AQS队列中并使线程阻塞等待。
(1)首先会执行tryAcquire(1)尝试抢占锁,成功返回true,失败返回false。抢占成功了,就不会执行
下面的代码了
(2)抢占锁失败后,执行addWaiter(Node.EXCLUSIVE)将x线程封装成Node节点追加到AQS队列。
(3)然后调用acquireQueued将线程阻塞,线程阻塞。
线程阻塞后,接下来就只需等待其他线程唤醒它,线程被唤醒后会重新竞争锁的使用。
接下来,我们看看这个三个方法具体是如何实现的。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
第四步:FairSync.tryAcquire(1)
尝试获取锁:若获取锁成功,返回true;获取锁失败,返回false。
这个方法逻辑:获取当前的锁状态,如果为无锁状态,当前线程会执行CAS操作尝试获取锁;若当前线程是重入获取锁,只需增加锁的重入次数即可。
//尝试以独占模式获取锁
//若锁是未锁定状态state=0,CAS修改state=1,修改成功说明当前线程获取锁成功,设置当前线程
为锁持有者,然后返回true。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();//状态:0未锁定,大于0已被其他线程独占。
if (c == 0) {//未锁定,可以获取锁
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires))
{//CAS设置state为1
setExclusiveOwnerThread(current);//设置当前线程为独占资源持有者
return true;
}
}
//如果当前线程已经是为锁持有者,设置重入次数,state + 1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;//设置重入次数+1
//重入次数,超过int最大值,溢出。
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);//设置重入次数
return true;
}
return false;
}
第五步:AQS.addWaiter(Node.EXCLUSIVE)
线程抢占锁失败后,执行addWaiter(Node.EXCLUSIVE)将线程封装成Node节点追加到AQS队列。
addWaiter(Node mode)的mode表示节点的类型,Node.EXCLUSIVE表示是独占排他锁,也就是说重入锁是独占锁,用到了AQS的独占模式。
Node定义了两种节点类型:
共享模式:Node.SHARED。共享锁,可以被多个线程同时持有,如读写锁的读锁。
独占模式:Node.EXCLUSIVE。独占很好理解,是自己独占资源,独占排他锁同时只能由一个线程持有。
static final Node SHARED = new Node();//共享模式
static final Node EXCLUSIVE = null;//独占模式
相应的AQS支持两种模式:支持独占模式和共享模式。
/*
* 模式有两种:共享模式和独占模式
*/
private Node addWaiter(Node mode) {
//当前线程封装为Node准备排队获取锁
Node node = new Node(Thread.currentThread(), mode);
//先尝试快速插入同步队列。如果失败,再使用完整的排队策略。
Node pred = tail;
if (pred != null) {//如果双向链表不为空链表(有节点),追加节点到尾部
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);//链表为空,将节点追加到同步队列队尾
return node;
}
//通过自旋插入节点到同步队列AQS中,如果队列为空时,需先初始化队列。
private Node enq(final Node node) {
for (;;) {//自旋,至少会有两次循环。
Node t = tail;
if (t == null) { //队列为空,先初始化队列
if (compareAndSetHead(new Node()))//CAS插入节点
tail = head;
} else {//插入节点,追加节点到尾部
node.prev = t;
if (compareAndSetTail(t, node)) {//CAS插入节点
t.next = node;
return t;
}
}
}
}
第六步:AQS.acquireQueued(newNode,1)
这个方法的主要作用就是将线程阻塞。
- 若同步队列中,若当前节点为队列第一个线程,则有资格竞争锁,再次尝试获得锁。
- 尝试获得锁成功,移除链表head节点,并将当前线程节点设置为head节点。
- 尝试获得锁失败,判断是否需要阻塞当前线程。
- 若发生异常,取消当前线程获得锁的资格。
/**
*等待队列中的线程以独占的模式获取锁
* @param node 新加入等待队列线程节点
* @param arg 获取参数
* @return {@code true} 在等待中是否被中断
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//获取锁是否失败,一般是发生异常
try {
boolean interrupted = false;//是否中断
for (;;) {//无限循环,线程获得锁或者线程被阻塞
final Node p = node.predecessor();//获取此节点的前一个节点
//若此节点的前个节点为头节点,说明当前线程可以获取锁,阻塞前尝试获取锁,若获取锁成功,将当前线程从同步队列中删除。
if (p == head && tryAcquire(arg)) {//获取锁成功
/**
* 将当前线程从同步队列中删除。
* 将当前节点置为空节点,节点的prev,next和thread都为null。
* 将等待列表头节点指向当前节点
*/
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;//当前线程被中断
}
} finally {
//如果出现异常,取消线程获取锁请求
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
AQS.shouldParkAfterFailedAcquire
这个方法的主要作用是:线程竞争锁失败以后,通过Node的前驱节点的waitStatus状态来判断, 线程是否需要被阻塞。
如果前驱节点状态为 SIGNAL,当前线程可以被放心的阻塞,返回true。
若前驱节点状态为CANCELLED,向前扫描链表把 CANCELLED 状态的节点从同步队列中移除,返回false。
若前驱节点状态为默认状态或PROPAGATE,修改前驱节点的状态为 SIGNAL,返回 false。
若返回false,会退回到acquireQueued方法,重新执行自旋操作。自旋会重复执行acquireQueued和shouldParkAfterFailedAcquire,会有两个结果:
(1)线程尝试获得锁成功或者线程异常,退出acquireQueued,直接返回。
(2)执行shouldParkAfterFailedAcquire成功,当前线程可以被阻塞。
若返回true,调用parkAndCheckInterrupt阻塞当前线程。
Node 有 5 种状态,分别是:
0:默认状态。
1:CANCELLED,取消/结束状态。表明线程已取消争抢锁。线程等待超时或者被中断,节点的waitStatus为CANCELLED,线程取消获取锁请求。需要从同步队列中删除该节点
-1:SIGNAL,通知。状态为SIGNAL节点中的线程释放锁时,就会通知后续节点的线程。
-2:CONDITION,条件等待。表明节点当前线程在condition队列中。
-3:PROPAGATE,传播。在一个节点成为头节点之前,是不会跃迁为PROPAGATE状态的。用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。
/**
*是否需要阻塞当前线程,根据前驱节点中的waitStatus来判断是否需要阻塞当前线程。如果线
程需要被阻塞,返回true,这是自旋中的主要的信号量。
* @return {@code true} 如果线程需要被阻塞,返回true。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node
node) {
int ws = pred.waitStatus;//上一个节点的waitStatus的状态
if (ws == Node.SIGNAL)
//前驱节点为SIGNAL状态,在释放锁的时候会唤醒后继节点, 当前节点可以阻塞自己。
return true;
if (ws > 0) {
/**
* 向前扫描链表把 CANCELLED 状态的节点从同步队列中移除。
* 前驱节点状态为取消CANCELLED(1)时,向前遍历,更新当前节点的前驱节点为第一个非取消状态节点。
* 之后,
* (1)当前线程会再次返回方法acquireQueued,再次循环,尝试获取锁;
* (2)再次执行shouldParkAfterFailedAcquire判断是否需要阻塞。
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*前驱节点状态<=0,此时还未判断的状态有 默认状态(0)/CONDITION(-2)/PROPAGATE(-3)。
*此时,不可能是CONDITION(-2),所以只能是默认状态(0)/PROPAGATE(-3)。
*CAS设置前驱节点的等待状态waitStatus为SIGNAL状态。
*此次,当前线程先暂时不阻塞。
*之后,
* (1)当前线程会再次返回方法acquireQueued,再次循环,尝试获取锁;
* (2)再次执行shouldParkAfterFailedAcquire判断是否需要阻塞。
* (3)前驱节点为SIGNAL状态,可以被阻塞。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
AQS.parkAndCheckInterrupt
将当前线程阻塞挂起。
LockSupport.park(this)会阻塞当前线程,会使当前线程(如ThreadB)处于等待状态,不再往下执行。
/**
* 将当前线程阻塞,并且在被唤醒时检查是否被中断
* @return {@code true} 如果被中断,返回true
*/
private final boolean parkAndCheckInterrupt() {
//阻塞当前线程
LockSupport.park(this);
//检测当前线程是否已被中断(若被中断,并清除中断标志),中断返回 true,否则返回false。
return Thread.interrupted();
}
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
LockSupport类
LockSupport类是Java1.6引入的一个类,所有的方法都是静态方法。它提供了基本的线程同步原语,提供了可以使线程阻塞和唤醒的方法。LockSupport实际上是调用了Unsafe类里的函数,调用了Unsafe的两个函数。
//取消阻塞(唤醒)线程
public native void unpark(Object thread);
/**阻塞(挂起)线程。当前线程被阻塞后,当前线程就会被挂起,直到其他线程unpark此线程。
*isAbsolute是否为绝对时间,true绝对时间,false相对时间。
*park(false,0):阻塞线程,直至被唤醒。
*park(true,time):暂停当前线程,增加了相对时间的限制,如
*park(true,time):暂停当前线程,增加了绝对时间的限制,如2020-12-01 21:00:00的long值
*/
public native void park(boolean isAbsolute, long time);
3.7 ReentrantLock源码分析:锁的释放
公平锁的释放,源码调用链路图
第一步:ReentrantLock.unlock
释放锁时,需调用ReentrantLock的unlock方法。这个方法内部,会调用sync.release(1),release方法为AQS类的final方法。
public void unlock() {
sync.release(1);
}
第二步:AQS.release(1)
首先执行方法tryRelease(1),tryRelease方法为ReentrantLock中Sync类的final方法,用于释放锁。
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放锁。若释放后锁状态为无锁状态,需唤醒后继线程
Node h = head;//同步队列头节点
if (h != null && h.waitStatus != 0)//若head不为null,说明链表中有节点。其状态不为0,唤醒后继线程。
unparkSuccessor(h);
return true;
}
return false;
}
第三步:Sync.tryRelease(1)
判断当前线程是否为锁持有者,若不是持有者,不能释放锁,直接抛出异常。
若当前线程是锁的持有者,将重入次数减1,并判断当前线程是否完全释放了锁。
若重入次数为0,则当前新线程完全释放了锁,将锁拥有线程设置为null,并将锁状态置为无锁状态(state=0),返回true。
若重入次数>0,则当前新线程仍然持有锁,设置重入次数=重入次数-1,返回false。
返回true说明,当前锁被释放,需要唤醒同步队列中的一个线程,执行unparkSuccessor唤醒同步队列中节点线程。
/**
* 释放锁返回值:true释放成功;false释放失败
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;//重入次数减去1
//如果当前线程不是锁的独占线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//如果线程将锁完全释放,将锁初始化未无锁状态
free = true;
setExclusiveOwnerThread(null);
}
setState(c);//修改锁重入次数
return free;
}
第四步:AQS.unparkSuccessor
//唤醒后继线程
private void unparkSuccessor(Node node) {
/*
* 头节点waitStatus状态 SIGNAL或PROPAGATE
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//查找需要唤醒的节点:正常情况下,它应该是下一个节点。但是如果下一个节点为null或者它的waitStatus为取消时,则需要从同步队列tail节点向前遍历,查找到队列中首个不是取消状态的节点。
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//将下一个节点中的线程unpark唤醒
if (s != null)
LockSupport.unpark(s.thread);
}
第五步:LockSupport.unpark(s.thread)
会唤醒挂起的线程,使被阻塞的线程继续执行。
3.8 公平锁和非公平锁源码实现区别
公平锁和非公平锁在获取锁和释放锁时有什么区别呢?
非公平锁与非公平锁释放锁是没有差异,释放锁时调用方法都是AQS的方法。
非公平锁与非公平锁获取锁的差异
我们可以看到上面在公平锁中,线程获得锁的顺序按照请求锁的顺序,按照先来后到的规则获取锁。如果线程竞争公平锁失败后,都会到AQS同步队列队尾排队,将自己阻塞等待锁的使用资格,锁被释放后,会从队首开始查找可以获得锁的线程并唤醒。
而非公平锁,允许新线程请求锁时,可以插队,新线程先尝试获取锁,如果获取锁失败,才会AQS同步队列队尾排队。
我们对比下两种锁的源码,非公平锁与非公平锁获取锁的差异有两处:
1.lock方法差异:
FairSync.lock:公平锁获取锁
final void lock() {
acquire(1);
}
NoFairSync.lock:非公平锁获取锁,lock方法中新线程会先通过CAS操作compareAndSetState(0, 1),尝试获得锁。
final void lock() {
if (compareAndSetState(0, 1))//新线程,第一次插队
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
lock方法中的acquire为AQS的final方法,公平锁和非公平锁,执行代码没有差别。差别之处在于公平锁和非公平锁对tryAcquire方法的实现。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
2.tryAcquire差异
FairSync.tryAcquire:公平锁获取锁,若锁为无锁状态时,本着公平原则,新线程在尝试获得锁前,需先判断AQS同步队列中是否有线程在等待,若有线程在等待,当前线程只能进入同步队列等待。若AQS同步无线程等待,则通过CAS抢占锁。而非公平锁,不管AQS是否有线程在等待,则都会先通过CAS抢占锁。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//公平锁,先判断同步队列中是否有线程在等待
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
NoFairSync.tryAcquire和NoFairSync.nonfairTryAcquire:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//非公平锁,入队前,二次插队
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平锁和非公平锁获取锁时,其他方法都是调用AQS的final方法,所以没有不同之处。
3.9读写锁ReentrantReadWriteLock
可重入锁ReentrantLock是互斥锁,互斥锁在同一时刻仅有一个线程可以进行访问,但是在大多数场景下,大部分时间都是提供读服务,而写服务占有的时间较少。然而读服务不存在数据竞争问题,如果一个线程在读时禁止其他线程读势必会导致性能降低,所以就出现了读写锁。
读写锁维护着一对锁,一个读锁和一个写锁。通过分离读锁和写锁,使得并发性比一般的互斥锁有了较大的提升:在同一时间可以允许多个读线程同时访问,但是在写线程访问时,所有读线程和写线程都会被阻塞。
读写锁的主要特性:
公平性:支持公平性和非公平性。
重入性:支持重入。读写锁最多支持65535个递归写入锁和65535个递归读取锁。
锁降级:写锁能够降级成为读锁,但读锁不能升级为写锁。遵循获取写锁、获取读锁在释放写锁的次序
读写锁ReentrantReadWriteLock实现接口ReadWriteLock,该接口维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。写入锁是独占的。
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
ReadWriteLock定义了两个方法。readLock()返回用于读操作的锁,writeLock()返回用于写操作的锁。ReentrantReadWriteLock定义如下:
/** 内部类 读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 内部类 写锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
/** 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock() {
this(false);
}
/** 使用给定的公平策略创建一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/** 返回用于写入操作的锁 */
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
/** 返回用于读取操作的锁 */
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer {
//省略其余源代码
}
public static class WriteLock implements Lock, java.io.Serializable{
//省略其余源代码
}
public static class ReadLock implements Lock, java.io.Serializable {
//省略其余源代码
}
ReentrantReadWriteLock与ReentrantLock一样,其锁主体依然是Sync,它的读锁、写锁都是依靠Sync来实现的。所以ReentrantReadWriteLock实际上只有一个锁,只是在获取读取锁和写入锁的方式上不一样而已,它的读写锁其实就是两个类:ReadLock、writeLock,这两个类都是lock的实现。
案例
package com.hero.multithreading;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Demo10ReentrantReadWriteLock {
private static volatile int count = 0;
public static void main(String[] args) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
WriteDemo writeDemo = new WriteDemo(lock);
ReadDemo readDemo = new ReadDemo(lock);
for (int i = 0; i < 3; i++) {
new Thread(writeDemo).start();
}
for (int i = 0; i < 5; i++) {
new Thread(readDemo).start();
}
}
static class WriteDemo implements Runnable {
ReentrantReadWriteLock lock;
public WriteDemo(ReentrantReadWriteLock lock) {
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.writeLock().lock();
count++;
System.out.println("写锁:"+count);
lock.writeLock().unlock();
}
}
}
static class ReadDemo implements Runnable {
ReentrantReadWriteLock lock;
public ReadDemo(ReentrantReadWriteLock lock) {
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.readLock().lock();
System.out.println("读锁:"+count);
lock.readLock().unlock();
}
}
}
}
3.10 锁优化
减少锁持有时间
减少锁粒度
将大对象拆分成小对象,增加并行度,降低锁竞争。
ConcurrentHashMap允许多个线程同时进入
锁分离
根据功能进行锁分离
ReadWriteLock在读多写少时,可以提高性能。
锁消除
锁消除是发生在编译器级别的一种锁优化方式。
有时候我们写的代码完全不需要加锁,却执行了加锁操作。
锁粗化
通常情况下,为了保证多线程间的有效并发,会要求每个线程持有锁的时间尽可能短,但是在某些情况下,一个程序对同一个锁不间断、高频地请求、同步与释放,会消耗掉一定的系统资源,因为锁的请求、同步与释放本身会带来性能损耗,这样高频的锁请求就反而不利于系统性能的优化了,虽然单次同步操作的时间可能很短。锁粗化就是告诉我们任何事情都有个度,有些情况下我们反而希望把很多次锁的请求合并成一个请求,以降低短时间内大量锁请求、同步、释放带来的性能损耗。
上面我们讲解了线程之间的互斥,接下来我们看一下线程之间如何进行合作
4.线程协作工具类
线程协作工具类就是帮助程序员更容易的让线程之间进行协作,来完成某个业务功能。
4.1 CountDownLatch倒数门闩
倒数结束之前,一直处于等待状态,直到数到0结束,此线程才继续工作。
场景:购物拼团,大巴人满发车,分布式锁
主要方法:
构造函数:CountDownLatch(int count):只有一个构造函数,参数count为需要倒数的数值。
await():当一个或多个线程调用await()时,这些线程会阻塞。
countDown():其他线程调用countDown()会将计数器减1,调用countDown方法的线程不会阻塞。当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行
用法:
一个线程等待多个线程都执行完,再继续自己的工作。
package com.hero.multithreading;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* CountDownLatch案例:6个程序猿加班
* 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行
*/
public class Demo11CountDownLatch {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
try { TimeUnit.SECONDS.sleep(5); }
catch(InterruptedException e) {e.printStackTrace(); }
System.out.println(Thread.currentThread().getName() + "\t上完班,离开公司");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
new Thread(()->{
try {
countDownLatch.await();//卷王也是有极限的,设置超时时间
System.out.println(Thread.currentThread().getName()+"\t卷王最后关灯走人");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "7").start();
}
}
4.2 Semaphore信号量
用来限制或管理数量有限资源的使用情况。
信号量的作用就是维护一个”许可证”的计数,线程可以”获取”许可证,那信号量剩余的许可证就减少一个,线程也可以”释放”一个许可证,那信号量剩余的许可证就可以加一个。当信号量拥有的许可证数为0时,下一个还要要获取许可证的线程就需要等待,直到有另外的线程释放了许可证。
主要方法:
构造函数:Semaphore(int permits,Boolean fair):可以设置是否使用公平策略,如果传入true,则Semaphore会把之前等待的线程放到FIFO队列里,以便有了新许可证可以分给之前等待时间最长的线程。
acquire():获取许可证,当一个线程调用acquire操作时,他要么通过成功获取信号量(信号量减1),要么一直等待下去,直到有线程释放信号量,或超时。
release():释放许可证,会将信号量加1,然后唤醒等待的线程。
package com.hero.multithreading;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Semaphore案例:三辆小汽车抢车位
* Semaphore信号量主要作用:1.用于多个共享资源的互斥使用,2.用于并发线程数的控制
*/
public class Demo12Semaphore {
public static void main(String[] args) {
//模拟资源类,有3个空车位
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
try{
//占有资源
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"\t抢到车位");
try {
TimeUnit.SECONDS.sleep(3);
} catch(InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"\t停车3秒后离开车位");
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放资源
semaphore.release();
}
}, "Thread-Car-"+String.valueOf(i)).start();
}
}
}
4.3 CyclicBarrier循环栅栏
线程会等待,直到线程到了事先规定的数目,然后触发执行条件进行下一步动作
当有大量线程互相配合,分别计算不同任务,并且需要最后统一汇总时,就可以用CyclicBarrier,它可以构造一个集结点,当某一个线程执行完,它就会到集结点等待,直到所有线程都到集结点,则该栅栏就被撤销,所有线程统一出再,继续执行剩下的任务。
主要方法:
构造函数:CyclicBarrier(int parties, Runnable barrierAction),设置聚集的线程数量和集齐线程数的结果之后要执行的动作。
await():阻塞当前线程,待凑齐线程数量之后继续执行
package com.hero.multithreading;
import java.util.concurrent.CyclicBarrier;
/**
* 案例:集齐7龙珠召唤神龙
*/
public class Demo13CyclicBarrier {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("======召唤神龙");
});
for (int i = 1; i <= 7; i++) {
final int tempInt = i;
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + "\t收集到第" + tempInt + "颗龙珠");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() +"\t第" + tempInt + "颗龙珠飞走了");
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread-"+String.valueOf(i)).start();
}
}
}
CyclicBarrier和CountDownLatch区别:
- 作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需要等待数字到0,也就是说,CountDownLatch用于事件,而CyclicBarrier用于线程。
- 可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用,而CyclicBarrier可以重复使用。
4.4 Condition接口(条件对象)
当线程1需要等待某个条件时就去执行condition.await()方法,一旦执行await()方法,线程就会进入阻塞状态。通常会有另一个线程2去执行对应条件,直到这个条件达成时,线程2就会执行condition.signal()方法,此时JVM就会从被阻塞的线程里找到那些等待该condition的线程,当线程1收到可执行信号时,它的线程状态就会变成Runnable可执行状态。
signalAll()会唤起所有正在等待的线程。
signal()是公平的,只会唤起那个等待时间最长的线程。
注意点:
Condition用来代替Object.wait/notify两者用法一样
Condition的await()会自动释放持有的Lock锁这点也和Object.wait一样
调用await时必须持有锁,否则会抛出异常。
package com.hero.multithreading;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 案例:Tony仨小哥洗剪吹
* 演示多线程之间按顺序调用,实现A->B->C
* 三个线程Tony要求如下:
* tony雄雄-洗头,tony超超-理发,tony麦麦-吹干
* 。。。
* tony雄雄-洗头,tony超超-理发,tony麦麦-吹干
* 依次来10轮
*/
public class Demo14ConditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(()->{
for (int i = 0; i < 10; i++) {
shareData.wash();
}
}, "tony-雄雄").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
shareData.cut();
}
}, "tony-超超").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
shareData.cook();
}
}, "tony-麦麦").start();
}
}
class ShareData {
private volatile int number = 1; //tony-雄雄:1, tony-超超:2, tony-麦麦:3
private Lock lock = new ReentrantLock();
private Condition c1 = lock.newCondition(); //number == 1
private Condition c2 = lock.newCondition(); //number == 2
private Condition c3 = lock.newCondition(); //number == 3
/**
* A线程每一轮要执行的操作
*/
public void wash() {
lock.lock();
try{
//判断
while(number != 1){
c1.await();
}
//模拟线程执行的任务
System.out.println(Thread.currentThread().getName()+"-洗头");
//通知
number = 2;
c2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* B线程每一轮要执行的操作
*/
public void cut() {
lock.lock();
try{
//判断
while(number != 2){
c2.await();
}
//模拟线程执行的任务
System.out.println(Thread.currentThread().getName()+"-理发");
//通知
number = 3;
c3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void cook() {
lock.lock();
try{
//判断
while(number != 3){
c3.await();
}
//模拟线程执行的任务
System.out.println(Thread.currentThread().getName()+"-吹干");
//通知
number = 1;
c1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
5.并发容器
5.1 什么是并发容器?
在JUC包中,有一大部分是关于并发容器的,如ConcurrentHashMap,ConcurrentSkipListMap,CopyOnWriteArrayList及阻塞队列。这里将介绍使用频率、面试中出现频繁的最高的ConcurrentHashMap和阻塞队列。
注意:这里说到的容器概念,相当于我们理解中的集合的概念。
同步容器:
Java中的集合主要分为四大类:List、Map、Set和Queue,但是并不是所有集合都是线程安全的。比如,我们经常使用的ArrayList,HashMap,HashSet就不是线程安全的。
早期的JDK1.0中的就提供了线程安全的集合,包括Vector,Stack和Hashtable。此外还有在JDK1.2中增加的Collections中内部SynchronizedXxx类,它们也是线程安全的集合,可以由对应
Collections.synchronizedXxx工厂方法创建。这些类实现线程安全的方式都是一样的:都是基于
synchronized这个同步关键字实现的,对每个公有方法都进行了同步,保证每次只有一个线程能访问集合,所以它们被称为线程安全的集合(同步容器)。
并发容器:
在JDK1.5之前,JDK提供的线程安全的类都是同步集合容器。同步容器都是线程安全的,但是所有线程对容器只能串行访问,性能很差。在JDK1.5之后引入的JUC并发包,提供的更多类型的并发容器,在性能上做了很多改进优化,可以用来替代同步容器。它们都是针对多线程并发访问来进行设计的,我们称它们为并发容器。
并发容器依然可以归属到我们提到的四大类:List、Map、Set 和 Queue。
这里我总结了一下它们特性和使用场景:
- List容器:
Vector:使用synchronized同步锁,数据具有强一致性。适合于对数据有强一致性要求的场景,但性能较差。
CopyOnWriteArrayList:底层使用数组存储数据,使用复制副本实现有锁写操作,不能保证强一致性。适合于读多写少,允许读写数据短暂不一致的高并发场景。
- Map容器
Hashtable:使用synchronized同步锁,数据具有强一致性。适合于对数据有强一致性要求的场景,但性能较差。
ConcurrentHashMap:基于数组+链表+红黑树实现,写操作时通过synchronized同步锁将HashEntry作为锁的粒度支持一定程度的并发写,具有弱一致性。适合于存储数据量较小,读多写少且不要求强一致性的高并发场景。
ConcurrentSkipListMap:基于跳表实现的有序Map,使用CAS实现无锁化读写,具有弱一致性。适合于存储数据量大,读写都比较频繁,对数据不要求强一致性的高并发场景。
- Set容器
CopyOnWriteArraySet:底层使用数组存储数据,使用复制副本实现有锁写操作,不能保证强一致性。适合于读多写少,允许读写数据短暂不一致的场景。
ConcurrentSkipListSet:基于跳表实现的有序Set,使用CAS实现无锁化读写,具有弱一致性。适合于存储数据量大,读写都比较频繁,对数据不要求强一致性的高并发场景。
5.2 ConcurrentHashMap
结构图
JDK1.7结构图
Java7中的ConcurrentHashMap最外层是多个segment,每个segment的底层数据结构与HashMap类似,仍然是数组和链表组成。
每个segment独立上ReentrantLock锁,每个segment之间互不影响,提高并发效率。
默认有16个segment,最多可以同时支持16个线程并发写(操作分别分布在不同的Segment上)。这个默认值可以在初始化时设置,但一旦初始化以后,就不可以再扩容了。
JDK1.8结构图
ConcurrentHashMap是一个存储 key/value 对的容器,并且是线程安全的。
改进一: 取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。
改进二: 将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。查询更快
这是经典的数组加链表的形式。 并且在链表长度过长时转化为红黑树存储( Java 8 的优化) , 加快查找速度。
小结:
ConcurrentHashMap 采用数组 + 链表 + 红黑树的存储结构
存入的Key值通过自己的 hashCode 映射到数组的相应位置
ConcurrentHashMap 为保障查询效率, 在特定的时候会对数据增加长度【扩容】
当链表长度增加到 8 时, 可能会触发链表转为红黑树
5.3 CopyOnWriteArrayList
实现原理
CopyOnWrite 思想:是平时查询的时候,都不需要加锁,随便访问,只有在更新的时候,才会从原来的数据复制一个副本出来,然后修改这个副本,最后把原数据替换成当前的副本。修改操作的同时,读操作不会被阻塞,而是继续读取旧的数据。这点要跟读写锁区分一下。
package com.hero.multithreading;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Demo15CopyOnWriteArrayList {
public static void main(String[] args) {
//1、初始化CopyOnWriteArrayList
List<Integer> tempList = Arrays.asList(new Integer [] {1,2});
CopyOnWriteArrayList<Integer> copyList = new CopyOnWriteArrayList<>(tempList);
//2、模拟多线程对list进行读和写
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new ReadThread(copyList));
executorService.execute(new WriteThread(copyList));
executorService.execute(new WriteThread(copyList));
executorService.execute(new WriteThread(copyList));
executorService.execute(new ReadThread(copyList));
executorService.execute(new WriteThread(copyList));
executorService.execute(new ReadThread(copyList));
executorService.execute(new WriteThread(copyList));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("copyList size:"+copyList.size());
executorService.shutdown();
}
}
class ReadThread implements Runnable {
private List<Integer> list;
public ReadThread(List<Integer> list) {
this.list = list;
}
@Override
public void run() {
System.out.print("size:="+list.size()+",::");
for (Integer ele : list) {
System.out.print(ele + ",");
}
System.out.println();
}
}
class WriteThread implements Runnable {
private List<Integer> list;
public WriteThread(List<Integer> list) {
this.list = list;
}
@Override
public void run() {
this.list.add(9);
}
}
优缺点
优点
对于一些读多写少的数据,写入时复制的做法就很不错,例如:配置、黑名单、物流地址等变化非常少的数据,这是一种无锁的实现。可以帮我们实现程序更高的并发。
CopyOnWriteArrayList 并发安全且性能比 Vector 好。Vector 是增删改查方法都加了synchronized 来保证同步,但是每个方法执行的时候都要去获得锁,性能就会大大下降,而
CopyOnWriteArrayList 只是在增删改上加锁,但是读不加锁,在读方面的性能就好于 Vector。
缺点
数据一致性问题。这种实现只是保证数据的最终一致性,在添加到拷贝数据而还没进行替换的时候,读到的仍然是旧数据。
内存占用问题。如果对象比较大,频繁地进行替换会消耗内存,从而引发 Java 的 GC 问题,这个时候,我们应该考虑其他的容器,例如 ConcurrentHashMap。
5.4 并发队列
为什么要用队列
通过队列可以很容易的实现数据共享,并且解决上下游处理速度不匹配的问题,典型的生产者消费者模式队列中的读写等线程安全问题由队列负责处理。
常用并发队列
JUC提供了7种适合与不同应用场景的阻塞队列。
ArrayBlockingQueue :基于数组实现的有界阻塞队列
LinkedBlockingQueue :基于链表实现的有界阻塞队列
SynchronousQueue:不存储元素的阻塞队列
PriorityBlockingQueue :支持按优先级排序的无界阻塞队列
DelayQueue:优先级队列实现的无界阻塞队列
LinkedTransferQueue:基于链表实现的无界阻塞队列
LinkedBlockingDeque:基于链表实现的双向无界阻塞队列
阻塞队列
阻塞队列的一端是给生产者放数据用,另一端给消费者拿数据用。阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的。
take()方法获取并移除队列的头结点,一旦执行take时,队列里无数据则阻塞,直到队列里有数据。
put()方法是插入元素,但是如何队列已满,则无法继续插入,则阻塞,直到队列中有空闲空间。
是否有界(容量多大),这是非常重要的属性,无界队列Integer.MAX_VALUE,认为是无限容量。
ArrayBlockingQueue
有界,可以指定容量
公平:可以指定是否需要保证公平,如果想要保证公平,则等待最长时间的线程会被优先处理,不过会
带来一定的性能损耗。
场景:有10个面试者,只有1个面试官,大厅有3个位子让面试者休息,每个人面试时间10秒,模拟所有人面试的场景。
package com.hero.multithreading;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 案例:有10个面试者,只有1个面试官,大厅有3个位子让面试者休息,每个人面试时间10秒,模拟
所有人面试的场景。
*/
public class Demo16ArrayBlockingQueue {
static ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
public static void main(String[] args) {
Interviewer r1 = new Interviewer(queue);//面试官
Engineers e2 = new Engineers(queue);//程序员们
new Thread(r1).start();
new Thread(e2).start();
}
}
class Interviewer implements Runnable {
BlockingQueue<String> queue;
public Interviewer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("面试官:我准备好了,可以开始面试");
String msg;
try {
while(!(msg = queue.take()).equals("stop")){
System.out.println(msg + " 面试+开始...");
TimeUnit.SECONDS.sleep(10);//面试10s
System.out.println(msg + " 面试-结束...");
}
System.out.println("所有候选人都结束了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Engineers implements Runnable {
BlockingQueue<String> queue;
public Engineers(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
String candidate = "程序员" + i;
try {
queue.put(candidate);
System.out.println(candidate+" 就坐=等待面试~");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
queue.put("stop");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}