--- /dev/null
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim: fileencoding=utf-8
+#
+# Copyright © 2009 Adrian Perez <aperez@igalia.com>
+#
+# Distributed under terms of the GPLv2 license.
+
+"""
+Collectd network protocol implementation.
+"""
+
+import socket
+import struct
+
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+
+from datetime import datetime
+from copy import deepcopy
+
+
+DEFAULT_PORT = 25826
+"""Default port"""
+
+DEFAULT_IPv4_GROUP = "239.192.74.66"
+"""Default IPv4 multicast group"""
+
+DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"
+"""Default IPv6 multicast group"""
+
+
+
+# Message kinds
+TYPE_HOST = 0x0000
+TYPE_TIME = 0x0001
+TYPE_PLUGIN = 0x0002
+TYPE_PLUGIN_INSTANCE = 0x0003
+TYPE_TYPE = 0x0004
+TYPE_TYPE_INSTANCE = 0x0005
+TYPE_VALUES = 0x0006
+TYPE_INTERVAL = 0x0007
+
+# For notifications
+TYPE_MESSAGE = 0x0100
+TYPE_SEVERITY = 0x0101
+
+# DS kinds
+DS_TYPE_COUNTER = 0
+DS_TYPE_GAUGE = 1
+
+
+header = struct.Struct("!2H")
+number = struct.Struct("!Q")
+short = struct.Struct("!H")
+double = struct.Struct("<d")
+
+
+def decode_network_values(ptype, plen, buf):
+ """Decodes a list of DS values in collectd network format
+ """
+ nvalues = short.unpack_from(buf, header.size)[0]
+ off = header.size + short.size + nvalues
+ valskip = double.size
+
+ # Check whether our expected packet size is the reported one
+ assert ((valskip + 1) * nvalues + short.size + header.size) == plen
+ assert double.size == number.size
+
+ result = []
+ for dstype in map(ord, buf[header.size+short.size:off]):
+ if dstype == DS_TYPE_COUNTER:
+ result.append((dstype, number.unpack_from(buf, off)[0]))
+ off += valskip
+ elif dstype == DS_TYPE_GAUGE:
+ result.append((dstype, double.unpack_from(buf, off)[0]))
+ off += valskip
+ else:
+ raise ValueError("DS type %i unsupported" % dstype)
+
+ return result
+
+
+def decode_network_number(ptype, plen, buf):
+ """Decodes a number (64-bit unsigned) in collectd network format.
+ """
+ return number.unpack_from(buf, header.size)[0]
+
+
+def decode_network_string(msgtype, plen, buf):
+ """Decodes a floating point number (64-bit) in collectd network format.
+ """
+ return buf[header.size:plen-1]
+
+
+# Mapping of message types to decoding functions.
+_decoders = {
+ TYPE_VALUES : decode_network_values,
+ TYPE_TIME : decode_network_number,
+ TYPE_INTERVAL : decode_network_number,
+ TYPE_HOST : decode_network_string,
+ TYPE_PLUGIN : decode_network_string,
+ TYPE_PLUGIN_INSTANCE: decode_network_string,
+ TYPE_TYPE : decode_network_string,
+ TYPE_TYPE_INSTANCE : decode_network_string,
+ TYPE_MESSAGE : decode_network_string,
+ TYPE_SEVERITY : decode_network_number,
+}
+
+
+def decode_network_packet(buf):
+ """Decodes a network packet in collectd format.
+ """
+ off = 0
+ blen = len(buf)
+ while off < blen:
+ ptype, plen = header.unpack_from(buf, off)
+
+ if plen > blen - off:
+ raise ValueError("Packet longer than amount of data in buffer")
+
+ if ptype not in _decoders:
+ raise ValueError("Message type %i not recognized" % ptype)
+
+ yield ptype, _decoders[ptype](ptype, plen, buf[off:])
+ off += plen
+
+
+
+
+
+class Data(object):
+ time = 0
+ host = None
+ plugin = None
+ plugininstance = None
+ type = None
+ typeinstance = None
+
+ def __init__(self, **kw):
+ [setattr(self, k, v) for k, v in kw.iteritems()]
+
+ @property
+ def datetime(self):
+ return datetime.fromtimestamp(self.time)
+
+ @property
+ def source(self):
+ buf = StringIO()
+ if self.host:
+ buf.write(self.host)
+ if self.plugin:
+ buf.write("/")
+ buf.write(self.plugin)
+ if self.plugininstance:
+ buf.write("/")
+ buf.write(self.plugininstance)
+ if self.type:
+ buf.write("/")
+ buf.write(self.type)
+ if self.typeinstance:
+ buf.write("/")
+ buf.write(self.typeinstance)
+ return buf.getvalue()
+
+ def __str__(self):
+ return "[%i] %s" % (self.time, self.source)
+
+
+
+class Notification(Data):
+ FAILURE = 1
+ WARNING = 2
+ OKAY = 4
+
+ SEVERITY = {
+ FAILURE: "FAILURE",
+ WARNING: "WARNING",
+ OKAY : "OKAY",
+ }
+
+ __severity = 0
+ message = ""
+
+ def __set_severity(self, value):
+ if value in (self.FAILURE, self.WARNING, self.OKAY):
+ self.__severity = value
+
+ severity = property(lambda self: self.__severity, __set_severity)
+
+ @property
+ def severitystring(self):
+ return self.SEVERITY.get(self.severity, "UNKNOWN")
+
+ def __str__(self):
+ return "%s [%s] %s" % (
+ super(Notification, self).__str__(),
+ self.severitystring,
+ self.message)
+
+
+
+class Values(Data, list):
+ def __str__(self):
+ return "%s %s" % (Data.__str__(self), list.__str__(self))
+
+
+
+def interpret_opcodes(iterable):
+ vl = Values()
+ nt = Notification()
+
+ for kind, data in iterable:
+ if kind == TYPE_TIME:
+ vl.time = nt.time = data
+ elif kind == TYPE_INTERVAL:
+ vl.interval = data
+ elif kind == TYPE_HOST:
+ vl.host = nt.host = data
+ elif kind == TYPE_PLUGIN:
+ vl.plugin = nt.plugin = data
+ elif kind == TYPE_PLUGIN_INSTANCE:
+ vl.plugininstance = nt.plugininstance = data
+ elif kind == TYPE_TYPE:
+ vl.type = nt.type = data
+ elif kind == TYPE_TYPE_INSTANCE:
+ vl.typeinstance = nt.typeinstance = data
+ elif kind == TYPE_SEVERITY:
+ nt.severity = data
+ elif kind == TYPE_MESSAGE:
+ nt.message = data
+ yield deepcopy(nt)
+ elif kind == TYPE_VALUES:
+ vl[:] = data
+ yield deepcopy(vl)
+
+
+
+class Reader(object):
+ """Network reader for collectd data.
+
+ Listens on the network in a given address, which can be a multicast
+ group address, and handles reading data when it arrives.
+ """
+ addr = None
+ host = None
+ port = DEFAULT_PORT
+
+ BUFFER_SIZE = 1024
+
+
+ def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
+ if host is None:
+ multicast = True
+ host = DEFAULT_IPv4_GROUP
+
+ self.host, self.port = host, port
+ self.ipv6 = ":" in self.host
+
+ family, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
+ None if multicast else self.host, self.port,
+ socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
+ socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]
+
+ self._sock = socket.socket(family, socktype, proto)
+ self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self._sock.bind(sockaddr)
+
+ if multicast:
+ if hasattr(socket, "SO_REUSEPORT"):
+ self._sock.setsockopt(
+ socket.SOL_SOCKET,
+ socket.SO_REUSEPORT, 1)
+
+ val = None
+ if family == socket.AF_INET:
+ assert "." in self.host
+ val = struct.pack("4sl",
+ socket.inet_aton(self.host), socket.INADDR_ANY)
+ elif family == socket.AF_INET6:
+ raise NotImplementedError("IPv6 support not ready yet")
+ else:
+ raise ValueError("Unsupported network address family")
+
+ self._sock.setsockopt(
+ socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
+ socket.IP_ADD_MEMBERSHIP, val)
+ self._sock.setsockopt(
+ socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
+ socket.IP_MULTICAST_LOOP, 0)
+
+
+ def receive(self):
+ """Receives a single raw collect network packet.
+ """
+ return self._sock.recv(self.BUFFER_SIZE)
+
+
+ def decode(self, buf=None):
+ """Decodes a given buffer or the next received packet.
+ """
+ if buf is None:
+ buf = self.receive()
+ return decode_network_packet(buf)
+
+
+ def interpret(self, iterable=None):
+ """Interprets a sequence
+ """
+ if iterable is None:
+ iterable = self.decode()
+ if isinstance(iterable, basestring):
+ iterable = self.decode(iterable)
+ return interpret_opcodes(iterable)
+
+