详解ConcurrentHashMap源码

2023-09-09 八股文

点击进入官方文档 (opens new window)

# 详解put源码,探究CAS和synchronized何时使用

点击展开源码,源码已添加注释
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 1.要求key和value都不能为空
    if (key == null || value == null) throw new NullPointerException();
    // 2.混乱Hash,减少Hash碰撞
    int hash = spread(key.hashCode());
    int binCount = 0; // 记录添加元素的链表长度

    // 无限循环遍历哈希表中的桶
    for (ConcurrentHashMap.Node<K, V>[] tab = table; ; ) {
        ConcurrentHashMap.Node<K, V> f; // 当前桶中的第一个节点
        int n, i, fh; // 第一个节点f,n是哈希表的长度,当前桶的索引i

        // 3.如果哈希表为空,则进行初始化
        if (tab == null || (n = tab.length) == 0) {
            tab = initTable();
        } else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 4.如果当前桶为空,则尝试使用CAS操作将新节点添加到当前桶中
            if (casTabAt(tab, i, null, new ConcurrentHashMap.Node<K, V>(hash, key, value, null))) {
                break; // 添加成功,跳出循环,不需要加锁
            }
        } else if ((fh = f.hash) == MOVED) {
            // 如果当前桶的第一个节点哈希值为MOVED,表示正在进行扩容操作,则帮助进行扩容操作
            tab = helpTransfer(tab, f);
        } else {
            V oldVal = null;
            // 对当前桶的第一个节点加锁
            synchronized (f) {
                // 再次检查当前桶的第一个节点是否发生变化
                if (tabAt(tab, i) == f) {
                    // 如果当前桶的第一个节点哈希值大于等于0,表示是普通节点
                    if (fh >= 0) {
                        binCount = 1; // 初始化链表长度为1
                        for (ConcurrentHashMap.Node<K, V> e = f; ; ++binCount) { // 遍历链表
                            K ek;
                            // 如果找到了键相同的节点
                            if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                oldVal = e.val; // 保存旧的值
                                if (!onlyIfAbsent) {
                                    e.val = value; // 如果不是插入新值,则更新值
                                }
                                break;
                            }
                            ConcurrentHashMap.Node<K, V> pred = e;
                            if ((e = e.next) == null) { // 遍历到链表尾部,将新节点添加到链表尾部
                                pred.next = new ConcurrentHashMap.Node<K, V>(hash, key, value, null);
                                break;
                            }
                        }

                    } else if (f instanceof ConcurrentHashMap.TreeBin) {
                        // 如果当前桶的第一个节点是红黑树节点
                        ConcurrentHashMap.Node<K, V> p;
                        binCount = 2; // 初始化链表长度为2
                        if ((p = ((ConcurrentHashMap.TreeBin<K, V>) f).putTreeVal(hash, key,
                                                                                  value)) != null) { // 在红黑树中添加或更新节点,并返回旧的节点
                            oldVal = p.val; // 保存旧的值
                            if (!onlyIfAbsent)
                                p.val = value; // 如果不是插入新值,则更新值
                        }
                    }
                }
            }
            // 如果链表长度不为0,表示成功添加或更新节点
            if (binCount != 0) {
                // 如果链表长度大于等于树化阈值,则将链表转化为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal; // 返回旧的值
                break;
            }
        }
    }
    addCount(1L, binCount); // 增加元素数量
    return null;
}

执行步骤如下,观察流程图辅助理解:

  1. 如果传递null,抛出空指针异常
  2. 混乱Hash,减少Hash碰撞
  3. 无限循环,不断获取最新桶中数据
  4. 如果当前桶是null,则初始化
  5. 如果桶不是null但没有数据,尝试使用CAS插入数据,如果插入成功,则直接结束循环
  6. 如果桶内有数据且正在进行扩容则辅助进行扩容操作,完成后结束当前循环,获取最新桶内数据进行下次循环
  7. 这里就相当于进行和HashMap相似的put操作
    1. 对桶内第一个节点加锁,如果没有获取到进入阻塞状态,等待其他线程处理完毕
    2. 保险操作,再次检查第一个节点是否发生变化
    3. 如果是普通节点,则遍历直接插入到链表尾部
    4. 如果是红黑树则使用红黑树的插入方法
    5. 最后判断如果链表长度>8,则转换为红黑树
  8. 更新当前Map元素数量

ConcurrentHashMap底层之put原理

总结

在桶内无元素时尝试使用CAS添加第一个节点,每次对桶进行put时都锁住第一个节点。

# ConcurrentHashMap在哪些地方做了并发控制?

ConcurrentHashMap是通过synchronized和CAS自旋保证的线程安全,要想知道ConcurrentHashMap是如何加锁的,就要知道HashMap在哪些地方会导致线程安全问题,如初始化桶数组阶段和设置桶,插入链表,树化等阶段,都会有并发问题。

# 初始化桶阶段

如果在此阶段不做并发控制,那么极有可能出现多个线程都去初始化桶的问题,导致内存浪费,Map在此处采用自旋操作和CAS操作:如果此时没有线程初始化,则去初始化,否则当前线程让出CPU时间片,等待下一次唤醒

while ((tab = table) == null || tab.length == 0) {
    if ((sc = sizeCtl) < 0)
       // 如果sizeCtl < 0,表示有其他线程正在初始化,当前线程让出处理器
        Thread.yield(); 
    else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
        try {
            // 再次检查table的值
            if ((tab = table) == null || tab.length == 0) {
                // 进行哈希表的初始化操作……
            }
        } finally {
            // 恢复sizeCtl的值,以便其他线程也能参与初始化竞争
            sizeCtl = sc;
        }
        break; // 跳出循环,初始化成功
    }
}

# put阶段

  1. 如果hash后发现桶中没有值,则会直接采用CAS插入并且返回
  2. 如果发现桶中有值,则对流程按照当前的桶节点为维度进行加锁,将值插入链表或者红黑树中,源码如下:
// 省略....
// 如果当前桶节点为null,直接CAS(casTabAt方法)插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
        break;
}
// 省略....
// 如果桶节点不为空,则对当前桶进行加锁,(只需要对第一个节点加锁即可)
else {
    V oldVal = null;
    synchronized (f) {
    }
}

# 扩容阶段

ConcurrentHashMap并没有一味的通过CAS或者锁去限制多线程,在扩容阶段,ConcurrentHashMap就通过多线程来加加速扩容。

  1. 通过CPU核数为每个线程计算划分任务,每个线程最少的任务是迁移16个桶
  2. 将当前桶扩容的索引transferIndex赋值给当前线程,如果索引小于0,则说明扩容完毕,结束流程
  3. 否则再将当前线程扩容后的索引赋值给transferIndex,譬如,如果transferIndex原来是32,那么赋值之后transferIndex应该变为16,这样下一个线程就可以从16开始扩容了。这里有一个小问题,如果两个线程同时拿到同一段范围之后,该怎么处理?答案是ConcurrentHashMap会通过CAS对transferIndex进行设置,只可能有一个成功,所以就不会存在上面的问题
  4. 之后就可以对真正的扩容流程进行加锁操作了
上次更新: 5 个月前