MediaProcessors
fifo.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017, 2018 Rafael Antoniello
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of copyright holders nor the names of its
14  * contributors may be used to endorse or promote products derived
15  * from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18  * “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL COPYRIGHT HOLDERS OR CONTRIBUTORS
21  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
35 #include "fifo.h"
36 
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <fcntl.h>
40 #include <unistd.h>
41 #include <string.h>
42 #include <sys/mman.h>
43 #include <sys/shm.h>
44 #include <sys/stat.h>
45 #include <pthread.h>
46 #include <errno.h>
47 
48 #include "check_utils.h"
49 #include "log.h"
50 #include "stat_codes.h"
51 
52 /* **** Definitions **** */
53 
/**
 * Stride, in bytes, between two consecutive slots of the FIFO buffer.
 * - Process-local FIFO (FLAG_USE_SHM== 0): a slot is just the bare
 *   'fifo_elem_ctx_t' descriptor. No padding byte is added, so every slot
 *   stays naturally aligned (the allocation performed by 'fifo_open()' is
 *   never smaller than 'slots* sizeof(fifo_elem_ctx_t)').
 * - Shared-memory FIFO: the descriptor is followed by a pool of
 *   CHUNK_SIZE_MAX bytes that stores the element in place.
 * Both arguments are fully parenthesized so arbitrary expressions
 * (e.g. 'flags& FIFO_PROCESS_SHARED') can be passed safely.
 */
#define SIZEOF_FIFO_ELEM_CTX_T(FLAG_USE_SHM, CHUNK_SIZE_MAX) \
    ((size_t)(sizeof(fifo_elem_ctx_t)+ \
    (((FLAG_USE_SHM)== 0)? (size_t)0: (size_t)(CHUNK_SIZE_MAX))))
63 
/**
 * Descriptor of one FIFO slot.
 */
typedef struct fifo_elem_ctx_s {
    /**
     * Size in bytes of the element held by the slot; zero when the slot is
     * free.
     */
    ssize_t size;
    /**
     * Pointer to the queued element; NULL when the slot is free.
     * For shared-memory FIFOs this value is only meaningful in the process
     * that queued the element.
     */
    void *elem;
    /**
     * In-place storage for the element, used only by shared-memory FIFOs
     * (its real length is the FIFO's maximum chunk size).
     */
    uint8_t shm_elem_pool[]; //flexible array member must be last
} fifo_elem_ctx_t;
83 
87 typedef struct fifo_ctx_s {
93  volatile uint32_t flags;
98  volatile int flag_exit;
104 #define FIFO_FILE_NAME_MAX_SIZE 1024
105  char fifo_file_name[FIFO_FILE_NAME_MAX_SIZE];
110  fifo_elem_ctx_dup_fxn_t *elem_ctx_dup;
115  fifo_elem_ctx_release_fxn_t *elem_ctx_release;
119  pthread_mutex_t api_mutex;
120  int flag_api_mutex_initialized;
124  pthread_cond_t buf_put_signal;
125  int flag_buf_put_signal_initialized;
129  pthread_cond_t buf_get_signal;
130  int flag_buf_get_signal_initialized;
134  volatile ssize_t slots_used_cnt;
139  volatile ssize_t buf_level;
145  volatile int input_idx;
152  volatile int output_idx;
172  fifo_elem_ctx_t buf[]; //flexible array member must be last
173 } fifo_ctx_t;
174 
175 /* **** Prototypes **** */
176 
177 static void fifo_close_internal(fifo_ctx_t **ref_fifo_ctx, int flag_deinit);
178 static int fifo_init(fifo_ctx_t *fifo_ctx, size_t slots_max,
179  size_t chunk_size_max, uint32_t flags, const char *fifo_file_name,
180  const fifo_elem_alloc_fxn_t *fifo_elem_alloc_fxn);
181 static void fifo_deinit(fifo_ctx_t *fifo_ctx);
182 
183 static inline int fifo_input(fifo_ctx_t *fifo_ctx, void **ref_elem,
184  size_t elem_size, int dup_flag);
185 static inline int fifo_output(fifo_ctx_t *fifo_ctx, void **ref_elem,
186  size_t *ref_elem_size, int flush_flag, int64_t tout_usecs);
187 
188 static int fifo_mutex_init(pthread_mutex_t * const pthread_mutex_p,
189  int flag_use_shm, log_ctx_t *log_ctx);
190 static int fifo_cond_init(pthread_cond_t * const pthread_cond_p,
191  int flag_use_shm, log_ctx_t *log_ctx);
192 
193 /* **** Implementations **** */
194 
195 fifo_ctx_t* fifo_open(size_t slots_max, size_t chunk_size_max,
196  uint32_t flags, const fifo_elem_alloc_fxn_t *fifo_elem_alloc_fxn)
197 {
198  size_t fifo_ctx_size;
199  fifo_ctx_t *fifo_ctx= NULL;
200  int ret_code, end_code= STAT_ERROR;
201  LOG_CTX_INIT(NULL);
202 
203  /* Check arguments */
204  CHECK_DO(slots_max> 0, return NULL);
205  // 'chunk_size_max' may be zero (means no limitation in chunk size)
206  CHECK_DO((flags& FIFO_PROCESS_SHARED)== 0, return NULL);
207  // 'fifo_elem_alloc_fxn' may be NULL
208 
209  /* Allocate FIFO context structure */
210  fifo_ctx_size= sizeof(fifo_ctx_t)+ slots_max* (sizeof(fifo_elem_ctx_t)+ 1);
211  fifo_ctx= calloc(1, fifo_ctx_size);
212  CHECK_DO(fifo_ctx!= NULL, goto end);
213 
214  /* Initialize rest of FIFO context structure.
215  * Note that FIFO file-name do *not* apply when *not* using shared-memory
216  * (thus we pass NULL pointer).
217  */
218  ret_code= fifo_init(fifo_ctx, slots_max, chunk_size_max, flags, NULL,
219  fifo_elem_alloc_fxn);
220  CHECK_DO(ret_code== STAT_SUCCESS, goto end);
221 
222  end_code= STAT_SUCCESS;
223 end:
224  if(end_code!= STAT_SUCCESS)
225  fifo_close(&fifo_ctx);
226  return fifo_ctx;
227 }
228 
229 fifo_ctx_t* fifo_shm_open(size_t slots_max, size_t chunk_size_max,
230  uint32_t flags, const char *fifo_file_name)
231 {
232  size_t fifo_ctx_size;
233  fifo_ctx_t *fifo_ctx= NULL;
234  int shm_fd, ret_code, end_code= STAT_ERROR;
235  LOG_CTX_INIT(NULL);
236 
237  /* Check arguments */
238  CHECK_DO(slots_max> 0, return NULL);
239  CHECK_DO(chunk_size_max> 0, return NULL);
240  // 'flags' may take any value
241  CHECK_DO(fifo_file_name!= NULL, return NULL);
242 
243  /* Add shared memory flag */
244  flags|= FIFO_PROCESS_SHARED;
245 
246  /* Compute the size of the FIFO context structure to be allocated */
247  fifo_ctx_size= sizeof(fifo_ctx_t)+ slots_max*
248  (sizeof(fifo_elem_ctx_t)+ chunk_size_max);
249 
250  /* Create the shared memory segment */
251  shm_fd= shm_open(fifo_file_name, O_CREAT| O_RDWR, S_IRUSR | S_IWUSR);
252  CHECK_DO(shm_fd>= 0,LOGE("errno: %d\n", errno); goto end);
253 
254  /* Configure size of the shared memory segment */
255  CHECK_DO(ftruncate(shm_fd, fifo_ctx_size)== 0, goto end);
256 
257  /* Map the shared memory segment in the address space of the process */
258  fifo_ctx= mmap(NULL, fifo_ctx_size, PROT_READ| PROT_WRITE, MAP_SHARED,
259  shm_fd, 0);
260  if(fifo_ctx== MAP_FAILED)
261  fifo_ctx= NULL;
262  CHECK_DO(fifo_ctx!= NULL, goto end);
263  memset(fifo_ctx, 0, fifo_ctx_size);
264 
265  /* Initialize rest of FIFO context structure.
266  * Note that FIFO element allocation/release external callback functions
267  * do not apply when using shared-memory (thus we pass NULL pointer).
268  */
269  ret_code= fifo_init(fifo_ctx, slots_max, chunk_size_max, flags,
270  fifo_file_name, NULL);
271  CHECK_DO(ret_code== STAT_SUCCESS, goto end);
272 
273  end_code= STAT_SUCCESS;
274 end:
275  /* We will not need file descriptor any more */
276  if(shm_fd>= 0) {
277  ASSERT(close(shm_fd)== 0);
278  }
279  if(end_code!= STAT_SUCCESS)
280  fifo_close(&fifo_ctx);
281  return fifo_ctx;
282 }
283 
284 fifo_ctx_t* fifo_shm_exec_open(size_t slots_max, size_t chunk_size_max,
285  uint32_t flags, const char *fifo_file_name)
286 {
287  size_t fifo_ctx_size;
288  fifo_ctx_t *fifo_ctx= NULL;
289  int shm_fd, end_code= STAT_ERROR;
290  LOG_CTX_INIT(NULL);
291 
292  /* Check arguments */
293  CHECK_DO(slots_max> 0, return NULL);
294  CHECK_DO(chunk_size_max> 0, return NULL);
295  // 'flags' may take any value
296  CHECK_DO(fifo_file_name!= NULL, return NULL);
297 
298  /* Compute the size of the FIFO context structure to be allocated */
299  fifo_ctx_size= sizeof(fifo_ctx_t)+ slots_max*
300  (sizeof(fifo_elem_ctx_t)+ chunk_size_max);
301 
302  /* Create the shared memory segment */
303  shm_fd= shm_open(fifo_file_name, O_RDWR, S_IRUSR | S_IWUSR);
304  CHECK_DO(shm_fd>= 0,LOGE("errno: %d\n", errno); goto end);
305 
306  /* Map the shared memory segment in the address space of the process */
307  fifo_ctx= mmap(NULL, fifo_ctx_size, PROT_READ| PROT_WRITE, MAP_SHARED,
308  shm_fd, 0);
309  if(fifo_ctx== MAP_FAILED)
310  fifo_ctx= NULL;
311  CHECK_DO(fifo_ctx!= NULL, goto end);
312 
313  //LOGV("FIFO flags are: '0x%0x\n", fifo_ctx->flags); //comment-me
314 
315  end_code= STAT_SUCCESS;
316 end:
317  /* We will not need file descriptor any more */
318  if(shm_fd>= 0) {
319  ASSERT(close(shm_fd)== 0);
320  }
321  if(end_code!= STAT_SUCCESS) {
322  fifo_close_internal(&fifo_ctx, 0);
323  }
324  return fifo_ctx;
325 }
326 
327 void fifo_close(fifo_ctx_t **ref_fifo_ctx)
328 {
329  fifo_close_internal(ref_fifo_ctx, 1);
330 }
331 
332 void fifo_set_blocking_mode(fifo_ctx_t *fifo_ctx, int do_block)
333 {
334  LOG_CTX_INIT(NULL);
335 
336  CHECK_DO(fifo_ctx!= NULL, return);
337 
338  pthread_mutex_lock(&fifo_ctx->api_mutex);
339 
340  /* Set the 'non-blocking' bit-flag */
341  if(do_block!= 0) {
342  fifo_ctx->flags&= ~((uint32_t)FIFO_O_NONBLOCK);
343  } else {
344  fifo_ctx->flags|= (uint32_t)FIFO_O_NONBLOCK;
345  }
346 
347  /* Announce to unblock conditional waits */
348  pthread_cond_broadcast(&fifo_ctx->buf_put_signal);
349  pthread_cond_broadcast(&fifo_ctx->buf_get_signal);
350 
351  pthread_mutex_unlock(&fifo_ctx->api_mutex);
352  return;
353 }
354 
355 int fifo_put_dup(fifo_ctx_t *fifo_ctx, const void *elem, size_t elem_size)
356 {
357  void *p= (void*)elem;
358  return fifo_input(fifo_ctx, &p, elem_size, 1/*duplicate*/);
359 }
360 
361 int fifo_put(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t elem_size)
362 {
363  return fifo_input(fifo_ctx, ref_elem, elem_size, 0/*do not duplicate*/);
364 }
365 
366 int fifo_get(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t *ref_elem_size)
367 {
368  return fifo_output(fifo_ctx, ref_elem, ref_elem_size, 1/*flush FIFO*/,
369  -1/*no time-out*/);
370 }
371 
372 int fifo_timedget(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t *ref_elem_size,
373  int64_t tout_usecs)
374 {
375  return fifo_output(fifo_ctx, ref_elem, ref_elem_size, 1/*flush FIFO*/,
376  tout_usecs/*user specified time-out*/);
377 }
378 
379 int fifo_show(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t *ref_elem_size)
380 {
381  return fifo_output(fifo_ctx, ref_elem, ref_elem_size,
382  0/*do NOT flush FIFO*/, -1/*no time-out*/);
383 }
384 
386 {
387  ssize_t buf_level= -1; // invalid value to indicate STAT_ERROR
388  LOG_CTX_INIT(NULL);
389 
390  /* Check arguments */
391  CHECK_DO(fifo_ctx!= NULL, return -1);
392 
393  pthread_mutex_lock(&fifo_ctx->api_mutex);
394  buf_level= fifo_ctx->buf_level;
395  pthread_mutex_unlock(&fifo_ctx->api_mutex);
396 
397  return buf_level;
398 }
399 
400 int fifo_traverse(fifo_ctx_t *fifo_ctx, int elem_cnt,
401  void (*it_fxn)(void *elem, ssize_t elem_size, int idx, void *it_arg,
402  int *ref_flag_break),
403  void *it_arg)
404 {
405  int flag_use_shm;
406  size_t buf_slots_max, chunk_size_max;
407  ssize_t slots_used_cnt;
408  int i, cnt, cnt_max, flag_break;
409  LOG_CTX_INIT(NULL);
410 
411  /* Check arguments */
412  CHECK_DO(fifo_ctx!= NULL, return STAT_ERROR);
413  CHECK_DO(elem_cnt> 0 || elem_cnt== -1, return STAT_ERROR);
414  CHECK_DO(it_fxn!= NULL, return STAT_ERROR);
415 
416  /* Lock API MUTEX */
417  pthread_mutex_lock(&fifo_ctx->api_mutex);
418 
419  flag_use_shm= fifo_ctx->flags& FIFO_PROCESS_SHARED;
420  buf_slots_max= fifo_ctx->buf_slots_max;
421  chunk_size_max= fifo_ctx->chunk_size_max;
422 
423  /* Iterate: we do it beginning with the input index (namely, we go from
424  * the newest queued element to the oldest).
425  */
426  slots_used_cnt= fifo_ctx->slots_used_cnt;
427  if(elem_cnt== -1)
428  elem_cnt= slots_used_cnt; // '-1' means "traverse all the FIFO"
429  cnt_max= (elem_cnt< slots_used_cnt)? elem_cnt: slots_used_cnt;
430  flag_break= 0;
431  for(i= fifo_ctx->input_idx- 1, cnt= 0; cnt< cnt_max; cnt++) {
432  fifo_elem_ctx_t *fifo_elem_ctx= (fifo_elem_ctx_t*)(
433  (uint8_t*)fifo_ctx->buf+
434  i* SIZEOF_FIFO_ELEM_CTX_T(flag_use_shm, chunk_size_max));
435 
436  /* Execute iterator callback function */
437  it_fxn(fifo_elem_ctx->elem, fifo_elem_ctx->size, i, it_arg,
438  &flag_break);
439  if(flag_break!= 0)
440  break;
441 
442  /* Update for next iteration; note that 'buf_slots_max' is > 0 in
443  * modulo operation:
444  * integer r = a % b;
445  * r= r < 0 ? r + b : r; <- Only works if B> 0
446  */
447  i= (i- 1)% buf_slots_max;
448  if(i< 0)
449  i= i+ buf_slots_max;
450  }
451 
452  pthread_mutex_unlock(&fifo_ctx->api_mutex);
453  return STAT_SUCCESS;
454 }
455 
456 void fifo_empty(fifo_ctx_t *fifo_ctx)
457 {
458  int i, flag_use_shm;
459  size_t buf_slots_max, chunk_size_max;
460  LOG_CTX_INIT(NULL);
461 
462  CHECK_DO(fifo_ctx!= NULL, return);
463 
464  /* Lock API mutex */
465  pthread_mutex_lock(&fifo_ctx->api_mutex);
466 
467  flag_use_shm= fifo_ctx->flags& FIFO_PROCESS_SHARED;
468  buf_slots_max= fifo_ctx->buf_slots_max;
469  chunk_size_max= fifo_ctx->chunk_size_max;
470 
471  /* Release all the elements available in FIFO buffer */
472  for(i= 0; i< buf_slots_max; i++) {
473  fifo_elem_ctx_t *fifo_elem_ctx= (fifo_elem_ctx_t*)(
474  (uint8_t*)fifo_ctx->buf+
475  i* SIZEOF_FIFO_ELEM_CTX_T(flag_use_shm, chunk_size_max));
476  if(fifo_elem_ctx->elem!= NULL) {
477  if(fifo_ctx->elem_ctx_release!= NULL) {
478  fifo_ctx->elem_ctx_release(&fifo_elem_ctx->elem);
479  } else {
480  /* This is the only case shared memory may be being used.
481  * If it is the case, it is not applicable to free memory as
482  * we use a preallocated pool (just set element pointer to
483  * NULL).
484  */
485  if(!(fifo_ctx->flags& FIFO_PROCESS_SHARED))
486  free(fifo_elem_ctx->elem);
487  }
488  fifo_elem_ctx->elem= NULL;
489  fifo_elem_ctx->size= 0;
490  }
491  }
492 
493  /* Reset FIFO level and indexes */
494  fifo_ctx->slots_used_cnt= 0;
495  fifo_ctx->buf_level= 0;
496  fifo_ctx->input_idx= 0;
497  fifo_ctx->output_idx= 0;
498 
499  pthread_mutex_unlock(&fifo_ctx->api_mutex);
500 }
501 
508 static void fifo_close_internal(fifo_ctx_t **ref_fifo_ctx, int flag_deinit)
509 {
510  fifo_ctx_t *fifo_ctx;
511  LOG_CTX_INIT(NULL);
512 
513  if(ref_fifo_ctx== NULL || (fifo_ctx= *ref_fifo_ctx)== NULL)
514  return;
515 
516  /* De-initialize FIFO context structure if applicable */
517  if(flag_deinit)
518  fifo_deinit(fifo_ctx);
519 
520  /* Release module instance context structure */
521  if((fifo_ctx->flags& FIFO_PROCESS_SHARED)!= 0) {
522  size_t fifo_ctx_size= sizeof(fifo_ctx_t)+ fifo_ctx->buf_slots_max*
523  (sizeof(fifo_elem_ctx_t)+ fifo_ctx->chunk_size_max);
524 
525  /* remove the mapped shared memory segment from the address space */
526  ASSERT(munmap(fifo_ctx, fifo_ctx_size)== 0);
527  } else {
528  free(fifo_ctx);
529  }
530  *ref_fifo_ctx= NULL;
531 }
532 
538 static int fifo_init(fifo_ctx_t *fifo_ctx, size_t slots_max,
539  size_t chunk_size_max, uint32_t flags, const char *fifo_file_name,
540  const fifo_elem_alloc_fxn_t *fifo_elem_alloc_fxn)
541 {
542  int ret_code, flag_use_shm, end_code= STAT_ERROR;
543  LOG_CTX_INIT(NULL);
544 
545  /* Check arguments */
546  CHECK_DO(fifo_ctx!= NULL, return STAT_ERROR);
547  CHECK_DO(slots_max> 0, return STAT_ERROR);
548  // 'chunk_size_max' may be zero
549  // 'flags' may take any value
550  // 'fifo_file_name' may be NULL
551  // 'fifo_elem_alloc_fxn' may be NULL
552 
553  flag_use_shm= flags& FIFO_PROCESS_SHARED;
554 
555  /* Module flags */
556  fifo_ctx->flags= flags;
557 
558  /* Exit flag */
559  fifo_ctx->flag_exit= 0;
560 
561  /* Shared FIFO name */
562  if(fifo_file_name!= NULL) {
563  size_t file_name_len;
564  int printed_size;
565 
566  CHECK_DO(flag_use_shm!= 0, goto end);
567  CHECK_DO(fifo_elem_alloc_fxn== NULL, goto end);
568 
569  file_name_len= strlen(fifo_file_name);
570  CHECK_DO(file_name_len> 0 && file_name_len< FIFO_FILE_NAME_MAX_SIZE,
571  goto end);
572 
573  printed_size= snprintf(fifo_ctx->fifo_file_name,
574  FIFO_FILE_NAME_MAX_SIZE, "%s", fifo_file_name);
575  CHECK_DO(printed_size==file_name_len, goto end);
576  }
577 
578  /* FIFO element allocation/release external callback functions */
579  if(fifo_elem_alloc_fxn!= NULL) {
580  fifo_elem_ctx_dup_fxn_t *elem_ctx_dup=
581  fifo_elem_alloc_fxn->elem_ctx_dup;
582  fifo_elem_ctx_release_fxn_t *elem_ctx_release=
583  fifo_elem_alloc_fxn->elem_ctx_release;
584 
585  CHECK_DO(flag_use_shm== 0, goto end);
586 
587  if(elem_ctx_dup!= NULL)
588  fifo_ctx->elem_ctx_dup= elem_ctx_dup;
589  if(elem_ctx_release!= NULL)
590  fifo_ctx->elem_ctx_release= elem_ctx_release;
591  }
592 
593  /* API MUTEX */
594  fifo_ctx->flag_api_mutex_initialized= 0; // To close safely on error
595  ret_code= fifo_mutex_init(&fifo_ctx->api_mutex, flag_use_shm,
596  LOG_CTX_GET());
597  CHECK_DO(ret_code== STAT_SUCCESS, goto end);
598  fifo_ctx->flag_api_mutex_initialized= 1;
599 
600  /* "Put into buffer" conditional */
601  fifo_ctx->flag_buf_put_signal_initialized= 0; // To close safely on error
602  ret_code= fifo_cond_init(&fifo_ctx->buf_put_signal, flag_use_shm,
603  LOG_CTX_GET());
604  CHECK_DO(ret_code== STAT_SUCCESS, goto end);
605  fifo_ctx->flag_buf_put_signal_initialized= 1;
606 
607  /* "Get from buffer" conditional */
608  fifo_ctx->flag_buf_get_signal_initialized= 0; // To close safely on error
609  ret_code= fifo_cond_init(&fifo_ctx->buf_get_signal, flag_use_shm,
610  LOG_CTX_GET());
611  CHECK_DO(ret_code== STAT_SUCCESS, goto end);
612  fifo_ctx->flag_buf_get_signal_initialized= 1;
613 
614  /* Maximum number of element-slots */
615  fifo_ctx->buf_slots_max= slots_max;
616 
617  /* Maximum permitted size of chunks [bytes] */
618  if(flag_use_shm && chunk_size_max== 0) {
619  LOGE("A valid maximum chunk size must be provided when opening a "
620  "shared-memory FIFO.\n");
621  goto end;
622  }
623  fifo_ctx->chunk_size_max= chunk_size_max;
624 
625  end_code= STAT_SUCCESS;
626 end:
627  if(end_code!= STAT_SUCCESS)
628  fifo_deinit(fifo_ctx);
629  return end_code;
630 }
631 
636 static void fifo_deinit(fifo_ctx_t *fifo_ctx)
637 {
638  register int i, flag_use_shm, flag_api_mutex_initialized,
639  flag_buf_put_signal_initialized, flag_buf_get_signal_initialized;
640  size_t buf_slots_max, chunk_size_max;
641  LOG_CTX_INIT(NULL);
642 
643  if(fifo_ctx== NULL)
644  return;
645 
646  flag_use_shm= fifo_ctx->flags& FIFO_PROCESS_SHARED;
647  buf_slots_max= fifo_ctx->buf_slots_max;
648  chunk_size_max= fifo_ctx->chunk_size_max;
649  flag_api_mutex_initialized= fifo_ctx->flag_api_mutex_initialized;
650  flag_buf_put_signal_initialized= fifo_ctx->flag_buf_put_signal_initialized;
651  flag_buf_get_signal_initialized= fifo_ctx->flag_buf_get_signal_initialized;
652 
653  /* Set exit flag and send signals to eventually unlock MUTEX */
654  fifo_ctx->flag_exit= 1;
655  if(flag_api_mutex_initialized!= 0) {
656  pthread_mutex_lock(&fifo_ctx->api_mutex);
657  if(flag_buf_put_signal_initialized!= 0)
658  pthread_cond_broadcast(&fifo_ctx->buf_put_signal);
659  if(flag_buf_get_signal_initialized!= 0)
660  pthread_cond_broadcast(&fifo_ctx->buf_get_signal);
661  pthread_mutex_unlock(&fifo_ctx->api_mutex);
662  }
663 
664  /* Release FIFO buffer elements if applicable */
665  for(i= 0; i< buf_slots_max; i++) {
666  fifo_elem_ctx_t *fifo_elem_ctx= (fifo_elem_ctx_t*)(
667  (uint8_t*)fifo_ctx->buf+
668  i* SIZEOF_FIFO_ELEM_CTX_T(flag_use_shm, chunk_size_max));
669  if(fifo_elem_ctx->elem!= NULL) {
670  if(fifo_ctx->elem_ctx_release!= NULL) {
671  fifo_ctx->elem_ctx_release(&fifo_elem_ctx->elem);
672  } else {
673  /* This is the only case shared memory may be being used.
674  * If it is the case, it is not applicable to free memory
675  * as we use a preallocated pool (just set element pointer
676  * to NULL).
677  */
678  if(!flag_use_shm)
679  free(fifo_elem_ctx->elem);
680  }
681  fifo_elem_ctx->elem= NULL;
682  fifo_elem_ctx->size= 0;
683  }
684  }
685 
686  /* Release API MUTEX */
687  if(flag_api_mutex_initialized!= 0) {
688  ASSERT(pthread_mutex_destroy(&fifo_ctx->api_mutex)== 0);
689  fifo_ctx->flag_api_mutex_initialized= 0;
690  }
691 
692  /* Release conditionals */
693  if(flag_buf_put_signal_initialized!= 0) {
694  ASSERT(pthread_cond_destroy(&fifo_ctx->buf_put_signal)== 0);
695  fifo_ctx->flag_buf_put_signal_initialized= 0;
696  }
697  if(flag_buf_get_signal_initialized!= 0) {
698  ASSERT(pthread_cond_destroy(&fifo_ctx->buf_get_signal)== 0);
699  fifo_ctx->flag_buf_get_signal_initialized= 0;
700  }
701 
702  /* Unlink FIFO file-name if applicable */
703  if(flag_use_shm!= 0 && strlen(fifo_ctx->fifo_file_name)> 0) {
704  ASSERT(shm_unlink(fifo_ctx->fifo_file_name)== 0);
705  memset(fifo_ctx->fifo_file_name, 0, FIFO_FILE_NAME_MAX_SIZE);
706  }
707 }
708 
709 static inline int fifo_input(fifo_ctx_t *fifo_ctx, void **ref_elem,
710  size_t elem_size, int dup_flag)
711 {
712  int flag_use_shm;
713  size_t buf_slots_max, chunk_size_max;
714  fifo_elem_ctx_t *fifo_elem_ctx= NULL;
715  int end_code= STAT_ERROR;
716  LOG_CTX_INIT(NULL);
717 
718  /* Check arguments */
719  CHECK_DO(fifo_ctx!= NULL, return STAT_ERROR);
720  CHECK_DO(ref_elem!= NULL && *ref_elem!= NULL, return STAT_ERROR);
721  CHECK_DO(elem_size> 0, return STAT_ERROR);
722  if((chunk_size_max= fifo_ctx->chunk_size_max)!= 0 &&
723  (elem_size> chunk_size_max)) {
724  LOGE("Size of element exceed configured maximum chunk size for this "
725  "FIFO\n");
726  return STAT_ERROR;
727  }
728  if((dup_flag== 0) && (fifo_ctx->flags& FIFO_PROCESS_SHARED)) {
729  // Duplication is mandatory when using shared memory
730  LOGE("Cannot put element into shared-memory FIFO without duplication. "
731  "Please consider using 'fifo_put_dup()' instead.\n");
732  return STAT_ERROR;
733  }
734 
735  /* Lock API MUTEX */
736  pthread_mutex_lock(&fifo_ctx->api_mutex);
737 
738  flag_use_shm= fifo_ctx->flags& FIFO_PROCESS_SHARED;
739  buf_slots_max= fifo_ctx->buf_slots_max;
740 
741  /* In the case of blocking FIFO, if buffer is full we block until a
742  * element is consumed and a new free slot is available.
743  * In the case of a non-blocking FIFO, if buffer is full we exit
744  * returning 'STAT_ENOMEM' status.
745  */
746  while(fifo_ctx->slots_used_cnt>= buf_slots_max &&
747  !(fifo_ctx->flags& FIFO_O_NONBLOCK) &&
748  fifo_ctx->flag_exit== 0) {
749  pthread_cond_broadcast(&fifo_ctx->buf_put_signal);
750  pthread_cond_wait(&fifo_ctx->buf_get_signal, &fifo_ctx->api_mutex);
751  }
752  if(fifo_ctx->slots_used_cnt>= buf_slots_max &&
753  (fifo_ctx->flags& FIFO_O_NONBLOCK)) {
754  //LOGV("FIFO buffer overflow!\n"); //Comment-me
755  end_code= STAT_ENOMEM;
756  goto end;
757  }
758 
759  /* Get FIFO slot where to put new element */
760  fifo_elem_ctx= (fifo_elem_ctx_t*)((uint8_t*)fifo_ctx->buf+
761  fifo_ctx->input_idx*
762  SIZEOF_FIFO_ELEM_CTX_T(flag_use_shm, chunk_size_max));
763  CHECK_DO(fifo_elem_ctx->elem== NULL, goto end);
764  CHECK_DO(fifo_elem_ctx->size== 0, goto end);
765 
766  /* Get or copy (if applicable) the new element */
767  if(dup_flag!= 0 && fifo_ctx->elem_ctx_dup!= NULL) {
768  fifo_elem_ctx->elem= fifo_ctx->elem_ctx_dup(*ref_elem);
769  CHECK_DO(fifo_elem_ctx->elem!= NULL, goto end);
770  }
771  if(dup_flag!= 0 && fifo_ctx->elem_ctx_dup== NULL) {
772  /* This is the only case shared memory may be being used.
773  * If it is the case, it is not need to allocate memory as we use a
774  * preallocated pool.
775  */
776  fifo_elem_ctx->elem= !(fifo_ctx->flags& FIFO_PROCESS_SHARED)?
777  malloc(elem_size): &fifo_elem_ctx->shm_elem_pool[0];
778  CHECK_DO(fifo_elem_ctx->elem!= NULL, goto end);
779  memcpy(fifo_elem_ctx->elem, *ref_elem, elem_size);
780  }
781  if(dup_flag== 0) {
782  fifo_elem_ctx->elem= *ref_elem;
783  *ref_elem= NULL; // we now own the element
784  }
785  fifo_elem_ctx->size= elem_size;
786 
787  /* Update circular buffer management variables */
788  fifo_ctx->slots_used_cnt+= 1;
789  fifo_ctx->buf_level+= elem_size;
790  //CHECK_DO(fifo_ctx->slots_used_cnt<= buf_slots_max,
791  // fifo_ctx->slots_used_cnt= buf_slots_max); //comment-me
792  fifo_ctx->input_idx= (fifo_ctx->input_idx+ 1)% buf_slots_max;
793  pthread_cond_broadcast(&fifo_ctx->buf_put_signal);
794 
795  end_code= STAT_SUCCESS;
796 end:
797  pthread_mutex_unlock(&fifo_ctx->api_mutex);
798  return end_code;
799 }
800 
801 static inline int fifo_output(fifo_ctx_t *fifo_ctx, void **ref_elem,
802  size_t *ref_elem_size, int flush_flag, int64_t tout_usecs)
803 {
804  int flag_use_shm;
805  size_t buf_slots_max, chunk_size_max;
806  fifo_elem_ctx_t *fifo_elem_ctx= NULL;
807  int ret_code, end_code= STAT_ERROR;
808  void *elem= NULL; // Do not release
809  ssize_t elem_size= 0; // Do not release
810  void *elem_cpy= NULL;
811  struct timespec ts_tout= {0};
812  LOG_CTX_INIT(NULL);
813 
814  /* Check arguments */
815  CHECK_DO(fifo_ctx!= NULL, return STAT_ERROR);
816  CHECK_DO(ref_elem!= NULL, return STAT_ERROR);
817  CHECK_DO(ref_elem_size!= NULL, return STAT_ERROR);
818 
819  /* Reset arguments to be returned by value */
820  *ref_elem= NULL;
821  *ref_elem_size= 0;
822 
823  /* Lock API MUTEX */
824  pthread_mutex_lock(&fifo_ctx->api_mutex);
825 
826  flag_use_shm= fifo_ctx->flags& FIFO_PROCESS_SHARED;
827  buf_slots_max= fifo_ctx->buf_slots_max;
828  chunk_size_max= fifo_ctx->chunk_size_max;
829 
830  /* Get current time and compute time-out if applicable.
831  * Note that a negative time-out mens 'wait indefinitely'.
832  */
833  if(tout_usecs>= 0) {
834  struct timespec ts_curr;
835  register int64_t curr_nsec;
836  /* Get current time */
837  CHECK_DO(clock_gettime(CLOCK_MONOTONIC, &ts_curr)== 0, goto end);
838  curr_nsec= (int64_t)ts_curr.tv_sec*1000000000+ (int64_t)ts_curr.tv_nsec;
839  //LOGV("curr_nsec: %"PRId64"\n", curr_nsec); //comment-me
840  //LOGV("secs: %"PRId64"\n", (int64_t)ts_curr.tv_sec); //comment-me
841  //LOGV("nsecs: %"PRId64"\n", (int64_t)ts_curr.tv_nsec); //comment-me
842  /* Compute time-out */
843  curr_nsec+= (tout_usecs* 1000);
844  ts_tout.tv_sec= curr_nsec/ 1000000000;
845  ts_tout.tv_nsec= curr_nsec% 1000000000;
846  curr_nsec= (int64_t)ts_tout.tv_sec*1000000000+
847  (int64_t)ts_tout.tv_nsec; //comment-me
848  //LOGV("tout_nsec: %"PRId64"\n", curr_nsec); //comment-me
849  //LOGV("secs: %"PRId64"\n", (int64_t)ts_tout.tv_sec); //comment-me
850  //LOGV("nsecs: %"PRId64"\n", (int64_t)ts_tout.tv_nsec); //comment-me
851  }
852 
853  /* In the case of blocking FIFO, if buffer is empty we block until a
854  * new element is inserted, or if it is the case, time-out occur.
855  * In the case of a non-blocking FIFO, if buffer is empty we exit
856  * returning 'STAT_EAGAIN' status.
857  */
858  while(fifo_ctx->slots_used_cnt<= 0 && !(fifo_ctx->flags& FIFO_O_NONBLOCK)
859  && fifo_ctx->flag_exit== 0) {
860  //LOGV("FIFO buffer underrun!\n"); //comment-me
861  pthread_cond_broadcast(&fifo_ctx->buf_get_signal);
862  if(tout_usecs>= 0) {
863  ret_code= pthread_cond_timedwait(&fifo_ctx->buf_put_signal,
864  &fifo_ctx->api_mutex, &ts_tout);
865  if(ret_code== ETIMEDOUT) {
866  LOGW("Warning: FIFO buffer timed-out!\n");
867  end_code= STAT_ETIMEDOUT;
868  goto end;
869  }
870  } else {
871  ret_code= pthread_cond_wait(&fifo_ctx->buf_put_signal,
872  &fifo_ctx->api_mutex);
873  CHECK_DO(ret_code== 0, goto end);
874  }
875  }
876  if(fifo_ctx->slots_used_cnt<= 0 && (fifo_ctx->flags& FIFO_O_NONBLOCK)) {
877  //LOGV("FIFO buffer underrun!\n"); //comment-me
878  end_code= STAT_EAGAIN;
879  goto end;
880  }
881 
882  /* Get the element.
883  * It is important to note that in a for-exec setting, value of
884  * 'fifo_elem_ctx->elem' can not be used as same shared memory will have
885  * different pointers values (because of different virtual memory maps).
886  */
887  fifo_elem_ctx= (fifo_elem_ctx_t*)((uint8_t*)fifo_ctx->buf+
888  fifo_ctx->output_idx*
889  SIZEOF_FIFO_ELEM_CTX_T(flag_use_shm, chunk_size_max));
890  elem= !(fifo_ctx->flags& FIFO_PROCESS_SHARED)? fifo_elem_ctx->elem:
891  &fifo_elem_ctx->shm_elem_pool[0];
892  elem_size= fifo_elem_ctx->size;
893  CHECK_DO(elem!= NULL && elem_size> 0, goto end);
894 
895  /* Flush element from FIFO if required
896  * (Update circular buffer management variables).
897  */
898  if(flush_flag) {
899  fifo_elem_ctx->elem= NULL;
900  fifo_elem_ctx->size= 0;
901  fifo_ctx->slots_used_cnt-= 1;
902  fifo_ctx->buf_level-= elem_size;
903  fifo_ctx->output_idx= (fifo_ctx->output_idx+ 1)% buf_slots_max;
904  pthread_cond_broadcast(&fifo_ctx->buf_get_signal);
905  }
906 
907  /* Set the element references to return.
908  * In the special case we work with shared memory, we must return a copy.
909  */
910  if(!(fifo_ctx->flags& FIFO_PROCESS_SHARED)) {
911  *ref_elem= elem; // directly return the pointer
912  } else {
913  elem_cpy= malloc(elem_size);
914  CHECK_DO(elem_cpy!= NULL, goto end);
915  memcpy(elem_cpy, elem, elem_size);
916  *ref_elem= elem_cpy; // return a copy
917  elem_cpy= NULL; // Avoid double referencing
918  }
919  *ref_elem_size= (size_t)elem_size;
920 
921  end_code= STAT_SUCCESS;
922 end:
923  pthread_mutex_unlock(&fifo_ctx->api_mutex);
924  if(elem_cpy!= NULL)
925  free(elem_cpy);
926  return end_code;
927 }
928 
929 static int fifo_mutex_init(pthread_mutex_t * const pthread_mutex_p,
930  int flag_use_shm, log_ctx_t *log_ctx)
931 {
932  pthread_mutexattr_t attr, *attr_p= NULL;
933  int ret_code, end_code= STAT_ERROR;
934  LOG_CTX_INIT(log_ctx);
935 
936  /* Initialize */
937  if(flag_use_shm!= 0) {
938  pthread_mutexattr_init(&attr);
939  ret_code= pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
940  CHECK_DO(ret_code== 0, goto end);
941  attr_p= &attr; // update pointer
942  }
943  ret_code= pthread_mutex_init(pthread_mutex_p, attr_p);
944  CHECK_DO(ret_code== 0, goto end);
945 
946  end_code= STAT_SUCCESS;
947 end:
948  if(end_code!= STAT_SUCCESS) {
949  ASSERT(pthread_mutex_destroy(pthread_mutex_p)== 0);
950  }
951  return end_code;
952 }
953 
954 static int fifo_cond_init(pthread_cond_t * const pthread_cond_p,
955  int flag_use_shm, log_ctx_t *log_ctx)
956 {
957  pthread_condattr_t attr;
958  int ret_code, end_code= STAT_ERROR;
959  LOG_CTX_INIT(NULL);
960 
961  /* Initialize */
962  pthread_condattr_init(&attr);
963  ret_code= pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
964  CHECK_DO(ret_code== 0, goto end);
965 
966  if(flag_use_shm!= 0) {
967  ret_code= pthread_condattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
968  CHECK_DO(ret_code== 0, goto end);
969  }
970 
971  ret_code= pthread_cond_init(pthread_cond_p, &attr);
972  CHECK_DO(ret_code== 0, goto end);
973 
974  end_code= STAT_SUCCESS;
975 end:
976  if(end_code!= STAT_SUCCESS) {
977  ASSERT(pthread_cond_destroy(pthread_cond_p)== 0);
978  }
979  return end_code;
980 }
#define SIZEOF_FIFO_ELEM_CTX_T(FLAG_USE_SHM, CHUNK_SIZE_MAX)
Definition: fifo.c:61
size_t buf_slots_max
Definition: fifo.c:157
void fifo_close(fifo_ctx_t **ref_fifo_ctx)
Definition: fifo.c:327
int fifo_get(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t *ref_elem_size)
Definition: fifo.c:366
fifo_elem_ctx_release_fxn_t * elem_ctx_release
Definition: fifo.c:115
volatile int flag_exit
Definition: fifo.c:98
int fifo_timedget(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t *ref_elem_size, int64_t tout_usecs)
Definition: fifo.c:372
ssize_t fifo_get_buffer_level(fifo_ctx_t *fifo_ctx)
Definition: fifo.c:385
int fifo_put_dup(fifo_ctx_t *fifo_ctx, const void *elem, size_t elem_size)
Definition: fifo.c:355
void fifo_set_blocking_mode(fifo_ctx_t *fifo_ctx, int do_block)
Definition: fifo.c:332
static int fifo_init(fifo_ctx_t *fifo_ctx, size_t slots_max, size_t chunk_size_max, uint32_t flags, const char *fifo_file_name, const fifo_elem_alloc_fxn_t *fifo_elem_alloc_fxn)
Definition: fifo.c:538
volatile uint32_t flags
Definition: fifo.c:93
pthread_cond_t buf_put_signal
Definition: fifo.c:124
static void fifo_deinit(fifo_ctx_t *fifo_ctx)
Definition: fifo.c:636
int fifo_put(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t elem_size)
Definition: fifo.c:361
pthread_cond_t buf_get_signal
Definition: fifo.c:129
void fifo_empty(fifo_ctx_t *fifo_ctx)
Definition: fifo.c:456
volatile int input_idx
Definition: fifo.c:145
int fifo_show(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t *ref_elem_size)
Definition: fifo.c:379
#define FIFO_PROCESS_SHARED
Definition: fifo.h:56
General status codes enumeration.
#define CHECK_DO(COND, ACTION)
Definition: check_utils.h:57
fifo_ctx_t * fifo_open(size_t slots_max, size_t chunk_size_max, uint32_t flags, const fifo_elem_alloc_fxn_t *fifo_elem_alloc_fxn)
Definition: fifo.c:195
ssize_t size
Definition: fifo.c:71
struct fifo_elem_ctx_s fifo_elem_ctx_t
#define FIFO_FILE_NAME_MAX_SIZE
Definition: fifo.c:104
volatile ssize_t slots_used_cnt
Definition: fifo.c:134
int fifo_traverse(fifo_ctx_t *fifo_ctx, int elem_cnt, void(*it_fxn)(void *elem, ssize_t elem_size, int idx, void *it_arg, int *ref_flag_break), void *it_arg)
Definition: fifo.c:400
volatile int output_idx
Definition: fifo.c:152
fifo_elem_ctx_t buf[]
Definition: fifo.c:172
Simple pointer queue (FIFO) implementation.
fifo_ctx_t * fifo_shm_open(size_t slots_max, size_t chunk_size_max, uint32_t flags, const char *fifo_file_name)
Definition: fifo.c:229
struct fifo_ctx_s fifo_ctx_t
#define FIFO_O_NONBLOCK
Definition: fifo.h:50
#define ASSERT(COND)
Definition: check_utils.h:51
static void fifo_close_internal(fifo_ctx_t **ref_fifo_ctx, int flag_deinit)
Definition: fifo.c:508
Definition: log.c:102
fifo_elem_ctx_dup_fxn_t * elem_ctx_dup
Definition: fifo.c:110
uint8_t shm_elem_pool[]
Definition: fifo.c:81
volatile ssize_t buf_level
Definition: fifo.c:139
size_t chunk_size_max
Definition: fifo.c:165
void * elem
Definition: fifo.c:75
pthread_mutex_t api_mutex
Definition: fifo.c:119