动手写RPC起步

前一篇文章简单介绍了下RPC的基本原理,同时附上了一个小的demo,但是这个小的demo并不能在生产上使用,因为生产上的RPC还需要考虑很多因素,比如接入注册中心、高性能的网络通信、高性能的序列化和反序列化、自动路由、容错处理等等。要实现生产上使用的先不谈,我们先来实现一个稍微复杂的RPC,借助这个RPC例子来更深刻的理解RPC原理,为后续Dubbo源码的分析做准备。

一、简单RPC架构设计

回顾下RPC原理图:



如果要自己设计实现稍微简单的一个rpc框架,应该需要考虑注册中心、网络通信、序列化等内容,因为可以设计出如下键略的RPC架构图:


二、项目起步说明

1、本RCP项目涉及的功能点

动态代理、反射、序列化、反序列化、网络通信、编解码、服务发现和注册、心跳与链路检测等

2、本小节内容说明

设计的技术点:动态代理技术、反射(有关动态代理可以看我的另一篇博文:代理模式
实现的功能:简易的基于socket的rpc

3、本小节项目总体框架图

三、编码实现

1、api模块

创建用于测试的接口

public interface HelloService {
    String sayHello(String name);
}
2、common模块

request请求实体类

public class Request implements Serializable {
    private static final long serialVersionUID = 7929047349488932740L;
    /**
     * 请求表示id
     */
    private String requestId;
    /**
     * 请求服务类型
     */
    private String className;
    /**
     * 请求方法名称
     */
    private String methodName;
    /**
     * 请求方法参数类型数组
     */
    private Class<?>[] parameterTypes;
    /**
     * 请求参数列表
     */
    private Object[] args;
  ......省略getter/setter
}

response响应实体类:

public class Response {
    private static final long serialVersionUID = -1023480952777229650L;

    private String requestId;
    /**
     * 响应状态吗
     */
    private int code;
    /**
     * 响应消息说明
     */
    private String msg;
    /**
     * 相应数据
     */
    private Object data;
......省略getter/setter
3、provider模块

服务的暴露(包好服务的注册和服务的发布),服务端基本流程是

服务注册->服务发布->服务启动监听请求(socket)->处理请求

//rpc代理服务,用于暴露服务
public class RpcProxyServer {
    /**
     * 创建一个线程池
     */
    ExecutorService executorService = Executors.newCachedThreadPool();
    /**
     * 端口号
     */
    private int port;

    /**
     * 1、服务注册
     * @param serviceInterface
     * @param impClass
     * @return
     */
    public RpcProxyServer register(Class serviceInterface, Class impClass) {
        //注册服务(接口名:实现类名)
        ProcessorHandler.register(serviceInterface, impClass);
        return this;
    }

    public RpcProxyServer(int port) {
        this.port = port;
    }

    /**
     *2、 启动发布(启动)
     */
    public void start() {
        System.out.println("服务启动====");
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);

            while (true) {//3、通过循环不断接受请求
                Socket socket = serverSocket.accept();//监听客户端的请求
                //4、每一个socket交给一个processorhandler处理,这里的target就是真正的业务类
                executorService.execute(new ProcessorHandler(socket));//处理客户端的请求
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

具体处理请求的handler

//服务端接受请求处理线程
public class ProcessorHandler implements Runnable {

    private static final HashMap<String, Class<?>> serviceRegistry = new HashMap<String, Class<?>>();
    /**
     * socket
     */
    private Socket socket;

    public static void register(Class serviceInterface, Class impClass) {
        //注册服务(接口名:实现类名)
        serviceRegistry.put(serviceInterface.getName(), impClass);
    }
    public ProcessorHandler(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        //用于定义输入流和输出流
        ObjectInputStream objectInputStream = null;
        ObjectOutputStream objectOutputStream = null;

        try {
            objectInputStream = new ObjectInputStream(socket.getInputStream());
            //从socket中读取请求流对象
            Request rpcRequest = (Request) objectInputStream.readObject();
            //调用正真的处理方法
            Object result = invoke(rpcRequest);
            Response response = new Response();
            response.setRequestId(rpcRequest.getRequestId());
            response.setData(result);
            response.setMsg(ResponseCodeEnum.SUCCESS.getMsg());
            response.setCode(ResponseCodeEnum.SUCCESS.getCode());
            //将结果通过socket输出
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(result);
            objectOutputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeOpenSource(objectInputStream, objectOutputStream);
        }
    }

    private void closeOpenSource(ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream) {
        if (objectInputStream != null) {
            try {
                objectInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (objectOutputStream != null) {
            try {
                objectOutputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 利用反射技术执行正真的方法(这里只是简单的实现,没有容错处理)
     *
     * @param rpcRequest
     * @return
     */
    private Object invoke(Request rpcRequest) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException {
        //获取目标对象并执行目标方法(也就是获取注册后的接口实现类对象)
        Class<?> targetClass = serviceRegistry.get(rpcRequest.getClassName());
        Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
        Method method = targetClass.getMethod(rpcRequest.getMethodName(), parameterTypes);
        Object[] args = rpcRequest.getArgs();
        return method.invoke(targetClass.newInstance(), args);
    }
}

用到的枚举

public enum ResponseCodeEnum {
    SUCCESS(0, "success"),
    FAIL(1, "fail");
....省略

rpc接口实现类:

public class HelloServiceImpl implements HelloService {
    public String sayHello(String name) {
        return "hello " + name;
    }
}

启动服务main方法:

public class Demo1Main {
    public static void main(String[] args) {
        //创建代理服务
        RpcProxyServer rpcProxyServer = new RpcProxyServer(8888);
        //注册服务
        rpcProxyServer.register(HelloService.class, HelloServiceImpl.class);
        //启动服务
        rpcProxyServer.start();
    }
}
4、consumer模块

消费端模块主要是通过jdk动态代理的方式实现rpc接口代理请求远程,基本流程

client->创建代理对象->通过代理对象请求远程服务->接受返回的信息

public class ClientProxy<T> {
    /**
     * 服务端代理接口
     */
    private Class<T> serverInstance;

    /**
     * 服务端地址
     */
    private InetSocketAddress address;

    public ClientProxy(Class<T> serverInstance, String ip, Integer port) {
        this.address = new InetSocketAddress(ip, port);
        this.serverInstance = serverInstance;
    }

    /**
     * 获取客户端代理对象
     *
     * @return
     */
    public T getClientInstance() {
        return (T) Proxy.newProxyInstance(serverInstance.getClassLoader(), new Class<?>[]{serverInstance}, new RemoteInvocationHandler(address));
    }
}

具体远程调用invoke方法(jdk动态代理InvocationHandler)

public class RemoteInvocationHandler implements InvocationHandler {

    /**
     * 服务端地址
     */
    private InetSocketAddress address;

    public RemoteInvocationHandler(InetSocketAddress address) {
        this.address=address;
    }


    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Request rpcRequest = new Request();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParameterTypes(method.getParameterTypes());
        rpcRequest.setArgs(args);

        //通过网络发送正式请求
        RpcNetTransport netTransport = new RpcNetTransport(address.getPort(), address.getHostName());
        Object result = (Object) netTransport.send(rpcRequest);
        return result;//返回收到的结果
    }
}

具体的rpc网络请求(socket)

//网络传送
public class RpcNetTransport {

    private int port;
    private String host;

    public RpcNetTransport(int port, String host) {
        this.port = port;
        this.host = host;
    }

    /**
     * 发送请求
     *
     * @param request
     */
    public Object send(Request request) throws IOException, ClassNotFoundException {
        Socket socket = null;
        ObjectOutputStream outputStream = null;
        ObjectInputStream inputStream = null;
        try {
            // 1.创建Socket客户端,根据指定地址连接远程服务提供者
            socket = new Socket(host, port);
            //2、将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者
            outputStream=new ObjectOutputStream(socket.getOutputStream());
            outputStream.writeObject(request);
            //3、同步阻塞等待服务器返回应答,获取应答后返回
            inputStream = new ObjectInputStream(socket.getInputStream());
            return inputStream.readObject();
        } finally {
            if (socket != null) {
                socket.close();
            }
            if (outputStream != null) {
                outputStream.close();
            }
            if (inputStream != null) {
                inputStream.close();
            }
        }
    }

}

消费端消费服务main方法:

public class Demo1Main {
    public static void main(String[] args) {
        ClientProxy clientProxy = new ClientProxy(HelloService.class, "127.0.0.1", 8888);
        HelloService helloService = (HelloService) clientProxy.getClientInstance();
        String result = helloService.sayHello("嘿嘿嘿");
        System.out.println(result);
    }
}

三、总结与思考

总结:本节实现了一个非常简单的rpc原型项目,包含了服务注册、采用BIO的网络通信模型传送数据、采用jdk原生代理模式进行服务代理、采用jdk原生的序列化方式进行序列化和反序列化等。后续将会针对该原型项目不断的改进,不断的引入新的“武器”,来丰富整个rpc项目。
后期预热:引入注册中心(解决服务治理问题)、引入多种高效的序列化机制、引入NIO的网络通信模型、引入软负载均衡机制、引入spi扩展机制、接入spring等等,敬请期待。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容

  • 本文将从大的框架层面来聊聊RPC原理和实现,既然叫跨语言RPC,也将以thrift为例讲讲跨语言RPC如何实现。在...
    彦帧阅读 14,306评论 0 19
  • 前言 在微服务当道的今天,分布式系统越来越重要,实现服务化首先就要考虑服务之间的通信问题。这里面涉及序列化、反序列...
    habit_learning阅读 2,355评论 1 26
  • 网络通信模块是分布式系统中最底层的模块,他直接支撑了上层分布式环境下复杂的进程间通信逻辑,是所有分布式系统的基础。...
    SmallBird_阅读 2,189评论 0 1
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 10,846评论 6 13
  • 我想有时候我一定是疯了,是被s逼疯了
    砸扁回忆阅读 132评论 0 0