为了带你精通 Java AQS,我画了 40 张图,从管程模型讲起!

网站建设3年前发布
57 0 0

大家好,我是君哥。,Java中 AQS 是 AbstractQueuedSynchronizer 类,AQS 依赖 FIFO 队列来提供一个框架,这个框架用于实现锁以及锁相关的同步器,比如信号量、事件等。,在 AQS 中,主要有两部分功能,一部分是操作 state 变量,第二部分是实现排队和阻塞机制。,注意,AQS 并没有实现任何同步接口,它只是提供了类似 acquireInterruptible 的方法,调用这些方法可以实现锁和同步器。,Java 使用 MESA 管程模型来管理类的成员变量和方法,让这个类的成员变量和方法的操作是线程安全的。下图是 MESA 管程模型,里面除了定义共享变量外,还定义了条件变量和条件变量等待队列:,2023030615192759d396304c383ed39eb8548fa73f3b55c466a8796,上图中有三个知识点:,Java 中的 MESA 管程模型有一点改进,就是管程内部只有一个条件变量和一个等待队列。下图是 AQS 的管程模型:,20230307014504388390758570f0c252231295affe526f7a061a792,AQS 的管程模型依赖 AQS 中的 FIFO 队列实现入口等待队列,要进入管程内部,就由各种并发锁的限制。而 ConditionObject 则实现了条件队列,这个队列可以创建多个。,下面就从入口等待队列、并发锁、条件等待队列三个方面来带你彻底理解 AQS。,独占, 忽略 interrupts,这里的 tryAcquire 是抽象方法,由 AQS 的子类来实现,因为每个子类实现的锁是不一样的。,上面的代码可以看到,获取锁失败后,会先执行 addWaiter 方法加入队列,然后执行 acquireQueued 方法自旋地获取锁直到成功。,addWaiter 代码逻辑如下图,简单说就是把 node 入队,入队后返回 node 参数给 acquireQueued 方法:,20230306152225e1305b396060290b33f6387dc406fcd2ac3240703,acquireQueued 自旋获取锁逻辑如下图:,20230306151928d7c0f831529b0099538648bbc941ebba8d3d11790,这里有几个细节:,(1)waitStatus,(2)获取锁失败后挂起,如果前置节点不是头节点,或者前置节点是头节点但当前节点获取锁失败,这时当前节点需要挂起,分三种情况:,2023030615192842f476495b148e62b389713026be3be304959b133,2023030615192997a8cab27fff6cdac7101915f93c117ca3c4c6263,20230306151929b82559c822f25ed354159886f420bd2e105b1b100,(3)取消获取锁,如果获取锁抛出异常,则取消获取锁,如果当前节点是 tail 节点,分两种情况如下图:,2023030615193094941237354e2e2c1597982ea83ef8a911bc08321,如果当前节点不是 tail 节点,也分两种情况,如下图:,20230306152227036019f753dbc6b4c6c143379376d8e0e474ef817,4.对中断状态忽略,5.如果前置节点的状态是 0 或 PROPAGATE,会被当前节点自旋过程中更新成 -1,以便之后通知当前节点。,对应方法 acquireInterruptibly(int arg)。,跟忽略中断(acquire方法)不同的是要响应中断,下面两个地方响应中断:,对应方法 tryAcquireNanos(int arg, long nanosTimeout)。,这个方法具备了独占 + 响应中断 + 超时的功能,下面2个地方要判断是否超时:,独占锁释放分两步:释放锁,唤醒后继节点。,释放锁的方法 tryRelease 是抽象的,由子类去实现。,我们看一下唤醒后继节点的逻辑,首先需要满足两个条件:,20230306151932a166b740492b51a1c3b3222e08360c1803c2fe776,2023030615193299c14c719f64f1fc5e00010291bcaa321f8774806,之前我们讲了独占锁,这一小节我们谈共享锁,有什么不同呢?,对应方法 acquireShared,代码如下:,这里获取锁使用的方法是 tryAcquireShared,获取的是共享锁。获取共享锁跟获取独占锁不同的是,会返回一个整数值,说明如下:,怎么判断队列中等待节点是在等待共享锁呢?nextWaiter == SHARED,这个参数值是入队新建节点的时候构造函数传入的。,自旋过程中,如果获取锁成功(返回正数),首先把自己设置成新的 head 节点,然后把通知传播下去。如下图:,20230306152228729973d16e4e5ae26a8737ed5a10af4ad099d1260,之后会唤醒后面节点并保证唤醒操作可以传播下去。但是需要满足四个条件中的一个:,唤醒后面节点的操作,其实就是释放共享锁,对应方法是 doReleaseShared,见释放共享锁一节。,对应方法 acquireSharedInterruptibly(int arg)。,跟共享忽略中断(acquireShared 方法)不同的是要响应中断,下面两个地方响应中断:,如果检查到当前线程已经中断,则抛出 InterruptedException,当前线程退出。,对应方法 tryAcquireSharedNanos(int arg, long nanosTimeout)。,这个方法具备了共享 + 响应中断 + 超时的功能,下面两个个地方要判断是否超时:,另外,park 线程的操作使用 parkNanos 传入阻塞时间。,释放共享锁代码如下:,首先尝试释放共享锁,tryReleaseShared 代码由子类来实现。释放成功后执行AQS中的 doReleaseShared 方法,是一个自旋操作。,自旋的条件是队列中至少有两个节点,这里分三种情况。,情况一:当前节点 waitStatus 是 -1,如下图:,20230306151934b3e90cf9481017044695276eb808f6bb271ee4177,情况二:当前节点 waitStatus 是 0(被其他线程更新新成了中间状态),如下图:,202303061519347207cef01b09917aaf95639b6d0b8fbaf29c12331,情况三:当前节点 waitStatus 是 -3,为什么会这样呢?需要解释一下,head节点唤醒后继节点之前 waitStatus 已经被更新中间态 0 了,唤醒后继节点动作还没有执行,又被其他线程更成了 -3,也就是其他线程释放锁执行了上面情况二。这时需要先把 waitStatus 再更成 0 (在方法 unparkSuccessor),如下图:,20230306151934e81431876123ded1f00507c746e8dee77da50d435,上面的讲解可以看出,如果要基于 AQS 来实现并发锁,可以根据需求重写下面四个方法来实现,这四个方法在 AQS 中没有具体实现:,AQS 的子类需要重写上面的方法来修改 state 值,并且定义获取锁或者释放锁时 state 值的变化。子类也可以定义自己的 state 变量,但是只有更新 AQS 中的 state变量才会对同步起作用。,还有一个判断当前线程是否持有独占锁的方法 isHeldExclusively,也可以供子类重写后使用。,获取/释放锁的具体实现放到下篇文章讲解。,AQS 使用 FIFO 队列实现了一个锁相关的并发器模板,可以基于这个模板来实现各种锁,包括独占锁、共享锁、信号量等。,AQS 中,有一个核心状态是 waitStatus,这个代表节点的状态,决定了当前节点的后续操作,比如是否等待唤醒,是否要唤醒后继节点。,这一章节讲解 Java AQS 中的并发锁。其实 Java AQS 中的并发锁主要是基于 state 这个变量值来实现的。,我们先来看一下 UML 类图:,202303061522305679e9b25221b890c7a3972f5b3d273b56fffd651,从图中可以看到,ReentrantLock 使用抽象内部类 Sync 来实现了 AQS 的方法,然后基于 Sync 这个同步器实现了公平锁和非公平锁。主要实现了下面 3 个方法:,从实现的方法可以看到,ReentrantLock 中获取的锁是独占锁,我们再来看一下获取和释放独占锁的代码:,独占锁的特点是调用上面 acquire 方法,传入的参数是 1。,获取锁首先判断同步状态(state)的值。,这说明没有线程占用锁,当前线程如果符合下面两个条件,就可以获取到锁:,没有前任节点,如下图:,20230306151935389caa42187f0867e453363e5b0fae09c2f84d831,CAS 的方式更新 state 值(把 0 更新成 1)成功。如果获取独占锁成功,会更新 AQS 中 exclusiveOwnerThread 为当前线程,这个很容易理解。,这说明已经有线程占有锁,判断占有锁的线程是不是当前线程,如下图:,20230306151936215091f809a5468db8e350ab6a6bae78c2088b719,state += 1 值如果小于 0,会抛出异常。,如果获取锁失败,则进入 AQS 队列等待唤醒。,跟公平锁相比,非公平锁的唯一不同是如果判断到 state 等于 0,不用判断有没有前任节点,只要 CAS 设置 state 值(把 0 更新成 1)成功,就获取到了锁。,公平锁和非公平锁,释放逻辑完全一样,都是在内部类 Sync 中实现的。释放锁需要注意两点,如下图:,2023030615223141fd8e4738034787000812421f86f67773b63b331,公平锁的特点是每个线程都要进行排队,不用担心线程永远获取不到锁,但有个缺点是每个线程入队后都需要阻塞和被唤醒,这一定程度上影响了效率。非公平锁的特点是每个线程入队前都会先尝试获取锁,如果获取成功就不会入队了,这比公平锁效率高。但也有一个缺点,队列中的线程有可能等待很长时间,高并发下甚至可能永远获取不到锁。,我们先来看一下 UML 类图:,20230306152232e1c125811497966b19f3604743ed1e82509360401,从图中可以看到,ReentrantReadWriteLock 使用抽象内部类Sync来实现了 AQS 的方法,然后基于 Sync 这个同步器实现了公平锁和非公平锁。主要实现了下面 3 个方法:,下图是定义的几个常用变量:,20230306151940582ee50411e26fc79e93382a23a1a77c41bfe4565,下面这 2 个方法用户获取共享锁和独占锁的数量:,从sharedCount 可以看到,共享锁的数量要右移 16 位获取,也就是说共享锁占了高 16 位。从上图 EXCLUSIVE_MASK 的定义看到,跟 EXCLUSIVE_MASK 进行与运算,得到的是低 16 位的值,所以独占锁占了低 16 位。如下图:,2023030615194082f15eb422271ce25a73247fbe7e8ce0c6e024770,这样上面获取锁数量的方法就很好理解了。,读锁的实现对应内部类 ReadLock。,获取读锁实际上是 ReadLock 调用了 AQS 的下面方法,传入参数是 1:,ReentrantReadWriteLock 内部类 Sync 实现了 tryAcquireShared 方法,主要包括如下三种情况:,a.当前线程不需要阻塞(readerShouldBlock)。在公平锁中,需要判断是否有前置节点,如下图就需要阻塞:,20230306151940a3c0e0231003427761d242b98b83d6df9e64d2617,在非公平锁中,则是判断第一个节点是不是有独占锁,如下图就需要阻塞:,2023030615223324fe3f4808745c9ee418575440c17b175e88bf792,b.使用 CAS 把 state 的值加 SHARED_UNIT(65536)。这里是不是就更理解读锁占高位的说法了,获取一个读锁,state 的值就要加 SHARED_UNIT 这么多个。,c.给当前线程的 holdCount 加 1。,ReentrantReadWriteLock 释放读锁是在 ReadLock 中调用了 AQS 下面方法,传入的参数是1:,ReentrantReadWriteLock 内部类 Sync 实现了 releaseShared 方法,具体逻辑分为下面两步:,写锁的实现对应内部类 WriteLock。,ReentrantReadWriteLock 获取写锁其实是在 WriteLock 中调用了 AQS 的下面方法,传入参数 1:,在ReentrantReadWriteLock 内部类 Sync 实现了 tryAcquire 方法,首先获取 state 值和独占锁数量(exclusiveCount),之后分如下两种情况,如下图:,20230306151941a75af722918ce85d108921dbb596a23c5a74b3765,1.state 不等于 0:,如果当前线程不需要阻塞,并且给 state 赋值成功,使用 CAS 方式把 state 值加 1,把独占线程置为当前线程。,ReentrantReadWriteLock 释放写锁其实是在 WriteLock 中调用了 AQS 的下面方法,传入参数 1:,ReentrantReadWriteLock 在 Sync 中实现了 tryRelease(arg) 方法,逻辑如下:,我们先来看一下UML类图:,202303061523096564f5e018219141ac5517f30b3f7f95634b86246,从上面的图中看出,CountDownLatch 的内部类 Sync 实现了获取共享锁和释放共享锁的逻辑。,使用 CountDownLatch 时,构造函数会传入一个 int 类型的参数 count,表示调动 count 次的 countDown 后主线程才可以被唤醒。,上面的 Sync(count) 就是将 AQS 中的 state 赋值为 count。,CountDownLatch 的 await 方法调用了 AQS 中的 acquireSharedInterruptibly(int arg),传入参数 1,不过这个参数并没有用。代码如下:,Sync 中实现了 tryAcquireShared 方法,await 逻辑如下图:,2023030615194262d1288885532cd16f9651a082a8321e6985d5758,上面的自旋过程就是等待 state 的值不断减小,只有 state 值成为 0 的时候,主线程才会跳出自旋执行之后的逻辑。,CountDownLatch 的 countDown 方法调用了 AQS 的 releaseShared(int arg),传入参数 1,不过这个参数并没有用。内部类 Sync 实现了 tryReleaseShared 方法,逻辑如下图:,20230306152235239c0e5877f18cc1ca692594d07df7dc1e4442528,CountDownLatch 的构造函数入参值会赋值给 state 变量,入队操作是主线程入队,每个子线程调用了countDown 后 state 值减 1,当 state 值成为 0 后唤醒主线程。,Semaphore 是一个信号量,用来保护共享资源。如果线程要访问共享资源,首先从 Semaphore 获取锁(信号量),如果信号量的计数器等于 0,则当前线程进入 AQS 队列阻塞等待。否则,线程获取锁成功,信号量减 1。使用完共享资源后,释放锁(信号量加 1)。,Semaphore 跟管程模型不一样的是,允许多个(构造函数的 permits)线程进入管程内部,因此也常用它来做限流。,UML 类图如下:,20230306152309a200564905e5199faac945fe8b4852a0c5efe2182,Semaphore的构造函数会传入一个int类型参数,用来初始化state的值。,获取锁的操作调用了 AQS 中的 acquireSharedInterruptibly 方法,传入参数 1,代码见 CountDownLatch 中 await 小节。Semaphore 在公平锁和非公平锁中分别实现了 tryAcquireShared 方法。,Semaphore 默认使用非公平锁,如果使用公平锁,需要在构造函数指定。获取公平锁逻辑比较简单,如下图:,20230306151947e30cd50875d5ce714c7709d64530688616efd3790,acquire 在非公平的锁唯一的区别就是不会判断 AQS 队列是否有前置节点(hasQueuedPredecessors),而是直接尝试获取锁。,除了 acquire 方法外,还有其他几个获取锁的方法,原理类似,只是调用了 AQS 中的不同方法。,释放锁的操作调用了 AQS 中的 releaseShared(int arg) 方法,传入参数 1,在内部类 Sync 中实现了 tryReleaseShared 方法,逻辑很简单:使用 CAS 的方式将 state 的值加 1,之后唤醒队列中的后继节点。,ThreadPoolExecutor 中也用到了 AQS,看下面的 UML 类图:,2023030615194767bc29267bd43f56335401b06e19b819edbee7287,Worker 主要在 ThreadPoolExecutor 中断线程的时候使用。Worker 自己实现了独占锁,在中断线程时首先进行加锁,中断操作后释放锁。按照官方说法,这里不直接使用 ReentrantLock 的原因是防止调用控制线程池的方法(类似 setCorePoolSize)时能够重新获取到锁,,使用 CAS 的方式把 AQS 中 state 从 0 改为 1,把当前线程置为独占线程。,把独占线程置为空,把 AQS 中 state 改为 0。,Worker 初始化的时候会把 state 置为 -1,这样是不能获取锁成功的。只有调用了 runWorker 方法,才会通过释放锁操作把 state 更为 0。这样保证了只中断运行中的线程,而不会中断等待中的线程。,AQS 基于双向队列实现了入口等待队列,基于 state 变量实现了各种并发锁,上篇文章讲了入口等待队列,而这篇文章主要讲了基于 AQS 的并发锁原理。,本章节主要讲解管程模型中条件变量等待队列。,首先我们看一下官方给出的示例代码:,这个代码定义了两个条件变量,notFull 和 notEmpty,说明如下:,Java AQS 的条件变量等待队列是基于接口 Condition 和 ConditionObject 来实现的,URM 类图如下:,20230306151948f8f5708379de66877db760381b6de74db27ca7480,Condition 接口主要定义了下面3个方法:,条件等待队列跟入口等待队列有两个不同:,20230306152236833a39579c48d9c659e080864275976d619459225,入队方法对应方法 addConditionWaiter,这里有三种情况:,20230306151949c40d26e8032febe30a693172e9f2957a47857b888,2023030615223647cea5b441423d280a14333b83510a5b9d4fed878,20230306152310d3cdd9f979a548ac522568fd0000d9f951582a303,可以看到,这种情况会从队列第一个元素开始检查 waitStatus 不是 -2 的元素,并从队列中移除。,AQS 的并发锁是基于 state 变量实现的,线程进入条件等待队列后,要释放锁,即 state 会变为 0,释放操作会唤醒入口等待队列中的线程。对应方法 fullyRelease,返回值是释放锁减掉的 state 值 savedState。,释放锁后,线程阻塞,自旋等待被唤醒。,唤醒之后,当前线程主要有四个动作:,上面提到了 interruptMode,这个属性有三个值:,AQS 还提供了其他几个 await 方法,如下:,唤醒条件等待队列中的元素,首先判断当前线程是否持有独占锁,如果没有,抛出异常。,唤醒条件队列中的元素,会从第一个元素也就是 firstWaiter 开始,根据 firstWaiter 的 waitStatus 是不是 -2,分两种情况。,条件队列第一个节点进入入口等待队列,等待获取锁,如下图:,2023030615194992cf5d6221f18235b86139af35324e4fffbc51472,这里有两个注意点:,20230306152236e44adb655500cc6606c578e283a569b19deede957,如果重置 waitStatus 状态失败,则 unpark 节点 firstWaiter。,如果 firstWaiter 的 waitStatus 不等于 -2,则查找 firstWaiter 的 nextWaiter,直到找到一个 waitStatus 等于 -2 的节点,然后将这个节点加入入口等待队列队尾,如下图:,20230306152311743be5940beb643cdf186808a37356323f91bc907,上面的两种情况无论哪种,进入入口等待队列之前都要用 CAS 的方式把 waitStatus 改为 0。,理解了 signal 的逻辑,signalAll 的逻辑就非常容易理解了。首先判断当前线程是否持有独占锁,如果没有,抛出异常。,将条件等待队列中的所有节点依次加入入口等待队列。如下图:,2023030615223809d988f417b51496e9c152cdf845ecce57da9b951,Java 并发包下有很多类使用到了 AQS 中的 Condition,如下图:,2023030615195176ff2208536e850ae5c235f8fbf45168a57355185,这里我们以 CyclicBarrier 为例来讲解。CyclicBarrier 是让一组线程相互等待共同达到一个屏障点。从 Cyclic 可以看出 Barrier 可以循环利用,也就是当线程释放之后可以继续使用。,看下面这段示例代码:,执行结果:,CyclicBarrier 初始化的时候,会指定线程的数量 count,每个线程执行完逻辑后,调用 CyclicBarrier 的 await 方法,这个方法首先将 count 减 1,然后调用 Condition的 await,让当前线程进入条件等待队列。当最后一个线程将 count 减 1 后,count 数量等于 0,这时就会调用 Condition 的 signalAll 方法唤醒所有线程。,Java 的管程模型使用了 MESA 模型,基于 AQS 实现的 MESA 模型中,使用双向队列实现了入口等待队列,使用变量 state 实现了并发锁,使用 Condition 实现了条件等待队列。,在 AQS 的实现中,使用同步队列这个术语来表示双向队列,本文中使用入口等待队列来描述是为了更好的配合管程模型来讲解。,AQS 的 Condition 中,使用 await 方法将当前线程放入条件变量等待队列阻塞等待,使用 notify 来唤醒条件等待队列中的线程,被唤醒之后,线程并不能立刻执行,而是进入入口等待队列等待获取锁。

© 版权声明

相关文章