MediaProcessors
codecs_muxers_loopback.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Rafael Antoniello
3  *
4  * This file is part of MediaProcessors.
5  *
6  * MediaProcessors is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU Lesser General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * MediaProcessors is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with MediaProcessors. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
#include <errno.h>
#include <math.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>

#include <mongoose.h>
#include <libcjson/cJSON.h>
#include <SDL2/SDL.h>

#include <libavcodec/avcodec.h>
#include <libavutil/opt.h>
#include <libavutil/imgutils.h>
#include <libmediaprocsutils/log.h>
#include <libmediaprocsutils/check_utils.h>
#include <libmediaprocsutils/stat_codes.h>
#include <libmediaprocsutils/fifo.h>
#include <libmediaprocsutils/schedule.h>
#include <libmediaprocs/proc_if.h>
#include <libmediaprocs/procs.h>
#include <libmediaprocs/procs_api_http.h>
#include <libmediaprocs/proc.h>
#include <libmediaprocsmuxers/live555_rtsp.h>
#include <libmediaprocscodecs/ffmpeg_x264.h>
#include <libmediaprocscodecs/ffmpeg_m2v.h>
#include <libmediaprocscodecs/ffmpeg_mp3.h>
#include <libmediaprocscodecs/ffmpeg_lhe.h>
59 
/* **** Definitions **** */

/* TCP port the HTTP server binds to. */
#define LISTENING_PORT "8088"
/* Host name; in this file it is only printed in the bind-failure message. */
#define LISTENING_HOST "127.0.0.1"

/*
 * Raw video frame dimensions. Kept as string literals because they are
 * concatenated into the encoder settings query-string; numeric users convert
 * them with atoi().
 */
#define VIDEO_WIDTH "352"
#define VIDEO_HEIGHT "288"

/* Application-defined SDL event codes used by the rendering thread. */
#define REFRESH_EVENT (SDL_USEREVENT + 1)
#define BREAK_EVENT (SDL_USEREVENT + 2)

/*
 * Application-wide termination request. It is written from the SIGINT
 * handler, hence 'volatile sig_atomic_t' (the only object type the C standard
 * guarantees can be safely written from a signal handler).
 * NOTE(review): the flag is also polled/written by other threads; a C11
 * atomic would be the strictly correct tool for that -- confirm if needed.
 */
static volatile sig_atomic_t flag_app_exit= 0;
72 
73 
79 typedef struct thr_ctx_s {
80  volatile int flag_exit;
81  int enc_proc_id, dec_proc_id, mux_proc_id, dmux_proc_id;
82  int elem_strem_id_video_server;
83  const char *mime_setting_video;
84  procs_ctx_t *procs_ctx;
85 } thr_ctx_t;
86 
87 static void prepare_and_send_raw_video_data(procs_ctx_t *procs_ctx,
88  int enc_proc_id, volatile int *ref_flag_exit)
89 {
90  uint8_t *p_data_y, *p_data_cr, *p_data_cb;
91  int64_t frame_period_usec, frame_period_90KHz;
92  int x, y;
93  const int width= atoi(VIDEO_WIDTH), height= atoi(VIDEO_HEIGHT);
94  uint8_t *buf= NULL;
95  proc_frame_ctx_t proc_frame_ctx= {0};
96  const char *fps_cstr= "30";
97 
98  /* Prepare raw data buffer */
99  buf= (uint8_t*)malloc((width* height* 3)/ 2);
100  if(buf== NULL) {
101  fprintf(stderr, "Could not allocate producer raw data buffer\n");
102  exit(-1);
103  }
104  proc_frame_ctx.data= buf;
105  proc_frame_ctx.p_data[0]= buf;
106  proc_frame_ctx.p_data[1]= proc_frame_ctx.p_data[0]+ (width* height);
107  proc_frame_ctx.p_data[2]= proc_frame_ctx.p_data[1]+ ((width* height)/4);
108  proc_frame_ctx.width[0]= proc_frame_ctx.linesize[0]= width;
109  proc_frame_ctx.width[1]= proc_frame_ctx.linesize[1]= width>> 1;
110  proc_frame_ctx.width[2]= proc_frame_ctx.linesize[2]= width>> 1;
111  proc_frame_ctx.height[0]= height;
112  proc_frame_ctx.height[1]= height>> 1;
113  proc_frame_ctx.height[2]= height>> 1;
114  proc_frame_ctx.proc_sample_fmt= PROC_IF_FMT_YUV420P;
115  proc_frame_ctx.es_id= 0;
116 
117  /* Encode few seconds of video */
118  p_data_y= (uint8_t*)proc_frame_ctx.p_data[0];
119  p_data_cr= (uint8_t*)proc_frame_ctx.p_data[1];
120  p_data_cb= (uint8_t*)proc_frame_ctx.p_data[2];
121  frame_period_usec= 1000000/ atoi(fps_cstr); //usecs
122  frame_period_90KHz= (frame_period_usec/1000/*[msec]*/)*
123  90/*[ticks/msec]*/; //ticks
124  for(; *ref_flag_exit== 0;) {
125 
126  usleep((unsigned int)frame_period_usec); //simulate real-time FPS
127  proc_frame_ctx.pts+= frame_period_90KHz;
128 
129  /* Y */
130  for(y= 0; y< height; y++)
131  for(x= 0; x< width; x++)
132  p_data_y[y* proc_frame_ctx.linesize[0]+ x]= x+ y+
133  proc_frame_ctx.pts* 3;
134  /* Cb and Cr */
135  for(y= 0; y< height>> 1; y++) {
136  for(x= 0; x< width>> 1; x++) {
137  p_data_cr[y* proc_frame_ctx.linesize[1]+ x]= 128+ y+
138  proc_frame_ctx.pts* 2;
139  p_data_cb[y* proc_frame_ctx.linesize[2]+ x]= 64+ x+
140  proc_frame_ctx.pts* 5;
141  }
142  }
143 
144  /* Encode the image */
145  procs_send_frame(procs_ctx, enc_proc_id, &proc_frame_ctx);
146  }
147 
148  if(buf!= NULL) {
149  free(buf);
150  buf= NULL;
151  }
152 }
153 
157 static void* producer_thr_video(void *t)
158 {
159  thr_ctx_t *thr_ctx= (thr_ctx_t*)t;
160 
161  /* Check argument */
162  if(thr_ctx== NULL) {
163  fprintf(stderr, "Bad argument '%s'\n", __FUNCTION__);
164  exit(1);
165  }
166 
167  /* Producer loop */
168  while(thr_ctx->flag_exit== 0) {
169  prepare_and_send_raw_video_data(thr_ctx->procs_ctx,
170  thr_ctx->enc_proc_id, &thr_ctx->flag_exit);
171  }
172 
173  return NULL;
174 }
175 
179 static void* mux_thr(void *t)
180 {
181  thr_ctx_t *thr_ctx= (thr_ctx_t*)t;
182  proc_frame_ctx_t *proc_frame_ctx= NULL;
183 
184  /* Check argument */
185  if(thr_ctx== NULL) {
186  fprintf(stderr, "Bad argument '%s'\n", __FUNCTION__);
187  exit(1);
188  }
189 
190  /* Get frame from encoder and send to multiplexer */
191  while(thr_ctx->flag_exit== 0) {
192  int ret_code;
193 
194  /* Receive encoded frame */
195  if(proc_frame_ctx!= NULL)
196  proc_frame_ctx_release(&proc_frame_ctx);
197  ret_code= procs_recv_frame(thr_ctx->procs_ctx, thr_ctx->enc_proc_id,
198  &proc_frame_ctx);
199  if(ret_code!= STAT_SUCCESS) {
200  if(ret_code== STAT_EAGAIN)
201  schedule(); // Avoid closed loops
202  else
203  fprintf(stderr, "Error while encoding frame'\n");
204  continue;
205  }
206 
207  /* Send encoded frame to multiplexer.
208  * IMPORTANT: Set correctly the elementary stream Id. to be able to
209  * correctly multiplex each frame.
210  */
211  if(proc_frame_ctx== NULL)
212  continue;
213  proc_frame_ctx->es_id= thr_ctx->elem_strem_id_video_server;
214  ret_code= procs_send_frame(thr_ctx->procs_ctx, thr_ctx->mux_proc_id,
215  proc_frame_ctx);
216  if(ret_code!= STAT_SUCCESS) {
217  if(ret_code== STAT_EAGAIN)
218  schedule(); // Avoid closed loops
219  else
220  fprintf(stderr, "Error while multiplexing frame'\n");
221  continue;
222  }
223  }
224 
225  if(proc_frame_ctx!= NULL)
226  proc_frame_ctx_release(&proc_frame_ctx);
227  return NULL;
228 }
229 
233 static void* dmux_thr(void *t)
234 {
235  int i, ret_code, elem_strem_id_video_client= -1;
236  int elementary_streams_cnt= 0;
237  thr_ctx_t *thr_ctx= (thr_ctx_t*)t;
238  proc_frame_ctx_t *proc_frame_ctx= NULL;
239  char *rest_str= NULL;
240  cJSON *cjson_rest= NULL, *cjson_es_array= NULL, *cjson_aux= NULL;
241 
242  /* Check argument */
243  if(thr_ctx== NULL) {
244  fprintf(stderr, "Bad argument '%s'\n", __FUNCTION__);
245  exit(1);
246  }
247 
248  /* Receive first frame from de-multiplexer -EPILOGUE-.
249  * The first time we receive data we have to check the elementary stream
250  * Id's. The idea is to use the elementary stream Id's to send each
251  * de-multiplexed frame to the correct decoding sink.
252  * We do this once, the first time we are receiving any frame,
253  * by consulting the de-multiplexer API.
254  */
255  ret_code= STAT_EAGAIN;
256  while(ret_code!= STAT_SUCCESS && thr_ctx->flag_exit== 0) {
257  schedule(); // Avoid closed loops
258  ret_code= procs_recv_frame(thr_ctx->procs_ctx, thr_ctx->dmux_proc_id,
259  &proc_frame_ctx);
260  }
261  if(ret_code!= STAT_SUCCESS || proc_frame_ctx== NULL) {
262  fprintf(stderr, "Error at line: %d\n", __LINE__);
263  exit(-1);
264  }
265 
266  /* Parse elementary streams Id's */
267  ret_code= procs_opt(thr_ctx->procs_ctx, "PROCS_ID_GET",
268  thr_ctx->dmux_proc_id, &rest_str);
269  if(ret_code!= STAT_SUCCESS || rest_str== NULL) {
270  fprintf(stderr, "Error at line: %d\n", __LINE__);
271  exit(-1);
272  }
273  if((cjson_rest= cJSON_Parse(rest_str))== NULL) {
274  fprintf(stderr, "Error at line: %d\n", __LINE__);
275  exit(-1);
276  }
277  // Elementary streams objects array
278  if((cjson_es_array= cJSON_GetObjectItem(cjson_rest,
279  "elementary_streams"))== NULL) {
280  fprintf(stderr, "Error at line: %d\n", __LINE__);
281  exit(-1);
282  }
283  // Iterate elementary stream objects and find the corresponding Id.
284  elementary_streams_cnt= cJSON_GetArraySize(cjson_es_array);
285  for(i= 0; i< elementary_streams_cnt; i++) {
286  cJSON *cjson_es= cJSON_GetArrayItem(cjson_es_array, i);
287  if(cjson_es!= NULL) {
288  int elementary_stream_id;
289  char *mime;
290  const char *mime_needle= thr_ctx->mime_setting_video+
291  strlen("sdp_mimetype=");
292 
293  /* Get stream Id. */
294  cjson_aux= cJSON_GetObjectItem(cjson_es, "elementary_stream_id");
295  if(cjson_aux== NULL) {
296  fprintf(stderr, "Error at line: %d\n", __LINE__);
297  exit(-1);
298  }
299  elementary_stream_id= cjson_aux->valueint;
300 
301  /* Check MIME type and assign Id. */
302  cjson_aux= cJSON_GetObjectItem(cjson_es, "sdp_mimetype");
303  if(cjson_aux== NULL) {
304  fprintf(stderr, "Error at line: %d\n", __LINE__);
305  exit(-1);
306  }
307  mime= cjson_aux->valuestring;
308  if(mime!= NULL && strcasecmp(mime_needle, mime)== 0)
309  elem_strem_id_video_client= elementary_stream_id;
310  }
311  }
312  free(rest_str); rest_str= NULL;
313  cJSON_Delete(cjson_rest); cjson_rest= NULL;
314  if(elem_strem_id_video_client< 0) {
315  fprintf(stderr, "Error at line: %d\n", __LINE__);
316  exit(-1);
317  }
318 
319  /* Send first received frame to decoder */
320  ret_code= procs_send_frame(thr_ctx->procs_ctx, thr_ctx->dec_proc_id,
321  proc_frame_ctx);
322  if(ret_code!= STAT_SUCCESS)
323  fprintf(stderr, "Error while decoding frame'\n");
324 
325  /* De-multiplexer loop */
326  while(thr_ctx->flag_exit== 0) {
327 
328  /* Receive frame from de-multiplexer */
329  if(proc_frame_ctx!= NULL)
330  proc_frame_ctx_release(&proc_frame_ctx);
331  ret_code= procs_recv_frame(thr_ctx->procs_ctx, thr_ctx->dmux_proc_id,
332  &proc_frame_ctx);
333  if(ret_code!= STAT_SUCCESS) {
334  if(ret_code== STAT_EAGAIN)
335  schedule(); // Avoid closed loops
336  else
337  fprintf(stderr, "Error while de-multiplexing frame'\n");
338  continue;
339  }
340 
341  /* Send received encoded frame to decoder */
342  if(proc_frame_ctx== NULL)
343  continue;
344  ret_code= procs_send_frame(thr_ctx->procs_ctx, thr_ctx->dec_proc_id,
345  proc_frame_ctx);
346  if(ret_code!= STAT_SUCCESS) {
347  if(ret_code== STAT_EAGAIN)
348  schedule(); // Avoid closed loops
349  else
350  fprintf(stderr, "Error while decoding frame'\n");
351  continue;
352  }
353  }
354 
355  if(proc_frame_ctx!= NULL)
356  proc_frame_ctx_release(&proc_frame_ctx);
357  return NULL;
358 }
359 
360 static void* consumer_thr_video(void *t)
361 {
362  thr_ctx_t *thr_ctx= (thr_ctx_t*)t;
363  SDL_Window *sdlWindow= NULL;
364  SDL_Renderer *sdlRenderer= NULL;
365  const uint32_t pixformat= SDL_PIXELFORMAT_IYUV;
366  SDL_Texture *sdlTexture= NULL;
367  proc_frame_ctx_t *proc_frame_ctx= NULL;
368  int w_Y= atoi(VIDEO_WIDTH),
369  h_Y= atoi(VIDEO_HEIGHT); // Initial values, may change and update.
370 
371  /* Check argument */
372  if(thr_ctx== NULL) {
373  fprintf(stderr, "Bad argument '%s'\n", __FUNCTION__);
374  exit(-1);
375  }
376 
377  /* **** SDL2 initialization **** */
378 
379  if(SDL_Init(SDL_INIT_VIDEO)) {
380  fprintf(stderr, "Could not initialize SDL: %s\n", SDL_GetError());
381  exit(-1);
382  }
383  sdlWindow= SDL_CreateWindow("codecs_muxers_loopback.c",
384  SDL_WINDOWPOS_UNDEFINED, SDL_WINDOWPOS_UNDEFINED, w_Y, h_Y,
385  SDL_WINDOW_OPENGL|SDL_WINDOW_RESIZABLE);
386  if(!sdlWindow) {
387  fprintf(stderr, "SDL: could not create window:%s\n",SDL_GetError());
388  exit(-1);
389  }
390  sdlRenderer= SDL_CreateRenderer(sdlWindow, -1, 0);
391  if(!sdlRenderer) {
392  fprintf(stderr, "SDL: could not create renderer:%s\n",SDL_GetError());
393  exit(-1);
394  }
395  sdlTexture= SDL_CreateTexture(sdlRenderer, pixformat,
396  SDL_TEXTUREACCESS_STREAMING, w_Y, h_Y);
397  if(!sdlTexture) {
398  fprintf(stderr, "SDL: could not create texture:%s\n",SDL_GetError());
399  exit(-1);
400  }
401 
402  /* **** Output loop **** */
403  while(thr_ctx->flag_exit== 0) {
404  SDL_Event event;
405  int w_Y_iput, h_Y_iput, ret_code;
406 
407  /* Receive decoded frame */
408  if(proc_frame_ctx!= NULL)
409  proc_frame_ctx_release(&proc_frame_ctx);
410  ret_code= procs_recv_frame(thr_ctx->procs_ctx, thr_ctx->dec_proc_id,
411  &proc_frame_ctx);
412  if(ret_code!= STAT_SUCCESS) {
413  if(ret_code== STAT_EAGAIN)
414  schedule(); // Avoid closed loops
415  else
416  fprintf(stderr, "Error while receiving decoded frame'\n");
417  continue;
418  }
419 
420  /* **** Write encoded-decoded frame to output if applicable **** */
421 
422  if(proc_frame_ctx== NULL) {
423  continue;
424  }
425 
426  /* Get and check input frame with and height */
427  w_Y_iput= proc_frame_ctx->width[0];
428  h_Y_iput= proc_frame_ctx->height[0];
429  if(w_Y_iput<= 0 || h_Y_iput<= 0) {
430  fprintf(stderr, "Invalid frame size\n");
431  exit(-1);
432  }
433 
434  /* Resize if applicable */
435  if(w_Y_iput!= w_Y || h_Y_iput!= h_Y) {
436 
437  w_Y= w_Y_iput;
438  h_Y= h_Y_iput;
439 
440  /* Push resize event to SDL2 */
441  event.type= SDL_WINDOWEVENT;
442  SDL_PushEvent(&event);
443  }
444 
445  /* Push refresh event to SDL2 */
446  event.type= REFRESH_EVENT;
447  SDL_PushEvent(&event);
448 
449  /* Wait for SDL event to be consumed */
450  SDL_WaitEvent(&event);
451  if(event.type== REFRESH_EVENT) {
452  SDL_Rect sdlRect= {
453  0, //x
454  0, //y
455  w_Y_iput, h_Y_iput
456  };
457  SDL_UpdateYUVTexture(sdlTexture, NULL,
458  proc_frame_ctx->p_data[0], proc_frame_ctx->linesize[0],
459  proc_frame_ctx->p_data[1], proc_frame_ctx->linesize[1],
460  proc_frame_ctx->p_data[2], proc_frame_ctx->linesize[2]);
461  SDL_RenderClear(sdlRenderer);
462  SDL_RenderCopy(sdlRenderer, sdlTexture, NULL, &sdlRect);
463  SDL_RenderPresent(sdlRenderer);
464 
465  }else if(event.type==SDL_WINDOWEVENT){
466  /* Resize */
467  SDL_SetWindowSize(sdlWindow, w_Y, h_Y);
468  }else if(event.type==SDL_QUIT){
469  flag_app_exit= 1;
470  }else if(event.type==BREAK_EVENT){
471  break;
472  }
473 
474  } // end-loop
475 
476  if(proc_frame_ctx!= NULL)
477  proc_frame_ctx_release(&proc_frame_ctx);
478  if(sdlTexture!= NULL)
479  SDL_DestroyTexture(sdlTexture);
480  if(sdlRenderer!= NULL) {
481  SDL_RenderPresent(sdlRenderer);
482  SDL_RenderClear(sdlRenderer);
483  SDL_DestroyRenderer(sdlRenderer);
484  }
485  if(sdlWindow!= NULL)
486  SDL_DestroyWindow(sdlWindow);
487  SDL_Quit();
488  return NULL;
489 }
490 
491 static void http_event_handler(struct mg_connection *c, int ev, void *p)
492 {
493 #define URI_MAX 4096
494 #define METH_MAX 16
495 #define BODY_MAX 4096000
496 
497  if(ev== MG_EV_HTTP_REQUEST) {
498  register size_t uri_len= 0, method_len= 0, qs_len= 0, body_len= 0;
499  const char *uri_p, *method_p, *qs_p, *body_p;
500  struct http_message *hm= (struct http_message*)p;
501  char *url_str= NULL, *method_str= NULL, *str_response= NULL,
502  *qstring_str= NULL, *body_str= NULL;
503  thr_ctx_t *thr_ctx= (thr_ctx_t*)c->user_data;
504 
505  if((uri_p= hm->uri.p)!= NULL && (uri_len= hm->uri.len)> 0 &&
506  uri_len< URI_MAX) {
507  url_str= (char*)calloc(1, uri_len+ 1);
508  if(url_str!= NULL)
509  memcpy(url_str, uri_p, uri_len);
510  }
511  if((method_p= hm->method.p)!= NULL && (method_len= hm->method.len)> 0
512  && method_len< METH_MAX) {
513  method_str= (char*)calloc(1, method_len+ 1);
514  if(method_str!= NULL)
515  memcpy(method_str, method_p, method_len);
516  }
517  if((qs_p= hm->query_string.p)!= NULL &&
518  (qs_len= hm->query_string.len)> 0 && qs_len< URI_MAX) {
519  qstring_str= (char*)calloc(1, qs_len+ 1);
520  if(qstring_str!= NULL)
521  memcpy(qstring_str, qs_p, qs_len);
522  }
523  if((body_p= hm->body.p)!= NULL && (body_len= hm->body.len)> 0
524  && body_len< BODY_MAX) {
525  body_str= (char*)calloc(1, body_len+ 1);
526  if(body_str!= NULL)
527  memcpy(body_str, body_p, body_len);
528  }
529 
530  /* Process HTTP request */
531  if(url_str!= NULL && method_str!= NULL)
532  procs_api_http_req_handler(thr_ctx->procs_ctx, url_str,
533  qstring_str, method_str, body_str, body_len, &str_response);
534  /* Send response */
535  if(str_response!= NULL && strlen(str_response)> 0) {
536  //printf("str_response: %s (len: %d)\n", str_response,
537  // (int)strlen(str_response)); //comment-me
538  mg_printf(c, "%s", "HTTP/1.1 200 OK\r\n");
539  mg_printf(c, "Content-Length: %d\r\n", (int)strlen(str_response));
540  mg_printf(c, "\r\n");
541  mg_printf(c, "%s", str_response);
542  } else {
543  mg_printf(c, "%s", "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n");
544  }
545 
546  if(str_response!= NULL)
547  free(str_response);
548  if(url_str!= NULL)
549  free(url_str);
550  if(method_str!= NULL)
551  free(method_str);
552  if(qstring_str!= NULL)
553  free(qstring_str);
554  if(body_str!= NULL)
555  free(body_str);
556  } else if(ev== MG_EV_RECV) {
557  mg_printf(c, "%s", "HTTP/1.1 202 ACCEPTED\r\nContent-Length: 0\r\n");
558  } else if(ev== MG_EV_SEND) {
559  mg_printf(c, "%s", "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n");
560  }
561 }
562 
566 static void* http_server_thr(void *t)
567 {
568  struct mg_mgr mgr;
569  struct mg_connection *c;
570  thr_ctx_t *thr_ctx= (thr_ctx_t*)t;
571  const char *listening_port= LISTENING_PORT;
572  struct mg_bind_opts opts;
573  const char *error_str= NULL;
574 
575  /* Check argument */
576  if(thr_ctx== NULL) {
577  fprintf(stderr, "Bad argument '%s'\n", __FUNCTION__);
578  exit(1);
579  }
580 
581  /* Create and configure the server */
582  mg_mgr_init(&mgr, NULL);
583 
584  memset(&opts, 0, sizeof(opts));
585  opts.error_string= &error_str;
586  opts.user_data= thr_ctx;
587  c= mg_bind_opt(&mgr, listening_port, http_event_handler, opts);
588  if(c== NULL) {
589  fprintf(stderr, "mg_bind_opt(%s:%s) failed: %s\n", LISTENING_HOST,
590  LISTENING_PORT, error_str);
591  exit(EXIT_FAILURE);
592  }
593  mg_set_protocol_http_websocket(c);
594 
595  while(flag_app_exit== 0)
596  mg_mgr_poll(&mgr, 1000);
597 
598  mg_mgr_free(&mgr);
599  return NULL;
600 }
601 
602 static void stream_proc_quit_signal_handler()
603 {
604  printf("signaling application to finalize...\n"); fflush(stdout);
605  flag_app_exit= 1;
606 }
607 
613 static void procs_post(procs_ctx_t *procs_ctx, const char *proc_name,
614  const char *proc_settings, int *ref_proc_id)
615 {
616  int ret_code;
617  char *rest_str= NULL;
618  cJSON *cjson_rest= NULL, *cjson_aux= NULL;
619 
620  ret_code= procs_opt(procs_ctx, "PROCS_POST", proc_name, proc_settings,
621  &rest_str);
622  if(ret_code!= STAT_SUCCESS || rest_str== NULL) {
623  fprintf(stderr, "Error at line: %d\n", __LINE__);
624  exit(-1);
625  }
626  if((cjson_rest= cJSON_Parse(rest_str))== NULL) {
627  fprintf(stderr, "Error at line: %d\n", __LINE__);
628  exit(-1);
629  }
630  if((cjson_aux= cJSON_GetObjectItem(cjson_rest, "proc_id"))== NULL) {
631  fprintf(stderr, "Error at line: %d\n", __LINE__);
632  exit(-1);
633  }
634  if((*ref_proc_id= cjson_aux->valuedouble)< 0) {
635  fprintf(stderr, "Error at line: %d\n", __LINE__);
636  exit(-1);
637  }
638  free(rest_str); rest_str= NULL;
639  cJSON_Delete(cjson_rest); cjson_rest= NULL;
640 }
641 
642 int main(int argc, char* argv[])
643 {
644  sigset_t set;
645  pthread_t producer_thread, mux_thread, dmux_thread, consumer_thread;
646  int ret_code, enc_proc_id= -1, dec_proc_id= -1, mux_proc_id= -1,
647  dmux_proc_id= -1, elem_strem_id_video_server= -1;
648  procs_ctx_t *procs_ctx= NULL;
649  char *rest_str= NULL, *settings_str= NULL;
650  cJSON *cjson_rest= NULL, *cjson_aux= NULL;
651  thr_ctx_t thr_ctx= {0};
652  const char *video_settings=
653  "width_output="VIDEO_WIDTH
654  "&height_output="VIDEO_HEIGHT;
655 #define MPEG2_VIDEO
656 #ifdef MPEG2_VIDEO
657  const proc_if_t *proc_if_enc= &proc_if_ffmpeg_m2v_enc;
658  const proc_if_t *proc_if_dec= &proc_if_ffmpeg_m2v_dec;
659  const char *mime_setting= "sdp_mimetype=video/mp2v";
660 #endif
661 #ifdef LHE_VIDEO
662  const proc_if_t *proc_if_enc= &proc_if_ffmpeg_mlhe_enc;
663  const proc_if_t *proc_if_dec= &proc_if_ffmpeg_mlhe_dec;
664  const char *mime_setting= "sdp_mimetype=video/mlhe";
665 #endif
666 #ifdef X264_VIDEO
667  const proc_if_t *proc_if_enc= &proc_if_ffmpeg_x264_enc;
668  const proc_if_t *proc_if_dec= &proc_if_ffmpeg_x264_dec;
669  const char *mime_setting= "sdp_mimetype=video/avc1";
670 #endif
671  const proc_if_t *proc_if_mux= &proc_if_live555_rtsp_mux;
672  const proc_if_t *proc_if_dmux= &proc_if_live555_rtsp_dmux;
673 
674  /* Set SIGNAL handlers to this process */
675  sigfillset(&set);
676  sigdelset(&set, SIGINT);
677  pthread_sigmask(SIG_SETMASK, &set, NULL);
678  signal(SIGINT, stream_proc_quit_signal_handler);
679 
680  /* Open LOG module */
681  log_module_open();
682 
683  /* Register all FFmpeg's CODECS */
684  avcodec_register_all();
685 
686  /* Open processors (PROCS) module */
687  if(procs_module_open(NULL)!= STAT_SUCCESS) {
688  fprintf(stderr, "Error at line: %d\n", __LINE__);
689  exit(-1);
690  }
691 
692  /* Register encoders, decoders, RTSP multiplexer and RTSP de-multiplexer
693  * processor types.
694  */
695  if(procs_module_opt("PROCS_REGISTER_TYPE", &proc_if_ffmpeg_m2v_enc)!=
696  STAT_SUCCESS) {
697  fprintf(stderr, "Error at line: %d\n", __LINE__);
698  exit(-1);
699  }
700  if(procs_module_opt("PROCS_REGISTER_TYPE", &proc_if_ffmpeg_m2v_dec)!=
701  STAT_SUCCESS) {
702  fprintf(stderr, "Error at line: %d\n", __LINE__);
703  exit(-1);
704  }
705  if(procs_module_opt("PROCS_REGISTER_TYPE", &proc_if_ffmpeg_mlhe_enc)!=
706  STAT_SUCCESS) {
707  fprintf(stderr, "Error at line: %d\n", __LINE__);
708  exit(-1);
709  }
710  if(procs_module_opt("PROCS_REGISTER_TYPE", &proc_if_ffmpeg_mlhe_dec)!=
711  STAT_SUCCESS) {
712  fprintf(stderr, "Error at line: %d\n", __LINE__);
713  exit(-1);
714  }
715  if(procs_module_opt("PROCS_REGISTER_TYPE", &proc_if_ffmpeg_x264_enc)!=
716  STAT_SUCCESS) {
717  fprintf(stderr, "Error at line: %d\n", __LINE__);
718  exit(-1);
719  }
720  if(procs_module_opt("PROCS_REGISTER_TYPE", &proc_if_ffmpeg_x264_dec)!=
721  STAT_SUCCESS) {
722  fprintf(stderr, "Error at line: %d\n", __LINE__);
723  exit(-1);
724  }
725  if(procs_module_opt("PROCS_REGISTER_TYPE", &proc_if_live555_rtsp_mux)!=
726  STAT_SUCCESS) {
727  fprintf(stderr, "Error at line: %d\n", __LINE__);
728  exit(-1);
729  }
730  if(procs_module_opt("PROCS_REGISTER_TYPE", &proc_if_live555_rtsp_dmux)!=
731  STAT_SUCCESS) {
732  fprintf(stderr, "Error at line: %d\n", __LINE__);
733  exit(-1);
734  }
735 
736  /* Get PROCS module's instance */
737  if((procs_ctx= procs_open(NULL, 16, NULL, NULL))== NULL) {
738  fprintf(stderr, "Error at line: %d\n", __LINE__);
739  exit(-1);
740  }
741 
742  /* Register an encoder instance and get corresponding processor Id. */
743  procs_post(procs_ctx, proc_if_enc->proc_name, video_settings, &enc_proc_id);
744 
745  /* Register a decoder instance and get corresponding processor Id. */
746  procs_post(procs_ctx, proc_if_dec->proc_name, "", &dec_proc_id);
747 
748  /* Register RTSP multiplexer instance and get corresponding Id. */
749  procs_post(procs_ctx, proc_if_mux->proc_name, "rtsp_port=8574",
750  &mux_proc_id);
751  /* Register an elementary stream for the multiplexer */
752  ret_code= procs_opt(procs_ctx, "PROCS_ID_ES_MUX_REGISTER", mux_proc_id,
753  mime_setting, &rest_str);
754  if(ret_code!= STAT_SUCCESS || rest_str== NULL) {
755  fprintf(stderr, "Error at line: %d\n", __LINE__);
756  exit(-1);
757  }
758  if((cjson_rest= cJSON_Parse(rest_str))== NULL) {
759  fprintf(stderr, "Error at line: %d\n", __LINE__);
760  exit(-1);
761  }
762  if((cjson_aux= cJSON_GetObjectItem(cjson_rest, "elementary_stream_id"))==
763  NULL) {
764  fprintf(stderr, "Error at line: %d\n", __LINE__);
765  exit(-1);
766  }
767  if((elem_strem_id_video_server= cjson_aux->valuedouble)< 0) {
768  fprintf(stderr, "Error at line: %d\n", __LINE__);
769  exit(-1);
770  }
771  free(rest_str); rest_str= NULL;
772  cJSON_Delete(cjson_rest); cjson_rest= NULL;
773 
774  /* Register RTSP de-multiplexer instance and get corresponding Id. */
775  procs_post(procs_ctx, proc_if_dmux->proc_name,
776  "rtsp_url=rtsp://127.0.0.1:8574/session", &dmux_proc_id);
777 
778  /* Launch producer, encoding-multiplexing, de-multiplexing-decoder,
779  * consumer (rendering) and HTTP-sever threads.
780  */
781  thr_ctx.flag_exit= 0;
782  thr_ctx.enc_proc_id= enc_proc_id;
783  thr_ctx.dec_proc_id= dec_proc_id;
784  thr_ctx.mux_proc_id= mux_proc_id;
785  thr_ctx.elem_strem_id_video_server= elem_strem_id_video_server;
786  thr_ctx.mime_setting_video= mime_setting;
787  thr_ctx.dmux_proc_id= dmux_proc_id;
788  thr_ctx.procs_ctx= procs_ctx;
789  ret_code= pthread_create(&producer_thread, NULL, producer_thr_video,
790  &thr_ctx);
791  if(ret_code!= 0) {
792  fprintf(stderr, "Error at line: %d\n", __LINE__);
793  exit(-1);
794  }
795  ret_code= pthread_create(&mux_thread, NULL, mux_thr, &thr_ctx);
796  if(ret_code!= 0) {
797  fprintf(stderr, "Error at line: %d\n", __LINE__);
798  exit(-1);
799  }
800  ret_code= pthread_create(&dmux_thread, NULL, dmux_thr, &thr_ctx);
801  if(ret_code!= 0) {
802  fprintf(stderr, "Error at line: %d\n", __LINE__);
803  exit(-1);
804  }
805  ret_code= pthread_create(&consumer_thread, NULL, consumer_thr_video,
806  &thr_ctx);
807  if(ret_code!= 0) {
808  fprintf(stderr, "Error at line: %d\n", __LINE__);
809  exit(-1);
810  }
811 
812  /* Server main processing loop */
813  printf("Starting server...\n"); fflush(stdout);
814  http_server_thr(&thr_ctx);
815 
816  /* Join the threads */
817  thr_ctx.flag_exit= 1;
818  ret_code= procs_opt(procs_ctx, "PROCS_ID_DELETE",
819  enc_proc_id); // before joining to unblock processor
820  if(ret_code!= STAT_SUCCESS) {
821  fprintf(stderr, "Error at line: %d\n", __LINE__);
822  exit(-1);
823  }
824  ret_code= procs_opt(procs_ctx, "PROCS_ID_DELETE", dec_proc_id);
825  if(ret_code!= STAT_SUCCESS) {
826  fprintf(stderr, "Error at line: %d\n", __LINE__);
827  exit(-1);
828  }
829  ret_code= procs_opt(procs_ctx, "PROCS_ID_DELETE", dmux_proc_id);
830  if(ret_code!= STAT_SUCCESS) {
831  fprintf(stderr, "Error at line: %d\n", __LINE__);
832  exit(-1);
833  }
834  ret_code= procs_opt(procs_ctx, "PROCS_ID_DELETE", mux_proc_id);
835  if(ret_code!= STAT_SUCCESS) {
836  fprintf(stderr, "Error at line: %d\n", __LINE__);
837  exit(-1);
838  }
839 
840  pthread_join(producer_thread, NULL);
841  pthread_join(mux_thread, NULL);
842  pthread_join(dmux_thread, NULL);
843  pthread_join(consumer_thread, NULL);
844 
845  if(procs_ctx!= NULL)
846  procs_close(&procs_ctx);
848  log_module_close();
849  if(rest_str!= NULL)
850  free(rest_str);
851  if(settings_str!= NULL)
852  free(settings_str);
853  if(cjson_rest!= NULL)
854  cJSON_Delete(cjson_rest);
855  return 0;
856 }
size_t width[PROC_FRAME_NUM_DATA_POINTERS]
Definition: proc_if.h:113
void proc_frame_ctx_release(proc_frame_ctx_t **ref_proc_frame_ctx)
Definition: proc_if.c:125
static void * mux_thr(void *t)
int linesize[PROC_FRAME_NUM_DATA_POINTERS]
Definition: proc_if.h:107
struct thr_ctx_s thr_ctx_t
int procs_opt(procs_ctx_t *procs_ctx, const char *tag,...)
Definition: procs.c:474
int proc_sample_fmt
Definition: proc_if.h:127
const proc_if_t proc_if_ffmpeg_x264_enc
Definition: ffmpeg_x264.c:150
const proc_if_t proc_if_ffmpeg_m2v_enc
Definition: ffmpeg_m2v.c:145
const proc_if_t proc_if_ffmpeg_mlhe_enc
Definition: ffmpeg_lhe.c:145
int procs_module_open(log_ctx_t *log_ctx)
Definition: procs.c:244
int procs_module_opt(const char *tag,...)
Definition: procs.c:294
int procs_recv_frame(procs_ctx_t *procs_ctx, int proc_id, proc_frame_ctx_t **ref_proc_frame_ctx)
Definition: procs.c:545
void procs_module_close()
Definition: procs.c:273
static void * dmux_thr(void *t)
const proc_if_t proc_if_live555_rtsp_mux
int procs_send_frame(procs_ctx_t *procs_ctx, int proc_id, const proc_frame_ctx_t *proc_frame_ctx)
Definition: procs.c:504
Planar YUV 4:2:0 with 12bpp (video)
Definition: proc_if.h:55
int procs_api_http_req_handler(procs_ctx_t *procs_ctx, const char *url, const char *query_string, const char *request_method, char *content, size_t content_len, char **ref_str_response)
const proc_if_t proc_if_ffmpeg_x264_dec
Definition: ffmpeg_x264.c:170
static void * producer_thr_video(void *t)
const proc_if_t proc_if_ffmpeg_m2v_dec
Definition: ffmpeg_m2v.c:165
const proc_if_t proc_if_live555_rtsp_dmux
size_t height[PROC_FRAME_NUM_DATA_POINTERS]
Definition: proc_if.h:119
void procs_close(procs_ctx_t **ref_procs_ctx)
Definition: procs.c:417
const char * proc_name
Definition: proc_if.h:174
static void * http_server_thr(void *t)
const uint8_t * p_data[PROC_FRAME_NUM_DATA_POINTERS]
Definition: proc_if.h:94
procs_ctx_t * procs_open(log_ctx_t *log_ctx, size_t max_procs_num, const char *prefix_name, const char *procs_href)
Definition: procs.c:336
uint8_t * data
Definition: proc_if.h:84
static void procs_post(procs_ctx_t *procs_ctx, const char *proc_name, const char *proc_settings, int *ref_proc_id)
const proc_if_t proc_if_ffmpeg_mlhe_dec
Definition: ffmpeg_lhe.c:165
int64_t pts
Definition: proc_if.h:138