死磕 java同步系列之Semaphore源码解析
2020-12-13 02:16
标签:string time 访问 更新 img hsv rup overflow 功能 (1)Semaphore是什么? (2)Semaphore具有哪些特性? (3)Semaphore通常使用在什么场景中? (4)Semaphore的许可次数是否可以动态增减? (5)Semaphore如何实现限流? Semaphore,信号量,它保存了一系列的许可(permits),每次调用acquire()都将消耗一个许可,每次调用release()都将归还一个许可。 Semaphore通常用于限制同一时间对共享资源的访问次数上,也就是常说的限流。 下面我们一起来学习Java中Semaphore是如何实现的。 Semaphore中包含了一个实现了AQS的同步器Sync,以及它的两个子类FairSync和NonFairSync,这说明Semaphore也是区分公平模式和非公平模式的。 基于之前对于ReentrantLock和ReentrantReadWriteLock的分析,这篇文章相对来说比较简单,之前讲过的一些方法将直接略过,有兴趣的可以拉到文章底部查看之前的文章。 通过Sync的几个实现方法,我们获取到以下几点信息: (1)许可是在构造方法时传入的; (2)许可存放在状态变量state中; (3)尝试获取一个许可的时候,则state的值减1; (4)当state的值为0的时候,则无法再获取许可; (5)释放一个许可的时候,则state的值加1; (6)许可的个数可以动态改变; 非公平模式下,直接调用父类的nonfairTryAcquireShared()尝试获取许可。 公平模式下,先检测前面是否有排队的,如果有排队的则获取许可失败,进入队列排队,否则尝试原子更新state的值。 创建Semaphore时需要传入许可次数。 Semaphore默认也是非公平模式,但是你可以调用第二个构造方法声明其为公平模式。 下面的方法在学习过前面的内容看来都比较简单,彤哥这里只列举Semaphore支持的一些功能了。 以下的方法都是针对非公平模式来描述。 获取一个许可,默认使用的是可中断方式,如果尝试获取许可失败,会进入AQS的队列中排队。 获取一个许可,非中断方式,如果尝试获取许可失败,会进入AQS的队列中排队。 尝试获取一个许可,使用Sync的非公平模式尝试获取许可方法,不论是否获取到许可都返回,只尝试一次,不会进入队列排队。 尝试获取一个许可,先尝试一次获取许可,如果失败则会等待timeout时间,这段时间内都没有获取到许可,则返回false,否则返回true; 释放一个许可,释放一个许可时state的值会加1,并且会唤醒下一个等待获取许可的线程。 一次获取多个许可,可中断方式。 一次获取多个许可,非中断方式。 一次尝试获取多个许可,只尝试一次。 尝试获取多个许可,并会等待timeout时间,这段时间没获取到许可则返回false,否则返回true。 一次释放多个许可,state的值会相应增加permits的数量。 获取可用的许可次数。 销毁当前可用的许可次数,对于已经获取的许可没有影响,会把当前剩余的许可全部销毁。 减少许可的次数。 (1)Semaphore,也叫信号量,通常用于控制同一时刻对共享资源的访问上,也就是限流场景; (2)Semaphore的内部实现是基于AQS的共享锁来实现的; (3)Semaphore初始化的时候需要指定许可的次数,许可的次数是存储在state中; (4)获取一个许可时,则state值减1; (5)释放一个许可时,则state值加1; (6)可以动态减少n个许可; (7)可以动态增加n个许可吗? (1)如何动态增加n个许可? 答:调用release(int permits)即可。我们知道释放许可的时候state的值会相应增加,再回头看看释放许可的源码,发现与ReentrantLock的释放锁还是有点区别的,Semaphore释放许可的时候并不会检查当前线程有没有获取过许可,所以可以调用释放许可的方法动态增加一些许可。 (2)如何实现限流? 答:限流,即在流量突然增大的时候,上层要能够限制住突然的大流量对下游服务的冲击,在分布式系统中限流一般做在网关层,当然在个别功能中也可以自己简单地来限流,比如秒杀场景,假如只有10个商品需要秒杀,那么,服务本身可以限制同时只进来100个请求,其它请求全部作废,这样服务的压力也不会太大。 使用Semaphore就可以直接针对这个功能来限流,以下是代码实现: 1、 死磕 java同步系列之开篇 2、 死磕 java魔法类之Unsafe解析 3、 死磕 java同步系列之JMM(Java Memory Model) 4、 死磕 java同步系列之volatile解析 5、 死磕 java同步系列之synchronized解析 6、 死磕 java同步系列之自己动手写一个锁Lock 7、 死磕 java同步系列之AQS起篇 8、 死磕 java同步系列之ReentrantLock源码解析(一)——公平锁、非公平锁 9、 死磕 java同步系列之ReentrantLock源码解析(二)——条件锁 10、 死磕 java同步系列之ReentrantLock VS synchronized 11、 死磕 java同步系列之ReentrantReadWriteLock源码解析 欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。 死磕 java同步系列之Semaphore源码解析 标签:string time 访问 更新 img hsv rup overflow 功能 原文地址:https://www.cnblogs.com/tong-yuan/p/Semaphore.html问题
简介
特性
类结构
源码分析
内部类Sync
// java.util.concurrent.Semaphore.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 构造方法,传入许可次数,放入state中
Sync(int permits) {
setState(permits);
}
// 获取许可次数
final int getPermits() {
return getState();
}
// 非公平模式尝试获取许可
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 看看还有几个许可
int available = getState();
// 减去这次需要获取的许可还剩下几个许可
int remaining = available - acquires;
// 如果剩余许可小于0了则直接返回
// 如果剩余许可不小于0,则尝试原子更新state的值,成功了返回剩余许可
if (remaining current) // underflow
throw new Error("Permit count underflow");
// 原子更新state的值,成功了返回true
if (compareAndSetState(current, next))
return;
}
}
// 销毁许可
final int drainPermits() {
for (;;) {
// 看看还有几个许可
int current = getState();
// 如果为0,直接返回
// 如果不为0,把state原子更新为0
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
内部类NonfairSync
// java.util.concurrent.Semaphore.NonfairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
// 构造方法,调用父类的构造方法
NonfairSync(int permits) {
super(permits);
}
// 尝试获取许可,调用父类的nonfairTryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
内部类FairSync
// java.util.concurrent.Semaphore.FairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
// 构造方法,调用父类的构造方法
FairSync(int permits) {
super(permits);
}
// 尝试获取许可
protected int tryAcquireShared(int acquires) {
for (;;) {
// 公平模式需要检测是否前面有排队的
// 如果有排队的直接返回失败
if (hasQueuedPredecessors())
return -1;
// 没有排队的再尝试更新state的值
int available = getState();
int remaining = available - acquires;
if (remaining
构造方法
// 构造方法,创建时要传入许可次数,默认使用非公平模式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 构造方法,需要传入许可次数,及是否公平模式
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire()方法
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireUninterruptibly()方法
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
tryAcquire()方法
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
tryAcquire(long timeout, TimeUnit unit)方法
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
release()方法
public void release() {
sync.releaseShared(1);
}
acquire(int permits)方法
public void acquire(int permits) throws InterruptedException {
if (permits
acquireUninterruptibly(int permits)方法
public void acquireUninterruptibly(int permits) {
if (permits
tryAcquire(int permits)方法
public boolean tryAcquire(int permits) {
if (permits = 0;
}
tryAcquire(int permits, long timeout, TimeUnit unit)方法
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits
release(int permits)方法
public void release(int permits) {
if (permits
availablePermits()方法
public int availablePermits() {
return sync.getPermits();
}
drainPermits()方法
public int drainPermits() {
return sync.drainPermits();
}
reducePermits(int reduction)方法
protected void reducePermits(int reduction) {
if (reduction
总结
彩蛋
public class SemaphoreTest {
public static final Semaphore SEMAPHORE = new Semaphore(100);
public static final AtomicInteger failCount = new AtomicInteger(0);
public static final AtomicInteger successCount = new AtomicInteger(0);
public static void main(String[] args) {
for (int i = 0; i seckill()).start();
}
}
public static boolean seckill() {
if (!SEMAPHORE.tryAcquire()) {
System.out.println("no permits, count="+failCount.incrementAndGet());
return false;
}
try {
// 处理业务逻辑
Thread.sleep(2000);
System.out.println("seckill success, count="+successCount.incrementAndGet());
} catch (InterruptedException e) {
// todo 处理异常
e.printStackTrace();
} finally {
SEMAPHORE.release();
}
return true;
}
}
推荐阅读