多线程生产者/消费者模式实现
2021-06-24 09:04
标签:实现 end create list not style waiting rand port 参考书籍《java多线程编程核心技术》 都是基于wait/notify实现的 线程类 测试运行 打印输出 如果以此为基础,设计多个生产者和多个消费者,那么运行过程中很可能会发生假死的情况,也就是所有线程都呈现等待的状态 修改Producer.java,Consumer.java以及测试类 打印结果 主要原因是因为notify可能唤醒的是同类(生产者唤醒生产者,消费者唤醒消费者)。最终导致所有线程都处于WAITING状态,程序进而呈现假死状态 只要将Producer和Consumer中的notify修改为notifyAll即可,这样就不至于出现假死状态 生产者/消费者 线程类 测试运行 打印结果 多线程生产者/消费者模式实现 标签:实现 end create list not style waiting rand port 原文地址:https://www.cnblogs.com/qf123/p/9670370.html一个生产者和一个消费者:操作值
1 package com.qf.test10.pojo;
2
3 /**
4 * @author qf
5 * @create 2018-09-18 15:59
6 */
7 public class Entity {
8 public static String value = "";
9 }
1 package com.qf.test10;
2
3 import com.qf.test10.pojo.Entity;
4
5 /**
6 * @author qf
7 * @create 2018-09-18 15:52
8 * 生产者类
9 */
10 public class Producer {
11 private String lock;
12
13 public Producer(String lock) {
14 this.lock = lock;
15 }
16
17 public void setValue(){
18 try {
19 synchronized (lock){
20 if(!Entity.value.equals("")){
21 lock.wait();
22 }
23 String value = System.currentTimeMillis()+"_"+System.nanoTime();
24 System.out.println("set的值是"+value);
25 Entity.value = value;
26 lock.notify();
27 }
28 } catch (InterruptedException e) {
29 e.printStackTrace();
30 }
31 }
32 }
1 package com.qf.test10;
2
3 import com.qf.test10.pojo.Entity;
4
5 /**
6 * @author qf
7 * @create 2018-09-18 15:52
8 * 消费者类
9 */
10 public class Consumer {
11 private String lock;
12
13 public Consumer(String lock) {
14 this.lock = lock;
15 }
16
17 public void getValue(){
18 try {
19 synchronized (lock){
20 if(Entity.value.equals("")){
21 lock.wait();
22 }
23 System.out.println("get的值"+Entity.value);
24 Entity.value = "";
25 lock.notify();
26 }
27 } catch (InterruptedException e) {
28 e.printStackTrace();
29 }
30 }
31 }
1 package com.qf.test10.thread;
2
3 import com.qf.test10.Producer;
4
5 /**
6 * @author qf
7 * @create 2018-09-18 16:08
8 */
9 public class ThreadP extends Thread {
10 private Producer producer;
11
12 public ThreadP(Producer producer) {
13 this.producer = producer;
14 }
15
16 @Override
17 public void run() {
18 while(true) {
19 producer.setValue();
20 }
21 }
22 }
1 package com.qf.test10.thread;
2
3 import com.qf.test10.Consumer;
4
5 /**
6 * @author qf
7 * @create 2018-09-18 16:11
8 */
9 public class ThreadC extends Thread {
10 private Consumer consumer;
11
12 public ThreadC(Consumer consumer) {
13 this.consumer = consumer;
14 }
15
16 @Override
17 public void run() {
18 while (true) {
19 consumer.getValue();
20 }
21 }
22 }
1 package com.qf.test10;
2
3 import com.qf.test10.thread.ThreadC;
4 import com.qf.test10.thread.ThreadP;
5
6 /**
7 * @author qf
8 * @create 2018-09-18 16:12
9 */
10 public class Run {
11 public static void main(String[] args) {
12 String lock = new String("");
13 Producer p = new Producer(lock);
14 Consumer c = new Consumer(lock);
15 ThreadP tp = new ThreadP(p);
16 tp.start();
17 ThreadC tc = new ThreadC(c);
18 tc.start();
19 }
20 }
set的值是1537259244097_800479975994656
get的值1537259244097_800479975994656
set的值是1537259244097_800479976020503
get的值1537259244097_800479976020503
set的值是1537259244097_800479976042246
get的值1537259244097_800479976042246
set的值是1537259244097_800479976062349
get的值1537259244097_800479976062349
set的值是1537259244097_800479976083272
get的值1537259244097_800479976083272
set的值是1537259244097_800479976103785
get的值1537259244097_800479976103785
set的值是1537259244097_800479976124298
get的值1537259244097_800479976124298
set的值是1537259244097_800479976144400
get的值1537259244097_800479976144400
.............
多个生产者与多个消费者:操作值
1 package com.qf.test10;
2
3 import com.qf.test10.pojo.Entity;
4
5 /**
6 * @author qf
7 * @create 2018-09-18 15:52
8 * 生产者类
9 */
10 public class Producer {
11 private String lock;
12
13 public Producer(String lock) {
14 this.lock = lock;
15 }
16
17 public void setValue(){
18 try {
19 synchronized (lock){
20 while (!Entity.value.equals("")){
21 System.out.println("生产者 "+Thread.currentThread().getName()+" WAITING了★");
22 lock.wait();
23 }
24 System.out.println("生产者 "+Thread.currentThread().getName()+" RUNNABLE了");
25 String value = System.currentTimeMillis()+"_"+System.nanoTime();
26 //System.out.println("set的值是"+value);
27 Entity.value = value;
28 lock.notify();
29 }
30 } catch (InterruptedException e) {
31 e.printStackTrace();
32 }
33 }
34 }
1 package com.qf.test10;
2
3 import com.qf.test10.pojo.Entity;
4
5 /**
6 * @author qf
7 * @create 2018-09-18 15:52
8 * 消费者类
9 */
10 public class Consumer {
11 private String lock;
12
13 public Consumer(String lock) {
14 this.lock = lock;
15 }
16
17 public void getValue(){
18 try {
19 synchronized (lock){
20 if(Entity.value.equals("")){
21 System.out.println("消费者 "+Thread.currentThread().getName()+" WAITING了☆");
22 lock.wait();
23 }
24 System.out.println("消费者 "+Thread.currentThread().getName()+" RUNNABLE了");
25 //System.out.println("get的值"+Entity.value);
26 Entity.value = "";
27 lock.notify();
28 }
29 } catch (InterruptedException e) {
30 e.printStackTrace();
31 }
32 }
33 }
1 package com.qf.test10;
2
3 import com.qf.test10.thread.ThreadC;
4 import com.qf.test10.thread.ThreadP;
5
6 /**
7 * @author qf
8 * @create 2018-09-18 16:12
9 */
10 public class Run {
11 public static void main(String[] args) throws InterruptedException {
12 String lock = new String("");
13 Producer p = new Producer(lock);
14 Consumer c = new Consumer(lock);
15 /*ThreadP tp = new ThreadP(p);
16 tp.start();
17 ThreadC tc = new ThreadC(c);
18 tc.start();*/
19 ThreadP[] threadPS = new ThreadP[2];
20 ThreadC[] threadCS = new ThreadC[2];
21 for (int i = 0; i ) {
22 threadPS[i] = new ThreadP(p);
23 threadPS[i].setName("生产者"+(i+1));
24 threadPS[i].start();
25 threadCS[i] = new ThreadC(c);
26 threadCS[i].setName("消费者"+(i+1));
27 threadCS[i].start();
28 }
29
30 Thread.sleep(5000);
31 Thread[] threads = new Thread[Thread.currentThread().getThreadGroup().activeCount()];
32 Thread.currentThread().getThreadGroup().enumerate(threads);
33 for (int i = 0; i ) {
34 System.out.println(threads[i].getName()+" "+threads[i].getState());
35 }
36 }
37 }
生产者 生产者1 RUNNABLE了
生产者 生产者1 WAITING了★
生产者 生产者2 WAITING了★
消费者 消费者1 RUNNABLE了
消费者 消费者1 WAITING了☆
生产者 生产者1 RUNNABLE了
生产者 生产者1 WAITING了★
生产者 生产者2 WAITING了★
消费者 消费者2 RUNNABLE了
消费者 消费者2 WAITING了☆
消费者 消费者1 RUNNABLE了
消费者 消费者1 WAITING了☆
生产者 生产者1 RUNNABLE了
生产者 生产者1 WAITING了★
生产者 生产者2 WAITING了★
main RUNNABLE
Monitor Ctrl-Break RUNNABLE
生产者1 WAITING
消费者1 WAITING
生产者2 WAITING
消费者2 WAITING
一个生产者和一个消费者:操作栈
1 package com.qf.test11.pojo;
2
3 import java.util.ArrayList;
4 import java.util.List;
5
6 /**
7 * @author qf
8 * @create 2018-09-18 17:14
9 */
10 public class MyStack {
11 private List list = new ArrayList();
12 synchronized public void push(){
13 try {
14 if (list.size() == 1){
15 this.wait();
16 }
17 list.add("test"+Math.random());
18 this.notify();
19 System.out.println("push = "+list.size());
20 } catch (InterruptedException e) {
21 e.printStackTrace();
22 }
23 }
24 public synchronized void pop(){
25 try {
26 if(list.size() == 0){
27 //System.out.println("pop操作: "+Thread.currentThread().getName()+"线程wait状态");
28 this.wait();
29 }
30 System.out.println("pop操作: "+Thread.currentThread().getName()+"线程,获取值="+list.get(0));
31 list.remove(0);
32 this.notify();
33 System.out.println("pop = "+list.size());
34 } catch (InterruptedException e) {
35 e.printStackTrace();
36 }
37 }
38 }
1 package com.qf.test11;
2
3 import com.qf.test11.pojo.MyStack;
4
5 /**
6 * @author qf
7 * @create 2018-09-18 17:13
8 * 生产者
9 */
10 public class Producer {
11 private MyStack myStack;
12
13 public Producer(MyStack myStack) {
14 this.myStack = myStack;
15 }
16
17 public void pushService(){
18 myStack.push();
19 }
20 }
1 package com.qf.test11;
2
3 import com.qf.test11.pojo.MyStack;
4
5 /**
6 * @author qf
7 * @create 2018-09-18 17:14
8 */
9 public class Consumer {
10 private MyStack myStack;
11
12 public Consumer(MyStack myStack) {
13 this.myStack = myStack;
14 }
15 public void popService(){
16 myStack.pop();
17 }
18 }
1 package com.qf.test11.thread;
2
3 import com.qf.test11.Producer;
4
5 /**
6 * @author qf
7 * @create 2018-09-18 17:13
8 */
9 public class ThreadP extends Thread {
10 private Producer producer;
11
12 public ThreadP(Producer producer) {
13 this.producer = producer;
14 }
15
16 @Override
17 public void run() {
18 while (true){
19 producer.pushService();
20 }
21 }
22 }
1 package com.qf.test11.thread;
2
3 import com.qf.test11.Consumer;
4
5 /**
6 * @author qf
7 * @create 2018-09-18 17:14
8 */
9 public class ThreadC extends Thread {
10 private Consumer consumer;
11
12 public ThreadC(Consumer consumer) {
13 this.consumer = consumer;
14 }
15
16 @Override
17 public void run() {
18 while (true){
19 consumer.popService();
20 }
21 }
22 }
1 package com.qf.test11;
2
3 import com.qf.test11.pojo.MyStack;
4 import com.qf.test11.thread.ThreadC;
5 import com.qf.test11.thread.ThreadP;
6
7 /**
8 * @author qf
9 * @create 2018-09-18 17:34
10 */
11 public class Run {
12 public static void main(String[] args) {
13 MyStack myStack = new MyStack();
14 Producer p = new Producer(myStack);
15 Consumer c = new Consumer(myStack);
16 ThreadP tp = new ThreadP(p);
17 ThreadC tc = new ThreadC(c);
18 tp.setName("tp");
19 tc.setName("tc");
20 tp.start();
21 tc.start();
22 }
23 }
push = 1
pop操作: tc线程,获取值=test0.8957260024057878
pop = 0
push = 1
pop操作: tc线程,获取值=test0.9236606274738514
pop = 0
push = 1
pop操作: tc线程,获取值=test0.7661156573296891
pop = 0
push = 1
pop操作: tc线程,获取值=test0.6523634151650343
pop = 0
push = 1
pop操作: tc线程,获取值=test0.08728918553111287
pop = 0
push = 1
pop操作: tc线程,获取值=test0.472483808512989
pop = 0
push = 1
pop操作: tc线程,获取值=test0.17456918848050884
pop = 0
push = 1
pop操作: tc线程,获取值=test0.1785536700399648
pop = 0
............