Netty的future继承concurrent包里面Future,增加了对监听者的增删,同步等操作。promise继承Future的接口,增加了对结果的设置。
源码分析:
在channelHandler当中为什么调用await会导致死锁
因为在哪个里面调用await都是io线程等待,而本身唤醒工作就是使用io线程操作的,这样io线程怎么能自己唤醒自己。不过netty在内部做了个死锁的检测,当我们调用await,sync这些阻塞方法时候,会检测是否是io线程自己调用了这些阻塞方法,如果是则会抛出异常
Future和promise如何实现sync
整个原理就是检测promise里面的一个Object类型的result是否有结果,Io线程完成注册后会设置该属性,我们主线程会检测到结果变化,如果没有得到结果,进调用wait方法无限等待,在调用wait方法适合还增加一个计数,即太多的地方同时调用sync 会抛出异常,当我们的操作执行成功后会分别唤醒等待着并回调监听者。sync 可中断
调用父类的sync
@Override
public ChannelPromise sync() throws InterruptedException {
super.sync();
return this;
}
public Promise<V> sync() throws InterruptedException {
调用await方法
await();
如果有异常就抛出
rethrowIfFailed();
return this;
}
public Promise<V> await() throws InterruptedException {
首先检测promise中的result是否已经完成且是非不可取消状态
if (isDone()) {
return this;
}
如果被中断了直接抛出中断异常
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
检测死锁,即不能在io线程中调用sync wait等方法
checkDeadLock();
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
无限等待
wait();
} finally {
decWaiters();
}
}
}
return this;
}
执行过程中有异常的话重新抛出
private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
return;
}
PlatformDependent.throwException(cause);
}
Future和promise如何实现回调
trySuccess 和setSuccess方法的区别,前者如果发现 设置失败返回false 后者抛出异常
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
添加一个今天者
synchronized (this) {
addListener0(listener);
}
如果这个future 已经完成了 唤醒其所有的监听者
if (isDone()) {
notifyListeners();
}
return this;
}
private void notifyListeners() {
EventExecutor executor = executor();
唤醒必须是io线程操作
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
获取该方法执行的嵌套层数
final int stackDepth = threadLocals.futureListenerStackDepth();
大于的就不执行了
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
立即执行唤起
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
如果不是当前io线程,则包装成一个任务 等待io线程执行
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
如果没有监听者或者正在唤醒 直接返回
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
循环去唤醒监听者
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// Nothing can throw from within this method, so setting notifyingListeners back to false does not
// need to be in a finally block.
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}
唤醒的时候 是直接调用监听者operationComplete方法 并把future作为参数传递
future内部有执行结果,这个operationComplete可以是失败,成功,等一一系列的动作 执行完成
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
尝试去设置成功
public boolean trySuccess(V result) {
if (setSuccess0(result)) {
设置ok 直接唤醒
notifyListeners();
return true;
}
return false;
}
private boolean setSuccess0(V result) {
如果result==null 设置success对象标识
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
先cas 设置objResult 如果失败了 则由可能改结果表标识了UNCANCELLABLE 再去设置下
比如我们bind的时候会先尝试设置UNCANCELLABLE,这样别人想尝试设置RESULT_UPDATER.compareAndSet(this, null, objResult)都会失败,而RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)一般
都是最终设置结果的时候
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
如果有wait对象 ,尝试notifyAll
checkNotifyWaiters();
return true;
}
return false;
}