澳门新葡萄京官网首页Java Executor 框架学习总结

大超多产出都以经过义务奉行的措施来贯彻的。常常常有二种方式实行职务:串行和相互。

现身设计的真相,正是要把程序的逻辑解释为五个任务,这一个任务独立而又合作的到位程序的作用。而里面最器重的地点正是怎么样将逻辑上的职务分配到实际的线程中去奉行。换来说之,任务是指标,而线程是载体,线程的贯彻要以任务为对象。

class SingleThreadWebServer {
  public static void main(String[] args) throws Exception {
    ServerSocket socket = new ServerSocket(80);
    while(true) {
      Socket conn = socket.accept();
      handleRequest(conn);
    }
  }
}
class ThreadPerTaskWebServer {
  public static void main(String[] args) throws Exception {
    ServerSocket socket = new ServerSocket(80);
    while(true) {
      final Socket conn = socket.accept();
      Runnable task = new Runnable() {
        public void run() {
          handleRequest(conn);
        }
      };
      new Thread(task).start();
    }
  }
}

1. 在线程中实行职分

并发程序设计的第一步正是要分开职责的界限,理想图景下正是享有的天职都独立的:各样任务都以不相信任于任何任务的气象,结果和境界。因为独立的任务是最有扶助并发设计的。

有一种最自然的职务划分方法就是以单独的顾客诉求为天职边界。每一个客户央浼是独立的,则管理职分诉求的职分也是单身的。

在分割完任务之后,下一标题正是怎样调节那一个任务,最简易的章程就是串行调用全体任务,也正是四个一个的实行。

举个例子下面包车型大巴这些套接字服务程序,每一回都只能响应多个须要,下三个伏乞须求等上贰个倡议实行达成之后再被管理。

public class SingleThreadWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            Socket connection = socket.accept();
            handleRequest(connection);
        }
    }

    private static void handleRequest(Socket connection) {
        // request-handling logic here
    }
}

这种设计当然是不可能满意须求的,并发的高吞吐和高响应速度的优势都没发挥出来。

自然上面包车型的士那二种艺术都以有题目标。单线程的主题素材就是并发量会是瓶颈,多线程版本就是无界定的创办线程会招致能源贫乏难点。

1.1 展现地创立线程

上述代码的优化版正是为各个央求都分配独立的线程来进行,相当于每四个倡议职责都以一个独门线程。

public class ThreadPerTaskWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            //为每个请求创建单独的线程任务,保证并发性
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            new Thread(task).start();
        }
    }

    private static void handleRequest(Socket connection) {
        // request-handling logic here
    }
}

那般设计的亮点在于:

  • 义务管理线程从主线程分离出来,使得主线程不用等待职分实现就可以去火速地去响应下四个伸手,以高达高响应速度;
  • 职分管理能够并行,帮助同期管理五个供给;
  • 任务管理是线程安全的,因为每一种职务都是独自的。

而是需求小心的是,职责必需是线程安全的,否者多线程并发时会分外。

Executor 框架

任务是一组逻辑职业单元,而线程是使义务异步试行的机制。

JDK 提供了 Executor 接口:

public interface Executor {
    void execute(Runnable command);
}

就算 Executor
接口比较容易,不过却是异步职务实施框架的根底,该框架能扶助二种区别类型的职责推行计谋。它提供了一种规范的措施把职务的付出进程与实践进度进展通晓耦。用
Runnable 来代表职责。Executor
的贯彻提供了对生命周期的匡助以致总括音讯应用程序管理等编制。

Executor
是根据临盆者消费者形式的,提交职责的操作也就是劳动者,实践任务的线程也正是花费。

依照 Executor 的 WebServer 例子如下:

public class TaskExecutorWebServer {
  private static final int NTHREADS = 100;
  private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
  public static void main(String[] args) throws Exception {
    ServerSocket serverSocket = new ServerSocket(80);
    while (true) {
      final Socket conn = serverSocket.accept();
      Runnable task = new Runnable() {
        @Override
        public void run() {
          handleRequest(conn);
        }
      };
      exec.execute(task);
    }
  }
}

澳门新葡萄京官网首页,此外能够自个儿落成 Executor 来调控是现身照旧并行的,如上边代码:

/**
 * 执行已提交的 Runnable 任务的对象。
 * 此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。
 * 通常使用 Executor 而不是显式地创建线程。
 *
 *
 * @author renchunxiao
 *
 */
public class ExecutorDemo {
  public static void main(String[] args) {
    Executor executor = new ThreadExecutor();
    executor.execute(new Runnable() {
      @Override
      public void run() {
        // do something
      }
    });
    Executor executor2 = new SerialExecutor();
    executor2.execute(new Runnable() {
      @Override
      public void run() {
        // do something
      }
    });
  }
}
/**
 * 创建一个线程来执行 command
 *
 * @author renchunxiao
 *
 */
class ThreadExecutor implements Executor {
  @Override
  public void execute(Runnable command) {
    new Thread(command).start();
  }
}
/**
 * 串行执行 command
 *
 * @author renchunxiao
 *
 */
class SerialExecutor implements Executor {
  @Override
  public void execute(Runnable command) {
    command.run();
  }
}

1.3 无界定成立线程的缺少

可是上述的方案或许有不足的:

  1. 线程的生命周期的开销相当的大:每创立二个线程都以要消耗大批量的计量能源;
  2. 财富的消耗:活跃的线程要消耗内部存储器能源,假如有太多的闲暇资源就能够使得许多内部存款和储蓄器财富浪费,招致内部存款和储蓄器财富不足,四线程并发时就汇合世财富强占的难题;
  3. 上情下达:可创制线程的个数是有约束的,过多的线程数会促成内存溢出;

行使创造线程来攻击的例证中,最招摇过市的正是无休止创设死循环的线程,最后产生整个Computer的财富都耗尽。

线程池

线程池就是线程的能源池,能够通过 Executors
中的静态工厂方法来成立线程池。

  • newFixedThreadPool。成立固定长度的线程池,每便提交职务创设二个线程,直到到达线程池的最大额,线程池的轻重不再变化。
  • newSingleThreadExecutor。单个线程池。
  • newCachedThreadPool。依据任务范围变动的线程池。
  • newScheduledThreadPool。创立固定长度的线程池,以延缓或定期的不二秘技来执行职分。

JVM 唯有在装有非守护线程全部甘休后才会脱离,所以,假设不能够准确的关门
Executor,那么 JVM 就不能收场。

为了消除实行服务的生命周期难题,有个扩展 Executor 接口的新接口
ExecutorService。

public interface ExecutorService extends Executor {
  void shutdown();
  List<Runnable> shutdownNow();
  boolean isShutdown();
  boolean isTerminated();
  boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException;
  <T> Future<T> submit(Callable<T> task);
  <T> Future<T> submit(Runnable task, T result);
  Future<?> submit(Runnable task);
  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;
  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                  long timeout, TimeUnit unit)
    throws InterruptedException;
  <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;
  <T> T invokeAny(Collection<? extends Callable<T>> tasks,
          long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService 生命周期有二种状态:运营、关闭、已偃旗息鼓。ExecutorService在开班创设时处于运营情状。shutdown
方法会平缓关闭:不在选拔新的职务,而且等待已经施行的职分推行到位(包罗那多少个还没最初的天职卡塔尔。shutdownNow
方法将强行关闭:它将尝试废除全部运转中的职务,况兼不再运维队列中平素不开端的天职。全部职务都奉行到位后跻身到已告一段落情形。

2.Executor框架

任务是一组逻辑专业单元,而线程则是职分异步试行的体制。为了让职务更加好地分配到线程中推行,java.util.concurrent提供了Executor框架。

Executor基于临蓐者-消费者形式:提交任务的操作约等于劳动者(生成待达成的行事单元),推行职分的线程则一定于购买者(推行完这么些职业单元)。

将以上的服务端代码改动为Executor框架如下:

public class TaskExecutionWebServer {
    //设定线程池大小;
    private static final int NTHREADS = 100;
    private static final Executor exec
            = Executors.newFixedThreadPool(NTHREADS);

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(connection);
                }
            };
            exec.execute(task);
        }
    }

    private static void handleRequest(Socket connection) {
        // request-handling logic here
    }
}

Callable 和 Future

Executor 框架使用 Runnable 作为中央的职务表示情势。Runnable
是一种有局限性的空洞,它的 run 方法不能够重临值和抛出二个受检查相当。

成都百货上千职务实际是存在延时的计算,譬如数据库查询,从网络获取能源。对于那么些职务,Callable
是越来越好的架空,它感到 call 将回来三个值,何况可能抛出极其。

Executor
实行的天职有多个生命周期阶段:成立、提交、开端和完毕。由于有个别职责须求不长日子有希望希望收回,在
Executor 框架个中,已交给未起初的职责能够撤除。

Future
表示二个职分的生命周期,并且提供了相应的法子来决断是还是不是曾经到位或注销,以至获得职务的结果和撤回职分等。

2.1 线程池

Executor的本色就是拘系和调解线程池。所谓线程池正是指管理一组同构职业线程的财富池。线程池和职务队列相辅而行:职责队列中保存着富有带推行的天职,而线程池中保有能够去履行职分的行事线程,专门的学问线程从职责队列中世界叁个职务推行,实践任务达成之后在回到线程池中等待下多个职务的赶来。

任务池的优势在于:

  1. 透过复用现成线程而不是成立新的线程,收缩创立线程时的付出;
  2. 复用现存线程,能够从来实践义务,制止因创设线程而让职分等待,进步响应速度。

Executor能够创立的线程池共有各个:

  1. newFixedThreadPool,即一定大小的线程池,假使有线程因爆发了要命而夭亡,会创建新的线程代替:
  2. newCachedThreadPool,即协助缓存的线程池,借使线程池的范畴超过了须要的规模,就能够回收空闲线程,纵然需求大增,则会增添线程池的框框;
  3. newScheduledThreadPool,固定大小的线程池,并且以延时要么定期的主意进行;
  4. newSingleThreadExecutor,单线程情势,串行试行职务;

2.2 Executor的生命周期

此间须求单独说下Executor的生命周期。由于JVM独有在非守护线程全体悬停才会退出,所以只要没正确退出Executor,就能引致JVM不可能平常甘休。不过Executor是利用异步的措施施行线程,并无法即时掌握所有线程的气象。为了越来越好的管理Executor的生命周期,Java1.5始发提供了Executor的恢弘接口ExecutorService

ExecutorService提供了三种方法关闭措施:

  • shutdown:
    平缓的停业过程,即不再接收新的职分,等到已交付的职务实行实现后关闭进度池;
  • shutdownNow: 马上关闭全部职分,无论是或不是再实施;

服务端ExecutorService版的达成如下:

public class LifecycleWebServer {
    private final ExecutorService exec = Executors.newCachedThreadPool();

    public void start() throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (!exec.isShutdown()) {
            try {
                final Socket conn = socket.accept();
                exec.execute(new Runnable() {
                    public void run() {
                        handleRequest(conn);
                    }
                });
            } catch (RejectedExecutionException e) {
                if (!exec.isShutdown())
                    log("task submission rejected", e);
            }
        }
    }

    public void stop() {
        exec.shutdown();
    }

    private void log(String msg, Exception e) {
        Logger.getAnonymousLogger().log(Level.WARNING, msg, e);
    }

    void handleRequest(Socket connection) {
        Request req = readRequest(connection);
        if (isShutdownRequest(req))
            stop();
        else
            dispatchRequest(req);
    }

    interface Request {
    }

    private Request readRequest(Socket s) {
        return null;
    }

    private void dispatchRequest(Request r) {
    }

    private boolean isShutdownRequest(Request r) {
        return false;
    }
}

2.3 延迟职分和周期性职责

Java中提供Timer来施行延时职分和周期任务,不过提姆er类有以下的重疾:

  1. Timer只会创立三个线程来实施职务,假诺有一个提姆erTask推行时间太长,就能够影响到其余TimerTask的定时精度;
  2. Timer不会捕捉TimerTask未定义的十分,所以当有那些抛出到Timer中时,Timer就能够崩溃,而且也不能够复苏,就能影响到已经被调节可是未有奉行的天职,变成“线程走漏”。

提出利用ScheduledThreadPoolExecutor来代表Timer类。

3. Callable & Future

如上文所说,Executor以Runnable的款型描述职责,不过Runnable有超大的局限性:

  • 不曾重临值,只是实践义务;
  • 无法管理被抛出的充裕;

为了弥补以上的难点,Java中设计了另一种接口Callable

public interface Callable<V> {
    V call() throws Exception;
}

Callable扶持任务有再次来到值,并帮忙非常的抛出。假设愿意收获子线程的实践结果,那Callable将比Runnable更为切合。

任凭Callable依然Runnable都是对于职分的思梅止渴描述,即注脚任务的约束:有家弦户诵的起点,並且都会在必然标准下终止。

Executor框架下所奉行的天职都有三种生命周期:

  • 创建;
  • 提交;
  • 开始;
  • 完成;

对此二个已交由但还向来不起来的天职,是能够随即被终止;然则即便一个任务现已假如已经开头实施,就不得不等到其对应中断时再打消;当然,对于一个业已实施到位的职务,对其收回职务是还未有别的效果的。

既然如此职务有生命周期,那要怎么可以力知道叁个义务当前的生命周期状态呢?Callable既然有重临值,如何去在主线程中获取子线程的重返值呢?为了解决那些主题素材,就需求Future类的扶植。

public interface Future<V> {
    //取消任务
    boolean cancel(boolean mayInterruptIfRunning);
    // 任务是否被取消
    boolean isCancelled();
    // 任务是否完成
    boolean isDone();
    // 获得任务的返回值
    V get() throws InterruptedException, ExecutionException;
    // 在超时期限内等待返回值
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future类表示职责生命周期状态,其命名显示了任务的生命周期只可以前行无法后退。

Future类提供情势查询任务境况外,还提供get办法赢得任务的重返值,可是get方法的行事决意于职分意况:

  • 万一职分现已做到,get方法则会立即回去;
  • 只要职分还在施行中,get方法规会拥塞甘休职分到位;
  • 假设职责在推行的进度中抛出十分,get方法会将该特别封装为ExecutionException中,并能够透过getCase方法获得实际相当原因;

万一将四个Callable对象提交给ExecutorService,submit方法就能够重临三个Future对象,通过这一个Future对象就足以在主线程中获取该职责的景色,并收获再次回到值。

除去,能够显式地把Runnable和Callable对象封装成FutureTask对象,FutureTask不光继承了Future接口,也世袭Runnable接口,所以可以直接调用run方法实践。

既然如此是出新管理,当然会遇上一次性交给一组职分的气象,当时能够选拔CompletionService,CompletionService能够驾驭为Executor和BlockingQueue的构成:当一组任务被提交后,CompletionService将如约职责做到的逐一将职务的Future对象放入队列中。

CompletionService的接口如下:

public interface CompletionService<V> {

    Future<V> submit(Callable<V> task);

    Future<V> submit(Runnable task, V result);
    //如果队列为空,就会阻塞以等待有任务被添加
    Future<V> take() throws InterruptedException;

    //如果队列为空,就会返回null;
    Future<V> poll();

    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

请小心take方法和poll方法的分别。

除去利用Completion瑟维斯来一个三个获得成功职责的Future对象外,还足以调用Executor塞里ve的invokeAll()方法。

invokeAll帮助有效期提交一组职责(职分的成团),并得到一个Future数组。invokeAll方法将根据任务集结迭代器的顺序将职分对应的Future对象放入数组中,那样就可以把传播的职分(Callable)和结果(Future)联系起来。当一切任务执行完成,大概逾期,再大概被暂停时,invokeAll将回到Future数组。

当invokeAll方法重回时,每一个职责如故不荒谬实现,要么被收回,即都是截至的情景了。

发表评论

电子邮件地址不会被公开。 必填项已用*标注