Java并发包中的线程同步器
2021-05-02 04:27
标签:red tor owa semaphore wait 个数 退出 线程 shutdown 计数器 不用等到子进程全部执行完毕之后再返回 是基于AQS实现的 AQS中的state用来计数了 阻塞调用线程,当计数器值为0时,返回咯 当计数器为0时返回;或者timeoout之后 state减1 tryReleaseShared() 获取当前的state值 CountDownLatch的计数器是一次性的 CyclicBarrier:回环屏障,它可以让一组线程全部达到一个状态后再全部同时执行。这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。 创建了一个CyclicBarrier对象,其中第一个参数为计数器初始值,第二个参数Runable是当计数器值为0时需要执行的任务。 当第一个线程调用await方法时,计数器减1,第二个线程调用await方法时,减1。如果当前cyclicBarrier中的计数值不等于0时,就线程12都锁住 当cyclicBarrier的值等于0时,才会去执行线程12的任务。然后cyclicBarrier被重置。 等待,阻塞;或者异常退出 等待一定的时间后,如果没有突破屏障,也会返回 如果count == 0了,先执行CyclicBarrier自己的方法,再那唤醒所有的trip条件变量的阻塞进程, 如果count != 0 也没有设置等待,那就直接把线程放进阻塞队列,当前会被挂起并释放lock锁。如果当前线程设置了超时时间,被放进条件变量trip的阻塞队列,不过过段时间后会自动返回 CycleBarrier可以复用 内部计数器是递增,并且在一开始初始化Semaphore时可以指定一个初始值,但是并不需要直到需要同步的线程个数,而是在需要同步的地方调用acquire方法时指定需要同步的线程个数。 阻塞当前main线程,当信号量到达release和初始化值之和时,才能解封当前线程。 当前线程调用该方法的目的是希望获取一个信号量资源。如果当前信号量个数大于0,则当前信号量的计数会减1,然后该方法直接返回。否则如果当前信号量个数等于0,则当前线程会被放入AQS的阻塞队列。或者中断线程。 当然有公平的实现和非公平的实现; 获取多个信号量的值,满足就唤醒执行,不满足就不执行呗。 不响应中断 获取指定个数 不响应中断 增加一个信号量,就是state + 1 释放多个信号量 Semaphore的计数器是不可以自动重置的。通过变相改变acquire的参数实现CycleBarrier的功能。AQS实现。 Java并发包中的线程同步器 标签:red tor owa semaphore wait 个数 退出 线程 shutdown 原文地址:https://www.cnblogs.com/sicheng-li/p/13205515.html10.1CountDownLatch
new CountDownLatch(2)
countDownLatch.countDown(); //-1
countDownLatch.await();//当计数器为0时返回和join之间的区别
void await()方法
void await(long timeout, TimeUnit unit)
void countDown()
void getCount()
小结
CyclicBarrier回环屏障
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CycleBarrierTest {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread() + "task1 merge result");
}
});
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("one in");
try {
cyclicBarrier.await();
System.out.println("one out");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println("two in");
try {
cyclicBarrier.await();
System.out.println("two out");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
executorService.shutdown();
}
/**
* one in
* two in
* Thread[pool-1-thread-2,5,main]task1 merge result
* two out
* one out
*/
}
实现原理探究
int await()方法
Boolean await(long timeout, TimeUnit unit)
int dowait(boolean timed, long nanos)
小结
Semaphore
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
private static Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread() + "start");
semaphore.release();
System.out.println(Thread.currentThread() + "over");
}
});
executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread() + "start");
semaphore.release();
System.out.println(Thread.currentThread() + "over");
}
});
try {
semaphore.acquire(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("all child thread over");
executorService.shutdown();
}
/**
* Thread[pool-1-thread-1,5,main]start
* Thread[pool-1-thread-1,5,main]over
* Thread[pool-1-thread-2,5,main]start
* Thread[pool-1-thread-2,5,main]over
* all child thread over
*/
}
实现原理探究
void acquire()方法
void acquire(int permits)方法
void acquireUninterruptibly()方法
void acquireUninterruptibly(int permits)方法
void release()方法
void release(int permits)方法
小结
总结