线程通信

2021-06-10 00:06

阅读:782

标签:his   park   初始   override   poll   ESS   阻塞队列   condition   next   

线程通信

  • 等待:
public final void wait();
public final void wait(long timeout);
//必须在对obj加锁的同步代码块中,在一个线程中,调用obj.wait()时,此线程会释放其拥有的所有锁标记,同时此线程在无限期等待的状态中,释放锁,进入等待队列。
  • 通知:
public final void notify();
public final void notifyAll();
//必须在对obj加锁的同步代码块中,从obj的Waiting中释放一个或全部线程。对自身没有任何影响。

例如:

public class getClassTest {
    //全局锁对象
    static final Object lock = new Object();
    
    public static void main(String[] args) {
        //创建线程1
        new Thread(() -> {
            synchronized (lock) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "执行");
            }
        }, "t1").start();
        
        //创建线程2
        new Thread(() -> {
            synchronized (lock) {
                System.out.println(Thread.currentThread().getName() + "执行");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                lock.notify();
            }
        }, "t2").start();
        
    }
}

//运行结果:
//> t2执行
// 两秒后...
//> t1执行

sleep(long n)和wait(long n)的区别

  1. sleep是Thread方法,而wait是Object的方法
  2. sleep不需要强制和synchronized配合使用,但wait需要和synchronized一起用
  3. sleep在会面的同时,不会释放锁对象,但wait在等待的时候会释放锁对象
  4. 他们的状态都是TIMED_WAITING

wait/notify原理

技术图片
  • Owner线程发现条件不满足,调用wait方法,即可进入WaitSet变为WAITING状态
  • BLOCKED和WAITING的线程都处于阻塞状态,不占用CPU时间片
  • BLOCKED线程会在Owner线程释放锁时唤醒
  • WAITING线程会在Owner线程条用notify或notifyAll时唤醒,但唤醒后并不意味着立刻获得锁,仍需进入EntryList重新竞争

wait/notify正确使用姿势

//线程1
synchronized(lock){
    while(条件不成立){
	lock.wait();
    }
    //  执行接下来的操作
}

//另一个线程
synchronized(lock){
    //用notifyAll()来唤醒,防止虚假唤醒
    //当一个条件满足时,很多线程都被唤醒了,但是只有其中部分是有用的唤醒,其它的唤醒都是无用功
    //1.比如说买货,如果商品本来没有货物,突然进了一件商品,这是所有的线程都被唤醒了,但是只能一个人买,所以其他人都是假唤醒,获取不到对象的锁
    lock.notifyAll();
}

同步模式之保护性暂停

技术图片

用在一个线程等待另一个线程的执行结果

要点:

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列
  • JDK中,join的实现,Future的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式
    public static void main(String[] args) {
        
        final guardedObject lock = new guardedObject();
        
        new Thread(() -> {
            String o = (String) lock.get();
            System.out.println(o);
        }, "t1").start();
        
        new Thread(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock.set("这是一个小秘密");
        }, "t2").start();
    }
}

class guardedObject {
    //结果
    private Object response;

    //获取结果
    public Object get() {
        synchronized (this) {
            //没有结果则等待
            while (response == null) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return response;
        }
    }

    //设置一个结果,并唤醒线程
    public void set(Object o) {
        synchronized (this) {
            this.response = o;
            this.notifyAll();
        }
    }
}

生产者消费者问题

  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列容量有限,满时不会再加入数据,空时不再消耗数据
  • JDK中各种阻塞队列,采用的就是这种模式

技术图片

举个栗子:

  • 首先先定义我们要put和get的Message类
//设置成final的,不可被子类继承,防止子类继承篡改方法
//不提供set()方法,防止属性被修改
final class Message {
    private int id;
    //具体结果
    private Object value;

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", value=" + value +
                ‘}‘;
    }

    public Message(int id, Object value) {
        this.id = id;
        this.value = value;
    }

    public int getId() {
        return id;
    }

    public Object getValue() {
        return value;
    }
}
  • 接下来实现我们的消息队列类
class MessageQueue {

    //消息队列集合
    final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque();

    //消息队列最大容量
    private int capacity;

    //初始化方法
    public MessageQueue(int capacity) {
        this.capacity = capacity;
    }

    //存入消息
    public void put(Message message) {
        synchronized (queue) {
            //如果说队列满了,则不能再放入消息
            while (queue.size() == capacity) {
                try {
                    System.out.println("队列满了...");
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.printf(Thread.currentThread().getName());
            System.out.println("->放入消息" + message);
            queue.offer(message);
            //唤醒正在等待消息的线程
            queue.notifyAll();
        }
    }

    //获取消息
    public Message get() {
        synchronized (queue) {
            //如果说队列空了,就不能取消息了,调用wait()方法等待
            while (queue.isEmpty()) {
                try {
                    System.out.println("没有消息可以获取...");
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = queue.poll();
            System.out.printf(Thread.currentThread().getName());
            System.out.println("->取出消息" + message);
            //唤醒需要存入消息的线程
            queue.notifyAll();
            return message;
        }
    }
}
  • 进行一下测试:
public class producerAndConsumer {
    public static void main(String[] args) {
        
        MessageQueue queue = new MessageQueue(2);
        
        //创建3个生产者
        for (int i = 0; i  {
                queue.put(new Message(id, "值:" + id));
            }, "生产者" + i).start();
        }
        
        //和一个不停get的消费者
        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("一秒后..");
                queue.get();
            }
        }, "消费者").start();
        
    }
}
  • 结果:
> 生产者2->放入消息Message{id=2, value=值:2}
> 生产者0->放入消息Message{id=0, value=值:0}
> 队列满了...
> 一秒后..
> 消费者->取出消息Message{id=2, value=值:2}
> 生产者1->放入消息Message{id=1, value=值:1}
> 一秒后..
> 消费者->取出消息Message{id=0, value=值:0}
> 一秒后..
> 消费者->取出消息Message{id=1, value=值:1}
> 一秒后..
> 没有消息可以获取...

park与unpark的使用

基本使用

它们是LockSupport类中的方法

//暂停当前线程
LockSupport.park();

//恢复某个线程的运行
LockSupport.unpark(暂停线程对象);

举个栗子:

public static void main(String[] args) {

    Thread t1 = new Thread(() -> {
        System.out.println("start...");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("park....");
        LockSupport.park();
        System.out.println("resume...");
    }, "t1");
    
    t1.start();
    
    new Thread(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("unpark...");
        LockSupport.unpark(t1);
    }, "t2").start();
    
}

结果

> start...
> unpark...
> park....
> resume...

特点:

与Object的wait&notify相比

  • wait&notify必须配合Object Monitor一起使用 而park,unpark不必
  • park&unpark是以线程为单位来阻塞和唤醒线程,而notify只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么精确
  • park&unpark可以先unpark,而wait&notify不能先notify

技术图片

park使用原理

  1. 当前线程调用Unsafe.park()方法
  2. 检查_counter,本情况为0,这时,获得_mutex互斥锁
  3. 线程进入_cond条件变量阻塞
  4. 设置_counter=0

技术图片

unpark使用原理

  1. 调用Unsafe.unpark(Thread-0)方法,设置_counter为1
  2. 唤醒_cond条件变量中的Thread_0
  3. Thread_0恢复运行
  4. 设置_counter为0

活跃性

死锁:

  • 当第一个线程拥有A对象锁标记,并等待B对象锁标记,同时第二个线程拥有B对象锁标记,并等待A对象锁标记时,产生死锁。
  • 一个线程可以同时拥有多个对象的锁标记,当线程阻塞时,不会释放已经拥有的锁标记,由此可能造成死锁。

例如:

t1线程获得 A对象 锁,接下来想获取 B对象 的锁

t2 线程 获得 B对象 锁,接下来想获取 A对象 的锁

public class Test {
    
private static final Object A = new Object();
    
private static final Object B = new Object();

    public static void main(String[] args) {

        Thread t1 = new Thread(() -> {
            
           synchronized (A){
               System.out.println(Thread.currentThread().getName() + "获得A锁");
               try {
                   Thread.sleep(1000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               
               synchronized (B){
                   System.out.println(Thread.currentThread().getName() + "获得B锁");
                   System.out.println("其他操作");
               }
           }
        }, "t1");
        t1.start();
        new Thread(() -> {
            
            synchronized (B){
                System.out.println(Thread.currentThread().getName() + "获得B锁");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                synchronized (A){
                    System.out.println(Thread.currentThread().getName() + "获得A锁");
                    System.out.println("其他操作");
                }
            }
        }, "t2").start();
    }
}

活锁

活锁出现在两个先很互相改变对方的结束条件,最后谁也无法结束,例如:

public class getClassTest {
    
private static final Object A = new Object();
private static volatile int count = 10;

    public static void main(String[] args) {

        Thread t1 = new Thread(() -> {
           while(count > 0){
               try {
                   Thread.sleep(300);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               count--;
               System.out.println(Thread.currentThread().getName() + ":" + count);
           }
        }, "t1");
        t1.start();
        
        Thread t2 = new Thread(() -> {
            while(count 

解决方法就是将 t1 或者 t2 的sleep时间设置成随机数,使两个线程交错开来

ReentrantLock

相较于synchronized 它具备如下特点

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量

与synchronized一样,都支持可重入

基本语法:

//获取锁
reentrantLock.lock();
try {
    
    //临界区
    
} finally {
    
    //释放锁
    reentrantLock.unlock();
    
}

可重入

可重入是指同一个线程如果首次获得了这把锁,那么因为他是这把锁的拥有者,因此有权利再次获取这把锁

如果是不可重入锁,那么第二次获得锁时,自己也会被锁住

public class ThreadTest2 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
        lock.lock();
        try{
            System.out.println("进入 main 方法");
            m1();
        }finally {
            lock.unlock();
        }
    }
    public static void m1(){
        lock.lock();
        try{
            System.out.println("进入 m1 方法");
            m2();
        }finally {
            lock.unlock();
        }
    }
    public static void m2(){
        lock.lock();
        try{
            System.out.println("进入 m2 方法");
        }finally {
            lock.unlock();
        }
    }
}

结果

> 进入 main 方法
> 进入 m1 方法
> 进入 m2 方法

可打断

public class ThreadTest2 {
    private static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "尝试获得锁");
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println(Thread.currentThread().getName() + "获得锁失败");
                return;
            }
            //测试不使用可打断锁
//            System.out.println(Thread.currentThread().getName() + "尝试获得锁");
//            lock.lock();
            try{
                System.out.println(Thread.currentThread().getName() + "获得到锁");
            }finally {
                lock.unlock();
            }
        },"t1");

        //主线程获得锁
        lock.lock();
        System.out.println(Thread.currentThread().getName() + "获得到锁");
        t1.start();
        Thread.sleep(1000);
        t1.interrupt();
    }
}

锁超时

立刻失败

public class ThreadTest2 {
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {

            //测试不使用可打断锁
            System.out.println(Thread.currentThread().getName() + "尝试获得锁");
            try {
                //带参数的tryLock()方法,等待指定时间后停止等待
                if (!lock.tryLock(2, TimeUnit.SECONDS)) {
                    System.out.println(Thread.currentThread().getName() + "获得锁失败");
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println(Thread.currentThread().getName() + "获得锁失败");
                return;
            }
            //不带参数的tryLock()方法,立刻停止等待
//            if (!lock.tryLock()) {
//                System.out.println(Thread.currentThread().getName() + "获得锁失败");
//                return;
//            }
            try {
                System.out.println(Thread.currentThread().getName() + "获得到锁");
            } finally {
                lock.unlock();
            }
        }, "t1");

        //主线程获得锁
        lock.lock();
        System.out.println(Thread.currentThread().getName() + "获得到锁");
        t1.start();
        try {
            Thread.sleep(1000);
        } finally {
            lock.unlock();
        }

    }
}

条件变量

synchronized中也有条件变量,就是之前原理时的那个waitSet休息室,当条件不满足时进入waitSet等待

ReentrantLock的条件变量比synchronized强大之处在于,它是支持多个条件变量的,这就好比

  • synchronized是那些不满足条件的线程都在一间休息室等消息
  • 而ReentrantLock支持多间休息时

使用流程

  • await前需要获得锁
  • await执行后,会释放锁,进入conditionObject等待
  • await的线程被唤醒(或打断或超时)去重新竞争lock锁
  • 竞争lock锁成功后,从await后继续执行

顺序控制

如何控制t1线程保证在t2线程之后运行?

  • 方法一:wait()&notify()
public class ThreadTest2 {
    private static final Object obj = new Object();
    //表示t2是否运行过
    static boolean state = false;

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            synchronized (obj){
                //如果说t2没有运行过,就等待
                while (!state) {
                    try {
                        obj.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                System.out.println(Thread.currentThread().getName() + "执行");
            }
        }, "t1");

        Thread t2 = new Thread(() -> {
            synchronized (obj){

                System.out.println(Thread.currentThread().getName() + "执行");
                state = true;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                obj.notify();
            }
        }, "t2");

        t1.start();
        t2.start();
    }
}
  • 方法二: LockSupport.park()&LockSupport.unpark(t1);
public class ThreadTest2 {
    private static final Object obj = new Object();
    //表示t2是否运行过
    static boolean state = false;

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            //如果说t2没有运行过,就等待
            while (!state) {
                LockSupport.park();
            }
            System.out.println(Thread.currentThread().getName() + "执行");
        }, "t1");

        Thread t2 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "执行");
            state = true;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            LockSupport.unpark(t1);
        }, "t2");

        t1.start();
        t2.start();
    }
}

交替输出

线程1输出 A 5次,线程2输出 B 5次,线程3输出 C 5次,要求输出 ABC ABC ABC ABC ABC

  • 方法一:wait()&notify()
public class ThreadTest2 {
    //表示三个线程的状态,1 代表t1可运行  2 代表t2可运行  3 代表t3可运行
    static int state = 1;

    public static void main(String[] args) {


        Thread t1 = new Thread(() -> {
            synchronized (obj){
                //交替打印5次的操作
                for (int i = 0 ; i  {
            synchronized (obj){
                for (int i = 0 ; i  {
            synchronized (obj){
                for (int i = 0 ; i 
  • 方法二:LockSupport.park()&LockSupport.unpark(t1);
public class ThreadTest2 {
    private static Thread t1,t2,t3;

    public static void main(String[] args) {

        parkAndUnpark parkAndUnpark = new parkAndUnpark();
        
        t1 = new Thread(() -> {
            parkAndUnpark.Print("A",t2);
        }, "t1");
        
        t2 = new Thread(() -> {
            parkAndUnpark.Print("B",t3);
        }, "t2");
        
        t3 = new Thread(() -> {
            parkAndUnpark.Print("C\t",t1);
        }, "t3");
        
        t1.start();
        t2.start();
        t3.start();
        LockSupport.unpark(t1);
    }
}

class parkAndUnpark{
    private final int LoopNumber = 5;

    public void Print(String s,Thread next){
        
        for (int i = 0; i 

线程通信

标签:his   park   初始   override   poll   ESS   阻塞队列   condition   next   

原文地址:https://www.cnblogs.com/LongDa666/p/14470746.html


评论


亲,登录后才可以留言!