ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

webrtc 线程设计

2021-03-21 17:31:18  阅读:629  来源: 互联网

标签:std task thread rtc 线程 设计 webrtc


webrtc 线程设计

前言

本文主要对webrtc框架使用到的三种线程类进行分析,根据下面两方面去讨论:

  1. 使用场景(webrtc那些模块使用)
  2. 接口设计

1. webrtc::ProcessThread

1.1 使用场景

webrtc::ProcessThread 在 modules\utility\include 路径下,它其实本质上是一个定时器的作用,用于周期性的调度业务模块,比如pacing,发送RTCP报文和NACK重传包发送等模块。这些业务特点都是需要周期性的检测。

1.2 接口设计

webrtc::ProcessThread 是对外提供的接口类,真正的实现在ProcessThreadImpl

class ProcessThreadImpl : public ProcessThread {
 public:
  explicit ProcessThreadImpl(const char* thread_name);
  ~ProcessThreadImpl() override;

  void Start() override;
  void Stop() override;

  void WakeUp(Module* module) override;
  void PostTask(std::unique_ptr<QueuedTask> task) override;
  void PostDelayedTask(std::unique_ptr<QueuedTask> task,
                       uint32_t milliseconds) override;

  void RegisterModule(Module* module, const rtc::Location& from) override;
  void DeRegisterModule(Module* module) override;

 protected:
  static void Run(void* obj);
  bool Process();

 private:
  struct ModuleCallback {
    ModuleCallback() = delete;
    ModuleCallback(ModuleCallback&& cb) = default;
    ModuleCallback(const ModuleCallback& cb) = default;
    ModuleCallback(Module* module, const rtc::Location& location)
        : module(module), location(location) {}
    bool operator==(const ModuleCallback& cb) const {
      return cb.module == module;
    }

    Module* const module;
    int64_t next_callback = 0;  // Absolute timestamp.
    const rtc::Location location;

   private:
    ModuleCallback& operator=(ModuleCallback&);
  };
  struct DelayedTask {
    DelayedTask(int64_t run_at_ms, std::unique_ptr<QueuedTask> task)
        : run_at_ms(run_at_ms), task(task.release()) {}
    friend bool operator<(const DelayedTask& lhs, const DelayedTask& rhs) {
      // Earliest DelayedTask should be at the top of the priority queue.
      return lhs.run_at_ms > rhs.run_at_ms;
    }

    int64_t run_at_ms;
    // DelayedTask owns the |task|, but some delayed tasks must be removed from
    // the std::priority_queue, but mustn't be deleted. std::priority_queue does
    // not give non-const access to the values, so storing unique_ptr would
    // delete the task as soon as it is remove from the priority queue.
    // Thus lifetime of the |task| is managed manually.
    QueuedTask* task;
  };
  typedef std::list<ModuleCallback> ModuleList;

  void Delete() override;

  // Warning: For some reason, if |lock_| comes immediately before |modules_|
  // with the current class layout, we will  start to have mysterious crashes
  // on Mac 10.9 debug.  I (Tommi) suspect we're hitting some obscure alignemnt
  // issues, but I haven't figured out what they are, if there are alignment
  // requirements for mutexes on Mac or if there's something else to it.
  // So be careful with changing the layout.
  rtc::RecursiveCriticalSection
      lock_;  // Used to guard modules_, tasks_ and stop_.

  rtc::ThreadChecker thread_checker_;
  rtc::Event wake_up_;
  // TODO(pbos): Remove unique_ptr and stop recreating the thread.
  std::unique_ptr<rtc::PlatformThread> thread_;

  ModuleList modules_;
  std::queue<QueuedTask*> queue_;
  std::priority_queue<DelayedTask> delayed_tasks_ RTC_GUARDED_BY(lock_);
  bool stop_;
  const char* thread_name_;
};

为了节省资源,一般是许多模块共用一个webrtc::ProcessThread,提供了RegisterModule函数,注册相关的模块,只要调度的业务按照 class Module 实现接口就行了。

Module 有两个主要的接口,TimeUntilNextProcess() 这是返回当前模块需要调度的时间点,单位是毫秒;Process() 是模块实现具体的业务。

class Module {
 public:
  // Returns the number of milliseconds until the module wants a worker
  // thread to call Process.
  // This method is called on the same worker thread as Process will
  // be called on.
  // TODO(tommi): Almost all implementations of this function, need to know
  // the current tick count.  Consider passing it as an argument.  It could
  // also improve the accuracy of when the next callback occurs since the
  // thread that calls Process() will also have it's tick count reference
  // which might not match with what the implementations use.
  virtual int64_t TimeUntilNextProcess() = 0;

  // Process any pending tasks such as timeouts.
  // Called on a worker thread.
  virtual void Process() = 0;

  // This method is called when the module is attached to a *running* process
  // thread or detached from one.  In the case of detaching, |process_thread|
  // will be nullptr.
  //
  // This method will be called in the following cases:
  //
  // * Non-null process_thread:
  //   * ProcessThread::RegisterModule() is called while the thread is running.
  //   * ProcessThread::Start() is called and RegisterModule has previously
  //     been called.  The thread will be started immediately after notifying
  //     all modules.
  //
  // * Null process_thread:
  //   * ProcessThread::DeRegisterModule() is called while the thread is
  //     running.
  //   * ProcessThread::Stop() was called and the thread has been stopped.
  //
  // NOTE: This method is not called from the worker thread itself, but from
  //       the thread that registers/deregisters the module or calls Start/Stop.
  virtual void ProcessThreadAttached(ProcessThread* process_thread) {}

 protected:
  virtual ~Module() {}
};

2、rtc::TaskQueue

2.1 使用场景

rtc::TaskQueue 在 rtc_base\task_queue 下,它是一个异步处理任务线程模型,使用者继承QueuedTask 实现Run接口,投递到TaskQueue中,TaskQueue就会自动调度任务处理。这个线程模型在webrtc中使用广泛,如webrtc的音频和视频的编码线程就是采用这个线程调度模型

2.2 使用接口

PostTask 投递任务
PostDelayedTask 投递任务,但任务设定在milliseconds 后执行

class RTC_LOCKABLE RTC_EXPORT TaskQueue {
public:
 // TaskQueue priority levels. On some platforms these will map to thread
 // priorities, on others such as Mac and iOS, GCD queue priorities.
 using Priority = ::webrtc::TaskQueueFactory::Priority;

 explicit TaskQueue(std::unique_ptr<webrtc::TaskQueueBase,
                                    webrtc::TaskQueueDeleter> task_queue);
 ~TaskQueue();

 // Used for DCHECKing the current queue.
 bool IsCurrent() const;

 // Returns non-owning pointer to the task queue implementation.
 webrtc::TaskQueueBase* Get() { return impl_; }

 // TODO(tommi): For better debuggability, implement RTC_FROM_HERE.

 // Ownership of the task is passed to PostTask.
 void PostTask(std::unique_ptr<webrtc::QueuedTask> task);

 // Schedules a task to execute a specified number of milliseconds from when
 // the call is made. The precision should be considered as "best effort"
 // and in some cases, such as on Windows when all high precision timers have
 // been used up, can be off by as much as 15 millseconds (although 8 would be
 // more likely). This can be mitigated by limiting the use of delayed tasks.
 void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
                      uint32_t milliseconds);

 // std::enable_if is used here to make sure that calls to PostTask() with
 // std::unique_ptr<SomeClassDerivedFromQueuedTask> would not end up being
 // caught by this template.
 template <class Closure,
           typename std::enable_if<!std::is_convertible<
               Closure,
               std::unique_ptr<webrtc::QueuedTask>>::value>::type* = nullptr>
 void PostTask(Closure&& closure) {
   PostTask(webrtc::ToQueuedTask(std::forward<Closure>(closure)));
 }

 // See documentation above for performance expectations.
 template <class Closure,
           typename std::enable_if<!std::is_convertible<
               Closure,
               std::unique_ptr<webrtc::QueuedTask>>::value>::type* = nullptr>
 void PostDelayedTask(Closure&& closure, uint32_t milliseconds) {
   PostDelayedTask(webrtc::ToQueuedTask(std::forward<Closure>(closure)),
                   milliseconds);
 }

private:
 webrtc::TaskQueueBase* const impl_;

 RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue);
};

rtc::TaskQueue有两个具体的实现类,一个是TaskQueueStdlib
路径是rtc_basc\task_queue_stdlib.h,非window的平台采用这个实现,采用了一个队列实现任务的分发处理;

一个是TaskQueueWin(rtc_basc\task_queue_win.h),TaskQueueWin是window下专有的,采用window的消息机制实现任务的分发处理。

3、rtc::Thread

3.1 使用场景

在创建peerconnectionfactory 的时候,如下图所示会设置对应的rtc::Thread 线程:

RTC_EXPORT rtc::scoped_refptr<PeerConnectionFactoryInterface>
CreatePeerConnectionFactory(
   rtc::Thread* network_thread,
   rtc::Thread* worker_thread,
   rtc::Thread* signaling_thread,
   rtc::scoped_refptr<AudioDeviceModule> default_adm,
   rtc::scoped_refptr<AudioEncoderFactory> audio_encoder_factory,
   rtc::scoped_refptr<AudioDecoderFactory> audio_decoder_factory,
   std::unique_ptr<VideoEncoderFactory> video_encoder_factory,
   std::unique_ptr<VideoDecoderFactory> video_decoder_factory,
   rtc::scoped_refptr<AudioMixer> audio_mixer,
   rtc::scoped_refptr<AudioProcessing> audio_processing,
   AudioFrameProcessor* audio_frame_processor = nullptr);

}  // namespace webrtc

下面这些是来自WebRTC 的线程模型的引用内容

signaling_thread:
一般是工作在 PeerConnection 层,主要是完成控制平面的逻辑,用于和应用层交互。比如,CreateOffer,SetRemoteSession 等接口都是通过 Signal threa 完成的。默认是采用 PeerConnectionFactory 初始化线程作为信令线程

network_thread 会创建带有socket的线程:

  1. Transport 的初始化
  2. 从网络接收数据,发送给 Worker thread
  3. 从 Worker thread 接收数据,发送到网络

worker_thread 不带有socket的线程,处理以下等业务:

  1. 音频设备初始化
  2. 视频设备初始化
  3. 流对象的初始化
  4. 从网络线程接收数据,传给解码器线程
  5. 从编码器线程接收数据,传给网络线程

3.2 接口设计

接口设计参看下面这篇文章:rtc::thread 设计

4. 引用文章

https://zhuanlan.zhihu.com/p/136070941
https://blog.csdn.net/xiaomucgwlmx/article/details/103287398

标签:std,task,thread,rtc,线程,设计,webrtc
来源: https://blog.csdn.net/The_Old_man_and_sea/article/details/114992728

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有