基于Netty的高性能JAVA的RPC框架

RPC的实现

1. RPC客户端 

2. RPC服务端

RPC客户端的实现

RPC客户端和RPC服务器端需要一个相同的接口类,RPC客户端通过一个代理类来调用RPC服务器端的函数

RpcConsumerImpl的实现 ...... package com.alibaba.middleware.race.rpc.api.impl; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import com.alibaba.middleware.race.rpc.aop.ConsumerHook; import com.alibaba.middleware.race.rpc.api.RpcConsumer; import com.alibaba.middleware.race.rpc.async.ResponseCallbackListener; import com.alibaba.middleware.race.rpc.context.RpcContext; import com.alibaba.middleware.race.rpc.model.RpcRequest; import com.alibaba.middleware.race.rpc.model.RpcResponse; import com.alibaba.middleware.race.rpc.netty.RpcConnection; import com.alibaba.middleware.race.rpc.netty.RpcNettyConnection; import com.alibaba.middleware.race.rpc.tool.Tool; public class RpcConsumerImpl extends RpcConsumer implements InvocationHandler { private static AtomicLong callTimes = new AtomicLong(0L); private RpcConnection connection; private List<RpcConnection> connection_list; private Map<String,ResponseCallbackListener> asyncMethods; private Class<?> interfaceClass; private String version; private int timeout; private ConsumerHook hook; public Class<?> getInterfaceClass() { return interfaceClass; } public String getVersion() { return version; } public int getTimeout() { this.connection.setTimeOut(timeout); return timeout; } public ConsumerHook getHook() { return hook; } RpcConnection select() { //Random rd=new Random(System.currentTimeMillis()); int d=(int) (callTimes.getAndIncrement()%(connection_list.size()+1)); if(d==0) return connection; else { return connection_list.get(d-1); } } public RpcConsumerImpl() { //String ip=System.getProperty("SIP"); String ip="127.0.0.1"; this.asyncMethods=new HashMap<String,ResponseCallbackListener>(); this.connection=new RpcNettyConnection(ip,8888); this.connection.connect(); connection_list=new ArrayList<RpcConnection>(); int num=Runtime.getRuntime().availableProcessors()/3 -2; for (int i = 0; i < num; i++) { connection_list.add(new RpcNettyConnection(ip, 8888)); } for (RpcConnection conn:connection_list) { conn.connect(); } } public void destroy() throws Throwable { if (null != connection) { connection.close(); } } @SuppressWarnings("unchecked") public <T> T proxy(Class<T> interfaceClass) throws Throwable { if (!interfaceClass.isInterface()) { throw new IllegalArgumentException(interfaceClass.getName() + " is not an interface"); } return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, this); } @Override public RpcConsumer interfaceClass(Class<?> interfaceClass) { // TODO Auto-generated method stub this.interfaceClass=interfaceClass; return this; } @Override public RpcConsumer version(String version) { // TODO Auto-generated method stub this.version=version; return this; } @Override public RpcConsumer clientTimeout(int clientTimeout) { // TODO Auto-generated method stub this.timeout=clientTimeout; return this; } @Override public RpcConsumer hook(ConsumerHook hook) { // TODO Auto-generated method stub this.hook=hook; return this; } @Override public Object instance() { // TODO Auto-generated method stub try { return proxy(this.interfaceClass); } catch (Throwable e) { e.printStackTrace(); } return null; } @Override public void asynCall(String methodName) { // TODO Auto-generated method stub asynCall(methodName, null); } @Override public <T extends ResponseCallbackListener> void asynCall( String methodName, T callbackListener) { this.asyncMethods.put(methodName, callbackListener); this.connection.setAsyncMethod(asyncMethods); for (RpcConnection conn:connection_list) { conn.setAsyncMethod(asyncMethods); } } @Override public void cancelAsyn(String methodName) { // TODO Auto-generated method stub this.asyncMethods.remove(methodName); this.connection.setAsyncMethod(asyncMethods); for (RpcConnection conn:connection_list) { conn.setAsyncMethod(asyncMethods); } } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // TODO Auto-generated method stub List<String> parameterTypes = new LinkedList<String>(); for (Class<?> parameterType : method.getParameterTypes()) { parameterTypes.add(parameterType.getName()); } RpcRequest request = new RpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(args); if(hook!=null) hook.before(request); RpcResponse response = null; try { request.setContext(RpcContext.props); response = (RpcResponse) select().Send(request,asyncMethods.containsKey(request.getMethodName())); if(hook!=null) hook.after(request); if(!asyncMethods.containsKey(request.getMethodName())&&response.getExption()!=null) { Throwable e=(Throwable) Tool.deserialize(response.getExption(),response.getClazz()); throw e.getCause(); } } catch (Throwable t) { //t.printStackTrace(); //throw new RuntimeException(t); throw t; } finally { // if(asyncMethods.containsKey(request.getMethodName())&&asyncMethods.get(request.getMethodName())!=null) // { // cancelAsyn(request.getMethodName()); // } } if(response==null) { return null; } else if (response.getErrorMsg() != null) { throw response.getErrorMsg(); } else { return response.getAppResponse(); } } }

RpcConsumer consumer; consumer = (RpcConsumer) getConsumerImplClass().newInstance(); consumer.someMethod();

因为consumer对象是通过代理生成的,所以当consumer调用的时候,就会调用invoke函数,我们就可以把这次本地的函数调用的信息通过网络发送到RPC服务器然后等待服务器返回的信息后再返回。

服务器实现

RPC服务器主要是在收到RPC客户端之后解析出RPC调用的接口名,函数名以及参数。

package com.alibaba.middleware.race.rpc.api.impl; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; import net.sf.cglib.reflect.FastClass; import net.sf.cglib.reflect.FastMethod; import com.alibaba.middleware.race.rpc.context.RpcContext; import com.alibaba.middleware.race.rpc.model.RpcRequest; import com.alibaba.middleware.race.rpc.model.RpcResponse; import com.alibaba.middleware.race.rpc.serializer.KryoSerialization; import com.alibaba.middleware.race.rpc.tool.ByteObjConverter; import com.alibaba.middleware.race.rpc.tool.ReflectionCache; import com.alibaba.middleware.race.rpc.tool.Tool; /** * 处理服务器收到的RPC请求并返回结果 * @author sei.zz * */ public class RpcRequestHandler extends ChannelInboundHandlerAdapter { //对应每个请求ID和端口好 对应一个RpcContext的Map; private static Map<String,Map<String,Object>> ThreadLocalMap=new HashMap<String, Map<String,Object>>(); //服务端接口-实现类的映射表 private final Map<String, Object> handlerMap; KryoSerialization kryo=new KryoSerialization(); public RpcRequestHandler(Map<String, Object> handlerMap) { this.handlerMap = handlerMap; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("active"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub System.out.println("disconnected"); } //更新RpcContext的类容 private void UpdateRpcContext(String host,Map<String,Object> map) { if(ThreadLocalMap.containsKey(host)) { Map<String,Object> local=ThreadLocalMap.get(host); local.putAll(map);//把客户端的加进来 ThreadLocalMap.put(host, local);//放回去 for(Map.Entry<String, Object> entry:map.entrySet()){ //更新变量 RpcContext.addProp(entry.getKey(), entry.getValue()); } } else { ThreadLocalMap.put(host, map); //把对应线程的Context更新 for(Map.Entry<String, Object> entry:map.entrySet()){ RpcContext.addProp(entry.getKey(), entry.getValue()); } } } //用来缓存住需要序列化的结果 private static Object cacheName=null; private static Object cacheVaule=null; @Override public void channelRead( ChannelHandlerContext ctx, Object msg) throws Exception { RpcRequest request=(RpcRequest)msg; String host=ctx.channel().remoteAddress().toString(); //更新上下文 UpdateRpcContext(host,request.getContext()); //TODO 获取接口名 函数名 参数 找到实现类 反射实现 RpcResponse response = new RpcResponse(); response.setRequestId(request.getRequestId()); try { Object result = handle(request); if(cacheName!=null&&cacheName.equals(result)) { response.setAppResponse(cacheVaule); } else { response.setAppResponse(ByteObjConverter.ObjectToByte(result)); cacheName=result; cacheVaule=ByteObjConverter.ObjectToByte(result); } } catch (Throwable t) { //response.setErrorMsg(t); response.setExption(Tool.serialize(t)); response.setClazz(t.getClass()); } ctx.writeAndFlush(response); } /** * 运行调用的函数返回结果 * @param request * @return * @throws Throwable */ private static RpcRequest methodCacheName=null; private static Object methodCacheValue=null; private Object handle(RpcRequest request) throws Throwable { String className = request.getClassName(); Object classimpl = handlerMap.get(className);//通过类名找到实现的类 Class<?> clazz = classimpl.getClass(); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); // Method method = ReflectionCache.getMethod(clazz.getName(),methodName, parameterTypes); // method.setAccessible(true); //System.out.println(className+":"+methodName+":"+parameters.length); if(methodCacheName!=null&&methodCacheName.equals(request)) { return methodCacheValue; } else { try { methodCacheName=request; if(methodMap.containsKey(methodName)) { methodCacheValue= methodMap.get(methodName).invoke(classimpl, parameters); return methodCacheValue; } else { FastClass serviceFastClass = FastClass.create(clazz); FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes); methodMap.put(methodName, serviceFastMethod); methodCacheValue= serviceFastMethod.invoke(classimpl, parameters); return methodCacheValue; } //return method.invoke(classimpl, parameters); } catch (Throwable e) { throw e.getCause(); } } } private Map<String,FastMethod> methodMap=new HashMap<String, FastMethod>(); @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //ctx.close(); //cause.printStackTrace(); ctx.close(); } }

handel函数通过Java的反射机制,找到要调用的接口类然后调用对应函数然后执行,然后返回结果到客户端,本次RPC调用结束。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • RPC框架远程调用的实现方式在原理上是比较简单的,即将调用的方法(接口名、方法名、参数类型、参数)序列化之后发送到...
    谜碌小孩阅读 3,080评论 0 13
  • 1、后海散步清理,出汗排毒,持续和身体链接,提升补充身体能量 2、感谢自己每天坚持认真地活在功课中,把自己铆钉在线...
    张艾雯阅读 243评论 0 0
  • 越来越像空心的稻草人,身体和灵魂同时被抽离,越来越麻木,冷淡龟缩,难道这就是所谓的成长?每每想到这,脑海中便惊现一...
    icexu阅读 174评论 0 0
  • 前些天,看了一部电影,然后问了身边朋友这样一个问题:假设你对象出轨了,你更容易接受他/她的出轨对象是男生还是女生?...
    菠菜猩球阅读 164评论 0 1