使用netty手写一个简单的RPC框架

        RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。

        RPC 可基于 HTTP 或 TCP 协议,Web Service 就是基于 HTTP 协议的 RPC,它具有良好的跨平台性,但其性能却不如基于 TCP 协议的 RPC。会两方面会直接影响 RPC 的性能,一是传输方式,二是序列化。

        众所周知,TCP 是传输层协议,HTTP 是应用层协议,而传输层较应用层更加底层,在数据传输方面,越底层越快,因此,在一般情况下,TCP 一定比 HTTP 快。就序列化而言,Java 提供了默认的序列化方式,但在高并发的情况下,这种方式将会带来一些性能上的瓶颈,于是市面上出现了一系列优秀的序列化框架,比如:Protobuf、Kryo、Hessian、Jackson 等,它们可以取代 Java 默认的序列化,从而提供更高效的性能。 

        下面是简单实现的基于netty的RPC调用。

一、首先定义消息传递的实体类

public class ClassInfo  implements  Serializable{

        private static final long serialVersionUID = -8970942815543515064L; 

        private String className;//类名     

        private String methodName;//函数名称      

        private Class[] types;//参数类型        

        private Object[] objects;//参数列表        

        public String getClassName() { 

                return className; 

        } 

        public void setClassName(String className) { 

                this.className = className; 

        } 

        public String getMethodName() { 

                return methodName; 

        } 

        public void setMethodName(String methodName) { 

                this.methodName = methodName; 

         } 

         public Class[] getTypes() { 

                return types; 

          } 

         public void setTypes(Class[] types) { 

               this.types = types; 

        } 

        public Object[] getObjects() { 

                return objects; 

        } 

        public void setObjects(Object[] objects) { 

             this.objects = objects; 

        } 

}

二、创建Netty操作的服务端,以及具体操作 

1. 服务端

public class RPCServer {

         private int port; 

         public RPCServer(int port){ 

                 this.port = port; 

         } 

         public void start(){ 

                  EventLoopGroup bossGroup = new NioEventLoopGroup();

                  EventLoopGroup workerGroup = new NioEventLoopGroup();

                 try { 

                     ServerBootstrap serverBootstrap = new ServerBootstrap().

                    group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .localAddress(port).

                    childHandler(new ChannelInitializer() {

                            @Override

                            protected void initChannel(SocketChannel ch) throws Exception { 

                                ChannelPipeline pipeline = ch.pipeline();   

                                pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));   

                                pipeline.addLast(new LengthFieldPrepender(4));   

                                pipeline.addLast("encoder", new ObjectEncoder());     

                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));   

                                pipeline.addLast(new InvokerHandler()); 

                        } 

                    }).option(ChannelOption.SO_BACKLOG, 128)     

                    .childOption(ChannelOption.SO_KEEPALIVE, true); 

                    ChannelFuture future = serverBootstrap.bind(port).sync();     

                     System.out.println("Server start listen at " + port );   

                    future.channel().closeFuture().sync();   

        } catch (Exception e) { 

            bossGroup.shutdownGracefully();   

            workerGroup.shutdownGracefully(); 

        } 

    } 

    public static void main(String[] args) throws Exception {   

        int port;   

        if (args.length > 0) {   

            port = Integer.parseInt(args[0]);   

        } else {   

            port = 8899;   

        }   

        new RPCServer(port).start();   

    }   

}

2、服务端操作

        由服务端我们看到具体的数据传输操作是进行序列化的,具体的操作还是比较简单的,就是获取发送过来的信息,这样就可以通过反射获得类名,根据函数名和参数值,执行具体的操作,将执行结果发送给客户端。

public class InvokerHandler extends ChannelInboundHandlerAdapter {

     public static ConcurrentHashMapclassMap = new ConcurrentHashMap();

     @Override   

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {   

        ClassInfo classInfo = (ClassInfo)msg; 

        Object claszz = null; 

        if(!classMap.containsKey(classInfo.getClassName())){ 

            try { 

                claszz = Class.forName(classInfo.getClassName()).newInstance(); 

                classMap.put(classInfo.getClassName(), claszz); 

            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { 

                e.printStackTrace(); 

            } 

        }else { 

            claszz = classMap.get(classInfo.getClassName()); 

        } 

        Method method = claszz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());   

        Object result = method.invoke(claszz, classInfo.getObjects()); 

        ctx.write(result); 

        ctx.flush();   

        ctx.close(); 

    } 

    @Override   

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {   

        cause.printStackTrace();   

        ctx.close();   

    }   

}

三、客户端,通过代理机制来触发远程调用

1、客户端

        当执行具体的函数时会调用远程操作,将具体操作的类、函数及参数信息发送到服务端

public class RPCProxy{ 

     @SuppressWarnings("unchecked")

     public staticT create(final Object target){

             return (T) Proxy.newProxyInstance(target.getClass().getClassLoader(),target.getClass().getInterfaces(), 

                    new InvocationHandler(){ 

                         @Override

                         public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                             ClassInfo classInfo = new ClassInfo(); 

                             classInfo.setClassName(target.getClass().getName());

                             classInfo.setMethodName(method.getName()); 

                             classInfo.setObjects(args); 

                             classInfo.setTypes(method.getParameterTypes());

                             final ResultHandler resultHandler = new ResultHandler();

                             EventLoopGroup group = new NioEventLoopGroup(); 

                             try {

                                     Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class)                                     .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() {

                                             @Override   

                                              public void initChannel(SocketChannel ch) throws Exception {   

                                            ChannelPipeline pipeline = ch.pipeline();   

                                            pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));   

                                            pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));   

                                            pipeline.addLast("encoder", new ObjectEncoder());     

                                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));   

                                            pipeline.addLast("handler",resultHandler); 

                               }   

                    });   

                    ChannelFuture future = b.connect("localhost", 8899).sync();   

                    future.channel().writeAndFlush(classInfo).sync(); 

                    future.channel().closeFuture().sync();   

                } finally {   

                    group.shutdownGracefully();   

                } 

                return resultHandler.getResponse(); 

            } 

        }); 

    } 

}

2、获取远程调用返回的结果值

public  class  ResultHandler  extends  ChannelInboundHandlerAdapter{

        private  Object response;

        public   Object  getResponse() {

             return  response;    

        }

        @Override

        public  void  channelRead(ChannelHandlerContext ctx, Object msg)throwsException {           

                 response=msg;            

                System.out.println("client接收到服务器返回的消息:"+ msg);       

         }

        @Override

        public  void  exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {  

              System.out.println("client exception is general");       

        }    

}

四、 接口、实现类及Main操作

public interface HelloRpc { 

     String hello(String name);

 }

public  class  HelloRpcImpl  implements  HelloRpc{

        @Override

        public  String  hello(String name) {

            return"hello "+name;     

         }  

}

public class NettyRpcMain {

    public static void main(String[] args){

        HelloRpc helloRpc = new HelloRpcImpl();

        HelloRpc echo = RPCProxy.create(helloRpc);

        System.out.println(echo.hello("这是我的第一个手写rpc!"));

    }

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342

推荐阅读更多精彩内容

  • RPC框架远程调用的实现方式在原理上是比较简单的,即将调用的方法(接口名、方法名、参数类型、参数)序列化之后发送到...
    谜碌小孩阅读 3,080评论 0 13
  • netty常用API学习 netty简介 Netty是基于Java NIO的网络应用框架. Netty是一个NIO...
    花丶小伟阅读 5,984评论 0 20
  • 基于netty的rpc框架 [TOC] 如果你已经对以下东东有所了解,那么你就可以完成一个rpc框架了 Java的...
    帅_zs阅读 3,887评论 0 5
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,579评论 18 139
  • 沃顿商学院的教授Adam Grant在一项研究中把人分为三类: “宁教我负天下人,休教天下人负我”的索取者;“投我...
    海怪多克特阅读 564评论 6 7