ConcurrentHashMap与CAS

    |     2017年8月5日   |   多线程编程   |     0 条评论   |    2169

一、CAS:Compare and Swap,比较并交换。

1. 悲观锁和乐观锁

java.util.concurrent包中借助CAS实现了区别于synchronouse同步锁的一种乐观锁,乐观锁用到的机制就是CAS.

在JDK 5之前Java语言是靠synchronized关键字保证同步的,这会导致以下问题:

(1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。

(2)一个线程持有锁会导致其它所有需要此锁的线程挂起。

(3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。

volatile是不错的机制,但是volatile不能保证原子性。因此对于同步最终还是要回到锁机制上来。

独占锁是一种悲观锁,synchronized就是一种独占锁,会导致其它所有需要锁的线程挂起,等待持有锁的线程释放锁。而另一个更加有效的锁就是乐观锁。所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。乐观锁用到的机制就是CAS,Compare and Swap。

2. CAS的理解

我们都知道,在java语言之前,并发就已经广泛存在并在服务器领域得到了大量的应用。所以硬件厂商老早就在芯片中加入了大量直至并发操作的原语,从而在硬件层面提升效率。在intel的CPU中,使用cmpxchg指令。

在Java发展初期,java语言是不能够利用硬件提供的这些便利来提升系统的性能的。而随着java不断的发展,Java本地方法(JNI)的出现,使得java程序越过JVM直接调用本地方法提供了一种便捷的方式,因而java在并发的手段上也多了起来。而在Doug Lea提供的cucurenct包中,CAS理论是它实现整个java包的基石。

CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该 位置的值。(在 CAS 的一些特殊情况下将仅返回 CAS 是否成功,而不提取当前 值。)CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”

通常将 CAS 用于同步的方式是从地址 V 读取值 A,执行多步计算来获得新 值 B,然后使用 CAS 将 V 的值从 A 改为 B。如果 V 处的值尚未同时更改,则 CAS 操作成功。

类似于 CAS 的指令允许算法执行读-修改-写操作,而无需害怕其他线程同时 修改变量,因为如果其他线程修改变量,那么 CAS 会检测它(并失败),算法 可以对该操作重新计算。

3. CAS应用

CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

非阻塞算法 (nonblocking algorithms)

一个线程的失败或者挂起不应该影响其他线程的失败或挂起的算法。

现代的CPU提供了特殊的指令,可以自动更新共享数据,而且能够检测到其他线程的干扰,而 compareAndSet() 就用这些代替了锁定。

拿出AtomicInteger来研究在没有锁的情况下是如何做到数据正确性的。

private volatile int value;

首先毫无以为,在没有锁的机制下可能需要借助volatile原语,保证线程间的数据是可见的(共享的)。

这样才获取变量的值的时候才能直接读取。

public final int get() {
        return value;
    }

然后来看看++i是怎么做到的。

public final int incrementAndGet() {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}

在这里采用了CAS操作,每次从内存中读取数据然后将此数据和+1后的结果进行CAS操作,如果成功就返回结果,否则重试直到成功为止。

而compareAndSet利用JNI来完成CPU指令的操作。

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

整体的过程就是这样子的,利用CPU的CAS指令,同时借助JNI来完成Java的非阻塞算法。其它原子操作都是利用类似的特性完成的。

二、ConcurrentHashMap

并发编程实践中,ConcurrentHashMap是一个经常被使用的数据结构,相比于Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在线程安全的基础上提供了更好的写并发能力,但同时降低了对读一致性的要求.

1. 设计原理:

JDK7中实现
ConcurrentHashMap采用了分段锁的设计,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。相比于对整个Map加锁的设计,分段锁大大的提高了高并发环境下的处理能力。

JDK8中的实现
它摒弃了Segment(锁段)的概念,而是启用了一种全新的方式实现,利用CAS算法。它沿用了与它同时期的HashMap版本的思想,底层依然由“数组”+链表+红黑树的方式思想(JDK7与JDK8中HashMap的实现),但是为了做到并发,又增加了很多辅助的类,例如TreeBin,Traverser等对象内部类。

2. 实例-并发情况下字符串统计-应用于接口限流

说到并发的字符串统计,立即让人联想到的数据结构便是ConcurrentHashpMap<String,Long> urlCounter;
如果你刚刚接触并发可能会写出如代码清单1的代码

代码清单1:

public class CounterDemo1 {

    private final Map<String, Long> urlCounter = new ConcurrentHashMap<>();

    //接口调用次数+1
    public long increase(String url) {
        Long oldValue = urlCounter.get(url);
        Long newValue = (oldValue == null) ? 1L : oldValue + 1;
        urlCounter.put(url, newValue);
        return newValue;
    }

    //获取调用次数
    public Long getCount(String url){
        return urlCounter.get(url);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        final CounterDemo1 counterDemo = new CounterDemo1();
        int callTime = 100000;
        final String url = "http://localhost:8080/hello";
        CountDownLatch countDownLatch = new CountDownLatch(callTime);
        //模拟并发情况下的接口调用统计
        for(int i=0;i<callTime;i++){
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    counterDemo.increase(url);
                    countDownLatch.countDown();
                }
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.shutdown();
        //等待所有线程统计完成后输出调用次数
        System.out.println("调用次数:"+counterDemo.getCount(url));
    }
}

console output:
调用次数:96526

都说concurrentHashMap是个线程安全的并发容器,所以没有显示加同步,实际效果呢并不如所愿。

问题就出在increase方法,concurrentHashMap能保证的是每一个操作(put,get,delete…)本身是线程安全的,但是我们的increase方法,对concurrentHashMap的操作是一个组合,先get再put,所以多个线程的操作出现了覆盖。如果对整个increase方法加锁,那么又违背了我们使用并发容器的初衷,因为锁的开销很大。我们有没有方法改善统计方法呢?
代码清单2罗列了concurrentHashMap父接口concurrentMap的一个非常有用但是又常常被忽略的方法。

代码清单2:

/**
 * Replaces the entry for a key only if currently mapped to a given value.
 * This is equivalent to
 *
 * if (map.containsKey(key) && Objects.equals(map.get(key), oldValue)) {
 *   map.put(key, newValue);
 *   return true;
 * } else
 *   return false;
 * }
 *
 * except that the action is performed atomically.
 */
boolean replace(K key, V oldValue, V newValue);

这其实就是一个最典型的CAS操作,except that the action is performed atomically.这句话真是帮了大忙,我们可以保证比较和设置是一个原子操作,当A线程尝试在increase时,旧值被修改的话就回导致replace失效,而我们只需要用一个循环,不断获取最新值,直到成功replace一次,即可完成统计。

改进后的increase方法如下

代码清单3:

public long increase2(String url) {
        Long oldValue, newValue;
        while (true) {
            oldValue = urlCounter.get(url);
            if (oldValue == null) {
                newValue = 1l;
                //初始化成功,退出循环
                if (urlCounter.putIfAbsent(url, 1l) == null)
                    break;
                //如果初始化失败,说明其他线程已经初始化过了
            } else {
                newValue = oldValue + 1;
                //+1成功,退出循环
                if (urlCounter.replace(url, oldValue, newValue))
                    break;
                //如果+1失败,说明其他线程已经修改过了旧值
            }
        }
        return newValue;
    }

console output:
调用次数:100000

再次调用后获得了正确的结果,上述方案看上去比较繁琐,因为第一次调用时需要进行一次初始化,所以多了一个判断,也用到了另一个CAS操作putIfAbsent,他的源代码描述如下

    /** If the specified key is not already associated
     * with a value, associate it with the given value.
     * This is equivalent to
     *
     * if (!map.containsKey(key))
     *   return map.put(key, value);
     * else
     *   return map.get(key);
     * }

简单翻译如下:“如果(调用该方法时)key-value 已经存在,则返回那个 value 值。如果调用时 map 里没有找到 key 的 mapping,返回一个 null 值”。值得注意点的一点就是concurrentHashMap的value是不能存在null值的。实际上呢,上述的方案也可以把Long替换成AtomicLong,可以简化实现, ConcurrentHashMap

private AtomicLongMap<String> urlCounter3 = AtomicLongMap.create();

public long increase3(String url) {
    long newValue = urlCounter3.incrementAndGet(url);
    return newValue;
}

public Long getCount3(String url) {
    return urlCounter3.get(url);

AtomicLong: 在Java并发包中有这样一个包,java.util.concurrent.atomic,该包是对Java部分数据类型的原子封装,在原有数据类型的基础上,提供了原子性的操作方法,保证了线程安全。

参考:
深入理解乐观锁与悲观锁
深入并发包-ConcurrentHashMap
ConcurrentHashMap总结
java并发实践–ConcurrentHashMap与CAS

转载请注明来源:ConcurrentHashMap与CAS
回复 取消