ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

从源码深入了解 Dio 的 CancelToken

2021-12-17 21:03:45  阅读:188  来源: 互联网

标签:Dio return 请求 源码 CancelToken interceptor future requestOptions cancelToken


上一篇讲了 Dio 的 CancelToken 的使用,本篇来从源码解析 CancelToken 是如何实现取消网络请求的。相关的内容如下:

  • CancelToken 类的实现
  • CancelToken 如何取消网络请求

CancelToken 类

CalcelToken类的代码并不多,我们直接复制下来一个个过一遍。

import 'dart:async';
import 'dio_error.dart';
import 'options.dart';

/// You can cancel a request by using a cancel token.
/// One token can be shared with different requests.
/// when a token's [cancel] method invoked, all requests
/// with this token will be cancelled.
class CancelToken {
  CancelToken() {
    _completer = Completer<DioError>();
  }

  /// Whether is throw by [cancel]
  static bool isCancel(DioError e) {
    return e.type == DioErrorType.cancel;
  }

  /// If request have been canceled, save the cancel Error.
  DioError? _cancelError;

  /// If request have been canceled, save the cancel Error.
  DioError? get cancelError => _cancelError;

  late Completer<DioError> _completer;

  RequestOptions? requestOptions;

  /// whether cancelled
  bool get isCancelled => _cancelError != null;

  /// When cancelled, this future will be resolved.
  Future<DioError> get whenCancel => _completer.future;

  /// Cancel the request
  void cancel([dynamic reason]) {
    _cancelError = DioError(
      type: DioErrorType.cancel,
      error: reason,
      requestOptions: requestOptions ?? RequestOptions(path: ''),
    );
    _cancelError!.stackTrace = StackTrace.current;
    _completer.complete(_cancelError);
  }
}

首先看注释,我们可以了解到 CancelToken 的一个非常有用的地方,一个 CancelToken 可以和多个请求关联,取消时可以同时取消多个关联的请求。这对于我们一个页面有多个请求时非常有用。大部分的是一些属性:

  • _cancelError:被取消后存储的取消错误信息,对外可以通过 get 方式可以获取。
  • _completer:一个Completer<DioError>对象,Completer 本是一个抽象类,用于管理异步操作事件。构建时返回的是一个 Future 对象,可以调用对应的 complete(对应正常完成) 或completeError(对应错误处理)。该属性为私有属性,外部不可访问。
  • requestOptionsRequestOptions对象,是请求的一些可选属性(比如 headers,请求参数,请求方式等等),可以为空。该属性是公共属性,说明可以在外部修改。
  • isCancelled:布尔值,用于标识是否取消,实际是通过 _cancelError是否为空判断的,如果不为空说明是被取消了。
  • whenCancel:实际就是_completerfuture 对象,可以用来处理操作的响应,这样也相当于对_completer做了一个封装,只暴露了其 future 对象。
  • cancel:取消方法,也就是核心方法了,这个方法构建了一个 DioError 对象(用于存储取消的错误),这里如果调用时传了 reason 对象,也会将 reason 传递到 error 参数中,然后就是 requestOptions 参数,如果 requestOptions 为空则构建一个空的RequestOptions对象。同时还会将当前的堆栈信息存入到_cancelError 的 stackTrace 中,方便跟踪堆栈信息。最后是调用_completer.complete异步方法。这个是关键方法,我们看一下这个方法做了什么事情。

Completer类

我们进入 Completer 类来看一下 complete方法做了什么事情:

/// All listeners on the future are informed about the value.
void complete([FutureOr<T>? value]);

可以看到这个方法是一个抽象方法,意味着应该是由 Completer 的具体实现类来实现的。同时从注释可以看到,这个方法是会调用监听器来告知 complete 方法的 value 泛型对象。可以理解为是通知观察者处理该对象。那我们就可以猜测是在请求的时候,如果有 cancelToken 参数时,应该是给 cancelToken 增加了一个监听器。继续来看 Dio 的请求代码的实现。

Dio 的请求代码

到 Dio 的源码 dio.dart看的时候,发现全部请求其实是fetch<T>(RequestOptionsrequestOptions)的别名,也就是实际全部的请求都是通过该方法完成的。我们看一下这个方法的源码。代码很长,如果有兴趣的可以仔细阅读一下,我们这里只找出与 cancelToken 相关的代码。

@override
Future<Response<T>> fetch<T>(RequestOptions requestOptions) async {
  if (requestOptions.cancelToken != null) {
    requestOptions.cancelToken!.requestOptions = requestOptions;
  }

  if (T != dynamic &&
      !(requestOptions.responseType == ResponseType.bytes ||
          requestOptions.responseType == ResponseType.stream)) {
    if (T == String) {
      requestOptions.responseType = ResponseType.plain;
    } else {
      requestOptions.responseType = ResponseType.json;
    }
  }

  // Convert the request interceptor to a functional callback in which
  // we can handle the return value of interceptor callback.
  FutureOr Function(dynamic) _requestInterceptorWrapper(
    void Function(
      RequestOptions options,
      RequestInterceptorHandler handler,
    )
        interceptor,
  ) {
    return (dynamic _state) async {
      var state = _state as InterceptorState;
      if (state.type == InterceptorResultType.next) {
        return listenCancelForAsyncTask(
          requestOptions.cancelToken,
          Future(() {
            return checkIfNeedEnqueue(interceptors.requestLock, () {
              var requestHandler = RequestInterceptorHandler();
              interceptor(state.data, requestHandler);
              return requestHandler.future;
            });
          }),
        );
      } else {
        return state;
      }
    };
  }

  // Convert the response interceptor to a functional callback in which
  // we can handle the return value of interceptor callback.
  FutureOr<dynamic> Function(dynamic) _responseInterceptorWrapper(
      interceptor) {
    return (_state) async {
      var state = _state as InterceptorState;
      if (state.type == InterceptorResultType.next ||
          state.type == InterceptorResultType.resolveCallFollowing) {
        return listenCancelForAsyncTask(
          requestOptions.cancelToken,
          Future(() {
            return checkIfNeedEnqueue(interceptors.responseLock, () {
              var responseHandler = ResponseInterceptorHandler();
              interceptor(state.data, responseHandler);
              return responseHandler.future;
            });
          }),
        );
      } else {
        return state;
      }
    };
  }

  // Convert the error interceptor to a functional callback in which
  // we can handle the return value of interceptor callback.
  FutureOr<dynamic> Function(dynamic, StackTrace stackTrace)
      _errorInterceptorWrapper(interceptor) {
    return (err, stackTrace) {
      if (err is! InterceptorState) {
        err = InterceptorState(assureDioError(
          err,
          requestOptions,
          stackTrace,
        ));
      }

      if (err.type == InterceptorResultType.next ||
          err.type == InterceptorResultType.rejectCallFollowing) {
        return listenCancelForAsyncTask(
          requestOptions.cancelToken,
          Future(() {
            return checkIfNeedEnqueue(interceptors.errorLock, () {
              var errorHandler = ErrorInterceptorHandler();
              interceptor(err.data, errorHandler);
              return errorHandler.future;
            });
          }),
        );
      } else {
        throw err;
      }
    };
  }

  // Build a request flow in which the processors(interceptors)
  // execute in FIFO order.

  // Start the request flow
  var future = Future<dynamic>(() => InterceptorState(requestOptions));

  // Add request interceptors to request flow
  interceptors.forEach((Interceptor interceptor) {
    future = future.then(_requestInterceptorWrapper(interceptor.onRequest));
  });

  // Add dispatching callback to request flow
  future = future.then(_requestInterceptorWrapper((
    RequestOptions reqOpt,
    RequestInterceptorHandler handler,
  ) {
    requestOptions = reqOpt;
    _dispatchRequest(reqOpt).then(
      (value) => handler.resolve(value, true),
      one rror: (e) {
        handler.reject(e, true);
      },
    );
  }));

  // Add response interceptors to request flow
  interceptors.forEach((Interceptor interceptor) {
    future = future.then(_responseInterceptorWrapper(interceptor.onResponse));
  });

  // Add error handlers to request flow
  interceptors.forEach((Interceptor interceptor) {
    future = future.catchError(_errorInterceptorWrapper(interceptor.onError));
  });

  // Normalize errors, we convert error to the DioError
  return future.then<Response<T>>((data) {
    return assureResponse<T>(
      data is InterceptorState ? data.data : data,
      requestOptions,
    );
  }).catchError((err, stackTrace) {
    var isState = err is InterceptorState;

    if (isState) {
      if ((err as InterceptorState).type == InterceptorResultType.resolve) {
        return assureResponse<T>(err.data, requestOptions);
      }
    }

    throw assureDioError(
      isState ? err.data : err,
      requestOptions,
      stackTrace,
    );
  });
}

首先在一开始就检查了当前请求 requestOptionscancelToken 是不是为空,如果不为空,就设置 cancelTokenrequestOptions为当前请求的requestOptions,相当于在 cancelToken 缓存了所有的请求参数。

接下来是拦截器的处理,包括了请求拦截器,响应拦截器和错误拦截器。分别定义了一个内置的拦截器包装方法,用于将拦截器封装为函数式回调,以便进行统一的拦截处理。这个我们跳过,关键是每个拦截器的包装方法都有一个listenCancelForAsyncTask方法,在拦截器状态是 next(说明还有拦截要处理)的时候,会调用该方法并返回其返回值。这个方法第一个参数就是 cancelToken。从方法名看就是监听异步任务的取消事件,看看这个方法做了什么事情。

异步任务取消事件监听

listenCancelForAsyncTask方法很简单,其实就是返回了一个 Future.any 对象,然后在这个 Future里,如果 cancelToken 不为空的话,在响应 cancelToken 的取消事件时执行后续的处理。Future.any 的特性是将一系列的异步函数按统一的接口组装起来,按次序执行(上一个拦截器的处理完后轮到下一个拦截器执行),以执行 onValue (正常情况)和onError(异常情况) 方法。

这里如果 cancelToken 不为空,就会把cancelToken的取消事件方法放到拦截器中,然后出现异常的时候会将异常抛出。这其实相当于是前置拦截,就是说如果请求还没处理(未加入处理队列)的时候,直接使用拦截器拦截。而如果请求已经加入到了处理队列,就需要在队列调度中处理了。

static Future<T> listenCancelForAsyncTask<T>(
      CancelToken? cancelToken, Future<T> future) {
  return Future.any([
    if (cancelToken != null) cancelToken.whenCancel.then((e) => throw e),
    future,
  ]);
}
/// Returns the result of the first future in [futures] to complete.
///
/// The returned future is completed with the result of the first
/// future in [futures] to report that it is complete,
/// whether it's with a value or an error.
/// The results of all the other futures are discarded.
///
/// If [futures] is empty, or if none of its futures complete,
/// the returned future never completes.
static Future<T> any<T>(Iterable<Future<T>> futures) {
  var completer = new Completer<T>.sync();
  void onValue(T value) {
    if (!completer.isCompleted) completer.complete(value);
  }

  void one rror(Object error, StackTrace stack) {
    if (!completer.isCompleted) completer.completeError(error, stack);
  }

  for (var future in futures) {
    future.then(onValue, one rror: one rror);
  }
  return completer.future;
}

请求调度

实际的请求调度是在 dio_mixin.dart 中的_dispatchRequest方法完成的,该方法实际在上面的 fetch 方法中调用。这个方法有两个地方用到了 cancelToken,一个是使用 httpClientAdapterfetch方法时传入了 cancelTokenwhenCancel 属性。在 httpClientAdapter引用是为了在取消请求后,能够回调告知监听器请求被取消。另外就是调用了一个 checkCancelled 方法,用于检查是否要停止请求。

responseBody = await httpClientAdapter.fetch(
  reqOpt,
  stream,
  cancelToken?.whenCancel,
);

// If the request has been cancelled, stop request and throw error.
static void checkCancelled(CancelToken? cancelToken) {
  if (cancelToken != null && cancelToken.cancelError != null) {
    throw cancelToken.cancelError!;
  }
}

从这里我们就能够大致明白基本的机制了。实际上我们调用 cancelTokencancel 方法的时候,标记了 cancelToken 的错误信息 cancelError,以让_dispatchRequest被调度的时候来检测是否取消。在_dispatchRequest中如果检测到cancelError不为空,就会抛出一个 cancelError,中止当前以及接下来的请求。

总结

从源码来看,有一堆的 Future,以及各类包装方法,阅读起来相当费劲,这也看出来 Dio 的厉害之处,让一般人想这些网络请求头都能想炸去。实际我们从源码以及调试跟踪来看,CancelToken 的机制是:

  • 事前取消:如果请求的 cancelToken 不为空,就会将 cancelToken 的异步处理加入到拦截器,取消后直接在拦截器环节就把请求拦住,不会到后续的调度环节。
  • 事中取消:如果请求已经加入到了调度队列,此时取消的话会抛出异常,中止请求的发出。
  • 事后取消:请求已经发出了,当服务端返回结果的时候,会在响应处理环节(包括出错)拦截,中止后续的响应处理。即便是服务端返回了数据,也会被拦截,不过这个实际意义不太大,无法降低服务端的负荷,只是避免后续的数据处理过程了。
    关注岛上码农

标签:Dio,return,请求,源码,CancelToken,interceptor,future,requestOptions,cancelToken
来源: https://blog.csdn.net/shuijian00/article/details/122004810

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有