上篇我们看了RMI的服务注册与客户端请求调用的原理与源码实现,大概了解了RMI是如何实现的简单rpc服
务,那么本篇我们就来仿写一个简易的RMI类的rpc请求调度框架模型
首先我们在编写代码之前,思考一下,一个简单的rpc需要实现什么,即如果实现rpc通信,我们如何调用对应的方法,并且如何实现两个服务之间的互通与互相调用操作。
客户端与服务端如何互通
java开发过程中常见的交互方式有两种,一种为轮询方式的交互,不停循环获取数据,直到获取或者退出为止,还有一种方式为推拉/发布订阅模式的数据交互,根据这两种方式目前也有很多种主流服务框架的实现,而这里我们选择使用sokect通讯,利用轮询操作实现数据交互
如何实现调用服务的方法
常见的rpc都是基于服务进行请求交互,我们也从RMI的源码中看到了,RMI将服务的class作为url,远程对象作为引用值存入map中,那么我们也可以将服务类以及方法等参数封装,传递到远程连接对象中,这样只要sokect进行交互的时候获取远程连接对象,其中持有的服务信息都可以获取到了
构建服务端
前面确定了思路以后,我们可以开始编码了,首先是服务端代码的编写,而服务端我们要实现两点,第一实现长连接,并且可以轮询获取请求,解析请求并获取当前服务端对应的实例将结果返回,第二我们需要能够动态获取到请求来的信息,所以我们可以考虑使用动态代理技术处理接口请求与实例之间的关系
那么我们开始编写代码,首先我们看下服务端代码的结构:
可以看到有对外发布的服务IHelloService,服务的实现类HelloServiceImpl,sokect请求处理器ProcessorHanlder以及真正启动服务的RPCServer类,其中IHelloService作为对外公开的服务,需要在服务端和客户端使用同一份,一般在企业开发的时候往往会作为依赖进行加载,这里我们并没有将服务单独拆分出去进行实现,而是选择使用同一份服务代码,来实现动态代理、序列化以及反序列化操作。而RpcRequest类用来将我们的请求相关参数以及服务相关信息进行封装,作为客户端与服务端传输的基类实现。
通用服务--IHelloService
我们首先定义一个对外的rpc通用服接口,代码如下:
/**
* 对外公开的服务
*/
public interface IHelloService {
String getResource(String type);
}
此服务中仅有一个getResource方法,根据传入的type内容的不同返回不同的内容,我们将此接口实现:
/**
* IHelloService服务实现
*/
public class HelloServiceImpl implements IHelloService {
@Override
public String getResource(String type) {
if(null == type || "".equals(type.trim())){
return "请不要传输空数据好吗?";
}
if("json".equals(type.trim())){
return "{className:HelloServiceName,methodName:getResource,args:json}";
}
return "找不到匹配的数据";
}
}
请求类构建--RpcRequest
客户端与服务端实现固定的交互传输,需要访问的远程服务接口类信息、请求的方法以及参数等封装进去,并且由于RpcRequest类作为远程服务请求的载体,所以同时也需要实现序列化操作,此时一个简单的rpc请求载体类--RpcRequest构建完成,如下:
/**
* 服务请求类
* @desc
*/
public class RpcRequest implements Serializable{
private static final long serialVersionUID = -601583969544463744L;
private String className;//远程服务接口类名
private String methodName;//需要调用的方法名
private Object[] args;//当前方法需要的参数
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Object[] getArgs() {
return args;
}
public void setArgs(Object[] args) {
this.args = args;
}
public RpcRequest(){}
public RpcRequest(String className, String methodName, Object[] args) {
this.className = className;
this.methodName = methodName;
this.args = args;
}
@Override
public String toString() {
return "RpcRequest [className=" + className + ", methodName=" + methodName + ", args=" + Arrays.toString(args)
+ "]";
}
}
请求处理器--ProcessorHanlder
当我们发起一个请求,将封装好的远程服务信息传递到服务端的时候,往往会按照固定的规则,将远程服务信息解析,并且处理构建获取远程服务实例,进行操作,这个时候,我们可以考虑将解析、构建、调用的过程封装成一个hanlder处理器,实现通用的服务解析与方法调用操作,如下:
/**
* 服务请求处理器
*/
public class ProcessorHanlder implements Runnable{
private Socket socket;
private Object service;//服务的impl
public ProcessorHanlder(final Socket socket,final Object service) {
this.socket = socket;
this.service = service;
}
public ProcessorHanlder() {
}
public Socket getSocket() {
return socket;
}
public void setSocket(Socket socket) {
this.socket = socket;
}
public Object getService() {
return service;
}
public void setService(Object service) {
this.service = service;
}
@Override
public void run() {
try(ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())){
RpcRequest request = (RpcRequest) objectInputStream.readObject();
//反射触发请求服务返回结果
Object invoke = invoke(request);
//根据输出流返回给客户端
outputStream.writeObject(invoke);
outputStream.flush();
}catch (Exception e) {
try {
socket.close();
} catch (Exception e2) {
System.err.println("==》service关闭socket失败:"+e2.getMessage());
}
}
}
/**
* 反射调用服务接口
* @param request
* @return
* @throws SecurityException
* @throws NoSuchMethodException
* @throws InvocationTargetException
* @throws IllegalArgumentException
* @throws IllegalAccessException
*/
private Object invoke(RpcRequest request) throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException{
Object[] args = request.getArgs();//获取请求参数
String methodName = request.getMethodName();//获取请求方法名
//创建参数类型数组
Class<?>[] argClasses = new Class<?>[args.length];
for (int i = 0; i < args.length; i++) {
Class<? extends Object> type = args[i].getClass();
argClasses[i] = type;
}
Method method = service.getClass().getMethod(methodName, argClasses);
//反射调用方法
return method.invoke(service, args);
}
}
启动服务
当一切准备就绪后,我们需要将服务端启动,发布出去,使其客户端可以访问,这里由于存在并发访问场景,我们考虑加入了线程池机制,使用线程池操作每个线程请求:
/**
* rpc服务类
*/
public class RPCServer {
private static final ExecutorService SERVICE = Executors.newCachedThreadPool();//创建无固定容量的缓存池
public void listener(final Object service,int port){
try(ServerSocket serverSocket = new ServerSocket(port);){
for(;;){
Socket accept = serverSocket.accept();
//挂起线程池来处理
SERVICE.execute(new ProcessorHanlder(accept, service));
}
} catch (Exception e) {
System.err.println("==>启动rpc服务失败:"+e.getMessage());
}
}
}
现在我们来将服务端启动,将其发布在8080端口上:
public static void main(String[] args) {
RPCServer rpcServer = new RPCServer();
IHelloService helloService = new HelloServiceImpl();
rpcServer.listener(helloService,8080);
}
客户端请求
客户端负责连接服务端,将请求封装传输到服务端,而每次请求的具体参数和服务都是动态的,所以我们使用动态代理机制,将客户端的rpc操作封装,构建动态请求。首先我们看下客户端代码目录:
可以看到有动态代理rpc操作的RPCClientProxy类,封装了远程通讯连接与数据传输操作的TCPTransport类,还有将代理的请求进行处理的RemoteInvocationHandler类,除此之外客户端也必须包含远程服务接口IHelloService以及请求封装类RpcRequest(同一个工程中只需要一份,独立工程建议作为依赖引入),接下来我们来看看客户端的请求操作
动态请求操作
在java中可以使用动态代理机制,实现某一操作的动态处理,而实现过程则是需要将真正操作的类实现InvocationHandler接口,在invoke方法中拿到代理对象进行操作即可,而代理对象类中需要提供构建Proxy.newProxyInstance方法创建的代理对象的方法,其参数固定为ClassLoader类加载器、需要实现的Class<?>[]接口类型以及需要实现动态代理的InvocationHandler接口子类实例,接下来我们来根据动态代理机制构建一个动态请求操作:
/**
* 代理请求操作--处理器
*/
public class RemoteInvocationHandler implements InvocationHandler {
private String host;//服务端所在地址
private int port;//服务所在端口
public RemoteInvocationHandler(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//组装请求参数
RpcRequest request = new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());//获取方法所在类的name
request.setMethodName(method.getName());
request.setArgs(args);
//创建TCP传输类 ==>并且返回当前返回的数据
TCPTransport tcpTransport = new TCPTransport(host, port);
return tcpTransport.handlerMsg(request);
}
}
接着,我们实现具体动态代理类实例的构建:
/**
* 动态代理rpc操作类
*/
public class RPCClientProxy {
@SuppressWarnings("unchecked")
public <T> T clientProxy(final Class<T> interfaceClass,final String host,final int port){
return (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class[]{interfaceClass},new RemoteInvocationHandler(host, port));
}
}
启动客户端
当我们完成客户端访问服务端的请求的动态代理操作后,这个时候我们可以尝试连接刚刚启动的IHelloService远程服务了:
public static void main(String[] args) {
RPCClientProxy clientProxy = new RPCClientProxy();
//开始接口调用
IHelloService helloService = clientProxy.clientProxy(IHelloService.class,"localhost",8080);
System.out.println("收到回复的消息:"+helloService.getResource("json"));
}
这个时候可以看到控制台输出的服务接口返回的内容,至此一个简易的仿rpc请求操作完成