From 5a90d0e758ee3dcf34fc5ecf852093ad38d50186 Mon Sep 17 00:00:00 2001 From: "Mytnyk, VolodymyrX" Date: Fri, 28 Oct 2016 11:18:17 +0100 Subject: [PATCH] ovs_events: Address PR comments - Change configuration format to suggested one; - Fix init/destroy API; - Fix memory leaks; - Code-clean-up. Change-Id: I1ff94271b777c69f3d07a66f43dc10d034e71101 Signed-off-by: Mytnyk, VolodymyrX --- src/collectd.conf.in | 4 +- src/collectd.conf.pod | 39 ++--- src/ovs_events.c | 112 ++++++------ src/utils_ovs.c | 461 ++++++++++++++++++++++++++++++-------------------- src/utils_ovs.h | 6 +- 5 files changed, 353 insertions(+), 269 deletions(-) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 883a079f..e754fa8e 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -993,7 +993,9 @@ # # -# OvsDbAddress "127.0.0.1" "6640" +# Port "6640" +# Address "127.0.0.1" +# Socket "/var/run/openvswitch/db.sock" # Interfaces "br0" "veth0" # SendNotification false # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 061c4baf..210e1073 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -5463,7 +5463,9 @@ notification. B - OvsDbAddress "127.0.0.1" "6640" + Port "6640" + Address "127.0.0.1" + Socket "/var/run/openvswitch/db.sock" Interfaces "br0" "veth0" SendNotification false @@ -5472,31 +5474,26 @@ The plugin provides the following configuration options: =over 4 -=item B I I +=item B
I -The address of OVS DB server JSON-RPC interface used by the plugin. -To enable the interface, OVS DB daemon should be running with '--remote=ptcp:' -or '--remote=punix:' option. See L for more details. The -address arguments must take one of the following forms: +The address of OVS DB server JSON-RPC interface used by the plugin. To enable +the interface, OVS DB daemon should be running with '--remote=ptcp:' option. +See L for more details. The option may be either network +hostname, IPv4 numbers-and-dots notation or IPv6 hexadecimal string format. +Defaults to 'localhost'. -=over 4 - -=item I - -The I argument of the address can be either network hostname, IPv4 -numbers-and-dots notation or IPv6 hexadecimal string format. In case of Unix -domain socket, the "Ifile" format should be used, where I is -the full name of OVS DB Unix domain socket. +=item B I -=item I +TCP-port to connect to. Either a service name or a port number may be given. +Please note that numerical port numbers must be given as a string. Defaults +to "6640". -The I argument of the address specifies the service name used to -connect to OVS DB. See L for more details. This argument is -skipped if Unix domain address is used. - -=back +=item B I -Default: C<"localhost" "6640"> +The UNIX domain socket path of OVS DB server JSON-RPC interface used by the +plugin. To enable the interface, the OVS DB daemon should be running with +'--remote=punix:' option. See L for more details. If this +option is set, B
and B options are ignored. =item B [I ...] diff --git a/src/ovs_events.c b/src/ovs_events.c index ce5254da..1f63a065 100644 --- a/src/ovs_events.c +++ b/src/ovs_events.c @@ -71,6 +71,7 @@ struct ovs_events_config_s { _Bool send_notification; /* sent notification to collectd? */ char ovs_db_node[OVS_DB_ADDR_NODE_SIZE]; /* OVS DB node */ char ovs_db_serv[OVS_DB_ADDR_SERVICE_SIZE]; /* OVS DB service */ + char ovs_db_unix[OVS_DB_ADDR_UNIX_SIZE]; /* OVS DB unix socket path */ ovs_events_iface_list_t *ifaces; /* interface info */ }; typedef struct ovs_events_config_s ovs_events_config_t; @@ -93,6 +94,7 @@ static ovs_events_ctx_t ovs_events_ctx = { .config = {.send_notification = 0, /* do not send notification */ .ovs_db_node = "localhost", /* use default OVS DB node */ .ovs_db_serv = "6640", /* use default OVS DB service */ + .ovs_db_unix = "", /* UNIX path empty by default */ .ifaces = NULL}, .ovs_db_select_params = NULL, .is_db_available = 0, @@ -137,9 +139,8 @@ static int ovs_events_config_iface_exists(const char *ifname) { static char *ovs_events_get_select_params() { int ret = 0; size_t buff_size = 0; - size_t offset = 0; - char *buff = NULL; - char *new_buff = NULL; + size_t buff_off = 0; + char *opt_buff = NULL; const char params_fmt[] = "[\"Open_vSwitch\"%s]"; const char option_fmt[] = ",{\"op\":\"select\",\"table\":\"Interface\"," "\"where\":[[\"name\",\"==\",\"%s\"]]," @@ -150,40 +151,40 @@ static char *ovs_events_get_select_params() { "\"external_ids\",\"name\",\"_uuid\"]}"; /* setup OVS DB interface condition */ for (ovs_events_iface_list_t *iface = ovs_events_ctx.config.ifaces; iface; - iface = iface->next, offset += ret) { + iface = iface->next, buff_off += ret) { /* allocate new buffer (format size + ifname len is good enough) */ buff_size += (sizeof(option_fmt) + strlen(iface->name)); - new_buff = realloc(buff, buff_size); - if (new_buff == NULL) - goto failure; - buff = new_buff; - ret = ssnprintf(buff + offset, buff_size, option_fmt, iface->name); - if (ret < 0) - goto failure; + char *new_buff = realloc(opt_buff, buff_size); + if (new_buff == NULL) { + sfree(opt_buff); + return NULL; + } + opt_buff = new_buff; + ret = ssnprintf(opt_buff + buff_off, buff_size - buff_off, option_fmt, + iface->name); + if (ret < 0) { + sfree(opt_buff); + return NULL; + } } /* if no interfaces are configured, use default params */ - if (buff == NULL) { - buff = strdup(default_opt); - offset = strlen(default_opt); - } + if (opt_buff == NULL) + opt_buff = strdup(default_opt); /* allocate memory for OVS DB select params */ - buff_size = offset + sizeof(params_fmt); - new_buff = malloc(buff_size); - if (new_buff == NULL) - goto failure; + size_t params_size = sizeof(params_fmt) + strlen(opt_buff); + char *params_buff = malloc(params_size); + if (params_buff == NULL) { + sfree(opt_buff); + return NULL; + } /* create OVS DB select params */ - if (ssnprintf(new_buff, buff_size, params_fmt, buff) < 0) - goto failure; + if (ssnprintf(params_buff, params_size, params_fmt, opt_buff) < 0) + sfree(params_buff); - sfree(buff); - return new_buff; - -failure: - sfree(new_buff); - sfree(buff); - return NULL; + sfree(opt_buff); + return params_buff; } /* Release memory allocated for configuration data */ @@ -206,31 +207,24 @@ static int ovs_events_plugin_config(oconfig_item_t *ci) { for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; if (strcasecmp("SendNotification", child->key) == 0) { - if (cf_util_get_boolean(child, &ovs_events_ctx.config.send_notification) < - 0) + if (cf_util_get_boolean(child, + &ovs_events_ctx.config.send_notification) != 0) + OVS_EVENTS_CONFIG_ERROR(child->key); + } else if (strcasecmp("Address", child->key) == 0) { + if (cf_util_get_string_buffer( + child, ovs_events_ctx.config.ovs_db_node, + sizeof(ovs_events_ctx.config.ovs_db_node)) != 0) + OVS_EVENTS_CONFIG_ERROR(child->key); + } else if (strcasecmp("Port", child->key) == 0) { + if (cf_util_get_string_buffer( + child, ovs_events_ctx.config.ovs_db_serv, + sizeof(ovs_events_ctx.config.ovs_db_serv)) != 0) + OVS_EVENTS_CONFIG_ERROR(child->key); + } else if (strcasecmp("Socket", child->key) == 0) { + if (cf_util_get_string_buffer( + child, ovs_events_ctx.config.ovs_db_unix, + sizeof(ovs_events_ctx.config.ovs_db_unix)) != 0) OVS_EVENTS_CONFIG_ERROR(child->key); - } else if (strcasecmp("OvsDbAddress", child->key) == 0) { - if (child->values_num < 1) { - ERROR(OVS_EVENTS_PLUGIN ": invalid OVS DB address specified"); - goto failure; - } - /* check node type and get the value */ - if (child->values[0].type != OCONFIG_TYPE_STRING) { - ERROR(OVS_EVENTS_PLUGIN ": OVS DB node is not a string"); - goto failure; - } - sstrncpy(ovs_events_ctx.config.ovs_db_node, child->values[0].value.string, - sizeof(ovs_events_ctx.config.ovs_db_node)); - /* get OVS DB address service name (optional) */ - if (child->values_num > 1) { - if (child->values[1].type != OCONFIG_TYPE_STRING) { - ERROR(OVS_EVENTS_PLUGIN ": OVS DB service is not a string"); - goto failure; - } - sstrncpy(ovs_events_ctx.config.ovs_db_serv, - child->values[1].value.string, - sizeof(ovs_events_ctx.config.ovs_db_serv)); - } } else if (strcasecmp("Interfaces", child->key) == 0) { for (int j = 0; j < child->values_num; j++) { /* check value type */ @@ -481,9 +475,9 @@ static void ovs_events_table_update_cb(yajl_val jupdates) { } } -/* OVD DB reply callback. It parses reply, receives +/* OVS DB reply callback. It parses reply, receives * interface information and dispatches the info to - * collecd + * collectd */ static void ovs_events_poll_result_cb(yajl_val jresult, yajl_val jerror) { yajl_val *jvalues = NULL; @@ -544,7 +538,7 @@ static void ovs_events_conn_initialize(ovs_db_t *pdb) { } } OVS_EVENTS_CTX_LOCK { ovs_events_ctx.is_db_available = 1; } - DEBUG(OVS_EVENTS_PLUGIN ": OVS DB has been initialized"); + DEBUG(OVS_EVENTS_PLUGIN ": OVS DB connection has been initialized"); } /* OVS DB terminate connection notification callback */ @@ -576,8 +570,9 @@ static int ovs_events_plugin_init(void) { ovs_db_callback_t cb = {.post_conn_init = ovs_events_conn_initialize, .post_conn_terminate = ovs_events_conn_terminate}; - DEBUG(OVS_EVENTS_PLUGIN ": OVS DB node = %s, service=%s", - ovs_events_ctx.config.ovs_db_node, ovs_events_ctx.config.ovs_db_serv); + DEBUG(OVS_EVENTS_PLUGIN ": OVS DB address=%s, service=%s, unix=%s", + ovs_events_ctx.config.ovs_db_node, ovs_events_ctx.config.ovs_db_serv, + ovs_events_ctx.config.ovs_db_unix); /* generate OVS DB select condition based on list on configured interfaces */ ovs_events_ctx.ovs_db_select_params = ovs_events_get_select_params(); @@ -588,7 +583,8 @@ static int ovs_events_plugin_init(void) { /* initialize OVS DB */ ovs_db = ovs_db_init(ovs_events_ctx.config.ovs_db_node, - ovs_events_ctx.config.ovs_db_serv, &cb); + ovs_events_ctx.config.ovs_db_serv, + ovs_events_ctx.config.ovs_db_unix, &cb); if (ovs_db == NULL) { ERROR(OVS_EVENTS_PLUGIN ": fail to connect to OVS DB server"); goto ovs_events_failure; diff --git a/src/utils_ovs.c b/src/utils_ovs.c index 2a4bdf83..2b148490 100644 --- a/src/utils_ovs.c +++ b/src/utils_ovs.c @@ -101,7 +101,6 @@ #define OVS_DB_POLL_TIMEOUT 1 /* poll receive timeout (sec) */ #define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */ #define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch" -#define OVS_DB_RECONNECT_TIMEOUT 1 /* reconnect timeout (sec) */ #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */ #define OVS_DB_EVENT_TERMINATE 1 @@ -183,9 +182,14 @@ struct ovs_db_s { ovs_db_callback_t cb; char service[OVS_DB_ADDR_SERVICE_SIZE]; char node[OVS_DB_ADDR_NODE_SIZE]; + char unix_path[OVS_DB_ADDR_NODE_SIZE]; int sock; }; +/* Global variables */ +static uint64_t ovs_uid = 0; +static pthread_mutex_t ovs_uid_mutex = PTHREAD_MUTEX_INITIALIZER; + /* Post an event to event thread. * Possible events are: * OVS_DB_EVENT_TERMINATE @@ -209,19 +213,14 @@ static _Bool ovs_db_poll_is_running(ovs_db_t *pdb) { return (state == OVS_DB_POLL_STATE_RUNNING); } -/* Terminate POLL thread */ -static void ovs_db_poll_terminate(ovs_db_t *pdb) { - pthread_mutex_lock(&pdb->poll_thread.mutex); - pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING; - pthread_mutex_unlock(&pdb->poll_thread.mutex); -} - /* Generate unique identifier (UID). It is used by OVS DB API * to set "id" field for any OVS DB JSON request. */ static uint64_t ovs_uid_generate() { - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX)); + uint64_t new_uid; + pthread_mutex_lock(&ovs_uid_mutex); + new_uid = ++ovs_uid; + pthread_mutex_unlock(&ovs_uid_mutex); + return new_uid; } /* @@ -242,10 +241,10 @@ static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) { /* Remove callback from OVS DB object */ static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) { + pthread_mutex_lock(&pdb->mutex); ovs_callback_t *pre_cb = del_cb->prev; ovs_callback_t *next_cb = del_cb->next; - pthread_mutex_lock(&pdb->mutex); if (next_cb) next_cb->prev = del_cb->prev; @@ -274,7 +273,7 @@ static void ovs_db_callback_remove_all(ovs_db_t *pdb) { * to requested callback otherwise NULL is returned. * * IMPORTANT NOTE: - * The OVS DB mutex should be locked by the caller + * The OVS DB mutex MUST be locked by the caller * to make sure that returned callback is still valid. */ static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) { @@ -468,34 +467,38 @@ static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) { /* check & get request attributes */ if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL || - (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) - goto ovs_failure; + (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) { + OVS_ERROR("invalid OVS DB request received"); + return (-1); + } /* check array length: [, ] */ - if (YAJL_GET_ARRAY(jparams)->len != 2) - goto ovs_failure; + if ((YAJL_GET_ARRAY(jparams) == NULL) || + (YAJL_GET_ARRAY(jparams)->len != 2)) { + OVS_ERROR("invalid OVS DB request received"); + return (-1); + } jvalue = YAJL_GET_ARRAY(jparams)->values[0]; jtable_updates = YAJL_GET_ARRAY(jparams)->values[1]; - if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) - goto ovs_failure; + if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) { + OVS_ERROR("invalid OVS DB request id or table update received"); + return (-1); + } /* find registered callback based on */ pthread_mutex_lock(&pdb->mutex); cb = ovs_db_table_callback_get(pdb, jvalue); if (cb == NULL || cb->table.call == NULL) { + OVS_ERROR("No OVS DB table update callback found"); pthread_mutex_unlock(&pdb->mutex); - goto ovs_failure; + return (-1); } /* call registered callback */ cb->table.call(jtable_updates); pthread_mutex_unlock(&pdb->mutex); return 0; - -ovs_failure: - OVS_ERROR("invalid OVS DB table update event"); - return (-1); } /* OVS DB result request handler. @@ -550,7 +553,7 @@ static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data, /* duplicate the data to make null-terminated string * required for yajl_tree_parse() */ - if ((sjson = malloc(len + 1)) == NULL) + if ((sjson = calloc(1, len + 1)) == NULL) return (-1); sstrncpy(sjson, data, len + 1); @@ -700,39 +703,33 @@ static void ovs_json_reader_free(ovs_json_reader_t *jreader) { } } -/* Reconnect to OVD DB and call the OVS DB post connection init callback +/* Reconnect to OVS DB and call the OVS DB post connection init callback * if connection has been established. */ -static int ovs_db_reconnect(ovs_db_t *pdb) { - char errbuff[OVS_ERROR_BUFF_SIZE]; +static void ovs_db_reconnect(ovs_db_t *pdb) { const char unix_prefix[] = "unix:"; - struct addrinfo *result, *rp; - _Bool is_connected = 0; + const char *node_info = pdb->node; + struct addrinfo *result; struct sockaddr_un saunix; - /* remove all registered OVS DB table/result callbacks */ - ovs_db_callback_remove_all(pdb); - - if (strncmp(pdb->node, unix_prefix, strlen(unix_prefix)) == 0) { - /* create unix socket address */ - rp = calloc(1, sizeof(struct addrinfo)); + if (pdb->unix_path[0] != '\0') { + /* use UNIX socket instead of INET address */ + node_info = pdb->unix_path; + result = calloc(1, sizeof(struct addrinfo)); struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un)); - if (rp == NULL || sa_unix == NULL) { - sfree(rp); + if (result == NULL || sa_unix == NULL) { + sfree(result); sfree(sa_unix); - return (1); + return; } - rp->ai_family = AF_UNIX; - rp->ai_socktype = SOCK_STREAM; - rp->ai_addrlen = sizeof(*sa_unix); - rp->ai_addr = (struct sockaddr *)sa_unix; - sa_unix->sun_family = rp->ai_family; - sstrncpy(sa_unix->sun_path, (pdb->node + strlen(unix_prefix)), - sizeof(sa_unix->sun_path)); - result = rp; + result->ai_family = AF_UNIX; + result->ai_socktype = SOCK_STREAM; + result->ai_addrlen = sizeof(*sa_unix); + result->ai_addr = (struct sockaddr *)sa_unix; + sa_unix->sun_family = result->ai_family; + sstrncpy(sa_unix->sun_path, pdb->unix_path, sizeof(sa_unix->sun_path)); } else { - /* intet socket address */ - int ret = 0; + /* inet socket address */ struct addrinfo hints; /* setup criteria for selecting the socket address */ @@ -741,37 +738,37 @@ static int ovs_db_reconnect(ovs_db_t *pdb) { hints.ai_socktype = SOCK_STREAM; /* get socket addresses */ - if ((ret = getaddrinfo(pdb->node, pdb->service, &hints, &result)) != 0) { + int ret = getaddrinfo(pdb->node, pdb->service, &hints, &result); + if (ret != 0) { OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret)); - return (1); + return; } } /* try to connect to the server */ - for (rp = result; rp != NULL; rp = rp->ai_next) { - if ((pdb->sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol)) < - 0) { + for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) { + char errbuff[OVS_ERROR_BUFF_SIZE]; + int sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sock < 0) { sstrerror(errno, errbuff, sizeof(errbuff)); OVS_DEBUG("socket(): %s", errbuff); continue; } - if (connect(pdb->sock, rp->ai_addr, rp->ai_addrlen) < 0) { + if (connect(sock, rp->ai_addr, rp->ai_addrlen) < 0) { + close(sock); sstrerror(errno, errbuff, sizeof(errbuff)); OVS_DEBUG("connect(): %s [family=%d]", errbuff, rp->ai_family); - close(pdb->sock); } else { - is_connected = 1; + /* send notification to event thread */ + ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED); + pdb->sock = sock; break; } } - /* send notification to event thread */ - if (is_connected) - ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED); - else - OVS_ERROR("connect to \"%s\" failed", pdb->node); + if (pdb->sock < 0) + OVS_ERROR("connect to \"%s\" failed", node_info); freeaddrinfo(result); - return !is_connected; } /* POLL worker thread. @@ -782,89 +779,91 @@ static int ovs_db_reconnect(ovs_db_t *pdb) { static void *ovs_poll_worker(void *arg) { ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */ ovs_json_reader_t *jreader = NULL; - const char *json; - size_t json_len; - ssize_t nbytes = 0; - char buff[OVS_DB_POLL_READ_BLOCK_SIZE]; - struct pollfd poll_fd; - int poll_ret = 0; + struct pollfd poll_fd = { + .fd = pdb->sock, .events = POLLIN | POLLPRI, .revents = 0, + }; + /* create JSON reader instance */ if ((jreader = ovs_json_reader_alloc()) == NULL) { OVS_ERROR("initialize json reader failed"); - goto thread_exit; + return (NULL); } - /* start polling data */ - poll_fd.fd = pdb->sock; - poll_fd.events = POLLIN | POLLPRI; - poll_fd.revents = 0; - /* poll data */ while (ovs_db_poll_is_running(pdb)) { - poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000); - if (poll_ret > 0) { - if (poll_fd.revents & POLLNVAL) { - /* invalid file descriptor, reconnect */ - if (ovs_db_reconnect(pdb) != 0) { - /* sleep awhile until next reconnect */ - usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000); - } - ovs_json_reader_reset(jreader); - poll_fd.fd = pdb->sock; - } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) { - /* connection is broken */ - OVS_ERROR("poll() peer closed its end of the channel"); - ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED); - close(poll_fd.fd); - } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) { - /* read incoming data */ - nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0); - if (nbytes > 0) { - OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes); - ovs_json_reader_push_data(jreader, buff, nbytes); - while (!ovs_json_reader_pop(jreader, &json, &json_len)) - /* process JSON data */ - ovs_db_json_data_process(pdb, json, json_len); - } else if (nbytes == 0) { - OVS_ERROR("recv() peer has performed an orderly shutdown"); - ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED); - close(poll_fd.fd); - } else { - OVS_ERROR("recv() receive data error"); - break; - } - } /* poll() POLLIN & POLLPRI */ - } else if (poll_ret == 0) - OVS_DEBUG("poll() timeout"); - else { - OVS_ERROR("poll() error"); + char errbuff[OVS_ERROR_BUFF_SIZE]; + poll_fd.fd = pdb->sock; + int poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000); + if (poll_ret < 0) { + sstrerror(errno, errbuff, sizeof(errbuff)); + OVS_ERROR("poll(): %s", errbuff); break; + } else if (poll_ret == 0) { + OVS_DEBUG("poll(): timeout"); + if (pdb->sock < 0) + /* invalid fd, so try to reconnect */ + ovs_db_reconnect(pdb); + continue; + } + if (poll_fd.revents & POLLNVAL) { + /* invalid file descriptor, clean-up */ + ovs_db_callback_remove_all(pdb); + ovs_json_reader_reset(jreader); + /* setting poll FD to -1 tells poll() call to ignore this FD. + * In that case poll() call will return timeout all the time */ + pdb->sock = (-1); + } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) { + /* connection is broken */ + close(poll_fd.fd); + ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED); + OVS_ERROR("poll() peer closed its end of the channel"); + } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) { + /* read incoming data */ + char buff[OVS_DB_POLL_READ_BLOCK_SIZE]; + ssize_t nbytes = recv(poll_fd.fd, buff, sizeof(buff), 0); + if (nbytes < 0) { + sstrerror(errno, errbuff, sizeof(errbuff)); + OVS_ERROR("recv(): %s", errbuff); + /* read error? Try to reconnect */ + close(poll_fd.fd); + continue; + } else if (nbytes == 0) { + close(poll_fd.fd); + ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED); + OVS_ERROR("recv() peer has performed an orderly shutdown"); + continue; + } + /* read incoming data */ + size_t json_len = 0; + const char *json = NULL; + OVS_DEBUG("recv(): received %zd bytes of data", nbytes); + ovs_json_reader_push_data(jreader, buff, nbytes); + while (!ovs_json_reader_pop(jreader, &json, &json_len)) + /* process JSON data */ + ovs_db_json_data_process(pdb, json, json_len); } } -thread_exit: OVS_DEBUG("poll thread has been completed"); ovs_json_reader_free(jreader); - pthread_exit((void *)0); - return ((void *)0); + return (NULL); } /* EVENT worker thread. * Perform task based on incoming events. This * task can be done asynchronously which allows to - * handle OVD DB callback like 'init_cb'. + * handle OVS DB callback like 'init_cb'. */ static void *ovs_event_worker(void *arg) { - int ret = 0; ovs_db_t *pdb = (ovs_db_t *)arg; - struct timespec ts; while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) { /* wait for an event */ + struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += (OVS_DB_EVENT_TIMEOUT); - ret = pthread_cond_timedwait(&pdb->event_thread.cond, - &pdb->event_thread.mutex, &ts); + int ret = pthread_cond_timedwait(&pdb->event_thread.cond, + &pdb->event_thread.mutex, &ts); if (!ret) { /* handle the event */ OVS_DEBUG("handle event %d", pdb->event_thread.value); @@ -892,28 +891,94 @@ static void *ovs_event_worker(void *arg) { } } -thread_exit: OVS_DEBUG("event thread has been completed"); - pthread_exit((void *)0); - return ((void *)0); + return (NULL); +} + +/* Initialize EVENT thread */ +static int ovs_db_event_thread_init(ovs_db_t *pdb) { + pdb->event_thread.tid = -1; + /* init event thread condition variable */ + if (pthread_cond_init(&pdb->event_thread.cond, NULL)) { + return (-1); + } + /* init event thread mutex */ + if (pthread_mutex_init(&pdb->event_thread.mutex, NULL)) { + pthread_cond_destroy(&pdb->event_thread.cond); + return (-1); + } + /* Hold the event thread mutex. It ensures that no events + * will be lost while thread is still starting. Once event + * thread is started and ready to accept events, it will release + * the mutex */ + if (pthread_mutex_lock(&pdb->event_thread.mutex)) { + pthread_mutex_destroy(&pdb->event_thread.mutex); + pthread_cond_destroy(&pdb->event_thread.cond); + return (-1); + } + /* start event thread */ + pthread_t tid; + if (plugin_thread_create(&tid, NULL, ovs_event_worker, pdb) != 0) { + pthread_mutex_unlock(&pdb->event_thread.mutex); + pthread_mutex_destroy(&pdb->event_thread.mutex); + pthread_cond_destroy(&pdb->event_thread.cond); + return (-1); + } + pdb->event_thread.tid = tid; + return (0); } -/* Stop EVENT thread */ -static int ovs_db_event_thread_stop(ovs_db_t *pdb) { +/* Destroy EVENT thread */ +static int ovs_db_event_thread_destroy(ovs_db_t *pdb) { + if (pdb->event_thread.tid < 0) + /* already destroyed */ + return (0); ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE); if (pthread_join(pdb->event_thread.tid, NULL) != 0) return (-1); + /* Event thread always holds the thread mutex when + * performs some task (handles event) and releases it when + * while sleeping. Thus, if event thread exits, the mutex + * remains locked */ pthread_mutex_unlock(&pdb->event_thread.mutex); pthread_mutex_destroy(&pdb->event_thread.mutex); + pthread_cond_destroy(&pdb->event_thread.cond); + pdb->event_thread.tid = -1; + return (0); +} + +/* Initialize POLL thread */ +static int ovs_db_poll_thread_init(ovs_db_t *pdb) { + pdb->poll_thread.tid = -1; + /* init event thread mutex */ + if (pthread_mutex_init(&pdb->poll_thread.mutex, NULL)) { + return (-1); + } + /* start poll thread */ + pthread_t tid; + pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING; + if (plugin_thread_create(&tid, NULL, ovs_poll_worker, pdb) != 0) { + pthread_mutex_destroy(&pdb->poll_thread.mutex); + return (-1); + } + pdb->poll_thread.tid = tid; return (0); } -/* Stop POLL thread */ -static int ovs_db_poll_thread_stop(ovs_db_t *pdb) { - ovs_db_poll_terminate(pdb); +/* Destroy POLL thread */ +static int ovs_db_poll_thread_destroy(ovs_db_t *pdb) { + if (pdb->poll_thread.tid < 0) + /* already destroyed */ + return (0); + /* change thread state */ + pthread_mutex_lock(&pdb->poll_thread.mutex); + pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING; + pthread_mutex_unlock(&pdb->poll_thread.mutex); + /* join the thread */ if (pthread_join(pdb->poll_thread.tid, NULL) != 0) return (-1); pthread_mutex_destroy(&pdb->poll_thread.mutex); + pdb->poll_thread.tid = -1; return (0); } @@ -922,72 +987,62 @@ static int ovs_db_poll_thread_stop(ovs_db_t *pdb) { */ ovs_db_t *ovs_db_init(const char *node, const char *service, - ovs_db_callback_t *cb) { - pthread_mutexattr_t mutex_attr; - ovs_db_t *pdb = NULL; - - /* allocate db data & fill it */ - if ((pdb = calloc(1, sizeof(*pdb))) == NULL) + const char *unix_path, ovs_db_callback_t *cb) { + /* sanity check */ + if (node == NULL || service == NULL || unix_path == NULL) return (NULL); - /* node cannot be unset */ - if (node == NULL || strlen(node) == 0) + /* allocate db data & fill it */ + ovs_db_t *pdb = pdb = calloc(1, sizeof(*pdb)); + if (pdb == NULL) return (NULL); /* store the OVS DB address */ sstrncpy(pdb->node, node, sizeof(pdb->node)); - if (service != NULL) - sstrncpy(pdb->service, service, sizeof(pdb->service)); + sstrncpy(pdb->service, service, sizeof(pdb->service)); + sstrncpy(pdb->unix_path, unix_path, sizeof(pdb->unix_path)); /* setup OVS DB callbacks */ if (cb) pdb->cb = *cb; - /* prepare event thread */ - pthread_cond_init(&pdb->event_thread.cond, NULL); - pthread_mutex_init(&pdb->event_thread.mutex, NULL); - pthread_mutex_lock(&pdb->event_thread.mutex); - if (plugin_thread_create(&pdb->event_thread.tid, NULL, ovs_event_worker, - pdb) != 0) { - OVS_ERROR("event worker start failed"); - goto failure; + /* init OVS DB mutex attributes */ + pthread_mutexattr_t mutex_attr; + if (pthread_mutexattr_init(&mutex_attr)) { + OVS_ERROR("OVS DB mutex attribute init failed"); + sfree(pdb); + return (NULL); } - - /* prepare polling thread */ - ovs_db_reconnect(pdb); - pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING; - pthread_mutex_init(&pdb->poll_thread.mutex, NULL); - if (plugin_thread_create(&pdb->poll_thread.tid, NULL, ovs_poll_worker, pdb) != - 0) { - OVS_ERROR("pull worker start failed"); - goto failure; + /* set OVS DB mutex as recursive */ + if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE)) { + OVS_ERROR("Failed to set OVS DB mutex as recursive"); + pthread_mutexattr_destroy(&mutex_attr); + sfree(pdb); + return (NULL); } - /* init OVS DB mutex */ - if (pthread_mutexattr_init(&mutex_attr) || - pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) || - pthread_mutex_init(&pdb->mutex, &mutex_attr)) { + if (pthread_mutex_init(&pdb->mutex, &mutex_attr)) { OVS_ERROR("OVS DB mutex init failed"); - goto failure; + pthread_mutexattr_destroy(&mutex_attr); + sfree(pdb); + return (NULL); } + /* destroy mutex attributes */ + pthread_mutexattr_destroy(&mutex_attr); - /* return db to the caller */ - return pdb; + /* init event thread */ + if (ovs_db_event_thread_init(pdb) < 0) { + ovs_db_destroy(pdb); + return (NULL); + } -failure: - if (pdb->sock) - /* close connection */ - close(pdb->sock); - if (pdb->event_thread.tid != 0) - /* stop event thread */ - if (ovs_db_event_thread_stop(pdb) < 0) - OVS_ERROR("stop event thread failed"); - if (pdb->poll_thread.tid != 0) - /* stop poll thread */ - if (ovs_db_poll_thread_stop(pdb) < 0) - OVS_ERROR("stop poll thread failed"); - sfree(pdb); - return NULL; + /* init polling thread */ + pdb->sock = -1; + if (ovs_db_poll_thread_init(pdb) < 0) { + ovs_db_destroy(pdb); + return (NULL); + } + return pdb; } int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params, @@ -1038,7 +1093,7 @@ int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params, if (cb) { /* register result callback */ - if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL) + if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL) goto yajl_gen_failure; /* add new callback to front */ @@ -1095,12 +1150,15 @@ int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name, if (pdb == NULL || tb_name == NULL || update_cb == NULL) return (-1); - if ((jgen = yajl_gen_alloc(NULL)) == NULL) + /* allocate new update callback */ + if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL) return (-1); - /* register table update callback */ - if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL) + /* init YAJL generator */ + if ((jgen = yajl_gen_alloc(NULL)) == NULL) { + sfree(new_cb); return (-1); + } /* add new callback to front */ new_cb->table.call = update_cb; @@ -1187,18 +1245,18 @@ int ovs_db_destroy(ovs_db_t *pdb) { /* try to lock the structure before releasing */ if ((ret = pthread_mutex_lock(&pdb->mutex))) { - OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret); + OVS_ERROR("pthread_mutex_lock() DB mutex lock failed (%d)", ret); return (-1); } /* stop poll thread */ - if (ovs_db_event_thread_stop(pdb) < 0) { - OVS_ERROR("stop poll thread failed"); + if (ovs_db_event_thread_destroy(pdb) < 0) { + OVS_ERROR("destroy poll thread failed"); ovs_db_ret = (-1); } /* stop event thread */ - if (ovs_db_poll_thread_stop(pdb) < 0) { + if (ovs_db_poll_thread_destroy(pdb) < 0) { OVS_ERROR("stop event thread failed"); ovs_db_ret = (-1); } @@ -1207,7 +1265,7 @@ int ovs_db_destroy(ovs_db_t *pdb) { ovs_db_callback_remove_all(pdb); /* close connection */ - if (pdb->sock) + if (pdb->sock >= 0) close(pdb->sock); /* release DB handler */ @@ -1221,7 +1279,14 @@ int ovs_db_destroy(ovs_db_t *pdb) { * Public OVS utils API implementation */ -/* Get YAJL value by key from YAJL dictionary */ +/* Get YAJL value by key from YAJL dictionary + * + * EXAMPLE: + * { + * "key_a" : + * "key_b" : + * } + */ yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) { const char *obj_key = NULL; @@ -1239,7 +1304,29 @@ yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) { return NULL; } -/* Get OVS DB map value by given map key */ +/* Get OVS DB map value by given map key + * + * FROM RFC7047: + * + * + * A 2-element JSON array that represents a pair within a database + * map. The first element is an that represents the key, and + * the second element is an that represents the value. + * + * + * A 2-element JSON array that represents a database map value. The + * first element of the array must be the string "map", and the + * second element must be an array of zero or more s giving the + * values in the map. All of the s must have the same key and + * value types. + * + * EXAMPLE: + * [ + * "map", [ + * [ "key_a", ], [ "key_b", ], ... + * ] + * ] + */ yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) { size_t map_len = 0; size_t array_len = 0; diff --git a/src/utils_ovs.h b/src/utils_ovs.h index e90bda31..52c2f915 100644 --- a/src/utils_ovs.h +++ b/src/utils_ovs.h @@ -86,7 +86,7 @@ struct ovs_db_callback_s { */ void (*post_conn_init)(ovs_db_t *pdb); /* - * This callback is called when OVD DB connection + * This callback is called when OVS DB connection * has been lost. This field can be NULL. */ void (*post_conn_terminate)(void); @@ -96,6 +96,7 @@ typedef struct ovs_db_callback_s ovs_db_callback_t; /* OVS DB defines */ #define OVS_DB_ADDR_NODE_SIZE 256 #define OVS_DB_ADDR_SERVICE_SIZE 128 +#define OVS_DB_ADDR_UNIX_SIZE 108 /* OVS DB prototypes */ @@ -110,13 +111,14 @@ typedef struct ovs_db_callback_s ovs_db_callback_t; * PARAMETERS * `node' OVS DB Address. * `service' OVS DB service name. + * `unix' OVS DB unix socket path. * `cb' OVS DB callbacks. * * RETURN VALUE * New ovs_db_t object upon success or NULL if an error occurred. */ ovs_db_t *ovs_db_init(const char *node, const char *service, - ovs_db_callback_t *cb); + const char *unix_path, ovs_db_callback_t *cb); /* * NAME -- 2.11.0