更新時間:2022-03-18 11:29:14 來源:動力節點 瀏覽1786次
1.增加consumer配置
這種方式很簡單,只需要在服務引用時增加配置即可,如下所示,其中name為需要異步調用的方法名,async表示是否啟用異步調用。
<dubbo:reference id="asyncService" check="false" interface="com.alibaba.dubbo.demo.AsyncService" url="localhost:20880">
<dubbo:method name="sayHello" async="true" />
</dubbo:reference>
此時consumer端有3種調用方式:
由于配置了異步調用,因此此時直接調用將返回null:
String result = asyncService.sayHello("world");
通過RpcContext獲取Future對象,調用get方法時阻塞知道返回結果:
asyncService.sayHello("world");
Future<String> future = RpcContext.getContext().getFuture();
String result = future.get();
通過ResponseFuture設置回調,執行完成會回調done方法,拋異常則會回調caught方法:
asyncService.sayHello("world");
ResponseFuture responseFuture = ((FutureAdapter)RpcContext.getContext().getFuture()).getFuture();
responseFuture.setCallback(new ResponseCallback() {
@Override
public void done(Object response) {
System.out.println("done");
}
@Override
public void caught(Throwable exception) {
System.out.println("caught");
}
});
try {
System.out.println("result = " + responseFuture.get());
} catch (RemotingException e) {
e.printStackTrace();
}
如果只想異步調用,不需要返回值,則可以配置 return="false",這樣可以避免Future對象的創建,此時RpcContext.getContext().getFuture()將返回null;
2.直接定義返回CompletableFuture的服務接口
在上述方式中,想獲取異步調用的結果,需要從RpcContext中獲取,使用起來不是很方便。基于java 8中引入的CompletableFuture,dubbo在2.7.0版本中也增加了對CompletableFuture的支持,我們可以直接定義一個返回CompletableFuture類型的接口。
public interface AsyncService {
String sayHello(String name);
CompletableFuture<String> sayHelloAsync(String name);
}
服務端實現如下:
public class AsyncServiceImpl implements AsyncService {
@Override
public String sayHello(String name) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return name;
}
@Override
public CompletableFuture<String> sayHelloAsync(String name) {
return CompletableFuture.supplyAsync(() -> name);
}
}
如此一來,我們就實現了服務端的異步,客戶端直接調用接口即可,不需要再從RpcContext中獲取返回值:
CompletableFuture<String> completableFuture = asyncService.sayHelloAsync("async");
String result = completableFuture.get();
3.事件通知
dubbo允許consumer 端在調用之前、調用之后或出現異常時,觸發 oninvoke、onreturn、onthrow 三個事件。類似于Spring中的前置增強、后置增強和異常拋出增強。只需要在服務引用時,增加以下配置指定事件通知的方法即可:
<dubbo:reference id="asyncService" check="false" interface="com.alibaba.dubbo.demo.AsyncService" url="localhost:20880">
<dubbo:method name="sayHello"
oninvoke="notifyServiceImpl.onInvoke"
onreturn="notifyServiceImpl.onReturn"
onthrow="notifyServiceImpl.onThrow" />
</dubbo:reference>
事件通知服務如下:
public class NotifyServiceImpl implements NotifyService {
// 方法參數與調用方法參數相同
@Override
public void onInvoke(String name) {
System.out.println("onInvoke: " + name);
}
// 第一個參數為調用方法的返回值,其余為調用方法的參數
@Override
public void onReturn(String retName, String name) {
System.out.println("onReturn: " + name);
}
// 第一個參數為調用異常,其余為調用方法的參數
@Override
public void onThrow(Throwable ex, String name) {
System.out.println("onThrow: " + name);
}
}
與Spring增強不同的是,dubbo中的事件通知也可以是異步,只需要將調用方法配置為async="true"即可,但oninvoke方法無法異步執行。
4.異步調用源碼分析
dubbo中的異步調用實際上是通過引入一個FutureFilter來實現的,關鍵源碼如下。
(1)調用前獲取方法信息
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements PostProcessFilter {
protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
@Override
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
fireInvokeCallback(invoker, invocation);
// need to configure if there's return value before the invocation in order to help invoker to judge if it's
// necessary to return future.
return postProcessResult(invoker.invoke(invocation), invoker, invocation);
}
...
}
在fireInvokeCallback()方法中,會首先調用getAsyncMethodInfo()獲取目標方法的方法信息,看是否有配置事件通知:
private ConsumerMethodModel.AsyncMethodInfo getAsyncMethodInfo(Invoker<?> invoker, Invocation invocation) {
// 首先獲取消費者信息
final ConsumerModel consumerModel = ApplicationModel.getConsumerModel(invoker.getUrl().getServiceKey());
if (consumerModel == null) {
return null;
}
// 獲取消費者對應的方法信息
ConsumerMethodModel methodModel = consumerModel.getMethodModel(invocation.getMethodName());
if (methodModel == null) {
return null;
}
// 獲取消費者對應方法的事件信息,即是否有配置事件通知
final ConsumerMethodModel.AsyncMethodInfo asyncMethodInfo = methodModel.getAsyncInfo();
if (asyncMethodInfo == null) {
return null;
}
return asyncMethodInfo;
}
(2)同步觸發oninvoke事件
獲取到調用方法對應的信息后,回到fireInvokeCallback()方法:
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
final ConsumerMethodModel.AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
if (asyncMethodInfo == null) {
return;
}
// 獲取事件配置信息
final Method onInvokeMethod = asyncMethodInfo.getOninvokeMethod();
final Object onInvokeInst = asyncMethodInfo.getOninvokeInstance();
if (onInvokeMethod == null && onInvokeInst == null) {
return;
}
if (onInvokeMethod == null || onInvokeInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a oninvoke callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (!onInvokeMethod.isAccessible()) {
onInvokeMethod.setAccessible(true);
}
// 獲取方法參數
Object[] params = invocation.getArguments();
try {
// 觸發oninvoke事件
onInvokeMethod.invoke(onInvokeInst, params);
} catch (InvocationTargetException e) {
// 觸發onthrow事件
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
}
}
(3)調用結果處理
方法調用完成后,會回到postProcessResult()方法:
@Override
public Result postProcessResult(Result result, Invoker<?> invoker, Invocation invocation) {
// 如果是異步調用,返回結果會被封裝成AsyncRpcResult類型的對象,具體在哪里封裝的,后面會講到
if (result instanceof AsyncRpcResult) {
AsyncRpcResult asyncResult = (AsyncRpcResult) result;
asyncResult.thenApplyWithContext(r -> {
asyncCallback(invoker, invocation, r);
return r;
});
return asyncResult;
} else {
syncCallback(invoker, invocation, result);
return result;
}
}
syncCallback和asyncCallback里面的邏輯比較簡單,就是根據方法是正常返回還是拋異常,觸發對應的事件。可以看到,如果被調用方法是同步的,則這兩個事件也是同步的,反之亦然。
(4)方法調用核心過程
在postProcessResult()方法中,第一個參數是invoker.invoke(invocation),這里就會走到下一個Filter鏈完成filter鏈的處理,最終調到原始服務,走到DubboInvoker#doInvoke方法:
protected Result doInvoke(final Invocation invocation) throws Throwable { ...
try {
// 讀取async配置
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// 讀取future_generated/future_returntype配置,還沒搞明白是干啥的
boolean isAsyncFuture = RpcUtils.isGeneratedFuture(inv) || RpcUtils.isFutureReturnType(inv);
// 讀取return配置
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
// 如果配置return="true",future對象就直接設置為null
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
// 如果配置async="true",構建future對象
ResponseFuture future = currentClient.request(inv, timeout);
// For compatibility
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
// 同時將返回結果包裝為AsyncResult對象
Result result;
if (isAsyncFuture) {
// register resultCallback, sometimes we need the asyn result being processed by the filter chain.
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
} else {
// 否則就是同步調用,future當然也是null
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
}
...
}
通過這個過程不難發現,不管是同步調用還是異步調用,最終都會走到ExchangeClient#send方法,再往下會走到HeaderExchangeChannel#request方法,這個一個異步方法,返回ResponseFuture對象。
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
dubbo中同步調用也是通過異步調用來實現,只是同步調用發起后,直接調用future#get的方法來同步等待結果的返回,而異步調用只返回Future Response,在用戶需要關心其結果時才調用get方法。如果大家想了解更多相關知識,可以關注一下動力節點的Java在線學習,里面的課程內容從入門到精通,講的很細致,適合沒有基礎的小伙伴學習,希望對大家能夠有所幫助。
0基礎 0學費 15天面授
有基礎 直達就業
業余時間 高薪轉行
工作1~3年,加薪神器
工作3~5年,晉升架構
提交申請后,顧問老師會電話與您溝通安排學習