RPC基础知识
什么是RPC?
RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。
RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。
RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
RPC的模型
C/S模式
基于传输层协议(例如TCP/IP) 远程调用不是新的一种数据传输协议
事件响应基本模型(请求、计算、响应)
RPC设计的目的
通过固定的协议调用非本机的方法
提供不同语言程序之间通信
可以在不了解底层通信,像本地方法一样调用
RPC框架完全封装了网络传输以及其他细节,比如Spring的RPC框架在调用远程对象的方法时就像调用Spring Bean对象一样使用.
RPC的应用大量的分布式应用都使用了RPC协议,比如分布式操作系统、分布式计算、分布式软件设计
RPC过程详解
RPC框架封装网络传输和其他细节,消费者和生产者不用去关心底层原理
消费者的代理层控制了整个RPC调用的流程,生成代理对象,封装请求报文,发送请求之类的
服务提供者会有一个监听模块,用来监听请求,并且按照约定,应该是注册了的服务才会被消费者调用到,注册的服务需要被反射调用到,用来计算结果
RPC框架的特点和设计模型
封装网络交互
尽量不要让RPC框架的使用者涉及到过多的网络层的开发
远程调用对象的代理
将接口代理的对象放入到Spring 容器之中,方便服务端开发
支持容器(Spring、Jetty等)
支持Spring容器,还有Jetty这样的web容器
可配置,可扩展
尽量做到可配置,可扩展
设计模型
Proxy代理层
用于对象的代理,对象的反射调用,RPC流程的控制
Serialize序列化层
将请求和结果做序列化和反序列化
Invoke网络模块
网络通信相关的处理
Container容器组件
支持代理层监听网络请求
代码实现
pom.xml
- <projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.ibigsea</groupId>
- <artifactId>http-rpc</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <dependencies>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.3</version>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <version>1.0.13</version>
- </dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- <version>6.1.26</version>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- <version>4.3.3</version>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.3.6</version>
- <exclusions>
- <exclusion>
- <artifactId>commons-logging</artifactId>
- <groupId>commons-logging</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- <version>3.2.8.RELEASE</version>
- <exclusions>
- <exclusion>
- <artifactId>commons-logging</artifactId>
- <groupId>commons-logging</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- <version>1.2</version>
- </dependency>
- </dependencies>
- </project>
config包下面的
ConsumerConfig.java
- packagecom.ibigsea.rpc.config;
- /**
- *服务消费者配置
- *
- *@authorbigsea
- *
- */
- publicclassConsumerConfig{
- /**
- *请求地址服务提供者监听的地址和端口
- */
- privateStringurl;
- publicStringgetUrl(){
- returnurl;
- }
- publicvoidsetUrl(Stringurl){
- this.url=url;
- }
- }
ProviderConfig.java
- packagecom.ibigsea.rpc.config;
- /**
- *服务提供者配置
- *
- *@authorbigsea
- *
- */
- publicclassProviderConfig{
- /**
- *监听端口服务提供者监听请求端口
- */
- privateintport;
- publicProviderConfig(){
- }
- publicProviderConfig(intport){
- this.port=port;
- }
- publicintgetPort(){
- returnport;
- }
- publicvoidsetPort(intport){
- this.port=port;
- }
- }
序列化层
Request.java
- packagecom.ibigsea.rpc.serizlize;
- importjava.io.Serializable;
- importcom.alibaba.fastjson.annotation.JSONType;
- /**
- *请求信息
- *
- *@authorbigsea
- *
- */
- publicclassRequestimplementsSerializable{
- privatestaticfinallongserialVersionUID=-4363326153251862952L;
- privateClassclazz;
- privateStringmethod;
- privateObjectparam;
- publicRequest(){
- }
- publicRequest(Classclazz,Stringmethod,Objectparam){
- this.clazz=clazz;
- this.method=method;
- this.param=param;
- }
- publicClassgetClazz(){
- returnclazz;
- }
- publicvoidsetClazz(Classclazz){
- this.clazz=clazz;
- }
- publicStringgetMethod(){
- returnmethod;
- }
- publicvoidsetMethod(Stringmethod){
- this.method=method;
- }
- publicObjectgetParam(){
- returnparam;
- }
- publicvoidsetParam(Objectparam){
- this.param=param;
- }
- /**
- *通过反射执行对应的方法
- *
- *@parambean
- *@return
- *@throwsException
- */
- publicObjectinvoke(Objectbean)throwsException{
- returnclazz.getMethod(method,param.getClass()).invoke(bean,param);
- }
- }
JsonParser.java
- packagecom.ibigsea.rpc.serizlize;
- importcom.alibaba.fastjson.JSON;
- importcom.alibaba.fastjson.parser.ParserConfig;
- importcom.alibaba.fastjson.serializer.SerializerFeature;
- /**
- *反序列化
- *
- *@authorbigsea
- *
- */
- publicclassJsonParser{
- /**
- *反序列化请求将请求反序列化成一个请求报文
- *
- *@paramparam
- *@return
- */
- publicstaticRequestreqParse(Stringparam){
- returnJSON.parseObject(param,Request.class);
- }
- /**
- *反序列化响应将响应反序列化成一个响应报文
- *
- *@paramresult
- *@return
- */
- publicstatic<T>TresbParse(Stringresult){
- return(T)JSON.parse(result);
- }
- }
JsonFormatter.java
- packagecom.ibigsea.rpc.serizlize;
- importcom.alibaba.fastjson.JSON;
- importcom.alibaba.fastjson.parser.ParserConfig;
- importcom.alibaba.fastjson.serializer.SerializerFeature;
- /**
- *序列化
- *
- *@authorbigsea
- *
- */
- publicclassJsonFormatter{
- /**
- *将请求序列化成字符串
- *
- *@paramclazz
- *@parammethod
- *@paramparam
- *@return
- */
- publicstaticStringreqFormatter(Classclazz,Stringmethod,Objectparam){
- Requestrequest=newRequest(clazz,method,param);
- returnJSON.toJSONString(request,SerializerFeature.WriteClassName);
- }
- /**
- *将响应序列化成字符串
- *
- *@paramparam
- *@return
- */
- publicstaticStringresbFormatter(Objectparam){
- returnJSON.toJSONString(param,SerializerFeature.WriteClassName);
- }
- }
http容器 httpContainer.java
- packagecom.ibigsea.rpc.container;
- importorg.mortbay.jetty.Connector;
- importorg.mortbay.jetty.Server;
- importorg.mortbay.jetty.handler.AbstractHandler;
- importorg.mortbay.jetty.nio.SelectChannelConnector;
- importorg.slf4j.Logger;
- importorg.slf4j.LoggerFactory;
- importcom.ibigsea.rpc.config.ProviderConfig;
- /**
- *利用Jetty实现简单的嵌入式Httpserver
- *
- *@authorbigsea
- *
- */
- publicclassHttpContainer{
- privateLoggerLOG=LoggerFactory.getLogger(HttpContainer.class);
- privateAbstractHandlerhttpHandler;
- privateProviderConfigproviderConfig;
- /**
- *构造方法
- *
- *@paramhttpHandler
- */
- publicHttpContainer(AbstractHandlerhttpHandler){
- this(httpHandler,newProviderConfig(8080));
- }
- /**
- *构造方法
- *
- *@paramhttpHandler
- *@paramproviderConfig
- */
- publicHttpContainer(AbstractHandlerhttpHandler,ProviderConfigproviderConfig){
- this.httpHandler=httpHandler;
- this.providerConfig=providerConfig;
- }
- publicvoidstart(){
- //进行服务器配置
- Serverserver=newServer();
- try{
- SelectChannelConnectorconnector=newSelectChannelConnector();
- //设置监听端口
- connector.setPort(providerConfig.getPort());
- //设置handler,请求过来之后通过该handler来处理请求
- server.setHandler(httpHandler);
- server.setConnectors(newConnector[]{connector});
- server.start();
- LOG.info("容器启动~");
- }catch(Exceptione){
- LOG.error("容器启动异常~",e);
- }
- }
- }
RpcException.java
- packagecom.ibigsea.rpc.exception;
- /**
- *异常
- *
- *@authorbigsea
- *
- */
- publicclassRpcExceptionextendsThrowable{
- privateObjectdata;
- publicRpcException(Stringmessage,Throwablecause,Objectdata){
- super(message,cause);
- this.data=data;
- }
- publicRpcException(Objectdata){
- super();
- this.data=data;
- }
- publicObjectgetData(){
- returndata;
- }
- }
HttpInvoke.java
- packagecom.ibigsea.rpc.invoke;
- importjava.io.OutputStream;
- importjava.util.ArrayList;
- importjava.util.List;
- importorg.apache.http.HttpHost;
- importorg.apache.http.HttpResponse;
- importorg.apache.http.NameValuePair;
- importorg.apache.http.client.HttpClient;
- importorg.apache.http.client.entity.UrlEncodedFormEntity;
- importorg.apache.http.client.methods.HttpPost;
- importorg.apache.http.conn.routing.HttpRoute;
- importorg.apache.http.impl.client.HttpClients;
- importorg.apache.http.impl.conn.PoolingHttpClientConnectionManager;
- importorg.apache.http.message.BasicNameValuePair;
- importorg.apache.http.util.EntityUtils;
- importcom.ibigsea.rpc.config.ConsumerConfig;
- importcom.ibigsea.rpc.exception.RpcException;
- /**
- *http请求和响应处理
- *
- *@authorbigsea
- *
- */
- publicclassHttpInvoke{
- privatestaticfinalHttpClienthttpClient=getHttpClient();
- /**
- *单例
- */
- privatestaticHttpInvokehttpInvoke;
- privateHttpInvoke(){
- }
- publicstaticsynchronizedHttpInvokegetInstance(){
- if(httpInvoke==null){
- httpInvoke=newHttpInvoke();
- }
- returnhttpInvoke;
- }
- /**
- *发送请求
- *
- *@paramrequest
- *服务消费者将(类信息、方法、参数)封装成请求报文,序列化后的字符串
- *@paramconsumerConfig
- *服务消费者请求的地址
- *@return请求结果
- *@throwsRpcException
- */
- publicStringrequest(Stringrequest,ConsumerConf