Java并发编程系列23 | 循环屏障CyclicBarrie

2021-03-15 04:36

阅读:696

标签:ext   command   一个   参考资料   java并发   重排序   for   源码   jdk8   

Java并发编程系列23 | 循环屏障CyclicBarrier
收录于话题
#进阶架构师 | 并发编程专题
12个

技术图片
技术图片
本篇介绍第二个并发工具类CyclicBarrier,CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier),分以下部分介绍:
CyclicBarrier的使用
CyclicBarrier与CountDownLatch比较
CyclicBarrier源码解析

1. CyclicBarrier的使用


CyclicBarrier要做的事情是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程一起执行。
1.1 API
CyclicBarrier(int parties):构造方法,parties表示拦截线程的数量。
CyclicBarrier(int parties, Runnable barrierAction) :barrierAction用于在线程到达屏障时优先执行b,用于处理更加复杂的业务场景。
await():将当前线程阻塞,等到所有的线程都到达指定的临界点后一起执行。
getNumberWaiting():获取当前有多少个线程阻塞等待在临界点上。
reset():将屏障重置为初始状态。
1.2 使用举例
举个例子说明CyclicBarrier的使用:8个运动员参加比赛,运动员可能到达赛场的时间不一样,要等8个运动员到齐了才开始比赛,代码如下:


public class CyclicBarrierTest {
    private static CyclicBarrier barrier = new CyclicBarrier(8, () -> {
        System.out.println("所有运动员入场,裁判员一声令下!!!");
    });

    public static void main(String[] args) {
        System.out.println("运动员准备进场,全场欢呼......");

        for (int i = 0; i 

输出结果:

运动员准备进场,全场欢呼......
Thread-0 运动员到达起点,准备好了!!!
Thread-2 运动员到达起点,准备好了!!!
Thread-4 运动员到达起点,准备好了!!!
Thread-1 运动员到达起点,准备好了!!!
Thread-3 运动员到达起点,准备好了!!!
Thread-5 运动员到达起点,准备好了!!!
Thread-6 运动员到达起点,准备好了!!!
Thread-7 运动员到达起点,准备好了!!!
所有运动员入场,裁判员一声令下!!!
Thread-7 运动员出发!!!
Thread-0 运动员出发!!!
Thread-1 运动员出发!!!
Thread-4 运动员出发!!!
Thread-2 运动员出发!!!
Thread-6 运动员出发!!!
Thread-5 运动员出发!!!
Thread-3 运动员出发!!!

2. 与CountDownLatch比较


CountDownLatch用于一个线程等待若干个其他线程执行完任务之后才执行,强调一个线程等待,这个线程会阻塞。而CyclicBarrier用于一组线程互相等待至某个状态,然后这一组线程再同时执行,强调的是多个线程互等,这多个线程阻塞,等大家都完成,再携手共进。
CountDownLatch是不能复用的,而CyclicLatch是可以复用的。使用reset()方法将屏障重置为初始状态之后就可以复用。
CyclicBarrier提供了更多的方法,能够通过getNumberWaiting()获取阻塞线程的数量,通过isBroken()方法可以知道阻塞的线程是否被中断。

3. 源码分析


CyclicBarrier是通过Lock的Condition实现的,每个CyclicBarrier对应个Lock锁和该锁的condition条件。
创建CyclicBarrier时设置一个count计数,当调用await()时做两件事:①将count-1 ②将线程阻塞并构造成结点加入condition条件队列。
当count变为0时,达到等待线程数量要求,condition将条件队列中的线程全部唤醒。
3.1 类结构

public class CyclicBarrier {
    private static class Generation { // 内部类,当有parties个线程到达barrier就会更新换代
        boolean broken = false; // 是否损坏
    }
    private final ReentrantLock lock = new ReentrantLock(); // 重入锁
    private final Condition trip = lock.newCondition();
    private final int parties;                              // 等待线程总数量
    private final Runnable barrierCommand;                  // 达到等待线程数量后执行的线程
    private Generation generation = new Generation();       // 当有parties个线程到达barrier,就会更新换代
    private int count;                                      // 记录当前线程数量
}

3.2 构造方法
将parties设置为count值,设置达到等待线程数量后优先执行的线程

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties 

3.3 await()
await()方法:
①将count-1
②将线程阻塞并构造成结点加入condition条件队列。
③当count变为0时,condition将条件队列中的线程全部唤醒。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier(); // 代失效,唤醒所有线程
            throw new InterruptedException();
        }

        int index = --count; // 计数
        if (index == 0) { // 达到要求数量
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run(); // 达到等待线程数量后执行barrierCommand
                ranAction = true;
                nextGeneration(); // 唤醒本代所有线程,生成新一代,重置count
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 线程数量未达到要求数量,将线程挂起等待
        for (;;) {
            try {
                if (!timed)
                    trip.await(); // 将线程加入condition队列挂起
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && !g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 特殊情况处理
            if (g.broken)
                throw new BrokenBarrierException();
            if (g != generation)
                return index;
            if (timed && nanos 

4. 总结


CyclicBarrier要做的事情是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
注意CyclicBarrier与CountDownLatch的区别:CountDownLatch用于一个线程等待若干个其他线程执行完任务之后才执行,而CyclicBarrier强调的是多个线程互等,等大家都完成,再携手共进。此外,CyclicBarrier功能更加强大,可以循环使用。
CyclicBarrier是通过Lock的Condition实现的,每个CyclicBarrier对应个Lock锁和该锁的condition条件。创建CyclicBarrier时设置一个count计数,当调用await()时做两件事:①将count-1 ②将线程阻塞并构造成结点加入condition条件队列。当count变为0时,达到等待线程数量要求,condition将条件队列中的线程全部唤醒。
参考资料
《Java并发编程之美》
《Java并发编程实战》
《Java并发编程的艺术》

并发系列文章汇总


【原创】01|开篇获奖感言
【原创】02|并发编程三大核心问题
【原创】03|重排序-可见性和有序性问题根源
【原创】04|Java 内存模型详解
【原创】05|深入理解 volatile
【原创】06|你不知道的 final
【原创】07|synchronized 原理
【原创】08|synchronized 锁优化
【原创】09|基础干货
【原创】10|线程状态
【原创】11|线程调度
【原创】12|揭秘 CAS
【原创】13|LockSupport
【原创】14|AQS 源码分析
【原创】15|重入锁 ReentrantLock
【原创】16|公平锁与非公平锁
【原创】17|读写锁八讲(上)
【原创】18|读写锁八讲(下)
【原创】19|JDK8新增锁StampedLock
【原创】20|StampedLock源码解析
【原创】21|Condition-Lock的等待通知

之前,给大家发过三份Java面试宝典,这次新增了一份,目前总共是四份面试宝典,相信在跳槽前一个月按照面试宝典准备准备,基本没大问题。
《java面试宝典5.0》(初中级)
《350道Java面试题:整理自100+公司》(中高级)
《资深java面试宝典-视频版》(资深)
《Java[BAT]面试必备》(资深)
分别适用于初中级,中高级,资深级工程师的面试复习。
内容包含java基础、javaweb、mysql性能优化、JVM、锁、百万并发、消息队列,高性能缓存、反射、Spring全家桶原理、微服务、Zookeeper、数据结构、限流熔断降级等等。
技术图片
获取方式:点“在看”,V信关注上述单号并回复 【面试】即可领取,更多精彩陆续奉上。

看到这里,证明有所收获
必须点个在看支持呀,喵

Java并发编程系列23 | 循环屏障CyclicBarrie

标签:ext   command   一个   参考资料   java并发   重排序   for   源码   jdk8   

原文地址:https://blog.51cto.com/15009303/2552593


评论


亲,登录后才可以留言!