MediaProcessors
utests_live555_rtsp.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Rafael Antoniello
3  *
4  * This file is part of MediaProcessors.
5  *
6  * MediaProcessors is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU Lesser General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * MediaProcessors is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with MediaProcessors. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
26 #include <UnitTest++/UnitTest++.h>
27 
28 extern "C" {
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <unistd.h>
32 #include <sys/types.h>
33 #include <sys/stat.h>
34 #include <string.h>
35 
36 #include <libcjson/cJSON.h>
37 #include <libmediaprocsutils/log.h>
38 #include <libmediaprocsutils/stat_codes.h>
39 #include <libmediaprocsutils/check_utils.h>
40 #include <libmediaprocsutils/schedule.h>
41 #include <libmediaprocs/proc_if.h>
42 #include <libmediaprocs/procs.h>
43 #include <libmediaprocsmuxers/live555_rtsp.h>
44 }
45 
46 SUITE(UTESTS_LIVE555_RTSP)
47 {
48 #define FRAME_SIZE 2048
49 
50  typedef struct thr_ctx_s {
51  volatile int flag_exit;
52  int dmux_proc_id;
53  procs_ctx_t *procs_ctx;
54  proc_frame_ctx_t *proc_frame_ctx_template1;
55  proc_frame_ctx_t *proc_frame_ctx_template2;
56  } thr_ctx_t;
57 
/**
 * Consumer thread routine: receives frames from the de-multiplexer and
 * verifies their size and payload pattern.
 *
 * The routine first waits for an initial frame, then queries the
 * de-multiplexer ("PROCS_ID_GET") to learn the elementary stream Id's
 * associated with the "application/step-data" and
 * "application/alternate-data" MIME types. Afterwards it keeps receiving
 * frames until 'flag_exit' is set, checking each payload against the pattern
 * expected for its elementary stream.
 *
 * Any unrecoverable set-up failure is reported with CHECK(false) and
 * terminates the process with exit(-1).
 *
 * @param t Pointer to the shared thr_ctx_t context (must not be NULL).
 * @return Always NULL.
 */
static void* consumer_thr(void *t)
{
    int i, ret_code, elem_strem_id_step= -1, elem_strem_id_alt= -1;
    int elementary_streams_cnt= 0;
    proc_frame_ctx_t *proc_frame_ctx= NULL;
    thr_ctx_t *thr_ctx= (thr_ctx_t*)t;
    char *rest_str= NULL;
    cJSON *cjson_rest= NULL, *cjson_es_array= NULL, *cjson_aux= NULL;

    if(thr_ctx== NULL) {
        CHECK(false);
        exit(-1);
    }

    /* Receive first frame from de-multiplexer -PRELUDE-.
     * The first time we receive data we have to check the elementary stream
     * Id's. The idea is to use the elementary stream Id's to match each
     * de-multiplexed frame with the pattern it is expected to carry.
     * We do this once, the first time we are receiving any frame,
     * by consulting the de-multiplexer API.
     */
    ret_code= STAT_EAGAIN;
    while(ret_code!= STAT_SUCCESS && thr_ctx->flag_exit== 0) {
        schedule(); // Avoid closed loops
        ret_code= procs_recv_frame(thr_ctx->procs_ctx,
                thr_ctx->dmux_proc_id, &proc_frame_ctx);
    }
    if(thr_ctx->flag_exit!= 0) {
        /* Asked to stop before anything could be verified */
        proc_frame_ctx_release(&proc_frame_ctx);
        return NULL;
    }
    if(ret_code!= STAT_SUCCESS || proc_frame_ctx== NULL) {
        CHECK(false);
        exit(-1);
    }

    /* Parse elementary streams Id's */
    ret_code= procs_opt(thr_ctx->procs_ctx, "PROCS_ID_GET",
            thr_ctx->dmux_proc_id, &rest_str);
    if(ret_code!= STAT_SUCCESS || rest_str== NULL) {
        CHECK(false);
        exit(-1);
    }
    if((cjson_rest= cJSON_Parse(rest_str))== NULL) {
        CHECK(false);
        exit(-1);
    }
    // Elementary streams objects array
    if((cjson_es_array= cJSON_GetObjectItem(cjson_rest,
            "elementary_streams"))== NULL) {
        CHECK(false);
        exit(-1);
    }
    // Iterate elementary stream objects and find the corresponding Id.
    elementary_streams_cnt= cJSON_GetArraySize(cjson_es_array);
    for(i= 0; i< elementary_streams_cnt; i++) {
        int elementary_stream_id;
        const char *mime;
        cJSON *cjson_es= cJSON_GetArrayItem(cjson_es_array, i);
        if(cjson_es== NULL)
            continue;

        /* Get stream Id. */
        cjson_aux= cJSON_GetObjectItem(cjson_es, "elementary_stream_id");
        if(cjson_aux== NULL) {
            CHECK(false);
            exit(-1);
        }
        elementary_stream_id= cjson_aux->valueint;

        /* Check MIME type and assign Id. */
        cjson_aux= cJSON_GetObjectItem(cjson_es, "sdp_mimetype");
        if(cjson_aux== NULL) {
            CHECK(false);
            exit(-1);
        }
        mime= cjson_aux->valuestring;
        if(mime== NULL)
            continue;
        if(strcasecmp("application/step-data", mime)== 0)
            elem_strem_id_step= elementary_stream_id;
        else if(strcasecmp("application/alternate-data", mime)== 0)
            elem_strem_id_alt= elementary_stream_id;
    }
    free(rest_str); rest_str= NULL;
    cJSON_Delete(cjson_rest); cjson_rest= NULL;

    /* Both elementary streams must have been found. (Each Id. has to be
     * checked; testing the same Id. twice would let a missing "step" stream
     * go unnoticed.)
     */
    if(elem_strem_id_step< 0 || elem_strem_id_alt< 0) {
        CHECK(false);
        exit(-1);
    }

    while(thr_ctx->flag_exit== 0) {

        /* Get frame from de-multiplexer (release the previous one first) */
        proc_frame_ctx_release(&proc_frame_ctx);
        ret_code= procs_recv_frame(thr_ctx->procs_ctx,
                thr_ctx->dmux_proc_id, &proc_frame_ctx);
        CHECK(ret_code== STAT_SUCCESS || ret_code== STAT_EAGAIN ||
                ret_code== STAT_ENOTFOUND);

        /* Parse frame if applicable */
        if(proc_frame_ctx!= NULL) {
            int val_32b;
            const uint8_t *data_buf= proc_frame_ctx->p_data[0];
            int frame_size= (int)proc_frame_ctx->width[0];
            printf("Got frame!\n"); fflush(stdout); //comment-me

            /* Verify size */
            CHECK(frame_size== FRAME_SIZE);

            /* Nothing to verify if the frame carries no data plane */
            CHECK(data_buf!= NULL);
            if(data_buf== NULL)
                continue;

            /* Verify frame data content */
            if(proc_frame_ctx->es_id== elem_strem_id_step) {
                /* We should have a "step" function: consecutive 32-bit
                 * big-endian counters. The loop bound guarantees the four
                 * bytes of each word lie inside the frame even if its size
                 * is not a multiple of 4.
                 */
                for(i= 0, val_32b= 0; i+ 3< frame_size; i+= 4, val_32b++) {
                    CHECK(data_buf[i+0]== (((uint32_t)val_32b>>24)& 0xFF));
                    CHECK(data_buf[i+1]== (((uint32_t)val_32b>>16)& 0xFF));
                    CHECK(data_buf[i+2]== (((uint32_t)val_32b>> 8)& 0xFF));
                    CHECK(data_buf[i+3]== (((uint32_t)val_32b>> 0)& 0xFF));
                }
            } else if(proc_frame_ctx->es_id== elem_strem_id_alt) {
                /* We should have an alternate 0x00-0xFF function */
                for(i= 0; i< frame_size; i++)
                    CHECK(data_buf[i]== (i&1? 0xFF: 0));
            }
        }
    }
    proc_frame_ctx_release(&proc_frame_ctx);
    return NULL;
}
194 
200  static void procs_post(procs_ctx_t *procs_ctx, const char *proc_name,
201  const char *proc_settings, int *ref_proc_id)
202  {
203  int ret_code;
204  char *rest_str= NULL;
205  cJSON *cjson_rest= NULL, *cjson_aux= NULL;
206 
207  ret_code= procs_opt(procs_ctx, "PROCS_POST", proc_name, proc_settings,
208  &rest_str);
209  if(ret_code!= STAT_SUCCESS || rest_str== NULL) {
210  CHECK(false);
211  exit(-1);
212  }
213  if((cjson_rest= cJSON_Parse(rest_str))== NULL) {
214  CHECK(false);
215  exit(-1);
216  }
217  if((cjson_aux= cJSON_GetObjectItem(cjson_rest, "proc_id"))== NULL) {
218  CHECK(false);
219  exit(-1);
220  }
221  if((*ref_proc_id= cjson_aux->valuedouble)< 0) {
222  CHECK(false);
223  exit(-1);
224  }
225  free(rest_str); rest_str= NULL;
226  cJSON_Delete(cjson_rest); cjson_rest= NULL;
227  }
228 
229  TEST(UTESTS_LIVE555_RTSP_SERVER)
230  {
231  pthread_t consumer_thread;
232  int i, ret_code, mux_proc_id= -1, dmux_proc_id= -1,
233  elem_strem_id_step= -1, elem_strem_id_alt= -1;
234  procs_ctx_t *procs_ctx= NULL;
235  char *rest_str= NULL;
236  cJSON *cjson_rest= NULL, *cjson_aux= NULL, *cjson_aux2= NULL;
237  proc_frame_ctx_t proc_frame_ctx_template1= {0};
238  proc_frame_ctx_t proc_frame_ctx_template2= {0};
239  uint8_t data_buf_template1[FRAME_SIZE];
240  uint8_t data_buf_template2[FRAME_SIZE];
241  thr_ctx_t thr_ctx= {0};
242 
243  /* Open LOG module */
244  log_module_open();
245 
246  /* Open processors (PROCS) module */
247  ret_code= procs_module_open(NULL);
248  if(ret_code!= STAT_SUCCESS) {
249  CHECK(false);
250  goto end;
251  }
252 
253  /* Register multiplexer and de-multiplexer processor types */
254  ret_code= procs_module_opt("PROCS_REGISTER_TYPE",
256  if(ret_code!= STAT_SUCCESS) {
257  CHECK(false);
258  goto end;
259  }
260  ret_code= procs_module_opt("PROCS_REGISTER_TYPE",
262  if(ret_code!= STAT_SUCCESS) {
263  CHECK(false);
264  goto end;
265  }
266 
267  /* Get PROCS module's instance */
268  procs_ctx= procs_open(NULL, 16, NULL, NULL);
269  if(procs_ctx== NULL) {
270  CHECK(false);
271  goto end;
272  }
273 
274  /* Register (open) a multiplexer instance */
275  procs_post(procs_ctx, "live555_rtsp_mux", "rtsp_port=8554",
276  &mux_proc_id);
277 
278  /* Register an elementary stream for the multiplexer: STEP f() */
279  ret_code= procs_opt(procs_ctx, "PROCS_ID_ES_MUX_REGISTER", mux_proc_id,
280  "sdp_mimetype=application/step-data", &rest_str);
281  if(ret_code!= STAT_SUCCESS || rest_str== NULL) {
282  fprintf(stderr, "Error at line: %d\n", __LINE__);
283  exit(-1);
284  }
285  if((cjson_rest= cJSON_Parse(rest_str))== NULL) {
286  fprintf(stderr, "Error at line: %d\n", __LINE__);
287  exit(-1);
288  }
289  if((cjson_aux= cJSON_GetObjectItem(cjson_rest,
290  "elementary_stream_id"))== NULL) {
291  fprintf(stderr, "Error at line: %d\n", __LINE__);
292  exit(-1);
293  }
294  if((elem_strem_id_step= cjson_aux->valuedouble)< 0) {
295  fprintf(stderr, "Error at line: %d\n", __LINE__);
296  exit(-1);
297  }
298  free(rest_str); rest_str= NULL;
299  cJSON_Delete(cjson_rest); cjson_rest= NULL;
300 
301  /* Register an elementary stream for the multiplexer: ALT f() */
302  ret_code= procs_opt(procs_ctx, "PROCS_ID_ES_MUX_REGISTER", mux_proc_id,
303  "sdp_mimetype=application/alternate-data", &rest_str);
304  if(ret_code!= STAT_SUCCESS || rest_str== NULL) {
305  fprintf(stderr, "Error at line: %d\n", __LINE__);
306  exit(-1);
307  }
308  if((cjson_rest= cJSON_Parse(rest_str))== NULL) {
309  fprintf(stderr, "Error at line: %d\n", __LINE__);
310  exit(-1);
311  }
312  if((cjson_aux= cJSON_GetObjectItem(cjson_rest,
313  "elementary_stream_id"))== NULL) {
314  fprintf(stderr, "Error at line: %d\n", __LINE__);
315  exit(-1);
316  }
317  if((elem_strem_id_alt= cjson_aux->valuedouble)< 0) {
318  fprintf(stderr, "Error at line: %d\n", __LINE__);
319  exit(-1);
320  }
321  free(rest_str); rest_str= NULL;
322  cJSON_Delete(cjson_rest); cjson_rest= NULL;
323 
324  /* Register RTSP de-multiplexer instance and get corresponding Id. */
325  procs_post(procs_ctx, "live555_rtsp_dmux",
326  "rtsp_url=rtsp://127.0.0.1:8554/session", &dmux_proc_id);
327 
328  /* Launch consumer thread */
329  thr_ctx.flag_exit= 0;
330  thr_ctx.procs_ctx= procs_ctx;
331  thr_ctx.dmux_proc_id= dmux_proc_id;
332  thr_ctx.proc_frame_ctx_template1= &proc_frame_ctx_template1;
333  thr_ctx.proc_frame_ctx_template2= &proc_frame_ctx_template2;
334  ret_code= pthread_create(&consumer_thread, NULL, consumer_thr,
335  &thr_ctx);
336  if(ret_code!= 0) {
337  CHECK(false);
338  goto end;
339  }
340 
341  /* **** Prepare data frames templates ****
342  * - "template frame 1": step function;
343  * - "template frame 2": alternate 0x00-0xFF function
344  */
345 
346  /* "template frame 1": step function */
347  proc_frame_ctx_template1.data= data_buf_template1;
348  proc_frame_ctx_template1.p_data[0]= data_buf_template1;
349  proc_frame_ctx_template1.linesize[0]= FRAME_SIZE;
350  proc_frame_ctx_template1.width[0]= FRAME_SIZE;
351  proc_frame_ctx_template1.height[0]= 1; // "1D" data
352  proc_frame_ctx_template1.pts= 0;
353  proc_frame_ctx_template1.es_id= elem_strem_id_step;
354  for(int i= 0, val_32b= 0; i< FRAME_SIZE; i+= 4, val_32b++) {
355  data_buf_template1[i+ 0]= ((uint32_t)val_32b>> 24)& 0xFF;
356  data_buf_template1[i+ 1]= ((uint32_t)val_32b>> 16)& 0xFF;
357  data_buf_template1[i+ 2]= ((uint32_t)val_32b>> 8)& 0xFF;
358  data_buf_template1[i+ 3]= ((uint32_t)val_32b>> 0)& 0xFF;
359  }
360 
361  /* "template frame 2": alternate 0x00-0xFF function */
362  proc_frame_ctx_template2.data= data_buf_template2;
363  proc_frame_ctx_template2.p_data[0]= data_buf_template2;
364  proc_frame_ctx_template2.linesize[0]= FRAME_SIZE;
365  proc_frame_ctx_template2.width[0]= FRAME_SIZE;
366  proc_frame_ctx_template2.height[0]= 1; // "1D" data
367  proc_frame_ctx_template2.pts= 0;
368  proc_frame_ctx_template2.es_id= elem_strem_id_alt;
369  for(int i= 0; i< FRAME_SIZE; i++)
370  data_buf_template2[i]= i&1? 0xFF: 0;
371 
372  /* Send (multiplex RTSP/RTP) a few frames to the consumer thread */
373  for(i= 0; i< 10; i++) {
374  /* the samples */
375  CHECK(procs_send_frame(procs_ctx, mux_proc_id,
376  &proc_frame_ctx_template1)== STAT_SUCCESS);
377  CHECK(procs_send_frame(procs_ctx, mux_proc_id,
378  &proc_frame_ctx_template2)== STAT_SUCCESS);
379  usleep(1000*10);
380  }
381 
382  /* Join the threads */
383  thr_ctx.flag_exit= 1;
384  // Delete processor before joining (to unblock processor).
385  // Close client to avoid O.S. to keep server's ports for 60 secs.
386  ret_code= procs_opt(procs_ctx, "PROCS_ID_DELETE", dmux_proc_id);
387  CHECK(ret_code== STAT_SUCCESS);
388 
389  /* Send a few frames to the consumer thread even when previously
390  * deleted (just to test...).
391  */
392  for(i= 0; i< 10; i++) {
393  /* the samples */
394  CHECK(procs_send_frame(procs_ctx, mux_proc_id,
395  &proc_frame_ctx_template1)== STAT_SUCCESS);
396  CHECK(procs_send_frame(procs_ctx, mux_proc_id,
397  &proc_frame_ctx_template2)== STAT_SUCCESS);
398  usleep(1000*10);
399  }
400 
401  /* Before deleting multiplexer, try to change settings... */
402  ret_code= procs_opt(procs_ctx, "PROCS_ID_PUT", mux_proc_id,
403  "rtsp_streaming_session_name=session2&rtsp_port=1999");
404  if(ret_code!= STAT_SUCCESS) {
405  CHECK(false);
406  goto end;
407  }
408 
409  /* Check new settings */
410  ret_code= procs_opt(procs_ctx, "PROCS_ID_GET", mux_proc_id, &rest_str);
411  if(ret_code!= STAT_SUCCESS || rest_str== NULL) {
412  fprintf(stderr, "Error at line: %d\n", __LINE__);
413  exit(-1);
414  }
415  if((cjson_rest= cJSON_Parse(rest_str))== NULL) {
416  fprintf(stderr, "Error at line: %d\n", __LINE__);
417  exit(-1);
418  }
419  if((cjson_aux= cJSON_GetObjectItem(cjson_rest, "settings"))== NULL) {
420  fprintf(stderr, "Error at line: %d\n", __LINE__);
421  exit(-1);
422  }
423  if((cjson_aux2= cJSON_GetObjectItem(cjson_aux,
424  "rtsp_streaming_session_name"))== NULL) {
425  fprintf(stderr, "Error at line: %d\n", __LINE__);
426  exit(-1);
427  }
428  CHECK(strcmp(cjson_aux2->valuestring, "session2")== 0);
429  if((cjson_aux2= cJSON_GetObjectItem(cjson_aux,
430  "rtsp_port"))== NULL) {
431  fprintf(stderr, "Error at line: %d\n", __LINE__);
432  exit(-1);
433  }
434  CHECK(cjson_aux2->valueint== 1999);
435  free(rest_str); rest_str= NULL;
436  cJSON_Delete(cjson_rest); cjson_rest= NULL;
437 
438  /* Delete multiplexer */
439  ret_code= procs_opt(procs_ctx, "PROCS_ID_DELETE", mux_proc_id);
440  CHECK(ret_code== STAT_SUCCESS);
441  pthread_join(consumer_thread, NULL);
442 
443 end:
444  if(procs_ctx!= NULL)
445  procs_close(&procs_ctx);
447  log_module_close();
448  if(rest_str!= NULL)
449  free(rest_str);
450  return;
451  }
452 }
size_t width[PROC_FRAME_NUM_DATA_POINTERS]
Definition: proc_if.h:113
void proc_frame_ctx_release(proc_frame_ctx_t **ref_proc_frame_ctx)
Definition: proc_if.c:125
int linesize[PROC_FRAME_NUM_DATA_POINTERS]
Definition: proc_if.h:107
struct thr_ctx_s thr_ctx_t
SUITE(UTESTS_LIVE555_RTSP)
int procs_opt(procs_ctx_t *procs_ctx, const char *tag,...)
Definition: procs.c:474
int procs_module_open(log_ctx_t *log_ctx)
Definition: procs.c:244
int procs_module_opt(const char *tag,...)
Definition: procs.c:294
int procs_recv_frame(procs_ctx_t *procs_ctx, int proc_id, proc_frame_ctx_t **ref_proc_frame_ctx)
Definition: procs.c:545
void procs_module_close()
Definition: procs.c:273
const proc_if_t proc_if_live555_rtsp_mux
int procs_send_frame(procs_ctx_t *procs_ctx, int proc_id, const proc_frame_ctx_t *proc_frame_ctx)
Definition: procs.c:504
const proc_if_t proc_if_live555_rtsp_dmux
size_t height[PROC_FRAME_NUM_DATA_POINTERS]
Definition: proc_if.h:119
void procs_close(procs_ctx_t **ref_procs_ctx)
Definition: procs.c:417
const uint8_t * p_data[PROC_FRAME_NUM_DATA_POINTERS]
Definition: proc_if.h:94
procs_ctx_t * procs_open(log_ctx_t *log_ctx, size_t max_procs_num, const char *prefix_name, const char *procs_href)
Definition: procs.c:336
uint8_t * data
Definition: proc_if.h:84
static void procs_post(procs_ctx_t *procs_ctx, const char *proc_name, const char *proc_settings, int *ref_proc_id)
int64_t pts
Definition: proc_if.h:138