[原]Muduo网络库源码分析之定时器的实现

杜肖孟 18/01/07 15:41:05

muduo 的定时器功能由三个 class 实现,TimerId、Timer 和 TimerQueue。

TimerId 类

它唯一标识一个 Timer 定时器。TimerId Class 同时保存Timer* 和 sequence_,这个 sequence_ 是每个 Timer 对象有一个全局递增的序列号 int64_t sequence_,用原子计数器(AtomicInt64)生成。
它主要用于注销定时器,这样就可以区分地址相同的先后两个 Timer 对象。

namespace muduo
{
namespace net
{

class Timer;

///
/// An opaque identifier, for canceling Timer.
///
/* 带有唯一标识的Timer,主要用于取消Timer */
class TimerId : public muduo::copyable
{
 public:
  TimerId()
    : timer_(NULL),
      sequence_(0)
  {
  }

  TimerId(Timer* timer, int64_t seq)
    : timer_(timer),    //timer 定时器的指针
      sequence_(seq)    //seq 该定时任务的序列号
  {
  }

  // default copy-ctor, dtor and assignment are okay

  friend class TimerQueue;

 private:
  Timer* timer_;
  int64_t sequence_;
};

}
}

Timer 类

封装了定时器的一些参数,包括超时时间(expiration_)、超时回调函数(callback_)、时间间隔(interval_)、是否重复定时(repeat_)、定时器的序列号等成员变量,成员函数大都是返回这些变量的值,run() 用来调用回调函数,restart() 用来重启定时器。

Timer.h

namespace muduo
{
namespace net
{
///
/// Internal class for timer event.
///
/* 定时器 */
class Timer : boost::noncopyable
{
 public:
  Timer(const TimerCallback& cb, Timestamp when, double interval)
    : callback_(cb),
      expiration_(when),
      interval_(interval),
      repeat_(interval > 0.0),
      sequence_(s_numCreated_.incrementAndGet())
  { }

#ifdef __GXX_EXPERIMENTAL_CXX0X__
  Timer(TimerCallback&& cb, Timestamp when, double interval)
    : callback_(std::move(cb)),
      expiration_(when),
      interval_(interval),
      repeat_(interval > 0.0),
      sequence_(s_numCreated_.incrementAndGet())
  { }
#endif

  void run() const
  {
    callback_();    //执行定时器回调函数
  }

  /* 返回定时器的超时时间戳 */
  Timestamp expiration() const  { return expiration_; }
  /* 是否周期性定时 */
  bool repeat() const { return repeat_; }
  /* 返回本定时器的序列号 */
  int64_t sequence() const { return sequence_; }

  /* 重启定时器 */
  void restart(Timestamp now);

  static int64_t numCreated() { return s_numCreated_.get(); }

 private:
  const TimerCallback callback_;    //超时回调函数
  Timestamp expiration_;            //超时时间戳
  const double interval_;           //时间间隔,如果是一次性定时器,该值为0
  const bool repeat_;               //是否重复执行
  const int64_t sequence_;          //本定时任务的序号

  static AtomicInt64 s_numCreated_; //定时器计数,当前已经创建的定时器数量
};
}
}

Timer.cc

#include <muduo/net/Timer.h>

using namespace muduo;
using namespace muduo::net;

AtomicInt64 Timer::s_numCreated_;

void Timer::restart(Timestamp now)
{
  if (repeat_)
  {
      //如果需要重复,那就将时间设为下次超时的时间
    expiration_ = addTime(now, interval_);
  }
  else
  {
      //如果不需要重复,那就将超时时间设为一个不可用的 value
    expiration_ = Timestamp::invalid();
  }
}

TimerQueue 类

定时器队列,用于管理所有的定时器,此类的接口只有两个:添加和注销定时器,分别为 addTimer()cancel()

TimerQueue 数据结构的选择。需要高效地组织目前尚未到期的 Timer,能快速地根据当前时间找到已经到期的 Timer,也要能高效地添加和删除 Timer。最终选择了 set < pair<TimeStamp,Timer*> >,采用 pair 为 key 的原因是可能在一个时刻有多个相同的 Timestamp 时间戳超时,而查找只返回一个,这样即使两个 Timer 的超时时间相同,它们的地址也必须不同。

通过给 timerfd 一个超时时间实现超时计时,它内部有 Channel,通过 Channel 管理 timerfd,然后向EventLoop和 Poller 注册 timerfd 的可读事件,当 timerfd 的可读事件就绪时表明一个超时时间点到了,然后调用 timerfdChannel_ 的可读事件回调 handleRead(),通过 getExpired() 找出所有的超时事件,然后执行相应的超时回调函数 Timer::run()。为了复用定时器,每次处理完之后,会检查这些超时定时器是否需要重复定时,如果需要重复,就再次添加到定时器集合中。

timerfd 如何实现多个定时器超时计时的呢?每次向保存定时器的 set 容器插入一个定时器 Timer 的时候就比较 set 的头元素的超时时间,若新插入的超时时间小,则更新 timerfd 的时间,从而保证 timerfd 始终是 set 中最近的一个超时时间。当 timerfd 可读时,需要遍历容器 set,因为可能此时有多个 Timer 超时了(尽管 tiemrfd 是当前最小的超时时间)。这里的关键是采用 timerfd 实现统一事件源。

TimerQueue.h

namespace muduo
{
namespace net
{

class EventLoop;
class Timer;
class TimerId;

///
/// A best efforts timer queue.
/// No guarantee that the callback will be on time.
///
/* 定时器队列 */
class TimerQueue : boost::noncopyable
{
 public:
  explicit TimerQueue(EventLoop* loop);
  ~TimerQueue();

  ///
  /// Schedules the callback to be run at given time,
  /// repeats if @c interval > 0.0.
  ///
  /// Must be thread safe. Usually be called from other threads.
  /* 添加一个定时器 */
  TimerId addTimer(const TimerCallback& cb,
                   Timestamp when,
                   double interval);
#ifdef __GXX_EXPERIMENTAL_CXX0X__
  TimerId addTimer(TimerCallback&& cb,
                   Timestamp when,
                   double interval);
#endif

  /* 注销一个定时器 */
  void cancel(TimerId timerId);

 private:

  // FIXME: use unique_ptr<Timer> instead of raw pointers.
  // This requires heterogeneous comparison lookup (N3465) from C++14
  // so that we can find an T* in a set<unique_ptr<T>>.
  typedef std::pair<Timestamp, Timer*> Entry;   //对应一个定时任务
  typedef std::set<Entry> TimerList;            //定时任务集合,采用set,有key无value,且有序
  typedef std::pair<Timer*, int64_t> ActiveTimer; //下面有解释  
  typedef std::set<ActiveTimer> ActiveTimerSet;

  void addTimerInLoop(Timer* timer);    //添加一个定时任务
  void cancelInLoop(TimerId timerId);   //注销一个定时器
  // called when timerfd alarms
  void handleRead();                    //timerfd 可读 的回调
  // move out all expired timers
  std::vector<Entry> getExpired(Timestamp now); //获取所有超时的定时器
  /* 重置超时的定时器 */
  void reset(const std::vector<Entry>& expired, Timestamp now);

  bool insert(Timer* timer);    //把定时器插到TimerList中

  EventLoop* loop_;             //TimerQueue 所属的 EventLoop
  const int timerfd_;           // 内部的 timerfd 
  Channel timerfdChannel_;      //timerfd 对应的Channel,借此来观察timerfd_ 上的readable事件
  // Timer list sorted by expiration
  TimerList timers_;            //所有的定时任务

  // for cancel()
  // timers_ 与 activeTimers_ 都保存了相同的Timer 地址
  // timers_ 是按超时时间排序,activeTimers_ 是按定时器地址排序
  ActiveTimerSet activeTimers_;
  bool callingExpiredTimers_; /* atomic *///是否处于 处理定时器超时回调中
  ActiveTimerSet cancelingTimers_; //保存被注销的定时器
};
}
}

TimerQueue.cc

TimerQueue::TimerQueue(EventLoop* loop)
  : loop_(loop),
    timerfd_(createTimerfd()),
    timerfdChannel_(loop, timerfd_),
    timers_(),
    callingExpiredTimers_(false)
{
  timerfdChannel_.setReadCallback(
      boost::bind(&TimerQueue::handleRead, this));//设置timerfd可读事件回调函数为handleRead
  // we are always reading the timerfd, we disarm it with timerfd_settime.
  timerfdChannel_.enableReading();  //timerfd 注册可读事件
}

TimerQueue::~TimerQueue()
{
  timerfdChannel_.disableAll();
  timerfdChannel_.remove();
  ::close(timerfd_);
  // do not remove channel, since we're in EventLoop::dtor();
  for (TimerList::iterator it = timers_.begin();
      it != timers_.end(); ++it)
  {
    delete it->second; //手动释放Timer*
  }
}

/* 添加定时任务,返回此定时器对应的唯一标识 */
TimerId TimerQueue::addTimer(const TimerCallback& cb,
                             Timestamp when,
                             double interval)
{
    /* new 一个定时器对象 interval 大于0 ,就是需要重复的定时器 */
  Timer* timer = new Timer(cb, when, interval);
  /* 
   * runInLoop 的意思是 如果本IO线程想要添加定时器则直接由 addTimerInLoop 添加
   * 如果是其他线程向IO线程添加定时器则需要间接通过 queueInLoop添加
   */
  loop_->runInLoop(
      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
  return TimerId(timer, timer->sequence());
}

#ifdef __GXX_EXPERIMENTAL_CXX0X__
TimerId TimerQueue::addTimer(TimerCallback&& cb,
                             Timestamp when,
                             double interval)
{
    // 右值语义
  Timer* timer = new Timer(std::move(cb), when, interval);
  loop_->runInLoop(
      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
  return TimerId(timer, timer->sequence());
}
#endif
/* 注销一个定时器,被EventLoop::cancel(TimerId timerId)调用 */
void TimerQueue::cancel(TimerId timerId)
{
  loop_->runInLoop(
      boost::bind(&TimerQueue::cancelInLoop, this, timerId));
}

/* IO线程向自己添加定时器 */
void TimerQueue::addTimerInLoop(Timer* timer)
{
  loop_->assertInLoopThread();
  bool earliestChanged = insert(timer); //如果当前插入的定时器 比队列中的定时器都早 则返回真

  if (earliestChanged)      //最早的超时时间改变了,就需要重置timerfd_的超时时间
  {
    resetTimerfd(timerfd_, timer->expiration()); //timerfd_ 重新设置超时时间
  }
}

void TimerQueue::cancelInLoop(TimerId timerId)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  ActiveTimer timer(timerId.timer_, timerId.sequence_);
  ActiveTimerSet::iterator it = activeTimers_.find(timer);  //查找该定时器
  if (it != activeTimers_.end()) // 找到了
  {
      /* 从 timers_ 和 activeTimers_ 中删掉*/
    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
    assert(n == 1); (void)n;
    delete it->first; // FIXME: no delete please    //手动 delete 
    activeTimers_.erase(it);
  }
  else if (callingExpiredTimers_) //可能正在处理
  {
      /* 那就先 插入要被注销的定时器 */
    cancelingTimers_.insert(timer);
  }
  assert(timers_.size() == activeTimers_.size());
}

/* timerfd 可读事件的回调函数 */
void TimerQueue::handleRead()
{
  loop_->assertInLoopThread();
  Timestamp now(Timestamp::now());
  readTimerfd(timerfd_, now);

  /* 找出所有超时的事件 */
  std::vector<Entry> expired = getExpired(now);

  callingExpiredTimers_ = true;
  cancelingTimers_.clear();
  // safe to callback outside critical section
  for (std::vector<Entry>::iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    it->second->run();  //执行超时定时器的回调
  }
  callingExpiredTimers_ = false;

  reset(expired, now); //重置定时器,如果不需要再次定时,就删掉,否则再次定时
}

/* 获取队列中超时的定时器 */
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
  assert(timers_.size() == activeTimers_.size());
  std::vector<Entry> expired;   //保存超时定时器的容器
  Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX)); //哨兵值
  TimerList::iterator end = timers_.lower_bound(sentry);    //返回第一个未超时的Timer的迭代器
  assert(end == timers_.end() || now < end->first);         //均未超时或者找到了
  std::copy(timers_.begin(), end, back_inserter(expired));  //把超时的定时器拷贝到 expired 容器中
  timers_.erase(timers_.begin(), end);                      //将超时的定时器从timers_删掉

  for (std::vector<Entry>::iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    ActiveTimer timer(it->second, it->second->sequence());
    size_t n = activeTimers_.erase(timer);      // 将超时的定时器 从 activeTimers_ 删掉
    assert(n == 1); (void)n;
  }

  assert(timers_.size() == activeTimers_.size());   // 都删掉之后 size 应该相同
  return expired;   //返回超时的那部分定时器
}

/* 已经执行完超时回调的定时任务后,检查这些定时器是否需要重复 */
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
  Timestamp nextExpire;

  for (std::vector<Entry>::const_iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    ActiveTimer timer(it->second, it->second->sequence());
    if (it->second->repeat()    // 需要重复 而且 没有要被注销
        && cancelingTimers_.find(timer) == cancelingTimers_.end())
    {
        /* 将该定时器的超时时间改为下次超时的时间 */
      it->second->restart(now);
      insert(it->second);   //重新插入到定时器容器中
    }
    else
    {
      // FIXME move to a free list
      // 不需要重复就删除
      delete it->second; // FIXME: no delete please
    }
  }

  if (!timers_.empty())
  {
      /* 获取当前定时器集合中的最早定时器的时间戳,作为下次超时时间*/
    nextExpire = timers_.begin()->second->expiration();
  }

  if (nextExpire.valid())
  {
    resetTimerfd(timerfd_, nextExpire);     //重置 timerfd_ 的超时时间
  }
}

/* 向 set 中插入新的定时器 */
bool TimerQueue::insert(Timer* timer)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  bool earliestChanged = false;             // 最早的超时时间 是否被更改
  Timestamp when = timer->expiration();     //新插入 timer 的超时时间
  TimerList::iterator it = timers_.begin(); // 当前最早的定时任务
  if (it == timers_.end() || when < it->first)
  {
    earliestChanged = true; 
    //如果timers_为空,或者when 小于目前最早的定时任务,那么最早的超时时间,肯定需要被改变
  }
  {
      /* 向 timers_ 中插入定时任务 */
    std::pair<TimerList::iterator, bool> result
      = timers_.insert(Entry(when, timer));
    assert(result.second); (void)result;
  }
  {
      /* 向 activeTimers_ 中插入定时任务 */
    std::pair<ActiveTimerSet::iterator, bool> result
      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
    assert(result.second); (void)result;
  }

  /* 插入完成后,两个容器元素数目应该相同 */
  assert(timers_.size() == activeTimers_.size());
  return earliestChanged;   //返回修改标志,表示最近的超时时间已经改变
}

定时器的使用接口

EventLoop 中提供了四个接口来使用定时器,三个是添加定时器,都转而调用 TimerQueue::addTimer();还有一个注销定时器。如下:

/* 在时间戳为 time 的时刻执行,0.0 表示不重复 */
TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
{
  return timerQueue_->addTimer(cb, time, 0.0);
}

/* 延迟 delay 时间执行 */
TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
{
  Timestamp time(addTime(Timestamp::now(), delay));
  return runAt(time, cb);
}

/* 重复定时器,时间间隔为 interval */
TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
{
  Timestamp time(addTime(Timestamp::now(), interval));
  return timerQueue_->addTimer(cb, time, interval);
}

/* 注销定时器,直接调用 TimerQueue::cancel() */
void EventLoop::cancel(TimerId timerId)
{
  return timerQueue_->cancel(timerId);
}

timerfd 的相关操作

timerfd 是 Linux 为用户程序提供的一个定时器接口,将定时器抽象为文件描述符,通过文件描述符的可读事件进行超时通知,该文件在超时的那一刻变得可读,这样就能完美的融入到 select/poll 框架中,用统一的方式处理 I/O 和定时事件。同时它的时间精度比用 select/poll 的 timeout 更高,timeout 定时精度只有毫秒。

提供了三个 timerfd C API:

#include <sys/timerfd.h> 
int timerfd_create(int clockid, int flags); 
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value,struct itimerspec *old_value); 
int timerfd_gettime(int fd, struct itimerspec *curr_value);

用法和测试例子请参考:
http://xiaorui.cc/2016/07/29/%E5%9F%BA%E4%BA%8Etimerfd-epoll%E5%BC%80%E5%8F%91%E7%9A%84io%E5%AE%9A%E6%97%B6%E5%99%A8/

下面看一下 TimerQueue 中 timerfd 的相关操作。

/* 创建 timerfd */
int createTimerfd()
{
  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
                                 TFD_NONBLOCK | TFD_CLOEXEC);
  if (timerfd < 0)
  {
    LOG_SYSFATAL << "Failed in timerfd_create";
  }
  return timerfd;
}

/* 计算超时时间与当前时间的时间差,并将参数转换为 api 接受的类型  */
struct timespec howMuchTimeFromNow(Timestamp when)
{
    /* 微秒数 = 超时时刻微秒数 - 当前时刻微秒数 */
  int64_t microseconds = when.microSecondsSinceEpoch()
                         - Timestamp::now().microSecondsSinceEpoch();
  if (microseconds < 100)
  {
    microseconds = 100;
  }
  struct timespec ts;   //转换成 struct timespec 结构返回
  ts.tv_sec = static_cast<time_t>(
      microseconds / Timestamp::kMicroSecondsPerSecond);
  ts.tv_nsec = static_cast<long>(
      (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
  return ts;
}

/* 读timerfd,避免定时器事件一直触发 */
void readTimerfd(int timerfd, Timestamp now)
{
  uint64_t howmany;
  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
  if (n != sizeof howmany)
  {
    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
  }
}

/* 重置定时器超时时间 */
void resetTimerfd(int timerfd, Timestamp expiration)
{
  // wake up loop by timerfd_settime()
  struct itimerspec newValue;
  struct itimerspec oldValue;
  bzero(&newValue, sizeof newValue);
  bzero(&oldValue, sizeof oldValue);
  newValue.it_value = howMuchTimeFromNow(expiration);
  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); //到这个时间后,会产生一个定时事件
  if (ret)
  {
    LOG_SYSERR << "timerfd_settime()";
  }
}

以上就是 muduo 定时器的实现,我自己在写 http server 时自己用最小堆实现的定时器,有兴趣的可以看下:
https://github.com/Tanswer/Xserver/blob/master/src/timer.h

作者:Tanswer_发表于2018/1/7 15:41:05 原文链接
阅读:71评论:0查看评论