avatarSHAXUTANG

Java并发编程JUC

JDK1.5引入java.util.current针对并发问题的解决方案

售票案例

import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; class Ticket { private int ticket = 30; //创建锁 Lock lock = new ReentrantLock(); public void sale() { //加锁 lock.lock(); try { if (ticket > 0) { System.out.println(Thread.currentThread().getName() + "卖第" + (ticket--) + "张票,还剩" + ticket + "张票"); } } finally { //释放锁 lock.unlock(); } } } public class SaleTicket { public static void main(String[] args) { Ticket ticket = new Ticket(); //开启三个线程 new Thread(() -> { for (int i = 0; i < 30; i++) { ticket.sale(); } }, "窗口一:").start(); new Thread(() -> { for (int i = 0; i < 30; i++) { ticket.sale(); } }, "窗口二:").start(); new Thread(() -> { for (int i = 0; i < 30; i++) { ticket.sale(); } }, "窗口三:").start(); } }

生产者和消费者

声明一个资源提供者,提供对资源操作的自增自减操作

class AirConditional { private int num = 0; public synchronized void increment() throws InterruptedException { //当num!=0时就释放锁,回到就绪状态,不做自增操作 if (num != 0) { this.wait(); } num++; System.out.println("increment:" + num); //自增之后唤醒其他线程 this.notifyAll(); } public synchronized void decrement() throws InterruptedException { //当num==0时就释放锁,回到就绪状态,不做自减操作 if (num == 0) { this.wait(); } num--; System.out.println("decrement:" + num); //自减之后唤醒其他线程 this.notifyAll(); } }

创建两个线程,一个自增一个自减

AirConditional airConditional = new AirConditional(); new Thread(() -> { //循环自增10次 for (int i = 0; i < 10; i++) { try { airConditional.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(() -> { //循环自减10次 for (int i = 0; i < 10; i++) { try { airConditional.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); /** 结果:自增自减交替进行 1 0 1 0 1 0 **/

虚假唤醒

AirConditional airConditional = new AirConditional(); for (int j = 0; j < 2; j++) { new Thread(() -> { for (int i = 0; i < 10; i++) { try { airConditional.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { airConditional.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } /** 当多个线程操作自增自减,此时if判断只能进行一次,会造成虚假唤醒的结果 结果:自增自减顺序不一致 0 1 5 10 0 **/

解决方案

//修改资源提供类 class AirConditional { private int num = 0; public synchronized void increment() throws InterruptedException { //利用while循环,不断的判断,判断通过即可进行下一步,不会出现一次判断虚假唤醒 while (num != 0) { this.wait(); } num++; System.out.println("increment:" + num); this.notifyAll(); } public synchronized void decrement() throws InterruptedException { while (num == 0) { this.wait(); } num--; System.out.println("decrement:" + num); this.notifyAll(); } }

JUC版本的生产者和消费者

  • 线程等待和唤醒被Condition中的Api替换 await signalAll
  • 使用Lock来进行手动加锁
class AirConditional { private int num = 0; private final Lock lock = new ReentrantLock(); priva public void increment() throws InterruptedException { //加锁 lock.lock(); try { //while循环判断,防止虚假唤醒 while (num != 0) { //JUC 中的Condition 线程等待方法 condition.await(); } num++; System.out.println("increment:" + num); //JUC 中的Condition 线程唤醒方法 condition.signalAll(); } finally { lock.unlock(); } } public void decrement() throws InterruptedException { lock.lock(); try { while (num == 0) { condition.await(); } num--; System.out.println("decrement:" + num); condition.signalAll(); } finally { lock.unlock(); } } }

线程不安全的集合

a. ArrayList:速度快,但线程不安全,

​ 当多个线程同时操作一个ArrayList就会产生并发异常:java.util.ConcurrentModificationException

线程不安全案例

List<String> list = new ArrayList(); for (int i = 0; i < 30; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0, 8)); System.out.println(list); }).start(); }

创建线程安全的List

  • 使用Collections中的工具方法
  • 使用JUC提供的创建线程安全的List
//Collections创建的线程安全的List List<String> list = Collections.synchronizedList(new ArrayList<>()); //JUC提供的 List<String> list = new CopyOnWriteArrayList<>(); for (int i = 0; i < 30; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0, 8)); System.out.println(list); }).start(); }

b. Set:可以存入无序且不重复的值,但同样线程不安全,会产生java.util.ConcurrentModificationException

​ 底层由HashMap实现

HashSet底层实现

//声明一个全局静态常量 private static final Object PRESENT = new Object(); //构造方法创建一个HashMap public HashSet() { map = new HashMap<>(); } //利用HashMap的键不会重复的特性,将值作为键存入,value就使用全局静态常量 //因此HashSet值不会重复 public boolean add(E e) { return map.put(e, PRESENT)==null; }

创建线程安全的Set

//Collections API Set<Object> set = Collections.synchronizedSet(new HashSet<>()); // JUC API Set<String> set = new CopyOnWriteArraySet<>();

c. HashMap:以键值对的形式存入数据,但线程不安全

创建线程安全的HashMap

// Collections API Map<String, Object> map = Collections.synchronizedMap(new HashMap<>()); // JUC API Map<String, Object> map = new ConcurrentHashMap<>();

CountDownLatch

计数器,为了让多个线程执行完后,主线程再继续进行

//定义计数器开始数 CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 0; i < 6; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + "离开了"); //执行完一个线程,计数器减一个 countDownLatch.countDown(); }).start(); } //阻塞主线程 countDownLatch.await(); System.out.println("关门了");

CyclicBarrier

当完成的线程数达到指定数量则执行指定逻辑

//当执行完成的数量达到指定数量则进行 CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> System.out.println("完成")); for (int i = 0; i < 7; i++) { int finalI = i; new Thread(() -> { System.out.println("第" + finalI + "步"); try { //执行完后阻塞, cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); }

信号量Semaphore

限流,指定可以操作资源的数量,超出则不可操作,等腾出位置抢到了之后才可以继续操作

//定义三个位置操作资源,分别由线程来抢占位置 Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 3; i++) { try { //抢占到了位置,资源位置数减一 semaphore.acquire(); System.out.println("抢到了资源"); } finally { //执行完毕后,资源位置数+1 semaphore.release(); System.out.println("释放了资源"); } }

ReadWriteLock

所谓读写锁,是对访问资源共享锁和排斥锁,一般的重入性语义为如果对资源加了写锁,其他线程无法再获得写锁与读锁,但是持有写锁的线程,可以对资源加读锁(锁降级);如果一个线程对资源加了读锁,其他线程可以继续加读锁。

class MyCache { private final Map<String, String> map = new HashMap<>(); private final ReadWriteLock lock = new ReentrantReadWriteLock(); public void write(String key) { lock.writeLock().lock(); try { map.put(key, ""); System.out.println("写入成功"); } finally { lock.writeLock().unlock(); } } public void read(String key) { lock.readLock().lock(); try { System.out.println(map.get(key)); System.out.println("读取成功"); } finally { lock.readLock().unlock(); } } } public class ReadWriteLockDemo { public static void main(String[] args) { MyCache myCache = new MyCache(); for (int i = 0; i < 5; i++) { int finalI = i; new Thread(() -> { myCache.write(String.valueOf(finalI)); }).start(); } for (int i = 0; i < 5; i++) { int finalI = i; new Thread(() -> { myCache.read(String.valueOf(finalI)); }).start(); } } }

BlockingQueue阻塞队列

BlockingQueue<String> bq = new ArrayBlockingQueue<>(3); //当超出指定长度,会报错 bq.add("a"); //当队列中没有元素会报错 bq.remove(); //会返回boolean值,成功为true,超出长度为false bq.offer("a"); //当元素无法添加,超过指定元素后停止阻塞 bq.offer(e, longtime, TimeUnit.SECONDS); //返回队列第一个值,没有则为null bq.poll(); //当元素无法移除,超过指定元素后停止阻塞 bq.poll(e, longtime, TimeUnit.SECONDS); //超出长度会阻塞 bq.put("e"); //超出长度会阻塞 bq.take();

线程池

线程池的优势

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
//创建固定数量的线程,总共就只有这几个线程处理 ExecutorService executorService = Executors.newFixedThreadPool(3); // 只创建一个线程,所有任务都这一个线程执行 ExecutorService executorService = Executors.newSingleThreadExecutor(); //一池多线程,有空闲则由空闲线程处理,无空闲则创建新的线程处理 ExecutorService executorService = Executors.newCachedThreadPool();

线程池常用的七个参数,重要的三个:

ThreadPoolExecutor 3 个最重要的参数)(先判断核心线程数-->然后判断队列,队列满了-->判断最大容量)

  • corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。(即使这些线程处于空闲状态,这些线程也不会被销毁)
  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数如果达到的话,新任务就会被存放在队列中

ThreadPoolExecutor其他常见参数:

  1. keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁; unit : keepAliveTime 参数的时间单位。
  2. threadFactory :executor 创建新线程的时候会用到。
  3. handler :饱和策略。关于饱和策略下面单独介绍一下。

线程池的饱和策略

  • AbortPolicy(默认):直接抛出RejectedExcutionException异常阻止系统正常运行
  • CallerRunsPolicy:调用者运行一种调节机制,该策略不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中,尝试再次提交当前任务
  • DiscardPolicy:该策略默默地抛弃无法处理的任务,不予处理也不抛出异常,如果允许任务丢失,这是最好的一种策略