36 #include <libcjson/cJSON.h> 37 #include <libmediaprocsutils/uri_parser.h> 38 #include <libmediaprocsutils/log.h> 39 #include <libmediaprocsutils/stat_codes.h> 40 #include <libmediaprocsutils/check_utils.h> 41 #include <libmediaprocsutils/fair_lock.h> 42 #include <libmediaprocsutils/llist.h> 50 #ifdef ENABLE_DEBUG_LOGS 51 #define LOGD_CTX_INIT(CTX) LOG_CTX_INIT(CTX) 52 #define LOGD(FORMAT, ...) LOGV(FORMAT, ##__VA_ARGS__) 54 #define LOGD_CTX_INIT(CTX) 61 #define TAG_HAS(NEEDLE) (strstr(tag, NEEDLE)!= NULL) 66 #define TAG_IS(TAG) (strcmp(tag, TAG)== 0) 71 #define LOCK_PROCS_CTX_API(PROCS_CTX) \ 72 ASSERT(pthread_mutex_lock(&PROCS_CTX->api_mutex)== 0); 77 #define UNLOCK_PROCS_CTX_API(PROCS_CTX) \ 78 ASSERT(pthread_mutex_unlock(&PROCS_CTX->api_mutex)== 0); 85 #define LOCK_PROCS_REG_ELEM_API(PROCS_CTX, REG_ELEM, EXIT_CODE_ON_FAILURE) \ 86 if(pthread_mutex_trylock(&PROCS_CTX->api_mutex)!= EBUSY) {\ 87 EXIT_CODE_ON_FAILURE;\ 89 ASSERT(pthread_mutex_lock(®_ELEM->api_mutex)== 0); 94 #define UNLOCK_PROCS_REG_ELEM_API(REG_ELEM) \ 95 ASSERT(pthread_mutex_unlock(®_ELEM->api_mutex)== 0); 102 #define PROCS_MAX_NUM_PROC_INSTANCES 8192 108 #define PROCS_FIFO_SIZE 2 178 char prefix_name[256];
216 static int unregister_proc_if(
const char *proc_name,
log_ctx_t *log_ctx);
217 static const proc_if_t* get_proc_if_by_name(
const char *proc_name,
220 static int procs_instance_opt(
procs_ctx_t *procs_ctx,
const char *tag,
224 char **ref_rest_str,
const char *filter_str);
225 static int proc_register(
procs_ctx_t *procs_ctx,
const char *proc_name,
226 const char *settings_str,
log_ctx_t *log_ctx,
int *ref_id, va_list arg);
229 static int procs_id_opt(
procs_ctx_t *procs_ctx,
const char *tag,
235 int proc_id,
const char *tag, va_list arg,
log_ctx_t *log_ctx);
246 int ret_code, end_code= STAT_ERROR;
247 LOG_CTX_INIT(log_ctx);
250 if(procs_module_ctx!= NULL) {
251 LOGW(
"'PROCS' module already initialized\n");
252 return STAT_NOTMODIFIED;
257 CHECK_DO(procs_module_ctx!= NULL,
goto end);
266 end_code= STAT_SUCCESS;
268 if(end_code!= STAT_SUCCESS)
278 if(procs_module_ctx== NULL) {
279 LOGE(
"'PROCS' module must be initialized previously\n");
290 free(procs_module_ctx);
291 procs_module_ctx= NULL;
297 int end_code= STAT_ERROR;
301 if(procs_module_ctx== NULL) {
302 LOGE(
"'PROCS' module should be initialized previously\n");
305 CHECK_DO(tag!= NULL,
return STAT_ERROR);
312 if(
TAG_IS(
"PROCS_REGISTER_TYPE")) {
313 end_code= register_proc_if(va_arg(arg,
proc_if_t*), LOG_CTX_GET());
314 }
else if (
TAG_IS(
"PROCS_UNREGISTER_TYPE")) {
315 end_code= unregister_proc_if(va_arg(arg,
const char*), LOG_CTX_GET());
316 }
else if (
TAG_IS(
"PROCS_GET_TYPE")) {
318 const char *proc_name= va_arg(arg,
const char*);
320 proc_if_register= get_proc_if_by_name(proc_name, LOG_CTX_GET());
321 if(proc_if_register!= NULL)
324 *ref_proc_if_cpy= NULL;
325 end_code= (*ref_proc_if_cpy!= NULL)? STAT_SUCCESS: STAT_ENOTFOUND;
327 LOGE(
"Unknown option\n");
328 end_code= STAT_ENOTFOUND;
337 const char *prefix_name,
const char *procs_href)
339 int proc_id, i, ret_code, end_code= STAT_ERROR;
341 LOG_CTX_INIT(log_ctx);
344 if(procs_module_ctx== NULL) {
345 LOGE(
"'PROCS' module should be initialized previously\n");
352 LOGE(
"Specified maximum number of processor exceeds system capacity\n");
359 CHECK_DO(procs_ctx!= NULL,
goto end);
363 if(prefix_name!= NULL && strlen(prefix_name)> 0) {
373 if(procs_href!= NULL && strlen(procs_href)> 0) {
378 ret_code= pthread_mutex_init(&procs_ctx->
api_mutex, NULL);
384 for(proc_id= 0; proc_id< max_procs_num; proc_id++) {
388 ret_code= pthread_mutex_init(&procs_reg_elem->
api_mutex, NULL);
391 for(i= 0; i< PROC_IO_NUM; i++) {
393 CHECK_DO(fair_lock!= NULL,
goto end);
410 end_code= STAT_SUCCESS;
412 if(end_code!= STAT_SUCCESS)
420 int procs_reg_elem_array_size, proc_id, i;
422 LOGD(
">>%s\n", __FUNCTION__);
424 if(ref_procs_ctx== NULL || (procs_ctx= *ref_procs_ctx)== NULL)
427 LOG_CTX_SET(procs_ctx->
log_ctx);
436 for(proc_id= 0; proc_id< procs_reg_elem_array_size; proc_id++) {
437 LOGD(
"unregistering proc with Id.: %d\n", proc_id);
438 proc_unregister(procs_ctx, proc_id, LOG_CTX_GET());
454 for(proc_id= 0; proc_id< procs_reg_elem_array_size; proc_id++) {
460 for(i= 0; i< PROC_IO_NUM; i++)
469 *ref_procs_ctx= NULL;
471 LOGD(
"<<%s\n", __FUNCTION__);
477 int end_code= STAT_ERROR;
481 CHECK_DO(procs_ctx!= NULL,
return STAT_ERROR);
482 CHECK_DO(tag!= NULL,
return STAT_ERROR);
485 if(procs_module_ctx== NULL) {
486 LOGE(
"'PROCS' module should be initialized previously\n");
492 LOG_CTX_SET(procs_ctx->
log_ctx);
495 end_code= procs_id_opt(procs_ctx, tag, LOG_CTX_GET(), arg);
497 end_code= procs_instance_opt(procs_ctx, tag, LOG_CTX_GET(), arg);
507 int procs_reg_elem_array_size, end_code= STAT_ERROR;
514 CHECK_DO(procs_module_ctx!= NULL,
return STAT_ERROR);
515 CHECK_DO(procs_ctx!= NULL,
return STAT_ERROR);
517 CHECK_DO(proc_id>= 0 && proc_id< procs_reg_elem_array_size,
519 CHECK_DO(proc_frame_ctx!= NULL,
return STAT_ERROR);
521 LOG_CTX_SET(procs_ctx->
log_ctx);
526 CHECK_DO(p_fair_lock!= NULL,
goto end);
528 fair_lock(p_fair_lock);
535 if((proc_ctx= procs_reg_elem->
proc_ctx)== NULL) {
536 end_code= STAT_ENOTFOUND;
541 fair_unlock(p_fair_lock);
548 int procs_reg_elem_array_size, end_code= STAT_ERROR;
555 CHECK_DO(procs_module_ctx!= NULL,
return STAT_ERROR);
556 CHECK_DO(procs_ctx!= NULL,
return STAT_ERROR);
558 CHECK_DO(proc_id>= 0 && proc_id< procs_reg_elem_array_size,
560 CHECK_DO(ref_proc_frame_ctx!= NULL,
return STAT_ERROR);
562 *ref_proc_frame_ctx= NULL;
564 LOG_CTX_SET(procs_ctx->
log_ctx);
569 CHECK_DO(p_fair_lock!= NULL,
goto end);
571 fair_lock(p_fair_lock);
578 if((proc_ctx= procs_reg_elem->
proc_ctx)== NULL) {
579 end_code= STAT_ENOTFOUND;
584 fair_unlock(p_fair_lock);
591 int ret_code, end_code= STAT_ERROR;
593 LOG_CTX_INIT(log_ctx);
596 CHECK_DO(procs_module_ctx!= NULL,
return STAT_ERROR);
597 CHECK_DO(proc_if!= NULL,
return STAT_ERROR);
601 CHECK_DO(ret_code== EBUSY,
return STAT_ERROR);
604 for(n= procs_module_ctx->
proc_if_llist; n!= NULL; n= n->next) {
606 CHECK_DO(proc_if_nth!= NULL,
continue);
608 end_code= STAT_ECONFLICT;
617 CHECK_DO(proc_if_cpy!= NULL,
goto end);
619 CHECK_DO(ret_code== STAT_SUCCESS,
goto end);
621 end_code= STAT_SUCCESS;
623 if(end_code!= STAT_SUCCESS)
628 static int unregister_proc_if(
const char *proc_name,
log_ctx_t *log_ctx)
632 LOG_CTX_INIT(log_ctx);
635 CHECK_DO(procs_module_ctx!= NULL,
return STAT_ERROR);
636 CHECK_DO(proc_name!= NULL,
return STAT_ERROR);
640 CHECK_DO(ret_code== EBUSY,
return STAT_ERROR);
642 for(ref_n= &procs_module_ctx->
proc_if_llist; (*ref_n)!= NULL;
643 ref_n= &((*ref_n)->next)) {
645 CHECK_DO(proc_if_nth!= NULL,
continue);
647 if(strcmp(proc_if_nth->
proc_name, proc_name)== 0) {
651 ASSERT(node!= NULL && node== (
void*)proc_if_nth);
660 ASSERT(proc_if_nth== NULL);
664 return STAT_ENOTFOUND;
667 static const proc_if_t* get_proc_if_by_name(
const char *proc_name,
672 LOG_CTX_INIT(log_ctx);
675 CHECK_DO(procs_module_ctx!= NULL,
return NULL);
676 CHECK_DO(proc_name!= NULL && strlen(proc_name)> 0,
return NULL);
680 CHECK_DO(ret_code== EBUSY,
return NULL);
683 for(n= procs_module_ctx->
proc_if_llist; n!= NULL; n= n->next) {
685 CHECK_DO(proc_if_nth!= NULL,
continue);
686 if(strcmp(proc_if_nth->
proc_name, proc_name)== 0)
692 static int procs_instance_opt(
procs_ctx_t *procs_ctx,
const char *tag,
695 #define PROC_ID_STR_FMT "{\"proc_id\":%d}" 696 int end_code= STAT_ERROR;
697 char ref_id_str[strlen(PROC_ID_STR_FMT)+ 64];
698 LOG_CTX_INIT(log_ctx);
701 CHECK_DO(procs_module_ctx!= NULL,
return STAT_ERROR);
702 CHECK_DO(procs_ctx!= NULL,
return STAT_ERROR);
703 CHECK_DO(tag!= NULL,
return STAT_ERROR);
708 if(
TAG_IS(
"PROCS_POST")) {
710 const char *proc_name= va_arg(arg,
const char*);
711 const char *settings_str= va_arg(arg,
const char*);
712 char **ref_rest_str= va_arg(arg,
char**);
713 end_code= proc_register(procs_ctx, proc_name, settings_str,
714 LOG_CTX_GET(), &
id, arg);
715 if(end_code== STAT_SUCCESS &&
id>= 0) {
716 snprintf(ref_id_str,
sizeof(ref_id_str), PROC_ID_STR_FMT,
id);
717 *ref_rest_str= strdup(ref_id_str);
721 }
else if(
TAG_IS(
"PROCS_GET")) {
722 char **ref_rest_str= va_arg(arg,
char**);
723 const char *filter_str= va_arg(arg,
const char*);
724 end_code= procs_rest_get(procs_ctx, LOG_CTX_GET(), ref_rest_str,
726 }
else if(
TAG_IS(
"PROCS_ID_DELETE")) {
727 register int id= va_arg(arg,
int);
728 end_code= proc_unregister(procs_ctx,
id, LOG_CTX_GET());
730 LOGE(
"Unknown option\n");
731 end_code= STAT_ENOTFOUND;
736 #undef PROC_ID_STR_FMT 740 char **ref_rest_str,
const char *filter_str)
742 int i, ret_code, end_code= STAT_ERROR;
743 cJSON *cjson_rest= NULL, *cjson_proc= NULL;
744 cJSON *cjson_aux= NULL, *cjson_procs;
745 const char *filter_proc_name= NULL, *filter_proc_notname= NULL;
746 char href[1024]= {0};
747 LOG_CTX_INIT(log_ctx);
750 CHECK_DO(procs_module_ctx!= NULL,
return STAT_ERROR);
751 CHECK_DO(procs_ctx!= NULL,
return STAT_ERROR);
752 CHECK_DO(ref_rest_str!= NULL,
return STAT_ERROR);
756 ret_code= pthread_mutex_trylock(&procs_ctx->
api_mutex);
757 CHECK_DO(ret_code== EBUSY,
return STAT_ERROR);
778 cjson_rest= cJSON_CreateObject();
779 CHECK_DO(cjson_rest!= NULL,
goto end);
782 cjson_procs= cJSON_CreateArray();
783 CHECK_DO(cjson_procs!= NULL,
goto end);
784 cJSON_AddItemToObject(cjson_rest, procs_ctx->
prefix_name, cjson_procs);
787 if(filter_str!= NULL) {
788 size_t filter_proc_name_len= strlen(
"proc_nameX=");
789 if(strncmp(filter_str,
"proc_name==", filter_proc_name_len)== 0)
790 filter_proc_name= filter_str+ filter_proc_name_len;
791 else if(strncmp(filter_str,
"proc_name!=", filter_proc_name_len)== 0)
792 filter_proc_notname= filter_str+ filter_proc_name_len;
797 const char *proc_name;
799 register int proc_instance_index;
800 cJSON *cjson_links, *cjson_link;
805 if((proc_ctx= procs_reg_elem->
proc_ctx)== NULL)
809 CHECK_DO(proc_instance_index== i,
continue);
814 CHECK_DO(proc_name!= NULL,
continue);
817 if(filter_proc_name!= NULL) {
818 if(strcmp(filter_proc_name, proc_name)!= 0)
820 }
else if(filter_proc_notname!= NULL) {
821 if(strcmp(filter_proc_notname, proc_name)== 0)
825 if(cjson_proc!= NULL) {
826 cJSON_Delete(cjson_proc);
829 cjson_proc= cJSON_CreateObject();
830 CHECK_DO(cjson_proc!= NULL,
goto end);
834 cjson_aux= cJSON_CreateNumber((
double)proc_instance_index);
835 CHECK_DO(cjson_aux!= NULL,
goto end);
836 cJSON_AddItemToObject(cjson_proc,
"proc_id", cjson_aux);
839 cjson_aux= cJSON_CreateString(proc_name);
840 CHECK_DO(cjson_aux!= NULL,
goto end);
841 cJSON_AddItemToObject(cjson_proc,
"proc_name", cjson_aux);
844 cjson_links= cJSON_CreateArray();
845 CHECK_DO(cjson_links!= NULL,
goto end);
846 cJSON_AddItemToObject(cjson_proc,
"links", cjson_links);
848 cjson_link= cJSON_CreateObject();
849 CHECK_DO(cjson_link!= NULL,
goto end);
850 cJSON_AddItemToArray(cjson_links, cjson_link);
852 cjson_aux= cJSON_CreateString(
"self");
853 CHECK_DO(cjson_aux!= NULL,
goto end);
854 cJSON_AddItemToObject(cjson_link,
"rel", cjson_aux);
856 snprintf(href,
sizeof(href),
"%s/%s/%d.json",
859 cjson_aux= cJSON_CreateString(href);
860 CHECK_DO(cjson_aux!= NULL,
goto end);
861 cJSON_AddItemToObject(cjson_link,
"href", cjson_aux);
864 cJSON_AddItemToArray(cjson_procs, cjson_proc);
870 CHECK_DO(*ref_rest_str!= NULL && strlen(*ref_rest_str)> 0,
goto end);
872 end_code= STAT_SUCCESS;
874 if(cjson_rest!= NULL)
875 cJSON_Delete(cjson_rest);
876 if(cjson_proc!= NULL)
877 cJSON_Delete(cjson_proc);
881 static int proc_register(
procs_ctx_t *procs_ctx,
const char *proc_name,
882 const char *settings_str,
log_ctx_t *log_ctx,
int *ref_id, va_list arg)
886 int procs_reg_elem_array_size, ret_code, end_code= STAT_ERROR;
887 int proc_id= -1, flag_force_proc_id= 0;
888 int flag_is_query= 0;
889 char *proc_id_str= NULL;
890 cJSON *cjson_settings= NULL;
891 cJSON *cjson_aux= NULL;
893 uint32_t fifo_ctx_maxsize[PROC_IO_NUM]= {PROCS_FIFO_SIZE, PROCS_FIFO_SIZE};
894 LOG_CTX_INIT(log_ctx);
897 CHECK_DO(procs_module_ctx!= NULL,
return STAT_ERROR);
898 CHECK_DO(procs_ctx!= NULL,
return STAT_ERROR);
899 CHECK_DO(proc_name!= NULL,
return STAT_ERROR);
900 CHECK_DO(settings_str!= NULL,
return STAT_ERROR);
902 CHECK_DO(ref_id!= NULL,
return STAT_ERROR);
909 ret_code= pthread_mutex_trylock(&procs_ctx->
api_mutex);
910 CHECK_DO(ret_code== EBUSY,
goto end);
917 flag_is_query= (settings_str[0]==
'{' &&
918 settings_str[strlen(settings_str)-1]==
'}')? 0: 1;
921 if(flag_is_query== 1) {
922 proc_id_str= uri_parser_query_str_get_value(
"forced_proc_id",
924 if(proc_id_str!= NULL) {
925 proc_id= atoll(proc_id_str);
926 flag_force_proc_id= 1;
930 cjson_settings= cJSON_Parse(settings_str);
931 CHECK_DO(cjson_settings!= NULL,
goto end);
932 cjson_aux= cJSON_GetObjectItem(cjson_settings,
"forced_proc_id");
933 if(cjson_aux!= NULL) {
934 proc_id= cjson_aux->valuedouble;
935 flag_force_proc_id= 1;
940 if(flag_force_proc_id== 0) {
942 for(proc_id= 0; proc_id< procs_reg_elem_array_size; proc_id++) {
948 LOGE(
"Invalid procesor identifier requested (Id. %d)\n", proc_id);
949 end_code= STAT_EINVAL;
952 if(proc_id>= procs_reg_elem_array_size) {
953 LOGE(
"Maximum number of allowed processor instances exceeded\n");
954 end_code= STAT_ENOMEM;
959 LOGE(
"Processor Id. conflict: requested Id. is being used.\n");
960 end_code= STAT_ECONFLICT;
975 proc_if= get_proc_if_by_name(proc_name, LOG_CTX_GET());
978 end_code= STAT_ENOTFOUND;
983 proc_ctx=
proc_open(proc_if, settings_str, proc_id, fifo_ctx_maxsize,
985 CHECK_DO(proc_ctx!= NULL,
goto end);
1002 end_code= STAT_SUCCESS;
1006 if(proc_id_str!= NULL)
1008 if(cjson_settings!= NULL)
1009 cJSON_Delete(cjson_settings);
1013 static int proc_unregister(
procs_ctx_t *procs_ctx,
int proc_id,
1017 int procs_reg_elem_array_size, ret_code;
1019 LOG_CTX_INIT(log_ctx);
1020 LOGD(
">>%s\n", __FUNCTION__);
1023 CHECK_DO(procs_module_ctx!= NULL,
return STAT_ERROR);
1024 CHECK_DO(procs_ctx!= NULL,
return STAT_ERROR);
1026 CHECK_DO(proc_id>= 0 && proc_id< procs_reg_elem_array_size,
1031 ret_code= pthread_mutex_trylock(&procs_ctx->
api_mutex);
1032 CHECK_DO(ret_code== EBUSY,
return STAT_ERROR);
1041 proc_ctx= procs_reg_elem->
proc_ctx;
1043 return STAT_ENOTFOUND;
1047 ret_code=
proc_opt(proc_ctx,
"PROC_UNBLOCK");
1048 CHECK_DO(ret_code== STAT_SUCCESS,
return STAT_ERROR);
1067 LOGD(
"<<%s\n", __FUNCTION__);
1068 return STAT_SUCCESS;
1071 static int procs_id_opt(
procs_ctx_t *procs_ctx,
const char *tag,
1075 int procs_reg_elem_array_size, end_code= STAT_ERROR, proc_id= -1;
1076 int flag_procs_api_locked= 0, flag_proc_ctx_api_locked= 0;
1078 LOG_CTX_INIT(log_ctx);
1081 CHECK_DO(procs_module_ctx!= NULL,
return STAT_ERROR);
1082 CHECK_DO(procs_ctx!= NULL,
return STAT_ERROR);
1083 CHECK_DO(tag!= NULL,
return STAT_ERROR);
1099 flag_procs_api_locked= 1;
1102 proc_id= va_arg(arg,
int);
1104 CHECK_DO(proc_id>= 0 && proc_id< procs_reg_elem_array_size,
goto end);
1107 flag_proc_ctx_api_locked= 1;
1112 if(proc_ctx== NULL) {
1113 end_code= STAT_ENOTFOUND;
1120 flag_procs_api_locked= 0;
1123 if(
TAG_IS(
"PROCS_ID_GET")) {
1124 end_code= procs_id_get(procs_reg_elem, proc_ctx, LOG_CTX_GET(),
1125 (
void**)va_arg(arg,
char**));
1126 }
else if(
TAG_IS(
"PROCS_ID_PUT")) {
1127 end_code=
proc_opt(proc_ctx,
"PROC_PUT", va_arg(arg,
const char*));
1128 }
else if(
TAG_IS(
"PROCS_ID_UNBLOCK")) {
1129 end_code=
proc_opt(proc_ctx,
"PROC_UNBLOCK");
1132 end_code=
proc_vopt(proc_ctx, tag, arg);
1137 if(flag_procs_api_locked!= 0)
1139 if(flag_proc_ctx_api_locked!= 0)
1147 int ret_code, end_code= STAT_ERROR;
1148 cJSON *cjson_rest= NULL, *cjson_settings= NULL, *cjson_aux= NULL;
1149 LOG_CTX_INIT(log_ctx);
1152 CHECK_DO(procs_reg_elem!= NULL,
return STAT_ERROR);
1153 CHECK_DO(proc_ctx!= NULL,
return STAT_ERROR);
1155 CHECK_DO(ref_reponse!= NULL,
return STAT_ERROR);
1160 ret_code= pthread_mutex_trylock(&procs_reg_elem->
api_mutex);
1161 CHECK_DO(ret_code== EBUSY,
goto end);
1166 CHECK_DO(ret_code== STAT_SUCCESS && cjson_rest!= NULL,
goto end);
1169 cjson_settings= cJSON_GetObjectItem(cjson_rest,
"settings");
1170 if(cjson_settings== NULL) {
1171 cjson_settings= cJSON_CreateObject();
1172 CHECK_DO(cjson_settings!= NULL,
goto end);
1173 cJSON_AddItemToObject(cjson_rest,
"settings", cjson_settings);
1186 CHECK_DO(cjson_aux!= NULL,
goto end);
1188 cjson_aux->string= (
char*)strdup(
"proc_name");
1189 cjson_aux->type|= cJSON_StringIsConst;
1191 cJSON_InsertItemInArray(cjson_settings, 0, cjson_aux);
1192 cjson_aux->type&= ~cJSON_StringIsConst;
1195 CHECK_DO(*ref_reponse!= NULL && strlen((
char*)*ref_reponse)> 0,
goto end);
1197 end_code= STAT_SUCCESS;
1199 if(cjson_rest!= NULL)
1200 cJSON_Delete(cjson_rest);
1219 int proc_id,
const char *tag, va_list arg,
log_ctx_t *log_ctx)
1221 va_list arg_cpy, va_list_empty;
1223 int procs_reg_elem_array_size, ret_code;
1224 int flag_is_query= 0;
1225 proc_ctx_t *proc_ctx_ret= NULL, *proc_ctx_curr= NULL, *proc_ctx_new= NULL;
1226 const char *settings_str_arg= NULL;
1227 const proc_if_t *proc_if_curr= NULL, *proc_if_new= NULL;
1228 const char *proc_name_curr= NULL;
1229 char *proc_name_str= NULL;
1230 cJSON *cjson_rest_arg= NULL, *cjson_rest_curr= NULL;
1231 cJSON *cjson_aux= NULL;
1232 char *settings_str_curr= NULL;
1233 uint32_t fifo_ctx_maxsize[PROC_IO_NUM]= {PROCS_FIFO_SIZE, PROCS_FIFO_SIZE};
1234 LOG_CTX_INIT(log_ctx);
1237 CHECK_DO(procs_ctx!= NULL,
return NULL);
1239 CHECK_DO(proc_id>= 0 && proc_id< procs_reg_elem_array_size,
return NULL);
1246 proc_ctx_curr= procs_reg_elem->
proc_ctx;
1247 if(proc_ctx_curr== NULL)
1253 ret_code= pthread_mutex_trylock(&procs_ctx->
api_mutex);
1254 CHECK_DO(ret_code== EBUSY,
goto end);
1255 ret_code= pthread_mutex_trylock(&procs_reg_elem->
api_mutex);
1256 CHECK_DO(ret_code== EBUSY,
goto end);
1261 if(!
TAG_IS(
"PROCS_ID_PUT")) {
1262 proc_ctx_ret= proc_ctx_curr;
1274 va_copy(arg_cpy, arg);
1275 settings_str_arg= va_arg(arg_cpy,
const char*);
1276 if(settings_str_arg== NULL || strlen(settings_str_arg)== 0) {
1278 proc_ctx_ret= proc_ctx_curr;
1283 proc_if_curr= proc_ctx_curr->proc_if;
1284 CHECK_DO(proc_if_curr!= NULL,
goto end);
1285 proc_name_curr= proc_if_curr->
proc_name;
1286 CHECK_DO(proc_name_curr!= NULL && strlen(proc_name_curr)> 0,
goto end);
1290 flag_is_query= (settings_str_arg[0]==
'{' &&
1291 settings_str_arg[strlen(settings_str_arg)-1]==
'}')? 0: 1;
1294 if(flag_is_query== 1) {
1295 proc_name_str= uri_parser_query_str_get_value(
"proc_name",
1299 cjson_rest_arg= cJSON_Parse(settings_str_arg);
1300 CHECK_DO(cjson_rest_arg!= NULL,
goto end);
1301 cjson_aux= cJSON_GetObjectItem(cjson_rest_arg,
"proc_name");
1302 if(cjson_aux!= NULL)
1303 proc_name_str= strdup(cjson_aux->valuestring);
1305 if(proc_name_str== NULL) {
1307 proc_ctx_ret= proc_ctx_curr;
1313 if(strcmp(proc_name_str, proc_name_curr)== 0) {
1315 proc_ctx_ret= proc_ctx_curr;
1321 proc_if_new= get_proc_if_by_name(proc_name_str, LOG_CTX_GET());
1323 if(proc_if_new== NULL) {
1324 LOGE(
"New processor name specified '%s' is not registered.\n",
1330 LOGW(
"Changing processor type from '%s' to '%s'\n",
1332 proc_if_new->proc_mime? proc_if_new->proc_mime: proc_name_str);
1337 CHECK_DO(ret_code== STAT_SUCCESS && cjson_rest_curr!= NULL,
goto end);
1338 cjson_aux= cJSON_GetObjectItem(cjson_rest_curr,
"settings");
1339 if(cjson_aux!= NULL) {
1340 settings_str_curr= cJSON_PrintUnformatted(cjson_aux);
1342 settings_str_curr= strdup(
"");
1344 CHECK_DO(settings_str_curr!= NULL,
goto end);
1351 proc_ctx_new=
proc_open(proc_if_new, settings_str_curr,
1352 proc_ctx_curr->proc_instance_index, fifo_ctx_maxsize,
1353 LOG_CTX_GET(), va_list_empty);
1354 CHECK_DO(proc_ctx_new!= NULL,
goto end);
1359 ret_code=
proc_opt(proc_ctx_curr,
"PROC_UNBLOCK");
1360 CHECK_DO(ret_code== STAT_SUCCESS,
goto end);
1370 procs_reg_elem->
proc_ctx= proc_ctx_new;
1378 ASSERT(proc_ctx_curr== NULL);
1381 proc_ctx_ret= procs_reg_elem->
proc_ctx;
1384 if(proc_name_str!= NULL)
1385 free(proc_name_str);
1386 if(cjson_rest_arg!= NULL)
1387 cJSON_Delete(cjson_rest_arg);
1388 if(cjson_rest_curr!= NULL)
1389 cJSON_Delete(cjson_rest_curr);
1390 if(settings_str_curr!= NULL)
1391 free(settings_str_curr);
1392 if(proc_ctx_new!= NULL) {
1394 ASSERT(proc_ctx_new== NULL);
1396 return proc_ctx_ret;
Generic processors (PROC) module.
#define CJSON_PRINT(CJSON_PTR)
static proc_ctx_t * procs_id_opt_fetch_proc_ctx(procs_ctx_t *procs_ctx, int proc_id, const char *tag, va_list arg, log_ctx_t *log_ctx)
pthread_mutex_t api_mutex
struct procs_reg_elem_s procs_reg_elem_t
pthread_mutex_t module_api_mutex
#define PROCS_MAX_NUM_PROC_INSTANCES
int proc_send_frame(proc_ctx_t *proc_ctx, const proc_frame_ctx_t *proc_frame_ctx)
int procs_opt(procs_ctx_t *procs_ctx, const char *tag,...)
int proc_opt(proc_ctx_t *proc_ctx, const char *tag,...)
proc_ctx_t * proc_open(const proc_if_t *proc_if, const char *settings_str, int proc_instance_index, uint32_t fifo_ctx_maxsize[PROC_IO_NUM], log_ctx_t *log_ctx, va_list arg)
int procs_module_open(log_ctx_t *log_ctx)
int procs_module_opt(const char *tag,...)
void proc_if_release(proc_if_t **ref_proc_if)
int proc_vopt(proc_ctx_t *proc_ctx, const char *tag, va_list arg)
#define UNLOCK_PROCS_CTX_API(PROCS_CTX)
int procs_recv_frame(procs_ctx_t *procs_ctx, int proc_id, proc_frame_ctx_t **ref_proc_frame_ctx)
Generic processor (PROC) module.
void procs_module_close()
proc_if_t * proc_if_dup(const proc_if_t *proc_if_arg)
size_t procs_reg_elem_array_size
struct procs_ctx_s procs_ctx_t
#define UNLOCK_PROCS_REG_ELEM_API(REG_ELEM)
#define LOCK_PROCS_REG_ELEM_API(PROCS_CTX, REG_ELEM, EXIT_CODE_ON_FAILURE)
#define CHECK_DO(COND, ACTION)
int procs_send_frame(procs_ctx_t *procs_ctx, int proc_id, const proc_frame_ctx_t *proc_frame_ctx)
procs_reg_elem_t * procs_reg_elem_array
const proc_if_t * proc_if
pthread_mutex_t api_mutex
#define LLIST_RELEASE(ref_llist_head, node_release_fxn, node_type)
struct procs_module_ctx_s procs_module_ctx_t
int proc_recv_frame(proc_ctx_t *proc_ctx, proc_frame_ctx_t **ref_proc_frame_ctx)
int llist_push(llist_t **ref_llist_head, void *data)
void * llist_pop(llist_t **ref_llist_head)
#define LOCK_PROCS_CTX_API(PROCS_CTX)
void procs_close(procs_ctx_t **ref_procs_ctx)
fair_lock_t * fair_lock_io_array[PROC_IO_NUM]
static procs_module_ctx_t * procs_module_ctx
void proc_close(proc_ctx_t **ref_proc_ctx)
PROC interface prototype related definitions and functions.
procs_ctx_t * procs_open(log_ctx_t *log_ctx, size_t max_procs_num, const char *prefix_name, const char *procs_href)