2 # -*- coding: utf-8 -*-
3 # vim: fileencoding=utf-8
5 # Copyright © 2009 Adrian Perez <aperez@igalia.com>
7 # Distributed under terms of the GPLv2 license.
10 Collectd network protocol implementation.
17 from cStringIO import StringIO
19 from StringIO import StringIO
21 from datetime import datetime
22 from copy import deepcopy
28 DEFAULT_IPv4_GROUP = "239.192.74.66"
29 """Default IPv4 multicast group"""
31 DEFAULT_IPv6_GROUP = "ff18::efc0:4a42"
32 """Default IPv6 multicast group"""
# Part type codes; they select the decoding function for each part of a
# packet.
40 TYPE_PLUGIN_INSTANCE = 0x0003
42 TYPE_TYPE_INSTANCE = 0x0005
44 TYPE_INTERVAL = 0x0007
48 TYPE_SEVERITY = 0x0101
# Pre-compiled wire layouts. "!" selects network (big-endian) byte order.
# Part header: two unsigned 16-bit fields, unpacked as (type, length).
55 header = struct.Struct("!2H")
# Unsigned 64-bit integer.
56 number = struct.Struct("!Q")
# Unsigned 16-bit integer; used for the value count of a values part.
57 short = struct.Struct("!H")
# 64-bit float used for gauge values. NOTE: this is the only layout that is
# little-endian ("<") rather than network byte order.
58 double = struct.Struct("<d")
def decode_network_values(ptype, plen, buf):
    """Decodes a list of DS values in collectd network format.

    The part consists of the part header, a 16-bit value count, one
    data-source type byte per value and finally one 64-bit slot per value.

    :param ptype: Part type code (not used by this decoder).
    :param plen: Part length as reported by the part header; it includes
        the header itself.
    :param buf: Buffer positioned at the start of the part header.
    :return: List of ``(dstype, value)`` tuples in wire order.
    :raises ValueError: If the reported part length does not match the
        value count, or if a data-source type is not supported.
    """
    nvalues = short.unpack_from(buf, header.size)[0]
    types_start = header.size + short.size
    off = types_start + nvalues

    # Counters and gauges occupy slots of the same width, so a single
    # stride is enough to walk the value area.
    assert double.size == number.size
    valskip = double.size

    # Check whether our expected packet size is the reported one. This is
    # validation of untrusted network input, so it must not be an assert
    # (asserts vanish under "python -O").
    expected = (valskip + 1) * nvalues + short.size + header.size
    if expected != plen:
        raise ValueError(
            "Values part length %i does not match expected length %i"
            % (plen, expected))

    result = []
    # bytearray() yields integer type codes on both Python 2 and Python 3.
    for dstype in bytearray(buf[types_start:off]):
        if dstype == DS_TYPE_COUNTER:
            layout = number
        elif dstype == DS_TYPE_GAUGE:
            layout = double
        else:
            raise ValueError("DS type %i unsupported" % dstype)
        result.append((dstype, layout.unpack_from(buf, off)[0]))
        off += valskip

    return result
def decode_network_number(ptype, plen, buf):
    """Decodes a number (64-bit unsigned) in collectd network format.

    The value is read immediately after the part header. ``ptype`` and
    ``plen`` are not used; they are accepted so that every decoder can be
    called with the same arguments.
    """
    (value,) = number.unpack_from(buf, header.size)
    return value
def decode_network_string(msgtype, plen, buf):
    """Decodes a string in collectd network format.

    Returns the contents of ``buf`` located between the end of the part
    header and the last byte of the part. The last byte is left out
    (presumably the NUL terminator -- confirm against the protocol
    description). ``msgtype`` is not used.
    """
    first = header.size
    last = plen - 1
    return buf[first:last]
# Mapping of message types to decoding functions.
_decoders = {TYPE_VALUES: decode_network_values}

# Parts whose payload is a single 64-bit unsigned number.
_decoders.update(dict.fromkeys(
    (TYPE_TIME, TYPE_INTERVAL, TYPE_SEVERITY),
    decode_network_number))

# Parts whose payload is a string.
_decoders.update(dict.fromkeys(
    (TYPE_HOST, TYPE_PLUGIN, TYPE_PLUGIN_INSTANCE,
     TYPE_TYPE, TYPE_TYPE_INSTANCE, TYPE_MESSAGE),
    decode_network_string))
def decode_network_packet(buf):
    """Decodes a network packet in collectd format.

    A packet is a sequence of parts. Each part starts with a header
    holding the part type and the part length (header included), and the
    matching decoder is looked up by part type.

    :param buf: Raw packet contents.
    :return: Generator yielding one ``(type, decoded_value)`` tuple per
        part, in the order the parts appear in the packet.
    :raises ValueError: If a part claims to be longer than the remaining
        data, claims to be shorter than its own header, or has a type
        without a registered decoder.
    :raises struct.error: If fewer bytes than a full part header remain.
    """
    off = 0
    blen = len(buf)
    while off < blen:
        ptype, plen = header.unpack_from(buf, off)

        if plen > blen - off:
            raise ValueError("Packet longer than amount of data in buffer")

        # A length below the header size is malformed; a length of zero
        # would additionally keep the offset from ever advancing.
        if plen < header.size:
            raise ValueError(
                "Part length %i is shorter than the part header" % plen)

        if ptype not in _decoders:
            raise ValueError("Message type %i not recognized" % ptype)

        yield ptype, _decoders[ptype](ptype, plen, buf[off:])
        off += plen
138 plugininstance = None
def __init__(self, **kw):
    """Initializes the instance from keyword arguments.

    Every keyword argument is stored on the instance as an attribute of
    the same name.
    """
    # A plain loop states the intent (side effects only); the former list
    # comprehension built a throwaway list. items() exists on both
    # Python 2 and Python 3 dictionaries.
    for name, value in kw.items():
        setattr(self, name, value)
147 return datetime.fromtimestamp(self.time)
156 buf.write(self.plugin)
157 if self.plugininstance:
159 buf.write(self.plugininstance)
163 if self.typeinstance:
165 buf.write(self.typeinstance)
166 return buf.getvalue()
169 return "[%i] %s" % (self.time, self.source)
173 class Notification(Data):
187 def __set_severity(self, value):
188 if value in (self.FAILURE, self.WARNING, self.OKAY):
189 self.__severity = value
191 severity = property(lambda self: self.__severity, __set_severity)
194 def severitystring(self):
195 return self.SEVERITY.get(self.severity, "UNKNOWN")
198 return "%s [%s] %s" % (
199 super(Notification, self).__str__(),
205 class Values(Data, list):
207 return "%s %s" % (Data.__str__(self), list.__str__(self))
211 def interpret_opcodes(iterable):
215 for kind, data in iterable:
216 if kind == TYPE_TIME:
217 vl.time = nt.time = data
218 elif kind == TYPE_INTERVAL:
220 elif kind == TYPE_HOST:
221 vl.host = nt.host = data
222 elif kind == TYPE_PLUGIN:
223 vl.plugin = nt.plugin = data
224 elif kind == TYPE_PLUGIN_INSTANCE:
225 vl.plugininstance = nt.plugininstance = data
226 elif kind == TYPE_TYPE:
227 vl.type = nt.type = data
228 elif kind == TYPE_TYPE_INSTANCE:
229 vl.typeinstance = nt.typeinstance = data
230 elif kind == TYPE_SEVERITY:
232 elif kind == TYPE_MESSAGE:
235 elif kind == TYPE_VALUES:
241 class Reader(object):
242 """Network reader for collectd data.
244 Listens on the network in a given address, which can be a multicast
245 group address, and handles reading data when it arrives.
254 def __init__(self, host=None, port=DEFAULT_PORT, multicast=False):
257 host = DEFAULT_IPv4_GROUP
259 self.host, self.port = host, port
260 self.ipv6 = ":" in self.host
262 family, socktype, proto, canonname, sockaddr = socket.getaddrinfo(
263 None if multicast else self.host, self.port,
264 socket.AF_INET6 if self.ipv6 else socket.AF_UNSPEC,
265 socket.SOCK_DGRAM, 0, socket.AI_PASSIVE)[0]
267 self._sock = socket.socket(family, socktype, proto)
268 self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
269 self._sock.bind(sockaddr)
272 if hasattr(socket, "SO_REUSEPORT"):
273 self._sock.setsockopt(
275 socket.SO_REUSEPORT, 1)
278 if family == socket.AF_INET:
279 assert "." in self.host
280 val = struct.pack("4sl",
281 socket.inet_aton(self.host), socket.INADDR_ANY)
282 elif family == socket.AF_INET6:
283 raise NotImplementedError("IPv6 support not ready yet")
285 raise ValueError("Unsupported network address family")
287 self._sock.setsockopt(
288 socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
289 socket.IP_ADD_MEMBERSHIP, val)
290 self._sock.setsockopt(
291 socket.IPPROTO_IPV6 if self.ipv6 else socket.IPPROTO_IP,
292 socket.IP_MULTICAST_LOOP, 0)
296 """Receives a single raw collectd network packet.
298 return self._sock.recv(self.BUFFER_SIZE)
301 def decode(self, buf=None):
302 """Decodes a given buffer or the next received packet.
306 return decode_network_packet(buf)
def interpret(self, iterable=None):
    """Interprets a sequence of decoded parts.

    :param iterable: What to interpret. When ``None``, the result of
        ``self.decode()`` is used. When a string, it is first passed
        through ``self.decode()``. Anything else is handed over
        unchanged.
    :return: Whatever ``interpret_opcodes`` returns for the sequence.
    """
    parts = self.decode() if iterable is None else iterable
    if isinstance(parts, basestring):
        # A string is still raw data and has to be decoded first.
        parts = self.decode(parts)
    return interpret_opcodes(parts)