/*
 * TAG_IS(TAG): true when the identifier 'tag' (taken from the scope where the
 * macro is expanded, not from a macro argument) equals the string TAG
 * according to strcmp(). It can only be used where a 'tag' variable exists.
 *
 * The lines below are forward declarations of file-local (static) helpers.
 * NOTE(review): this listing is an excerpt; the parameter list of
 * get_comm_if_by_scheme() is cut short here.
 */
56 #define TAG_IS(TAG) (strcmp(tag, TAG)== 0) 83 static int unregister_comm_if(
const char *scheme,
log_ctx_t *log_ctx);
84 static const comm_if_t* get_comm_if_by_scheme(
const char *scheme,
90 static void comm_if_release(
comm_if_t **ref_comm_if);
92 static int comm_module_url_probe(
const char *url);
/*
 * Body fragments of the module initializer (presumably comm_module_open();
 * the function header is not part of this excerpt -- TODO confirm).
 * 'end_code' starts as STAT_ERROR and is only switched to STAT_SUCCESS once
 * every visible check has passed.
 */
103 int ret_code, end_code= STAT_ERROR;
104 LOG_CTX_INIT(log_ctx);
/* Guard against double initialization: the module context is a singleton. */
107 if(comm_module_ctx!= NULL) {
108 LOGE(
"'COMM' module already initialized\n");
/*
 * Verifies the module context is non-NULL and jumps to 'end' otherwise.
 * NOTE(review): the statement that assigns comm_module_ctx (presumably an
 * allocation) is not visible in this excerpt.
 */
113 CHECK_DO(comm_module_ctx!= NULL,
goto end);
122 end_code= STAT_SUCCESS;
/* Failure path; the action taken is not visible in this excerpt. */
124 if(end_code!= STAT_SUCCESS)
/*
 * Body fragments of the module de-initializer (presumably
 * comm_module_close(); the header is not part of this excerpt -- TODO
 * confirm). Logs an error when the module was never initialized.
 */
134 if(comm_module_ctx== NULL) {
135 LOGE(
"'COMM' module must be initialized previously\n");
/*
 * Release the module context and reset the global pointer so that a later
 * NULL check sees the module as uninitialized.
 */
146 free(comm_module_ctx);
147 comm_module_ctx= NULL;
/*
 * Body fragments of the tag-driven option dispatcher (presumably
 * comm_module_opt(const char *tag, ...); the header is not part of this
 * excerpt -- TODO confirm).
 */
153 int end_code= STAT_ERROR;
/* The module must have been initialized before any option is processed. */
157 if(comm_module_ctx== NULL) {
158 LOGE(
"'COMM' module should be initialized previously\n");
/* 'tag' is required: TAG_IS() passes it straight to strcmp(). */
161 CHECK_DO(tag!= NULL,
return STAT_ERROR);
/*
 * "COMM_REGISTER_PROTO": the next variadic argument is read as a
 * 'comm_if_t*' and handed to register_comm_if().
 */
168 if(
TAG_IS(
"COMM_REGISTER_PROTO")) {
169 end_code= register_comm_if(va_arg(arg,
comm_if_t*), LOG_CTX_GET());
170 }
else if (
TAG_IS(
"COMM_UNREGISTER_PROTO")) {
171 end_code= unregister_comm_if(va_arg(arg,
const char*), LOG_CTX_GET());
/* Any other tag is reported and answered with STAT_ENOTFOUND. */
173 LOGE(
"Unknown option\n");
174 end_code= STAT_ENOTFOUND;
/*
 * comm_open(): opens a communication context for 'url'.
 *
 * Visible behavior: validates the arguments, extracts the URI scheme from
 * 'url', looks up the interface registered for that scheme, calls that
 * interface's open() callback, and then fills in the generic fields of the
 * returned context (interface pointer, API mutex, log context, mode and
 * copies of both URLs). On any failure after the context exists it is torn
 * down with comm_close().
 *
 * NOTE(review): this listing is an excerpt; part of the parameter list, the
 * cleanup label and several statements are not visible.
 */
182 comm_ctx_t* comm_open(
const char *url,
const char *local_url,
187 int ret_code, end_code= STAT_ERROR;
188 char *uri_scheme= NULL;
189 size_t uri_scheme_size= 0;
/* Argument checks: initialized module, non-empty URL, mode within range. */
196 CHECK_DO(comm_module_ctx!= NULL,
return NULL);
197 CHECK_DO(url!= NULL && strlen(url)> 0,
return NULL);
200 CHECK_DO(comm_mode< COMM_MODE_MAX,
return NULL);
/*
 * Variadic arguments follow 'log_ctx' and are forwarded to the interface's
 * open() callback below.
 * NOTE(review): the matching va_end() is not visible in this excerpt --
 * confirm it is reached on every path.
 */
203 va_start(arg, log_ctx);
206 LOG_CTX_SET(log_ctx);
/*
 * Extract the scheme part of the URL. 'uri_scheme_size' is a size_t, so the
 * '<= 0' comparison below is equivalent to '== 0'.
 */
209 uri_scheme= uri_parser_get_uri_part(url, SCHEME);
210 if(uri_scheme== NULL || (uri_scheme_size= strlen(uri_scheme))<= 0) {
211 end_code= STAT_ENOPROTOOPT;
/* Find the registered interface whose scheme matches. */
215 comm_if= get_comm_if_by_scheme(uri_scheme, LOG_CTX_GET());
/* Both open() and close() callbacks are mandatory for an interface. */
220 CHECK_DO((open= comm_if->open)!= NULL,
goto end);
221 CHECK_DO(comm_if->close!= NULL,
goto end);
/* Delegate the actual opening to the scheme-specific implementation. */
224 comm_ctx= open(url, local_url, comm_mode, LOG_CTX_GET(), arg);
225 CHECK_DO(comm_ctx!= NULL,
goto end);
/* Fill in the generic part of the context. */
230 comm_ctx->comm_if= comm_if;
/* NOTE(review): the check on this ret_code is not visible in this excerpt. */
233 ret_code= pthread_mutex_init(&comm_ctx->api_mutex, NULL);
237 comm_ctx->log_ctx= LOG_CTX_GET();
240 comm_ctx->comm_mode= comm_mode;
/* The local URL is optional; when given, the context keeps its own copy. */
243 if(local_url!= NULL) {
244 comm_ctx->local_url= strdup(local_url);
245 CHECK_DO(comm_ctx->local_url!= NULL,
goto end);
/* The context keeps its own copy of the URL. */
249 comm_ctx->url= strdup(url);
250 CHECK_DO(comm_ctx->url!= NULL,
goto end);
252 end_code= STAT_SUCCESS;
/* Cleanup: the body of this branch is not visible (presumably frees it). */
255 if(uri_scheme!= NULL) {
/* On failure, release whatever part of the context was built. */
259 if(end_code!= STAT_SUCCESS)
260 comm_close(&comm_ctx);
/*
 * Body fragments of comm_close() (the header is not part of this excerpt;
 * the name is taken from the calls elsewhere in this file).
 * A NULL reference, or a reference to a NULL context, is checked first; the
 * action taken in that case is not visible here.
 */
270 if(ref_comm_ctx== NULL || (comm_ctx= *ref_comm_ctx)== NULL)
/* The interface and its close() callback are expected to exist. */
274 ASSERT(comm_if!= NULL && comm_if->close!= NULL);
/* Handling of the URL field; the body of this branch is not visible. */
283 if(comm_ctx->
url!= NULL){
/*
 * Hand the context reference to the interface's close() callback. The
 * condition repeats the ASSERT above as a run-time check.
 */
289 if(comm_if!= NULL && comm_if->close!= NULL)
290 comm_if->close(ref_comm_ctx);
/*
 * comm_send(): sends 'count' bytes from 'buf' through the given context by
 * calling the interface's send() callback while holding the context's
 * 'api_mutex'.
 *
 * Returns STAT_ERROR when 'comm_ctx' or 'buf' is NULL or 'count' is zero.
 * NOTE(review): this listing is an excerpt; the final return statement and
 * the rest of the error branch are not visible.
 */
293 int comm_send(
comm_ctx_t *comm_ctx,
const void *buf,
size_t count,
294 struct timeval *timeout)
300 CHECK_DO(comm_ctx!= NULL,
return STAT_ERROR);
301 CHECK_DO(buf!= NULL,
return STAT_ERROR);
302 CHECK_DO(count> 0,
return STAT_ERROR);
/* Use the log context stored in the communication context. */
305 LOG_CTX_SET(comm_ctx->
log_ctx);
/*
 * Sending requires an output-mode context and an interface that provides
 * send().
 * NOTE(review): the message below mentions only the missing callback, yet
 * this branch is also taken when the mode is not COMM_MODE_OPUT. Unlike the
 * other LOGE messages in this file it has no trailing "\n".
 */
309 if(comm_ctx->
comm_mode!= COMM_MODE_OPUT || comm_ctx->
comm_if->send== NULL) {
310 LOGE(
"Communication interface does not implement 'send()' function.");
/* The interface call is made with the context's API mutex held. */
313 pthread_mutex_lock(&comm_ctx->
api_mutex);
314 ret_code= comm_ctx->
comm_if->send(comm_ctx, buf, count, timeout);
315 pthread_mutex_unlock(&comm_ctx->
api_mutex);
/*
 * comm_recv(): receives data through the given context by calling the
 * interface's recv() callback while holding the context's 'api_mutex'. The
 * output references 'ref_buf', 'ref_count' and 'ref_from' are passed through
 * to the callback.
 *
 * Returns STAT_ERROR when 'comm_ctx', 'ref_buf' or 'ref_count' is NULL.
 * 'ref_from' is not checked in the visible lines.
 * NOTE(review): this listing is an excerpt; the final return statement and
 * the rest of the error branch are not visible.
 */
319 int comm_recv(
comm_ctx_t *comm_ctx,
void** ref_buf,
size_t *ref_count,
320 char **ref_from,
struct timeval* timeout)
326 CHECK_DO(comm_ctx!= NULL,
return STAT_ERROR);
327 CHECK_DO(ref_buf!= NULL,
return STAT_ERROR);
328 CHECK_DO(ref_count!= NULL,
return STAT_ERROR);
/* Use the log context stored in the communication context. */
332 LOG_CTX_SET(comm_ctx->
log_ctx);
/*
 * Receiving requires an input-mode context and an interface that provides
 * recv().
 * NOTE(review): the message below mentions only the missing callback, yet
 * this branch is also taken when the mode is not COMM_MODE_IPUT. Unlike the
 * other LOGE messages in this file it has no trailing "\n".
 */
341 if(comm_ctx->
comm_mode!= COMM_MODE_IPUT || comm_ctx->
comm_if->recv== NULL) {
342 LOGE(
"Communication interface does not implement 'recv()' function.");
/* The interface call is made with the context's API mutex held. */
345 pthread_mutex_lock(&comm_ctx->
api_mutex);
346 ret_code= comm_ctx->
comm_if->recv(comm_ctx, ref_buf, ref_count, ref_from,
348 pthread_mutex_unlock(&comm_ctx->
api_mutex);
/*
 * Body fragments of the unblock entry point (presumably comm_unblock(); the
 * header is not part of this excerpt -- TODO confirm).
 * Returns STAT_ERROR for a NULL context.
 */
357 CHECK_DO(comm_ctx!= NULL,
return STAT_ERROR);
359 LOG_CTX_SET(comm_ctx->
log_ctx);
/*
 * The unblock() callback is optional: it is only invoked when the interface
 * provides one. No api_mutex lock is visible around this call.
 */
363 if(comm_ctx->
comm_if->unblock) {
368 comm_ctx->
comm_if->unblock(comm_ctx);
/*
 * comm_open_external(): opens a communication context and publishes it
 * through '*ref_comm_ctx' under the protection of a caller-supplied mutex.
 *
 * Visible behavior: validates the arguments, probes the URL with
 * comm_module_url_probe(), opens the context with comm_open(), then takes
 * the external mutex and stores the new context in '*ref_comm_ctx' unless
 * one is already stored there (STAT_ECONFLICT).
 *
 * NOTE(review): this listing is an excerpt; part of the parameter list and
 * several statements are not visible.
 */
374 int comm_open_external(pthread_mutex_t *comm_ctx_mutex_external,
375 const char *url,
const char *local_url,
comm_mode_t comm_mode,
378 int ret_code, end_code= STAT_ENOPROTOOPT;
380 LOG_CTX_INIT(log_ctx);
383 CHECK_DO(comm_module_ctx!= NULL,
return STAT_ERROR);
384 CHECK_DO(comm_ctx_mutex_external!= NULL,
return STAT_ERROR);
385 CHECK_DO(url!= NULL && strlen(url)> 0,
return STAT_ERROR);
388 CHECK_DO(comm_mode< COMM_MODE_MAX,
return STAT_ERROR);
390 CHECK_DO(ref_comm_ctx!= NULL,
return STAT_ERROR);
/* Check the URL before opening; the body of this branch is not visible. */
393 if((ret_code= comm_module_url_probe(url))!= STAT_SUCCESS) {
/* comm_open() is variadic; no optional arguments are passed here. */
399 comm_ctx= comm_open(url, local_url, comm_mode, LOG_CTX_GET());
/*
 * NOTE(review): the lock/unlock calls are the ASSERT arguments themselves.
 * If ASSERT expands to nothing in some build configuration, the calls would
 * disappear with it -- confirm against the ASSERT definition.
 */
406 ASSERT(pthread_mutex_lock(comm_ctx_mutex_external)== 0);
/* Refuse to overwrite a context that is already published. */
407 if(*ref_comm_ctx!= NULL) {
408 LOGE(
"Trying to open an already opened communication interface\n");
409 end_code= STAT_ECONFLICT;
411 *ref_comm_ctx= comm_ctx;
413 end_code= STAT_SUCCESS;
415 ASSERT(pthread_mutex_unlock(comm_ctx_mutex_external)== 0);
/*
 * Closes the local 'comm_ctx' (not '*ref_comm_ctx'). The condition guarding
 * this call is not visible; presumably it is the failure path.
 */
418 comm_close_external(comm_ctx_mutex_external, &comm_ctx, LOG_CTX_GET());
/*
 * comm_close_external(): closes the context referenced by 'ref_comm_ctx'
 * while holding the caller-supplied mutex. Returns silently when the module
 * is not initialized or when a required pointer is NULL.
 *
 * NOTE(review): this listing is an excerpt; part of the parameter list is
 * not visible.
 */
422 void comm_close_external(pthread_mutex_t *comm_ctx_mutex_external,
425 LOG_CTX_INIT(log_ctx);
428 CHECK_DO(comm_module_ctx!= NULL,
return);
429 CHECK_DO(comm_ctx_mutex_external!= NULL,
return);
430 CHECK_DO(ref_comm_ctx!= NULL,
return);
/*
 * Unblock first, before the external mutex is taken. comm_recv_external()
 * in this file holds that same mutex for the duration of its comm_recv()
 * call, so presumably this releases a pending receive -- TODO confirm.
 * '*ref_comm_ctx' is read here without the mutex held.
 */
433 if(*ref_comm_ctx!= NULL)
434 comm_unblock(*ref_comm_ctx);
/*
 * Close under the external mutex.
 * NOTE(review): the lock/unlock calls are ASSERT arguments; confirm that
 * ASSERT always evaluates its argument.
 */
437 ASSERT(pthread_mutex_lock(comm_ctx_mutex_external)== 0);
438 comm_close(ref_comm_ctx);
440 ASSERT(pthread_mutex_unlock(comm_ctx_mutex_external)== 0);
/*
 * comm_reset_external(): switches the context referenced by
 * 'ref_comm_ctx_curr' to 'new_url'.
 *
 * Visible behavior:
 *  - returns STAT_NOTMODIFIED when 'new_url' equals the URL of the current
 *    context;
 *  - when 'new_url' is NULL or empty, closes the current context;
 *  - otherwise closes the current context and opens a new one with
 *    comm_open_external(), returning that call's result.
 *
 * NOTE(review): this listing is an excerpt; part of the parameter list and
 * the return of the "empty URL" branch are not visible.
 */
444 int comm_reset_external(pthread_mutex_t *comm_ctx_mutex_external,
445 const char *new_url,
const char *local_url,
comm_mode_t comm_mode,
448 char *p_curr_url= NULL;
449 LOG_CTX_INIT(log_ctx);
452 CHECK_DO(comm_module_ctx!= NULL,
return STAT_ERROR);
453 CHECK_DO(comm_ctx_mutex_external!= NULL,
return STAT_ERROR);
457 CHECK_DO(comm_mode< COMM_MODE_MAX,
return STAT_ERROR);
459 CHECK_DO(ref_comm_ctx_curr!= NULL,
return STAT_ERROR);
/*
 * Nothing to do when the requested URL is the one already in use.
 * NOTE(review): no lock on the external mutex is visible before the current
 * context's 'url' field is read -- confirm this is safe for the callers.
 */
462 p_curr_url= (*ref_comm_ctx_curr)!= NULL? (*ref_comm_ctx_curr)->url: NULL;
463 if(new_url!= NULL && p_curr_url!= NULL && strcmp(new_url, p_curr_url)== 0)
464 return STAT_NOTMODIFIED;
/*
 * A NULL or empty new URL means "just close". The inner 'new_url!= NULL'
 * test is redundant: the right-hand side of '||' is only evaluated when
 * 'new_url== NULL' was false.
 */
467 if(new_url== NULL || (new_url!= NULL && strcmp(new_url,
"")== 0)) {
468 comm_close_external(comm_ctx_mutex_external, ref_comm_ctx_curr,
/* Replace: close the current context, then open one for the new URL. */
476 comm_close_external(comm_ctx_mutex_external, ref_comm_ctx_curr,
478 return comm_open_external(comm_ctx_mutex_external, new_url, local_url,
479 comm_mode, LOG_CTX_GET(), ref_comm_ctx_curr);
/*
 * comm_recv_external(): receives through '*ref_comm_ctx' while holding the
 * caller-supplied mutex. The result starts as STAT_ENODATA and is only
 * replaced by the comm_recv() result when a context is present.
 *
 * Returns STAT_ERROR when the module is not initialized or when
 * 'comm_ctx_mutex_external', 'ref_comm_ctx', 'ref_buf' or 'ref_count' is
 * NULL.
 * NOTE(review): this listing is an excerpt; the final return statement is
 * not visible.
 */
482 int comm_recv_external(pthread_mutex_t *comm_ctx_mutex_external,
483 comm_ctx_t **ref_comm_ctx,
void** ref_buf,
size_t *ref_count,
484 char **ref_from,
struct timeval* timeout,
log_ctx_t *log_ctx)
486 int end_code= STAT_ENODATA;
487 LOG_CTX_INIT(log_ctx);
490 CHECK_DO(comm_module_ctx!= NULL,
return STAT_ERROR);
491 CHECK_DO(comm_ctx_mutex_external!= NULL,
return STAT_ERROR);
492 CHECK_DO(ref_comm_ctx!= NULL,
return STAT_ERROR);
493 CHECK_DO(ref_buf!= NULL,
return STAT_ERROR);
494 CHECK_DO(ref_count!= NULL,
return STAT_ERROR);
/*
 * The external mutex is held across the comm_recv() call.
 * NOTE(review): the lock/unlock calls are ASSERT arguments; confirm that
 * ASSERT always evaluates its argument.
 */
498 ASSERT(pthread_mutex_lock(comm_ctx_mutex_external)== 0);
499 if(*ref_comm_ctx!= NULL) {
500 end_code= comm_recv(*ref_comm_ctx, ref_buf, ref_count, ref_from,
503 ASSERT(pthread_mutex_unlock(comm_ctx_mutex_external)== 0);
/*
 * Body fragments of the interface registration helper (presumably
 * register_comm_if(); the header is not part of this excerpt -- TODO
 * confirm).
 * Visible behavior: rejects a scheme that is already in the module's list
 * (STAT_ECONFLICT), otherwise duplicates the given interface; on failure the
 * duplicate is released.
 */
510 int ret_code, end_code= STAT_ERROR;
512 LOG_CTX_INIT(log_ctx);
515 CHECK_DO(comm_module_ctx!= NULL,
return STAT_ERROR);
516 CHECK_DO(comm_if!= NULL,
return STAT_ERROR);
/*
 * NOTE(review): the statement assigning 'ret_code' is not visible.
 * Presumably it is a trylock on the module mutex, with EBUSY meaning the
 * caller already holds it -- TODO confirm.
 */
520 CHECK_DO(ret_code== EBUSY,
return STAT_ERROR);
/* Scan the registered interfaces for one with the same scheme. */
523 for(n= comm_module_ctx->
comm_if_llist; n!= NULL; n= n->next) {
525 CHECK_DO(comm_if_nth!= NULL,
continue);
526 if(strcmp(comm_if_nth->scheme, comm_if->scheme)== 0) {
527 end_code= STAT_ECONFLICT;
/* Work on a private copy of the caller's interface description. */
533 comm_if_cpy= comm_if_dup(comm_if);
536 CHECK_DO(comm_if_cpy!= NULL,
goto end);
/*
 * NOTE(review): the call producing this 'ret_code' is not visible
 * (presumably the insertion of the copy into the list).
 */
538 CHECK_DO(ret_code== STAT_SUCCESS,
goto end);
540 end_code= STAT_SUCCESS;
/* On failure the copy is released. */
542 if(end_code!= STAT_SUCCESS)
543 comm_if_release(&comm_if_cpy);
/*
 * unregister_comm_if(): removes the interface registered for 'scheme'.
 *
 * Visible behavior: walks the list through a pointer-to-link ('ref_n'),
 * and on the first entry whose scheme matches, releases that interface.
 * Returns STAT_ENOTFOUND when no entry matches, and STAT_ERROR when the
 * module is not initialized or 'scheme' is NULL.
 *
 * NOTE(review): this listing is an excerpt; the loop header, the list
 * removal call and the success return are not visible.
 */
547 static int unregister_comm_if(
const char *scheme,
log_ctx_t *log_ctx)
551 LOG_CTX_INIT(log_ctx);
554 CHECK_DO(comm_module_ctx!= NULL,
return STAT_ERROR);
555 CHECK_DO(scheme!= NULL,
return STAT_ERROR);
/*
 * NOTE(review): the statement assigning 'ret_code' is not visible.
 * Presumably it is a trylock on the module mutex, with EBUSY meaning the
 * caller already holds it -- TODO confirm.
 */
559 CHECK_DO(ret_code== EBUSY,
return STAT_ERROR);
562 ref_n= &((*ref_n)->next)) {
564 CHECK_DO(comm_if_nth!= NULL,
continue);
566 if(strcmp(comm_if_nth->scheme, scheme)== 0) {
/* The detached node must carry exactly the interface that matched. */
570 ASSERT(node!= NULL && node== (
void*)comm_if_nth);
/* comm_if_release() is expected to reset the pointer it is given. */
578 comm_if_release(&comm_if_nth);
579 ASSERT(comm_if_nth== NULL);
583 return STAT_ENOTFOUND;
/*
 * get_comm_if_by_scheme(): looks up the registered interface whose scheme
 * string equals 'scheme'. Returns NULL when the module is not initialized
 * or 'scheme' is NULL or empty.
 *
 * NOTE(review): this listing is an excerpt; the rest of the parameter list
 * and the statements returning the result are not visible.
 */
586 static const comm_if_t* get_comm_if_by_scheme(
const char *scheme,
591 LOG_CTX_INIT(log_ctx);
594 CHECK_DO(comm_module_ctx!= NULL,
return NULL);
595 CHECK_DO(scheme!= NULL && strlen(scheme)> 0,
return NULL);
/*
 * NOTE(review): the statement assigning 'ret_code' is not visible.
 * Presumably it is a trylock on the module mutex, with EBUSY meaning the
 * caller already holds it -- TODO confirm.
 */
599 CHECK_DO(ret_code== EBUSY,
return NULL);
/* Linear scan of the registered interfaces; NULL entries are skipped. */
602 for(n= comm_module_ctx->
comm_if_llist; n!= NULL; n= n->next) {
604 CHECK_DO(comm_if_nth!= NULL,
continue);
605 if(strcmp(comm_if_nth->scheme, scheme)== 0)
/*
 * Body fragments of the interface duplication helper (presumably
 * comm_if_dup(); the header is not part of this excerpt -- TODO confirm).
 * Returns NULL for a NULL argument.
 */
618 int end_code= STAT_ERROR;
623 CHECK_DO(comm_if_arg!= NULL,
return NULL);
626 comm_if= comm_if_allocate();
/*
 * Copy the whole structure first. This copies the 'scheme' pointer itself,
 * so the copy shares the source's string until the strdup() below replaces
 * it.
 */
633 memcpy(comm_if, comm_if_arg,
sizeof(
comm_if_t));
/* A scheme is mandatory; the copy gets its own duplicate of the string. */
637 CHECK_DO(comm_if_arg->scheme!= NULL,
goto end);
638 comm_if->scheme= strdup(comm_if_arg->scheme);
639 CHECK_DO(comm_if->scheme!= NULL,
goto end);
641 end_code= STAT_SUCCESS;
/* On failure the partially built copy is released. */
643 if(end_code!= STAT_SUCCESS)
644 comm_if_release(&comm_if);
/*
 * Body fragments of a helper that compares two interface descriptions
 * field by field (the function name and header are not part of this
 * excerpt).
 * Returns 1 when either argument is NULL. The statements executed when a
 * field differs are not visible; presumably they also return non-zero --
 * TODO confirm.
 */
655 CHECK_DO(comm_if1!= NULL,
return 1);
656 CHECK_DO(comm_if2!= NULL,
return 1);
/* Schemes are compared by content; callbacks by pointer identity. */
659 if(strcmp(comm_if1->scheme, comm_if2->scheme)!= 0)
661 if(comm_if1->open!= comm_if2->open)
663 if(comm_if1->close!= comm_if2->close)
665 if(comm_if1->send!= comm_if2->send)
667 if(comm_if1->recv!= comm_if2->recv)
669 if(comm_if1->unblock!= comm_if2->unblock)
/*
 * comm_if_release(): releases an interface description through a reference
 * to its pointer. A NULL reference or a reference to a NULL pointer is
 * checked first; the action taken in that case is not visible here.
 *
 * NOTE(review): this listing is an excerpt; the release of the structure
 * itself and the reset of '*ref_comm_if' are not visible.
 */
680 static void comm_if_release(
comm_if_t **ref_comm_if)
684 if(ref_comm_if== NULL || (comm_if= *ref_comm_if)== NULL)
/*
 * Free the scheme string held by this interface. The (void*) cast is
 * presumably there because the field is declared 'const char*' -- TODO
 * confirm.
 */
687 if(comm_if->scheme!= NULL) {
688 free((
void*)comm_if->scheme);
689 comm_if->scheme= NULL;
/*
 * comm_module_url_probe(): checks whether 'url' looks usable before an open
 * is attempted.
 *
 * Visible behavior: the result starts as STAT_ENOPROTOOPT and is returned
 * as-is for a NULL 'url'. The scheme part must be present and non-empty and
 * is compared with the schemes of the registered interfaces; the host and
 * port parts are then extracted and tested for being non-empty. The result
 * becomes STAT_SUCCESS at the end of the visible sequence.
 *
 * NOTE(review): this listing is an excerpt; the loop header, the actions of
 * each failing branch, and any release of 'uri_part' between the successive
 * uri_parser_get_uri_part() calls are not visible.
 */
696 static int comm_module_url_probe(
const char *url)
699 int end_code= STAT_ENOPROTOOPT;
700 size_t uri_part_size= 0;
701 char *uri_part= NULL;
705 CHECK_DO(url!= NULL,
return end_code);
/* Scheme: must exist and be non-empty. */
711 uri_part= uri_parser_get_uri_part(url, SCHEME);
712 if(!(uri_part!= NULL && (uri_part_size= strlen(uri_part))> 0))
717 LOGE(
"Unknown protocol\n");
/* Compare the scheme with each registered interface; skip NULL entries. */
721 CHECK_DO(comm_if_nth!= NULL,
continue);
722 if(strcmp(comm_if_nth->scheme, uri_part)== 0)
/* Host part: tested for presence and non-zero length. */
729 uri_part= uri_parser_get_uri_part(url, HOSTTEXT);
730 if(!(uri_part!= NULL && (uri_part_size= strlen(uri_part))> 0)) {
/* Port part: tested for presence and non-zero length. */
738 uri_part= uri_parser_get_uri_part(url, PORTTEXT);
739 if(!(uri_part!= NULL && (uri_part_size= strlen(uri_part))> 0)) {
746 end_code= STAT_SUCCESS;
int comm_module_open(log_ctx_t *log_ctx)
Generic communication module.
pthread_mutex_t api_mutex
pthread_mutex_t module_api_mutex
struct comm_module_ctx_s comm_module_ctx_t
General status codes enumeration.
#define CHECK_DO(COND, ACTION)
#define LLIST_RELEASE(ref_llist_head, node_release_fxn, node_type)
int llist_push(llist_t **ref_llist_head, void *data)
Simple linked-list utility implementation.
void * llist_pop(llist_t **ref_llist_head)
static comm_module_ctx_t * comm_module_ctx
enum comm_mode_enum comm_mode_t
int comm_module_opt(const char *tag,...)
const comm_if_t * comm_if