MediaProcessors
comm_udp.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017, 2018 Rafael Antoniello
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of copyright holders nor the names of its
14  * contributors may be used to endorse or promote products derived
15  * from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18  * “AS IS” AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL COPYRIGHT HOLDERS OR CONTRIBUTORS
21  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  */
29 
35 #include "comm_udp.h"
36 
37 #include <stdlib.h>
38 #include <unistd.h>
39 #include <fcntl.h>
40 #include <sys/select.h>
41 #include <sys/socket.h>
42 #include <sys/un.h>
43 #include <errno.h>
44 #include <sys/types.h>
45 #include <inttypes.h>
46 #include <pthread.h>
47 #include <arpa/inet.h>
48 #include <sys/ioctl.h>
49 
50 #include "check_utils.h"
51 #include "log.h"
52 #include "stat_codes.h"
53 #include "comm.h"
54 #include "uri_parser.h"
55 
56 /* **** Definitions **** */
57 
58 #define UDP_COM_ADDR_IS_MULTICAST(addr) IN_MULTICAST(htonl(addr))
59 #define UDP_COM_SOCKET_PROT 0
60 #define UDP_COM_DATAGRAM_BUF_SIZE (1024*1024*1024) // 1GB
61 
65 typedef struct comm_udp_ctx_s {
75  int fd;
80  volatile int flag_exit;
88 
89 /* **** Prototypes **** */
90 
91 static comm_ctx_t* comm_udp_open(const char *url, const char *local_url,
92  comm_mode_t comm_mode, log_ctx_t *log_ctx, va_list arg);
93 static void comm_udp_close(comm_ctx_t **ref_comm_ctx);
94 static int comm_udp_send(comm_ctx_t* comm_ctx, const void *buf, size_t count,
95  struct timeval* timeout);
96 static int comm_udp_recv(comm_ctx_t *comm_ctx, void** ref_buf,
97  size_t *ref_count, char **ref_from, struct timeval *timeout);
98 static int comm_udp_unblock(comm_ctx_t *comm_ctx);
99 
100 /* **** Implementations **** */
101 
103 {
104  "udp",
105  comm_udp_open,
106  comm_udp_close,
107  comm_udp_send,
108  comm_udp_recv,
109  comm_udp_unblock
110 };
111 
112 /*
113  * NOTE: On some linux systems stack configurations may apply; e.g.:
114  * sudo sysctl -w net.core.rmem_max=8388608
115  * sudo sysctl -w net.core.rmem_default=65536
116  * sudo sysctl -w net.ipv4.udp_mem='4096 87380 1024000000'
117  * (...)
118  */
119 static comm_ctx_t* comm_udp_open(const char *url, const char *local_url,
120  comm_mode_t comm_mode, log_ctx_t *log_ctx, va_list arg)
121 {
122  int fd;
123  uint16_t port;
124  struct sockaddr_in service;
125 #ifdef ip_mreqn
126  struct ip_mreqn mgroup;
127 #else
128  /* In BSD/LINUX it is also possible to use ip_mreq instead of ip_mreqn */
129  struct ip_mreq mgroup;
130 #endif
131  int multiple_apps;
132  comm_udp_ctx_t *comm_udp_ctx= NULL;
133  const int ttl= 16; // value associated with IP multicast traffic on socket
134 #ifdef UDP_COM_DATAGRAM_BUF_SIZE
135  const int int_size= sizeof(int);
136  int stack_buf_size= UDP_COM_DATAGRAM_BUF_SIZE;
137 #endif
138  char *host_text= NULL, *port_text= NULL;
139  int mode, end_code= STAT_EAFNOSUPPORT;
140  const int so_priority= 7;
141  LOG_CTX_INIT(log_ctx);
142 
143  /* Check arguments */
144  CHECK_DO(url!= NULL && strlen(url)> 0, return NULL);
145  // argument 'local_url' is allowed to be NULL in certain implementations
146  // (not used in UDP implementation)
147  CHECK_DO(comm_mode< COMM_MODE_MAX, return NULL);
148  // argument 'log_ctx' is allowed to be NULL
149 
150  /* Allocate context structure */
151  comm_udp_ctx= (comm_udp_ctx_t*)calloc(1, sizeof(comm_udp_ctx_t));
152  CHECK_DO(comm_udp_ctx!= NULL, goto end);
153 
154  /* **** Initialize context structure **** */
155 
156  comm_udp_ctx->fd= -1; // set to non-valid file-descriptor value
157 
158  comm_udp_ctx->flag_exit= 0;
159 
160  CHECK_DO(pipe(comm_udp_ctx->pipe_exit_signal)== 0, goto end);
161 
162  /* **** Initialize protocol stack **** */
163 
164  /* Create a SOCKET object. Blocking mode is enabled by default. */
165  fd= socket(AF_INET, SOCK_DGRAM, UDP_COM_SOCKET_PROT);
166  CHECK_DO(fd>= 0, goto end);
167  comm_udp_ctx->fd= fd;
168 
169  /* Set priority */
170  CHECK_DO(setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &so_priority,
171  sizeof(int))== 0, goto end);
172 
173  /* Configure host address and port for the socket that is being bound */
174  if((host_text= uri_parser_get_uri_part(url, HOSTTEXT))== NULL) {
175  end_code= STAT_EAFNOSUPPORT_HOSTNAME;
176  goto end;
177  }
178  if((port_text= uri_parser_get_uri_part(url, PORTTEXT))== NULL) {
179  end_code= STAT_EAFNOSUPPORT_PORT;
180  goto end;
181  }
182  port= atoi(port_text);
183  memset((char *) &service, 0, sizeof(service));
184  service.sin_family= AF_INET;
185  service.sin_port= htons(port);
186  service.sin_addr.s_addr= inet_addr(host_text);
187 
188  /* Bind the socket */
189  switch(comm_mode) {
190  case COMM_MODE_IPUT:
191  mode= SO_RCVBUF;
192  /* Allow a socket to forcibly bind to a port in use by another socket
193  * (reuse)
194  */
195  multiple_apps= 1;
196  CHECK_DO(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &multiple_apps,
197  sizeof(int))>= 0, goto end);
198  multiple_apps= 1;
199  CHECK_DO(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &multiple_apps,
200  sizeof(int))>= 0, goto end);
201  /* Bind */
202  CHECK_DO(bind(fd, (struct sockaddr*)&service, sizeof(
203  struct sockaddr_in))== 0, goto end);
204  break;
205  case COMM_MODE_OPUT:
206  mode= SO_SNDBUF;
207  break;
208  default:
209  LOGE("Not supported argument\n");
210  goto end;
211  }
212 
213 #ifdef UDP_COM_DATAGRAM_BUF_SIZE
214  /* Redefine receiver/sender buffer size, and check if it is possible */
215  CHECK_DO(setsockopt(fd, SOL_SOCKET, mode, &stack_buf_size,
216  sizeof(stack_buf_size))== 0, goto end);
217  stack_buf_size= 0;
218  CHECK_DO(getsockopt(fd, SOL_SOCKET, mode, &stack_buf_size,
219  (socklen_t*)&int_size)== 0, goto end);
220  //LOGV("UDP Stack_buf_size: %d\n", stack_buf_size); //comment-me
221  //CHECK_DO(stack_buf_size== UDP_COM_DATAGRAM_BUF_SIZE, goto end);
222 #endif
223 
224  if(UDP_COM_ADDR_IS_MULTICAST(inet_addr(host_text))) {
225  /* Set time-to-live */
226  setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(int));
227  if(mode== SO_RCVBUF) {
228  /* Join multicast group */
229  memset((char*)&mgroup, 0, sizeof(mgroup));
230  mgroup.imr_multiaddr.s_addr= inet_addr(host_text);
231 #ifdef ip_mreqn
232  mgroup.imr_address.s_addr= htonl(INADDR_ANY);
233 #else
234  mgroup.imr_interface.s_addr= htonl(INADDR_ANY);
235 #endif
236  CHECK_DO(setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
237  (char*)&mgroup, sizeof(mgroup))== 0, goto end);
238  }
239  } else {
240  /* Set time-to-live */
241  setsockopt(fd, IPPROTO_IP, IP_TTL, &ttl, sizeof(int));
242  }
243 
244  end_code= STAT_SUCCESS;
245 end:
246  if(end_code!= STAT_SUCCESS)
247  comm_udp_close((comm_ctx_t**)&comm_udp_ctx);
248  if(host_text!= NULL)
249  free(host_text);
250  if(port_text!= NULL)
251  free(port_text);
252  return (comm_ctx_t*)comm_udp_ctx;
253 }
254 
255 static void comm_udp_close(comm_ctx_t **ref_comm_ctx)
256 {
257  comm_udp_ctx_t* comm_udp_ctx;
258 
259  /* check argument */
260  if(ref_comm_ctx== NULL ||
261  (comm_udp_ctx= (comm_udp_ctx_t*)*ref_comm_ctx)== NULL)
262  return;
263 
264  comm_udp_unblock((comm_ctx_t*)comm_udp_ctx);
265 
266  /* Release associated sockets */
267  if(comm_udp_ctx->fd>= 0) {
268  close(comm_udp_ctx->fd);
269  comm_udp_ctx->fd= -1;
270  }
271 
272  /* Close 'exit' signaling pipe */
273  if(comm_udp_ctx->pipe_exit_signal[0]>= 0) {
274  close(comm_udp_ctx->pipe_exit_signal[0]);
275  comm_udp_ctx->pipe_exit_signal[0]= -1;
276  }
277  if(comm_udp_ctx->pipe_exit_signal[1]>= 0) {
278  close(comm_udp_ctx->pipe_exit_signal[1]);
279  comm_udp_ctx->pipe_exit_signal[1]= -1;
280  }
281 
282  free(comm_udp_ctx);
283  *ref_comm_ctx= NULL;
284 }
285 
286 static int comm_udp_send(comm_ctx_t* comm_ctx, const void *buf, size_t count,
287  struct timeval* timeout)
288 {
289  fd_set fds;
290  uint16_t port;
291  struct sockaddr_in service;
292  int errno, bytes_io= 0, select_ret= -1, end_code= STAT_ERROR;
293  comm_udp_ctx_t *comm_udp_ctx= NULL; // Do not release (alias)
294  char *host_text= NULL, *port_text= NULL;
295  LOG_CTX_INIT(NULL);
296 
297  /* Check arguments.
298  * Note: 'timeout' is allowed to be NULL (which means "wait indefinitely").
299  */
300  CHECK_DO(comm_ctx!= NULL, return STAT_ERROR);
301  CHECK_DO(buf!= NULL, return STAT_ERROR);
302  CHECK_DO(count> 0, return STAT_ERROR);
303 
304  LOG_CTX_SET(comm_ctx->log_ctx);
305 
306  comm_udp_ctx= (comm_udp_ctx_t*)comm_ctx;
307 
308  /* Return "end of file" if module is requested to exit */
309  if(comm_udp_ctx->flag_exit!= 0) {
310  end_code= STAT_EOF;
311  goto end;
312  }
313 
314  /* Check output operation with select */
315  FD_ZERO(&fds);
316  FD_SET(comm_udp_ctx->fd, &fds);
317  select_ret= select(FD_SETSIZE, NULL, &fds, NULL, timeout);
318  if(select_ret< 0) {
319  LOGE("'select()' failed\n");
320  goto end;
321  } else if(select_ret== 0) {
322  //LOGV("'select()' timeout\n"); //comment-me
323  end_code= STAT_ETIMEDOUT;
324  goto end;
325  }
326 
327  /* Configure destination address and port */
328  if((host_text= uri_parser_get_uri_part(comm_ctx->url, HOSTTEXT))== NULL) {
329  end_code= STAT_EAFNOSUPPORT_HOSTNAME;
330  goto end;
331  }
332  if((port_text= uri_parser_get_uri_part(comm_ctx->url, PORTTEXT))== NULL) {
333  end_code= STAT_EAFNOSUPPORT_PORT;
334  goto end;
335  }
336  port= atoi(port_text);
337  memset((char *) &service, 0, sizeof(service));
338  service.sin_family= AF_INET;
339  service.sin_port= htons(port);
340  service.sin_addr.s_addr= inet_addr(host_text);
341 
342  /* Perform output operation */
343  errno= 0;
344  bytes_io= sendto(comm_udp_ctx->fd, buf, count, 0,
345  (struct sockaddr*)&service, sizeof(struct sockaddr_in));
346  if(bytes_io< 0) {
347  if(comm_udp_ctx->flag_exit== 0)
348  LOGE("Error occurred, errno: %d\n", errno);
349  else
350  end_code= STAT_EOF;
351  goto end;
352  }
353 
354  end_code= STAT_SUCCESS;
355 end:
356  if(host_text!= NULL)
357  free(host_text);
358  if(port_text!= NULL)
359  free(port_text);
360  return end_code;
361 }
362 
363 static int comm_udp_recv(comm_ctx_t *comm_ctx, void** ref_buf,
364  size_t *ref_count, char **ref_from, struct timeval *timeout)
365 {
366  fd_set fds;
367  struct timeval* select_tv;
368  uint8_t recv_buf[UDP_COM_RECV_DGRAM_MAXSIZE];
369  int errno, bytes_io= 0, select_ret= -1, end_code= STAT_ERROR;
370  comm_udp_ctx_t *comm_udp_ctx= NULL; // Do not release (alias)
371  struct timeval tv_zero= {0, 0};
372  struct sockaddr_in src_addr= {0};
373  socklen_t src_addr_len= sizeof(struct sockaddr_in);
374  void *buf= NULL;
375  LOG_CTX_INIT(NULL);
376 
377  /* Check arguments.
378  * Notes:
379  * - Argument 'timeout' is allowed to be NULL (which means "wait
380  * indefinitely");
381  * - Argument 'ref_from' is allowed to be NULL (no source address is
382  * returned).
383  */
384  CHECK_DO(comm_ctx!= NULL, return STAT_ERROR);
385  CHECK_DO(ref_buf!= NULL, return STAT_ERROR);
386  CHECK_DO(ref_count!= NULL, return STAT_ERROR);
387 
388  LOG_CTX_SET(comm_ctx->log_ctx);
389 
390  comm_udp_ctx= (comm_udp_ctx_t*)comm_ctx;
391 
392  /* Return "end of file" if module is requested to exit */
393  if(comm_udp_ctx->flag_exit!= 0) {
394  end_code= STAT_EOF;
395  goto end;
396  }
397 
398  /* Check input operation with select */
399  FD_ZERO(&fds);
400  FD_SET(comm_udp_ctx->fd, &fds);
401  FD_SET(comm_udp_ctx->pipe_exit_signal[0], &fds); // "exit" signal
402  select_tv= (comm_udp_ctx->flag_exit== 0)? timeout: &tv_zero;
403  select_ret= select(FD_SETSIZE, &fds, NULL, NULL, select_tv);
404  if(select_ret< 0) {
405  LOGE("'select()' failed\n");
406  goto end;
407  } else if(select_ret== 0) {
408  LOGV("'select()' timeout\n"); // comment-me
409  end_code= STAT_ETIMEDOUT;
410  goto end;
411  } else {
412  //LOGV("'select()': i/o operation is available now!\n"); // comment-me
413  if(FD_ISSET(comm_udp_ctx->pipe_exit_signal[0], &fds)) {
414  end_code= STAT_EOF;
415  goto end;
416  }
417  }
418  CHECK_DO(FD_ISSET(comm_udp_ctx->fd, &fds)> 0, goto end);
419 
420  /* Perform input operation */
421  errno= 0;
422  bytes_io= recvfrom(comm_udp_ctx->fd, (void*)recv_buf,
423  UDP_COM_RECV_DGRAM_MAXSIZE, 0, (struct sockaddr*)&src_addr,
424  &src_addr_len);
425  if(bytes_io< 0) {
426  if(comm_udp_ctx->flag_exit== 0)
427  LOGE("Error occurred, errno: %d\n", errno);
428  else
429  end_code= STAT_EOF;
430  goto end;
431  } else if(bytes_io> UDP_COM_RECV_DGRAM_MAXSIZE) {
432  LOGE("Bad argument: The maximum datagram size that can be received is "
433  "%d bytes length.\n", (int)UDP_COM_RECV_DGRAM_MAXSIZE);
434  goto end;
435  }
436 
437  if(bytes_io> 0) {
438  buf= malloc((size_t)bytes_io);
439  CHECK_DO(buf!= NULL, goto end);
440  memcpy(buf, recv_buf, (size_t)bytes_io);
441  *ref_buf= buf;
442  buf= NULL; // Avoid double referencing
443  }
444  *ref_count= (size_t)bytes_io;
445  if(ref_from!= NULL)
446  *ref_from= strdup(inet_ntoa(src_addr.sin_addr));
447 
448  end_code= STAT_SUCCESS;
449 end:
450  if(buf!= NULL) {
451  free(buf);
452  buf= NULL;
453  }
454  return end_code;
455 }
456 
457 static int comm_udp_unblock(comm_ctx_t *comm_ctx)
458 {
459  int fd;
460  comm_udp_ctx_t *comm_udp_ctx= NULL; // Do not release (alias)
461  LOG_CTX_INIT(NULL);
462 
463  /* Check arguments */
464  CHECK_DO(comm_ctx!= NULL, return STAT_ERROR);
465 
466  comm_udp_ctx= (comm_udp_ctx_t*)comm_ctx;
467 
468  /* Mark "exit state" */
469  comm_udp_ctx->flag_exit= 1;
470 
471  /* Send exit signal to force I/O 'select()' to unblock before closing */
472  if((fd= comm_udp_ctx->pipe_exit_signal[1])>= 0) {
473  fd_set fds;
474  struct timeval tv_zero= {0, 0};
475 
476  FD_ZERO(&fds);
477  FD_SET(fd, &fds);
478  if(select(FD_SETSIZE, NULL, &fds, NULL, &tv_zero)> 0) {
479  ASSERT(write(comm_udp_ctx->pipe_exit_signal[1], "exit",
480  strlen("exit"))== strlen("exit"));
481  } else {
482  /* Sanity check; this should never happen in a non-buggy
483  * implementation
484  */
485  LOGE("Could not send 'exit' signal to COMM-UDP instance\n");
486  }
487 
488  /* Close write-end of signaling pipe */
489  close(fd); // reader will see EOF
490  comm_udp_ctx->pipe_exit_signal[1]= -1;
491  }
492  return STAT_SUCCESS;
493 }
URI parser wrapper.
comm_mode_t comm_mode
Definition: comm.h:95
volatile int flag_exit
Definition: comm_udp.c:80
Generic communication module.
#define UDP_COM_RECV_DGRAM_MAXSIZE
Definition: comm_udp.h:44
char * url
Definition: comm.h:103
int pipe_exit_signal[2]
Definition: comm_udp.c:86
UDP communication module.
struct comm_ctx_s comm_ctx
Definition: comm_udp.c:71
char * local_url
Definition: comm.h:99
General status codes enumeration.
#define CHECK_DO(COND, ACTION)
Definition: check_utils.h:57
log_ctx_t * log_ctx
Definition: comm.h:91
#define ASSERT(COND)
Definition: check_utils.h:51
struct comm_udp_ctx_s comm_udp_ctx_t
Definition: log.c:102
enum comm_mode_enum comm_mode_t
const comm_if_t comm_if_udp
Definition: comm_udp.c:102
Definition: comm.h:64