Netty之ChannelPromise源码走读

es里如何使用使用的?

发送响应之后,需要触发es内部的相关逻辑.

Netty4HttpChannel里的sendResponse.

@Override
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
    //channel.
    channel.writeAndFlush(response, Netty4TcpChannel.addPromise(listener, channel));
}

channel执行完writeAndFlush.

ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

es把自己的ActionListener和channel作为入参,需要channe,是因为promise是从channel构建的.

/**
 * Creates a {@link ChannelPromise} for the given {@link Channel} and adds a listener that invokes the given {@link ActionListener}
 * on its completion.
 *
 * @param listener lister to invoke
 * @param channel  channel
 * @return write promise
 */
public static ChannelPromise addPromise(ActionListener<Void> listener, Channel channel) {
    //newPromise
    ChannelPromise writePromise = channel.newPromise();
    //添加监听器
    writePromise.addListener(f -> {
        //成功
        if (f.isSuccess()) {
            listener.onResponse(null);
        } else {
            //失败
            final Throwable cause = f.cause();
            ExceptionsHelper.maybeDieOnAnotherThread(cause);
            if (cause instanceof Error) {
                listener.onFailure(new Exception(cause));
            } else {
                listener.onFailure((Exception) cause);
            }
        }
    });
    //返回promise,传给netty.
    return writePromise;
}

然后netty处理完之后会setSuccess,就会回调es的actionListener.

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

最终会执行notifyListener0.因为l里

@SuppressWarnings({ "unchecked", "rawtypes" })
static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        l.operationComplete(future);
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }
}

es封装好的需要回调的方法.

public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}

上面是一个简单的调用链路,下面我们来分析下netty的ChannelPromise的设计.

ChannelPromise

 extends ChannelFuture, Promise<Void> 
image.png

Promise

这个是netty的扩展了jdk的Future.因为jdk的Future太简陋了,需要添加一些监听,等待的方法.

 extends Future<V>
image.png

ChannelFuture

The result of an asynchronous Channel I/O operation.(异步Channel的IO操作的结果).

All I/O operations in Netty are asynchronous. It means any I/O calls will return immediately with no guarantee that the requested I/O operation has been completed at the end of the call. Instead, you will be returned with a ChannelFuture instance which gives you the information about the result or status of the I/O operation.

netty里所有的io操作都是异步的,这就意味着,任何一个IO调用会立马返回,且不保证请求io的操作一点会在调用后完成,相反,调用方会接受到一个ChannelFuture实例,这个实例会给你一些关于这个io操作的结果或者状态的信息.

A ChannelFuture is either uncompleted or completed. When an I/O operation begins, a new future object is created. The new future is uncompleted initially - it is neither succeeded, failed, nor cancelled because the I/O operation is not finished yet. If the I/O operation is finished either successfully, with failure, or by cancellation, the future is marked as completed with more specific information, such as the cause of the failure. Please note that even failure and cancellation belong to the completed state.

一个ChannelFuture要么是未完成的要么是已完成的.当io操作开始的时候,一个新的future就被创建了.这个future刚开始是初始化成未完成的-既不是成功也不是失败,也不是被取消的,因为这个io操作目前还没有完成呢.如果这个io槽子成功了失败了或者被取消了,这个future会被标记成完成,还会携带一些特殊的信息,比如失败的原因.请注意失败和取消都是完成的状态.

实现DefaultPromise

先看一下抽象父类的AbstractFuture,只有两个方法,一个get,一个带超时的get

    @Override
    public V get() throws InterruptedException, ExecutionException {
        //子类实现
        await();
        //子类实现
        Throwable cause = cause();
        //子类实现
        if (cause == null) {
            //子类实现.
            return getNow();
        }
        throw new ExecutionException(cause);
    }

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
  //超时的await  
  if (await(timeout, unit)) {
        Throwable cause = cause();
        if (cause == null) {
            return getNow();
        }
        throw new ExecutionException(cause);
    }
    throw new TimeoutException();
}

DefaultPromise

核心变量

//
private static final int MAX_LISTENER_STACK_DEPTH = 8;
//
private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS");
//
private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class.getName() + ".UNCANCELLABLE");
//
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(new CancellationException());
//
private final EventExecutor executor;
//结果
private volatile Object result;
//监听者
private Object listeners;
//
private LateListeners lateListeners;
//
private short waiters;

cause实现

   @Override
    public Throwable cause() {
        Object result = this.result;
        if (result instanceof CauseHolder) {
            return ((CauseHolder) result).cause;
        }
        return null;
    }

await实现

   @Override
    public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        synchronized (this) {
            //死循环,等待完成.
            while (!isDone()) {
                //
                checkDeadLock();
                incWaiters();
                try {
                    //Object的wait
                    wait();
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
    }

getNow实现

@Override
@SuppressWarnings("unchecked")
public V getNow() {
    //获取结果
    Object result = this.result;
    //如果失败了或者==SUCCESS那么返回null
    if (result instanceof CauseHolder || result == SUCCESS) {
        return null;
    }
    //直接返回结果
    return (V) result;
}

为什么result == SUCCESS会返回null?

因为setSuccess0里

if (result == null) {
    this.result = SUCCESS;
} else {
    this.result = result;
}

核心中的核心setSuccess方法

本质上就是,如果其他的业务完成了,调用了setSuccess方法,那么就调用回调方法.

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

非常简单的setSuccess0

private boolean setSuccess0(V result) {
    //已完成的不通知
    if (isDone()) {
        return false;
    }

    synchronized (this) {
        // Allow only once.
        if (isDone()) {
            return false;
        }
        if (result == null) {
            this.result = SUCCESS;
        } else {
            this.result = result;
        }
        //通知等待的线程.
        if (hasWaiters()) {
            notifyAll();
        }
    }
    return true;
}

然后就是通知监听者了

private void notifyListeners() {
    //这个方法不需要加锁
    // This method doesn't need synchronization because:
    // 1) This method is always called after synchronized (this) block.
    //    Hence any listener list modification happens-before this method.
    // 2) This method is called only when 'done' is true.  Once 'done'
    //    becomes true, the listener list is never modified - see add/removeListener()

    Object listeners = this.listeners;
    if (listeners == null) {
        return;
    }
        //
    EventExecutor executor = executor();
    //如果是线程组中的线程.
    if (executor.inEventLoop()) {
        //
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                //
                if (listeners instanceof DefaultFutureListeners) {
                    notifyListeners0(this, (DefaultFutureListeners) listeners);
                } else {
                    final GenericFutureListener<? extends Future<V>> l =
                            (GenericFutureListener<? extends Future<V>>) listeners;
                    notifyListener0(this, l);
                }
            } finally {
                this.listeners = null;
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            //返回
            return;
        }
    }
        //如果是外部线程调用.
    if (listeners instanceof DefaultFutureListeners) {
        //
        final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
        //executor执行
        execute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListeners0(DefaultPromise.this, dfl);
                DefaultPromise.this.listeners = null;
            }
        });
    } else {
        final GenericFutureListener<? extends Future<V>> l =
                (GenericFutureListener<? extends Future<V>>) listeners;
        //
        execute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListener0(DefaultPromise.this, l);
                DefaultPromise.this.listeners = null;
            }
        });
    }
}

notifyListeners0

单个和多个的区别.

static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        l.operationComplete(future);
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }
}
private static void notifyListeners0(Future<?> future, DefaultFutureListeners listeners) {
    final GenericFutureListener<?>[] a = listeners.listeners();
    final int size = listeners.size();
    for (int i = 0; i < size; i ++) {
        notifyListener0(future, a[i]);
    }
}

总结

总体来说这个类的逻辑还是很简单的,但是这个类的设计还是蛮顶的.

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,340评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,762评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,329评论 0 329
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,678评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,583评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,995评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,493评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,145评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,293评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,250评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,267评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,973评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,556评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,648评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,873评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,257评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,809评论 2 339

推荐阅读更多精彩内容