首先来看 RPC 的整体系统架构图:
图中做事端启动时将自己的做事节点信息注册到注册中央,客户端调用远程方法时会订阅注册中央中的可用做事节点信息,拿到可用做事节点之后远程调用方法,当注册中央中的可用做事节点发生变革时会关照客户端,避免客户端连续调用已经失落效的节点。那客户端是如何调用远程方法的呢,来看一下远程调用示意图:
客户端模块代理所有远程方法的调用将目标做事、目标方法、调用目标方法的参数等必要信息序列化序列化之后的数据包进一步压缩,压缩后的数据包通过网络通信传输到目标做事节点做事节点将接管到的数据包进行解压解压后的数据包反序列化成目标做事、目标方法、目标方法的调用参数通过做事端代理调用目标方法获取结果,结果同样须要序列化、压缩然后回传给客户端
通过以上描述,相信读者该当大体上理解了 RPC 是如何事情的,接下来看如何利用代码详细实现上述的流程。鉴于篇幅笔者会选择主要或者网络上先容相对较少的模块来讲述。

作为一个入门项目,我们的系统选用 Zookeeper 作为注册中央, ZooKeeper 将数据保存在内存中,性能很高。在读多写少的场景中尤实在用,由于写操作会导致所有的做事器间同步状态。做事注册与创造是范例的读多写少的折衷做事场景。Zookeeper 是一个范例的CP系统,在做事选举或者集群半数机器宕机时是不可用状态,相对付做事创造中主流的AP系统来说,可用性稍低,但是用于理解RPC的实现,也是绰绰有余。
ZooKeeper节点先容持久节点( PERSISENT ):一旦创建,除非主动调用删除操作,否则一贯持久化存储。临时节点( EPHEMERAL ):与客户端会话绑定,客户端会话失落效,这个客户端所创建的所有临时节点都会被删除除。节点顺序( SEQUENTIAL ):创建子节点时,如果设置SEQUENTIAL属性,则会自动在节点名后追加一个整形数字,上限是整形的最大值;同一目录下共享顺序,例如(/a0000000001,/b0000000002,/c0000000003,/test0000000004)。ZooKeeper做事注册在 ZooKeeper 根节点下根据做事名创建持久节点 /rpc/{serviceName}/service ,将该做事的所有做事节点利用临时节点创建在 /rpc/{serviceName}/service 目录下,代码如下(为方便展示,后续展示代码都做了删减):
public void exportService(Service serviceResource) { String name = serviceResource.getName(); String uri = GSON.toJson(serviceResource); String servicePath = "rpc/" + name + "/service"; zkClient.createPersistent(servicePath, true); String uriPath = servicePath + "/" + uri; //创建一个新的临时节点,当该节点宕机会话失落效时,该临时节点会被清理 zkClient.createEphemeral(uriPath);}
注册效果如图,本地启动两个做事则 service 下有两个做事节点信息:
存储的节点信息包括做事名,做事 IP:PORT ,序列化协议,压缩协议等。
ZooKeeper做事创造客户端启动后,不会立即从注册中央获取可用做事节点,而是在调用远程方法时获取节点信息(
public List<Service> getServices(String name) { Map<String, List<Service>> SERVER_MAP = new ConcurrentHashMap<>(); String servicePath = "rpc/" + name + "/service"; List<String> children = zkClient.getChildren(servicePath); List<Service> serviceList = Optional.ofNullable(children).orElse(new ArrayList<>()).stream().map(str -> { String deCh = URLDecoder.decode(str, StandardCharsets.UTF_8.toString()); return gson.fromJson(deCh, Service.class); }).collect(Collectors.toList()); SERVER_MAP.put(name, serviceList); return serviceList;}
public class ZkChildListenerImpl implements IZkChildListener { //监听子节点的删除和新增事宜 @Override public void handleChildChange(String parentPath, List<String> childList) throws Exception { //有变动就清空做事所有节点缓存 String[] arr = parentPath.split("/"); SERVER_MAP.remove(arr[2]); }}
PS:美团分布式 ID 天生系统Leaf就利用 Zookeeper 的顺序节点来注册 WorkerID ,临时节点保存节点 IP:PORT 信息。
2. 客户端实现客户端调用本地方法一样调用远程方法的完美体验与 Java 动态代理的强大密不可分。
DefaultRpcBaseProcessor 抽象类实现了 ApplicationListener , onApplicationEvent 方法在 Spring 项目启动完毕会收到韶光关照,获取 ApplicationContext 高下文之后开始注入做事 injectService (依赖其他做事)或者启动做事 startServer (自身做事实现)。
injectService 方法会遍历 ApplicationContext 高下文中的所有 Bean , Bean 中是否有属性利用了 InjectService 表明。有的话天生代理类,注入到 Bean 的属性中。代码如下:
public abstract class DefaultRpcBaseProcessor implements ApplicationListener<ContextRefreshedEvent> { @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { //Spring启动完毕会收到Event if (Objects.isNull(contextRefreshedEvent.getApplicationContext().getParent())) { ApplicationContext applicationContext = contextRefreshedEvent.getApplicationContext(); //保存spring高下文 后续利用 Container.setSpringContext(applicationContext); startServer(applicationContext); injectService(applicationContext); } } private void injectService(ApplicationContext context) { String[] names = context.getBeanDefinitionNames(); for (String name : names) { Object bean = context.getBean(name); Class<?> clazz = bean.getClass(); //clazz = clazz.getSuperclass(); aop增强的类天生cglib类,须要Superclass才能获取定义的字段 Field[] declaredFields = clazz.getDeclaredFields(); //设置InjectService的代理类 for (Field field : declaredFields) { InjectService injectService = field.getAnnotation(InjectService.class); if (injectService == null) {continue; Class<?> fieldClass = field.getType(); Object object = context.getBean(name); field.set(object, clientProxyFactory.getProxy(fieldClass, injectService.group(), injectService.version())); ServerDiscoveryCache.SERVER_CLASS_NAMES.add(fieldClass.getName()); } } } protected abstract void startServer(ApplicationContext context);}
调用 ClientProxyFactory 类的 getProxy ,根据做事接口、做事分组、做事版本、是否异步调用来创建该接口的代理类,对该接口的所有方法都会利用创建的代理类来调用。方法调用的实现细节都在 ClientInvocationHandler 中的 invoke 方法,紧张内容是,获取做事节点信息,选择调用节点,构建 request 工具,末了调用网络模块发送要求。
public class ClientProxyFactory { public <T> T getProxy(Class<T> clazz, String group, String version, boolean async) { return (T) objectCache.computeIfAbsent(clazz.getName() + group + version, clz -> Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new ClientInvocationHandler(clazz, group, version, async))); } private class ClientInvocationHandler implements InvocationHandler { public ClientInvocationHandler(Class<?> clazz, String group, String version, boolean async) { } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //1. 得到做事信息 String serviceName = clazz.getName(); List<Service> serviceList = getServiceList(serviceName); Service service = loadBalance.selectOne(serviceList); //2. 构建request工具 RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setServiceName(service.getName()); rpcRequest.setMethod(method.getName()); rpcRequest.setGroup(group); rpcRequest.setVersion(version); rpcRequest.setParameters(args); rpcRequest.setParametersTypes(method.getParameterTypes()); //3. 协议编组 RpcProtocolEnum messageProtocol = RpcProtocolEnum.getProtocol(service.getProtocol()); RpcCompressEnum compresser = RpcCompressEnum.getCompress(service.getCompress()); RpcResponse response = netClient.sendRequest(rpcRequest, service, messageProtocol, compresser); return response.getReturnValue(); } }}
3. 网络传输
客户端封装调用要求工具之后须要通过网络将调用信息发送到做事端,在发送要求工具之前还须要经历序列化、压缩两个阶段。
序列化与反序列化序列化与反序列化的核心浸染便是工具的保存与重修,方便客户端与做事端通过字节流通报工具,快速对接交互。
序列化便是指把 Java 工具转换为字节序列的过程。反序列化便是指把字节序列规复为 Java 工具的过程。Java序列化的办法有很多,诸如 JDK 自带的 Serializable 、 Protobuf 、 kryo 等,上述三种笔者自测性能最高的是 Kryo 、其次是 Protobuf 。Json 也不失落为一种大略且高效的序列化方法,有很多大道至简的框架采取。序列化接口比较大略,读者可以自行查看实当代码。
public interface MessageProtocol { byte[] marshallingRequest(RpcRequest request) throws Exception; RpcRequest unmarshallingRequest(byte[] data) throws Exception; byte[] marshallingResponse(RpcResponse response) throws Exception; RpcResponse unmarshallingResponse(byte[] data) throws Exception;}
压缩与解压
网络通信的本钱很高,为了减小网络传输数据包的体积,将序列化之后的字节码压缩不失落为一种很好的选择。Gzip 压缩算法比率在3到10倍旁边,可以大大节省做事器的网络带宽,各种盛行的 web 做事器也都支持 Gzip 压缩算法。Java 接入也比较随意马虎,接入代码可以查看下方接口的实现。
public interface Compresser { byte[] compress(byte[] bytes); byte[] decompress(byte[] bytes);}
网络通信
万事俱备只欠东风。将要求工具序列化成字节码,并且压缩体积之后,须要利用网络将字节码传输到做事器。常用网络传输协议有 HTTP 、 TCP 、 WebSocke t等。HTTP、WebSocket 是运用层协议,TCP 是传输层协议。有些追求简洁、易用的 RPC 框架也有选择 HTTP 协议的。TCP传输的高可靠性和极致性能是主流RPC框架选择的最紧张缘故原由。谈到 Java 生态的通信领域,Netty 的领衔地位短韶光内无人能及。选用 Netty 作为网络通信模块, TCP 数据流的粘包、拆包不可避免。
粘包、拆包问题
TCP 传输协议是一种面向连接的、可靠的、基于字节流的传输层通信协议。为了最大化传输效率。发送方可能将单个较小数据包合并发送,这种情形就须要吸收方来拆包处理数据了。
Netty 供应了3种类型的解码器来处理 TCP 粘包/拆包问题:
定长解码器:FixedLengthFrameDecoder 。发送方和吸收方规定一个固定的长度,不足用空格等字符补全,这样吸收方每次从接管到的字节流中读取固定长度的字节即可,长度不足就保留本次接管的数据,再不才一个字节流中获取剩下数量的字节数据。分隔符解码器:LineBasedFrameDecoder 或 DelimiterBasedFrameDecoder。LineBasedFrameDecoder 是行分隔符解码器,分隔符为 \n 或 \r\n ;DelimiterBasedFrameDecoder 是自定义分隔符解码器,可以定义一个或多个分隔符。吸收端在收到的字节流中查找分隔符,然后返回分隔符之前的数据,没找到就连续从下一个字节流中查找。数据长度解码器:LengthFieldBasedFrameDecoder。将发送的分为 header 和 body,header 存储的长度(字节数),body 是发送的的内容。同时发送方和吸收方要协商好这个 header 的字节数,由于 int 能表示长度,long 也能表示长度。吸收方首先从字节流中读取前n(header的字节数)个字节(header),然后根据长度读取等量的字节,不足就从下一个数据流中查找。不想利用内置的解码器也可自定义解码器,自定传输协议。
网络通信这部分内容比较繁芜,说来话长,代码易读,读者可先自行阅读代码。后续有机会细说此节内容。
5. 做事端实现客户端通过网络传输将要求工具序列化、压缩之后的字节码传输到做事端之后,同样先通过解压、反序列化将字节码重修为要求工具。有了要求工具之后,就可以进行关键的方法调用环节了。
public abstract class RequestBaseHandler { public RpcResponse handleRequest(RpcRequest request) throws Exception { //1. 查找目标做事代理工具 ServiceObject serviceObject = serverRegister.getServiceObject(request.getServiceName() + request.getGroup() + request.getVersion()); RpcResponse response = null; //2. 调用对应的方法 response = invoke(serviceObject, request); //相应客户端 return response; } //详细代理调用 public abstract RpcResponse invoke(ServiceObject serviceObject, RpcRequest request) throws Exception;}
上述抽象类 RequestBaseHandler 是调用做事方法的抽象实现 handleRequest 通过要求工具的做事名、做事分组、做事版本在 serverRegister.getServiceObject 获取代理工具。然后调用 invoke 抽象方法来真正通过代理工具调用方法得到结果。
做事的代理工具怎么产生的?如何通过代理工具调用方法?天生做事代理工具带着上述问题来看 DefaultRpcBaseProcessor 抽象类:
public abstract class DefaultRpcBaseProcessor implements ApplicationListener<ContextRefreshedEvent> { @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { //Spring启动完毕会收到Event if (Objects.isNull(contextRefreshedEvent.getApplicationContext().getParent())) { ApplicationContext applicationContext = contextRefreshedEvent.getApplicationContext(); Container.setSpringContext(applicationContext); startServer(applicationContext); injectService(applicationContext); } } private void injectService(ApplicationContext context) {} protected abstract void startServer(ApplicationContext context);}
DefaultRpcBaseProcessor 抽象类也有两个实现类 DefaultRpcReflectProcessor 和 DefaultRpcJavassistProcessor,来实现关键的天生代理工具的 startServer 方法。
做事接口实现类的Bean作为代理工具public class DefaultRpcReflectProcessor extends DefaultRpcBaseProcessor { @Override protected void startServer(ApplicationContext context) { Map<String, Object> beans = context.getBeansWithAnnotation(RpcService.class); if (beans.size() > 0) { boolean startServerFlag = true; for (Object obj : beans.values()) { Class<?> clazz = obj.getClass(); Class<?>[] interfaces = clazz.getInterfaces(); / 如果只实现了一个接口就用接口的className作为做事名 如果该类实现了多个接口,则利用表明里的value作为做事名 / RpcService service = clazz.getAnnotation(RpcService.class); if (interfaces.length != 1) { String value = service.value(); ServiceObject so = new ServiceObject(value, Class.forName(value), obj, service.group(), service.version()); } else { Class<?> supperClass = interfaces[0]; ServiceObject so = new ServiceObject(supperClass.getName(), supperClass, obj, service.group(), service.version()); } serverRegister.register(so); } } }}
DefaultRpcReflectProcessor 中获取到所有有 RpcService 表明的做事接口实现类 Bean,然后将该 Bean 作为做事代理工具注册到 serverRegister 中供上述的反射调用中利用。
利用Javassist天生新的代理工具public class DefaultRpcJavassistProcessor extends DefaultRpcBaseProcessor { @Override protected void startServer(ApplicationContext context) { Map<String, Object> beans = context.getBeansWithAnnotation(RpcService.class); if (beans.size() > 0) { boolean startServerFlag = true; for (Map.Entry<String, Object> entry : beans.entrySet()) { String beanName = entry.getKey(); Object obj = entry.getValue(); Class<?> clazz = obj.getClass(); Class<?>[] interfaces = clazz.getInterfaces(); Method[] declaredMethods = clazz.getDeclaredMethods(); / 如果只实现了一个接口就用接口的className作为做事名 如果该类实现了多个接口,则利用表明里的value作为做事名 / RpcService service = clazz.getAnnotation(RpcService.class); if (interfaces.length != 1) { String value = service.value(); //bean实现多个接口时,javassist代理类中天生的方法只按照表明指定的做事类来天生 declaredMethods = Class.forName(value).getDeclaredMethods(); Object proxy = ProxyFactory.makeProxy(value, beanName, declaredMethods); ServiceObject so = new ServiceObject(value, Class.forName(value), proxy, service.group(), service.version()); } else { Class<?> supperClass = interfaces[0]; Object proxy = ProxyFactory.makeProxy(supperClass.getName(), beanName, declaredMethods); ServiceObject so = new ServiceObject(supperClass.getName(), supperClass, proxy, service.group(), service.version()); } serverRegister.register(so); } } }}
DefaultRpcJavassistProcessor 与 DefaultRpcReflectProcessor 的差异在于后者直接将做事实现类工具 Bean 作为做事代理工具,而前者通过 ProxyFactory.makeProxy(value, beanName, declaredMethods) 创建了新的代理工具,将新的代理工具注册到 serverRegister 中供后续调用调用中利用。该方法通过 Javassist 来天生代理类,代码冗长,建议阅读源码。我来通过下面的代码演示实现的代理类。
首先我们的做事接口是:
public interface HelloService { String hello(String name);}
做事的实现类是:
@RpcServicepublic class HelloServiceImpl implements HelloService { @Override public String hello(String name) { return "a1"; }}
那终极新天生的代理类是这样的:
public class HelloService$proxy1649315143476 { private static cn.ppphuang.rpcspringstarter.service.HelloService serviceProxy = ((org.springframework.context.ApplicationContext)cn.ppphuang.rpcspringstarter.server.Container.getSpringContext()).getBean("helloServiceImpl"); public cn.ppphuang.rpcspringstarter.common.model.RpcResponse hello(cn.ppphuang.rpcspringstarter.common.model.RpcRequest request) throws java.lang.Exception { java.lang.Object[] params = request.getParameters(); if(params.length == 1 && (params[0] == null||params[0].getClass().getSimpleName().equalsIgnoreCase("String"))){ java.lang.String arg0 = null; arg0 = cn.ppphuang.rpcspringstarter.util.ConvertUtil.convertToString(params[0]); java.lang.String returnValue = serviceProxy.hello(arg0); return new cn.ppphuang.rpcspringstarter.common.model.RpcResponse(returnValue); } } public cn.ppphuang.rpcspringstarter.common.model.RpcResponse invoke(cn.ppphuang.rpcspringstarter.common.model.RpcRequest request) throws java.lang.Exception { String methodName = request.getMethod(); if(methodName.equalsIgnoreCase("hello")){ java.lang.Object returnValue = hello(request); return returnValue; } }}
清理全限定类名后,代码如下:
public class HelloService$proxy1649315143476 { private static HelloService serviceProxy = ((ApplicationContext)Container.getSpringContext()).getBean("helloServiceImpl"); public RpcResponse hello(RpcRequest request) throws Exception { Object[] params = request.getParameters(); if(params.length == 1 && (params[0] == null|| params[0].getClass().getSimpleName().equalsIgnoreCase("String"))){ String arg0 = ConvertUtil.convertToString(params[0]); String returnValue = serviceProxy.hello(arg0); return new RpcResponse(returnValue); } } public RpcResponse invoke(RpcRequest request) throws Exception { String methodName = request.getMethod(); if(methodName.equalsIgnoreCase("hello")){ Object returnValue = hello(request); return returnValue; } }}
代理类 HelloService$proxy1649315143476 中有一个做事接口类型 HelloService的静态属性 serviceProxy,值便是通过 ApplicationContext 高下文获取到的做事接口实现类 HelloServiceImpl 这个 Bean(SpringContext 已经被提前缓存到 Container 类中,读者可以自行查找代码理解)。public RpcResponse invoke(RpcRequest request) throws Exception 该方法判断调用的方法名是 hello 来调用代理类中的hello方法。public RpcResponse hello(RpcRequest request) throws Exception 该方法通过调用 serviceProxy.hello() 的方法获取结果。
public interface InvokeProxy { / invoke调用做事接口 / RpcResponse invoke(RpcRequest rpcRequest) throws Exception;}
HelloService$proxy1649315143476 类实现 InvokeProxy 接口(ProxyFactory.makeProxy 代码中有表示)。InvokeProxy 接口只有一个 invoke 方法。到这里就能理解通过调用代理工具的 invoke 方法就能间接调用到做事接口实现类 HelloServiceImpl 的对应方法了。
调用代理工具方法理清代理工具的天生之后,开始调用代理工具的方法。
上文中写到的抽象类 RequestBaseHandler 有两个实现类 RequestJavassistHandler 和 RequestReflectHandler。
Java 反射调用
先看 RequestReflectHandler:
public class RequestReflectHandler extends RequestBaseHandler { @Override public RpcResponse invoke(ServiceObject serviceObject, RpcRequest request) throws Exception { Method method = serviceObject.getClazz().getMethod(request.getMethod(), request.getParametersTypes()); Object value = method.invoke(serviceObject.getObj(), request.getParameters()); RpcResponse response = new RpcResponse(RpcStatusEnum.SUCCESS); response.setReturnValue(value); return response; }}
Object value = method.invoke(serviceObject.getObj(), request.getParameters());
这行代码都很熟习,用 Java 框架中最常见的反射来调用代理类中的方法,大部分 RPC 框架也都是这么来实现的。
通过 Javassists 天生的代理工具 invoke 方法调用
接着看 RequestJavassistHandler:
public class RequestJavassistHandler extends RequestBaseHandler { @Override public RpcResponse invoke(ServiceObject serviceObject, RpcRequest request) throws Exception { InvokeProxy invokeProxy = (InvokeProxy) serviceObject.getObj(); return invokeProxy.invoke(request); }}
直接将代理工具转为 InvokeProxy,调用 InvokeProxy.invoke() 方法得到返回值,如果这里不能理解,转头再看一下利用 Javassist 天生新的代理工具这个小节吧。
调用代理工具的方法获取到结果,仍要通过序列化、压缩后,将字节流数据包通过网络传输到客户端,客户端拿到相应的结果再解压,反序列化得到结果工具。
Javassist先容Javassist 是一个开源的剖析、编辑和创建Java字节码的类库。是由东京工业大学的数学和打算机科学系的 Shigeru Chiba(千叶滋)所创建的。大略来说便是用源码级别的 api 去修正字节码。Duboo、MyBatis 也都利用了 Javassist。Duboo 作者也选择Javassist作为 Duboo 的代理工具,可以点击这里查看 Duboo 作者也选择 Javassist 的缘故原由。
Javassist 还能和谐(pojie)Java 编写的商业软件,例如抓包工具 Charles。代码在这里,供互换学习。
在利用 Javassist 有踩到如下坑,供大家参考:
Javassist 是运行时,没有 JDK 静态编译过程,JDK 的很多语法糖都是在静态编译过程中处理的,以是须要自行编码处理,例如自动拆装箱。int i = 1;Integer ii = i; //javassist 缺点 JDK会自动装箱,javassist须要自行编码处理 int i = 1;Integer ii = new Integer(i); //javassist 精确自定义的类须要利用类的完备限定名,这也是为什么天生的代理类中类都是完备限定名。选择哪种代理办法可以通过配置文件 application.properties 修正 hp.rpc.server-proxy-type 的值来选择代理模式。
性能测试,机器 Macbook Pro M1 8C 16G, 代码如下:
@AutowiredClientProxyFactory clientProxyFactory;@Testvoid contextLoads() { long l1 = System.currentTimeMillis(); HelloService proxy = clientProxyFactory.getProxy(HelloService.class,"group3","version3"); for (int i = 0; i < 1000000; i++) { String ppphuang = proxy.hello("ppphuang"); } long l2 = System.currentTimeMillis(); long l3 = l2 - l1; System.out.println(l3);}
测试结果(ms):
要求次数反射调用1反射调用2反射调用3Javassist1Javassist2Javassist3100001303115911641126123510941000006110610360656259585461781000000544755189052329525605209952794
测试结果差异并不大,Javassist 模式下只是稍快了一点点,险些可以忽略不记。与Duboo作者博客6楼评论的测试结果同等。以是想大略通用性强用反射模式,也可以通过利用 Javassist 模式来学习更多知识,由于 Javassist 须要自己兼允许多分外的状况,反射调用 JDK 已经帮你兼容完了。
总结写到这里我们理解了 RPC 的基本事理、做事注册与创造、客户端代理、网络传输、重点先容了做事真个两种代理模式,学习 Javassist 如何实现代理。
还有很多东西没有重点讲述乃至没有提及,例如粘、拆包的处理、自定义数据包协议、Javassist 模式下如何实现方法重载、如何办理一个做事接口类有多个实现、如何办理一个实现类实现了多个做事接口、在 SpringBoot 中如何自动装载、如何开箱即用、怎么实现异步调用、怎么扩展序列化、压缩算法等等...有兴趣的读者可以在源码中探求答案,或者探求优化项,当然也可以探求 bug 。如果读者能理解全体项目的实现,相信你一定会有所收成。后续有机会也会再写文章与大家互换学习。因笔者水平有限,不完善的地方请大家斧正。感谢各位的阅读,感激。