34 #include <libcjson/cJSON.h> 36 #include <libmediaprocsutils/log.h> 37 #include <libmediaprocsutils/stat_codes.h> 38 #include <libmediaprocsutils/check_utils.h> 39 #include <libmediaprocsutils/schedule.h> 40 #include <libmediaprocsutils/fifo.h> 41 #include <libmediaprocsutils/fair_lock.h> 42 #include <libmediaprocsutils/interr_usleep.h> 49 #ifdef ENABLE_DEBUG_LOGS 50 #define LOGD_CTX_INIT(CTX) LOG_CTX_INIT(CTX) 51 #define LOGD(FORMAT, ...) LOGV(FORMAT, ##__VA_ARGS__) 53 #define LOGD_CTX_INIT(CTX) 60 #define TAG_HAS(NEEDLE) (strstr(tag, NEEDLE)!= NULL) 65 #define TAG_IS(TAG) (strcmp(tag, TAG)== 0) 70 #define PROC_STATS_THR_MEASURE_PERIOD_USECS (1000000) 77 static void* proc_stats_thr(
void *t);
78 static void* proc_thr(
void *t);
88 int proc_instance_index, uint32_t fifo_ctx_maxsize[PROC_IO_NUM],
91 uint64_t flag_proc_features;
92 int i, ret_code, end_code= STAT_ERROR;
95 LOG_CTX_INIT(log_ctx);
98 CHECK_DO(proc_if!= NULL,
return NULL);
99 CHECK_DO(settings_str!= NULL,
return NULL);
100 CHECK_DO(fifo_ctx_maxsize!= NULL ,
return NULL);
108 proc_ctx= proc_if->
open(proc_if, settings_str, LOG_CTX_GET(), arg);
109 CHECK_DO(proc_ctx!= NULL,
goto end);
118 ret_code= pthread_mutex_init(&proc_ctx->
api_mutex, NULL);
129 LOG_CTX_SET(proc_ctx->
log_ctx);
132 proc_ctx->
log_ctx= LOG_CTX_GET();
136 fifo_elem_alloc_fxn.elem_ctx_dup= (fifo_elem_ctx_dup_fxn_t*)
138 fifo_elem_alloc_fxn.elem_ctx_release= (fifo_elem_ctx_release_fxn_t*)
141 0, 0, &fifo_elem_alloc_fxn);
145 fifo_elem_alloc_fxn.elem_ctx_dup= (fifo_elem_ctx_dup_fxn_t*)
147 fifo_elem_alloc_fxn.elem_ctx_release= (fifo_elem_ctx_release_fxn_t*)
150 0, 0, &fifo_elem_alloc_fxn);
154 for(i= 0; i< PROC_IO_NUM; i++) {
156 CHECK_DO(fair_lock!= NULL,
goto end);
161 ret_code= pthread_mutex_init(&proc_ctx->acc_io_bits_mutex[PROC_IPUT], NULL);
163 ret_code= pthread_mutex_init(&proc_ctx->acc_io_bits_mutex[PROC_OPUT], NULL);
167 proc_ctx->iput_pts_array_idx= 0;
168 memset(proc_ctx->iput_pts_array, -1,
sizeof(proc_ctx->iput_pts_array));
171 proc_ctx->acc_latency_nsec= 0;
172 proc_ctx->acc_latency_cnt= 0;
173 ret_code= pthread_mutex_init(&proc_ctx->latency_mutex, NULL);
178 if(flag_proc_features& (PROC_FEATURE_BITRATE|PROC_FEATURE_REGISTER_PTS|
179 PROC_FEATURE_LATENCY)) {
187 ret_code= pthread_create(&proc_ctx->
stats_thread, NULL, proc_stats_thr,
196 ret_code= pthread_create(&proc_ctx->
proc_thread, NULL, proc_thr,
201 end_code= STAT_SUCCESS;
203 if(end_code!= STAT_SUCCESS)
204 proc_if->
close(&proc_ctx);
212 LOGD(
">>%s\n", __FUNCTION__);
214 if(ref_proc_ctx== NULL)
217 if((proc_ctx= *ref_proc_ctx)!= NULL) {
220 void *thread_end_code= NULL;
221 LOG_CTX_SET(proc_ctx->
log_ctx);
231 if(proc_if!= NULL && (unblock= proc_if->
unblock)!= NULL) {
232 ASSERT(unblock(proc_ctx)== STAT_SUCCESS);
234 LOGD(
"Waiting processor thread to join... ");
235 pthread_join(proc_ctx->
proc_thread, &thread_end_code);
236 if(thread_end_code!= NULL) {
237 ASSERT(*((
int*)thread_end_code)== STAT_SUCCESS);
238 free(thread_end_code);
239 thread_end_code= NULL;
242 "Waiting statistics thread to join... ");
251 if(thread_end_code!= NULL) {
252 ASSERT(*((
int*)thread_end_code)== STAT_SUCCESS);
253 free(thread_end_code);
254 thread_end_code= NULL;
257 LOGD(
"joined O.K.\n");
271 ASSERT(pthread_mutex_destroy(&proc_ctx->acc_io_bits_mutex[PROC_IPUT])
273 ASSERT(pthread_mutex_destroy(&proc_ctx->acc_io_bits_mutex[PROC_OPUT])
277 ASSERT(pthread_mutex_destroy(&proc_ctx->latency_mutex)== 0);
282 proc_if->
close(ref_proc_ctx);
284 LOGD(
"<<%s\n", __FUNCTION__);
297 CHECK_DO(proc_ctx!= NULL,
return STAT_ERROR);
302 LOG_CTX_SET(proc_ctx->
log_ctx);
305 CHECK_DO(fair_lock_p!= NULL,
return STAT_ERROR);
308 CHECK_DO(proc_if!= NULL,
return STAT_ERROR);
313 end_code= STAT_ENOTFOUND;
314 if((send_frame= proc_if->
send_frame)!= NULL) {
315 fair_lock(fair_lock_p);
316 end_code= send_frame(proc_ctx, proc_frame_ctx);
317 fair_unlock(fair_lock_p);
327 int ret_code, end_code= STAT_ERROR;
333 CHECK_DO(proc_ctx!= NULL,
return STAT_ERROR);
334 CHECK_DO(ref_proc_frame_ctx!= NULL,
return STAT_ERROR);
336 *ref_proc_frame_ctx= NULL;
339 LOG_CTX_SET(proc_ctx->
log_ctx);
342 CHECK_DO(fair_lock_p!= NULL,
goto end);
350 ret_code= STAT_ENOTFOUND;
351 if((recv_frame= proc_if->
recv_frame)!= NULL) {
352 fair_lock(fair_lock_p);
353 ret_code= recv_frame(proc_ctx, ref_proc_frame_ctx);
354 fair_unlock(fair_lock_p);
356 if(ret_code!= STAT_SUCCESS) {
361 end_code= STAT_SUCCESS;
363 if(end_code!= STAT_SUCCESS)
384 int end_code= STAT_ERROR;
387 int (*rest_put)(
proc_ctx_t *proc_ctx,
const char *str)= NULL;
388 int (*opt)(
proc_ctx_t *proc_ctx,
const char *tag, va_list arg)= NULL;
392 CHECK_DO(proc_ctx!= NULL,
return STAT_ERROR);
393 CHECK_DO(tag!= NULL,
return STAT_ERROR);
397 LOG_CTX_SET(proc_ctx->
log_ctx);
401 if(
TAG_IS(
"PROC_UNBLOCK")) {
404 end_code= STAT_SUCCESS;
405 if(proc_if!= NULL && (unblock= proc_if->
unblock)!= NULL)
406 end_code= unblock(proc_ctx);
407 }
else if(
TAG_IS(
"PROC_GET")) {
409 void **ref_reponse= va_arg(arg,
void**);
410 end_code= procs_id_get(proc_ctx, LOG_CTX_GET(), rest_fmt, ref_reponse);
411 }
else if(
TAG_IS(
"PROC_PUT")) {
412 end_code= STAT_ENOTFOUND;
413 if(proc_if!= NULL && (rest_put= proc_if->
rest_put)!= NULL)
414 end_code= rest_put(proc_ctx, va_arg(arg,
const char*));
416 if(proc_if!= NULL && (opt= proc_if->
opt)!= NULL)
417 end_code= opt(proc_ctx, tag, arg);
419 LOGE(
"Unknown option\n");
420 end_code= STAT_ENOTFOUND;
428 int proc_send_frame_default1(
proc_ctx_t *proc_ctx,
431 register uint64_t flag_proc_features;
436 CHECK_DO(proc_ctx!= NULL,
return STAT_ERROR);
439 LOG_CTX_SET(proc_ctx->
log_ctx);
443 CHECK_DO(proc_if!= NULL,
return STAT_ERROR);
447 if((flag_proc_features& PROC_FEATURE_REGISTER_PTS) &&
448 (flag_proc_features&PROC_FEATURE_LATENCY))
459 if(flag_proc_features& PROC_FEATURE_BITRATE)
468 int proc_recv_frame_default1(
proc_ctx_t *proc_ctx,
471 register uint64_t flag_proc_features;
473 int ret_code, end_code= STAT_ERROR;
474 size_t fifo_elem_size= 0;
478 CHECK_DO(proc_ctx!= NULL,
return STAT_ERROR);
479 CHECK_DO(ref_proc_frame_ctx!= NULL,
return STAT_ERROR);
481 LOG_CTX_SET(proc_ctx->
log_ctx);
483 *ref_proc_frame_ctx= NULL;
492 (
void**)ref_proc_frame_ctx, &fifo_elem_size);
493 if(ret_code!= STAT_SUCCESS) {
506 if(flag_proc_features& PROC_FEATURE_BITRATE)
510 end_code= STAT_SUCCESS;
512 if(end_code!= STAT_SUCCESS)
517 void proc_stats_register_accumulated_latency(
proc_ctx_t *proc_ctx,
518 const int64_t oput_frame_pts)
525 if(oput_frame_pts<= 0)
528 LOG_CTX_SET(proc_ctx->
log_ctx);
537 int64_t pts_iput= proc_ctx->iput_pts_array[IPUT_PTS_VAL][idx];
539 if(pts_iput== oput_frame_pts) {
540 register int ret_code;
541 register int64_t curr_nsec, iput_nsec;
542 struct timespec monotime_curr= {0};
544 ret_code= clock_gettime(CLOCK_MONOTONIC, &monotime_curr);
546 curr_nsec= (int64_t)monotime_curr.tv_sec*1000000000+
547 (int64_t)monotime_curr.tv_nsec;
548 iput_nsec= proc_ctx->iput_pts_array[IPUT_PTS_STC_VAL][idx];
549 if(curr_nsec> iput_nsec) {
550 pthread_mutex_t *latency_mutex_p= &proc_ctx->latency_mutex;
551 ASSERT((pthread_mutex_lock(latency_mutex_p))== 0);
552 proc_ctx->acc_latency_nsec+= curr_nsec- iput_nsec;
553 proc_ctx->acc_latency_cnt++;
557 ASSERT((pthread_mutex_unlock(latency_mutex_p))== 0);
569 uint64_t flag_proc_features;
570 int ret_code, end_code= STAT_ERROR;
571 cJSON *cjson_rest= NULL, *cjson_aux= NULL;
573 void **ref_reponse)= NULL;
574 LOG_CTX_INIT(log_ctx);
577 CHECK_DO(proc_ctx!= NULL,
return STAT_ERROR);
579 CHECK_DO(rest_fmt>= 0 && rest_fmt< PROC_IF_REST_FMT_ENUM_MAX,
581 CHECK_DO(ref_reponse!= NULL,
return STAT_ERROR);
586 ret_code= pthread_mutex_trylock(&proc_ctx->
api_mutex);
587 CHECK_DO(ret_code== EBUSY,
goto end);
596 if(rest_get== NULL) {
598 end_code= STAT_ENOTFOUND;
604 if(ret_code!= STAT_SUCCESS) {
608 CHECK_DO(cjson_rest!= NULL,
goto end);
618 if(flag_proc_features&PROC_FEATURE_LATENCY) {
621 CHECK_DO(cjson_aux!= NULL,
goto end);
624 cjson_aux->string= (
char*)strdup(
"latency_avg_usec");
625 cjson_aux->type|= cJSON_StringIsConst;
627 cJSON_InsertItemInArray(cjson_rest, 0, cjson_aux);
628 cjson_aux->type&= ~cJSON_StringIsConst;
636 CHECK_DO(*ref_reponse!= NULL && strlen((
char*)*ref_reponse)> 0,
640 *ref_reponse= (
void*)cjson_rest;
644 LOGE(
"Unknown format requested for processor REST\n");
648 end_code= STAT_SUCCESS;
650 if(cjson_rest!= NULL)
651 cJSON_Delete(cjson_rest);
655 static void* proc_stats_thr(
void *t)
658 uint64_t flag_proc_features;
660 int *ref_end_code= NULL;
665 ref_end_code= (
int*)malloc(
sizeof(
int));
666 CHECK_DO(ref_end_code!= NULL,
return NULL);
667 *ref_end_code= STAT_ERROR;
670 CHECK_DO(proc_ctx!= NULL,
goto end);
673 CHECK_DO(interr_usleep_ctx!= NULL,
goto end);
675 LOG_CTX_SET(proc_ctx->
log_ctx);
683 register int ret_code;
686 if(flag_proc_features& PROC_FEATURE_BITRATE) {
687 register uint32_t bit_counter_iput, bit_counter_oput;
688 pthread_mutex_t *acc_io_bits_mutex_iput_p=
689 &proc_ctx->acc_io_bits_mutex[PROC_IPUT];
690 pthread_mutex_t *acc_io_bits_mutex_oput_p=
691 &proc_ctx->acc_io_bits_mutex[PROC_OPUT];
696 ASSERT(pthread_mutex_lock(acc_io_bits_mutex_iput_p)== 0);
697 bit_counter_iput= proc_ctx->acc_io_bits[PROC_IPUT];
698 proc_ctx->acc_io_bits[PROC_IPUT]= 0;
699 ASSERT(pthread_mutex_unlock(acc_io_bits_mutex_iput_p)== 0);
700 proc_ctx->
bitrate[PROC_IPUT]= bit_counter_iput;
702 ASSERT(pthread_mutex_lock(acc_io_bits_mutex_oput_p)== 0);
703 bit_counter_oput= proc_ctx->acc_io_bits[PROC_OPUT];
704 proc_ctx->acc_io_bits[PROC_OPUT]= 0;
705 ASSERT(pthread_mutex_unlock(acc_io_bits_mutex_oput_p)== 0);
706 proc_ctx->
bitrate[PROC_OPUT]= bit_counter_oput;
710 if(flag_proc_features&PROC_FEATURE_LATENCY) {
711 register int acc_latency_cnt;
712 register int64_t acc_latency_usec= 0;
713 pthread_mutex_t *latency_mutex_p= &proc_ctx->latency_mutex;
715 ASSERT((pthread_mutex_lock(latency_mutex_p))== 0);
716 acc_latency_cnt= proc_ctx->acc_latency_cnt;
717 proc_ctx->acc_latency_cnt= 0;
718 if(acc_latency_cnt> 0)
719 acc_latency_usec= proc_ctx->acc_latency_nsec/ acc_latency_cnt;
720 proc_ctx->acc_latency_nsec= 0;
721 ASSERT((pthread_mutex_unlock(latency_mutex_p))== 0);
722 acc_latency_usec/= 1000;
726 if(acc_latency_usec> proc_ctx->latency_max_usec)
727 proc_ctx->latency_max_usec= acc_latency_usec;
728 if(proc_ctx->latency_min_usec<= 0)
729 proc_ctx->latency_min_usec= acc_latency_usec;
730 if(acc_latency_usec> 0 &&
731 acc_latency_usec< proc_ctx->latency_min_usec)
732 proc_ctx->latency_min_usec= acc_latency_usec;
738 ASSERT(ret_code== STAT_SUCCESS || ret_code== STAT_EINTR);
741 *ref_end_code= STAT_SUCCESS;
743 return (
void*)ref_end_code;
746 static void* proc_thr(
void *t)
750 int ret_code, *ref_end_code= NULL;
752 fifo_ctx_t *iput_fifo_ctx= NULL, *oput_fifo_ctx= NULL;
756 ref_end_code= (
int*)malloc(
sizeof(
int));
757 CHECK_DO(ref_end_code!= NULL,
return NULL);
758 *ref_end_code= STAT_ERROR;
761 CHECK_DO(proc_ctx!= NULL,
goto end);
763 LOG_CTX_SET(proc_ctx->
log_ctx);
769 CHECK_DO(process_frame!= NULL,
goto end);
774 CHECK_DO(iput_fifo_ctx!= NULL && oput_fifo_ctx!= NULL,
goto end);
778 ret_code= process_frame(proc_ctx, iput_fifo_ctx, oput_fifo_ctx);
779 if(ret_code== STAT_EOF)
781 else if(ret_code!= STAT_SUCCESS)
785 *ref_end_code= STAT_SUCCESS;
787 return (
void*)ref_end_code;
796 register int64_t curr_nsec;
797 struct timespec monotime_curr= {0};
802 CHECK_DO(proc_frame_ctx!= NULL,
return);
804 LOG_CTX_SET(proc_ctx->
log_ctx);
807 proc_ctx->iput_pts_array[IPUT_PTS_VAL][proc_ctx->iput_pts_array_idx]=
811 CHECK_DO(clock_gettime(CLOCK_MONOTONIC, &monotime_curr)== 0,
return);
812 curr_nsec= (int64_t)monotime_curr.tv_sec*1000000000+
813 (int64_t)monotime_curr.tv_nsec;
814 proc_ctx->iput_pts_array[IPUT_PTS_STC_VAL][proc_ctx->iput_pts_array_idx]=
818 proc_ctx->iput_pts_array_idx= (proc_ctx->iput_pts_array_idx+ 1)%
831 register uint32_t byte_cnt, bit_cnt;
832 register size_t width;
834 pthread_mutex_t *acc_io_bits_mutex_p= NULL;
839 CHECK_DO(proc_frame_ctx!= NULL,
return);
841 LOG_CTX_SET(proc_ctx->
log_ctx);
843 acc_io_bits_mutex_p= &proc_ctx->acc_io_bits_mutex[proc_io];
846 byte_cnt= (proc_frame_ctx->
width[0]* proc_frame_ctx->
height[0])+
847 (proc_frame_ctx->
width[1]* proc_frame_ctx->
height[1])+
848 (proc_frame_ctx->
width[2]* proc_frame_ctx->
height[2]);
850 for(i= 3; (width= proc_frame_ctx->
width[i])> 0 &&
851 i< PROC_FRAME_NUM_DATA_POINTERS; i++)
852 byte_cnt+= width* proc_frame_ctx->
height[i];
855 bit_cnt= (byte_cnt<< 3);
856 ASSERT(pthread_mutex_lock(acc_io_bits_mutex_p)== 0);
857 proc_ctx->acc_io_bits[proc_io]+= bit_cnt;
858 ASSERT(pthread_mutex_unlock(acc_io_bits_mutex_p)== 0);
void fifo_close(fifo_ctx_t **ref_fifo_ctx)
int fifo_get(fifo_ctx_t *fifo_ctx, void **ref_elem, size_t *ref_elem_size)
#define CJSON_PRINT(CJSON_PTR)
volatile int64_t latency_avg_usec
size_t width[PROC_FRAME_NUM_DATA_POINTERS]
void(* iput_fifo_elem_opaque_release)(void **ref_t)
interr_usleep_ctx_t * interr_usleep_open()
void interr_usleep_unblock(interr_usleep_ctx_t *interr_usleep_ctx)
void proc_frame_ctx_release(proc_frame_ctx_t **ref_proc_frame_ctx)
int fifo_put_dup(fifo_ctx_t *fifo_ctx, const void *elem, size_t elem_size)
proc_ctx_t *(* open)(const proc_if_t *proc_if, const char *settings_str, log_ctx_t *log_ctx, va_list arg)
void fifo_set_blocking_mode(fifo_ctx_t *fifo_ctx, int do_block)
int proc_send_frame(proc_ctx_t *proc_ctx, const proc_frame_ctx_t *proc_frame_ctx)
int proc_opt(proc_ctx_t *proc_ctx, const char *tag,...)
proc_ctx_t * proc_open(const proc_if_t *proc_if, const char *settings_str, int proc_instance_index, uint32_t fifo_ctx_maxsize[PROC_IO_NUM], log_ctx_t *log_ctx, va_list arg)
int proc_vopt(proc_ctx_t *proc_ctx, const char *tag, va_list arg)
Generic processor (PROC) module.
int(* recv_frame)(proc_ctx_t *proc_ctx, proc_frame_ctx_t **ref_proc_frame_ctx)
int(* process_frame)(proc_ctx_t *proc_ctx, fifo_ctx_t *fifo_ctx_iput, fifo_ctx_t *fifo_ctx_oput)
uint64_t flag_proc_features
int(* opt)(proc_ctx_t *proc_ctx, const char *tag, va_list arg)
#define IPUT_PTS_ARRAY_SIZE
pthread_mutex_t api_mutex
const void *(* start_routine)(void *)
#define CHECK_DO(COND, ACTION)
Character string response.
interr_usleep_ctx_t * interr_usleep_ctx
fifo_ctx_t * fifo_open(size_t slots_max, size_t chunk_size_max, uint32_t flags, const fifo_elem_alloc_fxn_t *fifo_elem_alloc_fxn)
enum proc_if_rest_fmt_enum proc_if_rest_fmt_t
static void proc_stats_register_accumulated_io_bits(proc_ctx_t *proc_ctx, const proc_frame_ctx_t *proc_frame_ctx, const proc_io_t proc_io)
const proc_if_t * proc_if
fifo_ctx_t * fifo_ctx_array[PROC_IO_NUM]
void(* close)(proc_ctx_t **ref_proc_ctx)
int proc_recv_frame(proc_ctx_t *proc_ctx, proc_frame_ctx_t **ref_proc_frame_ctx)
int(* send_frame)(proc_ctx_t *proc_ctx, const proc_frame_ctx_t *proc_frame_ctx)
int(* unblock)(proc_ctx_t *proc_ctx)
static void proc_stats_register_frame_pts(proc_ctx_t *proc_ctx, const proc_frame_ctx_t *proc_frame_ctx, const proc_io_t proc_io)
void *(* iput_fifo_elem_opaque_dup)(const proc_frame_ctx_t *proc_frame_ctx)
size_t height[PROC_FRAME_NUM_DATA_POINTERS]
#define PROC_STATS_THR_MEASURE_PERIOD_USECS
void proc_close(proc_ctx_t **ref_proc_ctx)
PROC interface prototype related definitions and functions.
int(* rest_get)(proc_ctx_t *proc_ctx, const proc_if_rest_fmt_t rest_fmt, void **ref_reponse)
int interr_usleep(interr_usleep_ctx_t *interr_usleep_ctx, uint32_t usec)
fair_lock_t * fair_lock_io_array[PROC_IO_NUM]
volatile uint32_t bitrate[PROC_IO_NUM]
proc_frame_ctx_t *(* oput_fifo_elem_opaque_dup)(const void *t)
void interr_usleep_close(interr_usleep_ctx_t **ref_interr_usleep_ctx)
int(* rest_put)(proc_ctx_t *proc_ctx, const char *str)