2 * collectd - src/dpdkstat.c
5 * Copyright(c) 2016 Intel Corporation. All rights reserved.
7 * Permission is hereby granted, free of charge, to any person obtaining a copy of
8 * this software and associated documentation files (the "Software"), to deal in
9 * the Software without restriction, including without limitation the rights to
10 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
11 * of the Software, and to permit persons to whom the Software is furnished to do
12 * so, subject to the following conditions:
14 * The above copyright notice and this permission notice shall be included in all
15 * copies or substantial portions of the Software.
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * Maryam Tahhan <maryam.tahhan@intel.com>
27 * Harry van Haaren <harry.van.haaren@intel.com>
32 #include "common.h" /* auxiliary functions */
33 #include "plugin.h" /* plugin_register_*, plugin_dispatch_values */
34 #include "utils_time.h"
37 #include <semaphore.h>
39 #include <sys/queue.h>
42 #include <rte_config.h>
44 #include <rte_ethdev.h>
45 #include <rte_common.h>
46 #include <rte_debug.h>
47 #include <rte_malloc.h>
48 #include <rte_memory.h>
49 #include <rte_memzone.h>
50 #include <rte_launch.h>
51 #include <rte_tailq.h>
52 #include <rte_lcore.h>
53 #include <rte_per_lcore.h>
54 #include <rte_debug.h>
56 #include <rte_atomic.h>
57 #include <rte_branch_prediction.h>
58 #include <rte_string_fns.h>
/* Default runtime config path; dpdk_config_init_default() stores it in file_prefix. */
60 #define DPDK_DEFAULT_RTE_CONFIG "/var/run/.rte_config"
/* Initial value of eal_argc, which sizes the argv array in dpdk_helper_init_eal(). */
61 #define DPDK_MAX_ARGC 8
/* Size of the buffers dpdk_read() uses to read the helper logging pipe. */
62 #define DPDKSTAT_MAX_BUFFER_SIZE (4096*4)
/* Name of the POSIX shared memory object passed to shm_open() and shm_unlink(). */
63 #define DPDK_SHM_NAME "dpdk_collectd_stats_shm"
/* Size of the stack buffers handed to sstrerror() when an errno text is logged. */
64 #define ERR_BUF_SIZE 1024
/*
 * Work requested from the helper process for one spawn. dpdk_helper_spawn()
 * stores the value in g_configuration->helper_action and dpdk_helper_run()
 * branches on it.
 */
69 enum DPDK_HELPER_ACTION {
/* Count the xstats of every enabled port, then request an SHM re-init. */
70 DPDK_HELPER_ACTION_COUNT_STATS,
/* Read the xstats of every enabled port into the SHM and signal collectd. */
71 DPDK_HELPER_ACTION_SEND_STATS,
/*
 * Life-cycle state of the helper process, stored in the shared configuration
 * (helper_status) where both collectd and the helper can read it.
 */
74 enum DPDK_HELPER_STATUS {
/* Zero value, i.e. the state found after dpdk_shm_init() zero-fills the SHM. */
75 DPDK_HELPER_NOT_INITIALIZED = 0,
/* Set by dpdk_helper_run() on entry and again after its primary-alive check fails. */
76 DPDK_HELPER_WAITING_ON_PRIMARY,
/* Set by dpdk_helper_init_eal() before it calls rte_eal_init(). */
77 DPDK_HELPER_INITIALIZING_EAL,
/* Set by dpdk_helper_run() once it is past the EAL initialization step. */
78 DPDK_HELPER_ALIVE_SENDING_STATS,
/* Set by dpdk_helper_stop(); dpdk_read() spawns a new helper when it sees it. */
79 DPDK_HELPER_GRACEFUL_QUIT,
/*
 * Plugin state shared between collectd and the helper process. The single
 * instance lives in the shared memory mapped by dpdk_shm_init() and is reached
 * through g_configuration.
 *
 * NOTE(review): code elsewhere in this file also uses the members interval,
 * eal_argc, helper_pid, helper_pipes, num_ports and num_xstats; their
 * declarations are not among the lines visible here.
 */
82 struct dpdk_config_s {
83 /* General DPDK params */
/* Option strings; dpdk_helper_init_eal() adds the non-empty ones to the EAL argv. */
84 char coremask[DATA_MAX_NAME_LEN];
85 char memory_channels[DATA_MAX_NAME_LEN];
86 char socket_memory[DATA_MAX_NAME_LEN];
87 char process_type[DATA_MAX_NAME_LEN];
88 char file_prefix[DATA_MAX_NAME_LEN];
/* Non-zero from just before rte_eal_init() is called; cleared by dpdk_helper_stop(). */
90 uint32_t eal_initialized;
/* Bit i selects port i; the default set by dpdk_config_init_default() is all ones. */
91 uint32_t enabled_port_mask;
/* Optional names from PortName options; dpdk_read() uses them as device names. */
92 char port_name[RTE_MAX_ETHPORTS][DATA_MAX_NAME_LEN];
/* Set by the helper after counting stats; dpdk_read() tests it to re-init the SHM. */
95 int collectd_reinit_shm;
/* Posted by collectd to request a helper pass; the helper waits on it. */
97 sem_t sema_helper_get_stats;
/* Posted by the helper once stats are in the SHM; dpdk_read() waits on it. */
98 sem_t sema_stats_in_shm;
/* Current helper state and the action requested for the current spawn. */
100 enum DPDK_HELPER_STATUS helper_status;
101 enum DPDK_HELPER_ACTION helper_action;
/* cdtime() taken by the helper right before it reads the xstats of a port. */
105 cdtime_t port_read_time[RTE_MAX_ETHPORTS];
/* Number of xstats per enabled port, indexed in enabled-port order. */
106 uint32_t num_stats_in_port[RTE_MAX_ETHPORTS];
/* NOTE(review): not referenced by any of the code visible in this file. */
107 struct rte_eth_link link_status[RTE_MAX_ETHPORTS];
/* NULL until dpdk_re_init_shm() points it at the memory right after this struct. */
108 struct rte_eth_xstats *xstats;
109 /* rte_eth_xstats from here on until the end of the SHM */
111 typedef struct dpdk_config_s dpdk_config_t;
/* Set to 1 by dpdk_config() after the configuration block has been processed. */
113 static int g_configured;
/* Shared plugin state; assigned the mmap() result in dpdk_shm_init(). */
114 static dpdk_config_t *g_configuration;
/* Forward declarations of the file-local functions defined further down. */
116 static void dpdk_config_init_default(void);
117 static int dpdk_config(oconfig_item_t *ci);
118 static int dpdk_helper_init_eal(void);
119 static int dpdk_helper_run(void);
120 static int dpdk_helper_spawn(enum DPDK_HELPER_ACTION action);
121 static int dpdk_init(void);
122 static int dpdk_read(user_data_t *ud);
123 static int dpdk_shm_cleanup(void);
124 static int dpdk_shm_init(size_t size);
126 /* Write the default configuration to the g_configuration instance. */
/*
 * Requires g_configuration to point at mapped memory. Options parsed later by
 * dpdk_config() overwrite the values stored here.
 */
127 static void dpdk_config_init_default(void)
/* Take the interval collectd reports for this plugin. */
129 g_configuration->interval = plugin_get_interval();
/* Equal to the global default: warn that no interval was configured. */
130 if (g_configuration->interval == cf_get_default_interval())
131 WARNING("dpdkstat: No time interval was configured, default value %lu ms is set",
132 CDTIME_T_TO_MS(g_configuration->interval));
133 /* Default is all ports enabled */
134 g_configuration->enabled_port_mask = ~0;
135 g_configuration->eal_argc = DPDK_MAX_ARGC;
136 g_configuration->eal_initialized = 0;
/* Default EAL options: coremask 0xf, one memory channel, secondary process. */
137 ssnprintf(g_configuration->coremask, DATA_MAX_NAME_LEN, "%s", "0xf");
138 ssnprintf(g_configuration->memory_channels, DATA_MAX_NAME_LEN, "%s", "1");
139 ssnprintf(g_configuration->process_type, DATA_MAX_NAME_LEN, "%s", "secondary");
140 ssnprintf(g_configuration->file_prefix, DATA_MAX_NAME_LEN, "%s",
141 DPDK_DEFAULT_RTE_CONFIG);
/* Start with every port name empty. */
143 for (int i = 0; i < RTE_MAX_ETHPORTS; i++)
144 g_configuration->port_name[i][0] = 0;
/*
 * Complex-config callback registered for the dpdkstat plugin.
 *
 * Maps the shared memory behind g_configuration, stores the defaults and then
 * applies every recognised child option of ci: Coremask, MemoryChannels,
 * SocketMemory, ProcessType, FilePrefix, EnabledPortMask and PortName. Any
 * other key is reported with a WARNING. Sets g_configured to 1 when done.
 *
 * ci: configuration block handed over by collectd.
 */
147 static int dpdk_config(oconfig_item_t *ci)
/* Index of the port_name[] slot used by the PortName branch. */
149 int port_counter = 0;
150 char errbuf[ERR_BUF_SIZE];
151 /* Allocate g_configuration and
152 * initialize a POSIX SHared Memory (SHM) object.
154 int err = dpdk_shm_init(sizeof (dpdk_config_t));
156 DEBUG("dpdkstat: error in shm_init, %s", sstrerror(errno, errbuf,
161 /* Set defaults for config, overwritten by loop if config item exists */
162 dpdk_config_init_default();
/* Walk the child options; keys are matched case-insensitively. */
164 for (int i = 0; i < ci->children_num; i++) {
165 oconfig_item_t *child = ci->children + i;
167 if (strcasecmp("Coremask", child->key) == 0) {
168 cf_util_get_string_buffer(child, g_configuration->coremask,
169 sizeof (g_configuration->coremask));
170 DEBUG("dpdkstat:COREMASK %s ", g_configuration->coremask);
171 } else if (strcasecmp("MemoryChannels", child->key) == 0) {
172 cf_util_get_string_buffer(child, g_configuration->memory_channels,
173 sizeof (g_configuration->memory_channels));
174 DEBUG("dpdkstat:Memory Channels %s ", g_configuration->memory_channels);
175 } else if (strcasecmp("SocketMemory", child->key) == 0) {
/* Bound the copy by the destination buffer itself, not by a sibling field. */
176 cf_util_get_string_buffer(child, g_configuration->socket_memory,
177 sizeof (g_configuration->socket_memory));
178 DEBUG("dpdkstat: socket mem %s ", g_configuration->socket_memory);
179 } else if (strcasecmp("ProcessType", child->key) == 0) {
180 cf_util_get_string_buffer(child, g_configuration->process_type,
181 sizeof (g_configuration->process_type));
182 DEBUG("dpdkstat: proc type %s ", g_configuration->process_type);
/*
 * NOTE(review): this branch and the EnabledPortMask branch read
 * child->values[0] without a visible check that the option has a value.
 */
183 } else if ((strcasecmp("FilePrefix", child->key) == 0) &&
184 (child->values[0].type == OCONFIG_TYPE_STRING)) {
/* The configured prefix is expanded to the path /var/run/.<prefix>_config. */
185 ssnprintf(g_configuration->file_prefix, DATA_MAX_NAME_LEN, "/var/run/.%s_config",
186 child->values[0].value.string);
187 DEBUG("dpdkstat: file prefix %s ", g_configuration->file_prefix);
188 } else if ((strcasecmp("EnabledPortMask", child->key) == 0) &&
189 (child->values[0].type == OCONFIG_TYPE_NUMBER)) {
/* The numeric option value is converted to the 32-bit port mask. */
190 g_configuration->enabled_port_mask = (uint32_t) child->values[0].value.number;
191 DEBUG("dpdkstat: Enabled Port Mask %u", g_configuration->enabled_port_mask);
192 } else if (strcasecmp("PortName", child->key) == 0) {
/*
 * NOTE(review): no bound on port_counter against RTE_MAX_ETHPORTS is
 * visible here; confirm that more PortName options than ports cannot
 * index past port_name[].
 */
193 cf_util_get_string_buffer(child, g_configuration->port_name[port_counter],
194 sizeof (g_configuration->port_name[port_counter]));
195 DEBUG("dpdkstat: Port %d Name: %s ", port_counter,
196 g_configuration->port_name[port_counter]);
199 WARNING("dpdkstat: The config option \"%s\" is unknown.",
202 } /* End for (int i = 0; i < ci->children_num; i++)*/
203 g_configured = 1; /* Bypass configuration in dpdk_shm_init(). */
/*
 * Create, size and map the POSIX shared memory object that backs
 * g_configuration, zero-fill it and initialise both semaphores in it.
 *
 * size: number of bytes given to the shared memory object and to the mapping.
 */
209 * Allocate g_configuration and initialize SHared Memory (SHM)
210 * for config and helper process
212 static int dpdk_shm_init(size_t size)
215 * Check if SHM is already configured: when config items are provided, the
216 * config function initializes SHM. If there is no config, then init() will
222 char errbuf[ERR_BUF_SIZE];
224 /* Create and open a new object, or open an existing object. */
/*
 * NOTE(review): O_TRUNC empties an object that already exists, and mode 0666
 * asks for read/write access for every user (subject to umask); confirm that
 * both are intended.
 */
225 int fd = shm_open(DPDK_SHM_NAME, O_CREAT | O_TRUNC | O_RDWR, 0666);
227 WARNING("dpdkstat:Failed to open %s as SHM:%s", DPDK_SHM_NAME,
228 sstrerror(errno, errbuf, sizeof (errbuf)));
231 /* Set the size of the shared memory object. */
232 int ret = ftruncate(fd, size);
234 WARNING("dpdkstat:Failed to resize SHM:%s", sstrerror(errno, errbuf,
238 /* Map the shared memory object into this process' virtual address space. */
239 g_configuration = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
240 if (g_configuration == MAP_FAILED) {
241 WARNING("dpdkstat:Failed to mmap SHM:%s", sstrerror(errno, errbuf,
246 * Close the file descriptor, the shared memory object still exists
247 * and can only be removed by calling shm_unlink().
251 /* Initialize g_configuration. */
252 memset(g_configuration, 0, size);
254 /* Initialize the semaphores for SHM use */
/* Second argument 1: shared between processes. Third argument: initial count 0. */
255 int err = sem_init(&g_configuration->sema_helper_get_stats, 1, 0);
257 ERROR("dpdkstat semaphore init failed: %s", sstrerror(errno, errbuf,
261 err = sem_init(&g_configuration->sema_stats_in_shm, 1, 0);
263 ERROR("dpdkstat semaphore init failed: %s", sstrerror(errno, errbuf,
/* No statistics area is attached to a freshly initialised mapping. */
268 g_configuration->xstats = NULL;
275 /* Reset to zero, as it was set to MAP_FAILED aka: (void *)-1. Avoid
276 * an issue if collectd attempts to run this plugin failure.
// Re-create the shared memory so that it holds the configuration struct
// followed by num_xstats entries of struct rte_eth_xstats. The current
// configuration is copied to the stack first and copied back afterwards.
282 static int dpdk_re_init_shm()
284 dpdk_config_t temp_config;
// Snapshot the live configuration before the mapping is torn down.
285 memcpy(&temp_config, g_configuration, sizeof(dpdk_config_t));
286 DEBUG("dpdkstat: %s: ports %"PRIu32", xstats %"PRIu32, __func__, temp_config.num_ports,
287 temp_config.num_xstats);
// New size: the struct plus one rte_eth_xstats slot per counted statistic.
289 size_t shm_xstats_size = sizeof(dpdk_config_t) + (sizeof(struct rte_eth_xstats) *
290 g_configuration->num_xstats);
291 DEBUG("=== SHM new size for %"PRIu32" xstats", g_configuration->num_xstats);
// Drop the old mapping and object, then map a new one of the larger size.
293 int err = dpdk_shm_cleanup();
295 ERROR("dpdkstat: Error in shm_cleanup in %s", __func__);
298 err = dpdk_shm_init(shm_xstats_size);
300 WARNING("dpdkstat: Error in shm_init in %s", __func__);
303 /* If the XML config() function has been run, don't re-initialize defaults */
305 dpdk_config_init_default();
// Restore the snapshot, clear the re-init request and point xstats at the
// memory that directly follows the struct inside the new mapping.
// NOTE(review): the snapshot includes both sem_t members, so this copy
// overwrites the semaphores that dpdk_shm_init() has just initialised;
// confirm that copying semaphore objects like this is acceptable.
307 memcpy(g_configuration, &temp_config, sizeof(dpdk_config_t));
308 g_configuration->collectd_reinit_shm = 0;
309 g_configuration->xstats = (struct rte_eth_xstats *) (g_configuration + 1);
/*
 * Init callback registered in module_register(). Calls dpdk_shm_init() for a
 * region the size of dpdk_config_t and logs an ERROR when that call fails.
 * NOTE(review): the condition guarding dpdk_config_init_default() is not
 * visible; per the comment below it is presumably skipped when dpdk_config()
 * has already run - confirm.
 */
313 static int dpdk_init(void)
315 int err = dpdk_shm_init(sizeof(dpdk_config_t));
317 ERROR("dpdkstat: %s : error %d in shm_init()", __func__, err);
321 /* If the XML config() function has been run, don't re-initialize defaults */
323 dpdk_config_init_default();
/*
 * Mark the helper as stopped and kill the helper process.
 *
 * reset: callers pass RESET or NO_RESET. NOTE(review): the statement that
 * tests reset is not visible here; presumably it guards the block that clears
 * the EAL, port and xstats bookkeeping - confirm.
 */
329 static int dpdk_helper_stop(int reset)
/* dpdk_read() treats this status as the cue to spawn a new helper. */
331 g_configuration->helper_status = DPDK_HELPER_GRACEFUL_QUIT;
/* Forget the EAL state and everything that was counted so far. */
333 g_configuration->eal_initialized = 0;
334 g_configuration->num_ports = 0;
335 g_configuration->xstats = NULL;
336 g_configuration->num_xstats = 0;
337 for (int i = 0; i < RTE_MAX_ETHPORTS; i++)
338 g_configuration->num_stats_in_port[i] = 0;
/*
 * Close the write end of the helper pipe in the calling process, then send
 * SIGKILL to the pid recorded in the shared helper_pid.
 * NOTE(review): dpdk_helper_run() also calls this function, so the helper
 * presumably kills itself through this path - confirm.
 */
340 close(g_configuration->helper_pipes[1]);
341 int err = kill(g_configuration->helper_pid, SIGKILL);
343 char errbuf[ERR_BUF_SIZE];
344 WARNING("dpdkstat: error sending kill to helper: %s", sstrerror(errno, errbuf,
/*
 * Fork the helper process that talks to DPDK on behalf of collectd.
 *
 * action: stored in g_configuration->helper_action for the helper to read.
 *
 * A pipe is created first so that the helper's stdout reaches collectd. In the
 * parent the write end is closed, the child pid is recorded and
 * sema_helper_get_stats is posted once. In the child the read end is closed
 * and stdout is replaced by the write end of the pipe.
 */
351 static int dpdk_helper_spawn(enum DPDK_HELPER_ACTION action)
353 char errbuf[ERR_BUF_SIZE];
/* A new helper always starts without an initialised EAL. */
354 g_configuration->eal_initialized = 0;
355 g_configuration->helper_action = action;
357 * Create a pipe for helper stdout back to collectd. This is necessary for
358 * logging EAL failures, as rte_eal_init() calls rte_panic().
360 if (pipe(g_configuration->helper_pipes) != 0) {
361 DEBUG("dpdkstat: Could not create helper pipe: %s", sstrerror(errno, errbuf,
/* Fetch the current file status flags of both pipe ends. */
366 int pipe0_flags = fcntl(g_configuration->helper_pipes[0], F_GETFL, 0);
367 int pipe1_flags = fcntl(g_configuration->helper_pipes[1], F_GETFL, 0);
368 if (pipe0_flags == -1 || pipe1_flags == -1) {
369 WARNING("dpdkstat: Failed setting up pipe flags: %s", sstrerror(errno, errbuf,
/*
 * Write back the flags that were read from the same descriptor; mixing up
 * the two ends would apply the flags of the other pipe end.
 * NOTE(review): the continuation of these two calls is not visible here.
 */
372 int pipe0_err = fcntl(g_configuration->helper_pipes[0], F_SETFL, pipe0_flags
374 int pipe1_err = fcntl(g_configuration->helper_pipes[1], F_SETFL, pipe1_flags
376 if (pipe0_err == -1 || pipe1_err == -1) {
377 WARNING("dpdkstat: Failed setting up pipes: %s", sstrerror(errno, errbuf,
/* Parent: only the read end is needed; remember the pid of the child. */
383 close(g_configuration->helper_pipes[1]);
384 g_configuration->helper_pid = pid;
/* The pid is cast to long, so it is printed with the signed conversion. */
385 DEBUG("dpdkstat: helper pid %ld", (long)g_configuration->helper_pid);
386 /* Kick helper once it's alive to have it start processing */
387 sem_post(&g_configuration->sema_helper_get_stats);
388 } else if (pid == 0) {
389 /* Replace stdout with a pipe to collectd. */
390 close(g_configuration->helper_pipes[0]);
391 close(STDOUT_FILENO);
392 dup2(g_configuration->helper_pipes[1], STDOUT_FILENO);
396 ERROR("dpdkstat: Failed to fork helper process: %s", sstrerror(errno, errbuf,
/*
 * Build an argv array from the configured option strings and pass it to
 * rte_eal_init(). Runs in the helper process (called from dpdk_helper_run()).
 */
404 * Initialize the DPDK EAL, if this returns, EAL is successfully initialized.
405 * On failure, the EAL prints an error message, and the helper process exits.
407 static int dpdk_helper_init_eal(void)
409 g_configuration->helper_status = DPDK_HELPER_INITIALIZING_EAL;
/*
 * argv for rte_eal_init(), sized from the shared eal_argc value.
 * NOTE(review): every option below appends to argp without a bounds check,
 * and eal_argc is overwritten further down with the number of entries used;
 * confirm that eal_argc + 1 slots are enough when all options are configured.
 */
410 char *argp[(g_configuration->eal_argc) + 1];
413 argp[i++] = "collectd-dpdk";
/* Each option is passed on only when its configured string is non-empty. */
414 if (strcasecmp(g_configuration->coremask, "") != 0) {
416 argp[i++] = g_configuration->coremask;
418 if (strcasecmp(g_configuration->memory_channels, "") != 0) {
420 argp[i++] = g_configuration->memory_channels;
422 if (strcasecmp(g_configuration->socket_memory, "") != 0) {
423 argp[i++] = "--socket-mem";
424 argp[i++] = g_configuration->socket_memory;
/* The file prefix is additionally skipped when it still equals the default. */
426 if (strcasecmp(g_configuration->file_prefix, "") != 0 &&
427 strcasecmp(g_configuration->file_prefix, DPDK_DEFAULT_RTE_CONFIG) != 0) {
428 argp[i++] = "--file-prefix";
429 argp[i++] = g_configuration->file_prefix;
431 if (strcasecmp(g_configuration->process_type, "") != 0) {
432 argp[i++] = "--proc-type";
433 argp[i++] = g_configuration->process_type;
/* Record how many argv entries were filled in. */
435 g_configuration->eal_argc = i;
/*
 * The flag is raised before the call and cleared again below.
 * NOTE(review): the condition guarding the reset is not visible; presumably
 * it is the failure path of rte_eal_init() - confirm.
 */
437 g_configuration->eal_initialized = 1;
438 int ret = rte_eal_init(g_configuration->eal_argc, argp);
440 g_configuration->eal_initialized = 0;
/*
 * Main routine of the helper process.
 *
 * Each pass waits on sema_helper_get_stats with a timeout, checks that the
 * parent pid is unchanged and that a DPDK primary process is alive,
 * initialises the EAL when that has not happened yet and then walks the
 * enabled ports. Depending on helper_action it either counts the xstats per
 * port and requests an SHM re-init, or reads the xstats into the SHM and
 * posts sema_stats_in_shm for collectd.
 */
446 static int dpdk_helper_run (void)
448 char errbuf[ERR_BUF_SIZE];
/* Remember the parent so that a later change can be detected. */
449 pid_t ppid = getppid();
450 g_configuration->helper_status = DPDK_HELPER_WAITING_ON_PRIMARY;
453 /* sem_timedwait() to avoid blocking forever */
/* Deadline: now + 1500 ms + two collection intervals. */
455 cdtime_t now = cdtime();
456 cdtime_t safety_period = MS_TO_CDTIME_T(1500);
457 CDTIME_T_TO_TIMESPEC(now + safety_period + g_configuration->interval * 2, &ts);
458 int ret = sem_timedwait(&g_configuration->sema_helper_get_stats, &ts);
460 if (ret == -1 && errno == ETIMEDOUT) {
461 ERROR("dpdkstat-helper: sem timedwait()"
462 " timeout, did collectd terminate?");
463 dpdk_helper_stop(RESET);
465 /* Parent PID change means collectd died so quit the helper process. */
466 if (ppid != getppid()) {
467 WARNING("dpdkstat-helper: parent PID changed, quitting.");
468 dpdk_helper_stop(RESET);
471 /* Checking for DPDK primary process. */
472 if (!rte_eal_primary_proc_alive(g_configuration->file_prefix)) {
473 if (g_configuration->eal_initialized) {
474 WARNING("dpdkstat-helper: no primary alive but EAL initialized:"
476 dpdk_helper_stop(RESET);
478 g_configuration->helper_status = DPDK_HELPER_WAITING_ON_PRIMARY;
479 /* Back to start of while() - waiting for primary process */
483 if (!g_configuration->eal_initialized) {
484 /* Initialize EAL. */
485 int ret = dpdk_helper_init_eal();
487 WARNING("ERROR INITIALIZING EAL");
488 dpdk_helper_stop(RESET);
492 g_configuration->helper_status = DPDK_HELPER_ALIVE_SENDING_STATS;
494 uint8_t nb_ports = rte_eth_dev_count();
496 DEBUG("dpdkstat-helper: No DPDK ports available. "
497 "Check bound devices to DPDK driver.");
498 dpdk_helper_stop(RESET);
/* Never walk more ports than the per-port arrays can hold. */
501 if (nb_ports > RTE_MAX_ETHPORTS)
502 nb_ports = RTE_MAX_ETHPORTS;
504 int len = 0, enabled_port_count = 0, num_xstats = 0;
505 for (uint8_t i = 0; i < nb_ports; i++) {
/*
 * Test bit i of the 32-bit mask with an unsigned shift; shifting a signed 1
 * into bit 31 is undefined behaviour.
 * NOTE(review): the mask has 32 bits, so ports with an index of 32 or more
 * cannot be represented if RTE_MAX_ETHPORTS exceeds 32.
 */
506 if (!(g_configuration->enabled_port_mask & (1U << i)))
509 if (g_configuration->helper_action == DPDK_HELPER_ACTION_COUNT_STATS) {
/* A NULL array with size 0 asks the driver for the number of xstats. */
510 len = rte_eth_xstats_get(i, NULL, 0);
512 ERROR("dpdkstat-helper: Cannot get xstats count on port %"PRIu8, i);
516 g_configuration->num_stats_in_port[enabled_port_count] = len;
517 enabled_port_count++;
/* Send mode: read this port's xstats into its slice of the shared array. */
520 len = g_configuration->num_stats_in_port[enabled_port_count];
521 g_configuration->port_read_time[enabled_port_count] = cdtime();
522 ret = rte_eth_xstats_get(i, g_configuration->xstats + num_xstats,
523 g_configuration->num_stats_in_port[enabled_port_count]);
524 if (ret < 0 || ret != len) {
525 DEBUG("dpdkstat-helper: Error reading xstats on port %"PRIu8" len = %d",
/* Advance the write offset past the stats of this port. */
529 num_xstats += g_configuration->num_stats_in_port[enabled_port_count];
530 enabled_port_count++;
532 } /* for (nb_ports) */
534 if (g_configuration->helper_action == DPDK_HELPER_ACTION_COUNT_STATS) {
/* Publish the totals for dpdk_re_init_shm() to size the new SHM. */
535 g_configuration->num_ports = enabled_port_count;
536 g_configuration->num_xstats = num_xstats;
537 DEBUG("dpdkstat-helper ports: %"PRIu32", num stats: %"PRIu32,
538 g_configuration->num_ports, g_configuration->num_xstats);
539 /* Exit, allowing collectd to re-init SHM to the right size */
540 g_configuration->collectd_reinit_shm = REINIT_SHM;
541 dpdk_helper_stop(NO_RESET);
543 /* Now kick collectd send thread to send the stats */
544 int err = sem_post(&g_configuration->sema_stats_in_shm);
546 WARNING("dpdkstat: error posting semaphore to helper %s", sstrerror(errno,
547 errbuf, sizeof (errbuf)));
548 dpdk_helper_stop(RESET);
/*
 * Dispatch the extended statistics of one port to collectd, one value list
 * per statistic.
 *
 * dev_name:       copied into plugin_instance of every value list.
 * xstats:         array of statistics; name and value of each entry are read.
 * counters:       number of entries of xstats to submit.
 * port_read_time: stored as the time of every value list.
 *
 * The collectd type is chosen from the statistic name: its prefix (rx_, tx_,
 * flow_, mac_) together with the text from its last underscore onwards. The
 * full statistic name becomes the type_instance.
 */
555 static void dpdk_submit_xstats(const char* dev_name,
556 const struct rte_eth_xstats *xstats, uint32_t counters, cdtime_t port_read_time)
558 for (uint32_t j = 0; j < counters; j++) {
559 value_list_t dpdkstat_vl = VALUE_LIST_INIT;
/* The statistic value is submitted as a single derive value. */
562 dpdkstat_vl.values = &(value_t) { .derive = (derive_t) xstats[j].value };
563 dpdkstat_vl.values_len = 1; /* Submit stats one at a time */
564 dpdkstat_vl.time = port_read_time;
565 sstrncpy(dpdkstat_vl.host, hostname_g, sizeof (dpdkstat_vl.host));
566 sstrncpy(dpdkstat_vl.plugin, "dpdkstat", sizeof (dpdkstat_vl.plugin));
567 sstrncpy(dpdkstat_vl.plugin_instance, dev_name,
568 sizeof (dpdkstat_vl.plugin_instance));
/* Last underscore of the name, or NULL when the name contains none. */
570 type_end = strrchr(xstats[j].name, '_');
/* Receive-side statistics. */
572 if ((type_end != NULL) &&
573 (strncmp(xstats[j].name, "rx_", strlen("rx_")) == 0)) {
574 if (strncmp(type_end, "_errors", strlen("_errors")) == 0) {
575 sstrncpy(dpdkstat_vl.type, "if_rx_errors",
576 sizeof (dpdkstat_vl.type));
577 } else if (strncmp(type_end, "_dropped", strlen("_dropped")) == 0) {
578 sstrncpy(dpdkstat_vl.type, "if_rx_dropped",
579 sizeof (dpdkstat_vl.type));
580 } else if (strncmp(type_end, "_bytes", strlen("_bytes")) == 0) {
581 sstrncpy(dpdkstat_vl.type, "if_rx_octets",
582 sizeof (dpdkstat_vl.type));
583 } else if (strncmp(type_end, "_packets", strlen("_packets")) == 0) {
584 sstrncpy(dpdkstat_vl.type, "if_rx_packets",
585 sizeof (dpdkstat_vl.type));
586 } else if (strncmp(type_end, "_placement", strlen("_placement")) == 0) {
587 sstrncpy(dpdkstat_vl.type, "if_rx_errors",
588 sizeof (dpdkstat_vl.type));
589 } else if (strncmp(type_end, "_buff", strlen("_buff")) == 0) {
590 sstrncpy(dpdkstat_vl.type, "if_rx_errors",
591 sizeof (dpdkstat_vl.type));
593 /* Does not fit obvious type: use a more generic one */
594 sstrncpy(dpdkstat_vl.type, "derive",
595 sizeof (dpdkstat_vl.type));
/* Transmit-side statistics. */
598 } else if ((type_end != NULL) &&
599 (strncmp(xstats[j].name, "tx_", strlen("tx_"))) == 0) {
600 if (strncmp(type_end, "_errors", strlen("_errors")) == 0) {
601 sstrncpy(dpdkstat_vl.type, "if_tx_errors",
602 sizeof (dpdkstat_vl.type));
603 } else if (strncmp(type_end, "_dropped", strlen("_dropped")) == 0) {
604 sstrncpy(dpdkstat_vl.type, "if_tx_dropped",
605 sizeof (dpdkstat_vl.type));
606 } else if (strncmp(type_end, "_bytes", strlen("_bytes")) == 0) {
607 sstrncpy(dpdkstat_vl.type, "if_tx_octets",
608 sizeof (dpdkstat_vl.type));
609 } else if (strncmp(type_end, "_packets", strlen("_packets")) == 0) {
610 sstrncpy(dpdkstat_vl.type, "if_tx_packets",
611 sizeof (dpdkstat_vl.type));
613 /* Does not fit obvious type: use a more generic one */
614 sstrncpy(dpdkstat_vl.type, "derive",
615 sizeof (dpdkstat_vl.type));
/*
 * Flow statistics.
 * NOTE(review): unlike the rx_ and tx_ chains, no generic fallback is visible
 * in this chain or in the mac_ chain; confirm what type is dispatched when
 * none of the suffixes match.
 */
617 } else if ((type_end != NULL) &&
618 (strncmp(xstats[j].name, "flow_", strlen("flow_"))) == 0) {
620 if (strncmp(type_end, "_filters", strlen("_filters")) == 0) {
621 sstrncpy(dpdkstat_vl.type, "operations",
622 sizeof (dpdkstat_vl.type));
623 } else if (strncmp(type_end, "_errors", strlen("_errors")) == 0) {
624 sstrncpy(dpdkstat_vl.type, "errors",
625 sizeof (dpdkstat_vl.type));
/*
 * NOTE(review): this repeats the _filters test of the first branch of the
 * chain, so this branch can never be taken and filter_result is never used.
 */
626 } else if (strncmp(type_end, "_filters", strlen("_filters")) == 0) {
627 sstrncpy(dpdkstat_vl.type, "filter_result",
628 sizeof (dpdkstat_vl.type));
/* MAC statistics. */
630 } else if ((type_end != NULL) &&
631 (strncmp(xstats[j].name, "mac_", strlen("mac_"))) == 0) {
632 if (strncmp(type_end, "_errors", strlen("_errors")) == 0) {
633 sstrncpy(dpdkstat_vl.type, "errors",
634 sizeof (dpdkstat_vl.type));
637 /* Does not fit obvious type, or strrchr error:
638 * use a more generic type */
639 sstrncpy(dpdkstat_vl.type, "derive",
640 sizeof (dpdkstat_vl.type));
/* The full statistic name identifies the value within its type. */
643 sstrncpy(dpdkstat_vl.type_instance, xstats[j].name,
644 sizeof (dpdkstat_vl.type_instance));
645 plugin_dispatch_values(&dpdkstat_vl);
/*
 * Read callback registered for the dpdkstat plugin.
 *
 * Steps visible in this function: react to a pending SHM re-init request,
 * spawn the helper when it is not running, reap it without blocking, forward
 * anything the helper wrote to its logging pipe, ask the helper for fresh
 * statistics through sema_helper_get_stats, wait up to one interval on
 * sema_stats_in_shm and finally dispatch the statistics of every port.
 *
 * ud: user data supplied by collectd; not referenced in the lines shown.
 */
649 static int dpdk_read(user_data_t *ud)
654 * Check if SHM flag is set to be re-initialized. AKA DPDK ports have been
655 * counted, so re-init SHM to be large enough to fit all the statistics.
657 if (g_configuration->collectd_reinit_shm) {
658 DEBUG("dpdkstat: read() now reinit SHM then launching send-thread");
663 * Check if DPDK proc is alive, and has already counted port / stats. This
664 * must be done in dpdk_read(), because the DPDK primary process may not be
665 * alive at dpdk_init() time.
667 if (g_configuration->helper_status == DPDK_HELPER_NOT_INITIALIZED ||
668 g_configuration->helper_status == DPDK_HELPER_GRACEFUL_QUIT) {
669 int action = DPDK_HELPER_ACTION_SEND_STATS;
/* Nothing has been counted yet: the helper has to count stats first. */
670 if(g_configuration->num_xstats == 0)
671 action = DPDK_HELPER_ACTION_COUNT_STATS;
672 /* Spawn the helper thread to count stats or to read stats. */
673 int err = dpdk_helper_spawn(action);
675 char errbuf[ERR_BUF_SIZE];
676 ERROR("dpdkstat: error spawning helper %s", sstrerror(errno, errbuf,
/* WNOHANG: check on the helper without blocking the read callback. */
682 pid_t ws = waitpid(g_configuration->helper_pid, NULL, WNOHANG);
684 * Conditions under which to respawn helper:
685 * waitpid() fails, helper process died (or quit), so respawn
687 _Bool respawn_helper = 0;
692 char buf[DPDKSTAT_MAX_BUFFER_SIZE];
693 char out[DPDKSTAT_MAX_BUFFER_SIZE];
695 /* non blocking check on helper logging pipe */
696 struct pollfd fds = {
697 .fd = g_configuration->helper_pipes[0],
/* Timeout 0: poll() returns immediately. */
700 int data_avail = poll(&fds, 1, 0);
701 if (data_avail < 0) {
702 char errbuf[ERR_BUF_SIZE];
/*
 * Log only when errno is neither EINTR nor EAGAIN. Joined with ||, the test
 * was true for every errno value, so those two were logged as well.
 */
703 if (errno != EINTR && errno != EAGAIN)
704 ERROR("dpdkstats: poll(2) failed: %s",
705 sstrerror(errno, errbuf, sizeof (errbuf)));
708 int nbytes = read(g_configuration->helper_pipes[0], buf, sizeof(buf));
/*
 * NOTE(review): read() does not NUL-terminate buf, and nbytes is used as the
 * size of out, which leaves room for nbytes - 1 characters only; confirm that
 * the guards around this call (not visible here) make both safe.
 */
711 ssnprintf(out, nbytes, "%s", buf);
712 DEBUG("dpdkstat: helper-proc: %s", out);
715 if (respawn_helper) {
716 if (g_configuration->helper_pid)
717 dpdk_helper_stop(RESET);
718 dpdk_helper_spawn(DPDK_HELPER_ACTION_COUNT_STATS);
721 /* Kick helper process through SHM */
722 sem_post(&g_configuration->sema_helper_get_stats);
/* Wait at most one collection interval for the helper to deliver. */
725 cdtime_t now = cdtime();
726 CDTIME_T_TO_TIMESPEC(now + g_configuration->interval, &ts);
727 ret = sem_timedwait(&g_configuration->sema_stats_in_shm, &ts);
729 if (errno == ETIMEDOUT)
730 DEBUG("dpdkstat: timeout in collectd thread: is a DPDK Primary running? ");
734 /* Dispatch the stats.*/
/* count: offset into the shared xstats array. port_num: port id for the name. */
735 uint32_t count = 0, port_num = 0;
737 for (uint32_t i = 0; i < g_configuration->num_ports; i++) {
739 cdtime_t port_read_time = g_configuration->port_read_time[i];
740 uint32_t counters_num = g_configuration->num_stats_in_port[i];
/* Number of bits in the port mask. */
741 size_t ports_max = CHAR_BIT * sizeof (g_configuration->enabled_port_mask);
/*
 * Scan the mask upwards from port_num for an enabled port. The shift is
 * unsigned because shifting a signed 1 into bit 31 is undefined behaviour.
 */
742 for (size_t j = port_num; j < ports_max; j++) {
743 if ((g_configuration->enabled_port_mask & (1U << j)) != 0)
/* Prefer the configured port name; otherwise fall back to port.<number>. */
748 if (g_configuration->port_name[i][0] != 0)
749 ssnprintf(dev_name, sizeof(dev_name), "%s", g_configuration->port_name[i]);
751 ssnprintf(dev_name, sizeof(dev_name), "port.%"PRIu32, port_num);
/* The stats of this port start count entries into the shared array. */
752 struct rte_eth_xstats *xstats = g_configuration->xstats + count;
754 dpdk_submit_xstats(dev_name, xstats, counters_num, port_read_time);
755 count += counters_num;
757 } /* for each port */
/*
 * Unmap g_configuration and remove the shared memory object by name. An
 * ERROR is logged for munmap() and for shm_unlink().
 */
761 static int dpdk_shm_cleanup(void)
/*
 * NOTE(review): the length is always sizeof(dpdk_config_t), while
 * dpdk_re_init_shm() maps a region that also holds the xstats array; confirm
 * that the larger mapping is released completely.
 */
763 int ret = munmap(g_configuration, sizeof(dpdk_config_t));
766 ERROR("dpdkstat: munmap returned %d", ret);
/* Remove the name; dpdk_shm_init() creates the object again on demand. */
769 ret = shm_unlink(DPDK_SHM_NAME);
771 ERROR("dpdkstat: shm_unlink returned %d", ret);
/*
 * Shutdown callback registered in module_register(): close the write end of
 * the helper pipe, send SIGKILL to the helper and release the shared memory.
 */
777 static int dpdk_shutdown(void)
780 char errbuf[ERR_BUF_SIZE];
781 close(g_configuration->helper_pipes[1]);
782 int err = kill(g_configuration->helper_pid, SIGKILL);
784 ERROR("dpdkstat: error sending sigkill to helper %s", sstrerror(errno, errbuf,
/* The helper pid was read above, before the mapping that holds it goes away. */
788 err = dpdk_shm_cleanup();
790 ERROR("dpdkstat: error cleaning up SHM: %s", sstrerror(errno, errbuf,
/*
 * Plugin entry point: register the config, init, read and shutdown callbacks
 * of the dpdkstat plugin with collectd.
 */
798 void module_register(void)
800 plugin_register_complex_config("dpdkstat", dpdk_config);
801 plugin_register_init("dpdkstat", dpdk_init);
/* Interval argument 0 and no user data are passed for the read callback. */
802 plugin_register_complex_read(NULL, "dpdkstat", dpdk_read, 0, NULL);
803 plugin_register_shutdown("dpdkstat", dpdk_shutdown);