Java 并发详解 ⑦:AQS

1. CountDownLatch

概述: 用来控制一个线程等待多个线程

原理:
维护了一个计数器 cnt,每次调用 countDown () 方法会让计数器的值减 1,减到 0 的时候,那些因为调用 await () 方法而在等待的线程就会被唤醒

API:

  • await(): 调用 await () 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行
  • await(long timeout, TimeUnit unit): 和 await () 类似,只不过等待一定的时间后 count 值还没变为 0 的话就会继续执行
  • countDown(): 将 count 值减 1

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CountdownLatchExample {

public static void main(String[] args) throws InterruptedException {
final int totalThread = 10;
CountDownLatch countDownLatch = new CountDownLatch(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("run..");
countDownLatch.countDown();
});
}
countDownLatch.await();
System.out.println("end");
executorService.shutdown();
}
}
// run..run..run..run..run..run..run..run..run..run..end

场景:

  1. 启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行
  2. 实现多个线程开始执行任务的最大并行性
    CountDownLatch (1),多个线程挂起,当主线程调用 countDown () 时,多个线程同时被唤醒

不足:
CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

2. CyclicBarrier

概述: 用来控制多个线程互相等待,只有当多个线程都到达时,这些线程才会继续执行

原理:
线程执行 await () 方法之后计数器会减 1,并进行等待,直到计数器为 0,所有调用 await () 方法而在等待的线程才能继续执行

方法:

  • await()
  • await(long timeout, TimeUnit unit): 设置超时时间,超过该时间仍然还有线程还没到达屏障则忽略这些线程,将等待的线程全部释放

区别: CyclicBarrier 和 CountdownLatch 的区别是,CyclicBarrier 的计数器通过调用 reset () 方法可以 循环使用,所以它才叫做循环屏障

构造器: CyclicBarrier 有两个构造函数,其中 parties 指示计数器的初始值,barrierAction 在所有线程都到达屏障的时候会选择一个线程执行一次

1
2
3
4
5
6
7
8
9
10
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
this(parties, null);
}

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CyclicBarrierExample {

public static void main(String[] args) {
final int totalThread = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalThread; i++) {
executorService.execute(() -> {
System.out.print("before..");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.print("after..");
});
}
executorService.shutdown();
}
}
// before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..

重用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);

for (int i = 0; i < N; i++) {
new Writer(barrier).start();
}

try {
Thread.sleep(25000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("CyclicBarrier重用");

for (int i = 0; i < N; i++) {
new Writer(barrier).start();
}
}
static class Writer extends Thread {
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
try {
//以睡眠来模拟写入数据操作
Thread.sleep(5000);
System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");

cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch(BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务...");
}
}
}

3. Phaser

main:

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) {
phaser.bulkRegister(7);

for (int i = 0; i < 5; i++) {
new Thread(new Person("p" + i)).start();
}

new Thread(new Person("新郎")).start();
new Thread(new Person("新娘")).start();

}

Phaser:指定什么阶段做什么事

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public ass MarriagePhaser extends Phaser {
// phase: 阶段编号: registeredParties: 此阶段注册人数
@Override
protected boolean onAdvance(int phase, int registeredParties) {

switch (phase) {
case 0:
// ...
return false;
case 1:
// ...
return false;
case 2:
// ...
return false;
case 3:
// ...
return true;
default:
return true;
}
}
}

run:如何到达阶段逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Person implements Runnable {
@Override
public void run() {
// doing something
// 等待所有注册的线程全部到达后执行
phaser.arriveAndAwaitAdvance();

if (name.equals("新郎") || name.equals("新娘")) {
System.out.printf("%s 洞房!\n", name);
phaser.arriveAndAwaitAdvance();
}
// 指定注销
else {
phaser.arriveAndDeregister();
//phaser.register()
}
}
}

4. ReadWriteLock

StampedLock

  • 共享锁
  • 排它锁
    1
    2
    3
    ReadWriteLock readWriteLock = new ReadWriteLock();
    Lock readLock = readWriteLock.readLock();
    Lock writeLock = readWriteLock.writeLock();

5. Semaphore

概述: Semaphore 类似于操作系统中的信号量,可以 控制对互斥资源的访问线程数

原理:
acquire () 获取一个许可,如果没有就等待
release () 释放一个许可

构造器:

  • Semaphore(int permits): 参数 permits 表示许可数目,即同时可以允许多少线程进行访问
  • Semaphore(int permits, boolean fair): 这个多了一个参数 fair 表示是否是公平的,即等待时间越久的越先获取许可

API:

  • acquire(): 获取一个许可
  • acquire(int permits): 获取 permits 个许可
  • release(): 释放一个许可
  • release(int permits): 释放 permits 个许可

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class SemaphoreExample {
public static void main(String[] args) {
final int clientCount = 3;
final int totalRequestCount = 10;
Semaphore semaphore = new Semaphore(clientCount);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < totalRequestCount; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
System.out.print(semaphore.availablePermits() + " ");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
});
}
executorService.shutdown();
}
}
// 2 1 2 2 2 2 2 1 2 2

6. Exchanger

线程交换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
String s1 = "T1";
try {
// 阻塞等待交换,交换后才能继续执行
s1 = exchanger.exchange(s1);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":" + s1);
}, "t1").start();

new Thread(() -> {
String s2 = "T2";
try {
s2 = exchanger.exchange(s2);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":" + s2);
}, "t2").start();

// t1:T2
// t2:T1

7. LockSupport

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Thread t = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(i);
if (i == 5) {
// 停止当前线程
LockSupport.park();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();

// 继续执行
LockSupport.unpark(t);

8. CompletableFuture

CompletableFuture 使用详解
CompletableFuture 避坑 1—— 需要自定义线程池
使用 CompletableFuture 时,那些令人头疼的问题
管理多个线程运行后返回的结果

  • allOf ():所有任务全部完成才结束
  • anyOf ():任意一个任务完成才结束
    1
    2
    3
    4
    5
    6
    7
    8
    9
    CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(() -> priceOfTM())
    // 对结果进行处理
    .thenApply(String::valueOf)
    .thenApply(str -> "price " + str)
    .thenAccept(System.out::println);
    CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(() -> priceOfTB());
    CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(() -> priceOfJD());

    CompletableFuture.allOf(futureTM, futureTB, futureJD).join();
1
2
3
4
5
6
7
8
9
10
11
// 循环线程获取订单数
LocalDate now = LocalDate.now(ZoneOffset.ofHours(8));
int totalCount = 4 * 7;
List<ReserveOrderCountVO> resultList = Lists.newArrayListWithExpectedSize(totalCount);
CompletableFuture[] completableFutures = Stream.iterate(0, n -> ++n).limit(totalCount)
.map(num -> CompletableFuture.supplyAsync(() -> this.getReserveCountByDate(reserveCountQuery, now.plusDays(num)), COUNT_EXECUTOR)
.whenComplete((result, e) -> resultList.add(result)))
.toArray(CompletableFuture[]::new);

// 阻塞直到所有任务运行结束
CompletableFuture.allOf(completableFutures).join();

9. AQS 源码

Java 技术之 AQS 详解

内部维护一个 state 和一个双向线程链表

  1. ReentrantLock#lock
    1. AbstractQueuedSynchronizer#compareAndSetState:CAS 把 state 从 0 变为 1,若成功则代表拿到锁

    2. AbstractOwnableSynchronizer#setExclusiveOwnerThread:若抢到锁,则设置当前线程为独占线程

    3. AbstractQueuedSynchronizer#acquire

      1
      2
      3
      4
      5
      public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
      }
    4. ReentrantLock.NonfairSync#tryAcquire
      1. ReentrantLock.Sync#nonfairTryAcquire:

      • state 为 0,则继续调用 compareAndSetState 抢锁(尝试把 state 变为 1,成功则接着调用 setExclusiveOwnerThread)
      • state 为 > 0,且当前线程是独占访问的那个线程(说明锁重入),则 state++(此时 state 代表重入线程数)
      • 否则 TryAcquire 失败
    5. TryAcquire 失败则调用 AbstractQueuedSynchronizer#addWaiter:使用 CAS 加入链表队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// jdk 8
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// jdk 9 使用 VarHandler.set(this, pred)。
// VarHandler 内部有 CAS 的方法
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}


jdk 9 使用 VarHandler.set(this, pred) 代替 node.prev = pred;,其调用 native 实现(相当于直接操纵二进制码),效率比反射高
VarHandler 指向一个变量

     1. AbstractQueuedSynchronizer#acquireQueued:加入队列后不断监听前一个节点,若前节点为头结点(已拿到锁),则试图去抢锁,成功则返回 false(不中断)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}