MediaProcessors
proc.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017, 2018 Rafael Antoniello
3  *
4  * This file is part of MediaProcessors.
5  *
6  * MediaProcessors is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU Lesser General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * MediaProcessors is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with MediaProcessors. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
25 #include "proc.h"
26 
27 #include <stdlib.h>
28 #include <unistd.h>
29 #include <string.h>
30 #include <pthread.h>
31 #include <time.h>
32 #include <errno.h>
33 
34 #include <libcjson/cJSON.h>
35 
36 #include <libmediaprocsutils/log.h>
37 #include <libmediaprocsutils/stat_codes.h>
38 #include <libmediaprocsutils/check_utils.h>
39 #include <libmediaprocsutils/schedule.h>
40 #include <libmediaprocsutils/fifo.h>
41 #include <libmediaprocsutils/fair_lock.h>
42 #include <libmediaprocsutils/interr_usleep.h>
43 
44 #include "proc_if.h"
45 
46 /* **** Definitions **** */
47 
/*
 * Debug logging switch: uncomment the following definition to route 'LOGD()'
 * traces to 'LOGV()'; otherwise both debug macros expand to nothing.
 */
//#define ENABLE_DEBUG_LOGS
#ifdef ENABLE_DEBUG_LOGS
    #define LOGD_CTX_INIT(CTX) LOG_CTX_INIT(CTX)
    #define LOGD(FORMAT, ...) LOGV(FORMAT, ##__VA_ARGS__)
#else
    #define LOGD_CTX_INIT(CTX)
    #define LOGD(...)
#endif

/**
 * Evaluates to non-zero if the string NEEDLE is contained in 'tag'.
 * A character string variable named 'tag' must be in scope at the point of
 * expansion.
 */
#define TAG_HAS(NEEDLE) (strstr(tag, (NEEDLE))!= NULL)

/**
 * Evaluates to non-zero if the string TAG is exactly equal to 'tag'.
 * A character string variable named 'tag' must be in scope at the point of
 * expansion.
 */
#define TAG_IS(TAG) (strcmp(tag, (TAG))== 0)

/**
 * Sleep period, in microseconds, between two consecutive iterations of the
 * statistics thread (one second).
 */
#define PROC_STATS_THR_MEASURE_PERIOD_USECS (1000000)
71 
72 /* **** Prototypes **** */
73 
74 static int procs_id_get(proc_ctx_t *proc_ctx, log_ctx_t *log_ctx,
75  proc_if_rest_fmt_t rest_fmt, void **ref_reponse);
76 
77 static void* proc_stats_thr(void *t);
78 static void* proc_thr(void *t);
79 
80 static void proc_stats_register_frame_pts(proc_ctx_t *proc_ctx,
81  const proc_frame_ctx_t *proc_frame_ctx, const proc_io_t proc_io);
83  const proc_frame_ctx_t *proc_frame_ctx, const proc_io_t proc_io);
84 
85 /* **** Implementations **** */
86 
87 proc_ctx_t* proc_open(const proc_if_t *proc_if, const char *settings_str,
88  int proc_instance_index, uint32_t fifo_ctx_maxsize[PROC_IO_NUM],
89  log_ctx_t *log_ctx, va_list arg)
90 {
91  uint64_t flag_proc_features;
92  int i, ret_code, end_code= STAT_ERROR;
93  proc_ctx_t *proc_ctx= NULL;
94  fifo_elem_alloc_fxn_t fifo_elem_alloc_fxn= {0};
95  LOG_CTX_INIT(log_ctx);
96 
97  /* Check arguments */
98  CHECK_DO(proc_if!= NULL, return NULL);
99  CHECK_DO(settings_str!= NULL, return NULL);
100  CHECK_DO(fifo_ctx_maxsize!= NULL , return NULL);
101  // Note: 'log_ctx' is allowed to be NULL
102 
103  /* Check mandatory call-backs existence */
104  CHECK_DO(proc_if->open!= NULL, goto end);
105  CHECK_DO(proc_if->close!= NULL, goto end);
106 
107  /* Open (allocate) the specific processor (PROC) instance */
108  proc_ctx= proc_if->open(proc_if, settings_str, LOG_CTX_GET(), arg);
109  CHECK_DO(proc_ctx!= NULL, goto end);
110 
111  /* Set processor (PROC) interface context structure */
112  proc_ctx->proc_if= proc_if;
113 
114  /* Set PROC register index. */
115  proc_ctx->proc_instance_index= proc_instance_index;
116 
117  /* API mutual exclusion lock */
118  ret_code= pthread_mutex_init(&proc_ctx->api_mutex, NULL);
119  CHECK_DO(ret_code== 0, goto end);
120 
121  /* Set LOG module:
122  * IMPORTANT NOTE: LOG module is thought to be set externally (thus
123  * passed by argument as a pointer). Nevertheless, any specific
124  * implementation may implement its own module instance internally, so we
125  * check it before overwriting what was set at 'proc_if_t::open'.
126  */
127  if(proc_ctx->log_ctx!= NULL) {
128  // Processor manages its own internal instantiation
129  LOG_CTX_SET(proc_ctx->log_ctx);
130  } else {
131  // Processor use external instance passed by argument
132  proc_ctx->log_ctx= LOG_CTX_GET();
133  }
134 
135  /* Initialize input FIFO buffer */
136  fifo_elem_alloc_fxn.elem_ctx_dup= (fifo_elem_ctx_dup_fxn_t*)
137  proc_if->iput_fifo_elem_opaque_dup;
138  fifo_elem_alloc_fxn.elem_ctx_release= (fifo_elem_ctx_release_fxn_t*)
140  proc_ctx->fifo_ctx_array[PROC_IPUT]= fifo_open(fifo_ctx_maxsize[PROC_IPUT],
141  0/*unlimited chunk size*/, 0, &fifo_elem_alloc_fxn);
142  CHECK_DO(proc_ctx->fifo_ctx_array[PROC_IPUT]!= NULL, goto end);
143 
144  /* Initialize output FIFO buffer */
145  fifo_elem_alloc_fxn.elem_ctx_dup= (fifo_elem_ctx_dup_fxn_t*)
146  proc_if->oput_fifo_elem_opaque_dup;
147  fifo_elem_alloc_fxn.elem_ctx_release= (fifo_elem_ctx_release_fxn_t*)
149  proc_ctx->fifo_ctx_array[PROC_OPUT]= fifo_open(fifo_ctx_maxsize[PROC_OPUT],
150  0/*unlimited chunk size*/, 0, &fifo_elem_alloc_fxn);
151  CHECK_DO(proc_ctx->fifo_ctx_array[PROC_OPUT]!= NULL, goto end);
152 
153  /* Initialize input/output fair-locks */
154  for(i= 0; i< PROC_IO_NUM; i++) {
155  fair_lock_t *fair_lock= fair_lock_open();
156  CHECK_DO(fair_lock!= NULL, goto end);
157  proc_ctx->fair_lock_io_array[i]= fair_lock;
158  }
159 
160  /* Initialize input/output MUTEX for bitrate statistics related */
161  ret_code= pthread_mutex_init(&proc_ctx->acc_io_bits_mutex[PROC_IPUT], NULL);
162  CHECK_DO(ret_code== 0, goto end);
163  ret_code= pthread_mutex_init(&proc_ctx->acc_io_bits_mutex[PROC_OPUT], NULL);
164  CHECK_DO(ret_code== 0, goto end);
165 
166  /* Initialize array registering input presentation time-stamps (PTS's) */
167  proc_ctx->iput_pts_array_idx= 0;
168  memset(proc_ctx->iput_pts_array, -1, sizeof(proc_ctx->iput_pts_array));
169 
170  /* Initialize latency measurement related variables */
171  proc_ctx->acc_latency_nsec= 0;
172  proc_ctx->acc_latency_cnt= 0;
173  ret_code= pthread_mutex_init(&proc_ctx->latency_mutex, NULL);
174  CHECK_DO(ret_code== 0, goto end);
175 
176  /* Launch statistics thread if applicable */
177  flag_proc_features= proc_if->flag_proc_features;
178  if(flag_proc_features& (PROC_FEATURE_BITRATE|PROC_FEATURE_REGISTER_PTS|
179  PROC_FEATURE_LATENCY)) {
180  /* Launch periodical statistics computing thread
181  * (e.g. for computing processor's input/output bitrate statistics):
182  * - Instantiate (open) an interruptible usleep module instance;
183  * - Launch statistic thread passing corresponding argument structure.
184  */
186  CHECK_DO(proc_ctx->interr_usleep_ctx!= NULL, goto end);
187  ret_code= pthread_create(&proc_ctx->stats_thread, NULL, proc_stats_thr,
188  (void*)proc_ctx);
189  CHECK_DO(ret_code== 0, goto end);
190  }
191 
192  /* At last, launch processing thread if applicable */
193  proc_ctx->flag_exit= 0;
194  if(proc_if->process_frame!= NULL) {
195  proc_ctx->start_routine= (const void*(*)(void*))proc_thr;
196  ret_code= pthread_create(&proc_ctx->proc_thread, NULL, proc_thr,
197  proc_ctx);
198  CHECK_DO(ret_code== 0, goto end);
199  }
200 
201  end_code= STAT_SUCCESS;
202 end:
203  if(end_code!= STAT_SUCCESS)
204  proc_if->close(&proc_ctx);
205  return proc_ctx;
206 }
207 
208 void proc_close(proc_ctx_t **ref_proc_ctx)
209 {
210  proc_ctx_t *proc_ctx= NULL;
211  LOG_CTX_INIT(NULL);
212  LOGD(">>%s\n", __FUNCTION__); //comment-me
213 
214  if(ref_proc_ctx== NULL)
215  return;
216 
217  if((proc_ctx= *ref_proc_ctx)!= NULL) {
218  int (*unblock)(proc_ctx_t *proc_ctx)= NULL;
219  const proc_if_t *proc_if= proc_ctx->proc_if;
220  void *thread_end_code= NULL;
221  LOG_CTX_SET(proc_ctx->log_ctx);
222 
223  /* Join processing thread first
224  * - set flag to notify we are exiting processing;
225  * - unlock input/output FIFO's and unblock processor;
226  * - join thread.
227  */
228  proc_ctx->flag_exit= 1;
229  fifo_set_blocking_mode(proc_ctx->fifo_ctx_array[PROC_IPUT], 0);
230  fifo_set_blocking_mode(proc_ctx->fifo_ctx_array[PROC_OPUT], 0);
231  if(proc_if!= NULL && (unblock= proc_if->unblock)!= NULL) {
232  ASSERT(unblock(proc_ctx)== STAT_SUCCESS);
233  }
234  LOGD("Waiting processor thread to join... "); // comment-me
235  pthread_join(proc_ctx->proc_thread, &thread_end_code);
236  if(thread_end_code!= NULL) {
237  ASSERT(*((int*)thread_end_code)== STAT_SUCCESS);
238  free(thread_end_code);
239  thread_end_code= NULL;
240  }
241  LOGD("joined O.K; "
242  "Waiting statistics thread to join... "); // comment-me
243  /* Join periodical statistics thread:
244  * - Unlock interruptible usleep module instance;
245  * - Join the statistics thread;
246  * - Release (close) the interruptible usleep module instance.
247  */
248  if(proc_ctx->interr_usleep_ctx!= NULL)
250  pthread_join(proc_ctx->stats_thread, &thread_end_code);
251  if(thread_end_code!= NULL) {
252  ASSERT(*((int*)thread_end_code)== STAT_SUCCESS);
253  free(thread_end_code);
254  thread_end_code= NULL;
255  }
257  LOGD("joined O.K.\n"); // comment-me
258 
259  /* Release API mutual exclusion lock */
260  ASSERT(pthread_mutex_destroy(&proc_ctx->api_mutex)== 0);
261 
262  /* Release input and output FIFO's */
263  fifo_close(&proc_ctx->fifo_ctx_array[PROC_IPUT]);
264  fifo_close(&proc_ctx->fifo_ctx_array[PROC_OPUT]);
265 
266  /* Release input/output fair locks */
267  fair_lock_close(&proc_ctx->fair_lock_io_array[PROC_IPUT]);
268  fair_lock_close(&proc_ctx->fair_lock_io_array[PROC_OPUT]);
269 
270  /* Release input/output MUTEX for bitrate statistics related */
271  ASSERT(pthread_mutex_destroy(&proc_ctx->acc_io_bits_mutex[PROC_IPUT])
272  == 0);
273  ASSERT(pthread_mutex_destroy(&proc_ctx->acc_io_bits_mutex[PROC_OPUT])
274  == 0);
275 
276  /* Release latency measurement related variables */
277  ASSERT(pthread_mutex_destroy(&proc_ctx->latency_mutex)== 0);
278 
279  /* Close the specific PROC instance */
280  CHECK_DO(proc_if!= NULL, return); // sanity check
281  CHECK_DO(proc_if->close!= NULL, return); // sanity check
282  proc_if->close(ref_proc_ctx);
283  }
284  LOGD("<<%s\n", __FUNCTION__); //comment-me
285 }
286 
288  const proc_frame_ctx_t *proc_frame_ctx)
289 {
290  const proc_if_t *proc_if;
291  int end_code;
292  int (*send_frame)(proc_ctx_t*, const proc_frame_ctx_t*)= NULL;
293  fair_lock_t *fair_lock_p= NULL;
294  LOG_CTX_INIT(NULL);
295 
296  /* Check arguments */
297  CHECK_DO(proc_ctx!= NULL, return STAT_ERROR);
298  //CHECK_DO(proc_frame_ctx!= NULL,
299  // return STAT_ERROR); // Bypassed by this function
300 
301  /* Get required variables from PROC interface structure */
302  LOG_CTX_SET(proc_ctx->log_ctx);
303 
304  fair_lock_p= proc_ctx->fair_lock_io_array[PROC_IPUT];
305  CHECK_DO(fair_lock_p!= NULL, return STAT_ERROR);
306 
307  proc_if= proc_ctx->proc_if;
308  CHECK_DO(proc_if!= NULL, return STAT_ERROR);
309 
310  /* Send frame to processor
311  * (perform within input interface critical section).
312  */
313  end_code= STAT_ENOTFOUND;
314  if((send_frame= proc_if->send_frame)!= NULL) {
315  fair_lock(fair_lock_p);
316  end_code= send_frame(proc_ctx, proc_frame_ctx);
317  fair_unlock(fair_lock_p);
318  }
319 
320  return end_code;
321 }
322 
324  proc_frame_ctx_t **ref_proc_frame_ctx)
325 {
326  const proc_if_t *proc_if;
327  int ret_code, end_code= STAT_ERROR;
328  int (*recv_frame)(proc_ctx_t*, proc_frame_ctx_t**)= NULL;
329  fair_lock_t *fair_lock_p= NULL;
330  LOG_CTX_INIT(NULL);
331 
332  /* Check arguments */
333  CHECK_DO(proc_ctx!= NULL, return STAT_ERROR);
334  CHECK_DO(ref_proc_frame_ctx!= NULL, return STAT_ERROR);
335 
336  *ref_proc_frame_ctx= NULL;
337 
338  /* Get required variables from PROC interface structure */
339  LOG_CTX_SET(proc_ctx->log_ctx);
340 
341  fair_lock_p= proc_ctx->fair_lock_io_array[PROC_OPUT];
342  CHECK_DO(fair_lock_p!= NULL, goto end);
343 
344  proc_if= proc_ctx->proc_if;
345  CHECK_DO(proc_if!= NULL, goto end);
346 
347  /* Receive frame from processor
348  * (perform within output interface critical section).
349  */
350  ret_code= STAT_ENOTFOUND;
351  if((recv_frame= proc_if->recv_frame)!= NULL) {
352  fair_lock(fair_lock_p);
353  ret_code= recv_frame(proc_ctx, ref_proc_frame_ctx);
354  fair_unlock(fair_lock_p);
355  }
356  if(ret_code!= STAT_SUCCESS) {
357  end_code= ret_code;
358  goto end;
359  }
360 
361  end_code= STAT_SUCCESS;
362 end:
363  if(end_code!= STAT_SUCCESS)
364  proc_frame_ctx_release(ref_proc_frame_ctx);
365  return end_code;
366 }
367 
368 int proc_opt(proc_ctx_t *proc_ctx, const char *tag, ...)
369 {
370  va_list arg;
371  int end_code;
372 
373  va_start(arg, tag);
374 
375  end_code= proc_vopt(proc_ctx, tag, arg);
376 
377  va_end(arg);
378 
379  return end_code;
380 }
381 
382 int proc_vopt(proc_ctx_t *proc_ctx, const char *tag, va_list arg)
383 {
384  int end_code= STAT_ERROR;
385  const proc_if_t *proc_if= NULL;
386  int (*unblock)(proc_ctx_t *proc_ctx)= NULL;
387  int (*rest_put)(proc_ctx_t *proc_ctx, const char *str)= NULL;
388  int (*opt)(proc_ctx_t *proc_ctx, const char *tag, va_list arg)= NULL;
389  LOG_CTX_INIT(NULL);
390 
391  /* Check arguments */
392  CHECK_DO(proc_ctx!= NULL, return STAT_ERROR);
393  CHECK_DO(tag!= NULL, return STAT_ERROR);
394 
395  ASSERT(pthread_mutex_lock(&proc_ctx->api_mutex)== 0);
396 
397  LOG_CTX_SET(proc_ctx->log_ctx);
398 
399  proc_if= proc_ctx->proc_if;
400 
401  if(TAG_IS("PROC_UNBLOCK")) {
402  fifo_set_blocking_mode(proc_ctx->fifo_ctx_array[PROC_IPUT], 0);
403  fifo_set_blocking_mode(proc_ctx->fifo_ctx_array[PROC_OPUT], 0);
404  end_code= STAT_SUCCESS;
405  if(proc_if!= NULL && (unblock= proc_if->unblock)!= NULL)
406  end_code= unblock(proc_ctx);
407  } else if(TAG_IS("PROC_GET")) {
408  proc_if_rest_fmt_t rest_fmt= va_arg(arg, proc_if_rest_fmt_t);
409  void **ref_reponse= va_arg(arg, void**);
410  end_code= procs_id_get(proc_ctx, LOG_CTX_GET(), rest_fmt, ref_reponse);
411  } else if(TAG_IS("PROC_PUT")) {
412  end_code= STAT_ENOTFOUND;
413  if(proc_if!= NULL && (rest_put= proc_if->rest_put)!= NULL)
414  end_code= rest_put(proc_ctx, va_arg(arg, const char*));
415  } else {
416  if(proc_if!= NULL && (opt= proc_if->opt)!= NULL)
417  end_code= opt(proc_ctx, tag, arg);
418  else {
419  LOGE("Unknown option\n");
420  end_code= STAT_ENOTFOUND;
421  }
422  }
423 
424  ASSERT(pthread_mutex_unlock(&proc_ctx->api_mutex)== 0);
425  return end_code;
426 }
427 
428 int proc_send_frame_default1(proc_ctx_t *proc_ctx,
429  const proc_frame_ctx_t *proc_frame_ctx)
430 {
431  register uint64_t flag_proc_features;
432  const proc_if_t *proc_if;
433  LOG_CTX_INIT(NULL);
434 
435  /* Check arguments */
436  CHECK_DO(proc_ctx!= NULL, return STAT_ERROR);
437  //CHECK_DO(proc_frame_ctx!= NULL, return STAT_ERROR); // bypassed
438 
439  LOG_CTX_SET(proc_ctx->log_ctx);
440 
441  /* Get required variables from PROC interface structure */
442  proc_if= proc_ctx->proc_if;
443  CHECK_DO(proc_if!= NULL, return STAT_ERROR);
444  flag_proc_features= proc_if->flag_proc_features;
445 
446  /* Check if input PTS statistics are used by this processor */
447  if((flag_proc_features& PROC_FEATURE_REGISTER_PTS) &&
448  (flag_proc_features&PROC_FEATURE_LATENCY))
449  proc_stats_register_frame_pts(proc_ctx, proc_frame_ctx, PROC_IPUT);
450 
451  /* Treat bitrate statistics if applicable.
452  * For most processors implementation (specially for encoders and
453  * decoders), measuring traffic at this point would be precise.
454  * Nevertheless, for certain processors, as is the case of demultiplexers,
455  * this function ('proc_send_frame()') is not used thus input bitrate
456  * should be measured at some other point of the specific implementation
457  * (e.g. when receiving data from an INET socket).
458  */
459  if(flag_proc_features& PROC_FEATURE_BITRATE)
460  proc_stats_register_accumulated_io_bits(proc_ctx, proc_frame_ctx,
461  PROC_IPUT);
462 
463  /* Write frame to input FIFO */
464  return fifo_put_dup(proc_ctx->fifo_ctx_array[PROC_IPUT], proc_frame_ctx,
465  sizeof(void*));
466 }
467 
468 int proc_recv_frame_default1(proc_ctx_t *proc_ctx,
469  proc_frame_ctx_t **ref_proc_frame_ctx)
470 {
471  register uint64_t flag_proc_features;
472  const proc_if_t *proc_if;
473  int ret_code, end_code= STAT_ERROR;
474  size_t fifo_elem_size= 0;
475  LOG_CTX_INIT(NULL);
476 
477  /* Check arguments */
478  CHECK_DO(proc_ctx!= NULL, return STAT_ERROR);
479  CHECK_DO(ref_proc_frame_ctx!= NULL, return STAT_ERROR);
480 
481  LOG_CTX_SET(proc_ctx->log_ctx);
482 
483  *ref_proc_frame_ctx= NULL;
484 
485  /* Get required variables from PROC interface structure */
486  proc_if= proc_ctx->proc_if;
487  CHECK_DO(proc_if!= NULL, goto end);
488  flag_proc_features= proc_if->flag_proc_features;
489 
490  /* Read a frame from the output FIFO */
491  ret_code= fifo_get(proc_ctx->fifo_ctx_array[PROC_OPUT],
492  (void**)ref_proc_frame_ctx, &fifo_elem_size);
493  if(ret_code!= STAT_SUCCESS) {
494  end_code= ret_code;
495  goto end;
496  }
497 
498  /* Treat bitrate statistics if applicable.
499  * For most processors implementation (specially for encoders and
500  * decoders), measuring traffic at this point would be precise.
501  * Nevertheless, for certain processors, as is the case of multiplexers,
502  * this function ('proc_recv_frame()') is not used thus input bitrate
503  * should be measured at some other point of the specific implementation
504  * (e.g. when sending data to an INET socket).
505  */
506  if(flag_proc_features& PROC_FEATURE_BITRATE)
507  proc_stats_register_accumulated_io_bits(proc_ctx, *ref_proc_frame_ctx,
508  PROC_OPUT);
509 
510  end_code= STAT_SUCCESS;
511 end:
512  if(end_code!= STAT_SUCCESS)
513  proc_frame_ctx_release(ref_proc_frame_ctx);
514  return end_code;
515 }
516 
517 void proc_stats_register_accumulated_latency(proc_ctx_t *proc_ctx,
518  const int64_t oput_frame_pts)
519 {
520  register int i, idx;
521  LOG_CTX_INIT(NULL);
522 
523  /* Check arguments */
524  CHECK_DO(proc_ctx!= NULL, return);
525  if(oput_frame_pts<= 0)
526  return;
527 
528  LOG_CTX_SET(proc_ctx->log_ctx);
529 
530  /* Parse input circular PTS register.
531  * Note that we do not care if this register is modified
532  * concurrently in function 'proc_stats_register_frame_pts()'...
533  * this measurement is still valid.
534  */
535  for(i= 0, idx= proc_ctx->iput_pts_array_idx; i< IPUT_PTS_ARRAY_SIZE;
536  i++, idx= (idx+ 1)% IPUT_PTS_ARRAY_SIZE) {
537  int64_t pts_iput= proc_ctx->iput_pts_array[IPUT_PTS_VAL][idx];
538 
539  if(pts_iput== oput_frame_pts) {
540  register int ret_code;
541  register int64_t curr_nsec, iput_nsec;
542  struct timespec monotime_curr= {0};
543 
544  ret_code= clock_gettime(CLOCK_MONOTONIC, &monotime_curr);
545  CHECK_DO(ret_code== 0, return);
546  curr_nsec= (int64_t)monotime_curr.tv_sec*1000000000+
547  (int64_t)monotime_curr.tv_nsec;
548  iput_nsec= proc_ctx->iput_pts_array[IPUT_PTS_STC_VAL][idx];
549  if(curr_nsec> iput_nsec) {
550  pthread_mutex_t *latency_mutex_p= &proc_ctx->latency_mutex;
551  ASSERT((pthread_mutex_lock(latency_mutex_p))== 0);
552  proc_ctx->acc_latency_nsec+= curr_nsec- iput_nsec;
553  proc_ctx->acc_latency_cnt++;
554  //LOGV("acc_latency_nsec= %"PRId64" (count: %d)\n",
555  // proc_ctx->acc_latency_nsec,
556  // proc_ctx->acc_latency_cnt); //comment-me
557  ASSERT((pthread_mutex_unlock(latency_mutex_p))== 0);
558  }
559  return;
560  }
561  }
562  return;
563 }
564 
565 static int procs_id_get(proc_ctx_t *proc_ctx, log_ctx_t *log_ctx,
566  proc_if_rest_fmt_t rest_fmt, void **ref_reponse)
567 {
568  const proc_if_t *proc_if;
569  uint64_t flag_proc_features;
570  int ret_code, end_code= STAT_ERROR;
571  cJSON *cjson_rest= NULL, *cjson_aux= NULL;
572  int (*rest_get)(proc_ctx_t *proc_ctx, proc_if_rest_fmt_t rest_fmt,
573  void **ref_reponse)= NULL;
574  LOG_CTX_INIT(log_ctx);
575 
576  /* Check arguments */
577  CHECK_DO(proc_ctx!= NULL, return STAT_ERROR);
578  //log_ctx allowed to be NULL
579  CHECK_DO(rest_fmt>= 0 && rest_fmt< PROC_IF_REST_FMT_ENUM_MAX,
580  return STAT_ERROR);
581  CHECK_DO(ref_reponse!= NULL, return STAT_ERROR);
582 
583  *ref_reponse= NULL;
584 
585  /* Check that processor API critical section is locked */
586  ret_code= pthread_mutex_trylock(&proc_ctx->api_mutex);
587  CHECK_DO(ret_code== EBUSY, goto end);
588 
589  /* Get required variables from PROC interface structure */
590  proc_if= proc_ctx->proc_if;
591  CHECK_DO(proc_if!= NULL, goto end);
592  rest_get= proc_if->rest_get;
593  flag_proc_features= proc_if->flag_proc_features;
594 
595  /* Check if GET function callback is implemented by specific processor */
596  if(rest_get== NULL) {
597  /* Nothing to do */
598  end_code= STAT_ENOTFOUND;
599  goto end;
600  }
601 
602  /* GET processor's REST response */
603  ret_code= rest_get(proc_ctx, PROC_IF_REST_FMT_CJSON, (void**)&cjson_rest);
604  if(ret_code!= STAT_SUCCESS) {
605  end_code= ret_code;
606  goto end;
607  }
608  CHECK_DO(cjson_rest!= NULL, goto end);
609 
610  /* **** Add some REST elements at top ****
611  * We do a little HACK to insert elements at top as cJSON library does not
612  * support it natively -it always insert at the bottom-
613  * We do this at the risk of braking in a future library version, as we
614  * base current solution on the internal implementation of function
615  * 'cJSON_AddItemToObject()' -may change in future-.
616  */
617 
618  if(flag_proc_features&PROC_FEATURE_LATENCY) {
619  /* 'latency_avg_usec' */
620  cjson_aux= cJSON_CreateNumber((double)proc_ctx->latency_avg_usec);
621  CHECK_DO(cjson_aux!= NULL, goto end);
622  // Hack of 'cJSON_AddItemToObject(cjson_rest, "latency_avg_usec",
623  // cjson_aux);':
624  cjson_aux->string= (char*)strdup("latency_avg_usec");
625  cjson_aux->type|= cJSON_StringIsConst;
626  //cJSON_AddItemToArray(cjson_rest, cjson_aux);
627  cJSON_InsertItemInArray(cjson_rest, 0, cjson_aux); // Insert at top
628  cjson_aux->type&= ~cJSON_StringIsConst;
629  }
630 
631  /* Format response to be returned */
632  switch(rest_fmt) {
634  /* Print cJSON structure data to char string */
635  *ref_reponse= (void*)CJSON_PRINT(cjson_rest);
636  CHECK_DO(*ref_reponse!= NULL && strlen((char*)*ref_reponse)> 0,
637  goto end);
638  break;
640  *ref_reponse= (void*)cjson_rest;
641  cjson_rest= NULL; // Avoid double referencing
642  break;
643  default:
644  LOGE("Unknown format requested for processor REST\n");
645  goto end;
646  }
647 
648  end_code= STAT_SUCCESS;
649 end:
650  if(cjson_rest!= NULL)
651  cJSON_Delete(cjson_rest);
652  return end_code;
653 }
654 
655 static void* proc_stats_thr(void *t)
656 {
657  const proc_if_t *proc_if;
658  uint64_t flag_proc_features;
659  proc_ctx_t *proc_ctx= (proc_ctx_t*)t;
660  int *ref_end_code= NULL;
661  interr_usleep_ctx_t *interr_usleep_ctx= NULL; // Do not release
662  LOG_CTX_INIT(NULL);
663 
664  /* Allocate return context; initialize to a default 'ERROR' value */
665  ref_end_code= (int*)malloc(sizeof(int));
666  CHECK_DO(ref_end_code!= NULL, return NULL);
667  *ref_end_code= STAT_ERROR;
668 
669  /* Check arguments */
670  CHECK_DO(proc_ctx!= NULL, goto end);
671 
672  interr_usleep_ctx= proc_ctx->interr_usleep_ctx;
673  CHECK_DO(interr_usleep_ctx!= NULL, goto end);
674 
675  LOG_CTX_SET(proc_ctx->log_ctx);
676 
677  /* Get required variables from PROC interface structure */
678  proc_if= proc_ctx->proc_if;
679  CHECK_DO(proc_if!= NULL, goto end);
680  flag_proc_features= proc_if->flag_proc_features;
681 
682  while(proc_ctx->flag_exit== 0) {
683  register int ret_code;
684 
685  /* Bitrate statistics */
686  if(flag_proc_features& PROC_FEATURE_BITRATE) {
687  register uint32_t bit_counter_iput, bit_counter_oput;
688  pthread_mutex_t *acc_io_bits_mutex_iput_p=
689  &proc_ctx->acc_io_bits_mutex[PROC_IPUT];
690  pthread_mutex_t *acc_io_bits_mutex_oput_p=
691  &proc_ctx->acc_io_bits_mutex[PROC_OPUT];
692 
693  /* Note that this is a periodic loop executed once per second
694  * (Thus, 'bitrate' is given in bits per second)
695  */
696  ASSERT(pthread_mutex_lock(acc_io_bits_mutex_iput_p)== 0);
697  bit_counter_iput= proc_ctx->acc_io_bits[PROC_IPUT];
698  proc_ctx->acc_io_bits[PROC_IPUT]= 0;
699  ASSERT(pthread_mutex_unlock(acc_io_bits_mutex_iput_p)== 0);
700  proc_ctx->bitrate[PROC_IPUT]= bit_counter_iput;
701 
702  ASSERT(pthread_mutex_lock(acc_io_bits_mutex_oput_p)== 0);
703  bit_counter_oput= proc_ctx->acc_io_bits[PROC_OPUT];
704  proc_ctx->acc_io_bits[PROC_OPUT]= 0;
705  ASSERT(pthread_mutex_unlock(acc_io_bits_mutex_oput_p)== 0);
706  proc_ctx->bitrate[PROC_OPUT]= bit_counter_oput;
707  }
708 
709  /* Check if latency statistics are used by this processor */
710  if(flag_proc_features&PROC_FEATURE_LATENCY) {
711  register int acc_latency_cnt;
712  register int64_t acc_latency_usec= 0;
713  pthread_mutex_t *latency_mutex_p= &proc_ctx->latency_mutex;
714 
715  ASSERT((pthread_mutex_lock(latency_mutex_p))== 0);
716  acc_latency_cnt= proc_ctx->acc_latency_cnt;
717  proc_ctx->acc_latency_cnt= 0;
718  if(acc_latency_cnt> 0)
719  acc_latency_usec= proc_ctx->acc_latency_nsec/ acc_latency_cnt;
720  proc_ctx->acc_latency_nsec= 0;
721  ASSERT((pthread_mutex_unlock(latency_mutex_p))== 0);
722  acc_latency_usec/= 1000; // convert nsec-> usec
723  proc_ctx->latency_avg_usec= acc_latency_usec;
724  //LOGV("----> Average latency (usec/sec)= %"PRId64"\n",
725  // acc_latency_usec); //comment-me
726  if(acc_latency_usec> proc_ctx->latency_max_usec)
727  proc_ctx->latency_max_usec= acc_latency_usec;
728  if(proc_ctx->latency_min_usec<= 0)
729  proc_ctx->latency_min_usec= acc_latency_usec;
730  if(acc_latency_usec> 0 &&
731  acc_latency_usec< proc_ctx->latency_min_usec)
732  proc_ctx->latency_min_usec= acc_latency_usec;
733  }
734 
735  /* Sleep given time (interruptible by external thread) */
736  ret_code= interr_usleep(interr_usleep_ctx,
738  ASSERT(ret_code== STAT_SUCCESS || ret_code== STAT_EINTR);
739  }
740 
741  *ref_end_code= STAT_SUCCESS;
742 end:
743  return (void*)ref_end_code;
744 }
745 
746 static void* proc_thr(void *t)
747 {
748  const proc_if_t *proc_if;
749  proc_ctx_t* proc_ctx= (proc_ctx_t*)t;
750  int ret_code, *ref_end_code= NULL;
751  int (*process_frame)(proc_ctx_t*, fifo_ctx_t*, fifo_ctx_t*)= NULL;
752  fifo_ctx_t *iput_fifo_ctx= NULL, *oput_fifo_ctx= NULL;
753  LOG_CTX_INIT(NULL);
754 
755  /* Allocate return context; initialize to a default 'ERROR' value */
756  ref_end_code= (int*)malloc(sizeof(int));
757  CHECK_DO(ref_end_code!= NULL, return NULL);
758  *ref_end_code= STAT_ERROR;
759 
760  /* Check arguments */
761  CHECK_DO(proc_ctx!= NULL, goto end);
762 
763  LOG_CTX_SET(proc_ctx->log_ctx);
764 
765  /* Get PROC processing callback */
766  proc_if= proc_ctx->proc_if;
767  CHECK_DO(proc_if!= NULL, goto end);
768  process_frame= proc_if->process_frame;
769  CHECK_DO(process_frame!= NULL, goto end);
770 
771  /* Get input/output FIFO buffers */
772  iput_fifo_ctx= proc_ctx->fifo_ctx_array[PROC_IPUT];
773  oput_fifo_ctx= proc_ctx->fifo_ctx_array[PROC_OPUT];
774  CHECK_DO(iput_fifo_ctx!= NULL && oput_fifo_ctx!= NULL, goto end);
775 
776  /* Run processing thread */
777  while(proc_ctx->flag_exit== 0) {
778  ret_code= process_frame(proc_ctx, iput_fifo_ctx, oput_fifo_ctx);
779  if(ret_code== STAT_EOF)
780  proc_ctx->flag_exit= 1;
781  else if(ret_code!= STAT_SUCCESS)
782  schedule(); // Avoid CPU-consuming closed loops
783  }
784 
785  *ref_end_code= STAT_SUCCESS;
786 end:
787  return (void*)ref_end_code;
788 }
789 
794  const proc_frame_ctx_t *proc_frame_ctx, const proc_io_t proc_io)
795 {
796  register int64_t curr_nsec;
797  struct timespec monotime_curr= {0};
798  LOG_CTX_INIT(NULL);
799 
800  /* Check arguments */
801  CHECK_DO(proc_ctx!= NULL, return);
802  CHECK_DO(proc_frame_ctx!= NULL, return);
803 
804  LOG_CTX_SET(proc_ctx->log_ctx);
805 
806  /* Register input PTS */
807  proc_ctx->iput_pts_array[IPUT_PTS_VAL][proc_ctx->iput_pts_array_idx]=
808  proc_frame_ctx->pts;
809 
810  /* Register STC value corresponding to the input PTS */
811  CHECK_DO(clock_gettime(CLOCK_MONOTONIC, &monotime_curr)== 0, return);
812  curr_nsec= (int64_t)monotime_curr.tv_sec*1000000000+
813  (int64_t)monotime_curr.tv_nsec;
814  proc_ctx->iput_pts_array[IPUT_PTS_STC_VAL][proc_ctx->iput_pts_array_idx]=
815  curr_nsec;
816 
817  /* Update array index */
818  proc_ctx->iput_pts_array_idx= (proc_ctx->iput_pts_array_idx+ 1)%
820 
821  return;
822 }
823 
829  const proc_frame_ctx_t *proc_frame_ctx, const proc_io_t proc_io)
830 {
831  register uint32_t byte_cnt, bit_cnt;
832  register size_t width;
833  register int i;
834  pthread_mutex_t *acc_io_bits_mutex_p= NULL;
835  LOG_CTX_INIT(NULL);
836 
837  /* Check arguments */
838  CHECK_DO(proc_ctx!= NULL, return);
839  CHECK_DO(proc_frame_ctx!= NULL, return);
840 
841  LOG_CTX_SET(proc_ctx->log_ctx);
842 
843  acc_io_bits_mutex_p= &proc_ctx->acc_io_bits_mutex[proc_io];
844 
845  /* Get frame size (we "unroll" for the most common cases) */
846  byte_cnt= (proc_frame_ctx->width[0]* proc_frame_ctx->height[0])+
847  (proc_frame_ctx->width[1]* proc_frame_ctx->height[1])+
848  (proc_frame_ctx->width[2]* proc_frame_ctx->height[2]);
849  // We do the rest in a loop...
850  for(i= 3; (width= proc_frame_ctx->width[i])> 0 &&
851  i< PROC_FRAME_NUM_DATA_POINTERS; i++)
852  byte_cnt+= width* proc_frame_ctx->height[i];
853 
854  /* Update currently accumulated bit value */
855  bit_cnt= (byte_cnt<< 3);
856  ASSERT(pthread_mutex_lock(acc_io_bits_mutex_p)== 0);
857  proc_ctx->acc_io_bits[proc_io]+= bit_cnt;
858  ASSERT(pthread_mutex_unlock(acc_io_bits_mutex_p)== 0);
859 
860  return;
861 }
void fifo_close(fifo_ctx_t **ref_fifo_ctx)
Definition: fifo.c:327
int fifo_get(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t *ref_elem_size)
Definition: fifo.c:366
#define CJSON_PRINT(CJSON_PTR)
Definition: proc.h:70
volatile int64_t latency_avg_usec
Definition: proc.h:148
size_t width[PROC_FRAME_NUM_DATA_POINTERS]
Definition: proc_if.h:113
void(* iput_fifo_elem_opaque_release)(void **ref_t)
Definition: proc_if.h:324
interr_usleep_ctx_t * interr_usleep_open()
Definition: interr_usleep.c:70
void interr_usleep_unblock(interr_usleep_ctx_t *interr_usleep_ctx)
void proc_frame_ctx_release(proc_frame_ctx_t **ref_proc_frame_ctx)
Definition: proc_if.c:125
int fifo_put_dup(fifo_ctx_t *fifo_ctx, const void *elem, size_t elem_size)
Definition: fifo.c:355
proc_ctx_t *(* open)(const proc_if_t *proc_if, const char *settings_str, log_ctx_t *log_ctx, va_list arg)
Definition: proc_if.h:207
void fifo_set_blocking_mode(fifo_ctx_t *fifo_ctx, int do_block)
Definition: fifo.c:332
int proc_send_frame(proc_ctx_t *proc_ctx, const proc_frame_ctx_t *proc_frame_ctx)
Definition: proc.c:287
int proc_opt(proc_ctx_t *proc_ctx, const char *tag,...)
Definition: proc.c:368
proc_ctx_t * proc_open(const proc_if_t *proc_if, const char *settings_str, int proc_instance_index, uint32_t fifo_ctx_maxsize[PROC_IO_NUM], log_ctx_t *log_ctx, va_list arg)
Definition: proc.c:87
pthread_t proc_thread
Definition: proc.h:173
pthread_t stats_thread
Definition: proc.h:163
int proc_vopt(proc_ctx_t *proc_ctx, const char *tag, va_list arg)
Definition: proc.c:382
Generic processor (PROC) module.
int(* recv_frame)(proc_ctx_t *proc_ctx, proc_frame_ctx_t **ref_proc_frame_ctx)
Definition: proc_if.h:230
int(* process_frame)(proc_ctx_t *proc_ctx, fifo_ctx_t *fifo_ctx_iput, fifo_ctx_t *fifo_ctx_oput)
Definition: proc_if.h:279
uint64_t flag_proc_features
Definition: proc_if.h:190
int(* opt)(proc_ctx_t *proc_ctx, const char *tag, va_list arg)
Definition: proc_if.h:295
proc_io_t
Definition: proc.h:76
#define IPUT_PTS_ARRAY_SIZE
Definition: proc.h:131
pthread_mutex_t api_mutex
Definition: proc.h:99
const void *(* start_routine)(void *)
Definition: proc.h:181
#define CHECK_DO(COND, ACTION)
Definition: check_utils.h:57
Character string response.
Definition: proc_if.h:158
volatile int flag_exit
Definition: proc.h:169
interr_usleep_ctx_t * interr_usleep_ctx
Definition: proc.h:159
fifo_ctx_t * fifo_open(size_t slots_max, size_t chunk_size_max, uint32_t flags, const fifo_elem_alloc_fxn_t *fifo_elem_alloc_fxn)
Definition: fifo.c:195
enum proc_if_rest_fmt_enum proc_if_rest_fmt_t
static void proc_stats_register_accumulated_io_bits(proc_ctx_t *proc_ctx, const proc_frame_ctx_t *proc_frame_ctx, const proc_io_t proc_io)
Definition: proc.c:828
const proc_if_t * proc_if
Definition: proc.h:89
fifo_ctx_t * fifo_ctx_array[PROC_IO_NUM]
Definition: proc.h:107
void(* close)(proc_ctx_t **ref_proc_ctx)
Definition: proc_if.h:217
int proc_recv_frame(proc_ctx_t *proc_ctx, proc_frame_ctx_t **ref_proc_frame_ctx)
Definition: proc.c:323
cJSON structure response
Definition: proc_if.h:159
int(* send_frame)(proc_ctx_t *proc_ctx, const proc_frame_ctx_t *proc_frame_ctx)
Definition: proc_if.h:222
#define TAG_IS(TAG)
Definition: proc.c:65
int(* unblock)(proc_ctx_t *proc_ctx)
Definition: proc_if.h:235
int proc_instance_index
Definition: proc.h:95
#define ASSERT(COND)
Definition: check_utils.h:51
static void proc_stats_register_frame_pts(proc_ctx_t *proc_ctx, const proc_frame_ctx_t *proc_frame_ctx, const proc_io_t proc_io)
Definition: proc.c:793
void *(* iput_fifo_elem_opaque_dup)(const proc_frame_ctx_t *proc_frame_ctx)
Definition: proc_if.h:311
size_t height[PROC_FRAME_NUM_DATA_POINTERS]
Definition: proc_if.h:119
Definition: log.c:102
#define PROC_STATS_THR_MEASURE_PERIOD_USECS
Definition: proc.c:70
void proc_close(proc_ctx_t **ref_proc_ctx)
Definition: proc.c:208
PROC interface prototype related definitions and functions.
int(* rest_get)(proc_ctx_t *proc_ctx, const proc_if_rest_fmt_t rest_fmt, void **ref_reponse)
Definition: proc_if.h:265
int interr_usleep(interr_usleep_ctx_t *interr_usleep_ctx, uint32_t usec)
fair_lock_t * fair_lock_io_array[PROC_IO_NUM]
Definition: proc.h:111
volatile uint32_t bitrate[PROC_IO_NUM]
Definition: proc.h:121
proc_frame_ctx_t *(* oput_fifo_elem_opaque_dup)(const void *t)
Definition: proc_if.h:339
log_ctx_t * log_ctx
Definition: proc.h:103
int64_t pts
Definition: proc_if.h:138
void interr_usleep_close(interr_usleep_ctx_t **ref_interr_usleep_ctx)
int(* rest_put)(proc_ctx_t *proc_ctx, const char *str)
Definition: proc_if.h:248