AtomicStampedReference解决CAS的ABA问题

AtomicStampedReference

解决CAS的ABA问题

什么是ABA

ABA问题:指在CAS操作期间,某个线程将变量值由A修改为B,随后又改回了A;其他线程执行CAS时发现值仍然是A,误以为变量没有被修改过,于是CAS成功并完成值交换。实际上该值已经被改变过,这与CAS"值未被改动才更新"的核心思想是不符合的

ABA解决方案

每次变量更新的时候,把变量的版本号进行更新,如果某变量被某个线程修改过,那么版本号一定会递增更新,从而解决ABA问题

AtomicReference 演示ABA问题

  1. package com.keytech.task;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. import java.util.concurrent.atomic.AtomicInteger;
  5. import java.util.concurrent.atomic.AtomicReference;
  /**
   * Demonstrates the ABA problem with a plain {@link AtomicReference}.
   *
   * <p>The first task changes the value 10 -> 12 -> 10. The second task then succeeds with a
   * compare-and-set from 10 to 100, because a plain reference comparison cannot tell that the
   * value was modified in between.
   */
  public class AtomicIntegerTest {

      // compareAndSet compares references with ==. The literals used below (10, 12, 100) all box
      // to cached Integer instances, so identity comparison behaves like value comparison here.
      private static AtomicReference<Integer> count = new AtomicReference<>(10);

      /**
       * Runs the two competing tasks on a cached thread pool and prints every successful CAS.
       *
       * @param args unused
       */
      public static void main(String[] args) {
          ExecutorService executorService = Executors.newCachedThreadPool();
          // Opened once the A -> B -> A sequence is complete. Without this gate the second task
          // could run before or in between the first task's two updates, and the demo would not
          // reliably show the ABA effect.
          java.util.concurrent.CountDownLatch abaFinished =
                  new java.util.concurrent.CountDownLatch(1);

          executorService.execute(() -> {
              try {
                  // A -> B
                  if (count.compareAndSet(10, 12)) {
                      System.out.println(Thread.currentThread().getName() + "修改成功count=" + count.get());
                  }
                  // B -> A
                  if (count.compareAndSet(12, 10)) {
                      System.out.println(Thread.currentThread().getName() + "修改成功count=" + count.get());
                  }
              } finally {
                  abaFinished.countDown();
              }
          });

          executorService.execute(() -> {
              try {
                  abaFinished.await();
              } catch (InterruptedException e) {
                  // Preserve the interrupt status and give up on the update.
                  Thread.currentThread().interrupt();
                  return;
              }
              // Still sees 10 and therefore succeeds, although the value was changed twice.
              if (count.compareAndSet(10, 100)) {
                  System.out.println(Thread.currentThread().getName() + "修改成功count=" + count.get());
              }
          });

          executorService.shutdown();
      }
  }
  29. //pool-1-thread-1修改成功count=12
  30. //pool-1-thread-1修改成功count=10
  31. //pool-1-thread-2修改成功count=100

pool-1-thread-1将count由10修改成12,又将count从12改回10。pool-1-thread-2将count从10成功改成100。出现了ABA的问题。

AtomicStampedReference解决ABA的问题

以计数器的实现为例,计数器通常用来统计在线人数,在线+1,离线-1,是ABA的典型场景。

  1. package com.keytech.task;
  2. import java.util.concurrent.atomic.AtomicStampedReference;
  /**
   * Counter backed by an {@link AtomicStampedReference}: every successful update also bumps the
   * stamp by one, so a compare-and-set only succeeds when neither value nor stamp has changed.
   */
  public class CounterTest {

      private AtomicStampedReference<Integer> count = new AtomicStampedReference<Integer>(0, 0);

      /**
       * @return the current counter value
       */
      public int getCount() {
          return count.getReference();
      }

      /**
       * Adds one to the counter, retrying until the compare-and-set succeeds.
       *
       * @return the value after the increment
       */
      public int increment() {
          return shiftBy(1);
      }

      /**
       * Subtracts one from the counter, retrying until the compare-and-set succeeds.
       *
       * @return the value after the decrement
       */
      public int decrement() {
          return shiftBy(-1);
      }

      /** Applies {@code delta} with a CAS retry loop and returns the value that was stored. */
      private int shiftBy(int delta) {
          final int[] stampHolder = new int[1];
          for (;;) {
              // Read value and stamp together; the stamp lands in stampHolder[0].
              final Integer current = count.get(stampHolder);
              final int seenStamp = stampHolder[0];
              final int updated = current + delta;
              if (count.compareAndSet(current, updated, seenStamp, seenStamp + 1)) {
                  return updated;
              }
          }
      }
  }

调用计数器

  1. package com.keytech.task;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. import java.util.concurrent.Semaphore;
  5. import java.util.concurrent.atomic.AtomicInteger;
  6. import java.util.concurrent.atomic.AtomicReference;
  7. public class AtomicIntegerTest {
  8. public static void main(String[] args) {
  9. ExecutorService executorService = Executors.newCachedThreadPool();
  10. Semaphore semaphore=new Semaphore(200);
  11. CounterTest counterTest=new CounterTest();
  12. for (int i = 0; i < 5000; i++) {
  13. executorService.execute(()->{
  14. try{
  15. semaphore.acquire();
  16. counterTest.increment();
  17. semaphore.release();
  18. }catch (Exception e){
  19. e.printStackTrace();
  20. }
  21. });
  22. executorService.execute(()->{
  23. try{
  24. semaphore.acquire();
  25. counterTest.decrement();
  26. semaphore.release();
  27. }catch (Exception e){
  28. e.printStackTrace();
  29. }
  30. });
  31. }
  32. executorService.shutdown();
  33. System.out.println(counterTest.getCount());
  34. }
  35. }
  36. //输出0

AtomicBoolean保证高并发下只执行一次

  1. package com.keytech.task;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. import java.util.concurrent.Semaphore;
  5. import java.util.concurrent.atomic.AtomicBoolean;
  /**
   * Demonstrates using {@link AtomicBoolean#compareAndSet} so that a piece of work is performed
   * once even though many concurrent tasks try to trigger it.
   */
  public class AtomicBooleanTest {

      /** Flipped from false to true by the single task that wins the compare-and-set. */
      private static AtomicBoolean isHappen = new AtomicBoolean(false);

      /** Number of tasks submitted by {@link #main}. */
      public static int clientTotal = 5000;

      /** Number of semaphore permits, i.e. how many tasks may call update() at the same time. */
      public static int threadTotal = 200;

      /**
       * Submits {@code clientTotal} tasks that all attempt the one-time update.
       *
       * @param args unused
       */
      public static void main(String[] args) {
          ExecutorService executorService = Executors.newCachedThreadPool();
          Semaphore semaphore = new Semaphore(threadTotal);
          for (int i = 0; i < clientTotal; i++) {
              executorService.execute(() -> {
                  try {
                      semaphore.acquire();
                  } catch (InterruptedException e) {
                      // No permit was obtained; keep the interrupt status and skip the update.
                      Thread.currentThread().interrupt();
                      return;
                  }
                  try {
                      update();
                  } finally {
                      // Always hand the permit back, even if update() fails.
                      semaphore.release();
                  }
              });
          }
          executorService.shutdown();
      }

      /** Prints the message only for the caller that switches the flag from false to true. */
      private static void update() {
          if (isHappen.compareAndSet(false, true)) {
              System.out.println("只执行一次");
          }
      }
  }
  32. //只执行一次

返回笔记列表
入门小站