文章 62
浏览 15135
单生产者单消费者模式队列的搭建(一)

单生产者单消费者模式队列的搭建(一)

简单回顾 JDK 中的队列体系

**队列是什么,我相信大家都很清楚了。**所谓队列,就是一个线性容器,加上操作容器的一些规则。线程容器就是数组,链表(其实,在我看来链表并不能算作容器,它的每一个节点都是一个对象,用指针连接构成了链表本身。我们可以说链表就是把一些对象或者说一些数据精心设计之后,实现了容器的作用,但不能说链表本身就是一个容器。但这些小概念都无关紧要了)。而操作这个容器的规则就是从数组的一端存入数据,从另一端将数据取出,并且操作的过程中要让这些数据先进先出。

按照上面的规则来实现一个队列,显然非常容易了。请看下面的定义。

数组 + 先进先出 = 队列
链表 + 先进先出 = 队列

其实,在 JDK 中的队列,几乎都是采用了上面两种定义实现的。比如大家最熟悉的 ArrayBlockingQueue 和 LinkedBlockingQueue 队列。这些队列的结构也都很简单,首先,还是定义一个通用的接口 Queue,然后在该接口中定义几个基础方法。请看下面代码块。

public interface Queue<E> extends Collection<E> {
    //向队列中添加数据的方法,也就是所谓的入队方法
    boolean offer(E e);
  
    //也是入队方法,
    boolean add(E e);
  
    //从队列中取出数据的方法,也就是所谓的出队方法
    E poll();
  

    //也是出队方法
    E remove();

    //查看队列的首位数据
    E element();
  
    //查看队列的首位数据
    E peek();
  
}

接下来就是一系列的抽象实现类和非抽象实现类。但是,先不忙着讲解下面的内容,我想先问问大家,如果要让你设计一整个队列的体系,在给出你上面那个接口之后,你会怎样在这个接口的基础上完善队列体系呢?比如,在这个接口之下应该怎么细分?毕竟这只是一个总的接口,只定义了最基本的入队和出队的方法。举一个最直接的例子,队列要遵循先进先出的规则,那么在队列的容器中,要从哪一端进,从哪一端出呢?如果一个队列是单向的,这就意味着这个队列只能从队列容器的一端写入数据,从另一端取走数据。可有没有一种情况,有一种队列是双向的呢?所谓双向,就是哪一端都可以作为入队口,也可以作为出队口。如果要这么划分,那么 Queue 接口下面似乎应该再定义一个新的接口,这个接口就最为所有双向队列要实现的接口,JDK 中恰好就有这么一个接口,叫做 Deque,内部定义的方法如下所示。

//实现了Deque接口的队列,一般称其为双端队列
public interface Deque<E> extends Queue<E> {
  
    //从队列头部存入数据的方法
    boolean offerFirst(E e);
  

    //从队列尾部存入数据的方法
    boolean offerLast(E e);
  

    //从队列尾部存入数据的方法
    boolean offer(E e);
  

    //从队列头部存入数据的方法
    void addFirst(E e);
  

    //从队列尾部存入数据的方法
    void addLast(E e);

    //从队列尾部存入数据的方法
    boolean add(E e);
  
    boolean addAll(Collection<? extends E> c);


    //从队列头部取出数据的方法
    E pollFirst();
  

    //从队列尾部取出数据的方法
    E pollLast();
  
	//下面这几个都是取出数据的方法,就不再一一注释了
    E poll();
  

    E removeFirst();
  

    E removeLast();
  

    E remove();
  

    int size();

    //还有很多方法就不再列出来了
}

上面这个 Deque 接口就继承了 Queue 接口,可以说是在 Queue 体系中开辟了一个新的分支。只要是实现了 Deque 接口的队列,都是双端队列。在 JDK 中,大家熟悉的 LinkList 就实现了 Deque 接口,ArrayDeque 也实现了 Deque 接口。这显然是一个很清晰的划分方法。那么,除了队列的单向和双向可以作为细分体系的条件,还有什么条件可以将队列体系细分呢?其实有一种情况大家一定很清楚,那就是当一个队列被创建的时候,肯定要考虑到队列的容量,所谓容量就是队列可以存放多少个数据。如果一个队列的容量是有限的,那么当一个线程向队列中存放数据的时候,显然要考虑到队列是否已经装满了的情况。如果队列中放满了数据,这时候线程就不能再向队列中存放数据,最常规的做法就是让线程阻塞一会,等待队列中的数据被另外的线程取走了,有了可以存放数据的空间了,被阻塞的线程就可以继续工作,把数据放到队列中。当然,当一个线程希望从队列中取走一个数据,但此刻队列是空的,那么希望取走数据的这个线程也就要阻塞一会,等待其它线程向队列中放入了数据,被阻塞的线程就可以继续工作,把刚刚放入的数据取走了。

与上面这种情况相对的一种情况就是,队列的容量是没有限制的,或者说是无界的,这样一来,也就不存在什么容量不够用的情况,如果本次放入数据的时候容量不够了,直接给队列扩容即可,然后把数据放入队列中。这也就意味着,将数据放入队列的线程不必再阻塞;那么,当队列中空无一物时,有一个线程来获取数据,这时候也就直接返回 null 即可。

我想,分析到这里,大家应该也意识到了,其实完全可以根据是否阻塞和队列是否有界来进一步划分队列体系。而 JDK 中也正是这么做的。JDK 中就是通过是否阻塞来进一步细分队列体系的。而进行细分的接口就是 BlockingQueue,该接口直接继承了 Queue 接口。请看下面代码块。

public interface BlockingQueue<E> extends Queue<E> {
  
    //存放数据到队列的方法
    void put(E e) throws InterruptedException;
  
    boolean add(E e);
  
    boolean offer(E e);
  
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
  
	//取出数据的方法
    E take() throws InterruptedException;
  
    boolean remove(Object o);
  
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
  
    int drainTo(Collection<? super E> c);
  
    int drainTo(Collection<? super E> c, int maxElements);

    boolean contains(Object o);
  
    int remainingCapacity();

}

在上面的代码块中可以看到 BlockingQueue 接口直接继承了 Queue 接口,并没有和 Deque 接口发生任何关系,因此,一个队列如果直接实现了 BlockingQueue 接口,那么这个接口大概率就是个单向并且具有阻塞线程作用的队列,除非这个队列在实现 BlockingQueue 接口的同时也实现了 Deque 接口。

当然,我还要向大家强调一点,实现了 BlockingQueue 接口的队列,并不意味着队列本身中的所有入队和出队方法都具有阻塞的功能,有些入队和出队方法并不会导致线程阻塞,就比如说实现了 BlockingQueue 接口的 ArrayBlockingQueue 队列,其内部的一些方法并不会在队列已满或者队列为空的状态下,导致存放数据的线程和取出数据的线程阻塞。请看下面代码块。

//可以看到,该类实现了BlockingQueue接口
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {

    //省略部分内容


    //该方法在队列已满时,并不会让存放数据的线程阻塞,只会直接返回false
    public boolean offer(E e) {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //判断队列是否已满
            if(count == items.length) {
                //如果满了,直接返回false
                return false;
            }
            //如果没满,在这里把数据放到队列中
            enqueue(e);
            //返回true
            return true;
        } finally {
            lock.unlock();
        }
    }


    //该方法在队列为空时,并不会让取出数据的线程阻塞,而是直接返回null
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //判断队列是否为空
            if(count == 0) {
                //如果为空,直接返回null
                return null;
            }
            //如果不为空,就在这里返回数据
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

}

看了上面的代码块,我想大家应该明白了,一个实现了 BlockingQueue 接口的队列,只代表着这个队列拥有了阻塞线程的功能,但是并不意味着它只拥有这个功能。这一点大家一定要理清楚。如果上面这些逻辑大家都理清楚了,那么,请大家再仔细想一想,整个队列体系似乎既可以按照是否双端来细分,又可以按照是否阻塞来细分,那么,双端和阻塞会不会有交集呢?就比如说,一个队列是双端阻塞队列。这当然是有可能的,并且在 JDK 的队列体系中就存在这样的队列,就是 LinkedBlockingDeque 队列,我会用一副简图来为大家展示一下 JDK 队列体系中一些类和接口的关系。请看下面的简图。

上面就是我为大家提供的简图,展示了 JDK 队列体系中一些接口和类的关系。我并没有把所有接口和类都列出来,只是挑选了几个有代表性的展现给大家。我相信仅凭这几个类和接口,大家应该也彻底理清楚了 JDK 中队列体系的划分。整个队列体系其实就可以按照下面四种功能划分(当然,你也可以使用线程是否安全来划分,而阻塞队列往往也是线程安全的,因为使用了同步锁,没抢到锁的线程会阻塞)。

单向阻塞队列
单向非阻塞队列
双端阻塞队列
双端非阻塞队列

而且,如果大家仔细学习源码,还会发现非阻塞队列往往都是无界的。原因很简单,正因为队列无界,所以才可以无限扩容,永远都有空闲存放数据,所以存放数据的线程才不会阻塞呀。

好了,到此为止,JDK 队列的体系结构已经讲解完了,下面,我想跟大家简单聊聊这些队列的用途。当然,用途也没什么可聊的,说到底,队列只是提供了容器的作用,只是用来存放数据的,所以,队列经常在生产者消费者模型中使用。所谓生产者消费者模型,就是一边生产物品,一遍消费物品。在编程的世界中,这个模型就是一个或多个线程负责生产数据或消息,一个或多个线程负责消费数据或消息。生产数据的线程就叫生产者,消费数据的线程就叫消费者。而队列的存在就起到了中转站的作用,生产者不断把生产的数据全放到队列中,消费者则从队列中不断地获取数据消费,这样,一个完美的生产者消费者模型就实现了。

如果只有一个线程在生产数据,同时也只有一个线程在消费数据,那么我们就可以称当前的生产者消费者模型为单生产者单消费者模型;如果是多个线程在生产和消费数据,这就是多生产者多消费者模型。当然,在这种情况下,多个线程要操纵同一个队列,如果队列是使用数组提供容器的功能,就意味着是多个线程同时操纵一个数组,显然会发生并发问题。所以这就需要队列在提供容器作用的同时,也要尽量解决多线程操作数组的并发问题。因此,有些队列中往往会引入同步锁,最典型的就是 ArrayBlockingQueue 类中定义了 ReentrantLock 成员变量。在多线程模式下,数据入队和出队的时候都要先让线程去争抢 ReentrantLock 锁,得到锁的线程才能真正把数据入队或者取走。至于 ReentrantLock 的使用方法,我相信大家已经很熟悉了,也许有的朋友并没有仔细看过这个类的源码,这没关系,后面会在从零带你写 JDK 专题中为大家手写实现这个类。至于怎么手写实现,这都是后话了。总之,到此为止,关于队列的体系以及用途,我都为大家讲解完了。而且在讲解的过程中,我也心血来潮,想实现一个自己的任务队列。 那我该怎么设计呢?当然是照葫芦画瓢,先仿照着现有的队列实现一下,比如说就参照 ArrayBlockingQueue 队列实现。

实现自己的任务队列

**既然要实现一个自己的任务队列,刚开始还是简单点比较好,所以我就先不考虑双端的情况。在我的程序中,只有一个线程充当生产者,也只有一个线程充当消费者。而且,既然是队列,肯定是需要容器来存放数据的,至于容器,我就选择数组吧。选择的原因很简单,数组的内存是连续的,链表是不连续的,只要我设计好队列中数据先进先出的方式,完全可以使用一块连续的内存来存放这些数据,因为内存地址连续,这样 CPU 访问这些数据会更快一些。至于队列的容量也好说,我选择有界队列,原因也很简单,无界队列可以无限存放数据,如果真的有太多数据要放入队列中,而消费者线程来不及消费的话,这就意味会占用太多内存,很容易出现内存溢出的情况。当然,就算消费者消费的速度也很快,被消费完的数据可能会被垃圾回收,数据越多垃圾回收就会越频繁,这对性能同样是有影响的,因此,有界队列是再好不过的选择。当然,既然是有界队列,这就意味着是队列是有阻塞线程的功能的。当队列满了的时候,生产者线程在继续把数据放入队列中时,会阻塞等待,直到队列中有空闲的位置才可以继续工作,把数据放入队列,消费者线程是同样的道理。现在,我自己要实现的任务队列就已经确定了,是一个单向的阻塞队列。既然是用数组实现的容器,同时也是可以阻塞线程的,就把这个队列叫做 ArrayBlockingQueue 吧,真是不好意思,一不小心就和 JDK 源码中的 ArrayBlockingQueue 队列同名了

上面分析了一大堆,现在总该上一点代码,先给我自己的队列一个大概的轮廓,然后再往内部填充细节。请大家看下面的代码块。

public class ArrayBlockingQueue<E>{

    //存放数据的数组
    final Object[] items;
	//队列中存放数据的个数
    int count;

    //队列的构造方法
    public ArrayBlockingQueue(int capacity) {
        //暂不实现
    }


    //存放数据到队列的方法,不限时阻塞,直到能够放入数据
    public void put(E e) throws InterruptedException {
        //暂不实现
    }

  
  
    //取出数据的方法,不限时阻塞
    public E take() throws InterruptedException {
        //暂不实现
    }
}

好了,上面的代码块已经把我的队列的轮廓描绘清楚了,接下来,就该具体填充内容了。在填充之前,首先要明确一点,那就是队列必须要遵循先进先出的原则,那么,该怎么实现这个原则呢?换句话说,我的队列容器是数组,该如何操控数组,让存入数组的数据满足先进先出的要求呢?其实也不用怎么深思熟虑,直接使用最直接最简单存储数据的方式即可:从数组的头部存放数据,取走数据的时候,也从数组的头部取走数据。就像下图展示得这样。

在上面的简图中,大家可以看到,当有数据想要放入数组中时,会从头向尾按顺序存放,而取走这些数据的时候,也是按照从头向尾的顺序依次取走,这样不就做到了数据的先进先出吗?既然编码方向已经找到了,那该怎么在代码层面上实现呢?最简单的方法就是在 ArrayBlockingQueue 类中定义两个指针,一个是存放数据的指针,简称写指针,一个是取走数据的指针,简称读指针,指针最开始都指向数组的 0 号索引位置。当有数据存放的时候,就把数据放到写指针指向的位置索引,然后把写指针加 1;当有数据要被取走时,就从读指针指向的索引位置获得数据,然后把读指针加 1。这样就做到了写指针是按从前往后的顺序在数组中写入数据,读指针也是按从前往后的顺序在数组中取出数据,做了数据的先进先出。当然,还有一个问题,那就读指针和写指针已经到头了该怎么办?也就是说它们自增之后的值最终和队列容器的长度一样了,这时候该怎么办呢?其实也很简单,重置读写指针为 0 就好了,因为数据肯定是一边存放一边消费的,等读写指针自增到和容器长度一样了,最先存入的数据早就被消费了。所以,这时候就可以从容器数组的头部重新存放数据了。好了,既然编码的思路也确定的,现在就让我来实现一下代码吧。请看下面代码块。

public class ArrayBlockingQueue<E>{

    //存放数据的数组
    final Object[] items;
	//队列中存放数据的个数
    int count;

    //读指针
    int takeIndex;

    //写指针
    int putIndex;

    //队列的构造方法
    public ArrayBlockingQueue(int capacity) {
        //初始化数组
        this.items = new Object[capacity];
    }


    //存放数据到队列的方法,不限时阻塞,直到能够放入数据
    public void put(E e) throws InterruptedException {
        //队列存放数据的个数等于容器数组容量,就意味着队列满了
         while(count == items.length) {
             System.out.println("如果队列满了,生产者线程就在此阻塞");
         }
         System.out.println("队列有位置了,生产者线程被唤醒,继续执行把数据放入数组中");
         //把数据放入队列中
         enqueue(e);
    }
  
  
    //取出数据的方法,不限时阻塞
    public E take() throws InterruptedException {
        //队列存放数据的个数等于0,就意味着队列空了
         while(count == 0) {
             System.out.println("如果队列空了,消费者线程就在此阻塞");
         }
         System.out.println("队列有数据了,消费者线程被唤醒,继续执行把数据取走的操作");
        return dequeue();
    }

    //把数据添加到数组的方法,按顺序从前往后添加
    private void enqueue(E e) {
        //得到容器数组
        final Object[] items = this.items;
        //把数据放到写指针的索引位置
        items[putIndex] = e;
        //写指针加1,然后判断写指针是否等于容器长度
        if(++putIndex == items.length) {
            //等于就重置写指针为0
            putIndex = 0;
        }
        //容器存储数据的个数加1
        count++;
    }

    //把数据从队列容器中拿走的方法,按顺序从前往后拿走
    private E dequeue() {
        //得到容器数组
        final Object[] items = this.items;
        //根据读指针得到要取走的数据
        E e = (E) items[takeIndex];
        //把数组中的对应位置置为null
        items[takeIndex] = null;
        //判断读指针是否等于数组长度,如果等于数组长度,说明读到数组结尾了
        if(++takeIndex == items.length) {
            //重置读指针,可以从数组头部重新读取数据了
            takeIndex = 0;
        }
        //数组存储数据的个数减1
        count--;
        //返回要取走的数据
        return e;
    }
}

上面的代码块已经把我目前的队列代码都展示出来了,当然,存在着一些伪代码,这是无可奈何的,因为我还没有真的实现,队列未满或空的时候,线程阻塞的功能。大家索性就先当成已经实现了,并且程序是可以运行的,很快我就会完善它。现在,请大家看一看目前的队列,假如现在有一个线程要不停地向队列中存放数据,而另一个线程不断地从线程中获取数据消费。这就意味着程序中存在着一个生产者和一个消费者。请看下面的测试类。

public class Test {

    public static void main(String[] args) {

        //创建我自己定义的任务队列
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(16);
  
        //启动一个生产者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                int i = 0;
                while (true) {
                    //在一个循环中一直向队列中放入数据
                    queue.put(i++);
                }
            }
        }).start();

        //启动一个消费者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    //在一个循环中不断地从队列中取出数据
                    queue.take();
                }
            }
        }).start();
  
    }
}

上面的测试类一旦启动,一个生产者线程就会一直向我定义的队列中放入数据,与此同时,也会有一个消费者线程不断地从队列中取出数据。当队列已满的时候,生产者线程肯定会阻塞,等待消费者线程消费了数据,队列中有空位了,才能继续工作,把数据放到队列中;而消费者线程也是如此,只有当队列中有数据的时候才能真的开始消费,否则就会一直阻塞。程序是不是已经建立起来了,只要我紧接着把线程阻塞的功能实现了,我自己定义的队列就完成了。话是这么说,但还是别高兴的太早。程序一旦可以运行了,就要考虑两个问题,一是安全性,也就是所谓的并发情况;二是性能,但现在谈性能还太早,还是让我们先来看看,程序会不会存在并发问题呢?

要想知道一个程序会不会有并发问题,并不是看看程序中有几个线程在执行任务,而是要找准这几个线程是不是会同时操作某个数据,这才是关键所在。就拿我目前的队列来说吧,容器数组就是这个所谓的数据,很有可能会被几个线程同时操纵,向里面存放或者取走数据。但是,我又在数组之上设计了两个指针,分别为读指针和写指针,现在线程都是和这两个指针打交道,根据指针指向的索引位置,存放或者读取数据。这么一来,请大家想想,是不是只要读写指针不出现并发问题,那么数组存放和取走数据就不会有并发问题? 当然,ArrayBlockingQueue 类中的 count 这个成员变量代表了目前队列中存放数据的个数,这个成员变量可能会被生产者线程和消费者线程同时操纵,显然会出现并发问题。但是在我目前的程序中,我希望大家暂时先忘了它,就把它当作并发安全的,最后我会为大家解决它的并发问题。 好了,还是让我们再次回到读写指针上。请大家想想,我目前的程序中,读写指针有并发问题吗?让我来为大家稍微分析一下。

要想明确读写指针是否有并发问题,就要看看读写指针的值会不会被多个线程同时修改。从我任务队列的 enqueue 和 dequeue 方法中可以看出,实际上,读写指针是互不干涉的,读指针只会在 dequeue 方法中做自增操作,写指针只会在 enqueue 方法中做自增操作。换句话说,读指针只会被消费者线程修改,写指针只会被生产者线程修改。所以,我们只需看看会不会多个生产者线程同时向容器数组中存放数据,这样多个线程都会调用队列的 enqueue 方法,这就意味着多个线程要同时让写指针做自增操作,如果不采取同步措施,很容易出现写指针的并发问题。同理,如果有多个消费者线程同时从容器数组中取出数据,就意味着都会调用 dequeue 方法,同时让读指针做自增操作,也会出现并发问题。但是,幸运的是,我目前的程序中只存在一个生产者线程和一个消费者线程,再没有其他生产者或者消费者线程了。这就是说,在同一时间只会有一个生产者线程调用 enqueue 方法,修改写指针的值,只会有一个消费者线程调用 dequeue 方法,修改读指针的值。显然不会有什么并发问题。因此,现在我可以十分自信地说一句,我实现的队列,在单生产者和单消费者模式下,不会出现并发问题。所以,现在只要实现了队列已满或为空时阻塞线程的功能,就大功告成了。

要实现这个阻塞功能简直太简单了,直接引入一个同步锁就行了,调用同步锁的 wait 和 notify 方法;或者就在我的 ArrayBlockingQueue 类中,引入 ReentrantLock,用 ReentrantLock 创建两个 Condition 对象。调用 Condition 的 await 和 signal 方法来实现线程的阻塞和唤醒。这也太简单了,对吧?就是因为太简单了,才让我忽然意识到,我自己的实现的 ArrayBlockingQueue 类怎么和 Doug Lea 实现的越来越像了?如果大家看过 ArrayBlockingQueue 的源码就会发现,我就是在对着 ArrayBlockingQueue 源码照抄而已。怎么搞的?和 Doug Lea 手写 ArrayBlockingQueue 类越来越像并不能证明我的编程功底深厚,只能证明我抄得太好了。他娘的,好好地我怎么开始抄袭了?

我是个有追求的程序员,当然不能干抄袭的事,所以,就算我心里再想使用同步锁的阻塞当前线程的方法,我也不会使用了。属于自绝后路了,非要开辟一个新的阻塞线程的道路。当然,我也不全是为了证明自己才不使用同步锁的阻塞线程的方法的,还有一个很重要的原因,那就是我的程序目前并没有并发问题,我脑子发昏了才会引入同步锁( 当然,还是请大家先忽略 count 带来的并发问题 )。那么,如果不使用同步锁,还有什么方法能使线程阻塞一会呢?如果对 JDK 的 JUC 包下的类了解一些的话,大家肯定知道 LockSupport 这个类,这个类中提供了很多静态方法,可以操纵线程的阻塞和唤醒。所以,我完全可以在程序中使用 LockSupport 类中的静态方法,比如 LockSupport 的 park 和 unpark 方法。如果我只想让线程短暂的睡一会,那么还可以调用该类的 parkNanos(long nanos) 方法。总之方法有很多,我有很多种选择来和 Doug Lea 一争高下。既然编码思路也已经确定了,那就立刻让我重构代码吧。

但是,在重构之前我忽然想到,既然我在目前的程序中打算使用不同于 JDK 的 ArrayBlockingQueue 中阻塞线程的策略,这其实就在告诉我,所谓的阻塞策略,其实是可以扩展的,或者说是必须可扩展的。这是一个很明显的扩展点,可能那一天我又厌烦了 LockSupport 实现的阻塞策略,想使用另一种阻塞策略了。就比如说,后面我的队列肯定要在多生产者和多消费者模型下使用,这个时候肯定就要使用同步锁来解决并发问题,如果引入了同步锁,那我就可以直接使用同步锁阻塞线程的策略了。当然,这都是后话了,总之,大家从中可以意识到,阻塞策略不能在代码中写死,写成固定不变的,一定要用接口暴露给用户,让用户可以自己定义。这样一来,也为以后重构程序减轻了一些压力。我想,这种以接口为扩展点的编程套路,应该都已经深深刻在大家的心里了。

所以,接下来我就要先为我的程序定义一个阻塞策略的接口,这个接口的实现类就是真正可以使用的阻塞策略。

引入 WaitStrategy 接口

首先先定义一个阻塞策略的接口,我就把它叫做 WaitStrategy。请看下面代码块。

public interface WaitStrategy{
  
    void waitFor();

}

可以看到,WaitStrategy 接口中只有一个 waitFor 方法,就是让线程等待的。接口定义好了,接下来就该定义一个实现类了,所以,现在重点就成了这个接口的实现类该怎么定义。既然是使用 LockSupport 的静态方法,我们不妨先来分析一下。怎么使用 LockSupport 中的方法,因为我意识到 LockSupport 中的 unpark 方法必须传入被阻塞的线程,才能把线程唤醒。如果我们要使用 unpark 方法来唤醒生产者或者消费者线程,这就意味着生产者线程要唤醒消费者线程时,必须得到消费者的线程,换成消费者唤醒生产者也是一样。这也太麻烦了,再说,多线程并发的情况下,多个生产者阻塞了,我怎么知道唤醒的是哪个生产者线程?显然,使用 park 和 unpark 这对方法是行不通的。既然行不通,不如就换个思路,只让线程限时阻塞好了。也就是说,让线程阻塞一会然后就自己醒过来,不用任何其他的线程来唤醒。为什么可以这么做呢?因为在生产者和消费者模型中,不出意外的话,生产者和消费者的速度应该是差不多的,即便生产者赶不上消费者的速度,消费者赶不上生产者的速度,也不会相差太多。在 CPU 层面上,假如消费者线程消费得比较慢,导致队列中没有空间了,可能生产者线程等待个 100 纳秒,消费者就把数据消费而腾出新的空间了。所以,在我现在定义的阻塞策略中,我当然可以让线程睡个 100 纳秒,然后自己醒过来,看看是不是有空闲可以存放数据了。恰好 LockSupport 中就有一个方法实现了这个功能,就是 parkNanos(long nanos) 方法,所以我直接调用它就行了。编码的思路已经分析完了,接下来就可以实现了。

既然是让线程自己睡一会,然后再醒过来,我就把这个阻塞策略接口的实现类定义为 SleepingWaitStrategy。请看下面代码块。

public final class SleepingWaitStrategy implements WaitStrategy{

    //无参构造方法
    public SleepingWaitStrategy(){
  
    }

    @Override
    public void waitFor(){
        //如果队列已满或者队列已空,就让生产者或消费者线程自己睡100纳秒
        LockSupport.parkNanos(sleepTimeNs);
    }
}

这么一来,我刚才的队列代码就要重构一下了。当然,大家应该能想到了,既然要在我自己的 ArrayBlockingQueue 类中使用阻塞策略,那这个阻塞策略难免要定义成这个类的成员变量了。请看下面代码。

public class ArrayBlockingQueue<E>{

    //存放数据的数组
    final Object[] items;
	//队列中存放数据的个数
    int count;

    //读指针
    int takeIndex;

    //写指针
    int putIndex;

    //阻塞策略,程序默认的
    WaitStrategy wait = new SleepingWaitStrategy();

    //队列的构造方法
    public ArrayBlockingQueue(int capacity) {
        //初始化数组
        this.items = new Object[capacity];
    }

    //队列的构造方法,如果用户设置了阻塞策略,就把阻塞策略替换成用户设置的
    public ArrayBlockingQueue(int capacity,WaitStrategy waitStrategy) {
        //初始化数组
        this.items = new Object[capacity];
        this.wait = waitStrategy;
    }


    //存放数据到队列的方法,不限时阻塞,直到能够放入数据
    public void put(E e) throws InterruptedException {
        //队列存放数据的个数等于容器数组容量,就意味着队列满了
         while(count == items.length) {
             //每过100纳秒线程就会自动醒过来,判断存放数据
             //的个数是否和数组容量相等,如果相等说明没有空间,仍然会睡一会
             wait.waitFor();
         }
         //把数据放入队列中
         enqueue(e);
    }
  
  
    //取出数据的方法,不限时阻塞
    public E take() throws InterruptedException {
        //队列存放数据的个数等于0,就意味着队列空了
         while(count == 0) {
             //每过100纳秒线程就会自动醒过来,判断存放数据
             //的个数是否和0相等,如果相等说明没有数据,仍然会睡一会
             wait.waitFor();
         }
        return dequeue();
    }

    //把数据添加到数组的方法,按顺序从前往后添加
    private void enqueue(E e) {
        //得到容器数组
        final Object[] items = this.items;
        //把数据放到写指针的索引位置
        items[putIndex] = e;
        //写指针加1,然后判断写指针是否等于容器长度
        if(++putIndex == items.length) {
            //等于就重置写指针为0
            putIndex = 0;
        }
        //容器存储数据的个数加1
        count++;
    }

    //把数据从队列容器中拿走的方法,按顺序从前往后拿走
    private E dequeue() {
        //得到容器数组
        final Object[] items = this.items;
        //根据读指针得到要取走的数据
        E e = (E) items[takeIndex];
        //把数组中的对应位置置为null
        items[takeIndex] = null;
        //判断读指针是否等于数组长度,如果等于数组长度,说明读到数组结尾了
        if(++takeIndex == items.length) {
            //重置读指针,可以从数组头部重新读取数据了
            takeIndex = 0;
        }
        //数组存储数据的个数减1
        count--;
        //返回要取走的数据
        return e;
    }
}

到此为止,我的 ArrayBlockingQueue 就已经重构完成了,当然,它的一切都是建立在单生产者和单消费者模型之上,并且仍然不考虑 count 这个成员变量可能带来的并发问题。那么,这就意味着程序已经完美了吗?当我提出这个问题的时候,肯定就意味着程序还有可以改进的地方。就比如说我刚才为自己的队列实现的阻塞策略中,直接就让线程阻塞了,并且阻塞 100 纳秒。但是还是让我们再仔细想想,我们认知的 100 纳秒和 CPU 认知的 100 纳秒根本不同,100 纳秒的时间对我们来说连放个屁都不够用,但对 CPU 来说,这时间可能不仅能放个屁,还能把裤子脱了再放屁。而回到生产者和消费者模型中,CPU 执行程序的时候速度是极快的,就算消费者或生产者的速度跟不上,可能并不需要等待 100 纳秒就可以让线程继续工作了。所以,我想接着改造一下刚才实现的阻塞策略,我希望线程需要阻塞的时候,不必立刻就阻塞,而是先空转一会,如果空转一会还是不能继续工作,这时候就可能出现一种情况,生产者线程或者消费者速度不够快了,可能是消费速度跟不上了,也可能是生产速度更不上了,这时候,再让线程睡过去,并且睡了 100 纳秒后,如果还是不能继续工作,就再次睡过去。其实就是让线程分阶段阻塞,一开始并不直接阻塞,而是空转,处于随时待命的状态;如果待命不成功,才真正阻塞过去。好了,代码的编写逻辑又一次分析好了,那该怎么去实现呢?请看下面的代码块。

public final class SleepingWaitStrategy implements WaitStrategy{

    //默认的自旋次数,自旋就是空转
    private static final int DEFAULT_RETRIES = 200;
    //默认的睡眠时间
    private static final long DEFAULT_SLEEP = 100;
	//自旋次数
    private  int retries;
    //睡眠时间
    private final long sleepTimeNs;

    //无参构造方法
    public SleepingWaitStrategy(){
        this(DEFAULT_RETRIES, DEFAULT_SLEEP);
    }

    //这里多出来一个retries成员变量的set方法,因为线程结束阻塞后要重置retries属性为200
    public void setRetries(int retries) {
        this.retries = retries;
    }

    public SleepingWaitStrategy(int retries, long sleepTimeNs){
        //自旋次数被赋值为200
        this.retries = retries;
        //睡眠时间被赋值为100
        this.sleepTimeNs = sleepTimeNs;
    }

    //while循环是在ArrayBlockingQueue类的put方法中
    //waitFor方法就是在一个while循环中的,这个要理清楚
    //在这个方法中,线程阻塞结束之后需要将retries属性重置为200
    //否则线程下一次需要阻塞的时候,retries不是200,就会少自旋甚至根本不自旋就进入阻塞了
    //但是该怎么在该方法中重置retries属性呢?就在ArrayBlockingQueue类中重置,因为在下面的
    //方法中无法判断是否阻塞结束了,大的while循环在ArrayBlockingQueue类中呢
    @Override
    public void waitFor(){
  
        //这意味着自旋次数是200,也就是线程刚要被阻塞之前
        //所谓自旋,就是让线程空转,就是下面这样,每空转1次,就让自旋次数减1
        if (retries > 100){
            --retries;
        }
        //如果自旋次数小于100,大于0了,说明已经自旋了很多次了
        //但还是不能继续向下工作,这时候尝试让该线程让出CPU
        else if (retries > 0){
            --retries;
            Thread.yield();
        }
        else{//走到这里意味着自旋次数到达200了,这时候就干脆让线程睡一会吧
            //睡的时间就是100纳秒,不能睡得太久,因为生产者可能随时发布新的数据
            LockSupport.parkNanos(sleepTimeNs);
        }
    }
}

阻塞策略重构了,我的 ArrayBlockingQueue 其实也要重构一小下。请看下面代码块。

public class ArrayBlockingQueue<E>{

    //只展示部分代码
  
    //这个方法有新的改动
    public void put(E e) throws InterruptedException {
        //队列存放数据的个数等于容器数组容量,就意味着队列满了
         while(count == items.length) {
             //每过100纳秒线程就会自动醒过来,判断存放数据
             //的个数是否和数组容量相等,如果相等说明没有空间,仍然会睡一会
             wait.waitFor();
         }
         //走到这里就意味着线程可能没有阻塞
         //也可能是阻塞结束了,总之,不管线程有没有真的阻塞过
         //都要在这里重置一下SleepingWaitStrategy的retries属性
         wait.setRetries(200);
         //把数据放入队列中
         enqueue(e);
    }
  
  
    //上面的方法改动了,消费者取走数据的方法也要改动一下啊,都涉及到线程阻塞的问题
    //肯定要改一下
    public E take() throws InterruptedException {
         while(count == 0) {
             wait.waitFor();
         }
        wait.setRetries(200);
        return dequeue();
    }
}

这样一来,我的程序就有一次重构好了。让我再来为大家简单分析一下。首先要明确一下,在我目前的单生产者单消费者模型中,程序内的生产者线程和消费者线程同一时间,只可能有一个会发生阻塞。因为生产者阻塞的时候,消费者肯定在拼命消费数据;消费者阻塞的时候,生产者肯定在拼命生产数据。所以,在 ArrayBlockingQueue 类中,虽然 SleepingWaitStrategy 类的 retries 属性分别出现在了 put 和 take 方法中,要被两个线程操纵,但是并不会发生并发问题。因为如果生产者和消费者线程都不阻塞的话,retries 的值就不会变,再怎么赋值 200,也和原来一样。如果有线程阻塞了,也只是一个线程阻塞,一个线程改变 retries 的值,然后重新赋值为 200,不会有另一个线程来干扰。所以,这种处理方式是不会给 retries 带来并发问题的。但是,容我说一句,这样写代码太丑了,我实在受不了。因为我本来可以把 retries 的重置放在阻塞策略 SleepingWaitStrategy 本身中的,甚至我也可以把 put 和 take 方法中的 while 循环放到 SleepingWaitStrategy 类中。代码写得丑陋,还引入了更复杂的并发情况,相当于把程序写死了


标题:单生产者单消费者模式队列的搭建(一)
作者:xiaohugg
地址:https://xiaohugg.top/articles/2024/03/05/1709602358125.html

人民有信仰 民族有希望 国家有力量