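Overview
Dubbo lets callers invoke a remote service as if it were a local one, and it does so through its proxy mechanism. Business code calls service methods on a proxy class, which hides the remote communication. The proxy object delegates to InvokerInvocationHandler, which in turn calls invoke() on its Invoker field. That Invoker is created by the Protocol while the service is being referenced; DubboProtocol, for example, creates a DubboInvoker.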
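The delegation described above can be sketched with a JDK dynamic proxy. This is only an illustration under stated assumptions: GreetingService, SimpleInvoker, ForwardingHandler and ProxyDemo are hypothetical names invented here, and SimpleInvoker is a simplified stand-in for Dubbo's Invoker, not its real interface.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical service interface, used only for this illustration.
interface GreetingService {
    String sayHello(String name);
}

// Hypothetical stand-in for an Invoker: receives the method name, parameter types and arguments.
interface SimpleInvoker {
    Object invoke(String methodName, Class<?>[] parameterTypes, Object[] args);
}

// Plays the role the text assigns to InvokerInvocationHandler:
// every call on the proxy is forwarded to the invoker.
class ForwardingHandler implements InvocationHandler {
    private final SimpleInvoker invoker;

    ForwardingHandler(SimpleInvoker invoker) {
        this.invoker = invoker;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // Methods declared on Object (toString, hashCode, equals) are answered locally.
        if (method.getDeclaringClass() == Object.class) {
            switch (method.getName()) {
                case "toString":
                    return "proxy(" + invoker + ")";
                case "hashCode":
                    return System.identityHashCode(proxy);
                default: // equals
                    return proxy == args[0];
            }
        }
        // Everything else is treated as a remote call.
        return invoker.invoke(method.getName(), method.getParameterTypes(), args);
    }
}

class ProxyDemo {
    static GreetingService createProxy(SimpleInvoker invoker) {
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                new ForwardingHandler(invoker));
    }

    public static void main(String[] args) {
        // A fake "remote" side that just echoes the method name and arguments.
        SimpleInvoker fakeRemote = (name, types, a) -> name + Arrays.toString(a);
        GreetingService service = createProxy(fakeRemote);
        System.out.println(service.sayHello("dubbo"));
    }
}
```

The caller only ever sees GreetingService; swapping the invoker changes how the call is carried out without touching business code.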
Invocation
An Invocation object represents one concrete method call. It encapsulates state such as the method name and the method arguments.
public interface Invocation {
/**
* get method name.
*
* @return method name.
* @serial
*/
String getMethodName();
/**
* get parameter types.
*
* @return parameter types.
* @serial
*/
Class<?>[] getParameterTypes();
/**
* get arguments.
*
* @return arguments.
* @serial
*/
Object[] getArguments();
/**
* get attachments.
*
* @return attachments.
* @serial
*/
Map<String, String> getAttachments();
/**
* get attachment by key.
*
* @return attachment value.
* @serial
*/
String getAttachment(String key);
/**
* get attachment by key with default value.
*
* @return attachment value.
* @serial
*/
String getAttachment(String key, String defaultValue);
/**
* get the invoker in current context.
*
* @return invoker.
* @transient
*/
Invoker<?> getInvoker();
}
DubboProtocol
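The default implementation of the Protocol extension and Dubbo's default communication mechanism. It supports the dubbo:// protocol.
The client sends a request
DubboInvoker uses a Client object to send the Invocation as a message. The Client object wraps the actual RPC transport, such as a Netty channel, which is what really sends the message.
The client calls a method
When the client calls a provider method, Dubbo goes through a dynamic proxy, and the call eventually reaches DubboInvoker#doInvoke().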
DubboInvoker
/**
* Overrides AbstractInvoker#doInvoke()
* @param invocation
* @return
* @throws Throwable
*/
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// Whether the call is asynchronous
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// Whether the call is one-way (no response expected)
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// Timeout; defaults to 1000 ms
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
// No response data is needed, so send the message with send()
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
// isSent controls whether to check that the message was actually sent; defaults to false
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
// Asynchronous call
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
// Synchronous call that needs a result; the response is later placed in the ResponseFuture
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
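The client sends requests in one of three ways:
- Synchronous call: ExchangeClient#request() followed by ResponseFuture#get(), which blocks until the server responds;
- Asynchronous call: ExchangeClient#request(), with the ResponseFuture stored in RpcContext;
- One-way: ExchangeClient#send().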
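The three modes can be contrasted in a small, self-contained sketch. It is not Dubbo code: CallModesDemo, request() and send() are hypothetical stand-ins for the exchange client, and CompletableFuture takes the place of ResponseFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

class CallModesDemo {
    // Counts messages handed to the fake transport (illustration only).
    static final AtomicInteger SENT = new AtomicInteger();

    // Hypothetical stand-in for a two-way request: the "provider" replies on another thread.
    static CompletableFuture<String> request(String message) {
        SENT.incrementAndGet();
        return CompletableFuture.supplyAsync(() -> "echo:" + message);
    }

    // Hypothetical stand-in for a one-way send: nothing ever comes back.
    static void send(String message) {
        SENT.incrementAndGet();
    }

    // Synchronous: issue the request, then block until the reply arrives or the timeout expires.
    static String callSync(String message, long timeoutMs) {
        try {
            return request(message).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Invoke timed out: " + message, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Invoke failed: " + message, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting: " + message, e);
        }
    }

    // Asynchronous: issue the request and hand the future to the caller without blocking.
    static CompletableFuture<String> callAsync(String message) {
        return request(message);
    }

    // One-way: send and forget; there is no future to wait on.
    static void callOneway(String message) {
        send(message);
    }
}
```

Synchronous and asynchronous calls share the same request path and differ only in who waits on the future, which mirrors the structure of doInvoke().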
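1. HeaderExchangeChannel
Wraps the caller's message in a Request object and then delegates sending to its channel field.
/**
* Sends a request that expects a response; the object is wrapped in a Request before it is sent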
* @param request
* @param timeout
* @return
* @throws RemotingException
*/
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true); // a response is required
req.setData(request);
// Create the future; the server's response is later placed in it
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
// Everything ends up going through send()
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
/**
* Only sends the message
*
* @param message
* @param sent whether to wait until the message has been written to the socket and raise an error if sending fails (already sent to socket?)
* @throws RemotingException
*/
public void send(Object message, boolean sent) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
}
if (message instanceof Request
|| message instanceof Response
|| message instanceof String) {
channel.send(message, sent);
} else {
// Wrap the message in a Request object so that it fits the Request model
Request request = new Request();
request.setVersion("2.0.0");
// No response is required
request.setTwoWay(false);
request.setData(message);
channel.send(request, sent);
}
}
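request() and send() differ in two ways:
- The mTwoWay field of the Request object: request() sets it to true, send() sets it to false;
- request() returns a DefaultFuture on which the caller waits for the server's result; send() returns nothing.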
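A minimal sketch of that difference, assuming a simplified request model: SimpleRequest and SimpleExchangeChannel are hypothetical names, the transport is just a Consumer, and a CompletableFuture stands in for DefaultFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical, simplified request model: an id, a two-way flag and a payload.
class SimpleRequest {
    private static final AtomicLong NEXT_ID = new AtomicLong();

    final long id = NEXT_ID.getAndIncrement();
    final boolean twoWay;
    final Object data;

    SimpleRequest(boolean twoWay, Object data) {
        this.twoWay = twoWay;
        this.data = data;
    }
}

class SimpleExchangeChannel {
    // Stands in for the underlying channel's send method.
    private final Consumer<SimpleRequest> transport;
    // Futures that are still waiting for a reply, keyed by request id.
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    SimpleExchangeChannel(Consumer<SimpleRequest> transport) {
        this.transport = transport;
    }

    // Two-way: mark the request as needing a response and register a future for it.
    CompletableFuture<Object> request(Object payload) {
        SimpleRequest req = new SimpleRequest(true, payload);
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(req.id, future);
        try {
            transport.accept(req);
        } catch (RuntimeException e) {
            // Sending failed, so nobody will ever complete this future.
            pending.remove(req.id);
            future.cancel(false);
            throw e;
        }
        return future;
    }

    // One-way: no response is expected, so no future is created.
    void send(Object payload) {
        transport.accept(new SimpleRequest(false, payload));
    }

    // Called when a reply with the given id arrives.
    void onResponse(long id, Object result) {
        CompletableFuture<Object> future = pending.remove(id);
        if (future != null) {
            future.complete(result);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Only request() leaves an entry in the pending map, which is why only two-way calls can be matched with a later response.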
2. AbstractPeer
public void send(Object message) throws RemotingException {
send(message, url.getParameter(Constants.SENT_KEY, false));
}
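3. AbstractClient
/**
* Sends a message; it only sends, nothing is returned
*
* @param message
* @param sent whether to wait until the message has been written to the socket and raise an error if sending fails (already sent to socket?)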
* @throws RemotingException
*/
public void send(Object message, boolean sent) throws RemotingException {
if (send_reconnect && !isConnected()) {
connect();
}
// Provided by the subclass: the real Channel
Channel channel = getChannel();
//TODO Can the value returned by getChannel() be null? need improvement.
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
// Hand over to the real Channel
channel.send(message, sent);
}
4. NettyClient + NettyChannel
The Client object ultimately calls the Channel object to do the work.
@Override
protected com.alibaba.dubbo.remoting.Channel getChannel() {
// This channel is the Netty framework's channel
Channel c = channel;
if (c == null || !c.isConnected())
return null;
// Convert the Netty channel into Dubbo's channel
return NettyChannel.getOrAddChannel(c, getUrl(), this);
}
/**
* Returns the Dubbo Channel for the given Netty Channel; the mapping is one-to-one
* @param ch
* @param url
* @param handler
* @return
*/
static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
if (ch == null) {
return null;
}
NettyChannel ret = channelMap.get(ch);
if (ret == null) {
NettyChannel nc = new NettyChannel(ch, url, handler);
if (ch.isConnected()) {
ret = channelMap.putIfAbsent(ch, nc);
}
if (ret == null) {
ret = nc;
}
}
return ret;
}
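The get-or-add logic in getOrAddChannel() follows a common concurrent caching pattern. The sketch below shows it in isolation; WrapperCache is a hypothetical name, and unlike the original it caches unconditionally instead of checking whether the channel is connected.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Keeps exactly one wrapper per key, even when several threads ask at the same time.
class WrapperCache<K, V> {
    private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();

    V getOrAdd(K key, Function<K, V> factory) {
        V existing = cache.get(key);
        if (existing == null) {
            V created = factory.apply(key);
            // putIfAbsent returns the value that was already there, or null if ours was stored.
            existing = cache.putIfAbsent(key, created);
            if (existing == null) {
                existing = created;
            }
        }
        return existing;
    }

    // Counterpart of removing a wrapper once its underlying channel is gone.
    void remove(K key) {
        cache.remove(key);
    }
}
```

If two threads race, both may create a wrapper, but putIfAbsent guarantees that both end up using the same one. On Java 8 and later, computeIfAbsent is a shorter alternative when the factory has no side effects.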
/**
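* Sends the message; this is where it is actually sent
* @param message
* @param sent whether to check that sending succeeded and raise an error if it did not
* @throws RemotingException
*/
public void send(Object message, boolean sent) throws RemotingException {
// Check whether the channel is closed; the parent class performs the check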
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// Write the message through Netty
ChannelFuture future = channel.write(message);
if (sent) {
// Check whether the message was sent successfully
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.getCause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this,
"Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e
.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message "
+ message
+ " to "
+ getRemoteAddress()
+ "in timeout("
+ timeout
+ "ms) limit");
}
}
5. Netty's ChannelHandler chain
When Dubbo creates the NettyClient object, it adds three ChannelHandlers to Netty's ChannelPipeline. When Netty sends a message, two of them run:
- NettyCodecAdapter$InternalEncoder
- NettyHandler
5.1 NettyHandler
Performs the send and handles the sent event.
/**
* Triggered by a message write event
* @param ctx
* @param e
* @throws Exception
*/
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
// Call the parent method to send the message
super.writeRequested(ctx, e);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
// Let the ChannelHandler process the sent event
handler.sent(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
5.2 NettyCodecAdapter$InternalEncoder
Encodes the message (a Request object) into Dubbo's ChannelBuffer, then wraps that buffer in a Netty ChannelBuffer and returns it so that Netty can send it.
private class InternalEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
// Resolved through Dubbo SPI; the codec is DubboCountCodec
// Encode msg into the buffer
codec.encode(channel, buffer, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ch);
}
// Finally wrap Dubbo's ChannelBuffer in a Netty ChannelBuffer
return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
}
}
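The server receives the request
The server starts a NettyServer and waits for client requests. Synchronous calls, one-way calls (no response required), and asynchronous calls follow the same flow on the server, so they can be analyzed together.
1. Netty's ChannelHandler chain
When Dubbo creates the NettyServer object, it adds three ChannelHandlers to Netty's ChannelPipeline. When Netty receives a message, two of them run:
- NettyCodecAdapter$InternalDecoder
- NettyHandler
1.1 NettyCodecAdapter$InternalDecoder
- Converts Netty's ChannelBuffer into Dubbo's ChannelBuffer;
- Decodes the Request object from the ChannelBuffer;
- Passes it to the next ChannelHandler.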
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
    Object o = event.getMessage();
    if (!(o instanceof ChannelBuffer)) {
        ctx.sendUpstream(event);
        return;
    }
    ChannelBuffer input = (ChannelBuffer) o;
    int readable = input.readableBytes();
    if (readable <= 0) {
        return;
    }
    // Dubbo's ChannelBuffer
    com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
    if (buffer.readable()) {
        if (buffer instanceof DynamicChannelBuffer) {
            buffer.writeBytes(input.toByteBuffer());
            message = buffer;
        } else {
            int size = buffer.readableBytes() + input.readableBytes();
            message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
                    size > bufferSize ? size : bufferSize);
            message.writeBytes(buffer, buffer.readableBytes());
            message.writeBytes(input.toByteBuffer());
        }
    } else {
        message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
                input.toByteBuffer());
    }
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    Object msg;
    int saveReaderIndex;
    try {
        // decode object.
        do {
            saveReaderIndex = message.readerIndex();
            try {
                // Decode a Request or Response object from Dubbo's ChannelBuffer
                msg = codec.decode(channel, message);
            } catch (IOException e) {
                buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                throw e;
            }
            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                message.readerIndex(saveReaderIndex);
                break;
            } else {
                if (saveReaderIndex == message.readerIndex()) {
                    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                    throw new IOException("Decode without read data.");
                }
                if (msg != null) {
                    // Trigger the next ChannelHandler
                    Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                }
            }
        } while (message.readable());
    } finally {
        if (message.readable()) {
            message.discardReadBytes();
            buffer = message;
        } else {
            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
        }
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}
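The core of the decoder is the "save the reader position, try to decode, roll back if more input is needed" loop. The sketch below reproduces that idea with java.nio.ByteBuffer. The wire format used here, a 4-byte length followed by a UTF-8 body, is an assumption made for the example and is not Dubbo's actual frame layout; FrameDecoderDemo is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class FrameDecoderDemo {
    // Bytes left over from earlier reads that did not yet form a complete frame.
    private ByteBuffer leftover = ByteBuffer.allocate(0);

    // Assumed wire format for this sketch: 4-byte big-endian length, then the UTF-8 body.
    static byte[] encode(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    // Feeds one network chunk and returns every message that is now complete.
    List<String> feed(byte[] chunk) {
        // Prepend whatever was left from the previous call.
        ByteBuffer buf = ByteBuffer.allocate(leftover.remaining() + chunk.length);
        buf.put(leftover).put(chunk);
        buf.flip();

        List<String> messages = new ArrayList<>();
        while (buf.hasRemaining()) {
            buf.mark(); // remember the reader position before attempting a decode
            if (buf.remaining() < 4) {
                break; // not even a full length prefix yet
            }
            int length = buf.getInt();
            if (buf.remaining() < length) {
                buf.reset(); // need more input: roll back to the saved position
                break;
            }
            byte[] body = new byte[length];
            buf.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        // Keep the unread bytes for the next chunk.
        leftover = buf.slice();
        return messages;
    }
}
```

Because TCP delivers a byte stream, one chunk may hold several frames or only part of one; keeping the unread tail is what lets the next call complete a half-received frame.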
1.2 NettyHandler
It hands the message to Dubbo's ChannelHandler, which handles the business logic.
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
// Hand over to Dubbo's ChannelHandler;
// this handler is passed in when the NettyServer or NettyClient is created
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
2. ChannelHandler processing in NettyServer
NettyServer is itself a ChannelHandler, and it delegates all processing to its handler field. Briefly, the call hierarchy is:
NettyServer > MultiMessageHandler > HeartbeatHandler > WrappedChannelHandler (Dispatcher#dispatch(ChannelHandler)) > DecodeHandler > HeaderExchangeHandler > the actual ChannelHandler implementation
Each ChannelHandler adds some extra behavior and then delegates to the next one, until the last one is reached.
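This layering is the decorator pattern. A minimal sketch follows, with hypothetical handlers that only loosely imitate what a multi-message splitter and a heartbeat filter might do; none of the class names or behaviors below are taken from Dubbo.

```java
import java.util.List;

// Hypothetical, single-method version of a channel handler.
interface MessageHandler {
    void received(Object message);
}

// Base class for decorators: holds the next handler in the chain.
abstract class DelegatingHandler implements MessageHandler {
    protected final MessageHandler next;

    DelegatingHandler(MessageHandler next) {
        this.next = next;
    }
}

// Splits a batch (modeled here as a List) into individual messages.
class BatchSplittingHandler extends DelegatingHandler {
    BatchSplittingHandler(MessageHandler next) {
        super(next);
    }

    @Override
    public void received(Object message) {
        if (message instanceof List) {
            for (Object item : (List<?>) message) {
                next.received(item);
            }
        } else {
            next.received(message);
        }
    }
}

// Swallows heartbeat messages so business code never sees them.
class HeartbeatSwallowingHandler extends DelegatingHandler {
    static final String HEARTBEAT = "HEARTBEAT";

    HeartbeatSwallowingHandler(MessageHandler next) {
        super(next);
    }

    @Override
    public void received(Object message) {
        if (HEARTBEAT.equals(message)) {
            return;
        }
        next.received(message);
    }
}

class HandlerChainDemo {
    // Outermost handler first, business handler last.
    static MessageHandler build(MessageHandler business) {
        return new BatchSplittingHandler(new HeartbeatSwallowingHandler(business));
    }
}
```

Each layer knows only the next handler, so concerns such as batching or heartbeats can be added or removed without touching the business handler.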
3. HeaderExchangeHandler
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
// Convert the Channel into an ExchangeChannel
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
// A response is required: handle the request, then send the result back over the channel
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
// String messages are Dubbo telnet commands
if (isClientSide(channel)) {
// Only the server side can handle them
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
// Hand over to the TelnetHandler and send the result back to the client
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
/**
* Handles the request model, that is, the logic for a Request object
* @param channel
* @param req
* @return
* @throws RemotingException
*/
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
// Build the Response object to return
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
// The request is broken (for example, it could not be decoded)
Object data = req.getData();
String msg;
if (data == null) msg = null;
else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
else msg = data.toString();
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
return res;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
// The ExchangeHandler processes the request and returns the result
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
4. DubboProtocol$requestHandler
Hands the Invocation object to the Invoker, which proxies the real service implementation.
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility between newer and older versions if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
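// Note: '+' binds tighter than '==', so the condition compares the concatenated string with null and is always false;
// the "Unsupported request: " prefix therefore never appears in the exception message
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());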
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(Constants.PATH_KEY);
// if it's callback service on client side
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}
//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path = inv.getAttachments().get(Constants.PATH_KEY) + "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
// Use the key to look up the exporter in exporterMap; the exporter holds the Invoker,
// and that Invoker proxies the real service implementation
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null)
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
return exporter.getInvoker();
}
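The lookup performed by getInvoker() boils down to building a string key from the request's coordinates and using it to find the exported invoker. A sketch under stated assumptions: ExporterRegistry is a hypothetical name, the key layout [group/]path[:version]:port is chosen for the example rather than quoted from Dubbo, and a Function stands in for the Invoker.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class ExporterRegistry {
    // Exported services, keyed by service key. A Function stands in for the Invoker.
    private final Map<String, Function<Object[], Object>> exporters = new ConcurrentHashMap<>();

    // Assumed key layout for this sketch: [group/]path[:version]:port
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        return key.append(':').append(port).toString();
    }

    void export(String key, Function<Object[], Object> invoker) {
        exporters.put(key, invoker);
    }

    // Finds the invoker for the key and runs it; a missing key usually means a version or group mismatch.
    Object dispatch(String key, Object[] arguments) {
        Function<Object[], Object> invoker = exporters.get(key);
        if (invoker == null) {
            throw new IllegalStateException(
                    "Not found exported service: " + key + " in " + exporters.keySet());
        }
        return invoker.apply(arguments);
    }
}
```

Because version and group are part of the key, a consumer that asks for a different version simply finds no exporter, which is the situation the "may be version or group mismatch" error message describes.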
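The client receives the response
For a synchronous call, the client waits for the result sent by the server. The client handles messages from the server with the same logic the server uses for messages from the client.
1. Netty's ChannelHandler chain
When Dubbo creates the NettyClient object, it adds three ChannelHandlers to Netty's ChannelPipeline. When NettyClient receives a message, two of them run:
- NettyCodecAdapter$InternalDecoder
- NettyHandler
1.1 NettyCodecAdapter$InternalDecoder
- Converts Netty's ChannelBuffer into Dubbo's ChannelBuffer;
- Decodes the Response object from the ChannelBuffer;
- Passes it to the next ChannelHandler.
1.2 NettyHandler
See the server-side analysis.
2. HeaderExchangeHandler
/**
* Handles the response model, that is, the logic for a Response object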
* @param channel
* @param response
* @throws RemotingException
*/
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
// Not a heartbeat
// Hand the result to DefaultFuture, which implements ResponseFuture
// The client reads the server's result from it
DefaultFuture.received(channel, response);
}
}
3. DefaultFuture
Both synchronous and asynchronous client calls use DefaultFuture.
public static void received(Channel channel, Response response) {
try {
// Look up the future by id
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
// Store the Response object and wake up the waiting thread
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
The server's response is read from DefaultFuture with the get() method.
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
// Wait to be woken up
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
/**
* Returns the result from the Response
* @return
* @throws RemotingException
*/
private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
return res.getResult();
}
if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
}
throw new RemotingException(channel, res.getErrorMessage());
}
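The wait-and-signal mechanism behind received(), doReceived() and get() can be sketched with a Lock and a Condition. PendingReply is a hypothetical class written for this example; it keeps only the id-to-future map and the timed wait, and leaves out callbacks, channels and response status codes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class PendingReply {
    // Replies that callers are still waiting for, keyed by request id.
    private static final Map<Long, PendingReply> PENDING = new ConcurrentHashMap<>();

    private final long id;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private Object result;     // guarded by lock
    private boolean completed; // guarded by lock

    PendingReply(long id) {
        this.id = id;
        PENDING.put(id, this);
    }

    // Called by the I/O side when a reply with this id arrives.
    static void received(long id, Object result) {
        PendingReply pending = PENDING.remove(id);
        if (pending != null) {
            pending.complete(result);
        }
        // A null lookup means the caller already timed out; the late reply is dropped.
    }

    private void complete(Object value) {
        lock.lock();
        try {
            result = value;
            completed = true;
            done.signalAll(); // wake up any thread blocked in get()
        } finally {
            lock.unlock();
        }
    }

    // Blocks until the reply arrives or the timeout expires.
    Object get(long timeoutMs) throws InterruptedException, TimeoutException {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            // Loop to guard against spurious wakeups.
            while (!completed) {
                if (remainingNanos <= 0) {
                    PENDING.remove(id);
                    throw new TimeoutException("No reply for request " + id + " within " + timeoutMs + " ms");
                }
                remainingNanos = done.awaitNanos(remainingNanos);
            }
            return result;
        } finally {
            lock.unlock();
        }
    }
}
```

The sketch uses awaitNanos(), which returns the time still left, so the loop can re-wait after a spurious wakeup without tracking a start timestamp by hand.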