2 * collectd - src/utils_ovs.c
4 * Copyright(c) 2016 Intel Corporation. All rights reserved.
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
8 * this software and associated documentation files (the "Software"), to deal in
9 * the Software without restriction, including without limitation the rights to
10 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
11 * of the Software, and to permit persons to whom the Software is furnished to
13 * so, subject to the following conditions:
15 * The above copyright notice and this permission notice shall be included in
17 * copies or substantial portions of the Software.
19 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 * Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
31 /* clang-format off */
33 * OVS DB API internal architecture diagram
34 * +------------------------------------------------------------------------------+
35 * |OVS plugin |OVS utils |
36 * | | +------------------------+ |
37 * | | | echo handler | JSON request/ |
38 * | | +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
39 * | | | | | | | result |
40 * | | | +------------------------+ | | |
41 * | | | | +----+---+--------+ |
42 * | +----------+ | | +------------------------+ | | | | |
43 * | | update | | | | update handler | | | YAJL | JSON | |
44 * | | callback +<-------+(ovs_db_table_update_cp)+<---+ | parser | reader | |
45 * | +----------+ | | | | | | | | |
46 * | | | +------------------------+ | +--------+---+----+ |
48 * | +----------+ | | +------------------------+ | | |
49 * | | result | | | | result handler | | | |
50 * | | callback +<-------+ (ovs_db_result_cb) +<---+ JSON raw | |
51 * | +----------+ | | | | data | |
52 * | | | +------------------------+ | |
54 * | | | +------------------+ +------------+----+ |
55 * | +----------+ | | |thread| | |thread| | |
56 * | | init | | | | | reconnect | | |
57 * | | callback +<---------+ EVENT WORKER +<------------+ POLL WORKER | |
58 * | +----------+ | | +------------------+ +--------+--------+ |
60 * +----------------+-------------------------------------------------------------+
62 * JSON|echo reply raw|data
64 * +-------------------+----------------------------------------------+-----------+
66 * +-------------------------------------------------------------------------------
70 /* collectd headers */
76 #include "utils_ovs.h"
78 /* system libraries */
83 #include <arpa/inet.h>
92 #include <semaphore.h>
94 #define OVS_ERROR(fmt, ...) \
96 ERROR("ovs_utils: " fmt, ##__VA_ARGS__); \
98 #define OVS_DEBUG(fmt, ...) \
100 DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__, \
104 #define OVS_DB_POLL_TIMEOUT 1 /* poll receive timeout (sec) */
105 #define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
106 #define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
108 #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
109 #define OVS_DB_EVENT_TERMINATE 1
110 #define OVS_DB_EVENT_CONN_ESTABLISHED 2
111 #define OVS_DB_EVENT_CONN_TERMINATED 3
113 #define OVS_DB_POLL_STATE_RUNNING 1
114 #define OVS_DB_POLL_STATE_EXITING 2
116 #define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
118 #define OVS_YAJL_CALL(func, ...) \
120 yajl_gen_ret = yajl_gen_status_ok; \
121 if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \
122 goto yajl_gen_failure; \
124 #define OVS_YAJL_ERROR_BUFFER_SIZE 1024
125 #define OVS_ERROR_BUFF_SIZE 512
126 #define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
128 /* JSON reader internal data */
129 struct ovs_json_reader_s {
135 typedef struct ovs_json_reader_s ovs_json_reader_t;
137 /* Result callback declaration */
138 struct ovs_result_cb_s {
140 ovs_db_result_cb_t call;
142 typedef struct ovs_result_cb_s ovs_result_cb_t;
144 /* Table callback declaration */
145 struct ovs_table_cb_s {
146 ovs_db_table_cb_t call;
148 typedef struct ovs_table_cb_s ovs_table_cb_t;
150 /* Callback declaration */
151 struct ovs_callback_s {
154 ovs_result_cb_t result;
155 ovs_table_cb_t table;
157 struct ovs_callback_s *next;
158 struct ovs_callback_s *prev;
160 typedef struct ovs_callback_s ovs_callback_t;
162 /* Event thread data declaration */
163 struct ovs_event_thread_s {
165 pthread_mutex_t mutex;
169 typedef struct ovs_event_thread_s ovs_event_thread_t;
171 /* Poll thread data declaration */
172 struct ovs_poll_thread_s {
174 pthread_mutex_t mutex;
177 typedef struct ovs_poll_thread_s ovs_poll_thread_t;
179 /* OVS DB internal data declaration */
181 ovs_poll_thread_t poll_thread;
182 ovs_event_thread_t event_thread;
183 pthread_mutex_t mutex;
184 ovs_callback_t *remote_cb;
185 ovs_db_callback_t cb;
186 char service[OVS_DB_ADDR_SERVICE_SIZE];
187 char node[OVS_DB_ADDR_NODE_SIZE];
188 char unix_path[OVS_DB_ADDR_NODE_SIZE];
192 /* Global variables */
193 static uint64_t ovs_uid = 0;
194 static pthread_mutex_t ovs_uid_mutex = PTHREAD_MUTEX_INITIALIZER;
196 /* Post an event to event thread.
197 * Possible events are:
198 * OVS_DB_EVENT_TERMINATE
199 * OVS_DB_EVENT_CONN_ESTABLISHED
200 * OVS_DB_EVENT_CONN_TERMINATED
202 static void ovs_db_event_post(ovs_db_t *pdb, int event) {
203 pthread_mutex_lock(&pdb->event_thread.mutex);
204 pdb->event_thread.value = event;
205 pthread_mutex_unlock(&pdb->event_thread.mutex);
206 pthread_cond_signal(&pdb->event_thread.cond);
209 /* Check if POLL thread is still running. Returns
210 * 1 if running otherwise 0 is returned */
211 static _Bool ovs_db_poll_is_running(ovs_db_t *pdb) {
213 pthread_mutex_lock(&pdb->poll_thread.mutex);
214 state = pdb->poll_thread.state;
215 pthread_mutex_unlock(&pdb->poll_thread.mutex);
216 return state == OVS_DB_POLL_STATE_RUNNING;
219 /* Generate unique identifier (UID). It is used by OVS DB API
220 * to set "id" field for any OVS DB JSON request. */
221 static uint64_t ovs_uid_generate() {
223 pthread_mutex_lock(&ovs_uid_mutex);
225 pthread_mutex_unlock(&ovs_uid_mutex);
230 * Callback API. These function are used to store
231 * registered callbacks in OVS DB API.
234 /* Add new callback into OVS DB object */
235 static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
236 pthread_mutex_lock(&pdb->mutex);
238 pdb->remote_cb->prev = new_cb;
239 new_cb->next = pdb->remote_cb;
241 pdb->remote_cb = new_cb;
242 pthread_mutex_unlock(&pdb->mutex);
245 /* Remove callback from OVS DB object */
246 static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
247 pthread_mutex_lock(&pdb->mutex);
248 ovs_callback_t *pre_cb = del_cb->prev;
249 ovs_callback_t *next_cb = del_cb->next;
252 next_cb->prev = del_cb->prev;
255 pre_cb->next = del_cb->next;
257 pdb->remote_cb = del_cb->next;
260 pthread_mutex_unlock(&pdb->mutex);
263 /* Remove all callbacks form OVS DB object */
264 static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
265 pthread_mutex_lock(&pdb->mutex);
266 while (pdb->remote_cb != NULL) {
267 ovs_callback_t *del_cb = pdb->remote_cb;
268 pdb->remote_cb = del_cb->next;
271 pthread_mutex_unlock(&pdb->mutex);
274 /* Get/find callback in OVS DB object by UID. Returns pointer
275 * to requested callback otherwise NULL is returned.
278 * The OVS DB mutex MUST be locked by the caller
279 * to make sure that returned callback is still valid.
281 static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
282 for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
288 /* Send all requested data to the socket. Returns 0 if
289 * ALL request data has been sent otherwise negative value
291 static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) {
297 if ((nbytes = send(pdb->sock, data + off, rem, 0)) <= 0)
299 rem -= (size_t)nbytes;
300 off += (size_t)nbytes;
306 * YAJL (Yet Another JSON Library) helper functions
307 * Documentation (https://lloyd.github.io/yajl/)
310 /* Add null-terminated string into YAJL generator handle (JSON object).
311 * Similar function to yajl_gen_string() but takes null-terminated string
312 * instead of string and its length.
314 * jgen - YAJL generator handle allocated by yajl_gen_alloc()
315 * string - Null-terminated string
317 static yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander,
318 const char *string) {
319 return yajl_gen_string(hander, (const unsigned char *)string, strlen(string));
322 /* Add YAJL value into YAJL generator handle (JSON object)
324 * jgen - YAJL generator handle allocated by yajl_gen_alloc()
325 * jval - YAJL value usually returned by yajl_tree_get()
327 static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) {
328 size_t array_len = 0;
329 yajl_val *jvalues = NULL;
330 yajl_val jobj_value = NULL;
331 const char *obj_key = NULL;
333 yajl_gen_status yajl_gen_ret = yajl_gen_status_ok;
336 return yajl_gen_generation_complete;
338 if (YAJL_IS_STRING(jval))
339 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
340 else if (YAJL_IS_DOUBLE(jval))
341 OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
342 else if (YAJL_IS_INTEGER(jval))
343 OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
344 else if (YAJL_IS_TRUE(jval))
345 OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
346 else if (YAJL_IS_FALSE(jval))
347 OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
348 else if (YAJL_IS_NULL(jval))
349 OVS_YAJL_CALL(yajl_gen_null, jgen);
350 else if (YAJL_IS_ARRAY(jval)) {
351 /* create new array and add all elements into the array */
352 array_len = YAJL_GET_ARRAY(jval)->len;
353 jvalues = YAJL_GET_ARRAY(jval)->values;
354 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
355 for (size_t i = 0; i < array_len; i++)
356 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
357 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
358 } else if (YAJL_IS_OBJECT(jval)) {
359 /* create new object and add all elements into the object */
360 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
361 obj_len = YAJL_GET_OBJECT(jval)->len;
362 for (size_t i = 0; i < obj_len; i++) {
363 obj_key = YAJL_GET_OBJECT(jval)->keys[i];
364 jobj_value = YAJL_GET_OBJECT(jval)->values[i];
365 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
366 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
368 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
370 OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
372 goto yajl_gen_failure;
374 return yajl_gen_status_ok;
377 OVS_ERROR("%s() error to generate value", __FUNCTION__);
381 /* OVS DB echo request handler. When OVS DB sends
382 * "echo" request to the client, client should generate
383 * "echo" replay with the same content received in the
385 static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) {
390 const char *resp = NULL;
391 const char *params_path[] = {"params", NULL};
392 const char *id_path[] = {"id", NULL};
393 yajl_gen_status yajl_gen_ret;
395 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
398 /* check & get request attributes */
399 if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
400 ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
401 OVS_ERROR("parse echo request failed");
402 goto yajl_gen_failure;
405 /* generate JSON echo response */
406 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
408 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
409 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
411 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
412 OVS_YAJL_CALL(yajl_gen_null, jgen);
414 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
415 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
417 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
418 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
421 /* send the response */
422 OVS_DEBUG("response: %s", resp);
423 if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
424 OVS_ERROR("send echo reply failed");
425 goto yajl_gen_failure;
427 /* clean up and return success */
428 yajl_gen_clear(jgen);
433 yajl_gen_clear(jgen);
437 /* Get OVS DB registered callback by YAJL val. The YAJL
438 * value should be YAJL string (UID). Returns NULL if
439 * callback hasn't been found. See also ovs_db_callback_get()
440 * description for addition info.
442 static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) {
444 const char *suid = NULL;
447 if (jid && YAJL_IS_STRING(jid)) {
448 suid = YAJL_GET_STRING(jid);
449 uid = (uint64_t)strtoul(suid, &endptr, 16);
450 if (*endptr == '\0' && uid)
451 return ovs_db_callback_get(pdb, uid);
457 /* OVS DB table update event handler.
458 * This callback is called by POLL thread if OVS DB
459 * table update callback is received from the DB
460 * server. Once registered callback found, it's called
461 * by this handler. */
462 static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
463 ovs_callback_t *cb = NULL;
466 yajl_val jtable_updates;
467 const char *params_path[] = {"params", NULL};
468 const char *id_path[] = {"id", NULL};
470 /* check & get request attributes */
471 if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
472 (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) {
473 OVS_ERROR("invalid OVS DB request received");
477 /* check array length: [<json-value>, <table-updates>] */
478 if ((YAJL_GET_ARRAY(jparams) == NULL) ||
479 (YAJL_GET_ARRAY(jparams)->len != 2)) {
480 OVS_ERROR("invalid OVS DB request received");
484 jvalue = YAJL_GET_ARRAY(jparams)->values[0];
485 jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
486 if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) {
487 OVS_ERROR("invalid OVS DB request id or table update received");
491 /* find registered callback based on <json-value> */
492 pthread_mutex_lock(&pdb->mutex);
493 cb = ovs_db_table_callback_get(pdb, jvalue);
494 if (cb == NULL || cb->table.call == NULL) {
495 OVS_ERROR("No OVS DB table update callback found");
496 pthread_mutex_unlock(&pdb->mutex);
500 /* call registered callback */
501 cb->table.call(jtable_updates);
502 pthread_mutex_unlock(&pdb->mutex);
506 /* OVS DB result request handler.
507 * This callback is called by POLL thread if OVS DB
508 * result reply is received from the DB server.
509 * Once registered callback found, it's called
510 * by this handler. */
511 static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) {
512 ovs_callback_t *cb = NULL;
516 const char *result_path[] = {"result", NULL};
517 const char *error_path[] = {"error", NULL};
518 const char *id_path[] = {"id", NULL};
520 jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
521 jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
522 jid = yajl_tree_get(jnode, id_path, yajl_t_string);
524 /* check & get result attributes */
525 if (!jresult || !jerror || !jid)
528 /* try to find registered callback */
529 pthread_mutex_lock(&pdb->mutex);
530 cb = ovs_db_table_callback_get(pdb, jid);
531 if (cb != NULL && cb->result.call != NULL) {
532 /* call registered callback */
533 cb->result.call(jresult, jerror);
534 /* unlock owner of the reply */
535 sem_post(&cb->result.sync);
538 pthread_mutex_unlock(&pdb->mutex);
542 /* Handle JSON data (one request) and call
543 * appropriate event OVS DB handler. Currently,
544 * update callback 'ovs_db_table_update_cb' and
545 * result callback 'ovs_db_result_cb' is supported.
547 static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
549 const char *method = NULL;
550 char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
551 const char *method_path[] = {"method", NULL};
552 const char *result_path[] = {"result", NULL};
554 yajl_val jnode, jval;
556 /* duplicate the data to make null-terminated string
557 * required for yajl_tree_parse() */
558 if ((sjson = calloc(1, len + 1)) == NULL)
561 sstrncpy(sjson, data, len + 1);
562 OVS_DEBUG("[len=%zu] %s", len, sjson);
564 /* parse json data */
565 jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
567 OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
572 /* get method name */
573 if ((jval = yajl_tree_get(jnode, method_path, yajl_t_string)) != NULL) {
574 if ((method = YAJL_GET_STRING(jval)) == NULL) {
575 yajl_tree_free(jnode);
579 if (strcmp("echo", method) == 0) {
580 /* echo request from the server */
581 if (ovs_db_table_echo_cb(pdb, jnode) < 0)
582 OVS_ERROR("handle echo request failed");
583 } else if (strcmp("update", method) == 0) {
584 /* update notification */
585 if (ovs_db_table_update_cb(pdb, jnode) < 0)
586 OVS_ERROR("handle update notification failed");
588 } else if ((jval = yajl_tree_get(jnode, result_path, yajl_t_any)) != NULL) {
589 /* result notification */
590 if (ovs_db_result_cb(pdb, jnode) < 0)
591 OVS_ERROR("handle result reply failed");
593 OVS_ERROR("connot find method or result failed");
596 yajl_tree_free(jnode);
602 * JSON reader implementation.
604 * This module process raw JSON data (byte stream) and
605 * returns fully-fledged JSON data which can be processed
606 * (parsed) by YAJL later.
609 /* Allocate JSON reader instance */
610 static ovs_json_reader_t *ovs_json_reader_alloc() {
611 ovs_json_reader_t *jreader = NULL;
613 if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
619 /* Push raw data into into the JSON reader for processing */
620 static int ovs_json_reader_push_data(ovs_json_reader_t *jreader,
621 const char *data, size_t data_len) {
622 char *new_buff = NULL;
623 size_t available = jreader->buff_size - jreader->buff_offset;
625 /* check/update required memory space */
626 if (available < data_len) {
627 OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
628 (int)jreader->buff_size, (int)available, (int)data_len);
630 /* allocate new chunk of memory */
631 new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
632 if (new_buff == NULL)
635 /* point to new allocated memory */
636 jreader->buff_ptr = new_buff;
637 jreader->buff_size += data_len;
640 /* store input data */
641 memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
642 jreader->buff_offset += data_len;
646 /* Pop one fully-fledged JSON if already exists. Returns 0 if
647 * completed JSON already exists otherwise negative value is
649 static int ovs_json_reader_pop(ovs_json_reader_t *jreader,
650 const char **json_ptr, size_t *json_len_ptr) {
655 /* search open/close brace */
656 for (size_t i = jreader->json_offset; i < jreader->buff_offset; i++) {
657 if (jreader->buff_ptr[i] == '{') {
659 } else if (jreader->buff_ptr[i] == '}')
663 *json_ptr = jreader->buff_ptr + jreader->json_offset;
664 *json_len_ptr = json_len + 1;
665 jreader->json_offset = i + 1;
669 /* increase JSON data length */
674 if (jreader->json_offset) {
675 if (jreader->json_offset < jreader->buff_offset) {
676 /* shift data to the beginning of the buffer
677 * and zero rest of the buffer data */
678 json = &jreader->buff_ptr[jreader->json_offset];
679 json_len = jreader->buff_offset - jreader->json_offset;
680 for (size_t i = 0; i < jreader->buff_size; i++)
681 jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
682 jreader->buff_offset = json_len;
684 /* reset the buffer */
685 jreader->buff_offset = 0;
687 /* data is at the beginning of the buffer */
688 jreader->json_offset = 0;
694 /* Reset JSON reader. It is useful when start processing
695 * new raw data. E.g.: in case of lost stream connection.
697 static void ovs_json_reader_reset(ovs_json_reader_t *jreader) {
699 jreader->buff_offset = 0;
700 jreader->json_offset = 0;
704 /* Release internal data allocated for JSON reader */
705 static void ovs_json_reader_free(ovs_json_reader_t *jreader) {
707 free(jreader->buff_ptr);
712 /* Reconnect to OVS DB and call the OVS DB post connection init callback
713 * if connection has been established.
715 static void ovs_db_reconnect(ovs_db_t *pdb) {
716 const char *node_info = pdb->node;
717 struct addrinfo *result;
719 if (pdb->unix_path[0] != '\0') {
720 /* use UNIX socket instead of INET address */
721 node_info = pdb->unix_path;
722 result = calloc(1, sizeof(struct addrinfo));
723 struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un));
724 if (result == NULL || sa_unix == NULL) {
729 result->ai_family = AF_UNIX;
730 result->ai_socktype = SOCK_STREAM;
731 result->ai_addrlen = sizeof(*sa_unix);
732 result->ai_addr = (struct sockaddr *)sa_unix;
733 sa_unix->sun_family = result->ai_family;
734 sstrncpy(sa_unix->sun_path, pdb->unix_path, sizeof(sa_unix->sun_path));
736 /* inet socket address */
737 struct addrinfo hints;
739 /* setup criteria for selecting the socket address */
740 memset(&hints, 0, sizeof(hints));
741 hints.ai_family = AF_UNSPEC;
742 hints.ai_socktype = SOCK_STREAM;
744 /* get socket addresses */
745 int ret = getaddrinfo(pdb->node, pdb->service, &hints, &result);
747 OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
751 /* try to connect to the server */
752 for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) {
753 int sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
755 OVS_DEBUG("socket(): %s", STRERRNO);
758 if (connect(sock, rp->ai_addr, rp->ai_addrlen) < 0) {
760 OVS_DEBUG("connect(): %s [family=%d]", STRERRNO, rp->ai_family);
762 /* send notification to event thread */
763 ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
770 OVS_ERROR("connect to \"%s\" failed", node_info);
772 freeaddrinfo(result);
775 /* POLL worker thread.
776 * It listens on OVS DB connection for incoming
777 * requests/reply/events etc. Also, it reconnects to OVS DB
778 * if connection has been lost.
780 static void *ovs_poll_worker(void *arg) {
781 ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
782 ovs_json_reader_t *jreader = NULL;
783 struct pollfd poll_fd = {
784 .fd = pdb->sock, .events = POLLIN | POLLPRI, .revents = 0,
787 /* create JSON reader instance */
788 if ((jreader = ovs_json_reader_alloc()) == NULL) {
789 OVS_ERROR("initialize json reader failed");
794 while (ovs_db_poll_is_running(pdb)) {
795 poll_fd.fd = pdb->sock;
796 int poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
798 OVS_ERROR("poll(): %s", STRERRNO);
800 } else if (poll_ret == 0) {
801 OVS_DEBUG("poll(): timeout");
803 /* invalid fd, so try to reconnect */
804 ovs_db_reconnect(pdb);
807 if (poll_fd.revents & POLLNVAL) {
808 /* invalid file descriptor, clean-up */
809 ovs_db_callback_remove_all(pdb);
810 ovs_json_reader_reset(jreader);
811 /* setting poll FD to -1 tells poll() call to ignore this FD.
812 * In that case poll() call will return timeout all the time */
814 } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
815 /* connection is broken */
817 ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
818 OVS_ERROR("poll() peer closed its end of the channel");
819 } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
820 /* read incoming data */
821 char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
822 ssize_t nbytes = recv(poll_fd.fd, buff, sizeof(buff), 0);
824 OVS_ERROR("recv(): %s", STRERRNO);
825 /* read error? Try to reconnect */
828 } else if (nbytes == 0) {
830 ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
831 OVS_ERROR("recv() peer has performed an orderly shutdown");
834 /* read incoming data */
836 const char *json = NULL;
837 OVS_DEBUG("recv(): received %zd bytes of data", nbytes);
838 ovs_json_reader_push_data(jreader, buff, nbytes);
839 while (!ovs_json_reader_pop(jreader, &json, &json_len))
840 /* process JSON data */
841 ovs_db_json_data_process(pdb, json, json_len);
845 OVS_DEBUG("poll thread has been completed");
846 ovs_json_reader_free(jreader);
850 /* EVENT worker thread.
851 * Perform task based on incoming events. This
852 * task can be done asynchronously which allows to
853 * handle OVS DB callback like 'init_cb'.
855 static void *ovs_event_worker(void *arg) {
856 ovs_db_t *pdb = (ovs_db_t *)arg;
858 while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
859 /* wait for an event */
861 clock_gettime(CLOCK_REALTIME, &ts);
862 ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
863 int ret = pthread_cond_timedwait(&pdb->event_thread.cond,
864 &pdb->event_thread.mutex, &ts);
866 /* handle the event */
867 OVS_DEBUG("handle event %d", pdb->event_thread.value);
868 switch (pdb->event_thread.value) {
869 case OVS_DB_EVENT_CONN_ESTABLISHED:
870 if (pdb->cb.post_conn_init)
871 pdb->cb.post_conn_init(pdb);
873 case OVS_DB_EVENT_CONN_TERMINATED:
874 if (pdb->cb.post_conn_terminate)
875 pdb->cb.post_conn_terminate();
878 OVS_DEBUG("unknown event received");
881 } else if (ret == ETIMEDOUT) {
883 OVS_DEBUG("no event received (timeout)");
886 /* unexpected error */
887 OVS_ERROR("pthread_cond_timedwait() failed");
892 OVS_DEBUG("event thread has been completed");
896 /* Initialize EVENT thread */
897 static int ovs_db_event_thread_init(ovs_db_t *pdb) {
898 pdb->event_thread.tid = (pthread_t){0};
899 /* init event thread condition variable */
900 if (pthread_cond_init(&pdb->event_thread.cond, NULL)) {
903 /* init event thread mutex */
904 if (pthread_mutex_init(&pdb->event_thread.mutex, NULL)) {
905 pthread_cond_destroy(&pdb->event_thread.cond);
908 /* Hold the event thread mutex. It ensures that no events
909 * will be lost while thread is still starting. Once event
910 * thread is started and ready to accept events, it will release
912 if (pthread_mutex_lock(&pdb->event_thread.mutex)) {
913 pthread_mutex_destroy(&pdb->event_thread.mutex);
914 pthread_cond_destroy(&pdb->event_thread.cond);
917 /* start event thread */
919 if (plugin_thread_create(&tid, NULL, ovs_event_worker, pdb,
920 "utils_ovs:event") != 0) {
921 pthread_mutex_unlock(&pdb->event_thread.mutex);
922 pthread_mutex_destroy(&pdb->event_thread.mutex);
923 pthread_cond_destroy(&pdb->event_thread.cond);
926 pdb->event_thread.tid = tid;
930 /* Destroy EVENT thread */
931 /* XXX: Must hold pdb->mutex when calling! */
932 static int ovs_db_event_thread_destroy(ovs_db_t *pdb) {
933 if (pthread_equal(pdb->event_thread.tid, (pthread_t){0})) {
934 /* already destroyed */
937 ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
938 if (pthread_join(pdb->event_thread.tid, NULL) != 0)
940 /* Event thread always holds the thread mutex when
941 * performs some task (handles event) and releases it when
942 * while sleeping. Thus, if event thread exits, the mutex
944 pthread_mutex_unlock(&pdb->event_thread.mutex);
945 pthread_mutex_destroy(&pdb->event_thread.mutex);
946 pthread_cond_destroy(&pdb->event_thread.cond);
947 pdb->event_thread.tid = (pthread_t){0};
951 /* Initialize POLL thread */
952 static int ovs_db_poll_thread_init(ovs_db_t *pdb) {
953 pdb->poll_thread.tid = (pthread_t){0};
954 /* init event thread mutex */
955 if (pthread_mutex_init(&pdb->poll_thread.mutex, NULL)) {
958 /* start poll thread */
960 pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
961 if (plugin_thread_create(&tid, NULL, ovs_poll_worker, pdb,
962 "utils_ovs:poll") != 0) {
963 pthread_mutex_destroy(&pdb->poll_thread.mutex);
966 pdb->poll_thread.tid = tid;
970 /* Destroy POLL thread */
971 /* XXX: Must hold pdb->mutex when calling! */
972 static int ovs_db_poll_thread_destroy(ovs_db_t *pdb) {
973 if (pthread_equal(pdb->poll_thread.tid, (pthread_t){0})) {
974 /* already destroyed */
977 /* change thread state */
978 pthread_mutex_lock(&pdb->poll_thread.mutex);
979 pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
980 pthread_mutex_unlock(&pdb->poll_thread.mutex);
981 /* join the thread */
982 if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
984 pthread_mutex_destroy(&pdb->poll_thread.mutex);
985 pdb->poll_thread.tid = (pthread_t){0};
990 * Public OVS DB API implementation
993 ovs_db_t *ovs_db_init(const char *node, const char *service,
994 const char *unix_path, ovs_db_callback_t *cb) {
996 if (node == NULL || service == NULL || unix_path == NULL)
999 /* allocate db data & fill it */
1000 ovs_db_t *pdb = pdb = calloc(1, sizeof(*pdb));
1004 /* store the OVS DB address */
1005 sstrncpy(pdb->node, node, sizeof(pdb->node));
1006 sstrncpy(pdb->service, service, sizeof(pdb->service));
1007 sstrncpy(pdb->unix_path, unix_path, sizeof(pdb->unix_path));
1009 /* setup OVS DB callbacks */
1013 /* init OVS DB mutex attributes */
1014 pthread_mutexattr_t mutex_attr;
1015 if (pthread_mutexattr_init(&mutex_attr)) {
1016 OVS_ERROR("OVS DB mutex attribute init failed");
1020 /* set OVS DB mutex as recursive */
1021 if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE)) {
1022 OVS_ERROR("Failed to set OVS DB mutex as recursive");
1023 pthread_mutexattr_destroy(&mutex_attr);
1027 /* init OVS DB mutex */
1028 if (pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
1029 OVS_ERROR("OVS DB mutex init failed");
1030 pthread_mutexattr_destroy(&mutex_attr);
1034 /* destroy mutex attributes */
1035 pthread_mutexattr_destroy(&mutex_attr);
1037 /* init event thread */
1038 if (ovs_db_event_thread_init(pdb) < 0) {
1039 ovs_db_destroy(pdb);
1043 /* init polling thread */
1045 if (ovs_db_poll_thread_init(pdb) < 0) {
1046 ovs_db_destroy(pdb);
1052 int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
1053 ovs_db_result_cb_t cb) {
1055 yajl_gen_status yajl_gen_ret;
1058 ovs_callback_t *new_cb = NULL;
1060 char uid_buff[OVS_UID_STR_SIZE];
1061 const char *req = NULL;
1066 if (!pdb || !method || !params)
1069 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1072 /* try to parse params */
1073 if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1074 OVS_ERROR("params is not a JSON string");
1075 yajl_gen_clear(jgen);
1079 /* generate method field */
1080 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1082 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1083 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1085 /* generate params field */
1086 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1087 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1088 yajl_tree_free(jparams);
1090 /* generate id field */
1091 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1092 uid = ovs_uid_generate();
1093 snprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1094 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1096 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1099 /* register result callback */
1100 if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
1101 goto yajl_gen_failure;
1103 /* add new callback to front */
1104 sem_init(&new_cb->result.sync, 0, 0);
1105 new_cb->result.call = cb;
1107 ovs_db_callback_add(pdb, new_cb);
1110 /* send the request */
1111 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, &req_len);
1112 OVS_DEBUG("%s", req);
1113 if (!ovs_db_data_send(pdb, req, req_len)) {
1115 /* wait for result */
1116 clock_gettime(CLOCK_REALTIME, &ts);
1117 ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1118 if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1119 OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1120 OVS_DB_SEND_REQ_TIMEOUT);
1125 OVS_ERROR("ovs_db_data_send() failed");
1131 /* destroy callback */
1132 sem_destroy(&new_cb->result.sync);
1133 ovs_db_callback_remove(pdb, new_cb);
1136 /* release memory */
1137 yajl_gen_clear(jgen);
1138 return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
1141 int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1142 const char **tb_column,
1143 ovs_db_table_cb_t update_cb,
1144 ovs_db_result_cb_t result_cb, unsigned int flags) {
1146 yajl_gen_status yajl_gen_ret;
1147 ovs_callback_t *new_cb = NULL;
1148 char uid_str[OVS_UID_STR_SIZE];
1154 if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1157 /* allocate new update callback */
1158 if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
1161 /* init YAJL generator */
1162 if ((jgen = yajl_gen_alloc(NULL)) == NULL) {
1167 /* add new callback to front */
1168 new_cb->table.call = update_cb;
1169 new_cb->uid = ovs_uid_generate();
1170 ovs_db_callback_add(pdb, new_cb);
1172 /* make update notification request
1173 * [<db-name>, <json-value>, <monitor-requests>] */
1174 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1176 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1178 /* uid string <json-value> */
1179 snprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1180 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1182 /* <monitor-requests> */
1183 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1185 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1186 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1188 /* <monitor-request> */
1189 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1192 /* columns within the table to be monitored */
1193 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1194 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1195 for (; *tb_column; tb_column++)
1196 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1197 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1199 /* specify select option */
1200 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1202 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1204 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1205 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1206 flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1207 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1208 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1209 flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1210 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1211 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1212 flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1213 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1214 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1215 flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1217 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1220 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1222 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1224 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1226 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1228 /* make a request to subscribe to given table */
1229 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)¶ms,
1231 if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1232 OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1237 /* release memory */
1238 yajl_gen_clear(jgen);
1242 int ovs_db_destroy(ovs_db_t *pdb) {
1250 /* try to lock the structure before releasing */
1251 if ((ret = pthread_mutex_lock(&pdb->mutex))) {
1252 OVS_ERROR("pthread_mutex_lock() DB mutex lock failed (%d)", ret);
1256 /* stop poll thread */
1257 if (ovs_db_event_thread_destroy(pdb) < 0) {
1258 OVS_ERROR("destroy poll thread failed");
1262 /* stop event thread */
1263 if (ovs_db_poll_thread_destroy(pdb) < 0) {
1264 OVS_ERROR("stop event thread failed");
1268 pthread_mutex_unlock(&pdb->mutex);
1270 /* unsubscribe callbacks */
1271 ovs_db_callback_remove_all(pdb);
1273 /* close connection */
1277 /* release DB handler */
1278 pthread_mutex_destroy(&pdb->mutex);
1284 * Public OVS utils API implementation
1287 /* Get YAJL value by key from YAJL dictionary
1291 * "key_a" : <YAJL return value>
1292 * "key_b" : <YAJL return value>
1295 yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
1296 const char *obj_key = NULL;
1299 if (!YAJL_IS_OBJECT(jval) || (key == NULL))
1302 /* find a value by key */
1303 for (size_t i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1304 obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1305 if (strcmp(obj_key, key) == 0)
1306 return YAJL_GET_OBJECT(jval)->values[i];
1312 /* Get OVS DB map value by given map key
1317 * A 2-element JSON array that represents a pair within a database
1318 * map. The first element is an <atom> that represents the key, and
1319 * the second element is an <atom> that represents the value.
1322 * A 2-element JSON array that represents a database map value. The
1323 * first element of the array must be the string "map", and the
1324 * second element must be an array of zero or more <pair>s giving the
1325 * values in the map. All of the <pair>s must have the same key and
1331 * [ "key_a", <YAJL value>], [ "key_b", <YAJL value>], ...
1335 yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
1337 size_t array_len = 0;
1338 yajl_val *map_values = NULL;
1339 yajl_val *array_values = NULL;
1340 const char *str_val = NULL;
1342 /* check YAJL array */
1343 if (!YAJL_IS_ARRAY(jval) || (key == NULL))
1346 /* check a database map value (2-element, first one should be a string */
1347 array_len = YAJL_GET_ARRAY(jval)->len;
1348 array_values = YAJL_GET_ARRAY(jval)->values;
1349 if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
1350 (!YAJL_IS_ARRAY(array_values[1])))
1353 /* check first element of the array */
1354 str_val = YAJL_GET_STRING(array_values[0]);
1355 if (strcmp("map", str_val) != 0)
1358 /* try to find map value by map key */
1359 map_len = YAJL_GET_ARRAY(array_values[1])->len;
1360 map_values = YAJL_GET_ARRAY(array_values[1])->values;
1361 for (size_t i = 0; i < map_len; i++) {
1362 /* check YAJL array */
1363 if (!YAJL_IS_ARRAY(map_values[i]))
1366 /* check a database pair value (2-element, first one represents a key
1367 * and it should be a string in our case */
1368 array_len = YAJL_GET_ARRAY(map_values[i])->len;
1369 array_values = YAJL_GET_ARRAY(map_values[i])->values;
1370 if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
1373 /* return map value if given key equals map key */
1374 str_val = YAJL_GET_STRING(array_values[0]);
1375 if (strcmp(key, str_val) == 0)
1376 return array_values[1];