RPC的简单实现

Wesley13
• 阅读 880

RPC(Remote Procedure Call)—远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

源码托管在 Git@OSC

原理

1. Client端获取一个 RPC 代理对象 proxy

2. 调用 proxy 上的方法, 被 InvocationHandler 实现类 Invoker 的 invoke() 方法捕获

3. invoke() 方法内将 RPC 请求封装成 Invocation 实例, 再向 Server 发送 RPC请求

4. Server端循环接收 RPC请求, 对每一个请求都创建一个 Handler线程处理

5. Handler线程从输入流中反序列化出 Invocation实例, 再调用 Server端的实现方法

6. 调用结束, 向 Client端返回调用结果

一. Invoker 类

    InvocationHandler 的实现类

/**
 * InvocationHandler 接口的实现类 <br>
 * Client端代理对象的方法调用都会被 Invoker 的 invoke() 方法捕获
 */
public class Invoker implements InvocationHandler {
    /** RPC协议接口的 Class对象 */
    private Class<?> intface;
    /** Client 端 Socket */
    private Socket client;
    /** 用于向 Server端发送 RPC请求的输出流 */
    private ObjectOutputStream oos;
    /** 用于接收 Server端返回的 RPC请求结果的输入流 */
    private ObjectInputStream ois;

    /**
     * 构造一个 Socket实例 client, 并连接到指定的 Server端地址, 端口
     * 
     * @param intface
     *            RPC协议接口的 Class对象
     * @param serverAdd
     *            Server端地址
     * @param serverPort
     *            Server端监听的端口
     */
    public Invoker(Class<?> intface, String serverAdd, int serverPort) throws UnknownHostException, IOException {
        this.intface = intface;
        client = new Socket(serverAdd, serverPort);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        try {
            // 封装 RPC请求
            Invocation invocation = new Invocation(intface, method.getName(), method.getParameterTypes(), args);
            // 打开 client 的输出流
            oos = new ObjectOutputStream(client.getOutputStream());
            // 序列化, 将 RPC请求写入到 client 的输出流中
            oos.writeObject(invocation);
            oos.flush();

            // 等待 Server端返回 RPC请求结果 //

            // 打开 client 的输入流
            ois = new ObjectInputStream(client.getInputStream());
            // 反序列化, 从输入流中读取 RPC请求结果
            Object res = ois.readObject();
            // 向 client 返回 RPC请求结果
            return res;
        } finally { // 关闭资源
            CloseUtil.closeAll(ois, oos);
            CloseUtil.closeAll(client);
        }
    }
}

二. Invocation 类

    Serializable 的实现类, RPC请求的封装

/**
 * RPC调用的封装, 包括以下字段: <br>
 * methodName: 方法名 <br>
 * parameterTypes: 方法参数列表的 Class 对象数组 <br>
 * params: 方法参数列表
 */
@SuppressWarnings("rawtypes")
public class Invocation implements Serializable {
    private static final long serialVersionUID = -7311316339835834851L;
    /** RPC协议接口的 Class对象 */
    private Class<?> intface;
    /** 方法名 */
    private String methodName;
    /** 方法参数列表的 Class 对象数组 */
    private Class[] parameterTypes;
    /** 方法的参数列表 */
    private Object[] params;

    public Invocation() {
    }

    /**
     * 构造一个 RPC请求的封装
     * 
     * @param intface
     *            RPC协议接口的 Class对象
     * @param methodName
     *            方法名
     * @param parameterTypes
     *            方法参数列表的 Class 对象数组
     * @param params
     *            方法的参数列表
     */
    public Invocation(Class intface, String methodName, Class[] parameterTypes, Object[] params) {
        this.intface = intface;
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.params = params;
    }

    public Class getIntface() {
        return intface;
    }

    public String getMethodName() {
        return methodName;
    }

    public Class[] getParameterTypes() {
        return parameterTypes;
    }

    public Object[] getParams() {
        return params;
    }
}

三.RPC 类

    构造 Client端代理对象, Server端实例

/**
 * 一个构造 Server 端实例与 Client 端代理对象的类
 */
public class RPC {

    /**
     * 获取一个 Client 端的代理对象
     * 
     * @param intface
     *            RPC协议接口, Client 与 Server 端共同遵守
     * @param serverAdd
     *            Server 端地址
     * @param serverPort
     *            Server 端监听的端口
     * @return Client 端的代理对象
     */
    public static <T> Object getProxy(final Class<T> intface, String serverAdd, int serverPort)
            throws UnknownHostException, IOException {

        Object proxy = Proxy.newProxyInstance(intface.getClassLoader(), new Class[] { intface },
                new Invoker(intface, serverAdd, serverPort));
        return proxy;
    }

    /**
     * 获取 RPC 的 Server 端实例
     * 
     * @param intface
     *            RPC协议接口
     * @param intfaceImpl
     *            Server 端 RPC协议接口的实现
     * @param port
     *            Server 端监听的端口
     * @return RPCServer 实例
     */
    public static <T> RPCServer getRPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException {
        return new RPCServer(intface, intfaceImpl, port);
    }

}

四. RPCServer 类

    Server端接收 RPC请求, 处理请求

/**
 * RPC 的 Server端
 */
public class RPCServer {
    /** Server端的 ServerSocket实例 */
    private ServerSocket server;
    /** Server端 RPC协议接口的实现缓存, 一个接口对应一个实现类的实例 */
    private static Map<Class<?>, Object> intfaceImpls = new HashMap<Class<?>, Object>();

    /**
     * 构造一个 RPC 的 Server端实例
     * 
     * @param intface
     *            RPC协议接口的 Class对象
     * @param intfaceImpl
     *            Server端 RPC协议接口的实现
     * @param port
     *            Server端监听的端口
     */
    public <T> RPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException {
        server = new ServerSocket(port);
        RPCServer.intfaceImpls.put(intface, intfaceImpl);
    }

    /**
     * 循环监听并接收 Client端连接, 处理 RPC请求, 向 Client端返回结果
     */
    public void start() {
        try {
            while (true) {
                // 接收 Client端连接, 创建一个 Handler线程, 处理 RPC请求
                new Handler(server.accept()).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally { // 关闭资源
            CloseUtil.closeAll(server);
        }
    }

    /**
     * 向 RPC协议接口的实现缓存中添加缓存
     * 
     * @param intface
     *            RPC协议接口的 Class对象
     * @param intfaceImpl
     *            Server端 RPC协议接口的实现
     */
    public static <T> void addIntfaceImpl(Class<T> intface, T intfaceImpl) {
        RPCServer.intfaceImpls.put(intface, intfaceImpl);
    }

    /**
     * 处理 RPC请求的线程类
     */
    private static class Handler extends Thread {
        /** Server端接收到的 Client端连接 */
        private Socket client;
        /** 用于接收 client 的 RPC请求的输入流 */
        private ObjectInputStream ois;
        /** 用于向 client 返回 RPC请求结果的输出流 */
        private ObjectOutputStream oos;
        /** RPC请求的封装 */
        private Invocation invocation;

        /**
         * 用 Client端连接构造 Handler线程
         * 
         * @param client
         */
        public Handler(Socket client) {
            this.client = client;
        }

        @Override
        public void run() {
            try {
                // 打开 client 的输入流
                ois = new ObjectInputStream(client.getInputStream());
                // 反序列化, 从输入流中读取 RPC请求的封装
                invocation = (Invocation) ois.readObject();
                // 从 RPC协议接口的实现缓存中获取实现
                Object intfaceImpl = intfaceImpls.get(invocation.getIntface());
                // 获取 Server端 RPC协议接口的方法实现
                Method method = intfaceImpl.getClass().getMethod(invocation.getMethodName(),
                        invocation.getParameterTypes());
                // 跳过安全检查
                method.setAccessible(true);
                // 调用具体的实现方法, 用 res 接收方法返回结果
                Object res = method.invoke(intfaceImpl, invocation.getParams());
                // 打开 client 的输出流
                oos = new ObjectOutputStream(client.getOutputStream());
                // 序列化, 向输出流中写入 RPC请求的结果
                oos.writeObject(res);
                oos.flush();
            } catch (Exception e) {
                e.printStackTrace();
            } finally { // 关闭资源
                CloseUtil.closeAll(ois, oos);
                CloseUtil.closeAll(client);
            }
        }
    }
}

五. 测试类

    Login类, RPC协议接口

/**
 * RPC协议接口, Client 与 Server端共同遵守
 */
public interface Login {
    /**
     * 抽象方法 login(), 模拟用户登录传入两个String 类型的参数, 返回 String类型的结果
     * 
     * @param username
     *            用户名
     * @param password
     *            密码
     * @return 返回登录结果
     */
    public String login(String username, String password);
}

LoginImpl类, Server 端 RPC协议接口( Login )的实现类

/**
 * Server端 RPC协议接口( Login )的实现类
 */
public class LoginImpl implements Login {
    /**
     * 实现 login()方法, 模拟用户登录
     * 
     * @param username
     *            用户名
     * @param password
     *            密码
     * @return hello 用户名
     */
    @Override
    public String login(String username, String password) {
        return "hello " + username;
    }
}

ClientTest类, Client端测试类

/**
 * Client端测试类
 */
public class ClientTest {
    public static void main(String[] args) throws UnknownHostException, IOException {
        // 获取一个 Client端的代理对象 proxy
        Login proxy = (Login) RPC.getProxy(Login.class, "192.168.8.1", 8888);
        // 调用 proxy 的 login() 方法, 返回值为 res
        String res = proxy.login("rpc", "password");
        // 输出 res
        System.out.println(res);
    }
}

ServerTest类, Server端测试类

/**
 * Server端测试类
 */
public class ServerTest {
    public static void main(String[] args) throws ClassNotFoundException, IOException {
        // 获取 RPC 的 Server 端实例 server
        RPCServer server = RPC.getRPCServer(Login.class, new LoginImpl(), 8888);
        // 循环监听并接收 Client 端连接, 处理 RPC 请求, 向 Client 端返回结果
        server.start();
    }
}

运行 ServerTest, 控制台输出: 

Starting Socket Handler for port 8888

运行 ClientTest, 控制台输出: 

hello rpc

至此, 实现了基于 Proxy, Socket, IO 的简单版 RPC模型,

对于每一个 RPC请求, Server端都开启一个 Handler线程处理该请求,

在高并发情况下, Server端是扛不住的, 改用 NIO应该表现更好

JDK动态代理的简单实现

Hadoop中RPC机制简介

源码托管在 Git@OSC

点赞
收藏
评论区
推荐文章
爱写码 爱写码
3年前
为RPC而生的t-io企业集群版的msg服务器tio-msg-demo你应该感兴趣
概念解释什么是RPC(RemoteProcedureCall)远程过程调用,是一种通过网络从远程计算机程序上请求服务,实现某个业务,但是不需要具体了解底层网络技术的协议。tio把程序中对外实现通信的各个协议模块进行了打包处理成一个盒子,上层应用对外通信就只要对接盒子的接口,而不必关心盒子里面的内容,RPC服务要对外实现远程调用,首先要跟tio通信,再到远
Easter79 Easter79
3年前
spring集成Hessian的基本使用方法
一、什么是RPC  RPC全称RemoteProcedureCall,中文名叫远程过程调用。RPC是一种远程调用技术,用于不同系统之间的远程相互调用。其在分布式系统中应用十分广泛。二、什么是Hessian  Hessian是一个轻量级的RPC框架。 相比WebService,Hessian更简单、快捷。采用的是二进制RPC协议,因为采用的是二
Wesley13 Wesley13
3年前
RPC、RMI、HTTP、REST的区别
RPC、RMI、HTTP、REST的区别RPC:远程服务调用(RemoteProcedureCall),加上Protocol后可以称为远程过程调用协议,可以用不同的语言实现,可以借用HTTP协议或者其他协议来实现,一般都是通过基于TCP/IP的自定义协议实现。HTTP协议和TCP/
Wesley13 Wesley13
3年前
RPC调用(架构)和HTTP调用(架构)的区别
我是接触到了Dubbo,才接触到RPC服务的。它处于的网络模型的传输层,而http处于应用层,RPC处于更底层所以效率更高!本文简单地介绍一下两种形式的C/S架构,先说一下他们最本质的区别,就是RPC主要是基于TCP/IP协议的,而HTTP服务主要是基于HTTP协议的,我们都知道HTTP协议是在传输层协议TCP之上的,所以效率来看的话,RPC当然是要更胜
Wesley13 Wesley13
3年前
RPC 定义 和 原理
一、RPC  1.RPC是什么  RPC(RemoteProcedureCallProtocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了
Stella981 Stella981
3年前
Rpc基础 原理 框架
一.RPC的原理1.RPC是什么RPC(RemoteProcedureCallProtocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC
Stella981 Stella981
3年前
Python中的远程过程调用rpc
软硬件环境Pythonhprosexmlrpc什么是RPC远程过程调用(RemoteProcedureCall)是一个计算机通信协议,它允许运行于一台计算机的程序调用另一台计算机的程序,就像调用本地程序一样简单方便。python中rpc的实现x
Stella981 Stella981
3年前
RPC简介与hdfs读过程与写过程简介
1.RPC简介RemoteProcedureCall远程过程调用协议  RPC——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络
API 小达人 API 小达人
1年前
Eolink Apikit「 零代码」快速发起 RPC 接口自动化测试
RPC(RemoteProcedureCall)远程过程调用,是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC的核心思想是将远程服务抽象成一个接口,客户端通过调用这个接口,就可以实现对远程服务的访问。EolinkApikit支持多协议,RPC、DUBBO、HTTP、REST、Websocket、gRPC、TCP、UDP、SOAP、HSF等。零代码快速发起RPC接口自动化测试,可以根据RPC接口文档自动生成测试用例,开发者只需简单修改即可使用。
京东云开发者 京东云开发者
6个月前
如何手搓一个自定义的RPC(远程过程调用框架)
1、RPC(远程过程调用概述)远程过程调用(RPC,RemoteProcedureCall)是一种通过网络从远程计算机程序上请求服务,而无需了解网络细节的通信技术。在分布式系统中,RPC是一种常用的技术,能够简化客户端与服务器之间的交互。本文将介绍如何基于