澳门新葡萄京官网注册 5

Java多线程编程模式实战指南:Active Object模式

Active Object形式简要介绍

Active
Object格局是一种异步编制程序方式。它通过对艺术的调用与措施的实行实行解耦来拉长并发性。若以任务的定义的话,Active
Object情势的骨干则是它同意职分的交给(约等于对异步方法的调用)和职责的奉行(也就是异步方法的真正实行)抽离。那有一点点形似于System.gc(卡塔尔(قطر‎那个艺术:客商端代码调用完gc(卡塔尔(قطر‎后,二个实行垃圾回笼的职分被提交,但这个时候JVM并不一定进行了排放物回笼,而大概是在gc(卡塔尔方法调用重回后的某段时间才起来施行职责——回笼废品料。我们知道,System.gc(卡塔尔国的调用方代码是运作在和睦的线程上(日常是main线程派生的子线程),而JVM的污源回笼这些动作则由极度的线程(垃圾回笼线程)来履行的。换言之,System.gc(卡塔尔这几个方式所表示的动作(其所定义的作用)的调用方和进行方是运营在不一致的线程中的,从而巩固了并发性。

再进一层介绍Active
Object情势,大家可先简单地将其主干明白为二个名称为ActiveObject的类,该类对外暴光了一些异步方法,如图1所示。

图 1. ActiveObject对象示例

澳门新葡萄京官网注册 1

doSomething方法的调用方和实行方运营在个其余线程上。在产出的景况下,doSomething方法会被多少个线程调用。那时所需的线程安控封装在doSomething方法背后,使得调用方代码无需关切那点,进而简化了调用方代码:从调用方代码来看,调用叁个Active
Object对象的秘籍与调用普通Java对象的章程并无太大差距。如清单1所示。

清单 1. Active Object方法调用示例

ActiveObject ao=...;
Future future = ao.doSomething("data");
//执行其它操作
String result = future.get();
System.out.println(result);

Active Object情势的评论和介绍与落到实处勘探

Active
Object形式通过将艺术的调用与奉行分离,达成了异步编制程序。有利于进步并发性,进而加强系统的吞吐率。

Active
Object形式还有个好处是它能够将任务(MethodRequest)的付出(调用异步方法)和天职的试行攻略(Execution
Policy)抽离。任务的实行战术被封装在Scheduler的实现类之内,由此它对外是不“可以预知”的,一旦供给转移也不会耳濡目染别的代码,减弱了系统的耦合性。职务的实行政策能够反映以下一些标题:

  • 行使什么样顺序去实践职分,如FIFO、LIFO、只怕依赖职责中包罗的音讯所定的优先级?
  • 多少个任务能够并发实践?
  • 些微个职分能够被排队等待实施?
  • 澳门新葡萄京官网注册,假若有职责由于系统过载被谢绝,这时候哪些职责该被入选作为捐躯品,应用程序该如何被文告到?
  • 职务实行前、实行后须求实践怎么着操作?

那意味着,任务的推行各种能够和职务的交付顺序分歧,能够选拔单线程也足以使用多线程去实行职分等等。

本来,好处的私行总是暗藏着代价,Active
Object格局落成异步编制程序也许有其代价。该方式的参加者有6个之多,其达成进程也饱含了广大个中的管理:MethodRequest对象的浮动、MethodRequest对象的运动(进出缓冲区)、MethodRequest对象的运维调解和线程上下文切换等。这个管理皆有其空间和岁月的代价。由此,Active
Object方式适合于分解二个相比耗费时间的职分(如涉及I/O操作的职责):将职务的倡导和进行进行分离,以减弱不供给的等候时间。

固然如此形式的参加者比较多,但正如本文案例的落到实处代码所彰显的,在那之中山大学部分的参加者大家得以应用JDK本人提供的类来兑现,以节约编码时间。如表1所示。

表 1. 行使JDK现成类完毕Active Object的片段参预者

参与者名称 可以借用的JDK类 备注
Scheduler Java Executor Framework中的java.util.concurrent.ExecutorService接口的相关实现类,如java.util.concurrent.ThreadPoolExecutor。 ExecutorService接口所定义的submit(Callable<T> task)方法相当于图2中的enqueue方法。
ActivationQueue java.util.concurrent.LinkedBlockingQueue 若Scheduler采用java.util.concurrent.ThreadPoolExecutor,则java.util.concurrent.LinkedBlockingQueue实例作为ThreadPoolExecutor构造器的参数。
MethodRequest java.util.concurrent.Callable接口的匿名实现类。 Callable接口比起Runnable接口的优势在于它定义的call方法有返回值,便于将该返回值传递给Future实例。
Future java.util.concurrent.Future ExecutorService接口所定义的submit(Callable<T> task)方法的返回值类型就是java.util.concurrent.Future。

接上一篇《八线程设计方式小结(二State of Qatar》,那篇博客再聊一下最复杂的Active
Object模式

Active Object情势的布局

当Active
Object情势对外揭破的异步方法被调用时,与该格局调用相关的上下文音讯,包罗被调用的异步方法名(或其表示的操作)、调用方代码所传递的参数等,会被封装成二个对象。该目的被誉为方法必要(Method
Request)。方法央求对象会被存入Active
Object格局全部限支撑的缓冲区(Activation
Queue)中,并由专门的学业线程担当依据其含有的上下文音信实行相应的操作。也正是说,方法央求对象是由运转调用方代码的线程通过调用Active
Object形式对外暴光的异步方法生成的,而艺术供给所代表的操作则由极度的线程来进行,进而完毕了措施的调用与奉行的拜别,产生了现身。

Active Object方式的主要参加者有以下三种。其类图如图2所示。

图 2. Active Object情势的类图

(点击图像放大卡塔尔(قطر‎

澳门新葡萄京官网注册 2

  • Proxy:担当对外暴光异步方法接口。当调用方代码调用该参预者实例的异步方法doSomething时,该方法会生成三个相应的MethodRequest实例并将其积累到Scheduler所维护的缓冲区中。doSomething方法的再次回到值是三个意味其施行结果的外包装对象:Future参加者的实例。异步方法doSomething运转在调用方代码所在的线程中。
  • MethodRequest:负担将调用方代码对Proxy实例的异步方法的调用封装为一个指标。该目标保留了异步方法的名号及调用方代码传递的参数等上下文音信。它使得将Proxy的异步方法的调用和实行抽离成为大概。其call方法会根据其所包含上下文音信调用Servant实例的应和措施。
  • ActivationQueue:负责有的时候存款和储蓄由Proxy的异步方法被调用时所创办的MethodRequest实例的缓冲区。
  • Scheduler:担负将Proxy的异步方法所开创的MethodRequest实例存入其保证的缓冲区中。并基于早晚的调解攻略,对其爱戴的缓冲区中的MethodRequest实例实行实行。其调整战术能够依照实际需求来定,如FIFO、LIFO和基于MethodRequest中包涵的音信所定的优先级等。
  • Servant:担负对Proxy所暴光的异步方法的现实性达成。
  • Future:肩负积攒和重临Active Object异步方法的实践结果。

Active Object形式的体系图如图3所示。

图 3. Active Object形式的连串图

(点击图像放大State of Qatar

澳门新葡萄京官网注册 3

第1步:调用方代码调用Proxy的异步方法doSomething。

第2~7步:doSomething方法创立Future实例作为该措施的重返值。并将调用方代码对该方法的调用封装为MethodRequest对象。然后以所创办的MethodRequest对象作为参数调用Scheduler的enqueue方法,以将MethodRequest对象存入缓冲区。Scheduler的enqueue方法会调用Scheduler所维护的ActivationQueue实例的enqueue方法,将MethodRequest对象存入缓冲区。

第8步:doSomething再次来到其所开创的Future实例。

第9步:Scheduler实例选用特地的劳作线程运营dispatch方法。

第10~12步:dispatch方法调用ActivationQueue实例的dequeue方法,获取一个MethodRequest对象。然后调用MethodRequest对象的call方法

第13~16步:MethodRequest对象的call方法调用与其涉及的Servant实例的附和措施doSomething。并将Servant.doSomething方法的重返值设置到Future实例上。

第17步:MethodRequest对象的call方法重返。

上述手续中,第1~8步是运营在Active
Object的调用者线程中的,那多少个步骤完毕了将调用方代码对Active
Object所提供的异步方法的调用封装成对象(Method
Request),并将其存入缓冲区。那多少个步骤实现了职务的交给。第9~17步是运转在Active
Object的干活线程中,这么些步骤达成从缓冲区中读取Method
Request,并对其开展实践,完毕了职分的实践。进而完成了Active
Object对外暴露的异步方法的调用与试行的告别。

假定调用方代码关注Active
Object的异步方法的再次回到值,则足以在其索要时,调用Future实例的get方法来博取异步方法的真正实践结果。

荒诞隔绝

荒唐隔开分离指三个职务的管理战败不影响别的任务的管理。每一个MethodRequest实例能够看作一个职分。那么,Scheduler的兑现类在执行MethodRequest时要求专一错误隔断。选取JDK中现存的类(如ThreadPoolExecutor)来促成Scheduler的一个功利正是那么些类大概早已落到实处了不当隔断。而一旦协调编排代码实现Scheduler,用单个Active
Object专门的学问线程逐条执行全体职责,则须要非常注意线程的run方法的可怜管理,确定保证不会因为个别任务实行时遇上一些运行时特别而致使整个线程终止。如项目清单6的亲自过问代码所示。

项目清单 6. 自身动手达成Scheduler的谬误隔开示例代码

public class CustomScheduler implements Runnable {
    private LinkedBlockingQueue<Runnable> activationQueue = 
        new LinkedBlockingQueue<Runnable>();

    @Override
    public void run() {
        dispatch();
    }

    public <T> Future<T> enqueue(Callable<T> methodRequest) {
        final FutureTask<T> task = new FutureTask<T>(methodRequest) {

            @Override
            public void run() {
                try {
                   super.run();
                //捕获所以可能抛出的对象,避免该任务运行失败而导致其所在的线程终止。  
                } catch (Throwable t) {
                   this.setException(t);
                }
            }

        };

        try {
            activationQueue.put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return task;
    }

    public void dispatch() {
        while (true) {
            Runnable methodRequest;
            try {
                methodRequest = activationQueue.take();

                //防止个别任务执行失败导致线程终止的代码在run方法中
                methodRequest.run();
            } catch (InterruptedException e) {
                // 处理该异常
            }

        }
    }
}

12)Active Object

Active
Object模式,也称为Actor模式。Active
Object即积极对象,它不光具有独立线程,而且能够从外表选用异步消息,并能协作供给重返管理结果。这里的Active
Object不是指一个对象,而是指将一堆对象组织起来,对外表现为三个安然无事,这几个欧洲经济共同体具备独立线程,并能接受外界的异步新闻,那些全体(Active
Object卡塔尔管理完异步新闻后还能再次回到结果给调用者。

Future
Pattern也能接过异步音讯并回各管理结果,不过该方式聚集在Future上,不是很爱慕线程主动施行方面,而Activie
Object将独自线程,选取异步信息并回随地理结果这么些地方作为叁个完整。Active
Object格局综合应用了原先介绍的Producer-Consumer方式,Thread-Per-Message格局,Future方式等八线程设计方式。

Active Object格局实战案例

某邮电通讯软件有三个彩信短号模块。其重要性作用是完成手提式有线电话机客商给别的手提式有线电电话机客商发送彩信时,选择方号码能够填充为对方的短号。比如,客户13612345678给其共事13787654321出殡和安葬彩信时,能够将选拔方号码填写为对方的短号,如776,而非其实际的编号。

该模块管理其收受到的发出彩信乞请的一个器重操作是查询数据库以赢得选取方短号对应的真实号码(长号)。该操作只怕因为数据库故障而小败,进而使一切伏乞不可能继续被拍卖。而数据库故障是可过来的故障,由此在短号转变为长号的进程中借使现身数据库非凡,能够先将全体下发彩信央浼音讯缓存到磁盘中,等到数据库复苏后,再从磁盘中读取央浼新闻,进行重试。为低价起见,大家能够由此Java的对象体系化API,将意味下发彩信的靶子种类化到磁盘文件中之所以完结央求缓存。上边咱们批评那么些央浼缓存操作还供给酌量的此外因素,甚至Active
Object格局如何支持大家满意那些思忖。

首先,央求新闻缓存到磁盘中关系文件I/O这种慢的操作,大家不愿意它在伏乞管理的主线程(即Web服务器的干活线程)中执行。因为那样会使该模块的响应延时增大,缩小系统的响应性。并使得Web服务器的办事线程因等待文件I/O而消沉了系统的吞吐量。此时,异步处理就派上用项了。Active
Object格局能够援助我们贯彻诉求缓存那一个任务的交付和推行分离:任务的付出是在Web服务器的专门的学业线程中达成,而任务的试行(满含类别化对象到磁盘文件中等操作)则是在Active
Object专门的学业线程中奉行。那样,乞求管理的主线程在侦测到短号转长号败北时就能够以触发对日前彩信下发央浼实行缓存,接着继续其诉求管理,如给客商端响应。而此时,当前号召音讯或者正在被Active
Object线程缓存到文件中。如图4所示。

图 4 .异步达成缓存

澳门新葡萄京官网注册 4

其次,每一种短号转长号战败的彩信下发乞请音讯会被缓存为三个磁盘文件。但我们不希望那些缓存文件被存在同二个子目录下。而是希望多少个缓存文件会被存放到七个子目录中。每一种子目录最多能够储存钦定个数(如二〇〇一个)的缓存文件。若当前子目录已存满,则新建五个子目录存放新的缓存文件,直到该子目录也存满,就那样类推。当那几个子目录的个数到达钦定数量(如九十九个)时,最老的子目录(连同其下的缓存文件,假诺局地话)会被删除。进而保障子目录的个数也是长久的。显著,在出现情形下,达成这种垄断须要部分产出国访问谈调控(如通过锁来决定),可是大家不指望这种垄断(monopoly卡塔尔国揭破给处理央求的此外轮代理公司码。而Active
Object形式中的Proxy参加者可以协理大家封装并发访谈调整。

上面,我们看该案例的连带代码通过应用Active
Object格局在达成缓存功效时知足上述五个对象。首先看乞请管理的入口类。该类正是本案例的Active
Object格局的客调用方代码。如项目清单2所示。

清单 2. 彩信下发央求管理的入口类

public class MMSDeliveryServlet extends HttpServlet {

    private static final long serialVersionUID = 5886933373599895099L;

    @Override
    public void doPost(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        //将请求中的数据解析为内部对象
        MMSDeliverRequest mmsDeliverReq = this.parseRequest(req.getInputStream());
        Recipient shortNumberRecipient = mmsDeliverReq.getRecipient();
        Recipient originalNumberRecipient = null;

        try {
            // 将接收方短号转换为长号
            originalNumberRecipient = convertShortNumber(shortNumberRecipient);
        } catch (SQLException e) {

            // 接收方短号转换为长号时发生数据库异常,触发请求消息的缓存
            AsyncRequestPersistence.getInstance().store(mmsDeliverReq);

            // 继续对当前请求的其它处理,如给客户端响应
            resp.setStatus(202);
        }

    }

    private MMSDeliverRequest parseRequest(InputStream reqInputStream) {
        MMSDeliverRequest mmsDeliverReq = new MMSDeliverRequest();
        //省略其它代码
        return mmsDeliverReq;
    }

    private Recipient convertShortNumber(Recipient shortNumberRecipient)
            throws SQLException {
        Recipient recipent = null;
        //省略其它代码
        return recipent;
    }

}

事项清单第22中学的doPost方法在侦测到短号调换进程中产生的数据库卓殊后,通过调用AsyncRequestPersistence类的store方法触发对彩信下发恳求消息的缓存。这里,AsyncRequestPersistence类也等于Active
Object格局中的Proxy出席者。纵然本案例涉及的是三个涌出景况,但从清单2中的代码可以预知,AsyncRequestPersistence类的调用方代码不要求管理多线程同步难点。那是因为八线程同步难题被封装在AsyncRequestPersistence类之后。

AsyncRequestPersistence类的代码如清单3所示。

清单 3. 彩信下发央求缓存入口类(Active Object方式的Proxy)

// ActiveObjectPattern.Proxy
public class AsyncRequestPersistence implements RequestPersistence {
    private static final long ONE_MINUTE_IN_SECONDS = 60;
    private final Logger logger;
    private final AtomicLong taskTimeConsumedPerInterval = new AtomicLong(0);
    private final AtomicInteger requestSubmittedPerIterval = new AtomicInteger(0);

    // ActiveObjectPattern.Servant
    private final DiskbasedRequestPersistence 
                        delegate = new DiskbasedRequestPersistence();
    // ActiveObjectPattern.Scheduler
    private final ThreadPoolExecutor scheduler;

    private static class InstanceHolder {
        final static RequestPersistence INSTANCE = new AsyncRequestPersistence();
    }

    private AsyncRequestPersistence() {
        logger = Logger.getLogger(AsyncRequestPersistence.class);
        scheduler = new ThreadPoolExecutor(1, 3, 
                60 * ONE_MINUTE_IN_SECONDS,
                TimeUnit.SECONDS,
                // ActiveObjectPattern.ActivationQueue
                new LinkedBlockingQueue(200), 
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t;
                        t = new Thread(r, "AsyncRequestPersistence");
                        return t;
                    }

                });

        scheduler.setRejectedExecutionHandler(
                new ThreadPoolExecutor.DiscardOldestPolicy());

        // 启动队列监控定时任务
        Timer monitorTimer = new Timer(true);
        monitorTimer.scheduleAtFixedRate(
            new TimerTask() {

            @Override
            public void run() {
                if (logger.isInfoEnabled()) {

                    logger.info("task count:" 
                            + requestSubmittedPerIterval
                            + ",Queue size:" 
                            + scheduler.getQueue().size()
                            + ",taskTimeConsumedPerInterval:"
                            + taskTimeConsumedPerInterval.get() 
                            + " ms");
                }

                taskTimeConsumedPerInterval.set(0);
                requestSubmittedPerIterval.set(0);

            }
        }, 0, ONE_MINUTE_IN_SECONDS * 1000);
    }

    public static RequestPersistence getInstance() {
        return InstanceHolder.INSTANCE;
    }

    @Override
    public void store(final MMSDeliverRequest request) {
        /*
         * 将对store方法的调用封装成MethodRequest对象, 并存入缓冲区。
         */
        // ActiveObjectPattern.MethodRequest
        Callable methodRequest = new Callable() {
            @Override
            public Boolean call() throws Exception {
                long start = System.currentTimeMillis();
                try {
                    delegate.store(request);
                } finally {
                    taskTimeConsumedPerInterval.addAndGet(
                            System.currentTimeMillis() - start);
                }

                return Boolean.TRUE;
            }

        };
        scheduler.submit(methodRequest);

        requestSubmittedPerIterval.incrementAndGet();
    }

}

AsyncRequestPersistence类所完成的接口RequestPersistence定义了Active
Object对外暴光的异步方法:store方法。由于本案例不关怀伏乞缓存的结果,故该措施未有重回值。其代码如清单4所示。

清单 4. RequestPersistence接口源码

public interface RequestPersistence {

     void store(MMSDeliverRequest request);
}

AsyncRequestPersistence类的实例变量scheduler也正是Active
Object形式中的Scheduler出席者实例。这里大家直接动用了JDK1.5引进的Executor
Framework中的ThreadPoolExecutor。在ThreadPoolExecutor类的实例化时,其构造器的第5个参数(BlockingQueue<Runnable>
workQueue)大家钦赐了叁个有界窒碍队列:new
LinkedBlockingQueue<Runnable>(200State of Qatar。该队列约等于Active
Object方式中的ActivationQueue出席者实例。

AsyncRequestPersistence类的实例变量delegate相当于Active
Object情势中的Servant加入者实例。

AsyncRequestPersistence类的store方法应用无名氏类生成八个java.util.concurrent.Callable实例methodRequest。该实例也正是Active
Object情势中的MethodRequest参预者实例。利用闭包(Closure),该实例封装了对store方法调用的上下文新闻(包涵调用参数、所调用的方法对应的操作消息)。AsyncRequestPersistence类的store方法通过调用scheduler的submit方法,将methodRequest送入ThreadPoolExecutor所保险的缓冲区(梗塞队列)中。确切地说,ThreadPoolExecutor是Scheduler参加者的一个“类似”达成。ThreadPoolExecutor的submit方法相对于Scheduler的enqueue方法,该方式用于吸收接纳MethodRequest对象,以将其存入缓冲区。当ThreadPoolExecutor当前使用的线程数量低于其主导线程数量时,submit方法所收受的职分会间接被新建的线程施行。当ThreadPoolExecutor当前使用的线程数量超过其大旨线程数时,submit方法所收到的天职才会被存入其保障的拥塞队列中。可是,ThreadPoolExecutor的这种任务管理体制,并不要紧碍大家将它用作Scheduler的得以完结。

methodRequest的call方法会调用delegate的store方法来真正贯彻诉求缓存效能。delegate实例对应的类DiskbasedRequestPersistence是呼吁音信缓存作用的真的实现者。其代码如项目清单5所示。

清单 5. DiskbasedRequestPersistence类的源码

public class DiskbasedRequestPersistence implements RequestPersistence {
    // 负责缓存文件的存储管理
    private final SectionBasedDiskStorage storage = new SectionBasedDiskStorage();
    private final Logger logger = Logger
                                     .getLogger(DiskbasedRequestPersistence.class);

    @Override
    public void store(MMSDeliverRequest request) {
        // 申请缓存文件的文件名
        String[] fileNameParts = storage.apply4Filename(request);
        File file = new File(fileNameParts[0]);
        try {
            ObjectOutputStream objOut = new ObjectOutputStream(
            new FileOutputStream(file));
            try {
                objOut.writeObject(request);
            } finally {
                objOut.close();
            }
        } catch (FileNotFoundException e) {
            storage.decrementSectionFileCount(fileNameParts[1]);
            logger.error("Failed to store request", e);
        } catch (IOException e) {
            storage.decrementSectionFileCount(fileNameParts[1]);
            logger.error("Failed to store request", e);
        }

    }

    class SectionBasedDiskStorage {
        private Deque sectionNames = new LinkedList();
        /*
         * Key->value: 存储子目录名->子目录下缓存文件计数器
         */
        private Map sectionFileCountMap 
                        = new HashMap();
        private int maxFilesPerSection = 2000;
        private int maxSectionCount = 100;
        private String storageBaseDir = System.getProperty("user.dir") + "/vpn";

        private final Object sectionLock = new Object();

        public String[] apply4Filename(MMSDeliverRequest request) {
            String sectionName;
            int iFileCount;
            boolean need2RemoveSection = false;
            String[] fileName = new String[2];
            synchronized (sectionLock) {
                //获取当前的存储子目录名
                sectionName = this.getSectionName();
                AtomicInteger fileCount;
                fileCount = sectionFileCountMap.get(sectionName);
                iFileCount = fileCount.get();
                //当前存储子目录已满
                if (iFileCount >= maxFilesPerSection) {
                    if (sectionNames.size() >= maxSectionCount) {
                        need2RemoveSection = true;
                    }
                    //创建新的存储子目录
                    sectionName = this.makeNewSectionDir();
                    fileCount = sectionFileCountMap.get(sectionName);

                }
                iFileCount = fileCount.addAndGet(1);

            }

            fileName[0] = storageBaseDir + "/" + sectionName + "/"
                + new DecimalFormat("0000").format(iFileCount) + "-"
                + request.getTimeStamp().getTime() / 1000 + "-" 
                               + request.getExpiry()
                + ".rq";
            fileName[1] = sectionName;

            if (need2RemoveSection) {
                //删除最老的存储子目录
                String oldestSectionName = sectionNames.removeFirst();
                this.removeSection(oldestSectionName);
            }

            return fileName;
        }

        public void decrementSectionFileCount(String sectionName) {
            AtomicInteger fileCount = sectionFileCountMap.get(sectionName);
            if (null != fileCount) {
                fileCount.decrementAndGet();
            }
        }

        private boolean removeSection(String sectionName) {
            boolean result = true;
            File dir = new File(storageBaseDir + "/" + sectionName);
            for (File file : dir.listFiles()) {
                result = result && file.delete();
            }
            result = result && dir.delete();
            return result;
        }

        private String getSectionName() {
            String sectionName;

            if (sectionNames.isEmpty()) {
                sectionName = this.makeNewSectionDir();

            } else {
                sectionName = sectionNames.getLast();
            }

            return sectionName;
        }

        private String makeNewSectionDir() {
            String sectionName;
            SimpleDateFormat sdf = new SimpleDateFormat("MMddHHmmss");
            sectionName = sdf.format(new Date());
            File dir = new File(storageBaseDir + "/" + sectionName);
            if (dir.mkdir()) {
                sectionNames.addLast(sectionName);
                sectionFileCountMap.put(sectionName, new AtomicInteger(0));
            } else {
                throw new RuntimeException(
                                 "Cannot create section dir " + sectionName);
            }

            return sectionName;
        }
    }
}

methodRequest的call方法的调用者代码是运作在ThreadPoolExecutor所保险的劳引力线程中,那就确定保证了store方法的调用方和实在的实施方是分别运营在差异的线程中:服务器专门的学问线程担当触发乞求新闻缓存,ThreadPoolExecutor所保证的劳作线程担当将倡议音讯系列化到磁盘文件中。

DiskbasedRequestPersistence类的store方法中调用的SectionBasedDiskStorage类的apply4Filename方法包括了部分八线程同步调整代码(见项目清单5)。那某些决定是因为是封装在DiskbasedRequestPersistence的里边类中,对于此类之外的代码是不可知的。由此,AsyncRequestPersistence的调用方代码不恐怕领悟该细节,那呈现了Active
Object格局对现身访谈调控的包装。

缓冲区监察和控制

借使ActivationQueue是有界缓冲区,则对缓冲区的一时一刻大小进行监督无论是对于运营依然测验来讲都有其含义。从测验的角度来看,监察和控制缓冲区有扶持鲜明缓冲区容积的提议值(合理值)。清单3所示的代码,便是通过依期职责周期性地调用ThreadPoolExecutor的getQueue方法对缓冲区的大小实行监察。当然,在监督检查缓冲区的时候,往往只要求大致的值,由此在督察代码中要防止不需要的锁。

言传身教程序:

代码可上github下载: https://github.com/cloudchou/Multithread_ActiveObject

类图如下图所示(请点击看大图卡塔尔(قطر‎:

澳门新葡萄京官网注册 5

那是二个特别复杂的格局,ActiveObject接口里的每一种方法对应MethodRequest的四个子类,每个方法的参数对应着MethodRequest的叁个字段,因为ActiveObject的一些方法有重临值,故此设计了Result抽象类,表示重回值,为了让调用和施行抽离,这里运用了Future方式,故此设计了八个类,Result,FutureResult,RealResult。

也是为了分离调用和实践,还运用了劳动者消费者形式,将调用转化为倡议对象放置ActivationQueue里,由SchedulerThread实例从ActivationQueue里不断抽出央求对象,并实施。

小结

本篇介绍了Active
Object形式的用意及构造,并以八个实在的案例展现了该方式的代码完毕。下篇将对Active
Object形式展开探讨,并结合本文案例介绍实际运用Active
Object方式时索要潜心的一些事项。

缓冲区饱和处理政策

当职分的交给速率大于职务的实施数率时,缓冲区可能慢慢积压到满。这个时候新交付的职责会被反驳回绝。无论是本身编辑代码依旧利用JDK现成类来促成Scheduler,对于缓冲区满时新职务交给失败,大家须要叁个管理政策用于决定当时哪些职责会形成“牺牲品”。若使用ThreadPoolExecutor来兑现Scheduler有个便宜是它曾经提供了多少个缓冲区饱和管理政策的实今世码,应用代码能够平素调用。如项目清单3的代码所示,本文案例中大家挑选了扬弃最老的职务作为管理政策。java.util.concurrent.RejectedExecutionHandler接口是ThreadPoolExecutor对缓冲区饱和处理政策的肤浅,JDK中提供的切实贯彻如表2所示。

表 2. JDK提供的缓冲区饱和管理政策完成类

实现类 所实现的处理策略
ThreadPoolExecutor.AbortPolicy 直接抛出异常。
ThreadPoolExecutor.DiscardPolicy 放弃当前被拒绝的任务(而不抛出任何异常)。
ThreadPoolExecutor.DiscardOldestPolicy 将缓冲区中最老的任务放弃,然后重新尝试接纳被拒绝的任务。
ThreadPoolExecutor.CallerRunsPolicy 在任务的提交方线程中运行被拒绝的任务。

人之常情,对于ThreadPoolExecutor来讲,其职业行列满不必然就象征新交付的天职会被驳倒。当其最大线程池大小大于其宗旨线程池大时辰,专门的学业行列满的景况下,新交付的职分会用全部骨干线程之外的新添线程来实践,直到工作线程数达到最大线程数时,新交付的职分会被驳回。

适用项景:

那个设计格局极其复杂,是或不是适当要考虑难题的局面,只有周围的难点才合乎利用该形式。

Scheduler空闲专门的学业线程清理

假若Scheduler采纳七个专业线程(如运用ThreadPoolExecutor那样的线程池)来实施职责。则大概须求清理空闲的线程以节约财富。清单3的代码就是直接运用了ThreadPoolExecutor的共处成效,在开首化其实例时经过点名其构造器的第3、4个参数(
long keepAliveTime, TimeUnit
unit),告诉ThreadPoolExecutor对于基本职业线程以外的线程若其已经没事了点名时间,则将其清理掉。

注意事项:

因为这一个设计格局特别复杂,故此大家在使用时,一定留心各类对象的方法由哪些线程调用。例如Proxy对象的办法恐怕被五个线程相同的时间调用,而Servant对象被查封在Scheduler线程里,唯有SchedulerThread线程才会调用它的章程,故此它是线程安全的,而RealResult大概会被八个线程使用,但它是Immutable的,FutureResult大概被三个线程同不经常间调用,它包裹了两个字段,故此需求采纳synchronized爱护,况且是使用Guarded
Suspension方式爱护FutureResult。

转载 

可复用的Active Object格局完毕

就算接纳JDK中的现存类能够大幅地简化Active
Object情势的落到实处。但如若须要一再地在差别场景下行使Active
Object格局,则须求一套更有益复用的代码,以节约编码的年华和使代码特别轻易理解。项目清单7呈现一段基于Java动态代理的可复用的Active
Object格局的Proxy参预者的落实代码。

清单 7. 可复用的Active Object情势Proxy参加者实现

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public abstract class ActiveObjectProxy {

    private static class DispatchInvocationHandler implements InvocationHandler {
        private final Object delegate;
        private final ExecutorService scheduler;

        public DispatchInvocationHandler(Object delegate,
            ExecutorService executorService) {
            this.delegate = delegate;
            this.scheduler = executorService;
        }

        private String makeDelegateMethodName(final Method method,
            final Object[] arg) {
            String name = method.getName();
            name = "do" + Character.toUpperCase(name.charAt(0)) 
                    + name.substring(1);

            return name;
        }

        @Override
        public Object invoke(final Object proxy, final Method method,
            final Object[] args) throws Throwable {

            Object returnValue = null;
            final Object delegate = this.delegate;
            final Method delegateMethod;

            //如果拦截到的被调用方法是异步方法,则将其转发到相应的doXXX方法
            if (Future.class.isAssignableFrom(method.getReturnType())) {
                delegateMethod = delegate.getClass().getMethod(
                    makeDelegateMethodName(method, args),
                    method.getParameterTypes());

                final ExecutorService scheduler = this.scheduler;

                Callable<Object> methodRequest = new Callable<Object>() {
                    @Override
                    public Object call() throws Exception {
                        Object rv = null;

                        try {
                          rv = delegateMethod.invoke(delegate, args);
                        } catch (IllegalArgumentException e) {
                            throw new Exception(e);
                        } catch (IllegalAccessException e) {
                            throw new Exception(e);
                        } catch (InvocationTargetException e) {
                            throw new Exception(e);
                        }
                        return rv;
                    }
                };
                Future<Object> future = scheduler.submit(methodRequest);
                returnValue = future;

            } else {

                //若拦截到的方法调用不是异步方法,则直接转发
            delegateMethod = delegate.getClass()
            .getMethod(method.getName(),method.getParameterTypes());

                returnValue = delegateMethod.invoke(delegate, args);
            }

            return returnValue;
        }
    }

    /**
     * 生成一个实现指定接口的Active Object proxy实例。
     * 对interf所定义的异步方法的调用会被装发到servant的相应doXXX方法。
     * @param interf 要实现的Active Object接口
     * @param servant Active Object的Servant参与者实例
     * @param scheduler Active Object的Scheduler参与者实例
     * @return Active Object的Proxy参与者实例
     */
    public static <T> T newInstance(Class<T> interf, Object servant,
        ExecutorService scheduler) {

        @SuppressWarnings("unchecked")
        T f = (T) Proxy.newProxyInstance(interf.getClassLoader(),
        new Class[] { interf }, 
        new DispatchInvocationHandler(servant, scheduler));

        return f;
    }
}

清单7的代码达成了可复用的Active
Object格局的Proxy参预者ActiveObjectProxy。ActiveObjectProxy通过接收Java动态代理,动态变化钦定接口的代理对象。对该代理对象的异步方法(即再次回到值类型为java.util.concurrent.Future的主意)的调用会被ActiveObjectProxy达成InvocationHandler(DispatchInvocationHandler)所拦截,并转载给ActiveObjectProxy的newInstance方法中钦定的Servant管理。

项目清单8所示的代码体现了通过运用ActiveObjectProxy火速Active Object形式。

清单 8. 依据可复用的API快捷达成Active Object情势

public static void main(String[] args) throws 
    InterruptedException, ExecutionException {

    SampleActiveObject sao = ActiveObjectProxy.newInstance(
            SampleActiveObject.class, new SampleActiveObjectImpl(),
            Executors.newCachedThreadPool());
    Future<String> ft = sao.process("Something", 1);
    Thread.sleep(500);
    System.out.println(ft.get());

从项目清单8的代码可知,利用可复用的Active
Object格局Proxy实现,应用开拓人士只要内定Active
Object形式对外保留的接口(对应ActiveObjectProxy.newInstance方法的第四个参数),并提供三个该接口的贯彻类(对应ActiveObjectProxy.newInstance方法的第二个参数),再钦赐多个java.util.concurrent.Executor瑟维斯实例(对应ActiveObjectProxy.newInstance方法的第二个参数)即能够兑现Active
Object形式。

总结

正文介绍了Active
Object形式的计划及构造。并提供了二个其实的案例用于呈现使用Java代码完毕Active
Object情势,在那根底上对该格局进行了评价并分享了在实际上利用该方式时索要介意的事项。

参照他事他说加以考查资源

  • 正文的源代码在线阅读:
  • 维基百科Active
    Object方式词条:
  • DougRuss C. Schmidt对Active
    Object情势的概念:
  • Schmidt, Douglas et al. Pattern-Oriented Software Architecture
    Volume 2: Patterns for Concurrent and Networked Objects. Volume 2.
    Wiley, 2000
  • Java theory and practice: Decorating with dynamic
    proxies:

发表评论

电子邮件地址不会被公开。 必填项已用*标注