Skip to main content

并发编程容器和工具

ConcurrentHashMap

JDK1.7

容器实体

// 每个Segment里面都有一个 Node<K,V>[] table
final Segment<K,V>[] segments;

segment相关

Segment本身就是一把锁,并且组合了table属性
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// ...
transient volatile HashEntry<K,V>[] table;
}
获取锁的方法
        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
// 自适应编程,计数尝试次数
int retries = -1; // negative while locating node
// 自旋
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
// 重试大于MAX_SCAN_RETRIES就阻塞
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
// 根据hash得到的node变了,扩容了?
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}

put

    public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 通过key找到所在的段(Segment)
return s.put(key, hash, value, false);
}
        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 获取锁,粒度是一段
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

JDK1.8

容器实体

    transient volatile Node<K,V>[] table;

put

    final V putVal(K key, V value, boolean onlyIfAbsent) {
// kv都不能是null
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
//找key的位置
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 桶数组里没有这个key对应的hash
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// f.hash被用来存状态了? MOVED=-1
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 找到这个hash对应的节点了
else {
V oldVal = null;
// 锁node节点
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 红黑树添加key
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

扩容

1.8: 分段扩容,用ForwardingNode记录旧桶用来get,并且当其他线程遇到了ForwardingNode的时候,根据transferIndex找到下一个待扩容段

Q&A

  • 1.7和1.8的区别
    • 1.7是分段锁,用ReentrantLock。1.8的锁粒度是node节点,并且阻塞锁用的是Synchronized,并且在添加节点,修改状态的时候用了cas
  • 在调用put之后,在实际替换node的value之前,调用get,那不是获取到旧值了
    • 有可能,不提供强一致性

CopyOnWriteArrayList

容器实体:

 private transient volatile Object[] array;
  • 只在写的时候加锁
  • 写时复制的机制,修改共享数据的时候会先复制旧的数组再修改
  • 写写之间阻塞
  • 扩容:每次add的时候copy新数组容量+1
  • 适合读多写少的场景

CountDownLatch

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个 CountDownLatch,初始计数为 3
CountDownLatch latch = new CountDownLatch(3);

// 创建三个工作线程
Thread thread1 = new Thread(new Worker(latch, "Thread 1"));
Thread thread2 = new Thread(new Worker(latch, "Thread 2"));
Thread thread3 = new Thread(new Worker(latch, "Thread 3"));

// 启动工作线程
thread1.start();
thread2.start();
thread3.start();

// 等待所有工作线程完成
latch.await();

System.out.println("All workers have finished!");
}
}

class Worker implements Runnable {
private final CountDownLatch latch;
private final String name;

public Worker(CountDownLatch latch, String name) {
this.latch = latch;
this.name = name;
}

@Override
public void run() {
System.out.println(name + " is working");
latch.countDown();
}
}