手写RPC框架(3)-引入Hessian序列化工具

手写RPC框架
1、手写一个RPC框架,看看100个线程同时调用效果如何
2、手写RPC框架(2)-引入zookeeper做服务治理

本次进行第三个版本的迭代,支持了自定义序列化工具,在代码中默认实现了Java内置的序列化方式以及Hessian序列化方式,并对比其两者的效果,以及泛型参数传递的小优化点、空指针的处理等。最后聊一下InputStream.read 读取数据的问题以及整个手写框架的进度情况

image

这张图在手写RPC框架(2)-引入zookeeper做服务治理已经贴出了过,而我们这次的重点也就是在这里的1-4个点上。

  • 1、客户端代理对象产生的RpcRequest序列化成byte,发送给服务端
  • 2、服务端收到byte数据,反序列化成RpcRequest对象
  • 3、服务端生成数据后,拼接成RpcResponse,序列化成byte,发送给客户端
  • 4、客户端收到byte数据

在使用类似于Hessian序列化工具时,需要先引入该jar包

<!--hessian-->
<dependency>
    <groupId>com.caucho</groupId>
    <artifactId>hessian</artifactId>
    <version>4.0.38</version>
</dependency>

RPC 实践 V3版本

image

圈住的代码就是本次的重点序列化和反序列化

MessageProtocol 消息协议

/**
 * 请求、应答 解析和反解析,包含了序列化以及反序列化操作
 *
 * @author jwfy
 */
public interface MessageProtocol {

    /**
     * 服务端解析从网络传输的数据,转变成request
     * @param inputStream
     * @return
     */
    RpcRequest serviceToRequest(InputStream inputStream);

    /**
     * 服务端把计算结果包装好,通过outputStream 返回给客户端
     * @param response
     * @param outputStream
     * @param <T>
     */
     <T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream);

    /**
     * 客户端把请求拼接好,通过outputStream发送到服务端
     * @param request
     * @param outputStream
     */
     void clientToRequest(RpcRequest request, OutputStream outputStream);

    /**
     * 客户端接收到服务端响应的结果,转变成response
     * @param inputStream
     */
    <T> RpcResponse<T>  clientGetResponse(InputStream inputStream);
}

主要是修改了serviceToRequest以及clientGetResponse接口,从参数挪到返回值,使其更加容易理解

MessageProtocol 消息协议实现类

public class DefaultMessageProtocol implements MessageProtocol {

    private SerializeProtocol serializeProtocol;

    public DefaultMessageProtocol() {
        // 默认是采用了Hessian协议
        this.serializeProtocol = new HessianSerialize();
    }

    public void setSerializeProtocol(SerializeProtocol serializeProtocol) {
        // 可通过set方法替换序列化协议
        this.serializeProtocol = serializeProtocol;
    }

    @Override
    public RpcRequest serviceToRequest(InputStream inputStream) {
        try {
            // 2、bytes -> request 反序列化
            byte[] bytes = readBytes(inputStream);
            // System.out.println("[2]服务端反序列化出obj:[" + new String(bytes) + "], length:" + bytes.length);
            System.out.println("[2]服务端反序列化出obj length:" + bytes.length);
            RpcRequest request = serializeProtocol.deserialize(RpcRequest.class, bytes);
            return request;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public <T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream) {
        try {
            // 3、把response 序列化成bytes 传给客户端
            byte[] bytes = serializeProtocol.serialize(RpcResponse.class, response);
            // System.out.println("[3]服务端序列化出bytes:[" + new String(bytes) + "], length:" + bytes.length);
            System.out.println("[3]服务端序列化出bytes length:" + bytes.length);
            outputStream.write(bytes);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void clientToRequest(RpcRequest request, OutputStream outputStream) {
        try {
            // 1、先把这个request -> bytes 序列化掉
            byte[] bytes = serializeProtocol.serialize(RpcRequest.class, request);
            // System.out.println("[1]客户端序列化出bytes:[" + new String(bytes) + "], length:" + bytes.length);
            System.out.println("[1]客户端序列化出bytes length:" + bytes.length);
            outputStream.write(bytes);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public <T> RpcResponse<T>  clientGetResponse(InputStream inputStream) {
        try {
            // 4、bytes 反序列化成response
            byte[] bytes = readBytes(inputStream);
            // System.out.println("[4]客户端反序列化出bytes:[" + new String(bytes) + "], length:" + bytes.length);
            System.out.println("[4]客户端反序列化出bytes length:" + bytes.length);
            RpcResponse response = serializeProtocol.deserialize(RpcResponse.class, bytes);
            return response;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private byte[] readBytes(InputStream inputStream) throws IOException {
        if (inputStream == null) {
            throw new RuntimeException("input为空");
        }
        // return fun1(inputStream);
        return fun2(inputStream);
        // return fun3(inputStream);
    }

    private byte[] fun1(InputStream inputStream) throws IOException {
        // 有个前提是数据最大是1024,并没有迭代读取数据
        byte[] bytes = new byte[1024];
        int count = inputStream.read(bytes, 0, 1024);
        return Arrays.copyOf(bytes, count);
    }

    private byte[] fun2(InputStream inputStream) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        int bufesize = 1024;
        while (true) {
            byte[] data = new byte[bufesize];
            int count = inputStream.read(data,0,bufesize);
            byteArrayOutputStream.write(data, 0, count);
            if (count < bufesize) {
                break;
            }
        }
        return byteArrayOutputStream.toByteArray();
    }

    /**
     * 有问题的fun3,调用之后会阻塞在read,可通过jstack查看相关信息
     * @param inputStream
     * @return
     * @throws IOException
     */
    private byte[] fun3(InputStream inputStream) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        int bufesize = 1024;

        byte[] buff = new byte[bufesize];
        int rc = 0;
        while ((rc = inputStream.read(buff, 0, bufesize)) > 0) {
            byteArrayOutputStream.write(buff, 0, rc);
            buff = new byte[bufesize];
        }
        byte[] bytes = byteArrayOutputStream.toByteArray();
        return bytes;
    }

}

四个方法也是依次针对上述四个步骤的流程操作,其中包含了对byte数据(内容和长度)的输出观察,在此就不细说了。重点关注下fun1、fun2、fun3 三个方法的细节操作,其中fun1和fun2都测试是没有问题的唯独第三个使用的inputStream.read会时常出现阻塞的情况,如下图观察线程运行情况。

image

目前还没有深入的了解InputStream.read方法的原理细节,本人也需要仔细学习和了解,后面将会作为一个单独的学习笔记进行补充说明

序列化&反序列化接口

public interface SerializeProtocol {
    /**
     * 序列化
     */
    <T> byte[] serialize(Class<T> clazz, T t);

    /**
     * 反序列化
     */
     <T> T deserialize(Class<T> clazz, byte[] bytes);
}

这个就没什么可说的,一个非常简单的序列化和反序列化接口,序列化返回byte数据,反序列化根据泛型返回对应实体对象

Hessian序列化&反序列化实现

public class HessianSerialize implements SerializeProtocol {

    @Override
    public <T> byte[] serialize(Class<T> clazz, T t) {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        Hessian2Output hessian2Output = new Hessian2Output(outputStream);
        try {
            hessian2Output.writeObject(t);
            // NOTICE 验证过,一定需要在flush刷新之前关闭hessian2Output,否则无法有效获取字节数据
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage());
        } finally {
            try {
                hessian2Output.close();
            } catch (IOException e){
                e.printStackTrace();
            }
        }
        try {
            outputStream.flush();
            byte[] bytes = outputStream.toByteArray();
            return bytes;
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage());
        } finally {
            try {
                outputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
        Hessian2Input hessian2Input = new Hessian2Input(inputStream);
        try {
            T t = (T) hessian2Input.readObject();
            return t;
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage());
        } finally {
            try {
                hessian2Input.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

关于Hessian的使用细节,自行查询官网http://hessian.caucho.com/
这里并未对抛出的异常做很好的处理,只是简单的抛出、日志输出而已
关于Java内置的序列化方式就不贴出来了,代码较为简单,只需要按照类似的套路自行编写即可。

实践

样例按照手写RPC框架(2)-引入zookeeper做服务治理所说的保持一致。

Hessian序列化工具

image

image

Java内置序列化工具

image

image

对比这两者结果很明显

  • Hessian:请求数据 204 字节,响应数据 75 字节
  • Java内置:请求数据 430 字节,响应数据 280 字节

单就这一个简单试验而言,Hessian的效率就比Java内置的高出100%还要多,可见在RPC框架中一个优秀的序列化框架多么重要,毕竟数据的大小即影响网络传输的速率,也影响序列号和反序列化的执行性能

总结思考

本次更新并没有更新太多内容,只是在v2版本上替换了之前的Java内置的序列化工具,而且本文也只是介绍了Hessian序列化工具,其实Google的ProtoBuffer也是一个不错的选择,甚至FastJson也可作为序列化工具使用。

文中还遗留了一个问题:InputStream.read 何时会被阻塞?,暂时未对其细节原理有更多的认识,接下来会出一篇附加的学习笔记好好学习总结一下其原因。

整个RPC学习笔记不出意外的话,应该还剩下3篇,一篇引入Netty,替换当前的BIO模型;一篇引入日志,并完善整个的代码的一些异常点;最后一篇结合Spring,使其成为一个项目中真正可用的Simple-RPC框架。此外关于SPI、快速失败、监控、等由于本人能力&精力问题看时间更新。

本学习笔记主要目的是学习和了解RPC框架,并及时进行总结和反思

如代码存在的问题欢迎提出~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,830评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,992评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,875评论 0 331
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,837评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,734评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,091评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,550评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,217评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,368评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,298评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,350评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,027评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,623评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,706评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,940评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,349评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,936评论 2 341

推荐阅读更多精彩内容

  • 什么是序列化与反序列化 序列化是指把对象转换为字节序列的过程(Encoding an object as a by...
    小X感悟阅读 879评论 0 4
  • 概念 Java中号称一切皆是对象,在Java程序运行过程中,都是借助对象来完成一系列我们想要的操作。但是对象...
    still_loving阅读 2,119评论 0 1
  • 在序列化技术中,除了java提供的序列化,还有很多其他的序列化技术。对于java而已,java序列化是由java语...
    先生zeng阅读 588评论 0 0
  • 1. RPC 1.1 简介 RPC 的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用能力时不损...
    wy_sure阅读 6,966评论 0 1
  • 常见的序列化框架 xml序列化 在java发展早期开始,为了统一接口,xml协议横空出世,良好的可读性,自由度极高...
    逐梦々少年阅读 19,327评论 2 11