澳门新葡萄京官网首页 14

并发编程 – Concurrent 用户指南( 一)

这篇文章里我们主要讨论下如何在Java里实现一个对象池。最近几年,Java虚拟机的性能在各方面都得到了极大的提升,因此对大多数对象而言,已经没有必要通过对象池来提高性能了。根本的原因是,创建一个新的对象的开销已经不像过去那样昂贵了。

  1. java.util.concurrent – Java 并发工具包

然而,还是有些对象,它们的创建开销是非常大的,比如线程,数据库连接等这些非轻量级的对象。在任何一个应用程序里面,我们肯定会用到不止一个这样的对象。如果有一种很方便的创建管理这些对象的池,使得这些对象能够动态的重用,而客户端代码也不用关心它们的生命周期,还是会很给力的。

Java 5 添加了一个新的包到 Java 平台,java.util.concurrent
包。这个包包含有一系列能够让 Java
的并发编程变得更加简单轻松的类。在这个包被添加以前,你需要自己去动手实现自己的相关工具类。

在真正开始写代码前,我们先来梳理下一个对象池需要完成哪些功能。

本文我将带你一一认识 java.util.concurrent
包里的这些类,然后你可以尝试着如何在项目中使用它们。本文中我将使用 Java
6 版本,我不确定这和 Java 5 版本里的是否有一些差异。我不会去解释关于
Java 并发的核心问题 –
其背后的原理,也就是说,如果你对那些东西感兴趣,参考《Java
并发指南》。

  • 如果有可用的对象,对象池应当能返回给客户端。
  • 客户端把对象放回池里后,可以对这些对象进行重用。
  • 对象池能够创建新的对象来满足客户端不断增长的需求。
  • 需要有一个正确关闭池的机制来确保关闭后不会发生内存泄露。

半成品

不用说了,上面几点就是我们要暴露给客户端的连接池的接口的基本功能。

本文很大程度上还是个
“半成品”,所以当你发现一些被漏掉的类或接口时,请耐心等待。在我空闲的时候会把它们加进来的。

我们的声明的接口如下:

  1. 阻塞队列 BlockingQueue
package com.test.pool;

/**
 * Represents a cached pool of objects.
 *
 * @author Swaranga
 *
 * @param <T> the type of object to pool.
 */
public interface Pool<T>
{
 /**
  * Returns an instance from the pool.
  * The call may be a blocking one or a non-blocking one
  * and that is determined by the internal implementation.
  *
  * If the call is a blocking call,
  * the call returns immediately with a valid object
  * if available, else the thread is made to wait
  * until an object becomes available.
  * In case of a blocking call,
  * it is advised that clients react
  * to {@link InterruptedException} which might be thrown
  * when the thread waits for an object to become available.
  *
  * If the call is a non-blocking one,
  * the call returns immediately irrespective of
  * whether an object is available or not.
  * If any object is available the call returns it
  * else the call returns < code >null< /code >.
  *
  * The validity of the objects are determined using the
  * {@link Validator} interface, such that
  * an object < code >o< /code > is valid if
  * < code > Validator.isValid(o) == true < /code >.
  *
  * @return T one of the pooled objects.
  */
 T get();

 /**
  * Releases the object and puts it back to the pool.
  *
  * The mechanism of putting the object back to the pool is
  * generally asynchronous,
  * however future implementations might differ.
  *
  * @param t the object to return to the pool
  */

 void release(T t);

 /**
  * Shuts down the pool. In essence this call will not
  * accept any more requests
  * and will release all resources.
  * Releasing resources are done
  * via the < code >invalidate()< /code >
  * method of the {@link Validator} interface.
  */

 void shutdown();
}

java.util.concurrent 包里的 BlockingQueue
接口表示一个线程安放入和提取实例的队列。本小节我将给你演示如何使用这个
BlockingQueue。本节不会讨论如何在 Java 中实现一个你自己的
BlockingQueue。如果你对那个感兴趣,参考《Java
并发指南》

为了能够支持任意对象,上面这个接口故意设计得很简单通用。它提供了从池里获取/返回对象的方法,还有一个关闭池的机制,以便释放对象。

BlockingQueue 用法

现在我们来实现一下这个接口。开始动手之前,值得一提的是,一个理想的release方法应该先尝试检查下这个客户端返回的对象是否还能重复使用。如果是的话再把它扔回池里,如果不是,就舍弃掉这个对象。我们希望这个Pool接口的所有实现都能遵循这个规则。在开始具体的实现类前,我们先创建一个抽象类,以便限制后续的实现能遵循这点。我们实现的抽象类就叫做AbstractPool,它的定义如下:

BlockingQueue
通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。下图是对这个原理的阐述:

package com.test.pool;

/**
 * Represents an abstract pool, that defines the procedure
 * of returning an object to the pool.
 *
 * @author Swaranga
 *
 * @param <T> the type of pooled objects.
 */
abstract class AbstractPool <T> implements Pool <T>
{
 /**
  * Returns the object to the pool.
  * The method first validates the object if it is
  * re-usable and then puts returns it to the pool.
  *
  * If the object validation fails,
  * some implementations
  * will try to create a new one
  * and put it into the pool; however
  * this behaviour is subject to change
  * from implementation to implementation
  *
  */
 @Override
 public final void release(T t)
 {
  if(isValid(t))
  {
   returnToPool(t);
  }
  else
  {
   handleInvalidReturn(t);
  }
 }

 protected abstract void handleInvalidReturn(T t);

 protected abstract void returnToPool(T t);

 protected abstract boolean isValid(T t);
}

在上面这个类里,我们让对象池必须得先验证对象后才能把它放回到池里。具体的实现可以自由选择如何实现这三种方法,以便定制自己的行为。它们根据自己的逻辑来决定如何判断一个对象有效,无效的话应该怎么处理(handleInvalidReturn方法),怎么把一个有效的对象放回到池里(returnToPool方法)。

澳门新葡萄京官网首页 1

有了上面这几个类,我们就可以着手开始具体的实现了。不过还有个问题,由于上面这些类是设计成能支持通用的对象池的,因此具体的实现不知道该如何验证对象的有效性(因为对象都是泛型的)。因此我们还需要些别的东西来帮助我们完成这个。

一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue。

我们需要一个通用的方法来完成对象的校验,而具体的实现不必关心对象是何种类型。因此我们引入了一个新的接口,Validator,它定义了验证对象的方法。这个接口的定义如下:

一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。

package com.test.pool;

 /**
  * Represents the functionality to
  * validate an object of the pool
  * and to subsequently perform cleanup activities.
  *
  * @author Swaranga
  *
  * @param < T > the type of objects to validate and cleanup.
  */
 public static interface Validator < T >
 {
  /**
   * Checks whether the object is valid.
   *
   * @param t the object to check.
   *
   * @return <code>true</code>
   * if the object is valid else <code>false</code>.
   */
  public boolean isValid(T t);

  /**
   * Performs any cleanup activities
   * before discarding the object.
   * For example before discarding
   * database connection objects,
   * the pool will want to close the connections.
   * This is done via the
   * <code>invalidate()</code> method.
   *
   * @param t the object to cleanup
   */

  public void invalidate(T t);
 }

BlockingQueue 的方法

上面这个接口定义了一个检验对象的方法,以及一个把对象置为无效的方法。当准备废弃一个对象并清理内存的时候,invalidate方法就派上用场了。值得注意的是这个接口本身没有任何意义,只有当它在对象池里使用的时候才有意义,所以我们把这个接口定义到Pool接口里面。这和Java集合库里的Map和Map.Entry是一样的。所以我们的Pool接口就成了这样:

BlockingQueue 具有 4
组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

package com.test.pool;

/**
 * Represents a cached pool of objects.
 *
 * @author Swaranga
 *
 * @param < T > the type of object to pool.
 */
public interface Pool< T >
{
 /**
  * Returns an instance from the pool.
  * The call may be a blocking one or a non-blocking one
  * and that is determined by the internal implementation.
  *
  * If the call is a blocking call,
  * the call returns immediately with a valid object
  * if available, else the thread is made to wait
  * until an object becomes available.
  * In case of a blocking call,
  * it is advised that clients react
  * to {@link InterruptedException} which might be thrown
  * when the thread waits for an object to become available.
  *
  * If the call is a non-blocking one,
  * the call returns immediately irrespective of
  * whether an object is available or not.
  * If any object is available the call returns it
  * else the call returns < code >null< /code >.
  *
  * The validity of the objects are determined using the
  * {@link Validator} interface, such that
  * an object < code >o< /code > is valid if
  * < code > Validator.isValid(o) == true < /code >.
  *
  * @return T one of the pooled objects.
  */
 T get();

 /**
  * Releases the object and puts it back to the pool.
  *
  * The mechanism of putting the object back to the pool is
  * generally asynchronous,
  * however future implementations might differ.
  *
  * @param t the object to return to the pool
  */

 void release(T t);

 /**
  * Shuts down the pool. In essence this call will not
  * accept any more requests
  * and will release all resources.
  * Releasing resources are done
  * via the < code >invalidate()< /code >
  * method of the {@link Validator} interface.
  */

 void shutdown();

 /**
  * Represents the functionality to
  * validate an object of the pool
  * and to subsequently perform cleanup activities.
  *
  * @author Swaranga
  *
  * @param < T > the type of objects to validate and cleanup.
  */
 public static interface Validator < T >
 {
  /**
   * Checks whether the object is valid.
   *
   * @param t the object to check.
   *
   * @return <code>true</code>
   * if the object is valid else <code>false</code>.
   */
  public boolean isValid(T t);

  /**
   * Performs any cleanup activities
   * before discarding the object.
   * For example before discarding
   * database connection objects,
   * the pool will want to close the connections.
   * This is done via the
   * <code>invalidate()</code> method.
   *
   * @param t the object to cleanup
   */

  public void invalidate(T t);
 }
}

准备工作已经差不多了,在最后开始前我们还需要一个终极武器,这才是这个对象池的杀手锏。就是“能够创建新的对象”。我们的对象池是泛型的,因此它们得知道如何去生成新的对象来填充这个池子。这个功能不能依赖于对象池本身,必须要有一个通用的方式来创建新的对象。通过一个ObjectFactory的接口就能完成这个,它只有一个“如何创建新的对象”的方法。我们的ObjectFactory接口如下:

澳门新葡萄京官网首页 2

package com.test.pool;

/**
 * Represents the mechanism to create
 * new objects to be used in an object pool.
 *
 * @author Swaranga
 *
 * @param < T > the type of object to create.
 */
public interface ObjectFactory < T >
{
 /**
  * Returns a new instance of an object of type T.
  *
  * @return T an new instance of the object of type T
  */
 public abstract T createNew();
}

四组不同的行为方式解释:

我们的工具类都已经搞定了,现在可以开始真正实现我们的Pool接口了。因为我们希望这个池能在并发程序里面使用,所以我们会创建一个阻塞的对象池,当没有对象可用的时候,让客户端先阻塞住。我们的阻塞机制是让客户端一直阻塞直到有对象可用为止。这样的话导致我们还需要再增加一个只阻塞一定时间的方法,如果在超时时间到来前有对象可用则返回,如果超时了就返回null而不是一直等待下去。这样的实现有点类似Java并发库里的LinkedBlockingQueue,因此真正实现前我们再暴露一个接口,BlockingPool,类似于Java并发库里的BlockingQueue接口。

抛异常:如果试图的操作无法立即执行,抛一个异常。

这里是BlockingQueue的声明:

特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true /
false)。

package com.test.pool;

import java.util.concurrent.TimeUnit;

/**
 * Represents a pool of objects that makes the
 * requesting threads wait if no object is available.
 *
 * @author Swaranga
 *
 * @param < T > the type of objects to pool.
 */
public interface BlockingPool < T > extends Pool < T >
{
 /**
  * Returns an instance of type T from the pool.
  *
  * The call is a blocking call,
  * and client threads are made to wait
  * indefinitely until an object is available.
  * The call implements a fairness algorithm
  * that ensures that a FCFS service is implemented.
  *
  * Clients are advised to react to InterruptedException.
  * If the thread is interrupted while waiting
  * for an object to become available,
  * the current implementations
  * sets the interrupted state of the thread
  * to <code>true</code> and returns null.
  * However this is subject to change
  * from implementation to implementation.
  *
  * @return T an instance of the Object
  * of type T from the pool.
  */
 T get();

 /**
  * Returns an instance of type T from the pool,
  * waiting up to the
  * specified wait time if necessary
  * for an object to become available..
  *
  * The call is a blocking call,
  * and client threads are made to wait
  * for time until an object is available
  * or until the timeout occurs.
  * The call implements a fairness algorithm
  * that ensures that a FCFS service is implemented.
  *
  * Clients are advised to react to InterruptedException.
  * If the thread is interrupted while waiting
  * for an object to become available,
  * the current implementations
  * set the interrupted state of the thread
  * to <code>true</code> and returns null.
  * However this is subject to change
  * from implementation to implementation.
  *
  *
  * @param time amount of time to wait before giving up,
  *   in units of <tt>unit</tt>
  * @param unit a <tt>TimeUnit</tt> determining
  *   how to interpret the
  *        <tt>timeout</tt> parameter
  *
  * @return T an instance of the Object
  * of type T from the pool.
  *
  * @throws InterruptedException
  * if interrupted while waiting
  */

 T get(long time, TimeUnit unit) throws InterruptedException;
}

澳门新葡萄京官网首页 ,阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。

BoundedBlockingPool的实现如下:

超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是
true / false)。

package com.test.pool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public final class BoundedBlockingPool
        extends <AbstractPool>
        implements <BlockingPool>
{
    private int size;
    private BlockingQueue  objects;
    private Validator  validator;
    private ObjectFactory  objectFactory;
    private ExecutorService executor =
            Executors.newCachedThreadPool();
    private volatile boolean shutdownCalled;

    public BoundedBlockingPool(
            int size,
            Validator  validator,
            ObjectFactory  objectFactory)
    {
        super();
        this.objectFactory = objectFactory;
        this.size = size;
        this.validator = validator;
        objects = new LinkedBlockingQueue (size);
        initializeObjects();
        shutdownCalled = false;
    }

    public T get(long timeOut, TimeUnit unit)
    {
        if(!shutdownCalled)
        {
            T t = null;
            try
            {
                t = objects.poll(timeOut, unit);
                return t;
            }
            catch(InterruptedException ie)
            {
                Thread.currentThread().interrupt();
            }
            return t;
        }
        throw new IllegalStateException(
                'Object pool is already shutdown');
    }

    public T get()
    {
        if(!shutdownCalled)
        {
            T t = null;
            try
            {
                t = objects.take();
            }

            catch(InterruptedException ie)
            {
                Thread.currentThread().interrupt();
            }
            return t;
        }

        throw new IllegalStateException(
                'Object pool is already shutdown');
    }

    public void shutdown()
    {
        shutdownCalled = true;
        executor.shutdownNow();
        clearResources();
    }

    private void clearResources()
    {
        for(T t : objects)
        {
            validator.invalidate(t);
        }
    }

    @Override
    protected void returnToPool(T t)
    {
        if(validator.isValid(t))
        {
            executor.submit(new ObjectReturner(objects, t));
        }
    }

    @Override
    protected void handleInvalidReturn(T t)
    {
    }

    @Override
    protected boolean isValid(T t)
    {
        return validator.isValid(t);
    }

    private void initializeObjects()
    {
        for(int i = 0; i < size; i++)
        {
            objects.add(objectFactory.createNew());
        }
    }

    private class ObjectReturner
            implements <Callable>
    {
        private BlockingQueue  queue;
        private E e;
        public ObjectReturner(BlockingQueue  queue, E e)
        {
            this.queue = queue;
            this.e = e;
        }

        public Void call()
        {
            while(true)
            {
                try
                {
                    queue.put(e);
                    break;
                }
                catch(InterruptedException ie)
                {
                    Thread.currentThread().interrupt();
                }
            }
            return null;
        }

    }

}

无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue
将会抛出一个 NullPointerException。

上面是一个非常基本的对象池,它内部是基于一个LinkedBlockingQueue来实现的。这里唯一比较有意思的方法就是returnToPool。因为内部的存储是一个LinkedBlockingQueue实现的,如果我们直接把返回的对象扔进去的话,如果队列已满可能会阻塞住客户端。不过我们不希望客户端因为把对象放回池里这么个普通的方法就阻塞住了。所以我们把最终将对象插入到队列里的任务作为一个异步的的任务提交给一个Executor来执行,以便让客户端线程能立即返回。

可以访问到 BlockingQueue
中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如
remove(o)
方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注:基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。

现在我们将在自己的代码中使用上面这个对象池,用它来缓存数据库连接。我们需要一个校验器来验证数据库连接是否有效。

BlockingQueue 的实现

下面是这个JDBCConnectionValidator:

BlockingQueue 是个接口,你需要使用它的实现之一来使用
BlockingQueue。java.util.concurrent 具有以下 BlockingQueue
接口的实现(Java 6):

package com.test;

import java.sql.Connection;
import java.sql.SQLException;
import com.test.pool.Pool.Validator;
public final class JDBCConnectionValidator implements Validator < Connection >
{
    public boolean isValid(Connection con)
    {
        if(con == null)
        {
            return false;
        }

        try
        {
            return !con.isClosed();
        }
        catch(SQLException se)
        {
            return false;
        }

    }

    public void invalidate(Connection con)
    {
        try
        {
            con.close();
        }
        catch(SQLException se)
        {
        }
    }

}

ArrayBlockingQueue

还有一个JDBCObjectFactory,它将用来生成新的数据库连接对象:

DelayQueue

package com.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import com.test.pool.ObjectFactory;
public class JDBCConnectionFactory implements ObjectFactory < Connection >
{
   private String connectionURL;
   private String userName;
   private String password;
   public JDBCConnectionFactory(
     String driver,
     String connectionURL,
     String userName,
     String password) {
     super();
     try
     {
        Class.forName(driver);
     }
     catch(ClassNotFoundException ce)
     {
        throw new IllegalArgumentException('Unable to find driver in classpath', ce);
     }
     this.connectionURL = connectionURL;
     this.userName = userName;
     this.password = password;
   }

   public Connection createNew()
   {
      try
      {
         return DriverManager.getConnection(
            connectionURL,
            userName,
            password);
      }
      catch(SQLException se)
      {
         throw new IllegalArgumentException('Unable to create new connection', se);
      }
   }
}

LinkedBlockingQueue

现在我们用上述的Validator和ObjectFactory来创建一个JDBC的连接池:

PriorityBlockingQueue

package com.test;

import java.sql.Connection;
import com.test.pool.Pool;
import com.test.pool.PoolFactory;

public class Main
{
    public static void main(String[] args)
    {
        Pool < Connection > pool =
            new BoundedBlockingPool < Connection > (
            10,
            new JDBCConnectionValidator(),
            new JDBCConnectionFactory('', '', '', '')
        );
        //do whatever you like
    }
}

SynchronousQueue

为了犒劳下能读完整篇文章的读者,我这再提供另一个非阻塞的对象池的实现,这个实现和前面的唯一不同就是即使对象不可用,它也不会让客户端阻塞,而是直接返回null。具体的实现在这:

Java 中使用 BlockingQueue 的例子

package com.test.pool;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

public class BoundedPool < T > extends AbstractPool < T >
{
    private int size;
    private Queue < T > objects;
    private Validator < T > validator;
    private ObjectFactory < T > objectFactory;
    private Semaphore permits;
    private volatile boolean shutdownCalled;

    public BoundedPool(
        int size,
        Validator < T > validator,
        ObjectFactory < T > objectFactory)
        {
        super();
        this.objectFactory = objectFactory;
        this.size = size;
        this.validator = validator;
        objects = new LinkedList < T >();
        initializeObjects();
        shutdownCalled = false;
    }

    @Override
    public T get()
    {
        T t = null;

        if(!shutdownCalled)
        {
            if(permits.tryAcquire())
            {
                t = objects.poll();
            }

         }
         else
         {
             throw new IllegalStateException('Object pool already shutdown');
         }
         return t;
     }

     @Override
     public void shutdown()
     {
         shutdownCalled = true;
         clearResources();
     }

     private void clearResources()
     {
         for(T t : objects)
         {
             validator.invalidate(t);
         }
     }

     @Override
     protected void returnToPool(T t)
     {
         boolean added = objects.add(t);
         if(added)
         {
             permits.release();
         }
     }

     @Override
     protected void handleInvalidReturn(T t)
     {
     }

     @Override
     protected boolean isValid(T t)
     {
         return validator.isValid(t);
     }

     private void initializeObjects()
     {
         for(int i = 0; i < size; i++)
         {
             objects.add(objectFactory.createNew());
         }
     }
}

这里是一个 Java 中使用 BlockingQueue 的示例。本示例使用的是
BlockingQueue 接口的 ArrayBlockingQueue 实现。

考虑到我们现在已经有两种实现,非常威武了,得让用户通过工厂用具体的名称来创建不同的对象池了。工厂来了:

首先,BlockingQueueExample 类分别在两个独立的线程中启动了一个 Producer
和 一个 Consumer。

package com.test.pool;

import com.test.pool.Pool.Validator;

/**

* Factory and utility methods for

* {@link Pool} and {@link BlockingPool} classes

* defined in this package.
* This class supports the following kinds of methods:
*
*
<ul>
*
<li> Method that creates and returns a default non-blocking
*        implementation of the {@link Pool} interface.
*   </li>
*
*
<li> Method that creates and returns a
*        default implementation of
*        the {@link BlockingPool} interface.
*   </li>
*
</ul>
*
* @author Swaranga
*/
public final class PoolFactory
{
    private PoolFactory()
    {
    }

/**
* Creates a and returns a new object pool,
* that is an implementation of the {@link BlockingPool},
* whose size is limited by
* the <tt> size </tt> parameter.
*
* @param size the number of objects in the pool.
* @param factory the factory to create new objects.
* @param validator the validator to
* validate the re-usability of returned objects.
*
* @return a blocking object pool
* bounded by <tt> size </tt>
*/
public static < T > Pool < T >
newBoundedBlockingPool(
int size,
ObjectFactory < T > factory,
Validator < T > validator)
{
    return new BoundedBlockingPool < T > (
    size,
    validator,
    factory);
}
/*
* Creates a and returns a new object pool,
* that is an implementation of the {@link Pool}
* whose size is limited
* by the <tt> size </tt> parameter.
*
* @param size the number of objects in the pool.
* @param factory the factory to create new objects.
* @param validator the validator to validate
* the re-usability of returned objects.
*
* @return an object pool bounded by <tt> size </tt>
*/
public static < T > Pool < T > newBoundedNonBlockingPool(
    int size,
    ObjectFactory < T > factory,
    Validator < T > validator)
{
    return new BoundedPool < T >(size, validator, factory);
}
}

Producer 向一个共享的 BlockingQueue 中注入字符串,而 Consumer
则会从中把它们拿出来。

现在我们的客户端就能用一种可读性更强的方式来创建对象池了:

澳门新葡萄京官网首页 3

package com.test;

import java.sql.Connection;
import com.test.pool.Pool;
import com.test.pool.PoolFactory;

public class Main
{
    public static void main(String[] args)
    {
        Pool < Connection > pool =
        PoolFactory.newBoundedBlockingPool(
        10,
        new JDBCConnectionFactory('', '', '', ''),
        new JDBCConnectionValidator());
        //do whatever you like
     }
}

以下是 Producer 类。注意它在每次 put()
调用时是如何休眠一秒钟的。这将导致 Consumer
在等待队列中对象的时候发生阻塞。

好吧,终于写完了,拖了这么久了。尽情使用和完善它吧,或者再多加几种实现。

澳门新葡萄京官网首页 4

快乐编码,快乐分享!

以下是 Consumer 类。它只是把对象从队列中抽取出来,然后将它们打印到
System.out。

澳门新葡萄京官网首页 5

  1. 数组阻塞队列 ArrayBlockingQueue

ArrayBlockingQueue 类实现了 BlockingQueue 接口。

ArrayBlockingQueue
是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。

‘ArrayBlockingQueue 内部以
FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

以下是在使用 ArrayBlockingQueue 的时候对其初始化的一个示例:

澳门新葡萄京官网首页 6

  1. 延迟队列 DelayQueue

DelayQueue 实现了 BlockingQueue 接口。DelayQueue
对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现
java.util.concurrent.Delayed 接口,该接口定义:

澳门新葡萄京官网首页 7

DelayedElement 是我所创建的一个 DelayedElement 接口的实现类,它不在
Java.util.concurrent 包里。你需要自行创建你自己的 Delayed
接口的实现以使用 DelayQueue 类。

  1. 链阻塞队列 LinkedBlockingQueue

LinkedBlockingQueue 类实现了 BlockingQueue 接口。

LinkedBlockingQueue
内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用
Integer.MAX_VALUE 作为上限。

LinkedBlockingQueue 内部以
FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

以下是 LinkedBlockingQueue 的初始化和使用示例代码:

澳门新葡萄京官网首页 8

  1. 具有优先级的阻塞队列 PriorityBlockingQueue

PriorityBlockingQueue 类实现了 BlockingQueue 接口。

PriorityBlockingQueue 是一个无界的并发队列。它使用了和类
java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null
值。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable
接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。注意
PriorityBlockingQueue 对于具有相等优先级(compare() ==
0)的元素并不强制任何特定行为。

同时注意,如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,该
Iterator 并不能保证它对元素的遍历是以优先级为序的。

以下是使用 PriorityBlockingQueue 的示例:

澳门新葡萄京官网首页 9

  1. 同步队列 SynchronousQueue

SynchronousQueue 类实现了 BlockingQueue 接口。

SynchronousQueue
是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。

据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

  1. 阻塞双端队列 BlockingDeque

java.util.concurrent 包里的 BlockingDeque
接口表示一个线程安放入和提取实例的双端队列。本小节我将给你演示如何使用
BlockingDeque。BlockingDeque
类是一个双端队列,在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。deque(双端队列)
是 “Double Ended Queue”
的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。

BlockingDeque 的使用

在线程既是一个队列的生产者又是这个队列的消费者的时候可以使用到
BlockingDeque。如果生产者线程需要在队列的两端都可以插入数据,消费者线程需要在队列的两端都可以移除数据,这个时候也可以使用
BlockingDeque。

澳门新葡萄京官网首页 10

一个 BlockingDeque – 线程在双端队列的两端都可以插入和提取元素。

一个线程生产元素,并把它们插入到队列的任意一端。如果双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。如果双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。

BlockingDeque 的方法

BlockingDeque 具有 4
组不同的方法用于插入、移除以及对双端队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

澳门新葡萄京官网首页 11

四组不同的行为方式解释:

抛异常:如果试图的操作无法立即执行,抛一个异常。

特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true /
false)。

阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。

超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是
true / false)。

BlockingDeque 继承自 BlockingQueue

BlockingDeque 接口继承自 BlockingQueue 接口。

这就意味着你可以像使用一个 BlockingQueue 那样使用
BlockingDeque。如果你这么干的话,各种插入方法将会把新元素添加到双端队列的尾端,而移除方法将会把双端队列的首端的元素移除。正如
BlockingQueue 接口的插入和移除方法一样。

以下是 BlockingDeque 对 BlockingQueue 接口的方法的具体内部实现:

澳门新葡萄京官网首页 12

BlockingDeque 的实现

既然 BlockingDeque
是一个接口,那么你想要使用它的话就得使用它的众多的实现类的其中一个。java.util.concurrent
包提供了以下 BlockingDeque 接口的实现类:

LinkedBlockingDeque

BlockingDeque 代码示例

以下是如何使用 BlockingDeque 方法的一个简短代码示例:

澳门新葡萄京官网首页 13

  1. 链阻塞双端队列 LinkedBlockingDeque

LinkedBlockingDeque 类实现了 BlockingDeque 接口。

deque(双端队列) 是 “Double Ended Queue”
的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。(译者注:唐僧啊,受不了。)LinkedBlockingDeque
是一个双端队列,在它为空的时候,一个试图从中抽取数据的线程将会阻塞,无论该线程是试图从哪一端抽取数据。以下是
LinkedBlockingDeque 实例化以及使用的示例:

澳门新葡萄京官网首页 14

发表评论

电子邮件地址不会被公开。 必填项已用*标注