澳门新葡萄京官网首页Java并发容器与框架(一)阻塞队列

队列以一种先进先出的格局管理数据。假如您筹算向一个早已满了的围堵队列中增多一个成分,或是从叁个空的窒碍队列中移除三个因素,将形成线程拥塞。在四线程进行同盟时,堵塞队列是很有用的工具。工小编线程能够定时的把高级中学级结果存到拥塞队列中。而此外工笔者线程把高级中学级结果抽取并在明日涂改它们。队列会自动平衡负载。假若第一个线程集运转的比第3个慢,则第四个线程集在等候结果时就能够拥塞。假若第一个线程集运维的快,那么它将翘首以待第一个线程集超出来。

大家曾经见到了变异Java并发程序设计根基的平底构建块。然则,对于实际编制程序来讲,应该尽量远隔底层布局。使用由并发管理的专门的工作职员实现的较高档案的次序的组织要惠及得多、安全得多。

java并发:堵塞队列,java并发队列

率先节 窒碍队列

1.1 初识拥塞队列

  队列以一种先进先出的不二法门管理数据,拥塞队列(BlockingQueue)是三个支撑八个附加操作的行列,那三个叠加的操作是:在队列为空时,获取成分的线程会等待队列变为非空;当队列满时,存款和储蓄成分的线程会等待队列可用。在十二线程进行合营时,拥塞队列是很有用的工具。

  坐褥者-消费者情势:窒碍队列常用于坐蓐者和买主的情景,生产者线程能够按时的把高级中学级结果存到堵塞队列中,而客商线程把高中级结果收取并在现在涂改它们。队列会自行平衡负载,借使劳动者线程集运营的比客户线程集慢,则购买者线程集在守候结果时就能够窒碍;假如劳动者线程集运转的快,那么它将翘首以待客商线程集高出来。

    澳门新葡萄京官网首页 1

  简单说贝因美(Beingmate卡塔尔(قطر‎下怎么领悟上表,比方说堵塞队列的插入方法,add(eState of Qatar、offer(e卡塔尔(قطر‎、put(e卡塔尔(قطر‎等均为绿灯队列的插入方法,但它们的管理方式不平等,add(e卡塔尔(قطر‎方法恐怕会抛出十三分,而put(e卡塔尔国方法规大概直接处于窒碍状态,下面演讲一下这一个管理方式:

  A、抛出特别:所谓抛出十分是指当窒碍队列满时,再往队列里插入成分,会抛出IllegalStateException(“Queue
full”卡塔尔(قطر‎十分;当队列为空时,从队列里获取成分时会抛出NoSuchElementException非凡。

  B、重回特殊值:插入方法,该方法会重回是不是中标,成功则赶回true;移除方法,该方法是从队列里拿出二个因素,若无则赶回null

  C、从来不通:当梗塞队列满时,若是劳动者线程往队列里put成分,队列会间接不通生产者线程,直到将数据放入队列或是响应中断退出;当队列为空时,消费者线程试图从队列里take成分,队列也会一向不通消费者线程,直到队列可用。

  D、超时退出:当拥塞队列满时,队列会梗塞生产者线程一段时间,要是超越一定的岁月,坐褥者线程就能够脱离。

 

1.2 Java中的堵塞队列

  java.util.concurrent包提供了三种不一致情势的封堵队列,如数组堵塞队列ArrayBlockingQueue、链表梗塞队列LinkedBlockingQueue、优先级梗塞队列PriorityBlockingQueue和延时队列DelayQueue等,上面简介一下这多少个闭塞队列:

  数组窒碍队列:ArrayBlockingQueue是一个由数组帮衬的有界拥塞队列,内部维持着一个定长的多寡缓冲队列(该队列由数组构成),此行列根据先进先出(FIFO)的法规对成分进行排序,在布局时索要给定容积。ArrayBlockingQueue内部还保存着两个整形变量,分别标记着队列的尾部和尾巴部分在数组中之处。

  对于数组梗塞队列,能够筛选是不是要求公平性,所谓公平访问队列是指窒碍的富有生产者线程或消费者线程,当队列可用时,能够依据堵塞的前后相继顺序访问队列,即先梗塞的临盆者线程,能够先往队列里插入成分,先窒碍的主顾线程,能够先从队列里得到成分。平时,公平性会令你在质量上付出代价,独有在真的拾壹分须要的时候再使用它。

  我们得以应用以下代码创立一个公正的短路队列:

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

  数组拥塞队列的公平性是利用可重入锁落成的,其布局函数代码如下:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
      throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

  链表阻塞队列:LinkedBlockingQueue依据链表的有界窒碍队列,内部维持着三个数量缓冲队列(该队列由链表构成),此行列遵照先进先出的标准化对成分实行排序。当分娩者往队列中放入贰个数额时,队列会从劳动者手中获取数据,并缓存在队列之中,而分娩者立刻回去;唯有当队列缓冲区达到最大值缓存体积时(能够因此LinkedBlockingQueue的构造函数内定该值),才会卡住临蓐者队列,直到消费者从队列中消费掉一份数据,生产者线程将会被晋升,反之对于客商这端的处理也依照相符的原理。须求注意的是,假如组织一个LinkedBlockingQueue对象,而从不点名其容积大小,LinkedBlockingQueue会暗许三个周边Infiniti大小(Integer.马克斯_VALUE)的体量,那样的话,假使劳动者的速度一旦超越消费者的快慢,也许还并未等到行列满堵塞发生,系统内部存款和储蓄器就有望早就被消耗殆尽了。

  LinkedBlockingQueue之所以能够相当的慢的管理并发数据,是因为其对于生产者端和消费者端独家选拔了单身的锁来调节数据同步,那也象征在高并发的情状下生产者和买主能够互相地操作队列中的数据,以此来加强全部队列的现身质量。

  优先级堵塞队列:PriorityBlockingQueue是一个扶助先行级排序的无界窒碍队列,暗许情状下成分运用自然顺序排列,也足以经过结构函数传入的Compator对象来支配。在落到实处PriorityBlockingQueue时,内控线程同步的锁接受的是公平锁。需求在乎的是PriorityBlockingQueue并不会卡住数据坐褥者,而只是在未有可费用的数码时打断数据的顾客,因而利用的时候要特别注意,生产者临蓐数据的进程绝对不可能快于购买者成本数量的速度,不然时间一长,会最后耗尽全数的可用堆内部存款和储蓄器空间。

  延时队列:DelayQueue是一个帮忙延时获取成分的应用优先级队列达成的无界拥塞队列。队列中的成分必得贯彻Delayed接口和Comparable接口(用以钦点成分的依次),约等于说DelayQueue里面包车型客车要素必得有public
void compareTo(ToState of Qatar和long getDelay(TimeUnit
unit卡塔尔(قطر‎方法存在;在开立成分时能够钦点多长期本领从队列中收获当前因素,独有在延迟期满时工夫从队列中领到成分。

  链表双向拥塞队列:LinkedBlockingDeque是由一个链表构造构成的双向堵塞队列。所谓双向队列指的是您能够从队列的双面插入和移出成分,双端队列因多了二个操作入口,在四十三十二线程同一时候入队时减弱了八分之四的竞争。在伊始化LinkedBlockingDeque时,能够设置容积,幸免其联网膨胀,比较其余的短路队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等艺术,以First单词结尾的法子,表示插入,获取(peek)或移除双端队列的率先个要素;以Last单词结尾的点子,表示插入,获取或移除双端队列的最后三个因素;插入方法add等同于addLast,移除方法remove等同于removeFirst。双向梗塞队列能够选择在“职业盗取”方式中。

  链表传输队列:LinkedTransferQueue是三个由链表构造构成的无界传输窒碍队列,绝对于其余堵塞队列,LinkedTransferQueue多了tryTransfer(卡塔尔(قطر‎方法和transfer(卡塔尔国方法。

  transfer(卡塔尔国方法:假设当前有消费者正在等待选用成分(消费者使用take(卡塔尔方法或带时间范围的poll(卡塔尔国方法),transfer(卡塔尔方法能够把劳动者传入的要素马上传输给买主;若无客商在伺机选用成分,transfer(卡塔尔(قطر‎方法会将成分贮存到行列的tail节点,并等到该因素被消费者花费了才回到。

  transfer(卡塔尔方法的第一代码如下:

Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);

  代码解说:第一行代码是寻思把存放在当前成分的s节点作为tail节点,第二行代码是让CPU自旋等待顾客成本成分。因为自旋会损耗CPU,所以自旋一定的次数后使用Thread.yield(State of Qatar方法来脚刹踏板当前正值实施的线程,并实行此外线程。

  tryTransfer(卡塔尔方法:该形式是用来试探分娩者传入的因素是不是能直接传给消费者,若无顾客等待接收成分,则赶回false。与transfer(卡塔尔方法的界别:tryTransfer(卡塔尔方法是马上重临(无论消费者是或不是抽取),transfer(State of Qatar方法是必需等到买主花费了才回到。对于富含时间约束的tryTransfer(E
e, long timeout, TimeUnit
unitState of Qatar方法,则是意欲把劳动者传入的因素直接传给购买者,不过倘诺未有顾客花费该成分则等待钦定的光阴过后再回到,假使超时尚未花费成分,则赶回false,如若在逾期时间内开销了成分,则赶回true。

  欲精晓LinkedTransferQueue的越多内容,可查看以下小说:

  SynchronousQueue:SynchronousQueue是一种无界、无缓冲的堵截队列,可以以为SynchronousQueue是叁个缓存值为1的短路队列,然而SynchronousQueue内部并不曾多少缓存空间,数据是在杂交的劳动者和消费者线程之间直接传送的。能够那样来明白:SynchronousQueue是二个传球手,SynchronousQueue不存款和储蓄数据成分,队列头成分是第二个排队要插入数据的线程,实际不是要换到的数额,SynchronousQueue负担把劳动者线程管理的多少直接传送给消费者线程,临蓐者和消费者相互影响等待对方,握手,然后协作离开。SynchronousQueue的吞吐量高于LinkedBlockingQueue
和 ArrayBlockingQueue。

 

1.4 详解SynchronousQueue

【本小节首要摘自参谋资料,作为读书笔记O(∩_∩)O】

(1)认识SynchronousQueue 

  SynchronousQueue的isEmpty(卡塔尔(قطر‎方法永世重返true,remainingCapacity(卡塔尔(قطر‎方法恒久重临0,remove(State of Qatar和removeAll(卡塔尔国 方法永恒重临false,iterator(卡塔尔(قطر‎方法永世再次回到null,peek(卡塔尔(قطر‎方法恒久再次来到null,故大家不能够经过调用peek(卡塔尔国方法来看队列中是或不是有数据成分,因为数量成分唯有当你试着取走的时候才或然存在,不取走而只想偷窥一下是不行的,同样遍历那几个行列的操作也是不许的。

  SynchronousQueue的三个应用意况是在线程池里,Executors.newCachedThreadPool(卡塔尔(قطر‎就接收了SynchronousQueue,那一个线程池依据必要创制新的线程(新职责到来时),当然假若有空暇线程的话,则会复用这个线程。

(2)SynchronousQueue福寿无疆机制

  梗塞算法的贯彻普通是在此中使用三个锁来承保在八个线程中的put(卡塔尔国和take(State of Qatar方法是串行实施的,如下代码是相仿put(卡塔尔和take(State of Qatar方法的贯彻:

public class NativeSynchronousQueue<E> {
    boolean putting = false;
    E item = null;

    public synchronized E take() throws InterruptedException {
        while (item == null)
            wait();
        E e = item;
        item = null;
        notifyAll();
        return e;
    }

    public synchronized void put(E e) throws InterruptedException {
        if (e==null) return;
        while (putting)
            wait();
        putting = true;
        item = e;
        notifyAll();
        while (item!=null)
            wait();
        putting = false;
        notifyAll();
    }
}

  经典同步队列的落到实处应用了多个实信号量,代码如下:

public class SemaphoreSynchronousQueue<E> {
    E item = null;
    Semaphore sync = new Semaphore(0);
    Semaphore send = new Semaphore(1);
    Semaphore recv = new Semaphore(0);

    public E take() throws InterruptedException {
        recv.acquire();
        E x = item;
        sync.release();
        send.release();
        return x;
    }

    public void put (E x) throws InterruptedException{
        send.acquire();
        item = x;
        recv.release();
        sync.acquire();
    }
}

  Java5中SynchronousQueue的落到实处相对来讲做了有的优化,它只使用了叁个锁,使用队列代替时限信号量,允许公布者直接透露数量,并非要首先从绿灯在频域信号量处被唤醒,代码如下:

public class Java5SynchronousQueue<E> {
    ReentrantLock qlock = new ReentrantLock();
    Queue waitingProducers = new Queue();
    Queue waitingConsumers = new Queue();

    static class Node extends AbstractQueuedSynchronizer {
        E item;
        Node next;

        Node(Object x) { item = x; }
        void waitForTake() { /* (uses AQS) */ }
           E waitForPut() { /* (uses AQS) */ }
    }

    public E take() {
        Node node;
        boolean mustWait;
        qlock.lock();
        node = waitingProducers.pop();
        if(mustWait = (node == null))
           node = waitingConsumers.push(null);
         qlock.unlock();

        if (mustWait)
           return node.waitForPut();
        else
            return node.item;
    }

    public void put(E e) {
         Node node;
         boolean mustWait;
         qlock.lock();
         node = waitingConsumers.pop();
         if (mustWait = (node == null))
             node = waitingProducers.push(e);
         qlock.unlock();

         if (mustWait)
             node.waitForTake();
         else
            node.item = e;
    }
}

  Java6中SynchronousQueue的兑现应用了一种属性更加好的无锁算法——扩展的“Dual
stack and Dual queue”算法,质量比Java5的贯彻有不小进级。

  声Bellamy个SynchronousQueue有二种区别的章程,扶植正义和非公平二种角逐机制,它们中间全部不太同样的行为,同等看待格局和非公平形式的分别:若果接纳公平形式,SynchronousQueue会选用公平锁,并协作一个FIFO队列来梗塞多余的生产者和消费者;若是是非公平形式(SynchronousQueue暗中认可),SynchronousQueue会采纳非公平锁,同一时间相称叁个LIFO队列来处理多余的劳动者和买主。两个质量异常,平常景观下,Fifo平时能够支撑越来越大的吞吐量,但Lifo能够越来越大程度的保证线程的本地化,需求介意的是,若使用非公平格局,假诺劳动者和购买者的管理速度有差异,则比较轻松并发饥渴的景色(大概有个别临蓐者恐怕消费者的数据永久都得不到拍卖)。

(3)参照他事他说加以考察资料

(1)

(2)

 

 

第2节 使用示例

2.1 生产者-消费者示例

叁个劳动者-N个买主,程序功用:在三个索引及它的全部子目录下寻觅全数文件,打字与印刷出含有钦赐关键字的文书列表。

package com.test;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class BlockingQueueTest {
    public static void main(String[] args) {
        Scanner in = new Scanner(System.in);
        System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");
        String directory = in.nextLine();
        System.out.print("Enter keyword (e.g. volatile): ");
        String keyword = in.nextLine();
        final int FILE_QUEUE_SIZE = 10;
        final int SEARCH_THREADS = 100;
        BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
        FileEnumerationTask enumerator = new FileEnumerationTask(queue,new File(directory));
        new Thread(enumerator).start();
        for (int i = 1; i <= SEARCH_THREADS; i++)
            new Thread(new SearchTask(queue, keyword)).start();
    }
}

/**
 * This task enumerates all files in a directory and its subdirectories.
 */
class FileEnumerationTask implements Runnable {
    /**
     * Constructs a FileEnumerationTask.
     * 
     * @param queue
     *            the blocking queue to which the enumerated files are added
     * @param startingDirectory
     *            the directory in which to start the enumeration
     */
    public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory) {
        this.queue = queue;
        this.startingDirectory = startingDirectory;
    }

    public void run() {
        try {
            enumerate(startingDirectory);
            queue.put(DUMMY);
        } catch (InterruptedException e) {
        }
    }

    /**
     * Recursively enumerates all files in a given directory and its
     * subdirectories
     * 
     * @param directory
     *            the directory in which to start
     */
    public void enumerate(File directory) throws InterruptedException {
        File[] files = directory.listFiles();
        for (File file : files) {
            if (file.isDirectory())
                enumerate(file);
            else
                queue.put(file);
        }
    }

    public static File DUMMY = new File("");
    private BlockingQueue<File> queue;
    private File startingDirectory;
}

/**
 * This task searches files for a given keyword.
 */
class SearchTask implements Runnable {
    /**
     * Constructs a SearchTask.
     * 
     * @param queue
     *            the queue from which to take files
     * @param keyword
     *            the keyword to look for
     */
    public SearchTask(BlockingQueue<File> queue, String keyword) {
        this.queue = queue;
        this.keyword = keyword;
    }

    public void run() {
        try {
            boolean done = false;
            while (!done) {
                File file = queue.take();
                if (file == FileEnumerationTask.DUMMY) {
                    queue.put(file);
                    done = true;
                } else
                    search(file);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
        }
    }

    /**
     * Searches a file for a given keyword and prints all matching lines.
     * 
     * @param file
     *            the file to search
     */
    public void search(File file) throws IOException {
        Scanner in = new Scanner(new FileInputStream(file));
        int lineNumber = 0;
        while (in.hasNextLine()) {
            lineNumber++;
            String line = in.nextLine().trim();
            if (line.contains(keyword))
                System.out.printf("%s:%d    %s%n", file.getPath(), lineNumber,
                        line);
        }
        in.close();
    }

    private BlockingQueue<File> queue;
    private String keyword;
}

解释:上述顺序展现了如何接收梗塞队列来调控线程集,分娩者线程枚举在全部子目录下的有着文件并把它们放到一个拥塞队列中,相同的时候大家还运行了汪洋的检索线程,各种寻找线程从队列中抽取三个文件,展开它,打字与印刷出含有关键字的保有行,然后抽取下三个文本。

  在上述代码中,大家运用了叁个小才能来在干活完结后停下线程,为了发出实现非确定性信号,枚举线程把一个假造对象归入队列,当搜索线程取到这一个虚构对象时,就将其放回并结束(那看似于在行李输送带上放叁个写着“最后二个包”的设想包)。

留意:在此个程序中,大家选取的是ArrayBlockingQueue,使用队列数据布局作为一种合作机制,这里无需人其它显示的线程同步。

相对来说解析:

  ArrayBlockingQueue在劳动者归入数据和消费者获取数据,都以共用同二个锁对象,由此也表示双方不能够真正相互影响运维,这一点更为分歧于LinkedBlockingQueue;依照得以达成原理来深入分析,ArrayBlockingQueue完全能够运用抽离锁,进而达成临盆者和消费者操作的通通并行运转。DougLea之所以没那样去做,恐怕是因为ArrayBlockingQueue的数额写入和获取操作已经足足轻松,以至于引入独立的锁机制,除了给代码带来拾分的目不暇接外,其在性质上完全占不到任何方便。

  ArrayBlockingQueue和LinkedBlockingQueue间还恐怕有二个鲜明的差异的地方在于,前面三个在插入或删除成分时不会发出或销毁任何附加的靶子实例,而后人则会调换四个外加的Node对象,那在长日子内亟待快捷并发地管理大量数码的系统中,其对于GC的影响照旧存在必然的区分。

 

2.2 DelayQueue使用示例

小编们得以将延时队列DelayQueue运用在偏下场景中:

  (1)缓存系统的筹算:能够用DelayQueue保存缓存成分的保质期,使用五个线程循环查询DelayQueue,一旦能从DelayQueue中赢得成分时,表示缓存保藏期到了。

  (2)依期职责调整:使用DelayQueue保存当天就要履行的天职和试行时间,一旦从DelayQueue中获取到义务就从头实行任务,例如TimerQueue正是使用DelayQueue达成的。

DelayQueue使用实譬喻下:

(1)达成三个Student对象作为DelayQueue的成分,Student必需得以达成Delayed接口的五个措施

package com.test;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Student implements Delayed {//必须实现Delayed接口

    private String name;
    private long submitTime;// 交卷时间
    private long workTime;// 考试时间

    public Student(String name, long submitTime) {
        this.name = name;
        this.workTime = submitTime;
        this.submitTime = TimeUnit.NANOSECONDS.convert(submitTime, TimeUnit.MILLISECONDS) + System.nanoTime();
        System.out.println(this.name + " 交卷,用时" + workTime);
    }

    public String getName() {
        return this.name + " 交卷,用时" + workTime;
    }

    //必须实现getDelay方法
    public long getDelay(TimeUnit unit) {
        //返回一个延迟时间
        return unit.convert(submitTime - System.nanoTime(), unit.NANOSECONDS);
    }

    //必须实现compareTo方法
    public int compareTo(Delayed o) {
        Student that = (Student) o;
        return submitTime > that.submitTime ? 1 : (submitTime < that.submitTime ? -1 : 0);
    }

}

(2)主线程程序

package com.test;
import java.util.concurrent.DelayQueue;
public class DelayQueueTest {
    public static void main(String[] args) throws Exception {

        // 新建一个等待队列
        final DelayQueue<Student> bq = new DelayQueue<Student>();
for (int i = 0; i < 5; i++) {
            Student student = new Student("学生"+i,Math.round((Math.random()*10+i)));
            bq.put(student); // 将数据存到队列里!
        }
        //获取但不移除此队列的头部;如果此队列为空,则返回 null。
        System.out.println(bq.peek().getName());
    }
}

上述程序运转结果如下:

学生0 交卷,用时8
学生1 交卷,用时9
学生2 交卷,用时4
学生3 交卷,用时9
学生4 交卷,用时12
学生2 交卷,用时4

 

 

其三节 使用拥塞式队列管理大数量

 鄙人一时尚未色金属研究所究那某个剧情,此处仅贴出多个财富,以供后续学习

(1)

(2)

 

第2节 参谋资料

(1)

第4节 拥塞队列
1.1 初识堵塞队列
队列以一种先进先出的点子管理数据,梗塞队列(BlockingQueue)是叁个…

上面包车型客车程序展现了什么样运用梗塞队列来调节线程集。程序在贰个目录及它的全部子目录下搜寻全部文件,打字与印刷出含有钦赐关键字的文本列表。

对此许三十二线程难点,能够通过运用三个或多少个种类以温婉且安全的法子将其格局化。临蓐者线程向队列插入成分,消费者线程则收取他妈。使用队列,能够安全地从八个线程向另叁个线程传递数据。比如,思忖银行转账程序,转账线程将转速指令对象插入到一个系列中,并非一贯访谈银行对象。另贰个线程从队列中抽取指令转账。独有该线程能够访问该银行对象的里边。因而不须要一块。(当然,线程安全的行列类的达成者一定要思索锁和规范,但是,那是他俩的难点实际不是你的主题素材。)

java.util.concurrent包提供了堵截队列的4个变种:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。大家用的是ArrayBlockingQueue。ArrayBlockingQueue在协会时必要给定体量,并可以选取是还是不是供给公平性。如若公平参数被设置了,等待时间最长的线程会先行获得处理。日常,公平性会让你在性质上付出代价,唯有在真的十三分须求的时候再采用它。

一、窒碍队列方法


方法 正常动作 特殊情况下的动作
put 添加一个元素 如果队列满,则阻塞
take 移出并返回头元素 如果队列空,则阻塞
add 添加一个元素 如果队列满,抛出IllegalStateException
remove 移出并返回头元素 如果队列空,则抛出NoSuchElementException
element 返回队列头元素 如果队列空,抛出NoSuchElementException
offer 添加一个元素并返回true 如果队列满,返回false
poll 移出并返回队列的头元素 如果队列空,返回null
peek 返回队列的头元素 如果队列空,返回null

窒碍队列方法运用上分下边三类:
1 当将队列充当线程管理工科具来选用时,将在利用put和take方法。
2
当视图向满的队列中增加或从空的队列中移出元素时,add、remove和element抛出非常。
3
当然,在多少个四线程程序中,队列会在任哪一天空或满,因而,可以用offer、poll和peek取代。

劳动者线程枚举在全体子目录下的享有文件并把它们放到二个绿灯队列中。那些操作便捷,固然队列没有设上限的话,异常的快它就隐含了从未有过找到的文件。

二、java.util.concurrent包


java.util.concurrent包提供了不通队列的多少个变种:

  • BlockingQueue

  • LinkedBlockingQueue:链表布局构成的无界(可钦定体量)窒碍队列

  • ArrayBlockingQueue:数组构造组成的有界堵塞队列

  • PriorityBlockingQueue:无界优先级队列,非FIFO队列,成分根据优先级依次被移出

  • DelayQueue:无界梗塞队列,独有延迟期满时本领领到元素

  • SynchronousQueue:堵塞队列,插入操作必须等待另一线程的移除操作
    ,反之亦然

  • BlockingDeque

  • LinkedBlockingDeque:链表布局组成的无界(可内定体积)双端堵塞队列

四头光景下,我们只要使用ArrayBlockingQueue或LinkedBlockingQueue就够了。

以身作则程序显示了如何使用窒碍队列来调整一组线程。程序在多个索引及它的全部子目录下寻找全体文件,打字与印刷出含有钦赐关键字的行:

public class BlockingQueueTest {

    private static final int FILE_QUEUE_SIZE = 10;
    private static final int SEARCH_THREADS = 100;
    private static final File DUMMY = new File("");
    private static BlockingQueue<File> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);

    public static void main(String[] args) {
        try (Scanner in = new Scanner(System.in)) {
            System.out.println("Enter base directory (e.g. /opt/jdk1.8.0/src): ");
            String directory = in.nextLine();
            System.out.println("Enter keyword (e.g. volatile): ");
            String keyword = in.nextLine();

            Runnable enumerator = () -> {
                try {
                    enumerate(new File(directory));
                    queue.put(DUMMY);
                } catch (InterruptedException e) {

                }
            };
            new Thread(enumerator).start();

            for(int i = 1; i <= SEARCH_THREADS; i++) {
                Runnable searcher = () -> {
                    try {
                        boolean done = false;
                        while (!done) {
                            File file = queue.take();
                            if(file == DUMMY) {
                                queue.put(file);
                                done = true;
                            } else {
                                search(file, keyword);
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {

                    }
                };
                new Thread(searcher).start();
            }
        }
    }

    // 递归地枚举给定目录及其子目录中的所有文件
    public static void enumerate(File directory) throws InterruptedException {
        File[] files = directory.listFiles();
        for (File file : files) {
            if (file.isDirectory()) enumerate(file);
            else queue.put(file);
        }
    }

    // 搜索一个给定关键字的文件,并打印出所有匹配的行
    public static void search(File file, String keyword) throws IOException {
        try (Scanner in = new Scanner(file, "UTF-8")) {
            int lineNumber = 0;
            while (in.hasNextLine()) {
                lineNumber++;
                String line = in.nextLine();
                if (line.contains(keyword))
                    System.out.printf("%s:%d:%n", file.getPath(), lineNumber, line);
            }
        }
    }

}

劳动者线程枚举全数子目录下的持有文件并把它们放到五个围堵队列中。同期运营大气主顾线程,各样消费者线程从队列抽取一个文书,展开它,打字与印刷全体包含该重大字的行,然后取出下多个文件。大家使用多少个小本领在办事完结后终止这么些应用程序。为了产生完结实信号,枚举线程放置三个虚构对象到行列中(那就疑似在行李输送带上放着三个写着“最后一个包”的虚构包)。当找寻线程取到那几个编造对象时,将其放回并终止。

瞩目,无需突显的线程同步。在此个示例中,已利用了不通队列作为一种合营机制。

咱俩还要还运营了汪洋的物色线程。种种寻找线程从队列中抽出八个文书,展开它,打字与印刷出含有关键字的全体行,然后抽取下七个文件。大家选拔了三个小工夫来在干活停止后终止线程。为了发出完毕数字信号,枚举线程把贰个伪造对象放入队列。(那好像于在行李输送带上放叁个写着“最终二个包”的虚构包。)当找出线程取到这些编造对象时,就将其放回并终止。

只顾,这里没有必要人别的展现的线程同步。在这里个顺序中,大家选择队列数据布局作为一种合作机制。

import java.io.*;  
import java.util.*;  
import java.util.concurrent.*;  

public class BlockingQueueTest  
{  
   public static void main(String[] args)  
   {  
      Scanner in = new Scanner(System.in);  
      System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");  
      String directory = in.nextLine();  
      System.out.print("Enter keyword (e.g. volatile): ");  
      String keyword = in.nextLine();  

      final int FILE_QUEUE_SIZE = 10;  
      final int SEARCH_THREADS = 100;  

      BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);  

      FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));  
      new Thread(enumerator).start();  
      for (int i = 1; i <= SEARCH_THREADS; i++)  
         new Thread(new SearchTask(queue, keyword)).start();  
   }  
}  

/**  
 * This task enumerates all files in a directory and its subdirectories.  
 */ 
class FileEnumerationTask implements Runnable  
{  
   /**  
    * Constructs a FileEnumerationTask.  
    * @param queue the blocking queue to which the enumerated files are added  
    * @param startingDirectory the directory in which to start the enumeration  
    */ 
   public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)  
   {  
      this.queue = queue;  
      this.startingDirectory = startingDirectory;  
   }  

   public void run()  
   {  
      try 
      {  
         enumerate(startingDirectory);  
         queue.put(DUMMY);  
      }  
      catch (InterruptedException e)  
      {  
      }  
   }  

   /**  
    * Recursively enumerates all files in a given directory and its subdirectories  
    * @param directory the directory in which to start  
    */ 
   public void enumerate(File directory) throws InterruptedException  
   {  
      File[] files = directory.listFiles();  
      for (File file : files)  
      {  
         if (file.isDirectory()) enumerate(file);  
         else queue.put(file);  
      }  
   }  

   public static File DUMMY = new File("");  

   private BlockingQueue<File> queue;  
   private File startingDirectory;  
}  

/**  
 * This task searches files for a given keyword.  
 */ 
class SearchTask implements Runnable  
{  
   /**  
    * Constructs a SearchTask.  
    * @param queue the queue from which to take files  
    * @param keyword the keyword to look for  
    */ 
   public SearchTask(BlockingQueue<File> queue, String keyword)  
   {  
      this.queue = queue;  
      this.keyword = keyword;  
   }  

   public void run()  
   {  
      try 
      {  
         boolean done = false;  
         while (!done)  
         {  
            File file = queue.take();  
            if (file == FileEnumerationTask.DUMMY)  
            {  
               queue.put(file);  
               done = true;  
            }  
            else search(file);              
         }  
      }  
      catch (IOException e)  
      {  
         e.printStackTrace();  
      }  
      catch (InterruptedException e)  
      {  
      }        
   }  

   /**  
    * Searches a file for a given keyword and prints all matching lines.  
    * @param file the file to search  
    */ 
   public void search(File file) throws IOException  
   {  
      Scanner in = new Scanner(new FileInputStream(file));  
      int lineNumber = 0;  
      while (in.hasNextLine())  
      {  
         lineNumber++;  
         String line = in.nextLine().trim();  
         if (line.contains(keyword)) System.out.printf("%s:%d    %s%n", file.getPath(), lineNumber, line);  
      }  
      in.close();  
   }  

   private BlockingQueue<File> queue;  
   private String keyword;  
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注