ICode9

精准搜索请尝试: 精确搜索
首页 > 系统相关> 文章详细

Linux应用编程实现简单队列功能-改进

2021-05-22 13:59:34  阅读:165  来源: 互联网

标签:队列 编程 ret queue item int tm Linux buf


queue.h



#ifndef __QUEUE_H__
#define __QUEUE_H__

/*
 * -lpthread
 * 编译时需要线程库
 */
#include <pthread.h>


/*
 * https://www.kernel.org/doc/man-pages/
 * https://man7.org/linux/man-pages/man3/pthread_cond_timedwait.3p.html
 */
struct queue_buf_t {
	char *data_buf;
	int item_num;
	int item_size;
	int item_cnt;
	int wr;
	int rd;
	pthread_mutex_t mtx;

	/*
	 * 数据成功入队后,触发pop_cond,去唤醒由于对空队列进行pop操作被阻塞的线程
	 */
	pthread_cond_t pop_cond;

	/*
	 * 数据成功出队后,触发push_cond,去唤醒由于满空队列进行push操作被阻塞的线程
	 */
	pthread_cond_t push_cond;
};

int queue_buf_item_num(struct queue_buf_t *queue);
int queue_buf_item_cnt(struct queue_buf_t *queue);
int queue_buf_push(struct queue_buf_t *queue, const void *item);
int queue_buf_push_wait(struct queue_buf_t *queue, const void *item, unsigned int timeout_ms);
int queue_buf_pop(struct queue_buf_t *queue, void *item);
int queue_buf_pop_wait(struct queue_buf_t *queue, void *item, unsigned int timeout_ms);
int queue_buf_get(struct queue_buf_t *queue, void *item);
int queue_buf_get_wait(struct queue_buf_t *queue, void *item, unsigned int timeout_ms);
struct queue_buf_t *queue_buf_alloc(int item_num, int item_size);
void queue_buf_free(struct queue_buf_t *queue_buf);


#endif





queue.c

/*
 * Copyright (C) 2021, 2021  huohongpeng
 * Author: huohongpeng <1045338804@qq.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * Change logs:
 * Date        Author       Notes
 * 2021-05-15  huohongpeng   首次添加
 * 2021-05-17 修改tm_to_ns()在32bit平台溢出问题
 * 2021-05-22 1.将队列元素从long类型修改为一个大小可调整的缓冲区.
 *            2.队列大小又新变量进行记录,不在使用rd和wr进行计算,避免浪费一个无用的空间
 */


#include "queue.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <errno.h>


static long long tm_to_ns(struct timespec tm)
{
	long long ret = tm.tv_sec;
	ret = ret * 1000000000 + tm.tv_nsec;
	
	return ret;
}

static struct timespec ns_to_tm(long long ns)
{
	struct timespec tm;
	long long tmp;
	tmp = ns / 1000000000;
	tm.tv_sec = tmp;
	
	tm.tv_nsec = ns - (tmp * 1000000000);
	return tm;
}


int queue_buf_item_num(struct queue_buf_t *queue)
{
	pthread_mutex_lock(&queue->mtx);
	int num = queue->item_num;
	pthread_mutex_unlock(&queue->mtx);

	return num;
}

int queue_buf_item_cnt(struct queue_buf_t *queue)
{
	pthread_mutex_lock(&queue->mtx);
	int cnt = queue->item_cnt;
	pthread_mutex_unlock(&queue->mtx);
	
	return cnt;
}


int queue_buf_push(struct queue_buf_t *queue, const void *item)
{
	if (!queue) {
		return -1;
	}

	int ret = 0;
	
	pthread_mutex_lock(&queue->mtx);
	if (queue->item_cnt == queue->item_num) {
		ret = -1;
	}

	if (ret == 0) {
		char *data = queue->data_buf + queue->item_size * queue->wr;
		memcpy(data, item, queue->item_size);
		queue->wr++;
		queue->wr %= queue->item_num;
		queue->item_cnt++;
	}
	
	pthread_mutex_unlock(&queue->mtx);

	if (ret == 0) {
		/*
		 * 队列中有数据了,通知其他被阻塞的线程可以读数据
		 */
		pthread_cond_signal(&queue->pop_cond);
	}
	
	return ret;
}

int queue_buf_push_wait(struct queue_buf_t *queue, const void *item, unsigned int timeout_ms)
{
	if (!queue) {
		return -1;
	}

	int ret = 0;

	struct timespec start_tm;
	struct timespec end_tm;

	clock_gettime(CLOCK_MONOTONIC, &start_tm);
	
	long long tmp = timeout_ms;
	printf("tmp: %lld\n", tmp);
	end_tm = ns_to_tm(tm_to_ns(start_tm) + tmp*1000000);

	printf("st: %lld\n", tm_to_ns(start_tm));
	printf("et: %lld\n", tm_to_ns(end_tm));
	printf("dt: %lld\n", tm_to_ns(end_tm) - tm_to_ns(start_tm));


	pthread_mutex_lock(&queue->mtx);
	
	while (queue->item_cnt == queue->item_num) {
		//printf("2tmp: %lld\n", tmp);
		/*
		 * 队列为满需要等待push_cond有效
		 */
		if (pthread_cond_timedwait(&queue->push_cond, &queue->mtx, &end_tm) == ETIMEDOUT) {
			/*
			 * 如果超时则退出等待
			 */
			ret = -1;
			break;
		}
	}

	if (ret == 0) {
		char *data = queue->data_buf + queue->item_size * queue->wr;
		memcpy(data, item, queue->item_size);
		queue->wr++;
		queue->wr %= queue->item_num;
		queue->item_cnt++;
	}
	
	pthread_mutex_unlock(&queue->mtx);

	if (ret == 0) {
		pthread_cond_signal(&queue->pop_cond);
	}

	return ret;
}

int queue_buf_pop(struct queue_buf_t *queue, void *item)
{
	if (!queue) {
		return -1;
	}
	
	int ret = 0;

	pthread_mutex_lock(&queue->mtx);

	if (queue->item_cnt == 0) {
		ret = -1;
	}

	if (ret == 0) {
		char *data = queue->data_buf + queue->item_size * queue->rd;
		memcpy(item, data, queue->item_size);

		queue->rd++;
		queue->rd %= queue->item_num;
		queue->item_cnt--;
	}
	
	pthread_mutex_unlock(&queue->mtx);

	if (ret == 0) {
		/*
		 * 通知其他线程队列已经有空间
		 */
		pthread_cond_signal(&queue->push_cond);
	}

	return ret;
}

int queue_buf_pop_wait(struct queue_buf_t *queue, void *item, unsigned int timeout_ms)
{
	if (!queue) {
		return -1;
	}

	int ret = 0;

	struct timespec start_tm;
	struct timespec end_tm;
	
	clock_gettime(CLOCK_MONOTONIC, &start_tm);
	long long tmp = timeout_ms;
	end_tm = ns_to_tm(tm_to_ns(start_tm) + tmp*1000000);
	
	pthread_mutex_lock(&queue->mtx);
	
	while (queue->item_cnt == 0) {
		/*
		 * 队列为空需要等待pop_cond有效
		 */
		if (pthread_cond_timedwait(&queue->pop_cond, &queue->mtx, &end_tm) == ETIMEDOUT) {
			/*
			 * 如果超时则退出等待
			 */
			ret = -1;
			break;
		}
	}

	if (ret == 0) {
		char *data = queue->data_buf + queue->item_size * queue->rd;
		memcpy(item, data, queue->item_size);

		queue->rd++;
		queue->rd %= queue->item_num;
		queue->item_cnt--;
	}

	pthread_mutex_unlock(&queue->mtx);

	if (ret == 0) {
		/*
		 * 通知其他线程队列已经有空间
		 */
		pthread_cond_signal(&queue->push_cond);
	}

	return ret;
}


int queue_buf_get(struct queue_buf_t *queue, void *item)
{
	if (!queue) {
		return -1;
	}

	int ret = 0;

	pthread_mutex_lock(&queue->mtx);

	if (queue->item_cnt == 0) {
		ret =  -1;
	}

	if (ret == 0) {
		char *data = queue->data_buf + queue->item_size * queue->rd;
		memcpy(item, data, queue->item_size);
	}

	pthread_mutex_unlock(&queue->mtx);

	return ret;

}


int queue_buf_get_wait(struct queue_buf_t *queue, void *item, unsigned int timeout_ms)
{
	if (!queue) {
		return -1;
	}

	int ret = 0;

	struct timespec start_tm;
	struct timespec end_tm;

	clock_gettime(CLOCK_MONOTONIC, &start_tm);
	long long tmp = timeout_ms;
	end_tm = ns_to_tm(tm_to_ns(start_tm) + tmp*1000000);
	
	pthread_mutex_lock(&queue->mtx);
	
	while (queue->item_cnt == 0) {
		/*
		 * 队列为空需要等待pop_cond有效
		 */
		if (pthread_cond_timedwait(&queue->pop_cond, &queue->mtx, &end_tm) == ETIMEDOUT) {
			/*
			 * 如果超时则退出等待
			 */
			ret = -1;
			break;
		}
	}

	if (ret == 0) {
		char *data = queue->data_buf + queue->item_size * queue->rd;
		memcpy(item, data, queue->item_size);
	}

	pthread_mutex_unlock(&queue->mtx);

	return ret;
}



struct queue_buf_t *queue_buf_alloc(int item_num, int item_size)
{
	struct queue_buf_t *queue;
	char *p;

	p = (char *)malloc(sizeof(struct queue_buf_t) + item_num * item_size);
	
	if (!p) {
		return NULL;
	}

	queue = (struct queue_buf_t *)p;
	memset(queue, 0x00, sizeof(struct queue_buf_t));
	
	queue->data_buf = (char *)(p + sizeof(struct queue_buf_t));
	queue->item_num = item_num;
	queue->item_size = item_size;
	pthread_mutex_init(&queue->mtx, NULL);

	pthread_condattr_t attr;
	pthread_condattr_init(&attr);
	
#if 0
	clockid_t clock_id;
	pthread_condattr_getclock(&attr, &clock_id);
	printf("clock_id: %d\n", clock_id);
#endif

	/*
	 * pthread_cond_timedwait()默认使用的是CLOCK_REALTIME,
	 * CLOCK_REALTIME容易受系统影响,比如校时操作
	 * 所以条件变量使用的时钟改为CLOCK_MONOTONIC
	 * 参考:https://man7.org/linux/man-pages/man3/pthread_cond_timedwait.3p.html
	 */
    pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);

	pthread_cond_init(&queue->push_cond, &attr);
	pthread_cond_init(&queue->pop_cond, &attr);
	
	pthread_condattr_destroy(&attr);
	
	return queue;
}


void queue_buf_free(struct queue_buf_t *queue)
{
	if (queue) {
		pthread_mutex_destroy(&queue->mtx);
		pthread_cond_destroy(&queue->pop_cond);
		pthread_cond_destroy(&queue->push_cond);
		free(queue);
	}
}



void test(void)
{
	struct queue_buf_t *q = queue_buf_alloc(20, sizeof(long long));

	long long l = 9999999999*100;

	int i;

	for (i = 0; i < 25; i++) {
		long long t = l + i;
		int ret = queue_buf_push_wait(q, &t, 1000);
		printf("push ret: %d\n", ret);
		printf("num: %d\n", queue_buf_item_num(q));
		printf("cnt: %d\n", queue_buf_item_cnt(q));
	}

	for (i = 0; i < 25; i++) {
		long long t = 0;
		int ret = queue_buf_pop_wait(q, &t, 500);
		printf("pop ret: %d, data: %lld\n", ret, t);
		printf("num: %d\n", queue_buf_item_num(q));
		printf("cnt: %d\n", queue_buf_item_cnt(q));
	}

	for (i = 0; i < 25; i++) {
		long long t = 0;
		int ret = queue_buf_get_wait(q, &t, 100);
		printf("get ret: %d, data: %lld\n", ret, t);
		printf("num: %d\n", queue_buf_item_num(q));
		printf("cnt: %d\n", queue_buf_item_cnt(q));
	}

}

 

标签:队列,编程,ret,queue,item,int,tm,Linux,buf
来源: https://blog.csdn.net/huohongpeng/article/details/117158406

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有