1. CountDownLatch
概述: 用来控制一个线程等待多个线程
原理:
维护了一个计数器 cnt,每次调用 countDown () 方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await () 方法而在等待的线程就会被唤醒
![]()
API:
- await(): 调用 await () 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行
-  await(long timeout, TimeUnit unit): 和 await () 类似,只不过等待一定的时间后 count 值还没变为 0 的话就会继续执行
-  countDown(): 将 count 值减 1
实例:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 
 | public class CountdownLatchExample {
 public static void main(String[] args) throws InterruptedException {
 final int totalThread = 10;
 CountDownLatch countDownLatch = new CountDownLatch(totalThread);
 ExecutorService executorService = Executors.newCachedThreadPool();
 for (int i = 0; i < totalThread; i++) {
 executorService.execute(() -> {
 System.out.print("run..");
 countDownLatch.countDown();
 });
 }
 countDownLatch.await();
 System.out.println("end");
 executorService.shutdown();
 }
 }
 
 
 | 
场景:
- 启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行 
- 实现多个线程开始执行任务的最大并行性
 CountDownLatch (1),多个线程挂起,当主线程调用 countDown () 时,多个线程同时被唤醒
不足:
CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。
2. CyclicBarrier
概述: 用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行
原理:
线程执行 await () 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await () 方法而在等待的线程才能继续执行
![]()
方法:
- await()
- await(long timeout, TimeUnit unit): 设置超时时间,超过该时间仍然还有线程还没到达屏障则忽略这些线程,将等待的线程全部释放
区别: CyclicBarrier 和 CountdownLatch 的区别是,CyclicBarrier 的计数器通过调用 reset () 方法可以 循环使用,所以它才叫做循环屏障
构造器: CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会选择一个线程执行一次
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 
 | public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) throw new IllegalArgumentException();
 this.parties = parties;
 this.count = parties;
 this.barrierCommand = barrierAction;
 }
 
 public CyclicBarrier(int parties) {
 this(parties, null);
 }
 
 | 
实例:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 
 | public class CyclicBarrierExample {
 public static void main(String[] args) {
 final int totalThread = 10;
 CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
 ExecutorService executorService = Executors.newCachedThreadPool();
 for (int i = 0; i < totalThread; i++) {
 executorService.execute(() -> {
 System.out.print("before..");
 try {
 cyclicBarrier.await();
 } catch (InterruptedException | BrokenBarrierException e) {
 e.printStackTrace();
 }
 System.out.print("after..");
 });
 }
 executorService.shutdown();
 }
 }
 
 
 | 
重用:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 
 | public class Test {public static void main(String[] args) {
 int N = 4;
 CyclicBarrier barrier  = new CyclicBarrier(N);
 
 for (int i = 0; i < N; i++) {
 new Writer(barrier).start();
 }
 
 try {
 Thread.sleep(25000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 
 System.out.println("CyclicBarrier重用");
 
 for (int i = 0; i < N; i++) {
 new Writer(barrier).start();
 }
 }
 static class Writer extends Thread {
 private CyclicBarrier cyclicBarrier;
 public Writer(CyclicBarrier cyclicBarrier) {
 this.cyclicBarrier = cyclicBarrier;
 }
 
 @Override
 public void run() {
 System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
 try {
 
 Thread.sleep(5000);
 System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
 
 cyclicBarrier.await();
 } catch (InterruptedException e) {
 e.printStackTrace();
 } catch(BrokenBarrierException e) {
 e.printStackTrace();
 }
 System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务...");
 }
 }
 }
 
 | 
3. Phaser
main:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 
 | public static void main(String[] args) {phaser.bulkRegister(7);
 
 for (int i = 0; i < 5; i++) {
 new Thread(new Person("p" + i)).start();
 }
 
 new Thread(new Person("新郎")).start();
 new Thread(new Person("新娘")).start();
 
 }
 
 | 
Phaser:指定什么阶段做什么事
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 
 | public ass MarriagePhaser extends Phaser {
 @Override
 protected boolean onAdvance(int phase, int registeredParties) {
 
 switch (phase) {
 case 0:
 
 return false;
 case 1:
 
 return false;
 case 2:
 
 return false;
 case 3:
 
 return true;
 default:
 return true;
 }
 }
 }
 
 | 
run:如何到达阶段逻辑
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 
 | public class Person implements Runnable {@Override
 public void run() {
 
 
 phaser.arriveAndAwaitAdvance();
 
 if (name.equals("新郎") || name.equals("新娘")) {
 System.out.printf("%s 洞房!\n", name);
 phaser.arriveAndAwaitAdvance();
 }
 
 else {
 phaser.arriveAndDeregister();
 
 }
 }
 }
 
 | 
4. ReadWriteLock
StampedLock
- 共享锁
- 排它锁 | 12
 3
 
 | ReadWriteLock readWriteLock = new ReadWriteLock();Lock readLock = readWriteLock.readLock();
 Lock writeLock = readWriteLock.writeLock();
 
 |  
 
5. Semaphore
概述: Semaphore 类似于操作系统中的信号量,可以 控制对互斥资源的访问线程数
原理:
acquire () 获取一个许可,如果没有就等待
release () 释放一个许可
![]()
构造器:
- Semaphore(int permits): 参数 permits 表示许可数目,即同时可以允许多少线程进行访问
-  Semaphore(int permits, boolean fair): 这个多了一个参数 fair 表示是否是公平的,即等待时间越久的越先获取许可
API:
- acquire(): 获取一个许可
-  acquire(int permits): 获取 permits 个许可
-  release(): 释放一个许可
-  release(int permits): 释放 permits 个许可
实例:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 
 | public class SemaphoreExample {public static void main(String[] args) {
 final int clientCount = 3;
 final int totalRequestCount = 10;
 Semaphore semaphore = new Semaphore(clientCount);
 ExecutorService executorService = Executors.newCachedThreadPool();
 for (int i = 0; i < totalRequestCount; i++) {
 executorService.execute(() -> {
 try {
 semaphore.acquire();
 System.out.print(semaphore.availablePermits() + " ");
 } catch (InterruptedException e) {
 e.printStackTrace();
 } finally {
 semaphore.release();
 }
 });
 }
 executorService.shutdown();
 }
 }
 
 
 | 
6. Exchanger
线程交换
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 
 | Exchanger<String> exchanger = new Exchanger<>();
 new Thread(() -> {
 String s1 = "T1";
 try {
 
 s1 = exchanger.exchange(s1);
 } catch (Exception e) {
 e.printStackTrace();
 }
 System.out.println(Thread.currentThread().getName() + ":" + s1);
 }, "t1").start();
 
 new Thread(() -> {
 String s2 = "T2";
 try {
 s2 = exchanger.exchange(s2);
 } catch (Exception e) {
 e.printStackTrace();
 }
 System.out.println(Thread.currentThread().getName() + ":" + s2);
 }, "t2").start();
 
 
 
 
 | 
7. LockSupport
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 
 | Thread t = new Thread(() -> {for (int i = 0; i < 10; i++) {
 System.out.println(i);
 if (i == 5) {
 
 LockSupport.park();
 }
 try {
 TimeUnit.SECONDS.sleep(1);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 });
 t.start();
 
 
 LockSupport.unpark(t);
 
 | 
8. CompletableFuture
CompletableFuture 使用详解
CompletableFuture 避坑 1—— 需要自定义线程池
使用 CompletableFuture 时,那些令人头疼的问题
管理多个线程运行后返回的结果
- allOf ():所有任务全部完成才结束
-  anyOf ():任意一个任务完成才结束 | 12
 3
 4
 5
 6
 7
 8
 9
 
 | CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(() -> priceOfTM())
 .thenApply(String::valueOf)
 .thenApply(str -> "price " + str)
 .thenAccept(System.out::println);
 CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(() -> priceOfTB());
 CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(() -> priceOfJD());
 
 CompletableFuture.allOf(futureTM, futureTB, futureJD).join();
 
 |  
 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 
 | LocalDate now = LocalDate.now(ZoneOffset.ofHours(8));
 int totalCount = 4 * 7;
 List<ReserveOrderCountVO> resultList = Lists.newArrayListWithExpectedSize(totalCount);
 CompletableFuture[] completableFutures = Stream.iterate(0, n -> ++n).limit(totalCount)
 .map(num -> CompletableFuture.supplyAsync(() -> this.getReserveCountByDate(reserveCountQuery, now.plusDays(num)), COUNT_EXECUTOR)
 .whenComplete((result, e) -> resultList.add(result)))
 .toArray(CompletableFuture[]::new);
 
 
 CompletableFuture.allOf(completableFutures).join();
 
 | 
9. AQS 源码
Java 技术之 AQS 详解
内部维护一个 state 和一个双向线程链表
![]()
![]()
- ReentrantLock#lock 
- AbstractQueuedSynchronizer#compareAndSetState:CAS 把 state 从 0 变为 1,若成功则代表拿到锁  
- AbstractOwnableSynchronizer#setExclusiveOwnerThread:若抢到锁,则设置当前线程为独占线程  
- AbstractQueuedSynchronizer#acquire  | 12
 3
 4
 5
 
 | public final void acquire(int arg) {if (!tryAcquire(arg) &&
 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 selfInterrupt();
 }
 
 |  
 
- ReentrantLock.NonfairSync#tryAcquire
 1. ReentrantLock.Sync#nonfairTryAcquire:
 - 
- state 为 0,则继续调用 compareAndSetState 抢锁(尝试把 state 变为 1,成功则接着调用 setExclusiveOwnerThread)
- state 为 > 0,且当前线程是独占访问的那个线程(说明锁重入),则 state++(此时 state 代表重入线程数)
- 否则 TryAcquire 失败
 
-  TryAcquire 失败则调用 AbstractQueuedSynchronizer#addWaiter:使用 CAS 加入链表队列 
 
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 
 | private Node addWaiter(Node mode) {
 Node node = new Node(Thread.currentThread(), mode);
 
 Node pred = tail;
 if (pred != null) {
 
 
 node.prev = pred;
 if (compareAndSetTail(pred, node)) {
 pred.next = node;
 return node;
 }
 }
 enq(node);
 return node;
 }
 private Node enq(final Node node) {
 for (;;) {
 Node t = tail;
 if (t == null) {
 if (compareAndSetHead(new Node()))
 tail = head;
 } else {
 node.prev = t;
 if (compareAndSetTail(t, node)) {
 t.next = node;
 return t;
 }
 }
 }
 }
 
 | 
jdk 9 使用 VarHandler.set(this, pred) 代替 node.prev = pred;,其调用 native 实现(相当于直接操纵二进制码),效率比反射高
VarHandler 指向一个变量  
     1. AbstractQueuedSynchronizer#acquireQueued:加入队列后不断监听前一个节点,若前节点为头结点(已拿到锁),则试图去抢锁,成功则返回 false(不中断)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 
 | final boolean acquireQueued(final Node node, int arg) {boolean failed = true;
 try {
 boolean interrupted = false;
 for (;;) {
 final Node p = node.predecessor();
 if (p == head && tryAcquire(arg)) {
 setHead(node);
 p.next = null;
 failed = false;
 return interrupted;
 }
 if (shouldParkAfterFailedAcquire(p, node) &&
 parkAndCheckInterrupt())
 interrupted = true;
 }
 } finally {
 if (failed)
 cancelAcquire(node);
 }
 }
 
 |