MediaProcessors
procs.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017, 2018 Rafael Antoniello
3  *
4  * This file is part of MediaProcessors.
5  *
6  * MediaProcessors is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU Lesser General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * MediaProcessors is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with MediaProcessors. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
25 #include "procs.h"
26 
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30 #include <stdarg.h>
31 #include <string.h>
32 #include <pthread.h>
33 #include <errno.h>
34 #include <ctype.h>
35 
36 #include <libcjson/cJSON.h>
37 #include <libmediaprocsutils/uri_parser.h>
38 #include <libmediaprocsutils/log.h>
39 #include <libmediaprocsutils/stat_codes.h>
40 #include <libmediaprocsutils/check_utils.h>
41 #include <libmediaprocsutils/fair_lock.h>
42 #include <libmediaprocsutils/llist.h>
43 
44 #include "proc.h"
45 #include "proc_if.h"
46 
47 /* **** Definitions **** */
48 
49 //#define ENABLE_DEBUG_LOGS
50 #ifdef ENABLE_DEBUG_LOGS
51  #define LOGD_CTX_INIT(CTX) LOG_CTX_INIT(CTX)
52  #define LOGD(FORMAT, ...) LOGV(FORMAT, ##__VA_ARGS__)
53 #else
54  #define LOGD_CTX_INIT(CTX)
55  #define LOGD(...)
56 #endif
57 
61 #define TAG_HAS(NEEDLE) (strstr(tag, NEEDLE)!= NULL)
62 
66 #define TAG_IS(TAG) (strcmp(tag, TAG)== 0)
67 
/**
 * Lock the PROCS module *instance* API critical section.
 * PROCS_CTX is a pointer to a structure holding an 'api_mutex' member.
 */
#define LOCK_PROCS_CTX_API(PROCS_CTX) \
    ASSERT(pthread_mutex_lock(&(PROCS_CTX)->api_mutex)== 0);

/**
 * Unlock the PROCS module *instance* API critical section.
 */
#define UNLOCK_PROCS_CTX_API(PROCS_CTX) \
    ASSERT(pthread_mutex_unlock(&(PROCS_CTX)->api_mutex)== 0);

/**
 * Lock the API critical section of a register element.
 * The instance API mutex (PROCS_CTX->api_mutex) is expected to be already
 * held by the caller; this is probed with 'pthread_mutex_trylock()'.
 * If the probe does not report EBUSY, EXIT_CODE_ON_FAILURE is executed.
 * If the probe actually *acquired* the instance mutex (returned zero), it
 * is released again before EXIT_CODE_ON_FAILURE runs, so a failed sanity
 * check does not leave the instance mutex locked behind the caller's back.
 * The expansion is a plain compound statement (not a loop), so 'break' and
 * 'continue' passed as EXIT_CODE_ON_FAILURE still refer to the caller's loop.
 */
#define LOCK_PROCS_REG_ELEM_API(PROCS_CTX, REG_ELEM, EXIT_CODE_ON_FAILURE) \
    {\
        int procs_lock_chk_= pthread_mutex_trylock(&(PROCS_CTX)->api_mutex);\
        if(procs_lock_chk_!= EBUSY) {\
            if(procs_lock_chk_== 0)\
                (void)pthread_mutex_unlock(&(PROCS_CTX)->api_mutex);\
            EXIT_CODE_ON_FAILURE;\
        }\
        ASSERT(pthread_mutex_lock(&(REG_ELEM)->api_mutex)== 0);\
    }

/**
 * Unlock the API critical section of a register element.
 */
#define UNLOCK_PROCS_REG_ELEM_API(REG_ELEM) \
    ASSERT(pthread_mutex_unlock(&(REG_ELEM)->api_mutex)== 0);
96 
102 #define PROCS_MAX_NUM_PROC_INSTANCES 8192
103 
104 /*
105  * Input/output processor's FIFOs size.
106  * //TODO: This value should be passed as a module setting in the future.
107  */
108 #define PROCS_FIFO_SIZE 2
109 
114 typedef struct procs_module_ctx_s {
122  pthread_mutex_t module_api_mutex;
131 
135 typedef struct procs_reg_elem_s {
146  pthread_mutex_t api_mutex;
154  fair_lock_t *fair_lock_io_array[PROC_IO_NUM];
167 
172 typedef struct procs_ctx_s {
178  char prefix_name[256];
183  char *procs_href;
191  pthread_mutex_t api_mutex;
211 } procs_ctx_t;
212 
213 /* **** Prototypes **** */
214 
215 static int register_proc_if(const proc_if_t *proc_if, log_ctx_t *log_ctx);
216 static int unregister_proc_if(const char *proc_name, log_ctx_t *log_ctx);
217 static const proc_if_t* get_proc_if_by_name(const char *proc_name,
218  log_ctx_t *log_ctx);
219 
220 static int procs_instance_opt(procs_ctx_t *procs_ctx, const char *tag,
221  log_ctx_t *log_ctx, va_list arg);
222 
223 static int procs_rest_get(procs_ctx_t *procs_ctx, log_ctx_t *log_ctx,
224  char **ref_rest_str, const char *filter_str);
225 static int proc_register(procs_ctx_t *procs_ctx, const char *proc_name,
226  const char *settings_str, log_ctx_t *log_ctx, int *ref_id, va_list arg);
227 static int proc_unregister(procs_ctx_t *procs_ctx, int id, log_ctx_t *log_ctx);
228 
229 static int procs_id_opt(procs_ctx_t *procs_ctx, const char *tag,
230  log_ctx_t *log_ctx, va_list arg);
231 
232 static int procs_id_get(procs_reg_elem_t *procs_reg_elem, proc_ctx_t *proc_ctx,
233  log_ctx_t *log_ctx, void **ref_reponse);
235  int proc_id, const char *tag, va_list arg, log_ctx_t *log_ctx);
236 
237 /* **** Implementations **** */
238 
243 
245 {
246  int ret_code, end_code= STAT_ERROR;
247  LOG_CTX_INIT(log_ctx);
248 
249  /* Check module initialization */
250  if(procs_module_ctx!= NULL) {
251  LOGW("'PROCS' module already initialized\n");
252  return STAT_NOTMODIFIED;
253  }
254 
255  procs_module_ctx= (procs_module_ctx_t*)calloc(1, sizeof(
257  CHECK_DO(procs_module_ctx!= NULL, goto end);
258 
259  /* **** Initialize context **** */
260 
261  ret_code= pthread_mutex_init(&procs_module_ctx->module_api_mutex, NULL);
262  CHECK_DO(ret_code== 0, goto end);
263 
264  procs_module_ctx->proc_if_llist= NULL;
265 
266  end_code= STAT_SUCCESS;
267 end:
268  if(end_code!= STAT_SUCCESS)
270  return STAT_SUCCESS;
271 }
272 
274 {
275  LOG_CTX_INIT(NULL);
276 
277  /* Check module initialization */
278  if(procs_module_ctx== NULL) {
279  LOGE("'PROCS' module must be initialized previously\n");
280  return;
281  }
282 
283  /* Module's API mutual exclusion lock */
284  ASSERT(pthread_mutex_destroy(&procs_module_ctx->module_api_mutex)== 0);
285 
286  /* List of supported/registered processor types */
288 
289  /* Release module's context structure */
290  free(procs_module_ctx);
291  procs_module_ctx= NULL;
292 }
293 
/**
 * Module-wide options entry point.
 * Supported tags (variadic arguments in parentheses):
 * - "PROCS_REGISTER_TYPE" (const proc_if_t*): register a processor type;
 * - "PROCS_UNREGISTER_TYPE" (const char*): unregister a type by name;
 * - "PROCS_GET_TYPE" (const char*, proc_if_t**): write a duplicate of the
 *   registered type with the given name to the output reference (NULL is
 *   written if the type is not found or cannot be duplicated).
 * The whole operation runs with the module API mutex held.
 * @param tag Option tag; must not be NULL.
 * @return Status code: the result of the selected operation, STAT_ENOTFOUND
 * for an unknown tag or an unavailable type, STAT_ERROR on bad arguments or
 * if the module is not initialized.
 */
int procs_module_opt(const char *tag, ...)
{
    va_list arg;
    int end_code= STAT_ERROR;
    LOG_CTX_INIT(NULL);

    /* Check arguments */
    if(procs_module_ctx== NULL) {
        LOGE("'PROCS' module should be initialized previously\n");
        return STAT_ERROR;
    }
    CHECK_DO(tag!= NULL, return STAT_ERROR);

    va_start(arg, tag);

    /* Lock module API critical section */
    ASSERT(pthread_mutex_lock(&procs_module_ctx->module_api_mutex)== 0);

    if(TAG_IS("PROCS_REGISTER_TYPE")) {
        end_code= register_proc_if(va_arg(arg, proc_if_t*), LOG_CTX_GET());
    } else if(TAG_IS("PROCS_UNREGISTER_TYPE")) {
        end_code= unregister_proc_if(va_arg(arg, const char*), LOG_CTX_GET());
    } else if(TAG_IS("PROCS_GET_TYPE")) {
        const proc_if_t *proc_if_register= NULL; // Do not release
        const char *proc_name= va_arg(arg, const char*);
        proc_if_t **ref_proc_if_cpy= va_arg(arg, proc_if_t**);

        if(ref_proc_if_cpy== NULL) {
            /* Nowhere to store the result: fail instead of dereferencing
             * a NULL output reference.
             */
            LOGE("Output reference for 'PROCS_GET_TYPE' must not be NULL\n");
            end_code= STAT_ERROR;
        } else {
            proc_if_register= get_proc_if_by_name(proc_name, LOG_CTX_GET());
            *ref_proc_if_cpy= (proc_if_register!= NULL)?
                    proc_if_dup(proc_if_register): NULL;
            end_code= (*ref_proc_if_cpy!= NULL)? STAT_SUCCESS: STAT_ENOTFOUND;
        }
    } else {
        LOGE("Unknown option\n");
        end_code= STAT_ENOTFOUND;
    }

    ASSERT(pthread_mutex_unlock(&procs_module_ctx->module_api_mutex)== 0);
    va_end(arg);
    return end_code;
}
335 
336 procs_ctx_t* procs_open(log_ctx_t *log_ctx, size_t max_procs_num,
337  const char *prefix_name, const char *procs_href)
338 {
339  int proc_id, i, ret_code, end_code= STAT_ERROR;
340  procs_ctx_t *procs_ctx= NULL;
341  LOG_CTX_INIT(log_ctx);
342 
343  /* Check module initialization */
344  if(procs_module_ctx== NULL) {
345  LOGE("'PROCS' module should be initialized previously\n");
346  return NULL;
347  }
348 
349  /* Check arguments */
350  // Argument 'log_ctx' is allowed to be NULL
351  if(max_procs_num> PROCS_MAX_NUM_PROC_INSTANCES) {
352  LOGE("Specified maximum number of processor exceeds system capacity\n");
353  return NULL;
354  }
355  // Argument 'prefix_name' is allowed to be NULL
356 
357  /* Allocate context structure */
358  procs_ctx= (procs_ctx_t*)calloc(1, sizeof(procs_ctx_t));
359  CHECK_DO(procs_ctx!= NULL, goto end);
360 
361  /* **** Initialize context **** */
362 
363  if(prefix_name!= NULL && strlen(prefix_name)> 0) {
364  CHECK_DO(strlen(prefix_name)< sizeof(procs_ctx->prefix_name),
365  goto end);
366  snprintf(procs_ctx->prefix_name, sizeof(procs_ctx->prefix_name),
367  "%s", prefix_name);
368  } else {
369  snprintf(procs_ctx->prefix_name, sizeof(procs_ctx->prefix_name),
370  "procs");
371  }
372 
373  if(procs_href!= NULL && strlen(procs_href)> 0) {
374  procs_ctx->procs_href= strdup(procs_href);
375  CHECK_DO(procs_ctx->procs_href!= NULL, goto end);
376  }
377 
378  ret_code= pthread_mutex_init(&procs_ctx->api_mutex, NULL);
379  CHECK_DO(ret_code== 0, goto end);
380 
381  procs_ctx->procs_reg_elem_array= (procs_reg_elem_t*)malloc(max_procs_num*
382  sizeof(procs_reg_elem_t));
383  CHECK_DO(procs_ctx->procs_reg_elem_array!= NULL, goto end)
384  for(proc_id= 0; proc_id< max_procs_num; proc_id++) {
385  procs_reg_elem_t *procs_reg_elem=
386  &procs_ctx->procs_reg_elem_array[proc_id];
387 
388  ret_code= pthread_mutex_init(&procs_reg_elem->api_mutex, NULL);
389  CHECK_DO(ret_code== 0, goto end);
390 
391  for(i= 0; i< PROC_IO_NUM; i++) {
392  fair_lock_t *fair_lock= fair_lock_open();
393  CHECK_DO(fair_lock!= NULL, goto end);
394  procs_reg_elem->fair_lock_io_array[i]= fair_lock;
395  }
396 
397  procs_reg_elem->proc_ctx= NULL;
398  }
399 
400  /* Important note:
401  * We update the register array size *after* we successfully allocated and
402  * initialized the array; otherwise the size keeps set to zero
403  * (thus, we can securely execute 'procs_close()' on failure, as other
404  * PROCS module functions).
405  */
406  procs_ctx->procs_reg_elem_array_size= max_procs_num;
407 
408  procs_ctx->log_ctx= log_ctx;
409 
410  end_code= STAT_SUCCESS;
411 end:
412  if(end_code!= STAT_SUCCESS)
413  procs_close(&procs_ctx);
414  return procs_ctx;
415 }
416 
417 void procs_close(procs_ctx_t **ref_procs_ctx)
418 {
419  procs_ctx_t *procs_ctx;
420  int procs_reg_elem_array_size, proc_id, i;
421  LOG_CTX_INIT(NULL);
422  LOGD(">>%s\n", __FUNCTION__); //comment-me
423 
424  if(ref_procs_ctx== NULL || (procs_ctx= *ref_procs_ctx)== NULL)
425  return;
426 
427  LOG_CTX_SET(procs_ctx->log_ctx);
428 
429  procs_reg_elem_array_size= procs_ctx->procs_reg_elem_array_size;
430 
431  /* First of all release all the processors (note that for deleting
432  * the processors we need the processor IF type to be still available).
433  */
434  LOCK_PROCS_CTX_API(procs_ctx);
435  if(procs_ctx->procs_reg_elem_array!= NULL) {
436  for(proc_id= 0; proc_id< procs_reg_elem_array_size; proc_id++) {
437  LOGD("unregistering proc with Id.: %d\n", proc_id); //comment-me
438  proc_unregister(procs_ctx, proc_id, LOG_CTX_GET());
439  }
440  }
441  UNLOCK_PROCS_CTX_API(procs_ctx);
442 
443  /* Module's API REST href attribute */
444  if(procs_ctx->procs_href!= NULL) {
445  free(procs_ctx->procs_href);
446  procs_ctx->procs_href= NULL;
447  }
448 
449  /* Module's instance API mutual exclusion lock */
450  ASSERT(pthread_mutex_destroy(&procs_ctx->api_mutex)== 0);
451 
452  /* Array listing the registered processor instances */
453  if(procs_ctx->procs_reg_elem_array!= NULL) {
454  for(proc_id= 0; proc_id< procs_reg_elem_array_size; proc_id++) {
455  procs_reg_elem_t *procs_reg_elem=
456  &procs_ctx->procs_reg_elem_array[proc_id];
457 
458  ASSERT(pthread_mutex_destroy(&procs_reg_elem->api_mutex)== 0);
459 
460  for(i= 0; i< PROC_IO_NUM; i++)
461  fair_lock_close(&procs_reg_elem->fair_lock_io_array[i]);
462  }
463  free(procs_ctx->procs_reg_elem_array);
464  procs_ctx->procs_reg_elem_array= NULL;
465  }
466 
467  /* Release module's instance context structure */
468  free(procs_ctx);
469  *ref_procs_ctx= NULL;
470 
471  LOGD("<<%s\n", __FUNCTION__); //comment-me
472 }
473 
474 int procs_opt(procs_ctx_t *procs_ctx, const char *tag, ...)
475 {
476  va_list arg;
477  int end_code= STAT_ERROR;
478  LOG_CTX_INIT(NULL);
479 
480  /* Check arguments */
481  CHECK_DO(procs_ctx!= NULL, return STAT_ERROR);
482  CHECK_DO(tag!= NULL, return STAT_ERROR);
483 
484  /* Check module initialization */
485  if(procs_module_ctx== NULL) {
486  LOGE("'PROCS' module should be initialized previously\n");
487  return STAT_ERROR;
488  }
489 
490  va_start(arg, tag);
491 
492  LOG_CTX_SET(procs_ctx->log_ctx);
493 
494  if(TAG_HAS("PROCS_ID") && !TAG_IS("PROCS_ID_DELETE")) {
495  end_code= procs_id_opt(procs_ctx, tag, LOG_CTX_GET(), arg);
496  } else {
497  end_code= procs_instance_opt(procs_ctx, tag, LOG_CTX_GET(), arg);
498  }
499 
500  va_end(arg);
501  return end_code;
502 }
503 
504 int procs_send_frame(procs_ctx_t *procs_ctx, int proc_id,
505  const proc_frame_ctx_t *proc_frame_ctx)
506 {
507  int procs_reg_elem_array_size, end_code= STAT_ERROR;
508  procs_reg_elem_t *procs_reg_elem;
509  fair_lock_t *p_fair_lock= NULL;
510  proc_ctx_t *proc_ctx= NULL;
511  LOG_CTX_INIT(NULL);
512 
513  /* Check arguments */
514  CHECK_DO(procs_module_ctx!= NULL, return STAT_ERROR);
515  CHECK_DO(procs_ctx!= NULL, return STAT_ERROR);
516  procs_reg_elem_array_size= procs_ctx->procs_reg_elem_array_size;
517  CHECK_DO(proc_id>= 0 && proc_id< procs_reg_elem_array_size,
518  return STAT_ERROR);
519  CHECK_DO(proc_frame_ctx!= NULL, return STAT_ERROR);
520 
521  LOG_CTX_SET(procs_ctx->log_ctx);
522 
523  procs_reg_elem= &procs_ctx->procs_reg_elem_array[proc_id];
524 
525  p_fair_lock= procs_reg_elem->fair_lock_io_array[PROC_IPUT];
526  CHECK_DO(p_fair_lock!= NULL, goto end);
527 
528  fair_lock(p_fair_lock);
529 
530  /* Write frame to processor input FIFO if applicable.
531  * Note that we are operating in the i/o critical section; thus this
532  * operation is thread safe and can be executed concurrently with any
533  * other i/o or API operation.
534  */
535  if((proc_ctx= procs_reg_elem->proc_ctx)== NULL) {
536  end_code= STAT_ENOTFOUND;
537  goto end;
538  }
539  end_code= proc_send_frame(proc_ctx, proc_frame_ctx);
540 end:
541  fair_unlock(p_fair_lock);
542  return end_code;
543 }
544 
545 int procs_recv_frame(procs_ctx_t *procs_ctx, int proc_id,
546  proc_frame_ctx_t **ref_proc_frame_ctx)
547 {
548  int procs_reg_elem_array_size, end_code= STAT_ERROR;
549  procs_reg_elem_t *procs_reg_elem;
550  fair_lock_t *p_fair_lock= NULL;
551  proc_ctx_t *proc_ctx= NULL;
552  LOG_CTX_INIT(NULL);
553 
554  /* Check arguments */
555  CHECK_DO(procs_module_ctx!= NULL, return STAT_ERROR);
556  CHECK_DO(procs_ctx!= NULL, return STAT_ERROR);
557  procs_reg_elem_array_size= procs_ctx->procs_reg_elem_array_size;
558  CHECK_DO(proc_id>= 0 && proc_id< procs_reg_elem_array_size,
559  return STAT_ERROR);
560  CHECK_DO(ref_proc_frame_ctx!= NULL, return STAT_ERROR);
561 
562  *ref_proc_frame_ctx= NULL;
563 
564  LOG_CTX_SET(procs_ctx->log_ctx);
565 
566  procs_reg_elem= &procs_ctx->procs_reg_elem_array[proc_id];
567 
568  p_fair_lock= procs_reg_elem->fair_lock_io_array[PROC_OPUT];
569  CHECK_DO(p_fair_lock!= NULL, goto end);
570 
571  fair_lock(p_fair_lock);
572 
573  /* Read frame from processor output FIFO if applicable.
574  * Note that we are operating in the i/o critical section; thus this
575  * operation is thread safe and can be executed concurrently with any
576  * other i/o or API operation.
577  */
578  if((proc_ctx= procs_reg_elem->proc_ctx)== NULL) {
579  end_code= STAT_ENOTFOUND;
580  goto end;
581  }
582  end_code= proc_recv_frame(proc_ctx, ref_proc_frame_ctx);
583 end:
584  fair_unlock(p_fair_lock);
585  return end_code;
586 }
587 
588 static int register_proc_if(const proc_if_t *proc_if, log_ctx_t *log_ctx)
589 {
590  llist_t *n;
591  int ret_code, end_code= STAT_ERROR;
592  proc_if_t *proc_if_cpy= NULL;
593  LOG_CTX_INIT(log_ctx);
594 
595  /* Check arguments */
596  CHECK_DO(procs_module_ctx!= NULL, return STAT_ERROR);
597  CHECK_DO(proc_if!= NULL, return STAT_ERROR);
598 
599  /* Check that module API critical section is locked */
600  ret_code= pthread_mutex_trylock(&procs_module_ctx->module_api_mutex);
601  CHECK_DO(ret_code== EBUSY, return STAT_ERROR);
602 
603  /* Check if processor is already register with given "name" */
604  for(n= procs_module_ctx->proc_if_llist; n!= NULL; n= n->next) {
605  proc_if_t *proc_if_nth= (proc_if_t*)n->data;
606  CHECK_DO(proc_if_nth!= NULL, continue);
607  if(strcmp(proc_if_nth->proc_name, proc_if->proc_name)== 0) {
608  end_code= STAT_ECONFLICT;
609  goto end;
610  }
611  }
612 
613  /* Allocate a copy of the processor type in the list */
614  proc_if_cpy= proc_if_dup(proc_if);
615  //LOGV("Registering processor with name: '%s'\n",
616  // proc_if_cpy->proc_name); //comment-me
617  CHECK_DO(proc_if_cpy!= NULL, goto end);
618  ret_code= llist_push(&procs_module_ctx->proc_if_llist, proc_if_cpy);
619  CHECK_DO(ret_code== STAT_SUCCESS, goto end);
620 
621  end_code= STAT_SUCCESS;
622 end:
623  if(end_code!= STAT_SUCCESS)
624  proc_if_release(&proc_if_cpy);
625  return end_code;
626 }
627 
628 static int unregister_proc_if(const char *proc_name, log_ctx_t *log_ctx)
629 {
630  llist_t **ref_n;
631  int ret_code;
632  LOG_CTX_INIT(log_ctx);
633 
634  /* Check arguments */
635  CHECK_DO(procs_module_ctx!= NULL, return STAT_ERROR);
636  CHECK_DO(proc_name!= NULL, return STAT_ERROR);
637 
638  /* Check that module instance critical section is locked */
639  ret_code= pthread_mutex_trylock(&procs_module_ctx->module_api_mutex);
640  CHECK_DO(ret_code== EBUSY, return STAT_ERROR);
641 
642  for(ref_n= &procs_module_ctx->proc_if_llist; (*ref_n)!= NULL;
643  ref_n= &((*ref_n)->next)) {
644  proc_if_t *proc_if_nth= (proc_if_t*)(*ref_n)->data;
645  CHECK_DO(proc_if_nth!= NULL, continue);
646 
647  if(strcmp(proc_if_nth->proc_name, proc_name)== 0) { // Node found
648  void *node;
649 
650  node= llist_pop(ref_n);
651  ASSERT(node!= NULL && node== (void*)proc_if_nth);
652 
653  /* Once that node register was popped (and thus not accessible
654  * by any concurrent thread), release corresponding context
655  * structure.
656  */
657  //LOGD("Unregistering processor '%s' succeed\n",
658  // proc_if_nth->proc_name); // comment-me
659  proc_if_release(&proc_if_nth);
660  ASSERT(proc_if_nth== NULL);
661  return STAT_SUCCESS;
662  }
663  }
664  return STAT_ENOTFOUND;
665 }
666 
667 static const proc_if_t* get_proc_if_by_name(const char *proc_name,
668  log_ctx_t *log_ctx)
669 {
670  llist_t *n;
671  int ret_code;
672  LOG_CTX_INIT(log_ctx);
673 
674  /* Check arguments */
675  CHECK_DO(procs_module_ctx!= NULL, return NULL);
676  CHECK_DO(proc_name!= NULL && strlen(proc_name)> 0, return NULL);
677 
678  /* Check that module instance critical section is locked */
679  ret_code= pthread_mutex_trylock(&procs_module_ctx->module_api_mutex);
680  CHECK_DO(ret_code== EBUSY, return NULL);
681 
682  /* Check if processor is already register with given "name" */
683  for(n= procs_module_ctx->proc_if_llist; n!= NULL; n= n->next) {
684  proc_if_t *proc_if_nth= (proc_if_t*)n->data;
685  CHECK_DO(proc_if_nth!= NULL, continue);
686  if(strcmp(proc_if_nth->proc_name, proc_name)== 0)
687  return proc_if_nth;
688  }
689  return NULL;
690 }
691 
692 static int procs_instance_opt(procs_ctx_t *procs_ctx, const char *tag,
693  log_ctx_t *log_ctx, va_list arg)
694 {
695 #define PROC_ID_STR_FMT "{\"proc_id\":%d}"
696  int end_code= STAT_ERROR;
697  char ref_id_str[strlen(PROC_ID_STR_FMT)+ 64];
698  LOG_CTX_INIT(log_ctx);
699 
700  /* Check arguments */
701  CHECK_DO(procs_module_ctx!= NULL, return STAT_ERROR);
702  CHECK_DO(procs_ctx!= NULL, return STAT_ERROR);
703  CHECK_DO(tag!= NULL, return STAT_ERROR);
704 
705  /* Lock module instance API critical section */
706  LOCK_PROCS_CTX_API(procs_ctx);
707 
708  if(TAG_IS("PROCS_POST")) {
709  int id;
710  const char *proc_name= va_arg(arg, const char*);
711  const char *settings_str= va_arg(arg, const char*);
712  char **ref_rest_str= va_arg(arg, char**);
713  end_code= proc_register(procs_ctx, proc_name, settings_str,
714  LOG_CTX_GET(), &id, arg);
715  if(end_code== STAT_SUCCESS && id>= 0) {
716  snprintf(ref_id_str, sizeof(ref_id_str), PROC_ID_STR_FMT, id);
717  *ref_rest_str= strdup(ref_id_str);
718  } else {
719  *ref_rest_str= NULL;
720  }
721  } else if(TAG_IS("PROCS_GET")) {
722  char **ref_rest_str= va_arg(arg, char**);
723  const char *filter_str= va_arg(arg, const char*);
724  end_code= procs_rest_get(procs_ctx, LOG_CTX_GET(), ref_rest_str,
725  filter_str);
726  } else if(TAG_IS("PROCS_ID_DELETE")) {
727  register int id= va_arg(arg, int);
728  end_code= proc_unregister(procs_ctx, id, LOG_CTX_GET());
729  } else {
730  LOGE("Unknown option\n");
731  end_code= STAT_ENOTFOUND;
732  }
733 
734  UNLOCK_PROCS_CTX_API(procs_ctx);
735  return end_code;
736 #undef PROC_ID_STR_FMT
737 }
738 
739 static int procs_rest_get(procs_ctx_t *procs_ctx, log_ctx_t *log_ctx,
740  char **ref_rest_str, const char *filter_str)
741 {
742  int i, ret_code, end_code= STAT_ERROR;
743  cJSON *cjson_rest= NULL, *cjson_proc= NULL;
744  cJSON *cjson_aux= NULL, *cjson_procs; // Do not release
745  const char *filter_proc_name= NULL, *filter_proc_notname= NULL;
746  char href[1024]= {0};
747  LOG_CTX_INIT(log_ctx);
748 
749  /* Check arguments */
750  CHECK_DO(procs_module_ctx!= NULL, return STAT_ERROR);
751  CHECK_DO(procs_ctx!= NULL, return STAT_ERROR);
752  CHECK_DO(ref_rest_str!= NULL, return STAT_ERROR);
753  //argument 'filter_str' is allowed to be NULL
754 
755  /* Check that module instance API critical section is locked */
756  ret_code= pthread_mutex_trylock(&procs_ctx->api_mutex);
757  CHECK_DO(ret_code== EBUSY, return STAT_ERROR);
758 
759  *ref_rest_str= NULL;
760 
761  /* JSON structure is as follows:
762  * {
763  * <selected prefix_name>("procs" by default):[
764  * {
765  * "proc_id":number,
766  * "proc_name":string,
767  * "links":
768  * [
769  * {"rel":"self", "href":string}
770  * ]
771  * },
772  * ....
773  * ]
774  * }
775  */
776 
777  /* Create cJSON tree-root object */
778  cjson_rest= cJSON_CreateObject();
779  CHECK_DO(cjson_rest!= NULL, goto end);
780 
781  /* Create and attach 'PROCS' array */
782  cjson_procs= cJSON_CreateArray();
783  CHECK_DO(cjson_procs!= NULL, goto end);
784  cJSON_AddItemToObject(cjson_rest, procs_ctx->prefix_name, cjson_procs);
785 
786  /* Parse the filter if available */
787  if(filter_str!= NULL) {
788  size_t filter_proc_name_len= strlen("proc_nameX=");
789  if(strncmp(filter_str, "proc_name==", filter_proc_name_len)== 0)
790  filter_proc_name= filter_str+ filter_proc_name_len;
791  else if(strncmp(filter_str, "proc_name!=", filter_proc_name_len)== 0)
792  filter_proc_notname= filter_str+ filter_proc_name_len;
793  }
794 
795  /* Compose the REST list */
796  for(i= 0; i< procs_ctx->procs_reg_elem_array_size; i++) {
797  const char *proc_name;
798  const proc_if_t *proc_if;
799  register int proc_instance_index;
800  cJSON *cjson_links, *cjson_link;
801  proc_ctx_t *proc_ctx= NULL;
802  procs_reg_elem_t *procs_reg_elem=
803  &procs_ctx->procs_reg_elem_array[i];
804 
805  if((proc_ctx= procs_reg_elem->proc_ctx)== NULL)
806  continue;
807 
808  proc_instance_index= proc_ctx->proc_instance_index;
809  CHECK_DO(proc_instance_index== i, continue);
810 
811  proc_if= proc_ctx->proc_if;
812  CHECK_DO(proc_if!= NULL, continue);
813  proc_name= proc_if->proc_name;
814  CHECK_DO(proc_name!= NULL, continue);
815 
816  /* Check filters */
817  if(filter_proc_name!= NULL) {
818  if(strcmp(filter_proc_name, proc_name)!= 0)
819  continue;
820  } else if(filter_proc_notname!= NULL) {
821  if(strcmp(filter_proc_notname, proc_name)== 0)
822  continue;
823  }
824 
825  if(cjson_proc!= NULL) {
826  cJSON_Delete(cjson_proc);
827  cjson_proc= NULL;
828  }
829  cjson_proc= cJSON_CreateObject();
830  CHECK_DO(cjson_proc!= NULL, goto end);
831  //cJSON_AddItemToArray(cjson_procs, cjson_proc); //at the end of loop
832 
833  /* 'proc_id' */
834  cjson_aux= cJSON_CreateNumber((double)proc_instance_index);
835  CHECK_DO(cjson_aux!= NULL, goto end);
836  cJSON_AddItemToObject(cjson_proc, "proc_id", cjson_aux);
837 
838  /* 'proc_name' */
839  cjson_aux= cJSON_CreateString(proc_name);
840  CHECK_DO(cjson_aux!= NULL, goto end);
841  cJSON_AddItemToObject(cjson_proc, "proc_name", cjson_aux);
842 
843  /* 'links' */
844  cjson_links= cJSON_CreateArray();
845  CHECK_DO(cjson_links!= NULL, goto end);
846  cJSON_AddItemToObject(cjson_proc, "links", cjson_links);
847 
848  cjson_link= cJSON_CreateObject();
849  CHECK_DO(cjson_link!= NULL, goto end);
850  cJSON_AddItemToArray(cjson_links, cjson_link);
851 
852  cjson_aux= cJSON_CreateString("self");
853  CHECK_DO(cjson_aux!= NULL, goto end);
854  cJSON_AddItemToObject(cjson_link, "rel", cjson_aux);
855 
856  snprintf(href, sizeof(href), "%s/%s/%d.json",
857  procs_ctx->procs_href!= NULL? procs_ctx->procs_href: "",
858  procs_ctx->prefix_name, proc_instance_index);
859  cjson_aux= cJSON_CreateString(href);
860  CHECK_DO(cjson_aux!= NULL, goto end);
861  cJSON_AddItemToObject(cjson_link, "href", cjson_aux);
862 
863  /* Finally attach to REST list */
864  cJSON_AddItemToArray(cjson_procs, cjson_proc);
865  cjson_proc= NULL; // Avoid double referencing
866  }
867 
868  /* Print cJSON structure data to char string */
869  *ref_rest_str= CJSON_PRINT(cjson_rest);
870  CHECK_DO(*ref_rest_str!= NULL && strlen(*ref_rest_str)> 0, goto end);
871 
872  end_code= STAT_SUCCESS;
873 end:
874  if(cjson_rest!= NULL)
875  cJSON_Delete(cjson_rest);
876  if(cjson_proc!= NULL)
877  cJSON_Delete(cjson_proc);
878  return end_code;
879 }
880 
881 static int proc_register(procs_ctx_t *procs_ctx, const char *proc_name,
882  const char *settings_str, log_ctx_t *log_ctx, int *ref_id, va_list arg)
883 {
884  procs_reg_elem_t *procs_reg_elem;
885  const proc_if_t *proc_if;
886  int procs_reg_elem_array_size, ret_code, end_code= STAT_ERROR;
887  int proc_id= -1, flag_force_proc_id= 0;
888  int flag_is_query= 0; // 0-> JSON / 1->query string
889  char *proc_id_str= NULL;
890  cJSON *cjson_settings= NULL;
891  cJSON *cjson_aux= NULL; // Do not release
892  proc_ctx_t *proc_ctx= NULL;
893  uint32_t fifo_ctx_maxsize[PROC_IO_NUM]= {PROCS_FIFO_SIZE, PROCS_FIFO_SIZE};
894  LOG_CTX_INIT(log_ctx);
895 
896  /* Check arguments */
897  CHECK_DO(procs_module_ctx!= NULL, return STAT_ERROR);
898  CHECK_DO(procs_ctx!= NULL, return STAT_ERROR);
899  CHECK_DO(proc_name!= NULL, return STAT_ERROR);
900  CHECK_DO(settings_str!= NULL, return STAT_ERROR);
901  // Note: argument 'log_ctx' is allowed to be NULL
902  CHECK_DO(ref_id!= NULL, return STAT_ERROR);
903 
904  procs_reg_elem_array_size= procs_ctx->procs_reg_elem_array_size;
905 
906  *ref_id= -1; // Set to invalid (undefined) value
907 
908  /* Check that module instance critical section is locked */
909  ret_code= pthread_mutex_trylock(&procs_ctx->api_mutex);
910  CHECK_DO(ret_code== EBUSY, goto end);
911 
912  /* **** Get processor Id. ****
913  * API user has the option to request to force the processor Id. to a
914  * proposed value (passed as setting: 'forced_proc_id=number').
915  */
916  /* Guess settings string representation format (JSON-REST or Query) */
917  flag_is_query= (settings_str[0]=='{' &&
918  settings_str[strlen(settings_str)-1]=='}')? 0: 1;
919 
920  /* Parse JSON or query string and check for 'forced_proc_id' field */
921  if(flag_is_query== 1) {
922  proc_id_str= uri_parser_query_str_get_value("forced_proc_id",
923  settings_str);
924  if(proc_id_str!= NULL) {
925  proc_id= atoll(proc_id_str);
926  flag_force_proc_id= 1;
927  }
928  } else {
929  /* In the case string format is JSON-REST, parse to cJSON structure */
930  cjson_settings= cJSON_Parse(settings_str);
931  CHECK_DO(cjson_settings!= NULL, goto end);
932  cjson_aux= cJSON_GetObjectItem(cjson_settings, "forced_proc_id");
933  if(cjson_aux!= NULL) {
934  proc_id= cjson_aux->valuedouble;
935  flag_force_proc_id= 1;
936  }
937  }
938 
939  /* If a forced processor Id. was not requested, get one */
940  if(flag_force_proc_id== 0) {
941  /* Get free slot where to register new processor */
942  for(proc_id= 0; proc_id< procs_reg_elem_array_size; proc_id++) {
943  if(procs_ctx->procs_reg_elem_array[proc_id].proc_ctx== NULL)
944  break;
945  }
946  }
947  if(proc_id< 0) {
948  LOGE("Invalid procesor identifier requested (Id. %d)\n", proc_id);
949  end_code= STAT_EINVAL;
950  goto end;
951  }
952  if(proc_id>= procs_reg_elem_array_size) {
953  LOGE("Maximum number of allowed processor instances exceeded\n");
954  end_code= STAT_ENOMEM;
955  goto end;
956  }
957  // In case Id. was forced, we need to check if slot is empty
958  if(procs_ctx->procs_reg_elem_array[proc_id].proc_ctx!= NULL) {
959  LOGE("Processor Id. conflict: requested Id. is being used.\n");
960  end_code= STAT_ECONFLICT;
961  goto end;
962  }
963 
964  /* Note that working on a locked module instance API critical section
965  * guarantee that 'procs_reg_elem_array' is not accessed concurrently
966  * for registering/unregistering. Nevertheless, 'procs_reg_elem_array'
967  * is still accessible concurrently by module's i/o operations, until
968  * corresponding "fair-lock" is locked (in the code below we will lock
969  * i/o "fair-locks" to register the new processor).
970  */
971  procs_reg_elem= &procs_ctx->procs_reg_elem_array[proc_id];
972 
973  /* Get processor interface (lock module!) */
974  ASSERT(pthread_mutex_lock(&procs_module_ctx->module_api_mutex)== 0);
975  proc_if= get_proc_if_by_name(proc_name, LOG_CTX_GET());
976  ASSERT(pthread_mutex_unlock(&procs_module_ctx->module_api_mutex)== 0);
977  if(proc_if== NULL) {
978  end_code= STAT_ENOTFOUND;
979  goto end;
980  }
981 
982  /* Open processor */
983  proc_ctx= proc_open(proc_if, settings_str, proc_id, fifo_ctx_maxsize,
984  LOG_CTX_GET(), arg);
985  CHECK_DO(proc_ctx!= NULL, goto end);
986 
987  /* Register processor context structure.
988  * At this point we lock corresponding i/o critical section ("fair" lock),
989  * as this register slot is still accessible concurrently by processor
990  * API and i/o operations.
991  */
992  LOCK_PROCS_REG_ELEM_API(procs_ctx, procs_reg_elem, goto end);
993  fair_lock(procs_reg_elem->fair_lock_io_array[PROC_IPUT]);
994  fair_lock(procs_reg_elem->fair_lock_io_array[PROC_OPUT]);
995  procs_reg_elem->proc_ctx= proc_ctx;
996  fair_unlock(procs_reg_elem->fair_lock_io_array[PROC_IPUT]);
997  fair_unlock(procs_reg_elem->fair_lock_io_array[PROC_OPUT]);
998  UNLOCK_PROCS_REG_ELEM_API(procs_reg_elem);
999  proc_ctx= NULL; // Avoid double referencing
1000 
1001  *ref_id= proc_id;
1002  end_code= STAT_SUCCESS;
1003 end:
1004  if(proc_ctx!= NULL)
1005  proc_close(&proc_ctx);
1006  if(proc_id_str!= NULL)
1007  free(proc_id_str);
1008  if(cjson_settings!= NULL)
1009  cJSON_Delete(cjson_settings);
1010  return end_code;
1011 }
1012 
1013 static int proc_unregister(procs_ctx_t *procs_ctx, int proc_id,
1014  log_ctx_t *log_ctx)
1015 {
1016  procs_reg_elem_t *procs_reg_elem;
1017  int procs_reg_elem_array_size, ret_code;
1018  proc_ctx_t *proc_ctx= NULL;
1019  LOG_CTX_INIT(log_ctx);
1020  LOGD(">>%s\n", __FUNCTION__); //comment-me
1021 
1022  /* Check arguments */
1023  CHECK_DO(procs_module_ctx!= NULL, return STAT_ERROR);
1024  CHECK_DO(procs_ctx!= NULL, return STAT_ERROR);
1025  procs_reg_elem_array_size= procs_ctx->procs_reg_elem_array_size;
1026  CHECK_DO(proc_id>= 0 && proc_id< procs_reg_elem_array_size,
1027  return STAT_ERROR);
1028  // Note: argument 'log_ctx' is allowed to be NULL
1029 
1030  /* Check that module instance critical section is locked */
1031  ret_code= pthread_mutex_trylock(&procs_ctx->api_mutex);
1032  CHECK_DO(ret_code== EBUSY, return STAT_ERROR);
1033 
1034  procs_reg_elem= &procs_ctx->procs_reg_elem_array[proc_id];
1035 
1036  /* Fetch processor.
1037  * Note that working on locked module's API critical section guarantee that
1038  * 'procs_reg_elem_array' is not accessed concurrently for
1039  * registering/unregistering.
1040  */
1041  proc_ctx= procs_reg_elem->proc_ctx;
1042  if(proc_ctx== NULL)
1043  return STAT_ENOTFOUND;
1044  ASSERT(proc_ctx->proc_instance_index== proc_id);
1045 
1046  /* Unblock processor input/output FIFOs to be able to acquire i/o locks */
1047  ret_code= proc_opt(proc_ctx, "PROC_UNBLOCK");
1048  CHECK_DO(ret_code== STAT_SUCCESS, return STAT_ERROR);
1049 
1050  /* Lock processor API and i/o critical sections.
1051  * Delete processor reference from array register.
1052  */
1053  LOCK_PROCS_REG_ELEM_API(procs_ctx, procs_reg_elem, return STAT_ERROR);
1054  fair_lock(procs_reg_elem->fair_lock_io_array[PROC_IPUT]);
1055  fair_lock(procs_reg_elem->fair_lock_io_array[PROC_OPUT]);
1056  procs_reg_elem->proc_ctx= NULL;
1057  fair_unlock(procs_reg_elem->fair_lock_io_array[PROC_IPUT]);
1058  fair_unlock(procs_reg_elem->fair_lock_io_array[PROC_OPUT]);
1059  UNLOCK_PROCS_REG_ELEM_API(procs_reg_elem);
1060 
1061  /* Once processor register was deleted (and thus not accessible
1062  * by any concurrent thread performing i/o), release corresponding context
1063  * structure.
1064  */
1065  proc_close(&proc_ctx);
1066  ASSERT(proc_ctx== NULL);
1067  LOGD("<<%s\n", __FUNCTION__); //comment-me
1068  return STAT_SUCCESS;
1069 }
1070 
1071 static int procs_id_opt(procs_ctx_t *procs_ctx, const char *tag,
1072  log_ctx_t *log_ctx, va_list arg)
1073 {
1074  procs_reg_elem_t *procs_reg_elem;
1075  int procs_reg_elem_array_size, end_code= STAT_ERROR, proc_id= -1;
1076  int flag_procs_api_locked= 0, flag_proc_ctx_api_locked= 0;
1077  proc_ctx_t *proc_ctx= NULL;
1078  LOG_CTX_INIT(log_ctx);
1079 
1080  /* Check arguments */
1081  CHECK_DO(procs_module_ctx!= NULL, return STAT_ERROR);
1082  CHECK_DO(procs_ctx!= NULL, return STAT_ERROR);
1083  CHECK_DO(tag!= NULL, return STAT_ERROR);
1084 
1085  /* Implementation note:
1086  * We use a double lock technique to access processor instance:
1087  * - Lock PROCS module instance API critical section (while we are
1088  * accessing processors register);
1089  * - Lock processor (PROC) API critical section;
1090  * - Fetch processor using Id. argument;
1091  * - If fetching succeed, unlock PROCS module instance API critical
1092  * section (we are able to leave processors register critical section);
1093  * - ... use processor instance to treat options ...
1094  * - Unlock processor (PROC) API critical section.
1095  */
1096 
1097  /* Lock PROCS module instance API critical section */
1098  LOCK_PROCS_CTX_API(procs_ctx);
1099  flag_procs_api_locked= 1;
1100 
1101  /* Lock processor (PROC) API critical section */
1102  proc_id= va_arg(arg, int);
1103  procs_reg_elem_array_size= procs_ctx->procs_reg_elem_array_size;
1104  CHECK_DO(proc_id>= 0 && proc_id< procs_reg_elem_array_size, goto end);
1105  procs_reg_elem= &procs_ctx->procs_reg_elem_array[proc_id];
1106  LOCK_PROCS_REG_ELEM_API(procs_ctx, procs_reg_elem, goto end);
1107  flag_proc_ctx_api_locked= 1;
1108 
1109  /* Fetch processor */
1110  proc_ctx= procs_id_opt_fetch_proc_ctx(procs_ctx, proc_id, tag, arg,
1111  LOG_CTX_GET());
1112  if(proc_ctx== NULL) {
1113  end_code= STAT_ENOTFOUND;
1114  goto end;
1115  }
1116  ASSERT(proc_ctx->proc_instance_index== proc_id); // sanity check
1117 
1118  /* Unlock PROCS module instance API critical section */
1119  UNLOCK_PROCS_CTX_API(procs_ctx);
1120  flag_procs_api_locked= 0;
1121 
1122  /* Process options */
1123  if(TAG_IS("PROCS_ID_GET")) {
1124  end_code= procs_id_get(procs_reg_elem, proc_ctx, LOG_CTX_GET(),
1125  (void**)va_arg(arg, char**));
1126  } else if(TAG_IS("PROCS_ID_PUT")) {
1127  end_code= proc_opt(proc_ctx, "PROC_PUT", va_arg(arg, const char*));
1128  } else if(TAG_IS("PROCS_ID_UNBLOCK")) {
1129  end_code= proc_opt(proc_ctx, "PROC_UNBLOCK");
1130  } else {
1131  // We assume is a private option for the processor
1132  end_code= proc_vopt(proc_ctx, tag, arg);
1133  }
1134 
1135 end:
1136  /* Check critical sections and unlock if applicable */
1137  if(flag_procs_api_locked!= 0)
1138  UNLOCK_PROCS_CTX_API(procs_ctx);
1139  if(flag_proc_ctx_api_locked!= 0)
1140  UNLOCK_PROCS_REG_ELEM_API(procs_reg_elem);
1141  return end_code;
1142 }
1143 
1144 static int procs_id_get(procs_reg_elem_t *procs_reg_elem, proc_ctx_t *proc_ctx,
1145  log_ctx_t *log_ctx, void **ref_reponse)
1146 {
1147  int ret_code, end_code= STAT_ERROR;
1148  cJSON *cjson_rest= NULL, *cjson_settings= NULL, *cjson_aux= NULL;
1149  LOG_CTX_INIT(log_ctx);
1150 
1151  /* Check arguments */
1152  CHECK_DO(procs_reg_elem!= NULL, return STAT_ERROR);
1153  CHECK_DO(proc_ctx!= NULL, return STAT_ERROR);
1154  //log_ctx allowed to be NULL
1155  CHECK_DO(ref_reponse!= NULL, return STAT_ERROR);
1156 
1157  *ref_reponse= NULL;
1158 
1159  /* Check that processor API level critical section is locked */
1160  ret_code= pthread_mutex_trylock(&procs_reg_elem->api_mutex);
1161  CHECK_DO(ret_code== EBUSY, goto end);
1162 
1163  /* GET processor's REST response */
1164  ret_code= proc_opt(proc_ctx, "PROC_GET", PROC_IF_REST_FMT_CJSON,
1165  &cjson_rest);
1166  CHECK_DO(ret_code== STAT_SUCCESS && cjson_rest!= NULL, goto end);
1167 
1168  /* Get settings */
1169  cjson_settings= cJSON_GetObjectItem(cjson_rest, "settings");
1170  if(cjson_settings== NULL) {
1171  cjson_settings= cJSON_CreateObject();
1172  CHECK_DO(cjson_settings!= NULL, goto end);
1173  cJSON_AddItemToObject(cjson_rest, "settings", cjson_settings);
1174  }
1175 
1176  /* **** Add some REST elements at settings top ****
1177  * We do a little HACK to insert elements at top as cJSON library does not
1178  * support it natively -it always insert at the bottom-
1179  * We do this at the risk of braking in a future library version, as we
1180  * base current solution on the internal implementation of function
1181  * 'cJSON_AddItemToObject()' -may change in future-.
1182  */
1183 
1184  /* 'proc_name' */
1185  cjson_aux= cJSON_CreateString(proc_ctx->proc_if->proc_name);
1186  CHECK_DO(cjson_aux!= NULL, goto end);
1187  // Hack of 'cJSON_AddItemToObject(cjson_rest, "proc_name", cjson_aux);':
1188  cjson_aux->string= (char*)strdup("proc_name");
1189  cjson_aux->type|= cJSON_StringIsConst;
1190  //cJSON_AddItemToArray(cjson_rest, cjson_aux);
1191  cJSON_InsertItemInArray(cjson_settings, 0, cjson_aux); // Insert at top
1192  cjson_aux->type&= ~cJSON_StringIsConst;
1193 
1194  *ref_reponse= (void*)CJSON_PRINT(cjson_rest);
1195  CHECK_DO(*ref_reponse!= NULL && strlen((char*)*ref_reponse)> 0, goto end);
1196 
1197  end_code= STAT_SUCCESS;
1198 end:
1199  if(cjson_rest!= NULL)
1200  cJSON_Delete(cjson_rest);
1201  return end_code;
1202 }
1203 
1219  int proc_id, const char *tag, va_list arg, log_ctx_t *log_ctx)
1220 {
1221  va_list arg_cpy, va_list_empty;
1222  procs_reg_elem_t *procs_reg_elem;
1223  int procs_reg_elem_array_size, ret_code;
1224  int flag_is_query= 0; // 0-> JSON / 1->query string
1225  proc_ctx_t *proc_ctx_ret= NULL, *proc_ctx_curr= NULL, *proc_ctx_new= NULL;
1226  const char *settings_str_arg= NULL; // Do not release
1227  const proc_if_t *proc_if_curr= NULL, *proc_if_new= NULL; // Do not release
1228  const char *proc_name_curr= NULL; // Do not release
1229  char *proc_name_str= NULL;
1230  cJSON *cjson_rest_arg= NULL, *cjson_rest_curr= NULL;
1231  cJSON *cjson_aux= NULL; // Do not release
1232  char *settings_str_curr= NULL;
1233  uint32_t fifo_ctx_maxsize[PROC_IO_NUM]= {PROCS_FIFO_SIZE, PROCS_FIFO_SIZE};
1234  LOG_CTX_INIT(log_ctx);
1235 
1236  /* Check arguments */
1237  CHECK_DO(procs_ctx!= NULL, return NULL);
1238  procs_reg_elem_array_size= procs_ctx->procs_reg_elem_array_size;
1239  CHECK_DO(proc_id>= 0 && proc_id< procs_reg_elem_array_size, return NULL);
1240  CHECK_DO(tag!= NULL, return NULL);
1241  //arg nothing to check
1242  //log_ctx allowed to be NULL
1243 
1244  /* Get register element and current processor references */
1245  procs_reg_elem= &procs_ctx->procs_reg_elem_array[proc_id];
1246  proc_ctx_curr= procs_reg_elem->proc_ctx;
1247  if(proc_ctx_curr== NULL)
1248  goto end;
1249 
1250  /* Check that module instance critical section is locked and processor API
1251  * level critical section is also locked ("double locked").
1252  */
1253  ret_code= pthread_mutex_trylock(&procs_ctx->api_mutex);
1254  CHECK_DO(ret_code== EBUSY, goto end);
1255  ret_code= pthread_mutex_trylock(&procs_reg_elem->api_mutex);
1256  CHECK_DO(ret_code== EBUSY, goto end);
1257 
1258  /* In the case we have a PUT operation request we may have to treat a
1259  * special case (code below). Otherwise, we're done.
1260  */
1261  if(!TAG_IS("PROCS_ID_PUT")) {
1262  proc_ctx_ret= proc_ctx_curr;
1263  goto end;
1264  }
1265 
1266  /* **** Treat possible special case in PUT operation ****
1267  * Check if a new processor type is requested (namely, check for a PUT
1268  * operation with a new processor name specified).
1269  */
1270 
1271  /* Copy variable list of arguments to avoid interfering with original.
1272  * Get settings string argument corresponding to the PUT operation.
1273  */
1274  va_copy(arg_cpy, arg);
1275  settings_str_arg= va_arg(arg_cpy, const char*);
1276  if(settings_str_arg== NULL || strlen(settings_str_arg)== 0) {
1277  // No new processor name requested, we're done.
1278  proc_ctx_ret= proc_ctx_curr;
1279  goto end;
1280  }
1281 
1282  /* Get current processor name */
1283  proc_if_curr= proc_ctx_curr->proc_if;
1284  CHECK_DO(proc_if_curr!= NULL, goto end);
1285  proc_name_curr= proc_if_curr->proc_name;
1286  CHECK_DO(proc_name_curr!= NULL && strlen(proc_name_curr)> 0, goto end);
1287 
1288  /* Guess string representation format (JSON-REST or Query) */
1289  //LOGV("'%s'\n", str); //comment-me
1290  flag_is_query= (settings_str_arg[0]=='{' &&
1291  settings_str_arg[strlen(settings_str_arg)-1]=='}')? 0: 1;
1292 
1293  /* Parse JSON or query string and check for 'proc_name' field */
1294  if(flag_is_query== 1) {
1295  proc_name_str= uri_parser_query_str_get_value("proc_name",
1296  settings_str_arg);
1297  } else {
1298  /* In the case string format is JSON-REST, parse to cJSON structure */
1299  cjson_rest_arg= cJSON_Parse(settings_str_arg);
1300  CHECK_DO(cjson_rest_arg!= NULL, goto end);
1301  cjson_aux= cJSON_GetObjectItem(cjson_rest_arg, "proc_name");
1302  if(cjson_aux!= NULL)
1303  proc_name_str= strdup(cjson_aux->valuestring);
1304  }
1305  if(proc_name_str== NULL) {
1306  // No new processor name requested, we're done.
1307  proc_ctx_ret= proc_ctx_curr;
1308  goto end;
1309  }
1310  //LOGV("'proc_name'= '%s' in PUT request\n", proc_name_str); //comment-me
1311 
1312  /* Check if processor name actually change or is the same as current */
1313  if(strcmp(proc_name_str, proc_name_curr)== 0) {
1314  // No new processor name requested, we're done.
1315  proc_ctx_ret= proc_ctx_curr;
1316  goto end;
1317  }
1318 
1319  /* Check and get processor interface (lock PROCS module API!) */
1320  ASSERT(pthread_mutex_lock(&procs_module_ctx->module_api_mutex)== 0);
1321  proc_if_new= get_proc_if_by_name(proc_name_str, LOG_CTX_GET());
1322  ASSERT(pthread_mutex_unlock(&procs_module_ctx->module_api_mutex)== 0);
1323  if(proc_if_new== NULL) {
1324  LOGE("New processor name specified '%s' is not registered.\n",
1325  proc_name_str);
1326  goto end;
1327  }
1328 
1329  /* **** At this point we have a valid new processor request **** */
1330  LOGW("Changing processor type from '%s' to '%s'\n",
1331  proc_if_curr->proc_mime? proc_if_curr->proc_mime: proc_name_curr,
1332  proc_if_new->proc_mime? proc_if_new->proc_mime: proc_name_str);
1333 
1334  /* Get processor's current settings */
1335  ret_code= proc_opt(proc_ctx_curr, "PROC_GET", PROC_IF_REST_FMT_CJSON,
1336  &cjson_rest_curr);
1337  CHECK_DO(ret_code== STAT_SUCCESS && cjson_rest_curr!= NULL, goto end);
1338  cjson_aux= cJSON_GetObjectItem(cjson_rest_curr, "settings");
1339  if(cjson_aux!= NULL) { // May have settings object or not...
1340  settings_str_curr= cJSON_PrintUnformatted(cjson_aux);
1341  } else {
1342  settings_str_curr= strdup("");
1343  }
1344  CHECK_DO(settings_str_curr!= NULL, goto end);
1345  //LOGV("Current processor setting: '%s'\n", settings_str_curr); //comment-me
1346 
1347  /* Open (instantiate) new processor passing current settings and Id.
1348  * Note that all the settings fields that do not apply to the new
1349  * processor type will be just ignored.
1350  */
1351  proc_ctx_new= proc_open(proc_if_new, settings_str_curr,
1352  proc_ctx_curr->proc_instance_index, fifo_ctx_maxsize,
1353  LOG_CTX_GET(), va_list_empty);
1354  CHECK_DO(proc_ctx_new!= NULL, goto end);
1355 
1356  /* Unblock current processor input/output FIFOs to be able to acquire
1357  * i/o locks next.
1358  */
1359  ret_code= proc_opt(proc_ctx_curr, "PROC_UNBLOCK");
1360  CHECK_DO(ret_code== STAT_SUCCESS, goto end);
1361 
1362  /* Register new processor context structure.
1363  * At this point we lock corresponding i/o critical section ("fair" lock),
1364  * as this register slot is still accessible concurrently by processor
1365  * API and i/o operations.
1366  */
1367  //LOCK_PROCS_REG_ELEM_API(...); //already
1368  fair_lock(procs_reg_elem->fair_lock_io_array[PROC_IPUT]);
1369  fair_lock(procs_reg_elem->fair_lock_io_array[PROC_OPUT]);
1370  procs_reg_elem->proc_ctx= proc_ctx_new;
1371  fair_unlock(procs_reg_elem->fair_lock_io_array[PROC_IPUT]);
1372  fair_unlock(procs_reg_elem->fair_lock_io_array[PROC_OPUT]);
1373  //UNLOCK_PROCS_REG_ELEM_API(...); //already
1374  proc_ctx_new= NULL; // Avoid double referencing
1375 
1376  /* Finally, we release the old and successfully substituted processor */
1377  proc_close(&proc_ctx_curr);
1378  ASSERT(proc_ctx_curr== NULL);
1379 
1380  /* Success! */
1381  proc_ctx_ret= procs_reg_elem->proc_ctx;
1382 end:
1383  va_end(arg_cpy);
1384  if(proc_name_str!= NULL)
1385  free(proc_name_str);
1386  if(cjson_rest_arg!= NULL)
1387  cJSON_Delete(cjson_rest_arg);
1388  if(cjson_rest_curr!= NULL)
1389  cJSON_Delete(cjson_rest_curr);
1390  if(settings_str_curr!= NULL)
1391  free(settings_str_curr);
1392  if(proc_ctx_new!= NULL) {
1393  proc_close(&proc_ctx_new);
1394  ASSERT(proc_ctx_new== NULL);
1395  }
1396  return proc_ctx_ret;
1397 }
Generic processors (PROC) module.
#define CJSON_PRINT(CJSON_PTR)
Definition: proc.h:70
const char * proc_mime
Definition: proc_if.h:186
static proc_ctx_t * procs_id_opt_fetch_proc_ctx(procs_ctx_t *procs_ctx, int proc_id, const char *tag, va_list arg, log_ctx_t *log_ctx)
Definition: procs.c:1218
pthread_mutex_t api_mutex
Definition: procs.c:146
struct procs_reg_elem_s procs_reg_elem_t
pthread_mutex_t module_api_mutex
Definition: procs.c:122
#define PROCS_MAX_NUM_PROC_INSTANCES
Definition: procs.c:102
int proc_send_frame(proc_ctx_t *proc_ctx, const proc_frame_ctx_t *proc_frame_ctx)
Definition: proc.c:287
int procs_opt(procs_ctx_t *procs_ctx, const char *tag,...)
Definition: procs.c:474
int proc_opt(proc_ctx_t *proc_ctx, const char *tag,...)
Definition: proc.c:368
proc_ctx_t * proc_ctx
Definition: procs.c:165
proc_ctx_t * proc_open(const proc_if_t *proc_if, const char *settings_str, int proc_instance_index, uint32_t fifo_ctx_maxsize[PROC_IO_NUM], log_ctx_t *log_ctx, va_list arg)
Definition: proc.c:87
Definition: llist.h:49
int procs_module_open(log_ctx_t *log_ctx)
Definition: procs.c:244
int procs_module_opt(const char *tag,...)
Definition: procs.c:294
void proc_if_release(proc_if_t **ref_proc_if)
Definition: proc_if.c:232
int proc_vopt(proc_ctx_t *proc_ctx, const char *tag, va_list arg)
Definition: proc.c:382
#define UNLOCK_PROCS_CTX_API(PROCS_CTX)
Definition: procs.c:77
int procs_recv_frame(procs_ctx_t *procs_ctx, int proc_id, proc_frame_ctx_t **ref_proc_frame_ctx)
Definition: procs.c:545
Generic processor (PROC) module.
void procs_module_close()
Definition: procs.c:273
char * procs_href
Definition: procs.c:183
proc_if_t * proc_if_dup(const proc_if_t *proc_if_arg)
Definition: proc_if.c:147
size_t procs_reg_elem_array_size
Definition: procs.c:206
struct procs_ctx_s procs_ctx_t
#define UNLOCK_PROCS_REG_ELEM_API(REG_ELEM)
Definition: procs.c:94
char prefix_name[256]
Definition: procs.c:178
#define LOCK_PROCS_REG_ELEM_API(PROCS_CTX, REG_ELEM, EXIT_CODE_ON_FAILURE)
Definition: procs.c:85
#define CHECK_DO(COND, ACTION)
Definition: check_utils.h:57
#define TAG_HAS(NEEDLE)
Definition: procs.c:61
int procs_send_frame(procs_ctx_t *procs_ctx, int proc_id, const proc_frame_ctx_t *proc_frame_ctx)
Definition: procs.c:504
procs_reg_elem_t * procs_reg_elem_array
Definition: procs.c:201
const proc_if_t * proc_if
Definition: proc.h:89
pthread_mutex_t api_mutex
Definition: procs.c:191
#define LLIST_RELEASE(ref_llist_head, node_release_fxn, node_type)
Definition: llist.h:165
struct procs_module_ctx_s procs_module_ctx_t
int proc_recv_frame(proc_ctx_t *proc_ctx, proc_frame_ctx_t **ref_proc_frame_ctx)
Definition: proc.c:323
cJSON structure response
Definition: proc_if.h:159
llist_t * proc_if_llist
Definition: procs.c:129
int llist_push(llist_t **ref_llist_head, void *data)
Definition: llist.c:57
int proc_instance_index
Definition: proc.h:95
log_ctx_t * log_ctx
Definition: procs.c:210
#define ASSERT(COND)
Definition: check_utils.h:51
#define TAG_IS(TAG)
Definition: procs.c:66
void * llist_pop(llist_t **ref_llist_head)
Definition: llist.c:75
#define LOCK_PROCS_CTX_API(PROCS_CTX)
Definition: procs.c:71
void procs_close(procs_ctx_t **ref_procs_ctx)
Definition: procs.c:417
fair_lock_t * fair_lock_io_array[PROC_IO_NUM]
Definition: procs.c:154
Definition: log.c:102
const char * proc_name
Definition: proc_if.h:174
static procs_module_ctx_t * procs_module_ctx
Definition: procs.c:242
void proc_close(proc_ctx_t **ref_proc_ctx)
Definition: proc.c:208
PROC interface prototype related definitions and functions.
procs_ctx_t * procs_open(log_ctx_t *log_ctx, size_t max_procs_num, const char *prefix_name, const char *procs_href)
Definition: procs.c:336