/**
* @file queue_uint64.c
* @brief Thread safe queue, which stores uint64_t
* @details Call queue_init(queue_uint64_msg_t *q, int max_count) paired with queue_deinit(queue_uint64_msg_t *q) as their names imply, queue_initstruct(queue_uint64_msg_t *q)Initialize the message queue structure,*queue_get(queue_uint64_msg_t *q) and queue_put(queue_uint64_msg_t *q, void *data) In and out of the team operation
* @author miaow (3703781@qq.com)
* @version 1.0
* @date 2021/01/10
*
* @copyright Copyright (c) 2022 miaow
*
* @par Changelog:
*
* | Date | Version | Author | Description
* |
|---|
| 2022/01/09 | 1.0 | miaow | Write this file
* |
*/
#include
#include
#ifdef QUEUE_UINT64_DEBUG
#include
#include
#endif
#include
/**
* @brief Take out the first item from the circular queue
* @param q The queue handler
* @param data A buffer of uint64_t[1] to store the data taken out
* @return 0 - success, -1 - failed
*/
int queue_uint64_get(queue_uint64_msg_t *q, uint64_t *data)
{
pthread_mutex_lock(&q->_mux);
// while (q->lget == q->lput && 0 == q->nData)
// {
// // The reason program goes here: assuming there are 2 consmer threads block in this function
// // One wakes first and consumes 2 data quickly before another wakes
// // In the circumstances that the queue contains 2 items formerly, the second thread should not get data from an empty queue
// // This may happen when 2 queue_puts was called by producers and at that moment 2 consmer threads have been blocked
// // It is designed as a circular queue, where lget==lput means:
// // 1:nData!=0,a full queue
// // 2:nData为0,an empty queue
// q->nEmptyThread++;
// pthread_cond_wait(&q->_cond_get, &q->_mux);
// q->nEmptyThread--;
// }
if (q->nData == 0)
{
pthread_mutex_unlock(&q->_mux);
return -1;
}
#ifdef QUEUE_UINT64_DEBUG
printf("get data! lget:%d, ", q->lget);
#endif
*data = (q->buffer)[q->lget++];
#ifdef QUEUE_UINT64_DEBUG
printf("data:% lld", *data);
#endif
if (q->lget == q->size)
{
// this is a circular queue
q->lget = 0;
}
q->nData--;
#ifdef QUEUE_UINT64_DEBUG
printf(", nData:%d\r\n", q->nData);
#endif
// if (q->nFullThread)
// {
// // call pthread_cond_signal only when necessary, enter the kernel state as little as possible
// pthread_cond_signal(&q->_cond_put);
// }
pthread_mutex_unlock(&q->_mux);
return 0;
}
/**
* @brief Initialize the queue with a size (maximum count of items) specified in q->size
* @param q The queue hander to be initialized
* @return 0 - success, -1 - failed
* @note q->size should be set before calling this function
*/
int queue_uint64_initstruct(queue_uint64_msg_t *q)
{
q->buffer = malloc(q->size * sizeof(uint64_t));
if (q->buffer == NULL)
return -1;
pthread_mutex_init(&q->_mux, NULL);
// pthread_cond_init(&q->_cond_get, NULL);
// pthread_cond_init(&q->_cond_put, NULL);
return 0;
}
/**
* @brief Initialize the queue
* @param q The queue hander to be initialized
* @param max_count Maximum count of items in the queue
* @return 0 - success, -1 - failed
*/
int queue_uint64_init(queue_uint64_msg_t *q, int max_count)
{
q->size = max_count;
return queue_uint64_initstruct(q);
}
/**
* @brief Deinitialize the queue
* @param q The queue handle
* @return 0 - success
*/
int queue_uint64_deinit(queue_uint64_msg_t *q)
{
free(q->buffer);
q->buffer = NULL;
pthread_mutex_destroy(&q->_mux);
// pthread_cond_destroy(&q->_cond_get);
// pthread_cond_destroy(&q->_cond_put);
q->size = 0;
q->nData = 0;
q->lget = 0;
q->lput = 0;
// q->nEmptyThread = 0;
// q->nFullThread = 0;
return 0;
}
/**
* @brief Put one item into the circular queue
* @param q The queue handle
* @param data The item to put
* @return 0 - success, -1 - failed
*/
int queue_uint64_put(queue_uint64_msg_t *q, uint64_t data)
{
pthread_mutex_lock(&q->_mux);
// while (q->lget == q->lput && q->nData)
// {
// q->nFullThread++;
// pthread_cond_wait(&q->_cond_put, &q->_mux);
// q->nFullThread--;
// }
if (q->lget == q->lput && q->nData)
{
pthread_mutex_unlock(&q->_mux);
return -1;
}
#ifdef QUEUE_UINT64_DEBUG
printf("put data! lput:%d, data:%lld", q->lput, data);
#endif
(q->buffer)[q->lput++] = data;
if (q->lput == q->size)
{
q->lput = 0;
}
q->nData++;
#ifdef QUEUE_UINT64_DEBUG
printf(" nData:%d\n", q->nData);
#endif
// if (q->nEmptyThread)
// {
// pthread_cond_signal(&q->_cond_get);
// }
pthread_mutex_unlock(&q->_mux);
return 0;
}
/**
* @brief Clear the circular queue
* @param q The queue handle
* @return 0 - success, -1 - failed
*/
int queue_uint64_clear(queue_uint64_msg_t *q)
{
pthread_mutex_lock(&q->_mux);
// while (q->lget == q->lput && q->nData)
// {
// q->nFullThread++;
// pthread_cond_wait(&q->_cond_put, &q->_mux);
// q->nFullThread--;
// }
q->lget = q->lput = q->nData = 0;
#ifdef QUEUE_UINT64_DEBUG
printf("clear!\r\n");
#endif
// if (q->nEmptyThread)
// {
// pthread_cond_signal(&q->_cond_get);
// }
pthread_mutex_unlock(&q->_mux);
return 0;
}