澳门新葡萄京官网首页计算机程序的思维逻辑 (79),思维79

最近公司项目很多地方使用多线程处理一些任务,逻辑代码和java多线程处理代码混合在一起,造成代码的可读性超级差,现在把Java多线程相关的处理抽出来,方面代码中重复使用。抽的不好,欢迎大家拍砖

在上一回合谈到,客户端应用程序的所有操作都在主线程上进行,所以一些比较耗时的操作可以在异步线程上去进行,充分利用CPU的性能来达到程序的最佳性能。对于Unity而言,又提供了另外一种『异步』的概念,就是协程(Coroutine),通过反编译,它本质上还是在主线程上的优化手段,并不属于真正的多线程(Thread澳门新葡萄京官网首页,)。那么问题来了,怎样在Unity中使用多线程呢?

计算机程序的思维逻辑 (79),思维79

上节,我们提到,在异步任务程序中,一种常见的场景是,主线程提交多个异步任务,然后希望有任务完成就处理结果,并且按任务完成顺序逐个处理,对于这种场景,Java并发包提供了一个方便的方法,使用CompletionService,这是一个接口,它的实现类是ExecutorCompletionService,本节我们就来探讨它们。

基本用法

接口和类定义

与77节介绍的ExecutorService一样,CompletionService也可以提交异步任务,它的不同是,它可以按任务完成顺序获取结果,其具体定义为:

public interface CompletionService<V> {
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    Future<V> take() throws InterruptedException;
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

其submit方法与ExecutorService是一样的,多了take和poll方法,它们都是获取下一个完成任务的结果,take()会阻塞等待,poll()会立即返回,如果没有已完成的任务,返回null,带时间参数的poll方法会最多等待限定的时间。

CompletionService的主要实现类是ExecutorCompletionService,它依赖于一个Executor完成实际的任务提交,而自己主要负责结果的排队和处理,它的构造方法有两个:

public ExecutorCompletionService(Executor executor)
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)

至少需要一个Executor参数,可以提供一个BlockingQueue参数,用作完成任务的队列,没有提供的话,ExecutorCompletionService内部会创建一个LinkedBlockingQueue。

基本示例

我们在77节的invokeAll的示例中,演示了并发下载并分析URL的标题,那个例子中,是要等到所有任务都完成才处理结果的,这里,我们修改一下,一有任务完成就输出其结果,代码如下:

public class CompletionServiceDemo {
    static class UrlTitleParser implements Callable<String> {
        private String url;

        public UrlTitleParser(String url) {
            this.url = url;
        }

        @Override
        public String call() throws Exception {
            Document doc = Jsoup.connect(url).get();
            Elements elements = doc.select("head title");
            if (elements.size() > 0) {
                return url + ": " + elements.get(0).text();
            }
            return null;
        }
    }

    public static void parse(List<String> urls) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            CompletionService<String> completionService = new ExecutorCompletionService<>(
                    executor);
            for (String url : urls) {
                completionService.submit(new UrlTitleParser(url));
            }
            for (int i = 0; i < urls.size(); i++) {
                Future<String> result = completionService.take();
                try {
                    System.out.println(result.get());
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> urls = Arrays.asList(new String[] {
                "http://www.cnblogs.com/swiftma/p/5396551.html",
                "http://www.cnblogs.com/swiftma/p/5399315.html",
                "http://www.cnblogs.com/swiftma/p/5405417.html",
                "http://www.cnblogs.com/swiftma/p/5409424.html" });
        parse(urls);
    }
}

在parse方法中,首先创建了一个ExecutorService,然后才是CompletionService,通过后者提交任务、按完成顺序逐个处理结果,这样,是不是很方便?

基本原理

ExecutorCompletionService是怎么让结果有序处理的呢?其实,也很简单,如前所述,它有一个额外的队列,每个任务完成之后,都会将代表结果的Future入队。

那问题是,任务完成后,怎么知道入队呢?我们具体来看下。

在77节我们介绍过FutureTask,任务完成后,不管是正常完成、异常结束、还是被取消,都会调用finishCompletion方法,而该方法会调用一个done方法,该方法代码为:

protected void done() { }

它的实现为空,但它是一个protected方法,子类可以重写该方法。

在ExecutorCompletionService中,提交的任务类型不是一般的FutureTask,而是一个子类QueueingFuture,如下所示:

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

该子类重写了done方法,在任务完成时将结果加入到完成队列中,其代码为:

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

ExecutorCompletionService的take/poll方法就是从该队列获取结果,如下所示:

public Future<V> take() throws InterruptedException {
    return completionQueue.take();
}

实现invokeAny

我们在77节提到,AbstractExecutorService的invokeAny的实现,就利用了ExecutorCompletionService,它的基本思路是,提交任务后,通过take方法获取结果,获取到第一个有效结果后,取消所有其他任务,不过,它的具体实现有一些优化,比较复杂。我们看一个模拟的示例,从多个搜索引擎查询一个关键词,但只要任意一个的结果就可以,模拟代码如下:

public class InvokeAnyDemo {
    static class SearchTask implements Callable<String> {
        private String engine;
        private String keyword;

        public SearchTask(String engine, String keyword) {
            this.engine = engine;
            this.keyword = keyword;
        }

        @Override
        public String call() throws Exception {
            // 模拟从给定引擎搜索结果
            Thread.sleep(engine.hashCode() % 1000);
            return "<result for> " + keyword;
        }
    }

    public static String search(List<String> engines, String keyword)
            throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CompletionService<String> cs = new ExecutorCompletionService<>(executor);
        List<Future<String>> futures = new ArrayList<Future<String>>(
                engines.size());
        String result = null;
        try {
            for (String engine : engines) {
                futures.add(cs.submit(new SearchTask(engine, keyword)));
            }
            for (int i = 0; i < engines.size(); i++) {
                try {
                    result = cs.take().get();
                    if (result != null) {
                        break;
                    }
                } catch (ExecutionException ignore) {
                    // 出现异常,结果无效,继续
                }
            }
        } finally {
            // 取消所有任务,对于已完成的任务,取消没有什么效果
            for (Future<String> f : futures)
                f.cancel(true);
            executor.shutdown();
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> engines = Arrays.asList(new String[] { "www.baidu.com",
                "www.sogou.com", "www.so.com", "www.google.com" });
        System.out.println(search(engines, "老马说编程"));
    }
}

SearchTask模拟从指定搜索引擎查询结果,search利用CompletionService/ExecutorService执行并发查询,在得到第一个有效结果后,取消其他任务。

小结

本节比较简单,主要就是介绍了CompletionService的用法和原理,它通过一个额外的结果队列,方便了对于多个异步任务结果的处理。

下一节,我们来探讨一种常见的需求 – 定时任务。

(与其他章节一样,本节所有代码位于
)


未完待续,查看最新文章,敬请关注微信公众号“老马说编程”(扫描下方二维码),从入门到高级,深入浅出,老马和你一起探索Java编程及计算机技术的本质。用心原创,保留所有版权。

澳门新葡萄京官网首页 1

(79),思维79
上节,我们提到,在异步任务程序中,一种常见的场景是,主线程提交多个异步任务,然后希望有任务…

使用方法很简单,有两种使用方法

Thread 初步认识

虽然这不是什么难点,但我觉得还是有必要提一下多线程编程几个值得注意的事项:

  • 线程启动

在Unity中创建一个异步线程是非常简单的,直接使用类System.Threading.Thread就可以创建一个线程,线程启动之后毕竟要帮我们去完成某件事情。在编程领域,这件事就可以描述了一个方法,所以需要在构造函数中传入一个方法的名称。

Worker workerObject = new Worker();
Thread workerThread = new Thread(workerObject.DoWork)
workerThread.Start();
  • 线程终止

线程启动很简单,那么线程终止呢,是不是调用Abort方法。不是,虽然Thread对象提供了Abort方法,但并不推荐使用它,因为它并不会马上停止,如果涉及非托管代码的调用,还需要等待非托管代码的处理结果。

一般停止线程的方法是为线程设定一个条件变量,在线程的执行方法里设定一个循环,并以这个变量为判断条件,如果为false则跳出循环,线程结束。

public class Worker
{
    public void DoWork()
    {
        while (!_shouldStop)
        {
            Console.WriteLine("worker thread: working...");
        }
        Console.WriteLine("worker thread: terminating gracefully.");
    }
    public void RequestStop()
    {
        _shouldStop = true;
    }
    private volatile bool _shouldStop;
}

所以,你可以在应用程序退出(OnApplicationQuit)时,将_shouldStop设置为true来到达线程的安全退出。

  • 共享数据处理

多线程最麻烦的一点就是共享数据的处理了,想象一下A,B两个线程同一时刻处理一个变量,它最终的值到底是什么。所以一般需要使用lock,但C#提供了另一个关键字volatile,告诉CPU不读缓存直接把最新的值返回。所以_shouldStopvolatile修饰。

1.直接传递一批任务给到多线程处理方法,返回处理结果

Dispatcher的引入

是不是觉得多线程好简单,好像也没想象的那么复杂,当你愉快的在多线程中访问UI控件时,Duang~~~,一个错误告诉你,不能在异步线程访问UI控件。这是肯定的,跨线程访问UI控件是不安全的,理应被禁止。那怎么办呢?

如果你有其他客户端的开发经验,比如iOS或者WPF经验,肯定知道Dispatcher。Dispatcher翻译过来就是调度员的意思,简单理解就是每个线程都有唯一的调度员,那么主线程就有主线程的调度员,实际上我们的代码最终也是交给调度员去执行,所以要去访问UI线程上的控件,我们可以间接的向调度员发出命令。

所以在WPF中,跨线程访问UI控件一般的写法如下:

Thread thread=new Thread(()=>{
    this.Dispatcher.Invoke(()=>{
        //UI
        this.textBox.text=...
        this.progressBar.value=...
    });
});

嗯~ o( ̄▽ ̄)o,不错,但尴尬的是Unity没有提供Dispatcher啊!

对,但我们可以自己实现,把握住几个关键点:

  • 自己的Dispatcher一定是一个MonoBehaviour,因为访问UI控件需要在主线程上
  • 什么时候去更新呢,考虑生产者-消费者模式,有任务来了,我就是更新到UI上
  • 在Unity中有这么个方法可以轮询是不是有任务要更新,那就是Update方法,每一帧会执行

所以自定义的UnityDispatcher提供一个BeginInvoke方法,并接送一个Action

public void BeginInvoke(Action action){
    while (true) {
        //以原子操作的形式,将 32 位有符号整数设置为指定的值并返回原始值。
        if (0 == Interlocked.Exchange (ref _lock, 1)) {
            //acquire lock
            _wait.Enqueue(action);
            _run = true;
            //exist
            Interlocked.Exchange (ref _lock,0);
            break;
        }

    }

}

这是一个生产者,向队列里添加需要处理的Action。有了生产者之后,还需要消费者,Unity中的Update就是一个消费者,每一帧都会执行,所以如果队列里有任务,它就执行

 void Update(){

    if (_run) {
        Queue<Action> execute = null;
        //主线程不推荐使用lock关键字,防止block 线程,以至于deadlock
        if (0 == Interlocked.Exchange (ref _lock, 1)) {

            execute = new Queue<Action>(_wait.Count);

            while(_wait.Count!=0){

                Action action = _wait.Dequeue ();
                execute.Enqueue (action);

            }
            //finished
            _run=false;
            //release
            Interlocked.Exchange (ref _lock,0);
        }
        //not block
        if (execute != null) {

            while (execute.Count != 0) {

                Action action = execute.Dequeue ();
                action ();
            }
        }

    }
}

值得注意的是,Queue不是线程安全的,所以需要锁,我使用了Interlocked.Exchange,好处是它以原子的操作来执行并且还不会阻塞线程,因为主线程本身任务繁重,所以我不推荐使用lock

代码如下:

Coroutine和MultiThreading混合使用

到目前为止,相信你对CoroutineThread有清楚的认识,但它们并不是互斥的,可以混合使用,比如Coroutine等待异步线程返回结果,假设异步线程里执行的是非常复杂的AI操作,这显然放在主线程会非常繁重。

由于篇幅有限,我不贴完整代码了,只分析其中最核心思路:
Thread中有一个WaitFor方法,它每一帧都会询问异步任务是否完成:

public bool Update(){
    if(_isDown){
        OnFinished ();
        return true;

    }
    return false;
}
public IEnumerator WaitFor(){
    while(!Update()){
        //暂停协同程序,下一帧再继续往下执行
        yield return null;
    }
}

那么在某一个UI线程中,等待异步线程的结果,注意利用StartCouroutine,此等待并非阻塞线程,相信你已经它内部的机制了。

void Start(){

    Debug.Log("Main Thread :"+Thread.CurrentThread.ManagedThreadId+" work!");
    StartCoroutine (Move());
}

IEnumerator Move()
{
    pinkRect.transform.DOLocalMoveX(250, 1.0f);
    yield return new WaitForSeconds(1);
    pinkRect.transform.DOLocalMoveY(-150, 2);
    yield return new WaitForSeconds(2);
    //AI操作,陷入深思,在异步线程执行,GreenRect不会卡顿
    job.Start();
    yield return StartCoroutine (job.WaitFor());
    pinkRect.transform.DOLocalMoveY(150, 2);

}
/**
 * Created with IntelliJ IDEA.
 * 测试多线程处理任务
 * className: TaskMulThreadServiceTest
 *
 * @version 1.0
 *          Date Time: a
 *@author: ddys
 */
public class TaskMulThreadServiceTest extends TestCase implements ITaskHandle<String,Boolean>{

    public void testExecute() throws Exception {
        String [] taskItems = new String[100];
        for (int i=0;i<100;i++){
            taskItems[i]="任务"+i;
        }
        IMulThreadService<String,Boolean> mulThreadService = new TaskMulThreadService(this);
        long start = System.currentTimeMillis();
        List<Boolean> result = mulThreadService.execute(taskItems);
        for (Boolean e : result){
            if(!e){
                System.out.println("任务处理失败");
            }
        }
        System.out.println("所有任务处理完成,耗时"+(System.currentTimeMillis()-start)+",任务成功数"+result.size());
    }

    /**
     * Created with IntelliJ IDEA.
     * 执行任务,返回所有执行的结果
     * className: TaskMulThreadService
     *
     * @author: ddys
     * @version 1.0
     * Date Time:
     */
    public Boolean execute(String s) {
        System.out.println(Thread.currentThread().getId()+"线程正在处理"+s);
        return true;
    }
}

小结

这两篇文章为大家介绍了怎样在Unity中使用协程和多线程,多线程其实不难,但同步数据是最麻烦的。Coroutine实际上就是IEnumeratoryield这两个语法糖让我们很难理解其中的奥秘,推荐使用反编译工具去查看,相信你会豁然开朗。
源代码托管在Github上,点击此了解

2.附带一个查询任务的方法,实现这个查询任务方法和业务处理方法,然后执行返回处理结果

代码如下:

ate Time: a
 *@author: XWK
 */
public class SelectTaskMulThreadServiceTest extends TestCase implements ISelectTask<String,Boolean>{

    public void testExecute() throws Exception {
        IMulThreadService<String,Boolean> mulThreadService = new SelectTaskMulThreadService(this);
        long start = System.currentTimeMillis();
        List<Boolean> result = mulThreadService.execute();
        for (Boolean e : result){
            if(!e){
                System.out.println("任务处理失败");
            }
        }
        System.out.println("所有任务处理完成,耗时"+(System.currentTimeMillis()-start)+",任务成功数"+result.size());
    }
    /**
     * Created with IntelliJ IDEA.
     * 执行任务,返回所有执行的结果
     * className: TaskMulThreadService
     *
     * @author: ddys
     * @version 1.0
     * Date Time:
     */
    public Boolean execute(String s) {
        System.out.println(Thread.currentThread().getId()+"线程正在处理"+s);
        return true;
    }

    /**
     * @param 'a 传递参数
     * @return a 回类型
     * @throws
     * @Title: a
     * @Description: 获取一批任务
     * @author ddys
     * @date 2015-11-15 21:09
     */
    public String[] getTaskItem() {
        String [] taskItems = new String[100];
        for (int i=0;i<100;i++){
            taskItems[i]="任务"+i;
        }
        return taskItems;
    }
}

发表评论

电子邮件地址不会被公开。 必填项已用*标注