JUC

1/15/2021 JUCCASAtomic

# JUC

JUC 即 (java.util.concurrent)包

# atomic包

核心是CAS,主要提供一系列原子变量更新操作,提供非阻塞算法基础。

atomic

# locks包

locks

此图ReentrantLock并不直接继承AbstractQueuedSynchronizer,而是实现Lock接口。RL内部持有的sync才直接继承AQS。

AQS相对重要,利用AQS实现的组件有:

aqs

# 并发容器

collections

# 执行框架与线程池

executor_pack

# 重要的工具类

tools

# atomic包

# AtomicStampedReference

解决ABA问题

  • 使用:
// 初始值a,版本号1       
AtomicStampedReference asr = new AtomicStampedReference("b", 1);

// 得到版本号,若要防止ABA可以将版本号附加到数据内
int stamp = asr.getStamp();
boolean b = asr.compareAndSet("b", "a", stamp, stamp + 1);
1
2
3
4
5
6
  • 原理

    先判断版本号和reference对象是否能对应,对应了才进行CAS操作

Pair<V> current = pair;
return expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
1
2
3
4
5
6

和AtomicMarkableReference类似,只是amr版本号为bool字段。

# AtomicReference

解决多个字段无法CAS更新问题

注意:比对两个对象,比对用的是==,也就是比对的是地址

# LongAdder

  • 使用
// 线程外
LongAdder longAdder = new LongAdder();
// 线程内
longAdder.add(1L);
// 线程外
long sum = longAdder.sum();
1
2
3
4
5
6
  • 原理

降低争抢。具体见基础文档

# LongAccumulator

可以理解为可以传入回调方法的LongAdder

LongAccumulator la = new LongAccumulator((x, y) -> {
            return x + y;
        }, 0);
la.accumulate(1);
la.get()
1
2
3
4
5

# locks/AQS包

# ReentrantLock

# ReentrantReadWriteLock

# StampdLock

去看基础版

# Condition

  • 使用

    1. 可以由lock.newCondition,多个condition对象可以有选择的进行await(wait)/signal(notify),灵活.
    2. 必须使用在lock/unlock之中
  • 原理

    condition的await和signal就是将node节点在这两个队列中转移的过程。

    condition的await会将线程node添加到condition队列,然后从AQS队列移除。在signal时,会将该线程node添加到AQS阻塞队列中。

# collections包

# ConcurrentLinkedQueue

ConcurrentLinkedQueue采用非阻塞方式。是一个基于链接节点的无界线程安全队列。

  • 使用
ConcurrentLinkedQueue clq = new ConcurrentLinkedQueue();
clq.offer("A");
clq.offer("B");
// 此时 [A, B]
// 出队
clq.poll();
// 看一下队头
clq.peek();
1
2
3
4
5
6
7
8
  • 原理(1.8)

    • 入队

      在这里简单说,因为我没时间了

      为什么tail节点不总是尾节点?因为clq是懒更新,可以降低CAS争抢。

      如果tail->next不为空,则入队节点会被设置为tail;

      如果为空,则将当前节点设置为tail->next;

      初始情况添加第一个元素后,tail和head相同,且next都是第一个元素。

    • 出队。也不是每次都更新head,head有才弹出...

# offer 和 add 区别

offer 在越界时返回false; add抛异常;

无界队列都一样;

# CLQ和AQS区别

clq使用cas操作更新tail和head,但是为懒更新,避免竞争激烈造成一直失败。

而aqs也是先尝试cas,失败就将线程加入等待队列,避免了极端情况cas一直失败一直空转的发生。

主要原因是clq主要适用于生产消费,对于单纯队列的操作比较快。(队头出队尾入),即使高必发也很难出现cas一直失败的情况。

而aqs是锁,加锁释放锁时间可能会很长,所以cas+park更合适。

# 三个不变式

clq使用三个不变式来约束非阻塞算法正确性。

  1. 基本不变式

  2. head不变式

  3. tail不变式

# BlockingQueue

# blockingQueue的常用方法

  • add 不阻塞|队满exception

  • offer 不阻塞|带时间阻塞|队满返回false

  • put 队满阻塞|响应中断

以上都添加到队尾。

以下都移除队头。

  • remove 队空exception

  • poll 带时间阻塞|队空返null

  • take 队空阻塞|取1st|相应中断

# ArrayBlockingQueue

有界队列

  • 用法
BlockingQueue<String> bq = new ArrayBlockingQueue<>(5);
new Thread(() -> {
  try {
      Thread.sleep(1000L);
      // 阻塞
      bq.put("a");
      // offer() 可以在规定时间内重试,超过规定时间返回false
  } catch (InterruptedException e) {
      e.printStackTrace();
  }
}).start();
// 此处是可以阻塞住的
System.out.println(bq.take());
// poll() 在规定时间内重试,超过规定时间返回null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
  • 原理

    内部 ReentrantLock 实现。在设置两条condition:

notEmpty = lock.newCondition();
notFull =  lock.newCondition();
1
2

# PriorityQueue

是一个基于优先堆的无界队列。并非blockingQueue。也不是线程安全的。

PriorityQueue是非线程安全的

  • 使用

    非线程安全,建议使用 PriorityBlockingQueue

  • 原理

    二叉小顶堆

# 为什么可以直接用数组来存储堆(数据结构)?

已知父节点下标a,则其左孩子下标为 a×2+1,右孩子为a×2+2。

当前节点下标为b,则父节点下标为 (b-1)/2

# PriorityBlockingQueue

  • 支持优先级的无界阻塞队列
  • 默认情况下元素采用自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。但需要注意的是不能保证同优先级元素的顺序。
  • 基于最小二叉堆实现,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。

和priorityQueue区别。

  • 数组扩容采用CAS+自旋操作
  • 存offer、取take、remove都采用ReentrantLock加锁

# DelayQueue

为一个无界blockingQueue

  • 原理

    PriorityQueue实现

  • 使用

    1. 入队的元素要注意实现 Delayed 接口的 getDelay(TimeUnit) 和 compareTo(Delayed)
    2. getDelay返回触发时间和当前时间的间隔 milliseconds
    3. compareTo 对比当前元素和目标元素优先级,当前元素时间大于目标元素时间,则返回1,否则 -1/0

    其他使用等同于blockingQueue。

# SynchronousQueue

底层公平模式使用TransferQueue,也存在head/tail指针。

非公平使用TransferStack,先入栈的后匹配到。

使用在线程池中可以直接将任务交给消费者,

# CopyOnWriteArrayList

  • 原理

    所有的操作方法都需要在ReentrantLock的lock和unlock中进行。

    添加(add)/修改(set)/删除(remove)数据时,都会新建一个数组,并将原数组中的元素拷贝到新数组中,然后在新数组中更新,最后再将volatile数组引用指向新数组。

  • 缺点

    写操作较多时性能低,因为写时需要复制数组,同时可能造成gc。

    get操作不加锁,读取的是原数组的内容而非新的,所以虽然能保证最终一致,但因为set操作耗时,所以可能不能当前读。

# 1.8 ConcurrentHashMap

  • 散列表
  • 并发扩容

# 问题

  • 扩容中数据能否正常访问,怎么实现的?
  • 扩容过程中,写访问如何处理?
  • 红黑树正在自平衡,能否读?
  • jdk8中,统计当前散列表中元素个数怎么实现?
  • LastRun机制

# ConcurrentSkipListMap

  • 特性

    多线程并发存取<Key, Value>数据并且有序

  • 原理

# 线程池和JUC队列

# tools 工具包


收录时间: 2021/01/15

最后更新时间: 10/11/2021, 3:33:08 PM