Java 多线程进阶-并发协作控制

2021-02-09 16:18

阅读:504

标签:控制   并发   这一   结果   可重入锁   执行   取出   自己   reading   

Java 多线程进阶-并发协作控制

  • 线程协作对比

    • Thread/Executor/Fork-Join
      • 线程启动, 运行, 结束.
      • 线程之间缺少协作.
    • synchronized 同步
      • 互斥, 限定只有一个线程才能进入关键区.
      • 简单粗暴, 性能损失有点大>_.
  • Lock 锁

    • Lock 也可以实现同步的效果
      • 实现更复杂的临界区结构.
      • tryLock 方法可以预判锁是否空闲.
      • 允许分离读写的操作, 多读单写.
      • 性能更好.
    • ReentrantLock 类, 可重入的互斥锁.
    • RenntrantReadWriteLock 类, 可重入的读写锁.
    • tryLock()/lock()/unlock() 函数.
        package concurrentDemo0421;
    
        import java.time.LocalDateTime;
        import java.util.concurrent.locks.ReentrantLock;
        import java.util.concurrent.locks.ReentrantReadWriteLock;
    
        public class LockExample {
            private static final ReentrantLock queueLock111 = new ReentrantLock(); // 可重入锁
            private static final ReentrantReadWriteLock orderLock111 = new ReentrantReadWriteLock(); // 可重入读写锁
    
            /**
             * 学校门口有家奶茶店, 学生们点单有时需要排队
             * 1. 买奶茶
             * 假设想买奶茶的同学如果看到需要排队, 就决定不买了
             * (一次只有一个买)
             * 

    * 2. 操作奶茶账本 * 假设奶茶店有老板和多名员工, 记录方式比较原始, 只有一个订单本 * (多个读, 一个写) * 老板负责写新订单, 员工不断查看订单本得到信息来制作奶茶, 在老板写新订单的时候员工不能查看订单本 * (写时, 不能读) * 多个员工可以同时查看订单本, 此时老板不能写新订单 * (读时, 不能写) * * @param args 1 */ public static void main(String[] args) throws InterruptedException { // 1. 买奶茶的例子 buyMilkTea(); // 使用可重入锁 // 2. 操作奶茶账本的例子 handleOrder(); // 使用读写锁 } public static void buyMilkTea() throws InterruptedException { LockExample lockExample = new LockExample(); int STUDENTS_COUNT = 10; Thread[] students = new Thread[STUDENTS_COUNT]; for (int i = 0; i " + Thread.currentThread().getName() + " : 来一杯珍珠奶茶, 不要珍珠"); flag = false; queueLock111.unlock(); } else { System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 再等等"); } if (flag) { Thread.sleep(1000); } } } /** * 处理订单 */ static void handleOrder() { LockExample lockExample = new LockExample(); Thread boss = new Thread(() -> { while (true) { try { lockExample.addOrder(); // 老板加新单子 long waitingTime = (long) (Math.random() * 1000); Thread.sleep(waitingTime); } catch (InterruptedException e) { e.printStackTrace(); } } }); boss.start(); int workerCount = 3; Thread[] workers = new Thread[workerCount]; for (int i = 0; i " + "老板新加一个订单"); orderLock111.writeLock().unlock(); } /** * 查看订单本 */ private void viewOrder() throws InterruptedException { orderLock111.readLock().lock(); // readLock 读锁, 可以多个线程共享(同时访问) long readingTime = (long) (Math.random() * 500); Thread.sleep(readingTime); System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 查看了订单本"); orderLock111.readLock().unlock(); } }

  • Semaphore 信号量

    • 由1965年Dijkstra提出的.
    • 信号量: 本质上是一个计数器.
    • 计数器大于0, 可以使用, 等于0不能使用.
    • 可以设置多个并发量, 例如限制10个访问.
    • Semaphore
      • acquire 获取.
      • release 释放.
    • 比 Lock 更进一步, 可以控制多个同时访问关键区.
        package concurrentDemo0421;
    
        import java.time.LocalDateTime;
        import java.util.concurrent.Semaphore;
        import java.util.concurrent.TimeUnit;
    
        /**
         * 控制同时访问代码块的线程数
         * 现在有一个地下车库, 共有5个车位, 有10辆车需要停放, 每次停放时, 去申请信号量
         */
        public class SemaphoreExample {
            private final Semaphore placeSemaphore = new Semaphore(5);
    
            public static void main(String[] args) throws InterruptedException {
                SemaphoreExample example = new SemaphoreExample();
    
                int tryToParkCount = 10;
                Thread[] parkers = new Thread[tryToParkCount];
                for (int i = 0; i  " + Thread.currentThread().getName() + " : 停车成功!");
                    return true;
                } else {
                    System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 没有空位");
                    return false;
                }
            }
    
            private void leaving() {
                placeSemaphore.release();
                System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 开走了");
            }
        }
    
  • Latch 等待锁

    • 是一个同步辅助类.
    • 用来同步执行任务的一个或多个线程
    • 不是用来保护临界区或共享资源
    • CountDownLatch
      • countDown() 计数减一.
      • await() 等待 latch 变成 0.
        package concurrentDemo0421;
    
        import java.time.LocalDateTime;
        import java.util.concurrent.CountDownLatch;
    
        /**
         * 设想百米赛跑, 发令枪发出信号后选手开始跑, 全部选手跑到终点后比赛结束.
         */
        public class CountDownLatchExample {
            public static void main(String[] args) throws InterruptedException {
                CountDownLatch startSignal = new CountDownLatch(1);
                CountDownLatch doneSignal = new CountDownLatch(10);
    
                int runnerCount = 10; // 选手数量
                for (int i = 0; i  " + "准备就绪..");
                startSignal.countDown(); // let all threads proceed
                System.out.println(LocalDateTime.now() + " => " + "比赛开始!");
                doneSignal.await(); // wait for all threads to finish
                System.out.println(LocalDateTime.now() + " => " + "比赛结束!");
            }
        }
    
        class Runner implements Runnable {
            private final CountDownLatch startSignal;
            private final CountDownLatch doneSignal;
    
            Runner(CountDownLatch startSignal, CountDownLatch doneSignal) {
                this.startSignal = startSignal;
                this.doneSignal = doneSignal;
            }
    
            @Override
            public void run() {
                try {
                    startSignal.await();
                    doWork();
                    doneSignal.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            private void doWork() throws InterruptedException {
                long time = (long) (Math.random() * 10 * 1000);
                Thread.sleep(time); // 随机在十秒内跑完
                System.out.printf(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 跑完全程, 用时 %d 秒 \n", time/1000);
            }
        }
    
  • Barrier/?b?ri?r/ n.障碍物

    • 集合点, 也是一个同步辅助类
    • 允许多个线程在某一个点上进行同步
    • CyclicBarrier
      • 构造函数是需要同步的线程数量.
      • await 等待其他线程, 到达数量后就放行.
        package concurrentDemo0421;
    
        import java.time.LocalDateTime;
        import java.util.concurrent.BrokenBarrierException;
        import java.util.concurrent.CyclicBarrier;
    
        /**
         * Barrier/?b?ri?r/ n.障碍物
         * 假定有三行数字, 用三个线程分别计算每一行的和, 最后计算综合
         */
        public class BarrierExample {
            public static void main(String[] args) {
                int rowCount = 3;
                int colCount = 5;
                final int[][] numbers = new int[rowCount][colCount];
                final int[] results = new int[rowCount];
                numbers[0] = new int[]{1, 2, 3, 4, 5};
                numbers[1] = new int[]{6, 7, 8, 9, 10};
                numbers[2] = new int[]{11, 12, 13, 14, 15};
    
                CalcFinalSum111 finalResult = new CalcFinalSum111(results);
                CyclicBarrier cyclicBarrier = new CyclicBarrier(rowCount, finalResult);
                // 当有3个线程在 barrier上await时, 就执行最终计算
    
                for (int i = 0; i  " + Thread.currentThread().getName() + " : 计算第" + (rowNumber + 1) + "行结束, 结果为: " + sum);
                    barrier.await(); // 等待! 只要超过(Barrier的构造参数填入的数量)的个数, 就放行
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    
        class CalcFinalSum111 implements Runnable {
            final int[] eachRowResult;
            int finalResult;
    
            CalcFinalSum111(int[] eachRowResult) {
                this.eachRowResult = eachRowResult;
            }
    
            @Override
            public void run() {
                int sum = 0;
                for (int data : eachRowResult) {
                    sum += data;
                }
                finalResult = sum;
                System.out.println(LocalDateTime.now() + " => " + "最终结果为: " + finalResult);
            }
        }
    
  • Phaser 阶段性控制多个线程

    • 允许执行并发多阶段任务, 同步辅助类.
    • 在每一个阶段结束的位置对线程进行同步, 当所有的线程都到达这一步, 再进行下一步.
    • Phaser
      • arrive()
      • arriveAndAwaitAdvance()
        package concurrentDemo0421;
    
        import java.time.LocalDateTime;
        import java.util.concurrent.Phaser;
    
        /**
         * 假设举行考试, 总共三道大题, 每次下发一道题目, 等所有学生都完成之后再进行下一道题
         */
        public class PhaserExample {
            public static void main(String[] args) {
                int studentCount = 5;
                Phaser phaser = new Phaser(studentCount);
    
                for (int i = 0; i  " + Thread.currentThread().getName() + "开始答第" + i + "题");
                long thinkingTime = (long) (Math.random() * 1000);
                Thread.sleep(thinkingTime); // 模拟学生答题时间
                System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + "第" + i + "道题答题结束");
            }
        }
    
  • Exchanger 两个线程间交换数据

    • 允许在并发线程中互相交换消息.
    • 允许在2个线程中定义同步点, 当两个线程都到达同步点, 他们交换数据结构
    • Exchanger
      • exchange(), 线程双方互相交互数据
      • 交换数据是双向的
        package concurrentDemo0421;
    
        import java.time.LocalDateTime;
        import java.util.Scanner;
        import java.util.concurrent.Exchanger;
    
        /**
         * 通过Exchanger实现学生成绩查询, 两个线程间简单的数据交换,
         * 把自己线程的内容输出给另一个线程(只能简单的双向传送, 不能向MPI一样随意点对点的传输, 线程1给线程3 线程3向线程2...这样)
         */
        public class ExchangerExample {
            public static void main(String[] args) throws InterruptedException {
                Exchanger exchanger = new Exchanger();
                BackgroundWorker111 backgroundWorker111 = new BackgroundWorker111(exchanger);
                new Thread(backgroundWorker111).start();
    
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    System.out.println(LocalDateTime.now() + " => " + "请输入要查询的学生名字:");
                    String input = scanner.nextLine().trim();
                    exchanger.exchange(input);
                    String exResult = exchanger.exchange(null); // 拿到线程反馈的结果
                    // 当两个线程都同时执行到同一个exchanger.exchange()方法, 两个线程就互相交换数据, 交换是双向的.
                    if ("exit".equals(exResult)) {
                        System.out.println(LocalDateTime.now() + " => " + "退出查询~");
                        break;
                    }
                    System.out.println(LocalDateTime.now() + " => " + "查询结果: " + exResult);
                }
            }
        }
    
        class BackgroundWorker111 implements Runnable {
            final Exchanger exchanger;
    
            BackgroundWorker111(Exchanger exchanger) {
                this.exchanger = exchanger;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        String item = exchanger.exchange(null);
                        switch (item) {
                            case "zhangsan":
                                exchanger.exchange("90");
                                break;
                            case "lisi":
                                exchanger.exchange("80");
                                break;
                            case "wangwu":
                                exchanger.exchange("70");
                                break;
                            case "exit":
                                exchanger.exchange("exit");
                                return; // 退出run, 即结束当前线程
                            default:
                                exchanger.exchange("no body!");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
  • 总结

    • java.util.concurrent 包提供了很多并发编程的控制协作类.
    • 根据业务特点, 使用正确的线程并发控制协作.

Java 多线程进阶-并发协作控制

标签:控制   并发   这一   结果   可重入锁   执行   取出   自己   reading   

原文地址:https://www.cnblogs.com/sweetXiaoma/p/12749714.html


评论


亲,登录后才可以留言!