RPC(Remote Procedure Call)—远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
原理
1. Client端获取一个 RPC 代理对象 proxy
2. 调用 proxy 上的方法, 被 InvocationHandler 实现类 Invoker 的 invoke() 方法捕获
3. invoke() 方法内将 RPC 请求封装成 Invocation 实例, 再向 Server 发送 RPC请求
4. Server端循环接收 RPC请求, 对每一个请求都创建一个 Handler线程处理
5. Handler线程从输入流中反序列化出 Invocation实例, 再调用 Server端的实现方法
6. 调用结束, 向 Client端返回调用结果
一. Invoker 类
InvocationHandler 的实现类
/**
* InvocationHandler 接口的实现类 <br>
* Client端代理对象的方法调用都会被 Invoker 的 invoke() 方法捕获
*/
public class Invoker implements InvocationHandler {
/** RPC协议接口的 Class对象 */
private Class<?> intface;
/** Client 端 Socket */
private Socket client;
/** 用于向 Server端发送 RPC请求的输出流 */
private ObjectOutputStream oos;
/** 用于接收 Server端返回的 RPC请求结果的输入流 */
private ObjectInputStream ois;
/**
* 构造一个 Socket实例 client, 并连接到指定的 Server端地址, 端口
*
* @param intface
* RPC协议接口的 Class对象
* @param serverAdd
* Server端地址
* @param serverPort
* Server端监听的端口
*/
public Invoker(Class<?> intface, String serverAdd, int serverPort) throws UnknownHostException, IOException {
this.intface = intface;
client = new Socket(serverAdd, serverPort);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try {
// 封装 RPC请求
Invocation invocation = new Invocation(intface, method.getName(), method.getParameterTypes(), args);
// 打开 client 的输出流
oos = new ObjectOutputStream(client.getOutputStream());
// 序列化, 将 RPC请求写入到 client 的输出流中
oos.writeObject(invocation);
oos.flush();
// 等待 Server端返回 RPC请求结果 //
// 打开 client 的输入流
ois = new ObjectInputStream(client.getInputStream());
// 反序列化, 从输入流中读取 RPC请求结果
Object res = ois.readObject();
// 向 client 返回 RPC请求结果
return res;
} finally { // 关闭资源
CloseUtil.closeAll(ois, oos);
CloseUtil.closeAll(client);
}
}
}
二. Invocation 类
Serializable 的实现类, RPC请求的封装
/**
* RPC调用的封装, 包括以下字段: <br>
* methodName: 方法名 <br>
* parameterTypes: 方法参数列表的 Class 对象数组 <br>
* params: 方法参数列表
*/
@SuppressWarnings("rawtypes")
public class Invocation implements Serializable {
private static final long serialVersionUID = -7311316339835834851L;
/** RPC协议接口的 Class对象 */
private Class<?> intface;
/** 方法名 */
private String methodName;
/** 方法参数列表的 Class 对象数组 */
private Class[] parameterTypes;
/** 方法的参数列表 */
private Object[] params;
public Invocation() {
}
/**
* 构造一个 RPC请求的封装
*
* @param intface
* RPC协议接口的 Class对象
* @param methodName
* 方法名
* @param parameterTypes
* 方法参数列表的 Class 对象数组
* @param params
* 方法的参数列表
*/
public Invocation(Class intface, String methodName, Class[] parameterTypes, Object[] params) {
this.intface = intface;
this.methodName = methodName;
this.parameterTypes = parameterTypes;
this.params = params;
}
public Class getIntface() {
return intface;
}
public String getMethodName() {
return methodName;
}
public Class[] getParameterTypes() {
return parameterTypes;
}
public Object[] getParams() {
return params;
}
}
三.RPC 类
构造 Client端代理对象, Server端实例
/**
* 一个构造 Server 端实例与 Client 端代理对象的类
*/
public class RPC {
/**
* 获取一个 Client 端的代理对象
*
* @param intface
* RPC协议接口, Client 与 Server 端共同遵守
* @param serverAdd
* Server 端地址
* @param serverPort
* Server 端监听的端口
* @return Client 端的代理对象
*/
public static <T> Object getProxy(final Class<T> intface, String serverAdd, int serverPort)
throws UnknownHostException, IOException {
Object proxy = Proxy.newProxyInstance(intface.getClassLoader(), new Class[] { intface },
new Invoker(intface, serverAdd, serverPort));
return proxy;
}
/**
* 获取 RPC 的 Server 端实例
*
* @param intface
* RPC协议接口
* @param intfaceImpl
* Server 端 RPC协议接口的实现
* @param port
* Server 端监听的端口
* @return RPCServer 实例
*/
public static <T> RPCServer getRPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException {
return new RPCServer(intface, intfaceImpl, port);
}
}
四. RPCServer 类
Server端接收 RPC请求, 处理请求
/**
* RPC 的 Server端
*/
public class RPCServer {
/** Server端的 ServerSocket实例 */
private ServerSocket server;
/** Server端 RPC协议接口的实现缓存, 一个接口对应一个实现类的实例 */
private static Map<Class<?>, Object> intfaceImpls = new HashMap<Class<?>, Object>();
/**
* 构造一个 RPC 的 Server端实例
*
* @param intface
* RPC协议接口的 Class对象
* @param intfaceImpl
* Server端 RPC协议接口的实现
* @param port
* Server端监听的端口
*/
public <T> RPCServer(Class<T> intface, T intfaceImpl, int port) throws IOException {
server = new ServerSocket(port);
RPCServer.intfaceImpls.put(intface, intfaceImpl);
}
/**
* 循环监听并接收 Client端连接, 处理 RPC请求, 向 Client端返回结果
*/
public void start() {
try {
while (true) {
// 接收 Client端连接, 创建一个 Handler线程, 处理 RPC请求
new Handler(server.accept()).start();
}
} catch (IOException e) {
e.printStackTrace();
} finally { // 关闭资源
CloseUtil.closeAll(server);
}
}
/**
* 向 RPC协议接口的实现缓存中添加缓存
*
* @param intface
* RPC协议接口的 Class对象
* @param intfaceImpl
* Server端 RPC协议接口的实现
*/
public static <T> void addIntfaceImpl(Class<T> intface, T intfaceImpl) {
RPCServer.intfaceImpls.put(intface, intfaceImpl);
}
/**
* 处理 RPC请求的线程类
*/
private static class Handler extends Thread {
/** Server端接收到的 Client端连接 */
private Socket client;
/** 用于接收 client 的 RPC请求的输入流 */
private ObjectInputStream ois;
/** 用于向 client 返回 RPC请求结果的输出流 */
private ObjectOutputStream oos;
/** RPC请求的封装 */
private Invocation invocation;
/**
* 用 Client端连接构造 Handler线程
*
* @param client
*/
public Handler(Socket client) {
this.client = client;
}
@Override
public void run() {
try {
// 打开 client 的输入流
ois = new ObjectInputStream(client.getInputStream());
// 反序列化, 从输入流中读取 RPC请求的封装
invocation = (Invocation) ois.readObject();
// 从 RPC协议接口的实现缓存中获取实现
Object intfaceImpl = intfaceImpls.get(invocation.getIntface());
// 获取 Server端 RPC协议接口的方法实现
Method method = intfaceImpl.getClass().getMethod(invocation.getMethodName(),
invocation.getParameterTypes());
// 跳过安全检查
method.setAccessible(true);
// 调用具体的实现方法, 用 res 接收方法返回结果
Object res = method.invoke(intfaceImpl, invocation.getParams());
// 打开 client 的输出流
oos = new ObjectOutputStream(client.getOutputStream());
// 序列化, 向输出流中写入 RPC请求的结果
oos.writeObject(res);
oos.flush();
} catch (Exception e) {
e.printStackTrace();
} finally { // 关闭资源
CloseUtil.closeAll(ois, oos);
CloseUtil.closeAll(client);
}
}
}
}
五. 测试类
Login类, RPC协议接口
/**
* RPC协议接口, Client 与 Server端共同遵守
*/
public interface Login {
/**
* 抽象方法 login(), 模拟用户登录传入两个String 类型的参数, 返回 String类型的结果
*
* @param username
* 用户名
* @param password
* 密码
* @return 返回登录结果
*/
public String login(String username, String password);
}
LoginImpl类, Server 端 RPC协议接口( Login )的实现类
/**
* Server端 RPC协议接口( Login )的实现类
*/
public class LoginImpl implements Login {
/**
* 实现 login()方法, 模拟用户登录
*
* @param username
* 用户名
* @param password
* 密码
* @return hello 用户名
*/
@Override
public String login(String username, String password) {
return "hello " + username;
}
}
ClientTest类, Client端测试类
/**
* Client端测试类
*/
public class ClientTest {
public static void main(String[] args) throws UnknownHostException, IOException {
// 获取一个 Client端的代理对象 proxy
Login proxy = (Login) RPC.getProxy(Login.class, "192.168.8.1", 8888);
// 调用 proxy 的 login() 方法, 返回值为 res
String res = proxy.login("rpc", "password");
// 输出 res
System.out.println(res);
}
}
ServerTest类, Server端测试类
/**
* Server端测试类
*/
public class ServerTest {
public static void main(String[] args) throws ClassNotFoundException, IOException {
// 获取 RPC 的 Server 端实例 server
RPCServer server = RPC.getRPCServer(Login.class, new LoginImpl(), 8888);
// 循环监听并接收 Client 端连接, 处理 RPC 请求, 向 Client 端返回结果
server.start();
}
}
运行 ServerTest, 控制台输出:
Starting Socket Handler for port 8888
运行 ClientTest, 控制台输出:
hello rpc
至此, 实现了基于 Proxy, Socket, IO 的简单版 RPC模型,
对于每一个 RPC请求, Server端都开启一个 Handler线程处理该请求,
在高并发情况下, Server端是扛不住的, 改用 NIO应该表现更好