Java同步容器

ArrayList,HashSet,HashMap都是线程非安全的,在多线程环境下,会导致线程安全问题,所以在使用的时候需要进行同步,这无疑增加了程序开发的难度。所以JAVA提供了同步容器

同步容器

  • ArrayList ===> Vector,Stack
  • HashMap ===> HashTable(key,value都不能为空)
  • Collections.synchronizedXXX(List,Set,Map)

Vector实现List接口,底层和ArrayList类似,但是Vector中的方法都是使用synchronized修饰,即进行了同步的措施。 但是,Vector并不是线程安全的。

Stack也是一个同步容器,也是使用synchronized进行同步,继承与Vector,是数据结构中的,先进后出。

HashTableHashMap很相似,但HashTable进行了同步处理。

Collections工具类提供了大量的方法,比如对集合的排序、查找等常用的操作。同时也通过了相关了方法创建同步容器类

Vector

  1. package com.rumenz.task;
  2. import java.util.List;
  3. import java.util.Vector;
  4. import java.util.concurrent.CountDownLatch;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import java.util.concurrent.Semaphore;
  8. //线程安全
  9. public class VectorExample1 {
  10. public static Integer clientTotal=5000;
  11. public static Integer thradTotal=200;
  12. private static List<Integer> list=new Vector<>();
  13. public static void main(String[] args) throws Exception{
  14. ExecutorService executorService = Executors.newCachedThreadPool();
  15. final Semaphore semaphore=new Semaphore(thradTotal);
  16. final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
  17. for (int i = 0; i < clientTotal; i++) {
  18. final Integer j=i;
  19. executorService.execute(()->{
  20. try {
  21. semaphore.acquire();
  22. update(j);
  23. semaphore.release();
  24. }catch (Exception e){
  25. e.printStackTrace();
  26. }
  27. countDownLatch.countDown();
  28. });
  29. }
  30. countDownLatch.await();
  31. executorService.shutdown();
  32. System.out.println("size:"+list.size());
  33. }
  34. private static void update(Integer j) {
  35. list.add(j);
  36. }
  37. }

同步容器不一定就线程安全

  1. package com.rumenz.task;
  2. import scala.collection.convert.impl.VectorStepperBase;
  3. import java.util.List;
  4. import java.util.Vector;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import java.util.concurrent.Semaphore;
  8. //线程不安全
  9. public class VectorExample2 {
  10. public static Integer clientTotal=5000;
  11. public static Integer threadTotal=200;
  12. private static List<Integer> list=new Vector();
  13. public static void main(String[] args) {
  14. ExecutorService executorService = Executors.newCachedThreadPool();
  15. final Semaphore semaphore=new Semaphore(threadTotal);
  16. for (int i = 0; i < threadTotal; i++) {
  17. list.add(i);
  18. }
  19. for (int i = 0; i < clientTotal; i++) {
  20. try{
  21. semaphore.acquire();
  22. executorService.execute(()->{
  23. for (int j = 0; j < list.size(); j++) {
  24. list.remove(j);
  25. }
  26. });
  27. executorService.execute(()->{
  28. for (int j = 0; j < list.size(); j++) {
  29. list.get(j);
  30. }
  31. });
  32. semaphore.release();
  33. }catch (Exception e){
  34. e.printStackTrace();
  35. }
  36. }
  37. executorService.shutdown();
  38. }
  39. }

运行报错

  1. Exception in thread "pool-1-thread-2" java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 36
  2. at java.util.Vector.get(Vector.java:751)
  3. at com.rumenz.task.VectorExample2.lambda$main$1(VectorExample2.java:38)
  4. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  5. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  6. at java.lang.Thread.run(Thread.java:748)

原因分析

Vector是线程同步容器,size(),get(),remove()都是被synchronized修饰的,为什么会有线程安全问题呢?

get()抛出的异常肯定是remove()引起的,Vector能同时保证同一时刻只有一个线程进入,但是:

  1. //线程1
  2. executorService.execute(()->{
  3. for (int j = 0; j < list.size(); j++) {
  4. list.remove(j);
  5. }
  6. });
  7. //线程2
  8. executorService.execute(()->{
  9. for (int j = 0; j < list.size(); j++) {
  10. list.get(j);
  11. }
  12. });
  • 线程1和线程2都执行完list.size(),都等于200,并且j=100
  • 线程1执行list.remove(100)操作,
  • 线程2执行list.get(100)就会抛出数组越界的异常。

同步容器虽然是线程安全的,但是不代表在任何环境下都是线程安全的。

HashTable

线程安全,key,value都不能为null。在修改数据时锁住整个HashTable,效率低下。初始size=11

  1. package com.rumenz.task;
  2. import java.util.Hashtable;
  3. import java.util.Map;
  4. import java.util.concurrent.CountDownLatch;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import java.util.concurrent.Semaphore;
  8. //线程安全
  9. public class HashTableExample1 {
  10. public static Integer clientTotal=5000;
  11. public static Integer threadTotal=200;
  12. private static Map<Integer,Integer> map=new Hashtable<>();
  13. public static void main(String[] args) throws Exception {
  14. ExecutorService executorService = Executors.newCachedThreadPool();
  15. final Semaphore semaphore=new Semaphore(threadTotal);
  16. final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
  17. for (int i = 0; i < clientTotal; i++) {
  18. final Integer j=i;
  19. try{
  20. semaphore.acquire();
  21. update(j);
  22. semaphore.release();
  23. }catch (Exception e){
  24. e.printStackTrace();
  25. }
  26. countDownLatch.countDown();
  27. }
  28. countDownLatch.await();
  29. executorService.shutdown();
  30. System.out.println("size:"+map.size());
  31. }
  32. private static void update(Integer j) {
  33. map.put(j, j);
  34. }
  35. }
  36. //size:5000

Collections.synchronizedList线程安全

  1. package com.rumenz.task.Collections;
  2. import com.google.common.collect.Lists;
  3. import java.util.Collections;
  4. import java.util.List;
  5. import java.util.concurrent.CountDownLatch;
  6. import java.util.concurrent.ExecutorService;
  7. import java.util.concurrent.Executors;
  8. import java.util.concurrent.Semaphore;
  9. //线程安全
  10. public class synchronizedExample {
  11. public static Integer clientTotal=5000;
  12. public static Integer threadTotal=200;
  13. private static List<Integer> list=Collections.synchronizedList(Lists.newArrayList());
  14. public static void main(String[] args) throws Exception{
  15. ExecutorService executorService = Executors.newCachedThreadPool();
  16. final Semaphore semaphore=new Semaphore(threadTotal);
  17. final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
  18. for (int i = 0; i < clientTotal; i++) {
  19. final Integer j=i;
  20. try{
  21. semaphore.acquire();
  22. update(j);
  23. semaphore.release();
  24. }catch (Exception e){
  25. e.printStackTrace();
  26. }
  27. countDownLatch.countDown();
  28. }
  29. countDownLatch.await();
  30. executorService.shutdown();
  31. System.out.println("size:"+list.size());
  32. }
  33. private static void update(Integer j) {
  34. list.add(j);
  35. }
  36. }
  37. //size:5000

Collections.synchronizedSet线程安全

  1. package com.rumenz.task.Collections;
  2. import com.google.common.collect.Lists;
  3. import org.assertj.core.util.Sets;
  4. import java.util.Collections;
  5. import java.util.List;
  6. import java.util.Set;
  7. import java.util.concurrent.CountDownLatch;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.Semaphore;
  11. //线程安全
  12. public class synchronizedSetExample {
  13. public static Integer clientTotal=5000;
  14. public static Integer threadTotal=200;
  15. private static Set<Integer> set=Collections.synchronizedSet(Sets.newHashSet());
  16. public static void main(String[] args) throws Exception{
  17. ExecutorService executorService = Executors.newCachedThreadPool();
  18. final Semaphore semaphore=new Semaphore(threadTotal);
  19. final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
  20. for (int i = 0; i < clientTotal; i++) {
  21. final Integer j=i;
  22. try{
  23. semaphore.acquire();
  24. update(j);
  25. semaphore.release();
  26. }catch (Exception e){
  27. e.printStackTrace();
  28. }
  29. countDownLatch.countDown();
  30. }
  31. countDownLatch.await();
  32. executorService.shutdown();
  33. System.out.println("size:"+set.size());
  34. }
  35. private static void update(Integer j) {
  36. set.add(j);
  37. }
  38. }
  39. //size:5000

Collections.synchronizedMap线程安全

  1. package com.rumenz.task.Collections;
  2. import org.assertj.core.util.Sets;
  3. import java.util.Collections;
  4. import java.util.HashMap;
  5. import java.util.Map;
  6. import java.util.Set;
  7. import java.util.concurrent.CountDownLatch;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.Semaphore;
  11. public class synchronizedMapExample {
  12. public static Integer clientTotal=5000;
  13. public static Integer threadTotal=200;
  14. private static Map<Integer,Integer> map=Collections.synchronizedMap(new HashMap<>());
  15. public static void main(String[] args) throws Exception{
  16. ExecutorService executorService = Executors.newCachedThreadPool();
  17. final Semaphore semaphore=new Semaphore(threadTotal);
  18. final CountDownLatch countDownLatch=new CountDownLatch(clientTotal);
  19. for (int i = 0; i < clientTotal; i++) {
  20. final Integer j=i;
  21. try{
  22. semaphore.acquire();
  23. update(j);
  24. semaphore.release();
  25. }catch (Exception e){
  26. e.printStackTrace();
  27. }
  28. countDownLatch.countDown();
  29. }
  30. countDownLatch.await();
  31. executorService.shutdown();
  32. System.out.println("size:"+map.size());
  33. }
  34. private static void update(Integer j) {
  35. map.put(j, j);
  36. }
  37. }
  38. //size:5000

Collections.synchronizedXXX在迭代的时候,需要开发者自己加上线程锁控制代码,因为在整个迭代过程中循环外面不加同步代码,在一次次迭代之间,其他线程对于这个容器的add或者remove会影响整个迭代的预期效果,这个时候需要在循环外面加上synchronized(XXX)

集合的删除

  • 如果在使用foreach或iterator进集合的遍历,
  • 尽量不要在操作的过程中进行remove等相关的更新操作。
  • 如果非要进行操作,则可以在遍历的过程中记录需要操作元素的序号,
  • 待遍历结束后方可进行操作,让这两个动作分开进行
  1. package com.rumenz.task;
  2. import com.google.common.collect.Lists;
  3. import java.util.Collections;
  4. import java.util.Iterator;
  5. import java.util.List;
  6. public class CollectionsExample {
  7. private static List<Integer> list=Collections.synchronizedList(Lists.newArrayList());
  8. public static void main(String[] args) {
  9. list.add(1);
  10. list.add(2);
  11. list.add(3);
  12. list.add(4);
  13. //del1();
  14. //del2();
  15. del3();
  16. }
  17. private static void del3() {
  18. for(Integer i:list){
  19. if(i==4){
  20. list.remove(i);
  21. }
  22. }
  23. }
  24. //Exception in thread "main" java.util.ConcurrentModificationException
  25. private static void del2() {
  26. Iterator<Integer> iterator = list.iterator();
  27. while (iterator.hasNext()){
  28. Integer i = iterator.next();
  29. if(i==4){
  30. list.remove(i);
  31. }
  32. }
  33. }
  34. //Exception in thread "main" java.util.ConcurrentModificationException
  35. private static void del1() {
  36. for (int i = 0; i < list.size(); i++) {
  37. if(list.get(i)==4){
  38. list.remove(i);
  39. }
  40. }
  41. }
  42. }

在单线程会出现以上错误,在多线程情况下,并且集合时共享的,出现异常的概率会更大,需要特别的注意。解决方案是希望在foreach或iterator时,对要操作的元素进行标记,待循环结束之后,在执行相关操作。

总结

同步容器采用synchronized进行同步,因此执行的性能会受到影响,并且同步容器也并不一定会做到线程安全。

返回笔记列表
入门小站