欢迎您的访问
专注架构,Java,数据结构算法,Python技术分享

六、网络编程之生产者消费者(一)

引言

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

首先来熟悉几个生产者和消费者的几个重要方法吧!!!

一、wait()方法

其实wait()方法就是使线程停止运行。
1. 方法 wait() 的作用是使当前执行代码的线程进行等待,wait()方法是 Object 类的方法,该方法是用来将当前线程置入“预执行队列”中,并且在 wait() 所在的代码处停止执行,直到接到通知或被中断为止。
2. wait()方法只能在同步方法中或同步块中调用。如果调用wait()时,没有持有适当的锁,会抛出异常。
3. wait()方法执行后,当前线程释放锁,线程与其它线程竞争重新获取锁。

代码演示:

ublic class TestThread702 {

   public static void main(String[] args) throws InterruptedException {
       Object object = new Object();
       synchronized (object) {
           System.out.println("等待中...");
           //等待
           object.wait();
           System.out.println("等待已过...");
       }
       System.out.println("主线程结束!!!");
   }
}

结果:等待中...

这样在执行到object.wait()之后就一直等待下去,那么程序肯定不能一直这么等待下去了。这个时候就需要使用到了另外一个方法唤醒的方法notify()

二、notify()方法

notify方法就是使停止的线程继续运行。
1. 方法notify()也要在同步方法或同步块中调用,该方法是用来通知那些可能等待该对象的对象锁的其它线程,对其发出通知 notify ,并使它们重新获取该对象的对象锁。如果有多个线程等待,则有线程规划器随机挑选出一个呈 wait 状态的线程。

  1. notify()方法后,当前线程不会马上释放该对象锁,要等到执行 notify() 方法的线程将程序执行完,也就是退出同步代码块之后才会释放对象锁。
    演示一个小例子。
public class TestThread702 {

   public static void main(String[] args) {
       Object obj = new Object();
       Thread thread1 = new Thread(new MyRunnable702(obj,true));
       Thread thread2 = new Thread(new MyRunnable702(obj,false));
       thread1.start();
       thread2.start();
       System.out.println("主线程结束!!!");
   }
}
class MyRunnable702 implements Runnable {
   private Object obj;
   private boolean flag;
   public MyRunnable702(Object obj, boolean flag) {
       this.obj = obj;
       this.flag = flag;
   }
   private void method1(){
       synchronized (obj) {
           try {
               //这里使用一个死循环
               while (true) {
                   System.out.println("wait()...开始");
                   //开始等待
                   obj.wait();
                   System.out.println("wait()...结束");
               }
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
   private void method2() {
       synchronized (obj) {
           System.out.println("notify()...开始");
           //开始唤醒
           obj.notify();
           System.out.println("notify()...结束");
       }
   }
   @Override
   public void run() {
       if (this.flag) {
           method1();
       } else {
           method2();
       }
   }
}

结果:
主线程结束!!!
wait()...开始
notify()...开始
notify()...结束
wait()...结束
wait()...开始

从结果上来看第一个线程执行的是一个 method1 方法,该方法里面有个死循环并且使用了 wait 方法进入等待状态将释放锁,如果这个线程不被唤醒的话将会一直等待下去,这个时候第二个线程执行的是 method2 方法,该方法里面执行了一个唤醒线程的操作,并且一直将 notify 的同步代码块执行完毕之后才会释放锁然后继续执行 wait 结束打印语句。

三、小结

出现阻塞的情况大体分为如下5种:
1. 线程调用 sleep 方法,主动放弃占用的处理器资源。
2. 线程调用了阻塞式 IO 方法,在该方法返回前,该线程被阻塞。
3. 线程试图获得一个同步监视器,但该同步监视器正被其他线程所持有。
4. 线程等待某个通知。
5. 程序调用了 suspend 方法将该线程挂起。此方法容易导致死锁,尽量避免使用该方法。

run() 方法运行结束后进入销毁阶段,整个线程执行完毕。
每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程。一个线程被唤醒后,才会进入就绪队列,等待CPU的调度;反之,一个线程被 wait 后,就会进入阻塞队列,等待下一次被唤醒。

注意:wait,notify必须使用在synchronized同步方法或者代码块内。

notifyAll唤醒所有线程

以上都讲解了 notify 方法只是唤醒某一个等待线程,那么如果有多个线程都在等待中怎么办呢,这个时候就可以使用 notifyAll 方法可以一次唤醒所有的等待线程,看示例。


public class TestThread702 {

   public static void main(String[] args) throws InterruptedException {
       Object lock = new Object();
       Thread thread1 = new Thread(new MyRunnable702(lock,true));
       thread1.setName("waitA");
       Thread thread2 = new Thread(new MyRunnable702(lock,true));
       thread2.setName("waitB");
       Thread thread3 = new Thread(new MyRunnable702(lock,true));
       thread3.setName("waitC");
       thread1.start();
       thread2.start();
       thread3.start();

       Thread.sleep(2000);
       //这里唤醒线程
       Thread thread4 = new Thread(new MyRunnable702(lock,false));
       thread4.setName("notify");
       thread4.start();

       Thread.sleep(5000);
       System.out.println("主线程结束!!!");
   }
}
class MyRunnable702 implements Runnable {
   private Object lock;
   private boolean flag;
   public MyRunnable702(Object lock,boolean flag) {
       this.lock = lock;
       this.flag = flag;
   }
   @Override
   public void run() {
       if (flag) {
           method1(this.lock);
       } else {
           method2(this.lock);
       }
   }
   /**
    * 等待线程方法
    * @param lock
    */
   private void method1(Object lock){
       synchronized (lock) {
           try {
               System.out.println(Thread.currentThread().getName() + "...开始");
               //开始等待
               lock.wait();
               System.out.println(Thread.currentThread().getName() + "...结束");
           } catch (Exception e) {
               e.printStackTrace();
           }
       }
   }

   /**
    * 唤醒线程方法
    * @param lock
    */
   private void method2(Object lock){
       synchronized (lock) {
           try {
               System.out.println(Thread.currentThread().getName() + ",notify()...开始");
               //唤醒某一个线程
               lock.notify();
               System.out.println(Thread.currentThread().getName() + ",notify()...结束");
           } catch (Exception e) {
               e.printStackTrace();
           }
       }
   }
}

结果:
waitA...开始
waitC...开始
waitB...开始
notify,notify()...开始
notify,notify()...结束
waitA...结束
主线程结束!!!

从上面运行结果可以看出有3个线程等待中,然后唤醒了 waitA 线程所以打印出 waitA 线程结束的信息。现在需要唤醒所有线程,所以需要使用到 notifyAll 方法。将 method2 方法中的 notify 方法改为 notifyAll 方法即可。


/**
* 唤醒线程方法
* @param lock
*/
private void method2(Object lock){
   synchronized (lock) {
       try {
           System.out.println(Thread.currentThread().getName() + ",notify()...开始");
           //唤醒所有等待中的线程
           lock.notifyAll();
           System.out.println(Thread.currentThread().getName() + ",notify()...结束");
       } catch (Exception e) {
           e.printStackTrace();
       }
   }
}

结果:
waitA...开始
waitC...开始
waitB...开始
notify,notify()...开始
notify,notify()...结束
waitA...结束
waitB...结束
waitC...结束
主线程结束!!!

现在可以看出 notifyAll 确实是唤醒了所有线程了。
注意:唤醒线程不能过早,如果在还没有线程在等待中时,过早的唤醒线程,这个时候就会出现先唤醒,在等待的效果了。这样就没有必要在去运行 wait 方法了。

生产者与消费者

上面几个重要方法已经介绍完毕,现在来开始学习生产者与消费者模式了。
生产者与消费者开头已经介绍过了,生产者与消费者一般需要第三者来解耦的,所以现在就模拟一个简单的商品的生产者与消费者,由生产者线程生产出一个商品之后将由消费者线程开始消费!
首先需要一个商品 Goods 类,类中有名称以及库存,生产方法和消费方法。

/**
* 商品类
*/
public class Goods {
   /**
    * 商品名称
    */
   private String goodsName;
   /**
    * 库存
    */
   private int count;
   /**
    * 生产方法
    * @param goodsName
    */
   public synchronized void set(String goodsName) {
       //这里就是开始生产商品了,每次生产一个商品
       this.goodsName = goodsName;
       this.count = this.count + 1;
       System.out.println("生产" + this.toString());
   }
   /**
    * 消费方法
    */
   public synchronized void get() {
       //消费商品,每次消费一个商品
       this.count = this.count - 1;
       System.out.println("消费"+this.toString());
   }
   /**
    * 显示效果
    * @return
    */
   @Override
   public String toString() {
       return "商品名称为:" + goodsName + ",库存为:" + count;
   }
}

//现在需要一个生产者和消费者,2个线程类。
/**
* 生产者类
*/
public class Producer implements Runnable {
   /**
    * 需要一个商品类来调用生产方法
    */
   private Goods goods;

   public Producer(Goods goods) {
       this.goods = goods;
   }

   @Override
   public void run() {
       //这里直接生产“中华香烟一条”
       this.goods.set("中华香烟一条");
   }
}

/**
* 消费者类
*/
class Consumer implements Runnable {
   /**
    * 需要一个商品类来调用消费方法
    */
   private Goods goods;

   public Consumer(Goods goods) {
       this.goods = goods;
   }
   @Override
   public void run() {
       //这里消費“中华香烟一条”
       this.goods.get();
   }
}

//现在将在测试类运行试试看。
public static void main(String[] args) {
       /**
        * 首先需要一个商品共享类
        */
       Goods goods = new Goods();
       Thread producer = new Thread(new Producer(goods));
       Thread consumer = new Thread(new Consumer(goods));
       //开始生产者线程
       producer.start();
       //开启消费者线程
       consumer.start();
}

//结果:
生产商品名称为:中华香烟一条,库存为:1
消费商品名称为:中华香烟一条,库存为:0

以上就是正常的生产者与消费者模式,貌似没有出什么问题。

注意:那么现在换一种方式,将生产者线程开启和消费者线程开启的代码换个位置在测试下。

消费商品名称为:null,库存为:-1
生产商品名称为:中华香烟一条,库存为:0

这样就出现问题了。怎么解决呢,就当做习题,大家可以自己想想怎么改造方法。下一节在继续来讲解!!!

赞(1) 打赏
版权归原创作者所有,任何形式转载请联系作者;码农code之路 » 六、网络编程之生产者消费者(一)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏