61 #define SIZEOF_FIFO_ELEM_CTX_T(FLAG_USE_SHM, CHUNK_SIZE_MAX) \ 62 (size_t)(sizeof(fifo_elem_ctx_t)+ (FLAG_USE_SHM== 0? 1: CHUNK_SIZE_MAX)) 104 #define FIFO_FILE_NAME_MAX_SIZE 1024 120 int flag_api_mutex_initialized;
125 int flag_buf_put_signal_initialized;
130 int flag_buf_get_signal_initialized;
179 size_t chunk_size_max, uint32_t flags,
const char *fifo_file_name,
183 static inline int fifo_input(
fifo_ctx_t *fifo_ctx,
void **ref_elem,
184 size_t elem_size,
int dup_flag);
185 static inline int fifo_output(
fifo_ctx_t *fifo_ctx,
void **ref_elem,
186 size_t *ref_elem_size,
int flush_flag, int64_t tout_usecs);
188 static int fifo_mutex_init(pthread_mutex_t *
const pthread_mutex_p,
190 static int fifo_cond_init(pthread_cond_t *
const pthread_cond_p,
198 size_t fifo_ctx_size;
200 int ret_code, end_code= STAT_ERROR;
204 CHECK_DO(slots_max> 0,
return NULL);
211 fifo_ctx= calloc(1, fifo_ctx_size);
212 CHECK_DO(fifo_ctx!= NULL,
goto end);
218 ret_code=
fifo_init(fifo_ctx, slots_max, chunk_size_max, flags, NULL,
219 fifo_elem_alloc_fxn);
220 CHECK_DO(ret_code== STAT_SUCCESS,
goto end);
222 end_code= STAT_SUCCESS;
224 if(end_code!= STAT_SUCCESS)
230 uint32_t flags,
const char *fifo_file_name)
232 size_t fifo_ctx_size;
234 int shm_fd, ret_code, end_code= STAT_ERROR;
238 CHECK_DO(slots_max> 0,
return NULL);
239 CHECK_DO(chunk_size_max> 0,
return NULL);
241 CHECK_DO(fifo_file_name!= NULL,
return NULL);
251 shm_fd= shm_open(fifo_file_name, O_CREAT| O_RDWR, S_IRUSR | S_IWUSR);
252 CHECK_DO(shm_fd>= 0,LOGE(
"errno: %d\n", errno);
goto end);
255 CHECK_DO(ftruncate(shm_fd, fifo_ctx_size)== 0,
goto end);
258 fifo_ctx= mmap(NULL, fifo_ctx_size, PROT_READ| PROT_WRITE, MAP_SHARED,
260 if(fifo_ctx== MAP_FAILED)
262 CHECK_DO(fifo_ctx!= NULL,
goto end);
263 memset(fifo_ctx, 0, fifo_ctx_size);
269 ret_code=
fifo_init(fifo_ctx, slots_max, chunk_size_max, flags,
270 fifo_file_name, NULL);
271 CHECK_DO(ret_code== STAT_SUCCESS,
goto end);
273 end_code= STAT_SUCCESS;
277 ASSERT(close(shm_fd)== 0);
279 if(end_code!= STAT_SUCCESS)
284 fifo_ctx_t* fifo_shm_exec_open(
size_t slots_max,
size_t chunk_size_max,
285 uint32_t flags,
const char *fifo_file_name)
287 size_t fifo_ctx_size;
289 int shm_fd, end_code= STAT_ERROR;
293 CHECK_DO(slots_max> 0,
return NULL);
294 CHECK_DO(chunk_size_max> 0,
return NULL);
296 CHECK_DO(fifo_file_name!= NULL,
return NULL);
303 shm_fd= shm_open(fifo_file_name, O_RDWR, S_IRUSR | S_IWUSR);
304 CHECK_DO(shm_fd>= 0,LOGE(
"errno: %d\n", errno);
goto end);
307 fifo_ctx= mmap(NULL, fifo_ctx_size, PROT_READ| PROT_WRITE, MAP_SHARED,
309 if(fifo_ctx== MAP_FAILED)
311 CHECK_DO(fifo_ctx!= NULL,
goto end);
315 end_code= STAT_SUCCESS;
319 ASSERT(close(shm_fd)== 0);
321 if(end_code!= STAT_SUCCESS) {
338 pthread_mutex_lock(&fifo_ctx->
api_mutex);
351 pthread_mutex_unlock(&fifo_ctx->
api_mutex);
357 void *p= (
void*)elem;
358 return fifo_input(fifo_ctx, &p, elem_size, 1);
363 return fifo_input(fifo_ctx, ref_elem, elem_size, 0);
368 return fifo_output(fifo_ctx, ref_elem, ref_elem_size, 1,
375 return fifo_output(fifo_ctx, ref_elem, ref_elem_size, 1,
381 return fifo_output(fifo_ctx, ref_elem, ref_elem_size,
387 ssize_t buf_level= -1;
391 CHECK_DO(fifo_ctx!= NULL,
return -1);
393 pthread_mutex_lock(&fifo_ctx->
api_mutex);
395 pthread_mutex_unlock(&fifo_ctx->
api_mutex);
401 void (*it_fxn)(
void *
elem, ssize_t elem_size,
int idx,
void *it_arg,
402 int *ref_flag_break),
406 size_t buf_slots_max, chunk_size_max;
407 ssize_t slots_used_cnt;
408 int i, cnt, cnt_max, flag_break;
412 CHECK_DO(fifo_ctx!= NULL,
return STAT_ERROR);
413 CHECK_DO(elem_cnt> 0 || elem_cnt== -1,
return STAT_ERROR);
414 CHECK_DO(it_fxn!= NULL,
return STAT_ERROR);
417 pthread_mutex_lock(&fifo_ctx->
api_mutex);
428 elem_cnt= slots_used_cnt;
429 cnt_max= (elem_cnt< slots_used_cnt)? elem_cnt: slots_used_cnt;
431 for(i= fifo_ctx->
input_idx- 1, cnt= 0; cnt< cnt_max; cnt++) {
433 (uint8_t*)fifo_ctx->
buf+
437 it_fxn(fifo_elem_ctx->
elem, fifo_elem_ctx->
size, i, it_arg,
447 i= (i- 1)% buf_slots_max;
452 pthread_mutex_unlock(&fifo_ctx->
api_mutex);
459 size_t buf_slots_max, chunk_size_max;
465 pthread_mutex_lock(&fifo_ctx->
api_mutex);
472 for(i= 0; i< buf_slots_max; i++) {
474 (uint8_t*)fifo_ctx->
buf+
476 if(fifo_elem_ctx->
elem!= NULL) {
486 free(fifo_elem_ctx->
elem);
488 fifo_elem_ctx->
elem= NULL;
489 fifo_elem_ctx->
size= 0;
499 pthread_mutex_unlock(&fifo_ctx->
api_mutex);
513 if(ref_fifo_ctx== NULL || (fifo_ctx= *ref_fifo_ctx)== NULL)
526 ASSERT(munmap(fifo_ctx, fifo_ctx_size)== 0);
539 size_t chunk_size_max, uint32_t flags,
const char *fifo_file_name,
542 int ret_code, flag_use_shm, end_code= STAT_ERROR;
546 CHECK_DO(fifo_ctx!= NULL,
return STAT_ERROR);
547 CHECK_DO(slots_max> 0,
return STAT_ERROR);
556 fifo_ctx->
flags= flags;
562 if(fifo_file_name!= NULL) {
563 size_t file_name_len;
566 CHECK_DO(flag_use_shm!= 0,
goto end);
567 CHECK_DO(fifo_elem_alloc_fxn== NULL,
goto end);
569 file_name_len= strlen(fifo_file_name);
573 printed_size= snprintf(fifo_ctx->fifo_file_name,
575 CHECK_DO(printed_size==file_name_len,
goto end);
579 if(fifo_elem_alloc_fxn!= NULL) {
580 fifo_elem_ctx_dup_fxn_t *elem_ctx_dup=
581 fifo_elem_alloc_fxn->elem_ctx_dup;
582 fifo_elem_ctx_release_fxn_t *elem_ctx_release=
583 fifo_elem_alloc_fxn->elem_ctx_release;
585 CHECK_DO(flag_use_shm== 0,
goto end);
587 if(elem_ctx_dup!= NULL)
589 if(elem_ctx_release!= NULL)
594 fifo_ctx->flag_api_mutex_initialized= 0;
595 ret_code= fifo_mutex_init(&fifo_ctx->
api_mutex, flag_use_shm,
597 CHECK_DO(ret_code== STAT_SUCCESS,
goto end);
598 fifo_ctx->flag_api_mutex_initialized= 1;
601 fifo_ctx->flag_buf_put_signal_initialized= 0;
604 CHECK_DO(ret_code== STAT_SUCCESS,
goto end);
605 fifo_ctx->flag_buf_put_signal_initialized= 1;
608 fifo_ctx->flag_buf_get_signal_initialized= 0;
611 CHECK_DO(ret_code== STAT_SUCCESS,
goto end);
612 fifo_ctx->flag_buf_get_signal_initialized= 1;
618 if(flag_use_shm && chunk_size_max== 0) {
619 LOGE(
"A valid maximum chunk size must be provided when opening a " 620 "shared-memory FIFO.\n");
625 end_code= STAT_SUCCESS;
627 if(end_code!= STAT_SUCCESS)
638 register int i, flag_use_shm, flag_api_mutex_initialized,
639 flag_buf_put_signal_initialized, flag_buf_get_signal_initialized;
640 size_t buf_slots_max, chunk_size_max;
649 flag_api_mutex_initialized= fifo_ctx->flag_api_mutex_initialized;
650 flag_buf_put_signal_initialized= fifo_ctx->flag_buf_put_signal_initialized;
651 flag_buf_get_signal_initialized= fifo_ctx->flag_buf_get_signal_initialized;
655 if(flag_api_mutex_initialized!= 0) {
656 pthread_mutex_lock(&fifo_ctx->
api_mutex);
657 if(flag_buf_put_signal_initialized!= 0)
659 if(flag_buf_get_signal_initialized!= 0)
661 pthread_mutex_unlock(&fifo_ctx->
api_mutex);
665 for(i= 0; i< buf_slots_max; i++) {
667 (uint8_t*)fifo_ctx->
buf+
669 if(fifo_elem_ctx->
elem!= NULL) {
679 free(fifo_elem_ctx->
elem);
681 fifo_elem_ctx->
elem= NULL;
682 fifo_elem_ctx->
size= 0;
687 if(flag_api_mutex_initialized!= 0) {
689 fifo_ctx->flag_api_mutex_initialized= 0;
693 if(flag_buf_put_signal_initialized!= 0) {
695 fifo_ctx->flag_buf_put_signal_initialized= 0;
697 if(flag_buf_get_signal_initialized!= 0) {
699 fifo_ctx->flag_buf_get_signal_initialized= 0;
703 if(flag_use_shm!= 0 && strlen(fifo_ctx->fifo_file_name)> 0) {
704 ASSERT(shm_unlink(fifo_ctx->fifo_file_name)== 0);
709 static inline int fifo_input(
fifo_ctx_t *fifo_ctx,
void **ref_elem,
710 size_t elem_size,
int dup_flag)
713 size_t buf_slots_max, chunk_size_max;
715 int end_code= STAT_ERROR;
719 CHECK_DO(fifo_ctx!= NULL,
return STAT_ERROR);
720 CHECK_DO(ref_elem!= NULL && *ref_elem!= NULL,
return STAT_ERROR);
721 CHECK_DO(elem_size> 0,
return STAT_ERROR);
723 (elem_size> chunk_size_max)) {
724 LOGE(
"Size of element exceed configured maximum chunk size for this " 730 LOGE(
"Cannot put element into shared-memory FIFO without duplication. " 731 "Please consider using 'fifo_put_dup()' instead.\n");
736 pthread_mutex_lock(&fifo_ctx->
api_mutex);
755 end_code= STAT_ENOMEM;
779 memcpy(fifo_elem_ctx->
elem, *ref_elem, elem_size);
782 fifo_elem_ctx->
elem= *ref_elem;
785 fifo_elem_ctx->
size= elem_size;
795 end_code= STAT_SUCCESS;
797 pthread_mutex_unlock(&fifo_ctx->
api_mutex);
801 static inline int fifo_output(
fifo_ctx_t *fifo_ctx,
void **ref_elem,
802 size_t *ref_elem_size,
int flush_flag, int64_t tout_usecs)
805 size_t buf_slots_max, chunk_size_max;
807 int ret_code, end_code= STAT_ERROR;
809 ssize_t elem_size= 0;
810 void *elem_cpy= NULL;
811 struct timespec ts_tout= {0};
815 CHECK_DO(fifo_ctx!= NULL,
return STAT_ERROR);
816 CHECK_DO(ref_elem!= NULL,
return STAT_ERROR);
817 CHECK_DO(ref_elem_size!= NULL,
return STAT_ERROR);
824 pthread_mutex_lock(&fifo_ctx->
api_mutex);
834 struct timespec ts_curr;
835 register int64_t curr_nsec;
837 CHECK_DO(clock_gettime(CLOCK_MONOTONIC, &ts_curr)== 0,
goto end);
838 curr_nsec= (int64_t)ts_curr.tv_sec*1000000000+ (int64_t)ts_curr.tv_nsec;
843 curr_nsec+= (tout_usecs* 1000);
844 ts_tout.tv_sec= curr_nsec/ 1000000000;
845 ts_tout.tv_nsec= curr_nsec% 1000000000;
846 curr_nsec= (int64_t)ts_tout.tv_sec*1000000000+
847 (int64_t)ts_tout.tv_nsec;
865 if(ret_code== ETIMEDOUT) {
866 LOGW(
"Warning: FIFO buffer timed-out!\n");
867 end_code= STAT_ETIMEDOUT;
878 end_code= STAT_EAGAIN;
892 elem_size= fifo_elem_ctx->
size;
893 CHECK_DO(elem!= NULL && elem_size> 0,
goto end);
899 fifo_elem_ctx->
elem= NULL;
900 fifo_elem_ctx->
size= 0;
913 elem_cpy= malloc(elem_size);
914 CHECK_DO(elem_cpy!= NULL,
goto end);
915 memcpy(elem_cpy, elem, elem_size);
919 *ref_elem_size= (size_t)elem_size;
921 end_code= STAT_SUCCESS;
923 pthread_mutex_unlock(&fifo_ctx->
api_mutex);
929 static int fifo_mutex_init(pthread_mutex_t *
const pthread_mutex_p,
932 pthread_mutexattr_t attr, *attr_p= NULL;
933 int ret_code, end_code= STAT_ERROR;
934 LOG_CTX_INIT(log_ctx);
937 if(flag_use_shm!= 0) {
938 pthread_mutexattr_init(&attr);
939 ret_code= pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
943 ret_code= pthread_mutex_init(pthread_mutex_p, attr_p);
946 end_code= STAT_SUCCESS;
948 if(end_code!= STAT_SUCCESS) {
949 ASSERT(pthread_mutex_destroy(pthread_mutex_p)== 0);
954 static int fifo_cond_init(pthread_cond_t *
const pthread_cond_p,
957 pthread_condattr_t attr;
958 int ret_code, end_code= STAT_ERROR;
962 pthread_condattr_init(&attr);
963 ret_code= pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
966 if(flag_use_shm!= 0) {
967 ret_code= pthread_condattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
971 ret_code= pthread_cond_init(pthread_cond_p, &attr);
974 end_code= STAT_SUCCESS;
976 if(end_code!= STAT_SUCCESS) {
977 ASSERT(pthread_cond_destroy(pthread_cond_p)== 0);
#define SIZEOF_FIFO_ELEM_CTX_T(FLAG_USE_SHM, CHUNK_SIZE_MAX)
void fifo_close(fifo_ctx_t **ref_fifo_ctx)
int fifo_get(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t *ref_elem_size)
fifo_elem_ctx_release_fxn_t * elem_ctx_release
int fifo_timedget(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t *ref_elem_size, int64_t tout_usecs)
ssize_t fifo_get_buffer_level(fifo_ctx_t *fifo_ctx)
int fifo_put_dup(fifo_ctx_t *fifo_ctx, const void *elem, size_t elem_size)
void fifo_set_blocking_mode(fifo_ctx_t *fifo_ctx, int do_block)
static int fifo_init(fifo_ctx_t *fifo_ctx, size_t slots_max, size_t chunk_size_max, uint32_t flags, const char *fifo_file_name, const fifo_elem_alloc_fxn_t *fifo_elem_alloc_fxn)
pthread_cond_t buf_put_signal
static void fifo_deinit(fifo_ctx_t *fifo_ctx)
int fifo_put(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t elem_size)
pthread_cond_t buf_get_signal
void fifo_empty(fifo_ctx_t *fifo_ctx)
int fifo_show(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t *ref_elem_size)
#define FIFO_PROCESS_SHARED
General status codes enumeration.
#define CHECK_DO(COND, ACTION)
fifo_ctx_t * fifo_open(size_t slots_max, size_t chunk_size_max, uint32_t flags, const fifo_elem_alloc_fxn_t *fifo_elem_alloc_fxn)
struct fifo_elem_ctx_s fifo_elem_ctx_t
#define FIFO_FILE_NAME_MAX_SIZE
volatile ssize_t slots_used_cnt
int fifo_traverse(fifo_ctx_t *fifo_ctx, int elem_cnt, void(*it_fxn)(void *elem, ssize_t elem_size, int idx, void *it_arg, int *ref_flag_break), void *it_arg)
Simple pointer queue (FIFO) implementation.
fifo_ctx_t * fifo_shm_open(size_t slots_max, size_t chunk_size_max, uint32_t flags, const char *fifo_file_name)
struct fifo_ctx_s fifo_ctx_t
static void fifo_close_internal(fifo_ctx_t **ref_fifo_ctx, int flag_deinit)
fifo_elem_ctx_dup_fxn_t * elem_ctx_dup
volatile ssize_t buf_level
pthread_mutex_t api_mutex