Future> futureRed = (Future>)threadBaseService.getExecutor().submit(
() -> {
DataSourceThreadHolder.set(DataSourceType.ElasticSearch);
List redAllUserConsumer =consumerMetricsDao.getRedAllUserConsumer(startTime,endTime,1);
DataSourceThreadHolder.reset();
return redAllUserConsumer;
}
);
List consumerInfoVoList = futureRed.get();
以下内容为转载:
1. 什么是Future
Future是多线程开发中常见的一种设计模式。Future模式可以返回线程执行结果的契约,通过此契约程序可以选择在合适的时机取回执行的结果,如果取回结果时线程还没有执行完成,将会阻塞调用线程等待执行结果返回。
2. 为什么需要Future
在有些场景下,我们想使用另一个线程去执行复杂耗时的操作,此时又不想让主线程等待白白浪费CPU,此时可以让主线程先去做别的事,然后在合适的时机去通过Future契约取回线程执行的结果。
3. Java中的Future模式
Java中的Future模式主要由以上接口和类组成。
3.1 Callable & Runnable
这是我们普通的线程任务,其中Callable是带返回值(真实数据),Runnable是不带返回值的,因此在我们使用Runnable和Future时,必须传入一个Result对象,通过Future在获取结果时就是获取的该Result,核心代码如下:
publicFutureTask(Runnable runnable,Vresult){this.callable=Executors.callable(runnable,result);this.state=NEW;// ensure visibility of callable}publicstatic<T>Callable<T>callable(Runnable task,Tresult){if(task==null)thrownewNullPointerException();returnnewRunnableAdapter<T>(task,result);}staticfinalclassRunnableAdapter<T>implementsCallable<T>{final Runnable task;finalTresult;RunnableAdapter(Runnable task,Tresult){this.task=task;this.result=result;}publicTcall(){task.run();returnresult;}}
复制
Callable目前只能搭配线程池或者Future来使用,不能直接和new Thread()搭配使用,Runnable可以搭配线程池和new Thread()使用,在配合Future使用时本质上是对其进行了适配,也就是上述代码中的RunnableAdapter。
3.2 Future
publicinterfaceFuture<V>{booleancancel(boolean mayInterruptIfRunning);booleanisCancelled();booleanisDone();Vget()throws InterruptedException,ExecutionException;Vget(long timeout,TimeUnit unit)throws InterruptedException,ExecutionException,TimeoutException;}
复制
Future是线程的契约,通过其get()方法我们可以获取线程执行的结果,当然Future也提供了其他三个方法,分别是:
cancel:取消任务
isCancelled:任务是否已经取消
isDone:任务是否完成
3.3 RunnableFuture
publicinterfaceRunnableFuture<V>extendsRunnable,Future<V>{voidrun();}
复制
RunnableFuture接口继承自Runnable和Future,表明RunnableFuture可以被线程执行并且可以通过契约获取到线程的执行结果。
4. FutureTask
4.1 属性
// 执行任务privateCallable<V>callable;// 任务的实际执行结果privateObject outcome;// 执行任务的线程privatevolatile Thread runner;// 等待结果的线程栈privatevolatile WaitNode waiters;
复制
4.2 状态
privatevolatile int state;privatestaticfinal intNEW=0;privatestaticfinal intCOMPLETING=1;privatestaticfinal intNORMAL=2;privatestaticfinal intEXCEPTIONAL=3;privatestaticfinal intCANCELLED=4;privatestaticfinal intINTERRUPTING=5;privatestaticfinal intINTERRUPTED=6;
复制
FutureTask除了4.1中的属性外,还有一个重要的属性就是state,FutureTask中的状态大约有7种:
NEW:任务的初始状态
COMPLETING:正在设置任务结果
NORMAL:任务执行完毕
EXCEPTIONAL:任务发行异常
CANCELLED:任务被取消
INTERRUPTING:正在中断任务
INTERRUPTED:任务被中断
4.3 run()方法
任务执行的时候实际就是执行run方法,源码如下:
publicvoidrun(){if(state!=NEW||!UNSAFE.compareAndSwapObject(this,runnerOffset,null,Thread.currentThread()))return;try{Callable<V>c=callable;if(c!=null&&state==NEW){Vresult;boolean ran;try{result=c.call();ran=true;}catch(Throwable ex){result=null;ran=false;setException(ex);}if(ran)set(result);}}finally{// runner must be non-null until state is settled to// prevent concurrent calls to run()runner=null;// state must be re-read after nulling runner to prevent// leaked interruptsint s=state;if(s>=INTERRUPTING)handlePossibleCancellationInterrupt(s);}}protectedvoidset(Vv){if(UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){outcome=v;UNSAFE.putOrderedInt(this,stateOffset,NORMAL);// final statefinishCompletion();}}privatevoidhandlePossibleCancellationInterrupt(int s){// It is possible for our interrupter to stall before getting a// chance to interrupt us. Let's spin-wait patiently.if(s==INTERRUPTING)while(state==INTERRUPTING)Thread.yield();// wait out pending interrupt}
复制
run方法的大致流程如下:
校验任务的状态是否是NEW和当前是否无执行线程,如果校验通过,则获取任务执行
调用任务的call方法
如果执行异常,设置结果,状态修改为EXCEPTIONAL,并将任务结果设置为异常
如果正常执行,调用set(V v)设置结果,状态修改为NORMAL,结果设置为执行结果,并且唤醒等待结果的线程
最后在finally块中,我们将runner属性置为null,并且检查有没有遗漏的中断,如果发现s >= INTERRUPTING, 说明执行任务的线程有可能被中断了,因为s >= INTERRUPTING 只有两种可能,state状态为INTERRUPTING和INTERRUPTED。
4.3 get()方法
当我们需要去获取FutureTask的结果时,我们需要调用get方法获取结果。
publicVget()throws InterruptedException,ExecutionException{int s=state;if(s<=COMPLETING)s=awaitDone(false,0L);returnreport(s);}@SuppressWarnings("unchecked")privateVreport(int s)throws ExecutionException{Object x=outcome;if(s==NORMAL)return(V)x;if(s>=CANCELLED)thrownewCancellationException();thrownewExecutionException((Throwable)x);}privateintawaitDone(boolean timed,long nanos)throws InterruptedException{final long deadline=timed?System.nanoTime()+nanos:0L;WaitNode q=null;boolean queued=false;for(;;){if(Thread.interrupted()){removeWaiter(q);thrownewInterruptedException();}int s=state;if(s>COMPLETING){if(q!=null)q.thread=null;returns;}elseif(s==COMPLETING)// cannot time out yetThread.yield();elseif(q==null)q=newWaitNode();elseif(!queued)queued=UNSAFE.compareAndSwapObject(this,waitersOffset,q.next=waiters,q);elseif(timed){nanos=deadline-System.nanoTime();if(nanos<=0L){removeWaiter(q);returnstate;}LockSupport.parkNanos(this,nanos);}elseLockSupport.park(this);}}
复制
获取结果的大致步骤如下:
检测任务状态是否是NEW或者COMPLETING,如果不是,说明已经执行成功或失败,返回结果
否则就阻塞等待,阻塞等待的步骤如下
检测当前线程是否被中断,如果是就将其从等待线程中移除
再次检测任务状态,如果是异常、中断或者执行完成状态,则直接返回结果。
如果任务是COMPLETING状态,说明任务已经执行完成正在设置结果,此时让获取结果的线程短暂让出CPU继续等待
如果等待结果的线程栈为null,说明还没有生成,则生成等待结果的线程栈
如果queued为false,说明等待结果的线程还没入栈,所以将其入栈
最后看是否是是超时等待,根据是否超时,选择将等待结果的线程永久挂起(等待唤醒)还是具有超时时间的挂起