2 * collectd - src/utils_ovs.c
4 * Copyright(c) 2016 Intel Corporation. All rights reserved.
6 * Permission is hereby granted, free of charge, to any person obtaining a copy of
7 * this software and associated documentation files (the "Software"), to deal in
8 * the Software without restriction, including without limitation the rights to
9 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
10 * of the Software, and to permit persons to whom the Software is furnished to do
11 * so, subject to the following conditions:
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25 * Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
28 /* clang-format off */
30 * OVS DB API internal architecture diagram
31 * +------------------------------------------------------------------------------+
32 * |OVS plugin |OVS utils |
33 * | | +------------------------+ |
34 * | | | echo handler | JSON request/ |
35 * | | +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
36 * | | | | | | | result |
37 * | | | +------------------------+ | | |
38 * | | | | +----+---+--------+ |
39 * | +----------+ | | +------------------------+ | | | | |
40 * | | update | | | | update handler | | | YAJL | JSON | |
41 * | | callback +<-------+(ovs_db_table_update_cb)+<---+ | parser | reader | |
42 * | +----------+ | | | | | | | | |
43 * | | | +------------------------+ | +--------+---+----+ |
45 * | +----------+ | | +------------------------+ | | |
46 * | | result | | | | result handler | | | |
47 * | | callback +<-------+ (ovs_db_result_cb) +<---+ JSON raw | |
48 * | +----------+ | | | | data | |
49 * | | | +------------------------+ | |
51 * | | | +------------------+ +------------+----+ |
52 * | +----------+ | | |thread| | |thread| | |
53 * | | init | | | | | reconnect | | |
54 * | | callback +<---------+ EVENT WORKER +<------------+ POLL WORKER | |
55 * | +----------+ | | +------------------+ +--------+--------+ |
57 * +----------------+-------------------------------------------------------------+
59 * JSON|echo reply raw|data
61 * +-------------------+----------------------------------------------+-----------+
63 * +-------------------------------------------------------------------------------
67 /* collectd headers */
73 #include "utils_ovs.h"
75 /* system libraries */
80 #include <arpa/inet.h>
89 #include <semaphore.h>
91 #define OVS_ERROR(fmt, ...) \
93 ERROR("ovs_utils: " fmt, ##__VA_ARGS__); \
95 #define OVS_DEBUG(fmt, ...) \
97 DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__, \
101 #define OVS_DB_POLL_TIMEOUT 1 /* poll receive timeout (sec) */
102 #define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
103 #define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
104 #define OVS_DB_RECONNECT_TIMEOUT 1 /* reconnect timeout (sec) */
106 #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
107 #define OVS_DB_EVENT_TERMINATE 1
108 #define OVS_DB_EVENT_CONN_ESTABLISHED 2
109 #define OVS_DB_EVENT_CONN_TERMINATED 3
111 #define OVS_DB_POLL_STATE_RUNNING 1
112 #define OVS_DB_POLL_STATE_EXITING 2
114 #define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
116 #define OVS_YAJL_CALL(func, ...) \
118 yajl_gen_ret = yajl_gen_status_ok; \
119 if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \
120 goto yajl_gen_failure; \
122 #define OVS_YAJL_ERROR_BUFFER_SIZE 1024
123 #define OVS_ERROR_BUFF_SIZE 512
124 #define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
126 /* JSON reader internal data */
127 struct ovs_json_reader_s {
133 typedef struct ovs_json_reader_s ovs_json_reader_t;
135 /* Result callback declaration */
136 struct ovs_result_cb_s {
138 ovs_db_result_cb_t call;
140 typedef struct ovs_result_cb_s ovs_result_cb_t;
142 /* Table callback declaration */
143 struct ovs_table_cb_s {
144 ovs_db_table_cb_t call;
146 typedef struct ovs_table_cb_s ovs_table_cb_t;
148 /* Callback declaration */
149 struct ovs_callback_s {
152 ovs_result_cb_t result;
153 ovs_table_cb_t table;
155 struct ovs_callback_s *next;
156 struct ovs_callback_s *prev;
158 typedef struct ovs_callback_s ovs_callback_t;
160 /* Event thread data declaration */
161 struct ovs_event_thread_s {
163 pthread_mutex_t mutex;
167 typedef struct ovs_event_thread_s ovs_event_thread_t;
169 /* Poll thread data declaration */
170 struct ovs_poll_thread_s {
172 pthread_mutex_t mutex;
175 typedef struct ovs_poll_thread_s ovs_poll_thread_t;
177 /* OVS DB internal data declaration */
179 ovs_poll_thread_t poll_thread;
180 ovs_event_thread_t event_thread;
181 pthread_mutex_t mutex;
182 ovs_callback_t *remote_cb;
183 ovs_db_callback_t cb;
184 char service[OVS_DB_ADDR_SERVICE_SIZE];
185 char node[OVS_DB_ADDR_NODE_SIZE];
189 /* Post an event to event thread.
190 * Possible events are:
191 * OVS_DB_EVENT_TERMINATE
192 * OVS_DB_EVENT_CONN_ESTABLISHED
193 * OVS_DB_EVENT_CONN_TERMINATED
195 static void ovs_db_event_post(ovs_db_t *pdb, int event) {
196 pthread_mutex_lock(&pdb->event_thread.mutex);
197 pdb->event_thread.value = event;
198 pthread_mutex_unlock(&pdb->event_thread.mutex);
199 pthread_cond_signal(&pdb->event_thread.cond);
202 /* Check if POLL thread is still running. Returns
203 * 1 if running otherwise 0 is returned */
204 static _Bool ovs_db_poll_is_running(ovs_db_t *pdb) {
206 pthread_mutex_lock(&pdb->poll_thread.mutex);
207 state = pdb->poll_thread.state;
208 pthread_mutex_unlock(&pdb->poll_thread.mutex);
209 return (state == OVS_DB_POLL_STATE_RUNNING);
212 /* Terminate POLL thread */
213 static void ovs_db_poll_terminate(ovs_db_t *pdb) {
214 pthread_mutex_lock(&pdb->poll_thread.mutex);
215 pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
216 pthread_mutex_unlock(&pdb->poll_thread.mutex);
/* Generate unique identifier (UID). It is used by OVS DB API
 * to set "id" field for any OVS DB JSON request.
 *
 * The UID is built from the monotonic clock: seconds occupy the upper
 * 32 bits and nanoseconds the lower 32 bits of the returned value. */
static uint64_t ovs_uid_generate(void) {
  struct timespec ts = {0};

  clock_gettime(CLOCK_MONOTONIC, &ts);

  /* widen to 64 bits BEFORE shifting: shifting a (possibly 32-bit, signed)
   * time_t by 32 is undefined or loses the seconds part entirely */
  return (((uint64_t)ts.tv_sec << 32) | ((uint64_t)ts.tv_nsec & UINT32_MAX));
}
228 * Callback API. These functions are used to store
229 * registered callbacks in OVS DB API.
232 /* Add new callback into OVS DB object */
233 static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
234 pthread_mutex_lock(&pdb->mutex);
236 pdb->remote_cb->prev = new_cb;
237 new_cb->next = pdb->remote_cb;
239 pdb->remote_cb = new_cb;
240 pthread_mutex_unlock(&pdb->mutex);
243 /* Remove callback from OVS DB object */
244 static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
245 ovs_callback_t *pre_cb = del_cb->prev;
246 ovs_callback_t *next_cb = del_cb->next;
248 pthread_mutex_lock(&pdb->mutex);
250 next_cb->prev = del_cb->prev;
253 pre_cb->next = del_cb->next;
255 pdb->remote_cb = del_cb->next;
258 pthread_mutex_unlock(&pdb->mutex);
261 /* Remove all callbacks form OVS DB object */
262 static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
263 pthread_mutex_lock(&pdb->mutex);
264 for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb;
265 del_cb = pdb->remote_cb) {
266 pdb->remote_cb = del_cb->next;
269 pdb->remote_cb = NULL;
270 pthread_mutex_unlock(&pdb->mutex);
273 /* Get/find callback in OVS DB object by UID. Returns pointer
274 * to requested callback otherwise NULL is returned.
277 * The OVS DB mutex should be locked by the caller
278 * to make sure that returned callback is still valid.
280 static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
281 for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
287 /* Send all requested data to the socket. Returns 0 if
288 * ALL request data has been sent otherwise negative value
290 static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) {
296 if ((nbytes = send(pdb->sock, data + off, rem, 0)) <= 0)
298 rem -= (size_t)nbytes;
299 off += (size_t)nbytes;
305 * YAJL (Yet Another JSON Library) helper functions
306 * Documentation (https://lloyd.github.io/yajl/)
309 /* Add null-terminated string into YAJL generator handle (JSON object).
310 * Similar function to yajl_gen_string() but takes null-terminated string
311 * instead of string and its length.
313 * jgen - YAJL generator handle allocated by yajl_gen_alloc()
314 * string - Null-terminated string
316 static yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander,
317 const char *string) {
318 return yajl_gen_string(hander, (const unsigned char *)string, strlen(string));
321 /* Add YAJL value into YAJL generator handle (JSON object)
323 * jgen - YAJL generator handle allocated by yajl_gen_alloc()
324 * jval - YAJL value usually returned by yajl_tree_get()
326 static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) {
327 size_t array_len = 0;
328 yajl_val *jvalues = NULL;
329 yajl_val jobj_value = NULL;
330 const char *obj_key = NULL;
332 yajl_gen_status yajl_gen_ret;
334 if (YAJL_IS_STRING(jval))
335 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
336 else if (YAJL_IS_DOUBLE(jval))
337 OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
338 else if (YAJL_IS_INTEGER(jval))
339 OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
340 else if (YAJL_IS_TRUE(jval))
341 OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
342 else if (YAJL_IS_FALSE(jval))
343 OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
344 else if (YAJL_IS_NULL(jval))
345 OVS_YAJL_CALL(yajl_gen_null, jgen);
346 else if (YAJL_IS_ARRAY(jval)) {
347 /* create new array and add all elements into the array */
348 array_len = YAJL_GET_ARRAY(jval)->len;
349 jvalues = YAJL_GET_ARRAY(jval)->values;
350 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
351 for (int i = 0; i < array_len; i++)
352 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
353 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
354 } else if (YAJL_IS_OBJECT(jval)) {
355 /* create new object and add all elements into the object */
356 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
357 obj_len = YAJL_GET_OBJECT(jval)->len;
358 for (int i = 0; i < obj_len; i++) {
359 obj_key = YAJL_GET_OBJECT(jval)->keys[i];
360 jobj_value = YAJL_GET_OBJECT(jval)->values[i];
361 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
362 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
364 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
366 OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
368 goto yajl_gen_failure;
370 return yajl_gen_status_ok;
373 OVS_ERROR("%s() error to generate value", __FUNCTION__);
377 /* OVS DB echo request handler. When OVS DB sends
378 * "echo" request to the client, client should generate
379 * "echo" replay with the same content received in the
/* Answer an "echo" request: take "params" (must be an array) and "id" from
 * the request node, build {"result": <params>, "error": null, "id": <id>}
 * with the YAJL generator and write it to the socket with
 * ovs_db_data_send().
 *
 * NOTE(review): several short lines of this function are missing from this
 * listing (local declarations, returns, closing braces, the failure label).
 * NOTE(review): both exits call yajl_gen_clear(); per the YAJL API that only
 * resets the output buffer, the handle from yajl_gen_alloc() is released
 * with yajl_gen_free() - no such call is visible here, confirm. */
381 static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) {
386 const char *resp = NULL;
387 const char *params_path[] = {"params", NULL};
388 const char *id_path[] = {"id", NULL};
389 yajl_gen_status yajl_gen_ret;
391 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
394 /* check & get request attributes */
395 if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
396 ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
397 OVS_ERROR("parse echo request failed");
398 goto yajl_gen_failure;
401 /* generate JSON echo response */
402 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
/* "result" echoes the request parameters unchanged */
404 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
405 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
407 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
408 OVS_YAJL_CALL(yajl_gen_null, jgen);
/* "id" is copied from the request */
410 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
411 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
413 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
414 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
417 /* send the response */
418 OVS_DEBUG("response: %s", resp);
419 if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
420 OVS_ERROR("send echo reply failed");
421 goto yajl_gen_failure;
423 /* clean up and return success */
424 yajl_gen_clear(jgen);
429 yajl_gen_clear(jgen);
433 /* Get OVS DB registered callback by YAJL val. The YAJL
434 * value should be YAJL string (UID). Returns NULL if
435 * callback hasn't been found. See also ovs_db_callback_get()
436 * description for addition info.
438 static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) {
440 const char *suid = NULL;
443 if (jid && YAJL_IS_STRING(jid)) {
444 suid = YAJL_GET_STRING(jid);
445 uid = (uint64_t)strtoul(suid, &endptr, 16);
446 if (*endptr == '\0' && uid)
447 return ovs_db_callback_get(pdb, uid);
453 /* OVS DB table update event handler.
454 * This callback is called by POLL thread if OVS DB
455 * table update callback is received from the DB
456 * server. Once registered callback found, it's called
457 * by this handler. */
/* Handle an "update" notification: validate the node, find the callback
 * registered under the <json-value> UID and pass it the <table-updates>
 * object.
 *
 * The node is accepted only when "params" is a 2-element array whose first
 * element is a string and whose second element is an object, and "id" is
 * JSON null. The callback is invoked while pdb->mutex is held.
 *
 * NOTE(review): several short lines are missing from this listing
 * (declarations, gotos/returns, closing braces, the failure label).
 * NOTE(review): jtable_update and table_name are not used in the visible
 * lines - confirm against the full source. */
458 static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
459 ovs_callback_t *cb = NULL;
462 yajl_val jtable_updates;
463 yajl_val jtable_update;
465 const char *table_name = NULL;
466 const char *params_path[] = {"params", NULL};
467 const char *id_path[] = {"id", NULL};
469 /* check & get request attributes */
470 if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
471 (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL))
474 /* check array length: [<json-value>, <table-updates>] */
475 if (YAJL_GET_ARRAY(jparams)->len != 2)
478 jvalue = YAJL_GET_ARRAY(jparams)->values[0];
479 jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
480 if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue)))
483 /* find registered callback based on <json-value> */
484 pthread_mutex_lock(&pdb->mutex);
485 cb = ovs_db_table_callback_get(pdb, jvalue);
486 if (cb == NULL || cb->table.call == NULL) {
487 pthread_mutex_unlock(&pdb->mutex);
491 /* call registered callback */
492 cb->table.call(jtable_updates);
493 pthread_mutex_unlock(&pdb->mutex);
497 OVS_ERROR("invalid OVS DB table update event");
501 /* OVS DB result request handler.
502 * This callback is called by POLL thread if OVS DB
503 * result reply is received from the DB server.
504 * Once registered callback found, it's called
505 * by this handler. */
/* Handle a result reply: requires "result", "error" and a string "id" in
 * the node, looks up the callback registered under that id and, when one
 * with a result handler exists, calls it with (result, error) and posts its
 * semaphore so the waiting requester is released.
 *
 * The lookup, the callback call and sem_post() all happen with pdb->mutex
 * held.
 *
 * NOTE(review): short lines (declarations, returns, closing braces) are
 * missing from this listing. */
506 static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) {
507 ovs_callback_t *cb = NULL;
511 const char *result_path[] = {"result", NULL};
512 const char *error_path[] = {"error", NULL};
513 const char *id_path[] = {"id", NULL};
515 jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
516 jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
517 jid = yajl_tree_get(jnode, id_path, yajl_t_string);
519 /* check & get result attributes */
520 if (!jresult || !jerror || !jid)
523 /* try to find registered callback */
524 pthread_mutex_lock(&pdb->mutex);
525 cb = ovs_db_table_callback_get(pdb, jid);
526 if (cb != NULL && cb->result.call != NULL) {
527 /* call registered callback */
528 cb->result.call(jresult, jerror);
529 /* unlock owner of the reply */
530 sem_post(&cb->result.sync);
533 pthread_mutex_unlock(&pdb->mutex);
537 /* Handle JSON data (one request) and call
538 * appropriate event OVS DB handler. Currently,
539 * update callback 'ovs_db_table_update_cb' and
540 * result callback 'ovs_db_result_cb' is supported.
/* Parse one complete JSON message and dispatch it:
 *  - "method" == "echo"   -> ovs_db_table_echo_cb()
 *  - "method" == "update" -> ovs_db_table_update_cb()
 *  - no string "method" but a "result" member -> ovs_db_result_cb()
 * Anything else is logged as an error. Handler failures are logged only.
 *
 * `data` is not required to be null-terminated; `len` bytes are copied into
 * a temporary buffer of len + 1 bytes before parsing.
 *
 * NOTE(review): short lines are missing from this listing (second line of
 * the signature, declarations, returns, closing braces and, presumably, the
 * release of the temporary buffer - confirm). */
542 static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
544 const char *method = NULL;
545 char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
546 const char *method_path[] = {"method", NULL};
547 const char *result_path[] = {"result", NULL};
549 yajl_val jnode, jval;
551 /* duplicate the data to make null-terminated string
552 * required for yajl_tree_parse() */
553 if ((sjson = malloc(len + 1)) == NULL)
556 sstrncpy(sjson, data, len + 1);
557 OVS_DEBUG("[len=%zu] %s", len, sjson);
559 /* parse json data */
560 jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
562 OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
567 /* get method name */
568 if ((jval = yajl_tree_get(jnode, method_path, yajl_t_string)) != NULL) {
569 method = YAJL_GET_STRING(jval);
570 if (strcmp("echo", method) == 0) {
571 /* echo request from the server */
572 if (ovs_db_table_echo_cb(pdb, jnode) < 0)
573 OVS_ERROR("handle echo request failed");
574 } else if (strcmp("update", method) == 0) {
575 /* update notification */
576 if (ovs_db_table_update_cb(pdb, jnode) < 0)
577 OVS_ERROR("handle update notification failed");
579 } else if ((jval = yajl_tree_get(jnode, result_path, yajl_t_any)) != NULL) {
580 /* result notification */
581 if (ovs_db_result_cb(pdb, jnode) < 0)
582 OVS_ERROR("handle result reply failed");
584 OVS_ERROR("connot find method or result failed");
/* the parsed tree is no longer needed once the handler has run */
587 yajl_tree_free(jnode);
593 * JSON reader implementation.
595 * This module processes raw JSON data (byte stream) and
596 * returns fully-fledged JSON data which can be processed
597 * (parsed) by YAJL later.
600 /* Allocate JSON reader instance */
601 static ovs_json_reader_t *ovs_json_reader_alloc() {
602 ovs_json_reader_t *jreader = NULL;
604 if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
610 /* Push raw data into into the JSON reader for processing */
611 static int ovs_json_reader_push_data(ovs_json_reader_t *jreader,
612 const char *data, size_t data_len) {
613 char *new_buff = NULL;
614 size_t available = jreader->buff_size - jreader->buff_offset;
616 /* check/update required memory space */
617 if (available < data_len) {
618 OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
619 (int)jreader->buff_size, (int)available, (int)data_len);
621 /* allocate new chunk of memory */
622 new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
623 if (new_buff == NULL)
626 /* point to new allocated memory */
627 jreader->buff_ptr = new_buff;
628 jreader->buff_size += data_len;
631 /* store input data */
632 memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
633 jreader->buff_offset += data_len;
637 /* Pop one fully-fledged JSON if already exists. Returns 0 if
638 * completed JSON already exists otherwise negative value is
/* Scan the buffered bytes from json_offset for a balanced {...} region and
 * hand it out through *json_ptr / *json_len_ptr (a pointer into the reader's
 * own buffer, not a copy). When no complete region is found, the unread tail
 * is moved to the start of the buffer so later pushes append to it.
 *
 * NOTE(review): short lines are missing from this listing (declarations,
 * the nesting counter updates, returns, closing braces).
 * NOTE(review): the scan compares raw bytes against '{' and '}' only, so a
 * brace inside a JSON string value would presumably be counted as
 * structure - confirm against the full source.
 * NOTE(review): the loop indexes are `int` while the offsets/sizes they are
 * compared with look like size_t - confirm the field types. */
640 static int ovs_json_reader_pop(ovs_json_reader_t *jreader,
641 const char **json_ptr, size_t *json_len_ptr) {
646 /* search open/close brace */
647 for (int i = jreader->json_offset; i < jreader->buff_offset; i++) {
648 if (jreader->buff_ptr[i] == '{') {
650 } else if (jreader->buff_ptr[i] == '}')
/* a complete JSON is returned in place and json_offset moves past it */
654 *json_ptr = jreader->buff_ptr + jreader->json_offset;
655 *json_len_ptr = json_len + 1;
656 jreader->json_offset = i + 1;
660 /* increase JSON data length */
665 if (jreader->json_offset) {
666 if (jreader->json_offset < jreader->buff_offset) {
667 /* shift data to the beginning of the buffer
668 * and zero rest of the buffer data */
669 json = &jreader->buff_ptr[jreader->json_offset];
670 json_len = jreader->buff_offset - jreader->json_offset;
671 for (int i = 0; i < jreader->buff_size; i++)
672 jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
673 jreader->buff_offset = json_len;
675 /* reset the buffer */
676 jreader->buff_offset = 0;
678 /* data is at the beginning of the buffer */
679 jreader->json_offset = 0;
685 /* Reset JSON reader. It is useful when start processing
686 * new raw data. E.g.: in case of lost stream connection.
688 static void ovs_json_reader_reset(ovs_json_reader_t *jreader) {
690 jreader->buff_offset = 0;
691 jreader->json_offset = 0;
695 /* Release internal data allocated for JSON reader */
696 static void ovs_json_reader_free(ovs_json_reader_t *jreader) {
698 free(jreader->buff_ptr);
703 /* Reconnect to OVD DB and call the OVS DB post connection init callback
704 * if connection has been established.
/* (Re)connect to the OVS DB server described by pdb->node / pdb->service.
 *
 * All registered callbacks are dropped first. A node starting with "unix:"
 * is treated as a UNIX socket path (the rest of the string is the path);
 * anything else is resolved with getaddrinfo(). Each candidate address is
 * tried with socket() + connect(); on success the EVENT thread is notified
 * with OVS_DB_EVENT_CONN_ESTABLISHED.
 *
 * Returns 0 when connected and 1 otherwise (`!is_connected`).
 *
 * NOTE(review): short lines are missing from this listing (declarations,
 * else/closing braces, returns, continue/break, any close() call).
 * NOTE(review): in the "unix:" branch the addrinfo is allocated with
 * calloc(), yet the only visible release is freeaddrinfo(result); that
 * function is specified for lists returned by getaddrinfo() - confirm how
 * `result` is set in that branch.
 * NOTE(review): `saunix` is not used in the visible lines. */
706 static int ovs_db_reconnect(ovs_db_t *pdb) {
707 char errbuff[OVS_ERROR_BUFF_SIZE];
708 const char unix_prefix[] = "unix:";
709 struct addrinfo *result, *rp;
710 _Bool is_connected = 0;
711 struct sockaddr_un saunix;
713 /* remove all registered OVS DB table/result callbacks */
714 ovs_db_callback_remove_all(pdb);
716 if (strncmp(pdb->node, unix_prefix, strlen(unix_prefix)) == 0) {
717 /* create unix socket address */
718 rp = calloc(1, sizeof(struct addrinfo));
719 struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un));
720 if (rp == NULL || sa_unix == NULL) {
725 rp->ai_family = AF_UNIX;
726 rp->ai_socktype = SOCK_STREAM;
727 rp->ai_addrlen = sizeof(*sa_unix);
728 rp->ai_addr = (struct sockaddr *)sa_unix;
729 sa_unix->sun_family = rp->ai_family;
/* socket path is the node string without the "unix:" prefix */
730 sstrncpy(sa_unix->sun_path, (pdb->node + strlen(unix_prefix)),
731 sizeof(sa_unix->sun_path));
734 /* inet socket address */
736 struct addrinfo hints;
738 /* setup criteria for selecting the socket address */
739 memset(&hints, 0, sizeof(hints));
740 hints.ai_family = AF_UNSPEC;
741 hints.ai_socktype = SOCK_STREAM;
743 /* get socket addresses */
744 if ((ret = getaddrinfo(pdb->node, pdb->service, &hints, &result)) != 0) {
745 OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
749 /* try to connect to the server */
750 for (rp = result; rp != NULL; rp = rp->ai_next) {
751 if ((pdb->sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol)) <
753 sstrerror(errno, errbuff, sizeof(errbuff));
754 OVS_DEBUG("socket(): %s", errbuff);
757 if (connect(pdb->sock, rp->ai_addr, rp->ai_addrlen) < 0) {
758 sstrerror(errno, errbuff, sizeof(errbuff));
759 OVS_DEBUG("connect(): %s [family=%d]", errbuff, rp->ai_family);
767 /* send notification to event thread */
769 ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
771 OVS_ERROR("connect to \"%s\" failed", pdb->node);
773 freeaddrinfo(result);
774 return !is_connected;
777 /* POLL worker thread.
778 * It listens on OVS DB connection for incoming
779 * requests/reply/events etc. Also, it reconnects to OVS DB
780 * if connection has been lost.
/* POLL thread entry point. `arg` is the ovs_db_t instance.
 *
 * Loops while ovs_db_poll_is_running(): polls the DB socket with a timeout
 * of OVS_DB_POLL_TIMEOUT seconds and
 *  - on POLLNVAL reconnects (sleeping OVS_DB_RECONNECT_TIMEOUT seconds when
 *    the reconnect fails), resets the JSON reader and re-reads pdb->sock;
 *  - on POLLERR/POLLHUP posts OVS_DB_EVENT_CONN_TERMINATED;
 *  - on POLLIN/POLLPRI reads up to OVS_DB_POLL_READ_BLOCK_SIZE bytes, feeds
 *    them to the JSON reader and processes every complete JSON it yields.
 *
 * NOTE(review): short lines are missing from this listing (declarations,
 * close() calls if any, else/closing braces, returns).
 * NOTE(review): the result of ovs_json_reader_push_data() is not checked in
 * the visible call, so a failed buffer growth would go unnoticed here. */
782 static void *ovs_poll_worker(void *arg) {
783 ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
784 ovs_json_reader_t *jreader = NULL;
788 char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
789 struct pollfd poll_fd;
792 if ((jreader = ovs_json_reader_alloc()) == NULL) {
793 OVS_ERROR("initialize json reader failed");
797 /* start polling data */
798 poll_fd.fd = pdb->sock;
799 poll_fd.events = POLLIN | POLLPRI;
803 while (ovs_db_poll_is_running(pdb)) {
804 poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
806 if (poll_fd.revents & POLLNVAL) {
807 /* invalid file descriptor, reconnect */
808 if (ovs_db_reconnect(pdb) != 0) {
809 /* sleep awhile until next reconnect */
810 usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
/* partial data from the old connection is discarded */
812 ovs_json_reader_reset(jreader);
813 poll_fd.fd = pdb->sock;
814 } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
815 /* connection is broken */
816 OVS_ERROR("poll() peer closed its end of the channel");
817 ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
819 } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
820 /* read incoming data */
821 nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0);
823 OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes);
824 ovs_json_reader_push_data(jreader, buff, nbytes);
825 while (!ovs_json_reader_pop(jreader, &json, &json_len))
826 /* process JSON data */
827 ovs_db_json_data_process(pdb, json, json_len);
828 } else if (nbytes == 0) {
829 OVS_ERROR("recv() peer has performed an orderly shutdown");
830 ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
833 OVS_ERROR("recv() receive data error");
836 } /* poll() POLLIN & POLLPRI */
837 } else if (poll_ret == 0)
838 OVS_DEBUG("poll() timeout");
840 OVS_ERROR("poll() error");
846 OVS_DEBUG("poll thread has been completed");
847 ovs_json_reader_free(jreader);
848 pthread_exit((void *)0);
852 /* EVENT worker thread.
853 * Perform task based on incoming events. This
854 * task can be done asynchronously which allows to
855 * handle OVD DB callback like 'init_cb'.
/* EVENT thread entry point. `arg` is the ovs_db_t instance.
 *
 * Until the event value becomes OVS_DB_EVENT_TERMINATE, waits up to
 * OVS_DB_EVENT_TIMEOUT seconds on the event condition variable and then
 * runs the matching user callback: post_conn_init(pdb) for
 * OVS_DB_EVENT_CONN_ESTABLISHED, post_conn_terminate() for
 * OVS_DB_EVENT_CONN_TERMINATED. A timeout is only logged.
 *
 * NOTE(review): short lines are missing from this listing (declarations,
 * the success check on `ret`, break statements, closing braces).
 * NOTE(review): no pthread_mutex_lock() on event_thread.mutex is visible
 * in this function before pthread_cond_timedwait(); the only visible lock
 * is taken by the thread running ovs_db_init(). POSIX requires the mutex to
 * be locked by the calling thread - confirm against the full source. */
857 static void *ovs_event_worker(void *arg) {
859 ovs_db_t *pdb = (ovs_db_t *)arg;
862 while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
863 /* wait for an event */
864 clock_gettime(CLOCK_REALTIME, &ts);
865 ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
866 ret = pthread_cond_timedwait(&pdb->event_thread.cond,
867 &pdb->event_thread.mutex, &ts);
869 /* handle the event */
870 OVS_DEBUG("handle event %d", pdb->event_thread.value);
871 switch (pdb->event_thread.value) {
872 case OVS_DB_EVENT_CONN_ESTABLISHED:
873 if (pdb->cb.post_conn_init)
874 pdb->cb.post_conn_init(pdb);
876 case OVS_DB_EVENT_CONN_TERMINATED:
877 if (pdb->cb.post_conn_terminate)
878 pdb->cb.post_conn_terminate();
881 OVS_DEBUG("unknown event received");
884 } else if (ret == ETIMEDOUT) {
886 OVS_DEBUG("no event received (timeout)");
889 /* unexpected error */
890 OVS_ERROR("pthread_cond_timedwait() failed");
896 OVS_DEBUG("event thread has been completed");
897 pthread_exit((void *)0);
901 /* Stop EVENT thread */
902 static int ovs_db_event_thread_stop(ovs_db_t *pdb) {
903 ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
904 if (pthread_join(pdb->event_thread.tid, NULL) != 0)
906 pthread_mutex_unlock(&pdb->event_thread.mutex);
907 pthread_mutex_destroy(&pdb->event_thread.mutex);
911 /* Stop POLL thread */
912 static int ovs_db_poll_thread_stop(ovs_db_t *pdb) {
913 ovs_db_poll_terminate(pdb);
914 if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
916 pthread_mutex_destroy(&pdb->poll_thread.mutex);
921 * Public OVS DB API implementation
/* Create an OVS DB instance: store the server address, start the EVENT
 * thread, make a first connection attempt, start the POLL thread and
 * initialize the (recursive) DB mutex.
 *
 *  node    - server address; must be a non-empty string
 *  service - service (port) string copied into the instance
 *  cb      - user callbacks (how they are stored is not visible here)
 *
 * On failure the threads that were started are stopped again.
 *
 * NOTE(review): short lines are missing from this listing (returns, gotos,
 * labels, the callback copy, close()/free of `pdb`, closing braces).
 * NOTE(review): ovs_db_reconnect() and both worker threads are started
 * BEFORE pdb->mutex is initialized below, although ovs_db_reconnect() calls
 * ovs_db_callback_remove_all(), which locks pdb->mutex. Initializing the
 * mutex first looks necessary - confirm.
 * NOTE(review): event_thread.mutex is locked here, by the thread running
 * ovs_db_init(), and is later unlocked in ovs_db_event_thread_stop(). */
924 ovs_db_t *ovs_db_init(const char *node, const char *service,
925 ovs_db_callback_t *cb) {
926 pthread_mutexattr_t mutex_attr;
927 ovs_db_t *pdb = NULL;
929 /* allocate db data & fill it */
930 if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
933 /* node cannot be unset */
934 if (node == NULL || strlen(node) == 0)
937 /* store the OVS DB address */
938 sstrncpy(pdb->node, node, sizeof(pdb->node));
940 sstrncpy(pdb->service, service, sizeof(pdb->service));
942 /* setup OVS DB callbacks */
946 /* prepare event thread */
947 pthread_cond_init(&pdb->event_thread.cond, NULL);
948 pthread_mutex_init(&pdb->event_thread.mutex, NULL);
949 pthread_mutex_lock(&pdb->event_thread.mutex);
950 if (plugin_thread_create(&pdb->event_thread.tid, NULL, ovs_event_worker,
952 OVS_ERROR("event worker start failed");
956 /* prepare polling thread */
957 ovs_db_reconnect(pdb);
958 pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
959 pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
960 if (plugin_thread_create(&pdb->poll_thread.tid, NULL, ovs_poll_worker, pdb) !=
962 OVS_ERROR("pull worker start failed");
966 /* init OVS DB mutex */
967 if (pthread_mutexattr_init(&mutex_attr) ||
968 pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) ||
969 pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
970 OVS_ERROR("OVS DB mutex init failed");
974 /* return db to the caller */
979 /* close connection */
981 if (pdb->event_thread.tid != 0)
982 /* stop event thread */
983 if (ovs_db_event_thread_stop(pdb) < 0)
984 OVS_ERROR("stop event thread failed");
985 if (pdb->poll_thread.tid != 0)
986 /* stop poll thread */
987 if (ovs_db_poll_thread_stop(pdb) < 0)
988 OVS_ERROR("stop poll thread failed");
/* Build and send the JSON request {"method": <method>, "params": <params>,
 * "id": <uid>} and wait up to OVS_DB_SEND_REQ_TIMEOUT seconds for the reply
 * (the visible wait is on the callback's semaphore, posted by
 * ovs_db_result_cb()).
 *
 *  pdb    - OVS DB instance (must not be NULL)
 *  method - request method name (must not be NULL)
 *  params - request parameters as a JSON string; parsed with
 *           yajl_tree_parse() and rejected if it is not valid JSON
 *  cb     - result callback stored in the registered callback entry
 *
 * Returns -1 when JSON generation fails, otherwise `ret`.
 *
 * NOTE(review): short lines are missing from this listing (declarations,
 * returns, the `if (cb)` style guards, closing braces, the failure label).
 * NOTE(review): `jparams` is freed only after the "params" field has been
 * generated; an OVS_YAJL_CALL failure before that point jumps to the
 * failure label and, as far as visible, skips yajl_tree_free(jparams).
 * NOTE(review): when malloc() of the callback fails the code jumps to
 * yajl_gen_failure while yajl_gen_ret is still yajl_gen_status_ok, so the
 * final return yields `ret` rather than -1 - confirm the value of `ret`.
 * NOTE(review): yajl_gen_clear() only resets the generator buffer; the YAJL
 * API releases the handle with yajl_gen_free() - none is visible here. */
993 int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
994 ovs_db_result_cb_t cb) {
996 yajl_gen_status yajl_gen_ret;
999 ovs_callback_t *new_cb = NULL;
1001 char uid_buff[OVS_UID_STR_SIZE];
1002 const char *req = NULL;
1007 if (!pdb || !method || !params)
1010 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1013 /* try to parse params */
1014 if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1015 OVS_ERROR("params is not a JSON string");
1016 yajl_gen_clear(jgen);
1020 /* generate method field */
1021 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1023 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1024 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1026 /* generate params field */
1027 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1028 OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1029 yajl_tree_free(jparams);
1031 /* generate id field */
1032 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1033 uid = ovs_uid_generate();
/* the UID is sent as an upper-case hexadecimal string */
1034 ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1035 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1037 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1040 /* register result callback */
1041 if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1042 goto yajl_gen_failure;
1044 /* add new callback to front */
1045 sem_init(&new_cb->result.sync, 0, 0);
1046 new_cb->result.call = cb;
1048 ovs_db_callback_add(pdb, new_cb);
1051 /* send the request */
1052 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, &req_len);
1053 OVS_DEBUG("%s", req);
1054 if (!ovs_db_data_send(pdb, req, req_len)) {
1056 /* wait for result */
1057 clock_gettime(CLOCK_REALTIME, &ts);
1058 ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1059 if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1060 OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1061 OVS_DB_SEND_REQ_TIMEOUT);
1066 OVS_ERROR("ovs_db_data_send() failed");
1072 /* destroy callback */
1073 sem_destroy(&new_cb->result.sync);
1074 ovs_db_callback_remove(pdb, new_cb);
1077 /* release memory */
1078 yajl_gen_clear(jgen);
1079 return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
/* Subscribe to update notifications of an OVS DB table.
 *
 * Registers `update_cb` under a fresh UID, builds the "monitor" parameters
 * [<db-name>, <uid>, {<tb_name>: [{"columns": [...], "select": {...}}]}]
 * and sends them with ovs_db_send_request(pdb, "monitor", ...).
 *
 *  pdb       - OVS DB instance (must not be NULL)
 *  tb_name   - table name (must not be NULL)
 *  tb_column - NULL-terminated list of column names to monitor
 *  update_cb - table update callback (must not be NULL)
 *  result_cb - callback for the "monitor" request result
 *  flags     - OVS_DB_TABLE_CB_FLAG_* bits mapped to the "initial",
 *              "insert", "delete" and "modify" select options
 *
 * NOTE(review): short lines are missing from this listing (declarations,
 * returns, block braces, any NULL guard around the column loop, the
 * failure label).
 * NOTE(review): the argument on the yajl_gen_get_buf line reads "¶ms" in
 * this listing; this looks like a mis-encoded "&params" and is left as is.
 * NOTE(review): the callback entry comes from malloc() and only table.call
 * and uid are set in the visible lines; the remaining fields appear to be
 * left uninitialized - confirm.
 * NOTE(review): yajl_gen_clear() only resets the generator buffer; the YAJL
 * API releases the handle with yajl_gen_free() - none is visible here. */
1082 int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1083 const char **tb_column,
1084 ovs_db_table_cb_t update_cb,
1085 ovs_db_result_cb_t result_cb, unsigned int flags) {
1087 yajl_gen_status yajl_gen_ret;
1088 ovs_callback_t *new_cb = NULL;
1089 char uid_str[OVS_UID_STR_SIZE];
1095 if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1098 if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1101 /* register table update callback */
1102 if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1105 /* add new callback to front */
1106 new_cb->table.call = update_cb;
1107 new_cb->uid = ovs_uid_generate();
1108 ovs_db_callback_add(pdb, new_cb);
1110 /* make update notification request
1111 * [<db-name>, <json-value>, <monitor-requests>] */
1112 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1114 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1116 /* uid string <json-value> */
1117 ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1118 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1120 /* <monitor-requests> */
1121 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1123 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1124 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1126 /* <monitor-request> */
1127 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1130 /* columns within the table to be monitored */
1131 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1132 OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1133 for (; *tb_column; tb_column++)
1134 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1135 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1137 /* specify select option */
1138 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1140 OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1142 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1143 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1144 flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1145 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1146 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1147 flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1148 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1149 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1150 flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1151 OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1152 OVS_YAJL_CALL(yajl_gen_bool, jgen,
1153 flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1155 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1158 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1160 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1162 OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1164 OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1166 /* make a request to subscribe to given table */
1167 OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)¶ms,
1169 if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1170 OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1175 /* release memory */
1176 yajl_gen_clear(jgen);
/* Tear down an OVS DB instance: lock the DB mutex, stop the EVENT and POLL
 * threads, drop all registered callbacks and destroy the DB mutex.
 *
 * NOTE(review): short lines are missing from this listing (declarations,
 * the NULL check, returns, the socket close, release of `pdb`, closing
 * braces).
 * NOTE(review): the two thread-stop steps are labelled the wrong way round:
 * ovs_db_event_thread_stop() is reported as "stop poll thread failed" and
 * ovs_db_poll_thread_stop() as "stop event thread failed". The log strings
 * should be swapped to match the calls. */
1180 int ovs_db_destroy(ovs_db_t *pdb) {
1188 /* try to lock the structure before releasing */
1189 if ((ret = pthread_mutex_lock(&pdb->mutex))) {
1190 OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
1194 /* stop event thread (the log message below says "poll") */
1195 if (ovs_db_event_thread_stop(pdb) < 0) {
1196 OVS_ERROR("stop poll thread failed");
1200 /* stop poll thread (the log message below says "event") */
1201 if (ovs_db_poll_thread_stop(pdb) < 0) {
1202 OVS_ERROR("stop event thread failed");
1206 /* unsubscribe callbacks */
1207 ovs_db_callback_remove_all(pdb);
1209 /* close connection */
1213 /* release DB handler */
1214 pthread_mutex_unlock(&pdb->mutex);
1215 pthread_mutex_destroy(&pdb->mutex);
1221 * Public OVS utils API implementation
1224 /* Get YAJL value by key from YAJL dictionary */
1225 yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
1226 const char *obj_key = NULL;
1229 if (!YAJL_IS_OBJECT(jval) || (key == NULL))
1232 /* find a value by key */
1233 for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1234 obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1235 if (strcmp(obj_key, key) == 0)
1236 return YAJL_GET_OBJECT(jval)->values[i];
1242 /* Get OVS DB map value by given map key */
1243 yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
1245 size_t array_len = 0;
1246 yajl_val *map_values = NULL;
1247 yajl_val *array_values = NULL;
1248 const char *str_val = NULL;
1250 /* check YAJL array */
1251 if (!YAJL_IS_ARRAY(jval) || (key == NULL))
1254 /* check a database map value (2-element, first one should be a string */
1255 array_len = YAJL_GET_ARRAY(jval)->len;
1256 array_values = YAJL_GET_ARRAY(jval)->values;
1257 if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
1258 (!YAJL_IS_ARRAY(array_values[1])))
1261 /* check first element of the array */
1262 str_val = YAJL_GET_STRING(array_values[0]);
1263 if (strcmp("map", str_val) != 0)
1266 /* try to find map value by map key */
1267 map_len = YAJL_GET_ARRAY(array_values[1])->len;
1268 map_values = YAJL_GET_ARRAY(array_values[1])->values;
1269 for (int i = 0; i < map_len; i++) {
1270 /* check YAJL array */
1271 if (!YAJL_IS_ARRAY(map_values[i]))
1274 /* check a database pair value (2-element, first one represents a key
1275 * and it should be a string in our case */
1276 array_len = YAJL_GET_ARRAY(map_values[i])->len;
1277 array_values = YAJL_GET_ARRAY(map_values[i])->values;
1278 if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
1281 /* return map value if given key equals map key */
1282 str_val = YAJL_GET_STRING(array_values[0]);
1283 if (strcmp(key, str_val) == 0)
1284 return array_values[1];