/** * @file queue_uint64.c * @brief Thread safe queue, which stores uint64_t * @details Call queue_init(queue_uint64_msg_t *q, int max_count) paired with queue_deinit(queue_uint64_msg_t *q) as their names imply, queue_initstruct(queue_uint64_msg_t *q)Initialize the message queue structure,*queue_get(queue_uint64_msg_t *q) and queue_put(queue_uint64_msg_t *q, void *data) In and out of the team operation * @author miaow (3703781@qq.com) * @version 1.0 * @date 2021/01/10 * @mainpage github.com/NanjingForestryUniversity * * @copyright Copyright (c) 2022 miaow * * @par Changelog: * *
Date Version Author Description *
2022/01/09 1.0 miaow Write this file *
*/ #include #include #ifdef QUEUE_UINT64_DEBUG #include #include #endif #include /** * @brief Take out the first item from the circular queue * @param q The queue handler * @param data A buffer of uint64_t[1] to store the data taken out * @return 0 - success, -1 - failed */ int queue_uint64_get(queue_uint64_msg_t *q, uint64_t *data) { pthread_mutex_lock(&q->_mux); // while (q->lget == q->lput && 0 == q->nData) // { // // The reason program goes here: assuming there are 2 consmer threads block in this function // // One wakes first and consumes 2 data quickly before another wakes // // In the circumstances that the queue contains 2 items formerly, the second thread should not get data from an empty queue // // This may happen when 2 queue_puts was called by producers and at that moment 2 consmer threads have been blocked // // It is designed as a circular queue, where lget==lput means: // // 1:nData!=0,a full queue // // 2:nData为0,an empty queue // q->nEmptyThread++; // pthread_cond_wait(&q->_cond_get, &q->_mux); // q->nEmptyThread--; // } if (q->nData == 0) { pthread_mutex_unlock(&q->_mux); return -1; } #ifdef QUEUE_UINT64_DEBUG printf("get data! lget:%d, ", q->lget); #endif *data = (q->buffer)[q->lget++]; #ifdef QUEUE_UINT64_DEBUG printf("data:% lld", *data); #endif if (q->lget == q->size) { // this is a circular queue q->lget = 0; } q->nData--; #ifdef QUEUE_UINT64_DEBUG printf(", nData:%d\r\n", q->nData); #endif // if (q->nFullThread) // { // // call pthread_cond_signal only when necessary, enter the kernel state as little as possible // pthread_cond_signal(&q->_cond_put); // } pthread_mutex_unlock(&q->_mux); return 0; } /** * @brief Initialize the queue with a size (maximum count of items) specified in q->size * @param q The queue hander to be initialized * @return 0 - success, -1 - failed * @note q->size should be set before calling this function */ int queue_uint64_initstruct(queue_uint64_msg_t *q) { q->buffer = malloc(q->size * sizeof(uint64_t)); if (q->buffer == NULL) return -1; pthread_mutex_init(&q->_mux, NULL); // pthread_cond_init(&q->_cond_get, NULL); // pthread_cond_init(&q->_cond_put, NULL); return 0; } /** * @brief Initialize the queue * @param q The queue hander to be initialized * @param max_count Maximum count of items in the queue * @return 0 - success, -1 - failed */ int queue_uint64_init(queue_uint64_msg_t *q, int max_count) { q->size = max_count; return queue_uint64_initstruct(q); } /** * @brief Deinitialize the queue * @param q The queue handle * @return 0 - success */ int queue_uint64_deinit(queue_uint64_msg_t *q) { if (q->buffer != NULL) { free(q->buffer); q->buffer = NULL; } pthread_mutex_destroy(&q->_mux); // pthread_cond_destroy(&q->_cond_get); // pthread_cond_destroy(&q->_cond_put); q->size = 0; q->nData = 0; q->lget = 0; q->lput = 0; // q->nEmptyThread = 0; // q->nFullThread = 0; return 0; } /** * @brief Put one item into the circular queue * @param q The queue handle * @param data The item to put * @return 0 - success, -1 - failed */ int queue_uint64_put(queue_uint64_msg_t *q, uint64_t data) { pthread_mutex_lock(&q->_mux); // while (q->lget == q->lput && q->nData) // { // q->nFullThread++; // pthread_cond_wait(&q->_cond_put, &q->_mux); // q->nFullThread--; // } if (q->lget == q->lput && q->nData) { pthread_mutex_unlock(&q->_mux); return -1; } #ifdef QUEUE_UINT64_DEBUG printf("put data! lput:%d, data:%lld", q->lput, data); #endif (q->buffer)[q->lput++] = data; if (q->lput == q->size) { q->lput = 0; } q->nData++; #ifdef QUEUE_UINT64_DEBUG printf(" nData:%d\n", q->nData); #endif // if (q->nEmptyThread) // { // pthread_cond_signal(&q->_cond_get); // } pthread_mutex_unlock(&q->_mux); return 0; } /** * @brief Clear the circular queue * @param q The queue handle * @return 0 - success, -1 - failed */ int queue_uint64_clear(queue_uint64_msg_t *q) { pthread_mutex_lock(&q->_mux); // while (q->lget == q->lput && q->nData) // { // q->nFullThread++; // pthread_cond_wait(&q->_cond_put, &q->_mux); // q->nFullThread--; // } q->lget = q->lput = q->nData = 0; #ifdef QUEUE_UINT64_DEBUG printf("clear!\r\n"); #endif // if (q->nEmptyThread) // { // pthread_cond_signal(&q->_cond_get); // } pthread_mutex_unlock(&q->_mux); return 0; }