Java 并发详解 ④:ReentrantLock

  • boolean tryLock(long time, TimeUnit unit): 尝试获得锁,阻塞时间不会超过给定的值;如果成功返回 true
  • void  lockInterruptibly(): 获得锁,但是会不确定地发生阻塞。如果线程被中断,抛出一个 InterruptedException 异常

1. 条件对象

作用: 条件对象用来管理那些已经进入被保护的代码段但还不能运行的线程。一个 Condition 对象为一个队列。

方法:
ReentrantLock <==> synchronized

  • await () <==> wait (): 将该线程放到条件的等待集中
  • signalAll () <==> notifyAll (): 解除该条件的等待集中的所有线程的阻塞状态
  • signal () <==> notify (): 从该条件的等待集中随机地选择一个线程,解除其阻塞状态
1
2
3
4
5
6
7
8
9
10
Condition sufficientFunds = bankLock.newCondition();
// 当前线程不满足条件,进入该条件的等待集,放弃锁
while (!(ok to proceed)) {
sufficientFunds.await();
}
// do something

// 其他线程的操作应使该线程重新判断条件。
// 重新激活因为这一条件而等待的所有线程。
sufficientFunds.signalAll();

可以创建多个不同的 Condition,实现不同的等待队列

1
2
3
4
5
6
7
8
9
Condition producer = lock.newCondition();
Condition comsumer = lock.newCondition();


while (isProducer) {
comsumer.await();
}

producer.signalAll();

2. 读写锁

  • Lock readLock (): 得到一个可以被多个读操作共用的读锁,但会排斥所有写操作
  • Lock writeLock (): 得到一个写锁,排斥所有其他的读操作和写操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private Lock readLock = rwl.readLock();
    private Lock writeLock = rwl.writeLock();

    /**
    * 对所有的获取方法加读锁
    */
    public doule getTotalBalance() {
    readLock.lock();
    try {
    //...
    } finally {
    readLock.unlock();
    }
    }

    /**
    * 对所有的修改方法加写锁
    */
    public void transfer() {
    writeLock.lock();
    try {
    //...
    } finally {
    writeLock.unlock();
    }
    }

3. 比较

  1. 锁的实现: synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的
  2. 性能: 新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等,synchronized 与 ReentrantLock 大致相同
  3. 等待可中断: 当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情
    ReentrantLock 可中断,而 synchronized 不行
  4. 公平锁: 公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁
    synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的
  5. 锁绑定多个条件: 一个 ReentrantLock 可以同时绑定多个 Condition 对象

4. CAS

深入解析 volatile 、CAS 的实现原理
面试必问的 CAS,你懂了吗?
《面试必备之乐观锁与悲观锁》

并发策略:

  • 乐观并发策略(乐观锁): 乐观地认为本次操作没有其他线程竞争。
    先尝试进行操作,如果没有其它线程争用共享数据,那操作就成功了。否则采取补偿措施(不断地重试)直到成功为止。
  • 悲观并发策略(悲观锁): 悲观地认为本次操作有其他线程竞争。
    直接使用互斥量进行加锁阻塞。

原理: CAS 指令是原子操作,有 3 个操作数。分别是内存地址 V、旧的预期值 A 和新值 B。

当执行操作时,只有当 V 的值等于 A(即认为未被其他线程修改过),才将 V 的值更新为 B。

若失败了则重新获取这三个值再次进行判断,直到操作成功。

乐观锁适用于多读的应用类型,这样可以提高吞吐量

1. ABA 问题

如果一个变量初次读取的时候是 A 值,它的值被改成了 B,后来又被改回为 A,那 CAS 操作就会误认为它从来没有被改变过。

解决: J.U.C 包提供了一个带有标记的原子引用类 AtomicStampedReference 来解决这个问题,它可以通过控制变量值的版本来保证 CAS 的正确性。

改用传统的互斥同步可能会比原子类更高效。

2. Atomic 类

LongAdder:
思想:分段锁 CAS,分片执行,结果汇总

例:1000 个线程分为 4 个任务,每个任务 250 个线程,执行完后汇总

3. Unsafe

J.U.C 包里面的 AtomicInteger 等原子类的方法调用了 Unsafe 类的 CAS 操作。

AtomicInteger:

  1. 内部调用 public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); 本地方法,compareAndSwapInt 定义在 jdk8u: unsafe.cpp 中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // AtomicInteger.class
    public final int incrementAndGet() {
    for (;;) {
    int current = get();
    int next = current + 1;
    if (compareAndSet(current, next))
    return next;
    }
    }

    public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
  2. compareAndSwapInt 调用 cmpxchg(compare and exchange)方法

    1
    2
    3
    4
    5
    6
    7
    // jdk8u: unsafe.cpp
    UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
    UnsafeWrapper("Unsafe_CompareAndSwapInt");
    oop p = JNIHandles::resolve(obj);
    jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
    return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
    UNSAFE_END
  3. cmpxchg 方法执行逻辑
    以下是 cmpxchg 在 JDK 8,Linux 操作系统,X86 处理器环境下的实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // jdk8u: atomic_linux_x86.inline.hpp
    inline jint Atomic::cmpxchg(jint exchange_value, volatile jint* dest, jint compare_value) {
    int mp = os::is_MP();
    __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
    : "=a" (exchange_value)
    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
    : "cc", "memory");
    return exchange_value;
    }


    其详细执行逻辑如下:

  4. is_MP(Multi Processor)方法判断 是否为多个处理器,保存到变量 mp 中。

    1
    2
    3
    4
    // jdk8u: os.hpp
    static inline bool is_MP() {
    return (_processor_count != 1) || AssumeMP;
    }
  5. LOCK_IF_MP

    1
    2
    // jdk8u: atomic_linux_x86.inline.hpp
    #define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
  -  如果是在多处理器上运行就为 cmpxchg 指令加上 `lock` 前缀(lock cmpxchg)。**这就使 cmpxchg 指令变成原子操作** 
  -  如果是在单处理器上运行就省略 `lock` 前缀 
  1. 执行 cmpxchgl:比较并交换,操作成功返回比较值(旧值),操作失败返回目标地址中的值

    “cmpxchgl %1,(%3)”
    : “=a” (exchange_value)
    : “r” (exchange_value), “a” (compare_value), “r” (dest), “r” (mp)
    : “cc”, “memory”

    • 输入 exchange_value(交换值,即更新值,%1)、compare_value(比较值,即期待值,%2)、dest(目标地址值,%3)、mp(是否多核,%4) 四个值
    • 输出 exchange_value(%0)


cmpxchgl %1,(%3) 即表示 cmpxchgl exchange_value,(dest)

  1. 输入:`"r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)`,代表把 compare_value 存入 eax 寄存器,把 exchange_value、dest、mp 的值存入任意的通用寄存器
  2. cmpxchgl 有个隐含操作数 eax,会先比较 eax 的值(也就是 compare_value)和 dest 地址所存的值是否相等
  3. 相等则把 exchange_value 的值写入 dest 指向的地址
  4. 不相等则把 dest 地址所存的值更新到 eax 中(因为最终输出 eax 中的值,写入 dest 的值代表更新失败)
  5. 输出:`"=a" (exchange_value)`,把 eax 中存的值写入 exchange_value 变量中。
  6. 如果输出的是比较值(说明操作成功),Unsafe_CompareAndSwapInt 方法中 `return (jint)(Atomic::cmpxchg(x, addr, e)) == e;` 会返回 `ture`,即表示 CAS 成功!否则表示 CAS失败。
  1. 最终实现 cmpxchg = cas 修改变量值
    lock 指令在执行后面指令的时候锁定一个北桥信号(不采用锁总线的方式)
    1
    lock cmpxchg 指令