`
周凡杨
  • 浏览: 229688 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Callable<V>、Future<V>详解 | Executor框架

阅读更多
 

一:关于 Callable<V>的源码

package java.util.concurrent;

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable<V>: 返回结果并且可能抛出异常的任务。实现者定义了一个不带任何参数的叫做 call的方法。 Callable<V>接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable不会返回结果,并且无法抛出经过检查的异常。 Executors类包含一些从其他普通形式转换成 Callable<V>类的实用方法。

 

 

在并发编程时,一般使用Runnable接口,然后扔给线程池完事,这种情况下不需要线程的结果。 所以run()的返回值是void类型。如下代码所示:

 
package future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FutureTest {
	public static class Task implements Runnable{
		@Override
		public void run() {
			System.out.println("做任务----"+Thread.currentThread().getName());
		}
	}
	public static void main(String[] args) {
		//创建线程池
		ExecutorService es = Executors.newCachedThreadPool();
		for(int i=0;i<100;i++){
			es.submit(new Task());
		}
	}
}
 

 

如果是一个多线程协作程序,比如菲波拉切数列,112358...使用多线程来计算。
但后者需要前者的结果,就需要用Callable<V>接口了。如下代码所示:

 

package future;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FutureTest2 {
	public static class Task implements Callable<String>{
		@Override
		public String call() throws Exception {
			System.out.println("做任务----"+Thread.currentThread().getName());
			return "返回运算结果";
		}
	}
	public static void main(String[] args) {
		//创建线程池
		ExecutorService es = Executors.newCachedThreadPool();
		for(int i=0;i<100;i++){
			es.submit(new Task());
		}
	}
}

 

 

我们看到,当Task类实现Callable接口后,重写了call()方法,call()方法是有返回值的。但我们如何来接受call()方法的返回值呢?

 

package future;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureTest3 {
	public static class Task implements Callable<String>{
		@Override
		public String call() throws Exception {
			System.out.println("做任务----"+Thread.currentThread().getName());
			return "返回运算结果";
		}
	}
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		
		List<Future<String>> result = new ArrayList<Future<String>>();
		
		//创建线程池
		ExecutorService es = Executors.newCachedThreadPool();
		for(int i=0;i<100;i++){
			result.add(es.submit(new Task()));
		}
		for(Future<String> f : result){
			System.out.println(f.get());
		}
	}
}

 

从这里可以看出,这时候,Future<V>就出场了,使用Future<V>接口的get()方法可以获得call()方法返回的结果,当调用Futureget()方法时,当前线程就开始阻塞,直到call()方法结束返回结果。

 

Future模式可以这样来描述:我有一个任务,提交给了FutureFuture我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。就相当于下了一张订货单,一段时间后可以拿着提订单来提货,这期间可以干别的任何事情。其中Future接口就是订货单,真正处理订单的是Executor类,它根据Future接口的要求来生产产品。

 

 

二:关于 Future<V>的源码

 

 

package java.util.concurrent;

/**
* Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使
* 用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确     * 定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结     * 果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。
*
  * @see FutureTask
  * @see Executor
  * @since 1.5
  * @author Doug Lea
  * @param<V> The result type returned by this Future's <tt>get</tt> method
  */
publicinterface Future<V> {

    /**
     * 试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel * 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数     
     * 确定是否应该以试图停止任务的方式来中断执行此任务的线程。
     */
    boolean cancel(boolean mayInterruptIfRunning);
    /**
     * 如果在任务正常完成前将其取消,则返回 true。   
     */
    boolean isCancelled();

    /**
     * 如果任务已完成,则返回 true。可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。    
     */
    boolean isDone();

    /**
     * 如有必要,等待计算完成,然后获取其结果。 
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
     */
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

 

1)   Future接口是一个泛型接口,严格的格式应该是Future<V>,其中V代表了Future执行的任务返回值的类型

2)   Future接口提供方法来检测任务是否被执行完,等待任务执行完获得结果,也可以设置任务执行的超时时间。这个设置超时的方法就是实现Java程序执行超时的关键。

 

 

 

 

三:关于 Future实现

 

 
   



 

 

Future的实现类有FutureTaskSwingWork,其中SwingWork用于GUI编程模块,在并发包里经常用到的是FutureTask,  这里主要介绍一下FutureTask

 

 

类声明部分

 

public class FutureTask<V> implements RunnableFuture<V> {}

 

 

FutureTask: 实现了RunnableFuture<V>接口,而RunnableFuture<V>又是extends(继承) RunnableFuture<V>两个接口,所以它既可以作为Runnable被线程执行,又可以作为Future<V>得到 Callable<V>的返回值,那么这个组合的使用有什么好处呢?假设有一个很耗时的返回值需要计算,并且这个返回值不是立刻需要的话,那么就可以使用这个组 合,用另一个线程去计算返回值,而当前线程在使用这个返回值之前可以做其它的操作,等到需要这个返回值时,再通过Future<V>得到,岂不美哉!

 

变量部分


private volatile int state;
private static final int NEW          = 0;               //新建  
private static final int COMPLETING   = 1;              //执行中 
private static final int NORMAL       = 2;               //正常
private static final int EXCEPTIONAL  = 3;              //异常       
private static final int CANCELLED    = 4;              //取消
private static final int INTERRUPTING = 5;             //中断中
private static final int INTERRUPTED  = 6;             //被中断
 

state:任务的状态,最初是 NEW,完成期间,状态也许暂时呈现为COMPLETING(当结果已经被设值)或者INTERRUPTING(only while interrupting the runner to satisfy a cancel(true)),

 

可能出现的状态转换情况:

* NEW -> COMPLETING -> NORMAL                    正常完成的流程

* NEW -> COMPLETING -> EXCEPTIONAL              出现异常的流程

* NEW -> CANCELLED                                 被取消的流程

* NEW -> INTERRUPTING -> INTERRUPTED            被中断的流程

构造函数:

   /**
     *  创建一个 FutureTask,一旦运行就执行给定的 Callable。    
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            thrownew NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     *  创建一个 FutureTask,一旦运行就执行给定的 Runnable,并安排成功完成时 get 返回给定的结果
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
 

从构造函数中可以看出,当构造一个FutureTask时,state会被置为NEWNEW也就是所有状态变化路径的起始状态

 

FutureTask生命周期的变化,主要取决于 run()方法先被调用还是cancel ()方法会被调用,这两个方法的执行顺序决定了FutureTask的生命周期的四种走向。

 

先来看run()方法

 

publicvoid run() {
     //首先判断任务的状态,如果任务的状态值不为NEW,或 runner变量的值不为null,则返回(说明正在走或已经走了4种状态变化的一种)
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
//如果状态值是NEW,则开始执行任务
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) { //如果任务不为空,且状态为NEW,开始执行
                V result;                         //任务返回的结果
                boolean ran;
                try {
                    result = c.call();          //执行任务并返回结果
                    ran = true;                  //标记任务执行成功
                } catch (Throwable ex) {  //任务执行中发生异常
                    result = null;
                    ran = false;
                    setException(ex);           //设置异常
                }
                if (ran)                          //任务执行成功,设置结果
                    set(result);
            }
        } finally {
            runner = null;                       //runner置为null
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
 

 

1)任务执行成功,会调用set()方法设置结果

protected void set(V v) {
//如过state是NEW,把state设置成COMPLETING 
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
          outcome = v;
        //将任务设置成NORMAL   over the task  
          UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
          finishCompletion();
     }
}
 

 

set()方法可以看出,把任务运行的结果赋值给了outcome变量,这个执行流程导致的状态变化就是  NEW->COMPLETING->NORMAL   

 

2)任务执行中发生异常,会调用setException()方法

protecte dvoid setException(Throwable t) {
//如过state是NEW,把state设置成COMPLETING    
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
          outcome = t;
        //将任务设置成EXCEPTIONAL  
          UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
          finishCompletion();
     }
}
 

 

这个执行流程导致的状态变化就是  NEW->COMPLETING->EXCEPTIONAL

 

再来看cancel()方法:

 

//参数:mayInterruptIfRunning   是否中断running
publicboolean cancel(boolean mayInterruptIfRunning) { 
     if (state != NEW)      //状态不为NEW,返回
          returnfalse;
     if (mayInterruptIfRunning) { //如果应该中断执行此任务的线程
          //如过state是NEW,把state设置成INTERRUPTING
          if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
               returnfalse;
          Thread t = runner;
          if (t != null)
                t.interrupt();
         //将任务设置成INTERRUPTED
          UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
     }
     //如过state是NEW,把state设置成CANCELLED
     elseif (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
            returnfalse;
     finishCompletion();
     returntrue;
}

 

如果mayInterruptIfRunning==true,则流程为:NEW->INTERRUPTING ->INTERRUPTED, 否则流程为:NEW->CANCELLED

 

到此,四个流程走完了!

 

 

参考资料:

   JDK API文档

   http://blog.csdn.net/yangyan19870319/article/details/6093481

http://www.oschina.net/question/54100_83333

http://www.2cto.com/kf/201411/351903.html

http://blog.csdn.net/ghsau/article/details/7451464

http://blog.csdn.net/liulipuo/article/details/39029643

 

  • 大小: 6.1 KB
0
0
分享到:
评论

相关推荐

    Java JDK实例宝典

    11 线程——Callable和Future &lt;br&gt;16. 12 线程——任务执行架构 &lt;br&gt;16. 13 线程——锁Lock &lt;br&gt;16. 14 线程——条件Condition &lt;br&gt;16. 15 线程——Semaphore &lt;br&gt;16. 16 线程——CountDownLatch &lt;br&gt;16. 17 线程...

    james-thread.rar

    分布式系统 Callable&lt;JSONObject&gt; queryMoneyInfo = new Callable&lt;JSONObject&gt;(){ @Override public JSONObject call() throws Exception{ JSONObject my= null; return my; } }; ...

    Callable接口源码阅读1

    2.细看2.1接口说明2.1.1泛型参数说明Interface Callable&lt;V&gt;V - the result type of method call2.1

    \java超强笔记(超级经典)

    static &lt;E&gt; void copyArrayToList(E[] os,List&lt;E&gt; ls){ //泛型方法,正确的 } 泛型不能使用简单类型 GenList&lt;int&gt; nList = new GenList&lt;int&gt;(); //编译错误 泛型类不能是异常类,也就是该...

    【java面试题】综合性经典Java面试题

    Java具有简单性、面向对象、分布式、健壮性、安全性、平台独立与可移植性、多线程、动态性等特点 。Java可以编写桌面应用程序、Web...Callable是需要使用java.util.concurrent.ExecutorService.submit(Callable&lt;T&gt;)方

    java8源码-Java8:Java8

    Callable&lt;V&gt; { V call() throws Exception; } public interface ActionListener { void actionPerformed(ActionEvent e); } public interface Comparator&lt;T&gt; { int compare(T o1, T o2); boolean equals(Object obj)...

    线程超时死掉

    Future接口是一个泛型接口,严格的格式应该是Future&lt;V&gt;,其中V代表了Future执行的任务返回值的类型。 Future接口的方法介绍如下: boolean cancel(boolean mayInterruptIfRunning) 取消任务的执行。参数指定是否...

    asyncall:轻量级异步并行调用工具

    Java轻量级异步并行调用工具基于java.util.concurrent线程池,主要做了如下简化或处理轻量级:没有通过使用代理降低侵入性,因为那样有很多因素开发人员无法知道,...new Callable&lt;Model&gt;(&#41; { @Override public Mod

    Java使用Callable和Future创建线程操作示例

    主要介绍了Java使用Callable和Future创建线程操作,结合实例形式分析了java使用Callable接口和Future类创建线程的相关操作技巧与注意事项,需要的朋友可以参考下

    android-fixedtimes-ScheduledExecutorService:具有指定执行时间的ScheduledExecutorService

    FixedTimesScheduledExecutorService 您可以在工作线程上运行任务(Runnable或Callable),以指定执行计划(如ScheduledExecutorService),并指定...&lt; V&gt; ScheduledFuture&lt; V&gt; schedule( Callable&lt; V&gt; callable, in

    Callable,Future的使用方式

    Callable,Future的使用方式,里面使用了三种使用方式分别是FutureTask,ExecutorService,ExecutorCompletionService

    java7源码-java8:java8函数式编程

    最重要的几个函数接口:(Predicate&lt;T&gt;,Consumer&lt;T&gt;,Function&lt;T&gt;,Supplier&lt;T&gt;,UnaryOperator&lt;T&gt;,BinaryOperator&lt;T&gt; 类型推断 流(Stream) 和Iterator类似,是使用内部迭代,经常和Lambda表达式配合使用,可自动并行...

    Andorid jar库源码Bolts原理解析

    Task.call(new Callable&lt;Boolean&gt;() { @Override public Boolean call() throws Exception { return true; } }, Task.UI_THREAD_EXECUTOR); Task.callInBackground(new Callable&lt;Boolean&gt;() { @Override public ...

    java并发包之Callable和Future

    java并发包之Callable和Future java并发包之Callable和Future java并发包之Callable和Future java并发包之Callable和Future java并发包之Callable和Future java并发包之Callable和Future

    Java中的Runnable,Callable,Future,FutureTask的比较

    主要介绍了Java中的Runnable,Callable,Future,FutureTask的比较的相关资料,需要的朋友可以参考下

    Callable和Future.doc

    Callable和Future详解: Callable和Runnable有几点不同: (1)Callable规定的方法是call(),而Runnable规定的方法是run(); (2)call()方法可抛出异常,而run()方法是不能抛出异常的。 (3)Runnable不会返回结果,...

    线程池之Executor框架.docx

    Executor框架主要由3部分组成: 任务 。包括被执行任务需要实现的接口:Runnable接口或者Callable接口。 任务的执行 。包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。...

    线程池ThreadPoolExecutor

    引子 线程的创建和销毁比较消耗资源,... Future submit(Callable task):执行任务,有返回值,一般又来执行Callable void shutdown() :关闭线程池 AbstractExecutorService:基本实现了ExecutorService的所有方法 Th

Global site tag (gtag.js) - Google Analytics