* Copyright (C) 2009 Doug MacEachern
* Copyright (C) 2009 Franck Lombardi
* Copyright (C) 2012 Nicolas Szalay
+ * Copyright (C) 2017 Pavel Rochnyak
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Doug MacEachern <dougm at hyperic.com>
* Franck Lombardi
* Nicolas Szalay
+ * Pavel Rochnyak <pavel2000 ngs.ru>
**/
#include "collectd.h"
#include <netinet/tcp.h>
#include <sys/un.h>
+#include <poll.h>
+
#define MEMCACHED_DEF_HOST "127.0.0.1"
#define MEMCACHED_DEF_PORT "11211"
+#define MEMCACHED_CONNECT_TIMEOUT 10000
+#define MEMCACHED_IO_TIMEOUT 5000
struct memcached_s {
char *name;
char *socket;
char *connhost;
char *connport;
+ int fd;
};
typedef struct memcached_s memcached_t;
if (st == NULL)
return;
+ if (st->fd >= 0) {
+ shutdown(st->fd, SHUT_RDWR);
+ close(st->fd);
+ }
+
sfree(st->name);
sfree(st->host);
sfree(st->socket);
if (status != 0) {
shutdown(fd, SHUT_RDWR);
close(fd);
- fd = -1;
+ return -1;
+ }
+
+ /* switch to non-blocking mode */
+ int flags = fcntl(fd, F_GETFL);
+ status = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ if (status != 0) {
+ close(fd);
+ return -1;
}
return fd;
continue;
}
+ /* switch socket to non-blocking mode */
+ int flags = fcntl(fd, F_GETFL);
+ status = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ if (status != 0) {
+ close(fd);
+ fd = -1;
+ continue;
+ }
+
/* connect to the memcached daemon */
status = (int)connect(fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
- if (status != 0) {
+ if (status != 0 && errno != EINPROGRESS) {
shutdown(fd, SHUT_RDWR);
close(fd);
fd = -1;
continue;
}
- /* A socket could be opened and connecting succeeded. We're done. */
+ /* Wait until connection establishes */
+ struct pollfd pollfd;
+ pollfd.fd = fd;
+ pollfd.events = POLLOUT;
+ do
+ status = poll(&pollfd, 1, MEMCACHED_CONNECT_TIMEOUT);
+ while (status < 0 && errno == EINTR);
+ if (status <= 0) {
+ close(fd);
+ fd = -1;
+ continue;
+ }
+
+ /* Check if all is good */
+ int socket_error;
+ socklen_t socket_error_len = sizeof(socket_error);
+ status = getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&socket_error,
+ &socket_error_len);
+ if (status != 0 || socket_error != 0) {
+ close(fd);
+ fd = -1;
+ continue;
+ }
+ /* A socket is opened and connection succeeded. We're done. */
break;
}
return fd;
} /* int memcached_connect_inet */
-static int memcached_connect(memcached_t *st) {
+static void memcached_connect(memcached_t *st) {
+ if (st->fd >= 0)
+ return;
+
if (st->socket != NULL)
- return memcached_connect_unix(st);
+ st->fd = memcached_connect_unix(st);
else
- return memcached_connect_inet(st);
+ st->fd = memcached_connect_inet(st);
+
+ if (st->fd >= 0)
+ INFO("memcached plugin: Instance \"%s\": connection established.",
+ st->name);
}
static int memcached_query_daemon(char *buffer, size_t buffer_size,
memcached_t *st) {
- int fd, status;
+ int status;
size_t buffer_fill;
- fd = memcached_connect(st);
- if (fd < 0) {
+ memcached_connect(st);
+ if (st->fd < 0) {
ERROR("memcached plugin: Instance \"%s\" could not connect to daemon.",
st->name);
return -1;
}
- status = (int)swrite(fd, "stats\r\n", strlen("stats\r\n"));
+ struct pollfd pollfd;
+ pollfd.fd = st->fd;
+ pollfd.events = POLLOUT;
+
+ do
+ status = poll(&pollfd, 1, MEMCACHED_IO_TIMEOUT);
+ while (status < 0 && errno == EINTR);
+
+ if (status <= 0) {
+ ERROR("memcached plugin: poll() failed for write() call.");
+ close(st->fd);
+ st->fd = -1;
+ return -1;
+ }
+
+ status = (int)swrite(st->fd, "stats\r\n", strlen("stats\r\n"));
if (status != 0) {
char errbuf[1024];
- ERROR("memcached plugin: write(2) failed: %s",
+ ERROR("memcached plugin: Instance \"%s\": write(2) failed: %s", st->name,
sstrerror(errno, errbuf, sizeof(errbuf)));
- shutdown(fd, SHUT_RDWR);
- close(fd);
+ shutdown(st->fd, SHUT_RDWR);
+ close(st->fd);
+ st->fd = -1;
return -1;
}
memset(buffer, 0, buffer_size);
buffer_fill = 0;
- while ((status = (int)recv(fd, buffer + buffer_fill,
- buffer_size - buffer_fill, /* flags = */ 0)) !=
- 0) {
+ pollfd.events = POLLIN;
+ while (1) {
+ do
+ status = poll(&pollfd, 1, MEMCACHED_IO_TIMEOUT);
+ while (status < 0 && errno == EINTR);
+
+ if (status <= 0) {
+ ERROR("memcached plugin: Instance \"%s\": Timeout reading from socket",
+ st->name);
+ close(st->fd);
+ st->fd = -1;
+ return -1;
+ }
+
+ do
+ status = (int)recv(st->fd, buffer + buffer_fill,
+ buffer_size - buffer_fill, /* flags = */ 0);
+ while (status < 0 && errno == EINTR);
+
char const end_token[5] = {'E', 'N', 'D', '\r', '\n'};
if (status < 0) {
char errbuf[1024];
- if ((errno == EAGAIN) || (errno == EINTR))
+ if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
continue;
- ERROR("memcached: Error reading from socket: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
- shutdown(fd, SHUT_RDWR);
- close(fd);
+ ERROR("memcached plugin: Instance \"%s\": Error reading from socket: %s",
+ st->name, sstrerror(errno, errbuf, sizeof(errbuf)));
+ shutdown(st->fd, SHUT_RDWR);
+ close(st->fd);
+ st->fd = -1;
return -1;
}
buffer_fill += (size_t)status;
if (buffer_fill > buffer_size) {
buffer_fill = buffer_size;
- WARNING("memcached plugin: Message was truncated.");
+ WARNING("memcached plugin: Instance \"%s\": Message was truncated.",
+ st->name);
+ shutdown(st->fd, SHUT_RDWR);
+ close(st->fd);
+ st->fd = -1;
break;
}
status = 0;
if (buffer_fill == 0) {
- WARNING("memcached plugin: No data returned by memcached.");
+ WARNING("memcached plugin: Instance \"%s\": No data returned by memcached.",
+ st->name);
status = -1;
}
- shutdown(fd, SHUT_RDWR);
- close(fd);
return status;
} /* int memcached_query_daemon */
st->connhost = NULL;
st->connport = NULL;
+ st->fd = -1;
+
if (strcasecmp(ci->key, "Instance") == 0)
status = cf_util_get_string(ci, &st->name);
st->connhost = NULL;
st->connport = NULL;
+ st->fd = -1;
+
status = memcached_add_read_callback(st);
if (status == 0)
memcached_have_instances = 1;