AQS同步队列结构分析

同步队列结构

AQS使用的同步队列是基于一种CLH锁算法来实现。

CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋.

图片alt

同步器中包含了两个节点类型的引用,一个指向头节点(head),一个指向尾节点(tail),没有获取到锁的线程,加入到队列的过程必须保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法CompareAndSetTail(Node expect,Node update),它需要传递当前线程认为的尾节点和当前节点,只有设置成功后,当前节点才能正式与之前的尾节点建立关联。

图片alt

同步器队列遵循FIFO,首节点是获取锁成功的节点,首节点的线程在释放锁时,会唤醒后续节点,而后继节点在成功获取到锁后,会把自己设置成首节点,设置首节点是由获取锁成功的线程来完成的,由于只有一个线程能成功获取到锁,所以设置首节点不需要CAS

图片alt

AQS实现一个线程安全的计数器

自定义互斥锁

  1. package com.rumenz.task.aqs;
  2. import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  3. public class MyLock {
  4. private static final Sync STATE_HOLDER = new Sync();
  5. /**
  6. * 通过Sync内部类来持有同步状态, 当状态为1表示锁被持有,0表示锁处于空闲状态
  7. */
  8. private static class Sync extends AbstractQueuedSynchronizer {
  9. /**
  10. * 是否被独占, 有两种表示方式
  11. * 1. 可以根据状态,state=1表示锁被占用,0表示空闲
  12. * 2. 可以根据当前独占锁的线程来判断,即getExclusiveOwnerThread()!=null 表示被独占
  13. */
  14. @Override
  15. protected boolean isHeldExclusively() {
  16. return getExclusiveOwnerThread() != null;
  17. }
  18. /**
  19. * 尝试获取锁,将状态从0修改为1,操作成功则将当前线程设置为当前独占锁的线程
  20. */
  21. @Override
  22. protected boolean tryAcquire(int arg) {
  23. if (compareAndSetState(0, 1)) {
  24. setExclusiveOwnerThread(Thread.currentThread());
  25. return true;
  26. }
  27. return false;
  28. }
  29. /**
  30. * 释放锁,将状态修改为0
  31. */
  32. @Override
  33. protected boolean tryRelease(int arg) {
  34. if (getState() == 0) {
  35. throw new UnsupportedOperationException();
  36. }
  37. setExclusiveOwnerThread(null);
  38. setState(0);
  39. return true;
  40. }
  41. }
  42. /**
  43. * 下面的实现Lock接口需要重写的方法,基本是就是调用内部内Sync的方法
  44. */
  45. public void lock() {
  46. STATE_HOLDER.acquire(1);
  47. }
  48. public void unlock() {
  49. STATE_HOLDER.release(1);
  50. }
  51. }

测试案例

  1. package com.rumenz.task.aqs;
  2. import org.omg.Messaging.SYNC_WITH_TRANSPORT;
  3. import java.util.concurrent.CountDownLatch;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.Semaphore;
  7. public class LockTest {
  8. private final static Integer clientTotal=100000;
  9. private final static Integer threadTotal=200;
  10. private static Count count=new Count();
  11. private static Count unSafe=new Count();
  12. public static void main(String[] args) throws Exception {
  13. ExecutorService executorService = Executors.newCachedThreadPool();
  14. final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
  15. final Semaphore semaphore=new Semaphore(threadTotal);
  16. for (int i = 0; i < clientTotal; i++) {
  17. executorService.execute(()->{
  18. try{
  19. semaphore.acquire();
  20. count.getIncrement();
  21. unSafe.getUnSafeIncrement();
  22. semaphore.release();
  23. }catch (Exception e){
  24. e.printStackTrace();
  25. }
  26. countDownLatch.countDown();
  27. });
  28. }
  29. countDownLatch.await();
  30. System.out.println("safe:"+count.getCount());
  31. System.out.println("unSafe:"+unSafe.getCount());
  32. executorService.shutdown();
  33. }
  34. }
  35. class Count{
  36. private MyLock myLock;
  37. private volatile int count;
  38. Count() {
  39. this.myLock=new MyLock();
  40. }
  41. int getCount(){
  42. return count;
  43. }
  44. int getIncrement(){
  45. myLock.lock();
  46. count++;
  47. myLock.unlock();
  48. return count;
  49. }
  50. int getUnSafeIncrement(){
  51. count++;
  52. return count;
  53. }
  54. }

输出结果

  1. safe:100000
  2. unSafe:99995

返回笔记列表
入门小站