40 #include <sys/select.h> 41 #include <sys/socket.h> 44 #include <sys/types.h> 47 #include <arpa/inet.h> 48 #include <sys/ioctl.h> 58 #define UDP_COM_ADDR_IS_MULTICAST(addr) IN_MULTICAST(htonl(addr)) 59 #define UDP_COM_SOCKET_PROT 0 60 #define UDP_COM_DATAGRAM_BUF_SIZE (1024*1024*1024) // 1GB 93 static void comm_udp_close(
comm_ctx_t **ref_comm_ctx);
95 struct timeval* timeout);
97 size_t *ref_count,
char **ref_from,
struct timeval *timeout);
124 struct sockaddr_in service;
126 struct ip_mreqn mgroup;
129 struct ip_mreq mgroup;
134 #ifdef UDP_COM_DATAGRAM_BUF_SIZE 135 const int int_size=
sizeof(int);
136 int stack_buf_size= UDP_COM_DATAGRAM_BUF_SIZE;
138 char *host_text= NULL, *port_text= NULL;
139 int mode, end_code= STAT_EAFNOSUPPORT;
140 const int so_priority= 7;
141 LOG_CTX_INIT(log_ctx);
144 CHECK_DO(url!= NULL && strlen(url)> 0,
return NULL);
147 CHECK_DO(comm_mode< COMM_MODE_MAX,
return NULL);
152 CHECK_DO(comm_udp_ctx!= NULL,
goto end);
156 comm_udp_ctx->
fd= -1;
165 fd= socket(AF_INET, SOCK_DGRAM, UDP_COM_SOCKET_PROT);
167 comm_udp_ctx->
fd=
fd;
170 CHECK_DO(setsockopt(fd, SOL_SOCKET, SO_PRIORITY, &so_priority,
171 sizeof(
int))== 0,
goto end);
174 if((host_text= uri_parser_get_uri_part(url, HOSTTEXT))== NULL) {
178 if((port_text= uri_parser_get_uri_part(url, PORTTEXT))== NULL) {
182 port= atoi(port_text);
183 memset((
char *) &service, 0,
sizeof(service));
184 service.sin_family= AF_INET;
185 service.sin_port= htons(port);
186 service.sin_addr.s_addr= inet_addr(host_text);
196 CHECK_DO(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &multiple_apps,
197 sizeof(
int))>= 0,
goto end);
199 CHECK_DO(setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &multiple_apps,
200 sizeof(
int))>= 0,
goto end);
202 CHECK_DO(bind(fd, (
struct sockaddr*)&service,
sizeof(
203 struct sockaddr_in))== 0,
goto end);
209 LOGE(
"Not supported argument\n");
213 #ifdef UDP_COM_DATAGRAM_BUF_SIZE 215 CHECK_DO(setsockopt(fd, SOL_SOCKET, mode, &stack_buf_size,
216 sizeof(stack_buf_size))== 0,
goto end);
218 CHECK_DO(getsockopt(fd, SOL_SOCKET, mode, &stack_buf_size,
219 (socklen_t*)&int_size)== 0,
goto end);
224 if(UDP_COM_ADDR_IS_MULTICAST(inet_addr(host_text))) {
226 setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl,
sizeof(
int));
227 if(mode== SO_RCVBUF) {
229 memset((
char*)&mgroup, 0,
sizeof(mgroup));
230 mgroup.imr_multiaddr.s_addr= inet_addr(host_text);
232 mgroup.imr_address.s_addr= htonl(INADDR_ANY);
234 mgroup.imr_interface.s_addr= htonl(INADDR_ANY);
236 CHECK_DO(setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
237 (
char*)&mgroup,
sizeof(mgroup))== 0,
goto end);
241 setsockopt(fd, IPPROTO_IP, IP_TTL, &ttl,
sizeof(
int));
244 end_code= STAT_SUCCESS;
246 if(end_code!= STAT_SUCCESS)
255 static void comm_udp_close(
comm_ctx_t **ref_comm_ctx)
260 if(ref_comm_ctx== NULL ||
267 if(comm_udp_ctx->
fd>= 0) {
268 close(comm_udp_ctx->
fd);
269 comm_udp_ctx->
fd= -1;
287 struct timeval* timeout)
291 struct sockaddr_in service;
292 int errno, bytes_io= 0, select_ret= -1, end_code= STAT_ERROR;
294 char *host_text= NULL, *port_text= NULL;
300 CHECK_DO(comm_ctx!= NULL,
return STAT_ERROR);
301 CHECK_DO(buf!= NULL,
return STAT_ERROR);
302 CHECK_DO(count> 0,
return STAT_ERROR);
304 LOG_CTX_SET(comm_ctx->
log_ctx);
316 FD_SET(comm_udp_ctx->
fd, &fds);
317 select_ret= select(FD_SETSIZE, NULL, &fds, NULL, timeout);
319 LOGE(
"'select()' failed\n");
321 }
else if(select_ret== 0) {
323 end_code= STAT_ETIMEDOUT;
328 if((host_text= uri_parser_get_uri_part(comm_ctx->
url, HOSTTEXT))== NULL) {
332 if((port_text= uri_parser_get_uri_part(comm_ctx->
url, PORTTEXT))== NULL) {
336 port= atoi(port_text);
337 memset((
char *) &service, 0,
sizeof(service));
338 service.sin_family= AF_INET;
339 service.sin_port= htons(port);
340 service.sin_addr.s_addr= inet_addr(host_text);
344 bytes_io= sendto(comm_udp_ctx->
fd, buf, count, 0,
345 (
struct sockaddr*)&service,
sizeof(
struct sockaddr_in));
348 LOGE(
"Error occurred, errno: %d\n", errno);
354 end_code= STAT_SUCCESS;
363 static int comm_udp_recv(
comm_ctx_t *comm_ctx,
void** ref_buf,
364 size_t *ref_count,
char **ref_from,
struct timeval *timeout)
367 struct timeval* select_tv;
369 int errno, bytes_io= 0, select_ret= -1, end_code= STAT_ERROR;
371 struct timeval tv_zero= {0, 0};
372 struct sockaddr_in src_addr= {0};
373 socklen_t src_addr_len=
sizeof(
struct sockaddr_in);
384 CHECK_DO(comm_ctx!= NULL,
return STAT_ERROR);
385 CHECK_DO(ref_buf!= NULL,
return STAT_ERROR);
386 CHECK_DO(ref_count!= NULL,
return STAT_ERROR);
388 LOG_CTX_SET(comm_ctx->
log_ctx);
400 FD_SET(comm_udp_ctx->
fd, &fds);
402 select_tv= (comm_udp_ctx->
flag_exit== 0)? timeout: &tv_zero;
403 select_ret= select(FD_SETSIZE, &fds, NULL, NULL, select_tv);
405 LOGE(
"'select()' failed\n");
407 }
else if(select_ret== 0) {
408 LOGV(
"'select()' timeout\n");
409 end_code= STAT_ETIMEDOUT;
418 CHECK_DO(FD_ISSET(comm_udp_ctx->
fd, &fds)> 0,
goto end);
422 bytes_io= recvfrom(comm_udp_ctx->
fd, (
void*)recv_buf,
427 LOGE(
"Error occurred, errno: %d\n", errno);
432 LOGE(
"Bad argument: The maximum datagram size that can be received is " 438 buf= malloc((
size_t)bytes_io);
440 memcpy(buf, recv_buf, (
size_t)bytes_io);
444 *ref_count= (size_t)bytes_io;
446 *ref_from= strdup(inet_ntoa(src_addr.sin_addr));
448 end_code= STAT_SUCCESS;
457 static int comm_udp_unblock(
comm_ctx_t *comm_ctx)
464 CHECK_DO(comm_ctx!= NULL,
return STAT_ERROR);
474 struct timeval tv_zero= {0, 0};
478 if(select(FD_SETSIZE, NULL, &fds, NULL, &tv_zero)> 0) {
480 strlen(
"exit"))== strlen(
"exit"));
485 LOGE(
"Could not send 'exit' signal to COMM-UDP instance\n");
Generic communication module.
#define UDP_COM_RECV_DGRAM_MAXSIZE
UDP communication module.
struct comm_ctx_s comm_ctx
General status codes enumeration.
#define CHECK_DO(COND, ACTION)
struct comm_udp_ctx_s comm_udp_ctx_t
enum comm_mode_enum comm_mode_t
const comm_if_t comm_if_udp