ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

TDengine代码学习(3):定时器实现

2019-08-16 20:38:06  阅读:289  来源: 互联网

标签:index tmr 定时器 TDengine int 代码 pObj pCtrl pool


定时器实现

代码介绍

学习TDengine ttimer.c 中的代码。
首先看下用到的数据结构

#define MSECONDS_PER_TICK 5

typedef struct _tmr_ctrl_t {
  void *          signature;
  pthread_mutex_t mutex;            /* mutex to protect critical resource */
  int             resolution;       /* resolution in mseconds */
  int             numOfPeriods;     /* total number of periods */
  int64_t         periodsFromStart; /* count number of periods since start */
  pthread_t       thread;           /* timer thread ID */
  tmr_list_t *    tmrList;
  mpool_h         poolHandle;
  char            label[12];
  int             maxNumOfTmrs;
  int             numOfTmrs;
  int             ticks;
  int             maxTicks;
  int             tmrCtrlId;
} tmr_ctrl_t;

tmr_ctrl_t 这是定时器控制类,设置这类定时器的共有属性,主要是调度粒度(resolution) 和 时间片数目(numOfPeriods)。

  • signature: signature 用来判断这个定时器控制类是否被创建。
  • resolution:定时器的调度粒度,表示多少毫秒会查询一次,单位为毫秒(millisecond)。
  • numOfPeriods:时间片的数目,每个时间片的值为resolution。
  • periodsFromStart:当前所在的时间片,用于获取当前到期的时间片index,在创建新定时器用来计算超时时间。
  • tmrList: 创建一个类型为tmr_list_t,大小为 numOfPeriods 的数组。
  • poolHandle:内存池,用来获取新的定时器类tmr_obj_t 内存块。
  • ticks:当前时间片已过去的tick数,一个tick时间默认设为 MSECONDS_PER_TICK 5毫秒
  • maxTicks:值为 resolution / MSECONDS_PER_TICK,表示一个时间片需要几个tick时间。
typedef struct {
  tmr_obj_t *head;
  int        count;
} tmr_list_t;

tmr_list_t 中head 是一个链表头指针,count 保存链表中项的数目,链表中的项类型是tmr_obj_t。

typedef struct _tmr_obj {
  void *param1;
  void (*fp)(void *, void *);
  tmr_h               timerId;
  short               cycle;
  struct _tmr_obj *   prev;
  struct _tmr_obj *   next;
  int                 index;
  struct _tmr_ctrl_t *pCtrl;
} tmr_obj_t;

tmr_obj_t 就是定时器类,每次创建一个定时器,会按照设置的超时时间,计算出在tmrList 数组中的index,然后加入到tmrList[index]对应的链表中。

  • fp:定时器到期时调用的函数指针
  • param1: 定时器到期时调用函数的参数
  • cycle:表示要经历多少仑后才会到期,这个后面会说明。
  • prev,next:实现双向链表的指针
  • index:在tmrList 数组中的index

定时器调度

每间隔一个tick时间,定时器的调度线程就会调用下面的taosTimerLoopFunc函数,会遍历所有的定时器控制类,根据signature 值判断定时器控制类是否已经被创建,当 “pCtrl->ticks >= pCtrl->maxTicks” 时表示当前时间片已到期,需要调用taosTmrProcessList函数处理该时间片链表上的定时器。

#define maxNumOfTmrCtrl 512
tmr_ctrl_t tmrCtrl[maxNumOfTmrCtrl];

void *taosTimerLoopFunc(int signo) {
  tmr_ctrl_t *pCtrl;
  int         count = 0;

  for (int i = 1; i < maxNumOfTmrCtrl; ++i) {
    pCtrl = tmrCtrl + i;
    if (pCtrl->signature) {
      count++;
      pCtrl->ticks++;
      if (pCtrl->ticks >= pCtrl->maxTicks) {
        taosTmrProcessList(pCtrl);
        pCtrl->ticks = 0;
      }
      if (count >= numOfTmrCtrl) break;
    }
  }

  return NULL;
}

先计算出当前时间片的index 为 “pCtrl->periodsFromStart % pCtrl->numOfPeriods”,获取链表 pCtrl->tmrList[index],然后遍历链表:

  • 如果当前节点的cycle值>0,表示该定时器要在下一轮才到期,这边只将cycle值减一。因为在定时器插入链表时,是按照cycle值从小到大连接,所以该节点后面的节点cycle值肯定大于0,也将cycle值减一。
  • 如果当前节点的cycle值为0,表示该定时器到期了,将该定时器节点从链表中删除,释放内存。

最后将periodsFromStart值加一,指向下一个时间片。
这里有个小疑问:为什么不将periodsFromStart 值每次加一时 直接取numOfPeriods的余数,而要将periodsFromStart 定义为 int64 类型?

void taosTmrProcessList(tmr_ctrl_t *pCtrl) {
  unsigned int index;
  tmr_list_t * pList;
  tmr_obj_t *  pObj, *header;

  pthread_mutex_lock(&pCtrl->mutex);
  index = pCtrl->periodsFromStart % pCtrl->numOfPeriods;
  pList = &pCtrl->tmrList[index];

  while (1) {
    header = pList->head;
    if (header == NULL) break;

    if (header->cycle > 0) {
      pObj = header;
      while (pObj) {
        pObj->cycle--;
        pObj = pObj->next;
      }
      break;
    }

    pCtrl->numOfTmrs--;
    tmrTrace("%s %p, timer expired, fp:%p, tmr_h:%p, index:%d, total:%d", pCtrl->label, header->param1, header->fp,
             header, index, pCtrl->numOfTmrs);

    pList->head = header->next;
    if (header->next) header->next->prev = NULL;
    pList->count--;
    header->timerId = NULL;
    //ignore timer function processing code
    tmrMemPoolFree(pCtrl->poolHandle, (char *)header);
  }
  pCtrl->periodsFromStart++;
  pthread_mutex_unlock(&pCtrl->mutex);
}

定时器结构示例

如果定时器的超时时间为 mseconds, 则对应的cycle 和 index 计算公式如下:

period = mseconds / resolution
cycle = period / numOfPeriods
index = (period + periodsFromStart) % numOfPeriods

假设创建的定时器控制类的resolution 设为100毫秒,时间片数目numOfPeriods设为5个,则遍历一轮时间片的时间为500毫秒,初始化时periodsFromStart值为0, ticks值也为0
这个时候,我们创建三个定时器

  • 定时器1的超时时间mseconds为100毫秒,则period值为1,cycle值为0,index值为1
  • 定时器2的超时时间mseconds为600毫秒,则period值为6,cycle值为1,index值为1
  • 定时器3的超时时间mseconds为800毫秒,则period值为8,cycle值为1,index值为3

这个时候的定时器状态如图1所示:
periodsFromStart值为0
可以看到定时器1和定时器2 都在tmrList[1]所指向的链表中,而且按照cycle值由小到大。
定时器3 在tmrList[3]所指向的链表中
图1 timer1
时间过去200毫秒,这个时候定时器控制类刚好过去两个时间片,定时器状态如图2所示
periodsFromStart值为2
时间片index 为1的链表已经被处理过,定时器1已经超时,从链表中删除,定时器2的cycle值减一。
图2 timer2

定时器精度误差

在创建一个定时器的时候,计算公式只考虑当前所在的时间片index periodsFromStart,而没有考虑当前时间片已经过去的ticks 值,在调度线程中也没有考虑这点,所以定时器的实际超时时间会有一个 (0,resolution] 范围的误差。
比如上面的例子,假设时间控制类resolution 为100毫秒,目前的 periodsFromStart值为0, 新建一个定时器的超时时间mseconds为100毫秒,则所在时间片的
index 为1。

  • 如果此时的ticks值是0,则需要经过200毫秒这个定时器会超时,有resolution(100毫秒)的误差。
  • 如果此时的ticks值是10(即这个时间片时间已过去50毫秒),则需要经过150毫秒这个定时器会超时,有50毫秒的误差。

测试效果

照例自己写个测试熟悉下。实际代码中有限制,最少的时间片数目numOfPeriods为10。

http timer ctrl is initialized, tmrCtrlId:0
malloc pObj:0x78c180
http 0x7fff072545f4, timer is reset, fp:0x401a4b, tmr_h:0x78c180, cycle:0, index:1, total:1 numOfFree:14
malloc pObj:0x78c1c0
http 0x7fff072545f8, timer is reset, fp:0x401a7f, tmr_h:0x78c1c0, cycle:1, index:1, total:2 numOfFree:13
malloc pObj:0x78c200
http 0x7fff072545fc, timer is reset, fp:0x401a7f, tmr_h:0x78c200, cycle:1, index:8, total:3 numOfFree:12
http 0x7fff072545f4, timer expired, fp:0x401a4b, tmr_h:0x78c180, index:1, total:2
timerFunc1 id[1]
http 0x7fff072545f8, timer expired, fp:0x401a7f, tmr_h:0x78c1c0, index:1, total:1
timerFunc2 id[2]
http 0x7fff072545fc, timer expired, fp:0x401a7f, tmr_h:0x78c200, index:8, total:0
timerFunc2 id[3]

完整测试代码

在linux 下面运行测试。
编译需要加上 -lpthread 选项。例子如下:
gcc -o timer timer.c -lpthread

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <sys/time.h>


#define mpool_h void *
typedef void *tmr_h;

typedef struct {
	int				numOfFree;
	int 			first;
	int 			numOfBlock;
	int 			blockSize;
	int *			freeList;
	char * 			pool;
	pthread_mutex_t	mutex;
} pool_t;

#define maxNumOfTmrCtrl  	16
#define MSECONDS_PER_TICK 	5

typedef struct _tmr_obj
{
	void *param1;
	void (*fp)(void *, void *);
	tmr_h				timerId;
	short				cycle;
	struct _tmr_obj * 	prev;
	struct _tmr_obj * 	next;
	int                 index;
	struct _tmr_ctrl_t *pCtrl;
} tmr_obj_t;

typedef struct
{
	tmr_obj_t * head;
	int			count;
} tmr_list_t;

typedef struct _tmr_ctrl_t
{
	void * 			signature;
	pthread_mutex_t mutex;
	int				resolution;
	int 			numOfPeriods;
	unsigned int	periodsFromStart;
	tmr_list_t *	tmrList;
	mpool_h			poolHandle;
	char            label[12];
	int 			maxNumOfTmrs;
	int 			numOfTmrs;
	int 			ticks;
	int				maxTicks;
} tmr_ctrl_t;


tmr_ctrl_t tmrCtrl[maxNumOfTmrCtrl];


mpool_h memPoolInit(int maxNum, int blockSize);
char * memPoolMalloc(mpool_h handle);
void memPoolFree(mpool_h handle, char *p);
void memPoolCleanup(mpool_h handle);


void tmrProcessList(tmr_ctrl_t *pCtrl)
{
	int index;
	tmr_list_t * pList;
	tmr_obj_t * pObj, *header;

	pthread_mutex_lock(&pCtrl->mutex);
	index = pCtrl->periodsFromStart % pCtrl->numOfPeriods;
	pList = &pCtrl->tmrList[index];
	while(1)
	{
		header = pList->head;
		if(header == NULL) break;

		if(header->cycle > 0)
		{
			pObj = header;
			while(pObj)
			{
				pObj->cycle--;
				pObj = pObj->next;
			}
			break;
		}

		pCtrl->numOfTmrs--;
		printf("%s %p, timer expired, fp:%p, tmr_h:%p, index:%d, total:%d\n", pCtrl->label, header->param1, header->fp,
					 header, index, pCtrl->numOfTmrs);

		pList->head = header->next;
		if(header->next) header->next->prev = NULL;
		pList->count--;
		header->timerId = NULL;

		if (header->fp)
      		(*(header->fp))(header->param1, header);
		
		memPoolFree(pCtrl->poolHandle, (char *)header);
	}
	
	pCtrl->periodsFromStart++;
	pthread_mutex_unlock(&pCtrl->mutex);
	//printf("%s tmrProcessList index[%d]\n", pCtrl->label, index);
}

void * timerLoopFunc(void)
{
	tmr_ctrl_t *pCtrl;
	int i = 0;

	for(i = 0; i < maxNumOfTmrCtrl; i++)
	{
		pCtrl = tmrCtrl + i;
		if(pCtrl->signature)
		{
			pCtrl->ticks++;
			if(pCtrl->ticks >= pCtrl->maxTicks)
			{
				tmrProcessList(pCtrl);
				pCtrl->ticks = 0;
			}
		}
	}
}

void * processAlarmSignal(void *tharg)
{
	sigset_t 		sigset;
	timer_t	 		timerId;
	int signo;

	sigemptyset(&sigset);
	sigaddset(&sigset, SIGALRM);
	sigprocmask(SIG_BLOCK, &sigset, NULL);

	struct itimerval new_value, old_value;
	new_value.it_value.tv_sec = 0;
	new_value.it_value.tv_usec = 1000 * MSECONDS_PER_TICK;
	new_value.it_interval.tv_sec = 0;
	new_value.it_interval.tv_usec = 1000 * MSECONDS_PER_TICK;
	setitimer(ITIMER_REAL, &new_value, &old_value);

	while(1)
	{
		if(sigwait(&sigset, &signo))
		{
	      	printf("Failed to wait signal: number %d", signo);
	      	continue;
	    }
		timerLoopFunc();
	}
	return NULL;
}

void tmrModuleInit(void)
{
	pthread_t		thread;
	pthread_attr_t	tattr;
	
	memset(tmrCtrl, 0, sizeof(tmrCtrl));

	pthread_attr_init(&tattr);
	pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
	if(pthread_create(&thread, &tattr, processAlarmSignal, NULL) != 0)
	{
		printf("failed to create timer thread");
    	return;
  	}

  	pthread_attr_destroy(&tattr);
}

void * tmrInit(int maxNumOfTmrs, int resolution, int longest, char * label)
{
	tmr_ctrl_t * pCtrl;
	int tmrCtrlId = 0;
	int i = 0;

	//tmrCtrlId = taosAllocateId(tmrIdPool);
	pCtrl = tmrCtrl + tmrCtrlId;

	memset(pCtrl, 0, sizeof(tmr_ctrl_t));
	strncpy(pCtrl->label, label, sizeof(pCtrl->label));
	pCtrl->maxNumOfTmrs = maxNumOfTmrs;

	if((pCtrl->poolHandle = memPoolInit(maxNumOfTmrs + 10,sizeof(tmr_obj_t))) == NULL)
	{
		printf("%s failed to allocate mem pool", label);
		memPoolCleanup(pCtrl->poolHandle);
		return NULL;
	}
	
	if(resolution < MSECONDS_PER_TICK) resolution = MSECONDS_PER_TICK;
	pCtrl->resolution = resolution;
	pCtrl->maxTicks = resolution / MSECONDS_PER_TICK;
	pCtrl->ticks = rand() / pCtrl->maxTicks;
	pCtrl->numOfPeriods = longest / resolution;
	if(pCtrl->numOfPeriods < 10) pCtrl->numOfPeriods = 10;

	pCtrl->tmrList = (tmr_list_t *)malloc(sizeof(tmr_list_t) * pCtrl->numOfPeriods);
	for(i = 0; i < pCtrl->numOfPeriods; i++)
	{
		pCtrl->tmrList[i].head = NULL;
		pCtrl->tmrList[i].count = 0;
	}

	pCtrl->signature = pCtrl;
	printf("%s timer ctrl is initialized, tmrCtrlId:%d\n", label, tmrCtrlId);
	return pCtrl;
}

void tmrReset(void (*fp)(void *, void*), int mseconds, void * param1, void * handle, tmr_h *pTmrId)
{
	tmr_obj_t *pObj, *cNode, *pNode;
	tmr_list_t * pList = NULL;
	int index, period;
	tmr_ctrl_t *pCtrl = (tmr_ctrl_t *)handle;

	if(handle == NULL || pTmrId == NULL) return;

	period = mseconds / pCtrl->resolution;
	if(pthread_mutex_lock(&pCtrl->mutex) != 0)
		printf("%s mutex lock failed, reason:%s", pCtrl->label, strerror(errno));

	pObj = (tmr_obj_t *)(*pTmrId);

	if(pObj && pObj->timerId == *pTmrId)
	{
		pList = &(pCtrl->tmrList[pObj->index]);
		if(pObj->prev)
			pObj->prev->next = pObj->next;
		else
			pList->head = pObj->next;

		if(pObj->next)
			pObj->next->prev = pObj->prev;

		pList->count--;
		pObj->timerId = NULL;
		pCtrl->numOfTmrs--;
		printf("reset pObj:%p\n", pObj);
	}
	else
	{
		pObj = (tmr_obj_t *)memPoolMalloc(pCtrl->poolHandle);
		*pTmrId = pObj;
		if(pObj == NULL)
		{
			printf("%s failed to allocate timer, max:%d allocated:%d", pCtrl->label, pCtrl->maxNumOfTmrs, pCtrl->numOfTmrs);
      		pthread_mutex_unlock(&pCtrl->mutex);
      		return;
		}
		printf("malloc pObj:%p\n", pObj);
	}

	pObj->cycle = period / pCtrl->numOfPeriods;
	pObj->param1 = param1;
	pObj->fp = fp;
	pObj->timerId = pObj;
	pObj->pCtrl = pCtrl;

	index = (period + pCtrl->periodsFromStart) % pCtrl->numOfPeriods;
	pList = &(pCtrl->tmrList[index]);
	pObj->index = index;
	cNode = pList->head;
	pNode = NULL;

	while(cNode != NULL)
	{
		if(cNode->cycle < pObj->cycle)
		{
			pNode = cNode;
			cNode = cNode->next;
		}
		else
			break;
	}

	pObj->next = cNode;
	pObj->prev = pNode;

	if(cNode != NULL)
		cNode->prev = pObj;

	if(pNode != NULL)
		pNode->next = pObj;
	else
		pList->head = pObj;

	pList->count++;
	pCtrl->numOfTmrs++;

	if (pthread_mutex_unlock(&pCtrl->mutex) != 0)
    	printf("%s mutex unlock failed, reason:%s", pCtrl->label, strerror(errno));

	printf("%s %p, timer is reset, fp:%p, tmr_h:%p, cycle:%d, index:%d, total:%d numOfFree:%d\n", pCtrl->label, param1, fp, pObj,
           pObj->cycle, index, pCtrl->numOfTmrs, ((pool_t *)pCtrl->poolHandle)->numOfFree);
	return;
}



mpool_h memPoolInit(int numOfBlock, int blockSize)
{
	int i = 0;
	pool_t * pool_p = NULL;

	if(numOfBlock <= 1 || blockSize <= 1)
	{
		printf("invalid parameter in memPoolInit\n");
		return NULL;
	}

	pool_p = (pool_t *)malloc(sizeof(pool_t));
	if(pool_p == NULL)
	{
		printf("mempool malloc failed\n");
		return NULL;
	}

	memset(pool_p, 0, sizeof(pool_t));

	pool_p->blockSize = blockSize;
	pool_p->numOfBlock = numOfBlock;
	pool_p->pool = (char *)malloc((size_t)(blockSize * numOfBlock));
	pool_p->freeList = (int *)malloc(sizeof(int) * (size_t)numOfBlock);

	if(pool_p->pool == NULL || pool_p->freeList == NULL)
	{
		printf("failed to allocate memory\n");
		free(pool_p->freeList);
		free(pool_p->pool);
		free(pool_p);
	}

	pthread_mutex_init(&(pool_p->mutex), NULL);

	for(i = 0; i < pool_p->numOfBlock; i++)
		pool_p->freeList[i] = i;

	pool_p->first = 0;
	pool_p->numOfFree= pool_p->numOfBlock;

	return (mpool_h)pool_p;
}

char * memPoolMalloc(mpool_h handle)
{
	char * pos = NULL;
	pool_t * pool_p = (pool_t *)handle;

	pthread_mutex_lock(&pool_p->mutex);

	if(pool_p->numOfFree <= 0)
	{
		printf("mempool: out of memory");
	}
	else
	{
		pos = pool_p->pool + pool_p->blockSize * (pool_p->freeList[pool_p->first]);
		pool_p->first = (pool_p->first + 1) % pool_p->numOfBlock;
		pool_p->numOfFree--;
	}

	pthread_mutex_unlock(&pool_p->mutex);
	if(pos != NULL) memset(pos, 0, (size_t)pool_p->blockSize);
	return pos;
}

void memPoolFree(mpool_h handle, char * pMem)
{
	int index = 0;
	pool_t * pool_p = (pool_t *)handle;

	if(pool_p == NULL || pMem == NULL) return;

	pthread_mutex_lock(&pool_p->mutex);

	index = (int)(pMem - pool_p->pool) % pool_p->blockSize;
	if(index != 0)
	{
		printf("invalid free address:%p\n", pMem);
	}
	else
	{
		index = (int)((pMem - pool_p->pool) / pool_p->blockSize);
		if(index < 0 || index >= pool_p->numOfBlock)
		{
			printf("mempool: error, invalid address:%p\n", pMem);
		}
		else
		{
			pool_p->freeList[(pool_p->first + pool_p->numOfFree) % pool_p->numOfBlock] = index;
			pool_p->numOfFree++;
			memset(pMem, 0, (size_t)pool_p->blockSize);
		}
	}
	
	pthread_mutex_unlock(&pool_p->mutex);
}

void memPoolCleanup(mpool_h handle)
{
	pool_t *pool_p = (pool_t *)handle;

	pthread_mutex_destroy(&pool_p->mutex);
	if(pool_p->pool) free(pool_p->pool);
	if(pool_p->freeList) free(pool_p->freeList);
}

void timerFunc1(void *param, void *tmrId) 
{
	int id = *(int *)param;
	printf("%s id[%d]\n", __func__, id);
}

void timerFunc2(void *param, void *tmrId) 
{
	int id = *(int *)param;
	printf("%s id[%d]\n", __func__, id);
}

void test()
{	
	void *timerHandle = tmrInit(5, 100, 1000, "http");
	void *timer1 = NULL, *timer2 = NULL, *timer3 = NULL;
	int id1 = 1, id2 = 2, id3 = 3;

	tmrReset(timerFunc1, 100, (void *)&id1, timerHandle, &timer1);

	tmrReset(timerFunc2, 1100, (void *)&id2, timerHandle, &timer2);

	tmrReset(timerFunc2, 1800, (void *)&id3, timerHandle, &timer3);
}

int main()
{
	sigset_t 		sigset;

	sigemptyset(&sigset);
	sigaddset(&sigset, SIGALRM);
	sigprocmask(SIG_BLOCK, &sigset, NULL);
	
	tmrModuleInit();
	test();
	while(1)
	{	
	}
}

标签:index,tmr,定时器,TDengine,int,代码,pObj,pCtrl,pool
来源: https://blog.csdn.net/marble_xu/article/details/99621424

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有