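Source Code Analysis of Elasticsearch's Custom Future Implementation
1. Custom Future implementation classes
There are three implementation classes: PlainActionFuture, PlainTransportFuture, and PlainListenableActionFuture.
The class relationships are shown in the figure:
2. How the Future implementation works
The core implementation is BaseFuture, which delegates all state handling to an inner class, Sync, that extends AbstractQueuedSynchronizer.
Sync defines four state constants: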
static final int RUNNING = 0;
static final int COMPLETING = 1;
static final int COMPLETED = 2;
static final int CANCELLED = 4;
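Sync overrides two methods, tryAcquireShared(int arg) and tryReleaseShared(int arg).
Acquiring the lock (this succeeds only once the future is done; otherwise the calling thread is queued and parked):
protected int tryAcquireShared(int ignored) {
    if (isDone()) {
        return 1;
    }
    return -1;
}
Releasing the lock (this records the final state and lets the queued threads wake up):
protected boolean tryReleaseShared(int finalState) {
    setState(finalState);
    return true;
}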
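The acquire/release pair above can be tried out in isolation. The following is a minimal sketch, not Elasticsearch code: the names GateDemo, Gate, await, open and isOpen are made up for illustration, and the gate uses only two states (0 = closed, 1 = open) instead of the four constants in Sync.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class GateDemo {
    /** Hypothetical one-shot gate: closed while state is 0, open once state is 1. */
    static final class Gate extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            // A non-negative result lets the caller proceed; a negative one parks it.
            return getState() == 1 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int newState) {
            setState(newState);
            // Returning true tells the synchronizer to wake the queued waiters.
            return true;
        }

        void await() throws InterruptedException {
            acquireSharedInterruptibly(0);
        }

        void open() {
            releaseShared(1);
        }

        boolean isOpen() {
            return getState() == 1;
        }
    }

    public static void main(String[] args) throws Exception {
        Gate gate = new Gate();
        Thread waiter = new Thread(() -> {
            try {
                gate.await(); // parks here until open() is called
                System.out.println("waiter released");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        Thread.sleep(100); // give the waiter time to park
        gate.open();
        waiter.join();
    }
}
```

Because tryAcquireShared never changes the state, any number of threads can pass through once the gate is open, which is the behaviour a future needs when several callers wait for the same result.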
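A newly created future starts in the RUNNING state, so any thread that calls get() on it blocks until the future completes.
Unblocking:
① Call BaseFuture#set (or BaseFuture#setException)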
protected boolean set(@Nullable V value) {
    boolean result = sync.set(value);
    if (result) {
        done();
    }
    return result;
}
protected boolean setException(Throwable throwable) {
    boolean result = sync.setException(Objects.requireNonNull(throwable));
    if (result) {
        done();
    }
    return result;
}
② The set and setException methods inside Sync:
boolean set(@Nullable V v) {
    return complete(v, null, COMPLETED);
}
boolean setException(Throwable t) {
    return complete(null, t, COMPLETED);
}
③ The complete method inside Sync:
private boolean complete(@Nullable V v, @Nullable Throwable t, int finalState) {
    // Only one caller can win the RUNNING -> COMPLETING transition.
    boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
    if (doCompletion) {
        this.value = v;
        this.exception = t;
        // Sets the final state and wakes the waiting threads.
        releaseShared(finalState);
    } else if (getState() == COMPLETING) {
        // Another thread is completing; wait until it has finished.
        acquireShared(-1);
    }
    return doCompletion;
}
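complete first calls compareAndSetState(int expect, int update), a method provided by the queue synchronizer (AbstractQueuedSynchronizer), to move state from 0 (RUNNING) to 1 (COMPLETING). The thread that wins this compare-and-set stores the value or the exception and then calls releaseShared(finalState), which sets state to the final state through tryReleaseShared and wakes the threads blocked in get().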
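To see the whole set/complete/get path working end to end, here is a simplified sketch under stated assumptions. MiniFuture is a hypothetical class written for this article, not the BaseFuture source: it omits cancellation, the done() hook, timeouts, and the branch in which a losing thread waits for a concurrent completion.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MiniFuture<V> {
    private static final int RUNNING = 0;
    private static final int COMPLETING = 1;
    private static final int COMPLETED = 2;

    private final Sync<V> sync = new Sync<>();

    static final class Sync<V> extends AbstractQueuedSynchronizer {
        private V value;
        private Throwable exception;

        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == COMPLETED ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int finalState) {
            setState(finalState);
            return true;
        }

        boolean complete(V v, Throwable t) {
            // Only one caller wins the RUNNING -> COMPLETING transition.
            if (!compareAndSetState(RUNNING, COMPLETING)) {
                return false;
            }
            value = v;
            exception = t;
            // Publishes COMPLETED and wakes every thread parked in get().
            releaseShared(COMPLETED);
            return true;
        }

        V get() throws InterruptedException, ExecutionException {
            acquireSharedInterruptibly(-1); // parks while the state is not COMPLETED
            if (exception != null) {
                throw new ExecutionException(exception);
            }
            return value;
        }

        boolean isDone() {
            return getState() == COMPLETED;
        }
    }

    public boolean set(V v) {
        return sync.complete(v, null);
    }

    public boolean setException(Throwable t) {
        return sync.complete(null, t);
    }

    public V get() throws InterruptedException, ExecutionException {
        return sync.get();
    }

    public boolean isDone() {
        return sync.isDone();
    }

    public static void main(String[] args) throws Exception {
        MiniFuture<String> future = new MiniFuture<>();
        new Thread(() -> future.set("done")).start();
        System.out.println(future.get());        // blocks until set() has run, then prints "done"
        System.out.println(future.set("again")); // prints "false": the future is already completed
    }
}
```

The plain fields value and exception need no extra synchronization in this sketch because they are written before the volatile state write in releaseShared and read only after a successful acquire.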
3. PlainActionFuture in practice
The Gateway call is used as an example below:
Obtaining the future and blocking the thread
PlainActionFuture#newFuture() is a static method that returns a new instance
public static <T> PlainActionFuture<T> newFuture() {
    return new PlainActionFuture<>();
}
TransportAction#execute(request) calls execute(request, future), which runs asynchronously, and returns the future
public final ActionFuture<Response> execute(Request request) {
    PlainActionFuture<Response> future = newFuture();
    execute(request, future);
    return future;
}
Gateway#performStateRecovery() blocks in future.actionGet()
public void performStateRecovery(final GatewayStateRecoveredListener listener) {
    ActionFuture<NodesGatewayMetaState> future = execute(new Request(nodesIds));
    NodesGatewayMetaState nodesState = future.actionGet();
    ...
}
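The pattern in these three methods (create a future that is also a listener, hand it to the asynchronous call, then block on it) can be sketched outside Elasticsearch as follows. Everything here is an assumption made for illustration: Listener stands in for ActionListener, ListenerFuture for PlainActionFuture, and the sketch uses a CountDownLatch instead of the AbstractQueuedSynchronizer-based Sync to keep it short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingCallDemo {
    /** Hypothetical callback interface, standing in for ActionListener. */
    interface Listener<R> {
        void onResponse(R response);
        void onFailure(Exception e);
    }

    /** Hypothetical future that is also a listener, standing in for PlainActionFuture. */
    static final class ListenerFuture<R> implements Listener<R> {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile R response;
        private volatile Exception failure;

        @Override
        public void onResponse(R r) {
            response = r;
            done.countDown(); // releases the thread blocked in actionGet()
        }

        @Override
        public void onFailure(Exception e) {
            failure = e;
            done.countDown();
        }

        R actionGet() {
            try {
                done.await(); // blocks until one of the callbacks has run
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            if (failure != null) {
                throw new RuntimeException(failure);
            }
            return response;
        }
    }

    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    /** Asynchronous variant: the result is delivered to the listener on a pool thread. */
    void execute(String request, Listener<String> listener) {
        pool.execute(() -> listener.onResponse("response to " + request));
    }

    /** Future-returning variant: passes the future itself as the listener. */
    ListenerFuture<String> execute(String request) {
        ListenerFuture<String> future = new ListenerFuture<>();
        execute(request, future);
        return future;
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        BlockingCallDemo demo = new BlockingCallDemo();
        String result = demo.execute("ping").actionGet(); // caller blocks here
        System.out.println(result);
        demo.shutdown();
    }
}
```

The caller never sees the listener interface; it only sees a future, while the asynchronous code path only sees a listener. One object plays both roles.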
Callback and unblocking the thread
BaseFuture#set(value) stores the result and releases the blocked thread
protected boolean set(V value) {
    boolean result = sync.set(value);
    if (result) {
        done();
    }
    return result;
}
AdapterActionFuture#onResponse(result) is the callback that delivers the response
public void onResponse(L result) {
    set(result);
}
TransportAction#execute(request, listener) invokes the callback
public final Task execute(Request request, ActionListener<Response> listener) {
    Task task = taskManager.register("transport", actionName, request);
    if (task == null) {
        execute(null, request, listener);
    } else {
        execute(task, request, new ActionListener<Response>() {
            @Override
            public void onResponse(Response response) {
                taskManager.unregister(task);
                listener.onResponse(response);
            }
            @Override
            public void onFailure(Exception e) {
                taskManager.unregister(task);
                listener.onFailure(e);
            }
        });
    }
    return task;
}
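The listener wrapping in execute(request, listener) is a small decorator: clean up first, then forward to the original listener on both the success and the failure path. A minimal sketch of that idea follows; the Listener interface, the unregisterThenNotify helper, and the Set used as a task registry are all hypothetical stand-ins, not the Elasticsearch TaskManager API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CleanupListenerDemo {
    /** Hypothetical callback interface, standing in for ActionListener. */
    interface Listener<R> {
        void onResponse(R response);
        void onFailure(Exception e);
    }

    /**
     * Wraps a listener so that the task id is removed from the registry
     * before the delegate is notified, whether the call succeeded or failed.
     */
    static <R> Listener<R> unregisterThenNotify(String taskId, Set<String> registry, Listener<R> delegate) {
        return new Listener<R>() {
            @Override
            public void onResponse(R response) {
                registry.remove(taskId);
                delegate.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                registry.remove(taskId);
                delegate.onFailure(e);
            }
        };
    }

    public static void main(String[] args) {
        Set<String> registry = ConcurrentHashMap.newKeySet();
        registry.add("task-1");

        Listener<String> printing = new Listener<String>() {
            @Override
            public void onResponse(String response) {
                System.out.println("response: " + response + ", tasks left: " + registry.size());
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("failure: " + e.getMessage());
            }
        };

        // By the time the delegate runs, the task has already been unregistered.
        unregisterThenNotify("task-1", registry, printing).onResponse("ok");
    }
}
```

When the delegate is a future such as PlainActionFuture, this ordering means the task is already unregistered by the time the thread blocked in actionGet() resumes.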