perl plugin, Collectd.pm: Added support for flushing of specific identifiers.
[collectd.git] / bindings / perl / Collectd.pm
1 # collectd - Collectd.pm
2 # Copyright (C) 2007, 2008  Sebastian Harl
3 #
4 # This program is free software; you can redistribute it and/or modify it
5 # under the terms of the GNU General Public License as published by the
6 # Free Software Foundation; only version 2 of the License is applicable.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 # General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
16 #
17 # Author:
18 #   Sebastian Harl <sh at tokkee.org>
19
20 package Collectd;
21
22 use strict;
23 use warnings;
24
25 use Config;
26
27 use threads;
28 use threads::shared;
29
30 BEGIN {
31         if (! $Config{'useithreads'}) {
32                 die "Perl does not support ithreads!";
33         }
34 }
35
36 require Exporter;
37
38 our @ISA = qw( Exporter );
39
40 our %EXPORT_TAGS = (
41         'plugin' => [ qw(
42                         plugin_register
43                         plugin_unregister
44                         plugin_dispatch_values
45                         plugin_flush
46                         plugin_flush_one
47                         plugin_flush_all
48                         plugin_dispatch_notification
49                         plugin_log
50         ) ],
51         'types' => [ qw(
52                         TYPE_INIT
53                         TYPE_READ
54                         TYPE_WRITE
55                         TYPE_SHUTDOWN
56                         TYPE_LOG
57                         TYPE_NOTIF
58                         TYPE_FLUSH
59                         TYPE_DATASET
60         ) ],
61         'ds_types' => [ qw(
62                         DS_TYPE_COUNTER
63                         DS_TYPE_GAUGE
64         ) ],
65         'log' => [ qw(
66                         ERROR
67                         WARNING
68                         NOTICE
69                         INFO
70                         DEBUG
71                         LOG_ERR
72                         LOG_WARNING
73                         LOG_NOTICE
74                         LOG_INFO
75                         LOG_DEBUG
76         ) ],
77         'notif' => [ qw(
78                         NOTIF_FAILURE
79                         NOTIF_WARNING
80                         NOTIF_OKAY
81         ) ],
82         'globals' => [ qw(
83                         $hostname_g
84                         $interval_g
85         ) ],
86 );
87
88 {
89         my %seen;
90         push @{$EXPORT_TAGS{'all'}}, grep {! $seen{$_}++ } @{$EXPORT_TAGS{$_}}
91                 foreach keys %EXPORT_TAGS;
92 }
93
94 # global variables
95 our $hostname_g;
96 our $interval_g;
97
98 Exporter::export_ok_tags ('all');
99
100 my @plugins : shared = ();
101
102 my %types = (
103         TYPE_INIT,     "init",
104         TYPE_READ,     "read",
105         TYPE_WRITE,    "write",
106         TYPE_SHUTDOWN, "shutdown",
107         TYPE_LOG,      "log",
108         TYPE_NOTIF,    "notify",
109         TYPE_FLUSH,    "flush"
110 );
111
112 foreach my $type (keys %types) {
113         $plugins[$type] = &share ({});
114 }
115
116 sub _log {
117         my $caller = shift;
118         my $lvl    = shift;
119         my $msg    = shift;
120
121         if ("Collectd" eq $caller) {
122                 $msg = "perl: $msg";
123         }
124         return plugin_log ($lvl, $msg);
125 }
126
127 sub ERROR   { _log (scalar caller, LOG_ERR,     shift); }
128 sub WARNING { _log (scalar caller, LOG_WARNING, shift); }
129 sub NOTICE  { _log (scalar caller, LOG_NOTICE,  shift); }
130 sub INFO    { _log (scalar caller, LOG_INFO,    shift); }
131 sub DEBUG   { _log (scalar caller, LOG_DEBUG,   shift); }
132
133 sub plugin_call_all {
134         my $type = shift;
135
136         my %plugins;
137
138         our $cb_name = undef;
139
140         if (! defined $type) {
141                 return;
142         }
143
144         if (TYPE_LOG != $type) {
145                 DEBUG ("Collectd::plugin_call: type = \"$type\", args=\"@_\"");
146         }
147
148         if (! defined $plugins[$type]) {
149                 ERROR ("Collectd::plugin_call: unknown type \"$type\"");
150                 return;
151         }
152
153         {
154                 lock %{$plugins[$type]};
155                 %plugins = %{$plugins[$type]};
156         }
157
158         foreach my $plugin (keys %plugins) {
159                 my $p = $plugins{$plugin};
160
161                 my $status = 0;
162
163                 if ($p->{'wait_left'} > 0) {
164                         $p->{'wait_left'} -= $interval_g;
165                 }
166
167                 next if ($p->{'wait_left'} > 0);
168
169                 $cb_name = $p->{'cb_name'};
170                 $status = call_by_name (@_);
171
172                 if (! $status) {
173                         my $err = undef;
174
175                         if ($@) {
176                                 $err = $@;
177                         }
178                         else {
179                                 $err = "callback returned false";
180                         }
181
182                         if (TYPE_LOG != $type) {
183                                 ERROR ("Execution of callback \"$cb_name\" failed: $err");
184                         }
185
186                         $status = 0;
187                 }
188
189                 if ($status) {
190                         $p->{'wait_left'} = 0;
191                         $p->{'wait_time'} = $interval_g;
192                 }
193                 elsif (TYPE_READ == $type) {
194                         if ($p->{'wait_time'} < $interval_g) {
195                                 $p->{'wait_time'} = $interval_g;
196                         }
197
198                         $p->{'wait_left'} = $p->{'wait_time'};
199                         $p->{'wait_time'} *= 2;
200
201                         if ($p->{'wait_time'} > 86400) {
202                                 $p->{'wait_time'} = 86400;
203                         }
204
205                         WARNING ("${plugin}->read() failed with status $status. "
206                                 . "Will suspend it for $p->{'wait_left'} seconds.");
207                 }
208                 elsif (TYPE_INIT == $type) {
209                         ERROR ("${plugin}->init() failed with status $status. "
210                                 . "Plugin will be disabled.");
211
212                         foreach my $type (keys %types) {
213                                 plugin_unregister ($type, $plugin);
214                         }
215                 }
216                 elsif (TYPE_LOG != $type) {
217                         WARNING ("${plugin}->$types{$type}() failed with status $status.");
218                 }
219         }
220         return 1;
221 }
222
223 # Collectd::plugin_register (type, name, data).
224 #
225 # type:
226 #   init, read, write, shutdown, data set
227 #
228 # name:
229 #   name of the plugin
230 #
231 # data:
232 #   reference to the plugin's subroutine that does the work or the data set
233 #   definition
234 sub plugin_register {
235         my $type = shift;
236         my $name = shift;
237         my $data = shift;
238
239         DEBUG ("Collectd::plugin_register: "
240                 . "type = \"$type\", name = \"$name\", data = \"$data\"");
241
242         if (! ((defined $type) && (defined $name) && (defined $data))) {
243                 ERROR ("Usage: Collectd::plugin_register (type, name, data)");
244                 return;
245         }
246
247         if ((! defined $plugins[$type]) && (TYPE_DATASET != $type)) {
248                 ERROR ("Collectd::plugin_register: Invalid type \"$type\"");
249                 return;
250         }
251
252         if ((TYPE_DATASET == $type) && ("ARRAY" eq ref $data)) {
253                 return plugin_register_data_set ($name, $data);
254         }
255         elsif ((TYPE_DATASET != $type) && (! ref $data)) {
256                 my $pkg = scalar caller;
257
258                 my %p : shared;
259
260                 if ($data !~ m/^$pkg\:\:/) {
261                         $data = $pkg . "::" . $data;
262                 }
263
264                 %p = (
265                         wait_time => $interval_g,
266                         wait_left => 0,
267                         cb_name   => $data,
268                 );
269
270                 lock %{$plugins[$type]};
271                 $plugins[$type]->{$name} = \%p;
272         }
273         else {
274                 ERROR ("Collectd::plugin_register: Invalid data.");
275                 return;
276         }
277         return 1;
278 }
279
280 sub plugin_unregister {
281         my $type = shift;
282         my $name = shift;
283
284         DEBUG ("Collectd::plugin_unregister: type = \"$type\", name = \"$name\"");
285
286         if (! ((defined $type) && (defined $name))) {
287                 ERROR ("Usage: Collectd::plugin_unregister (type, name)");
288                 return;
289         }
290
291         if (TYPE_DATASET == $type) {
292                 return plugin_unregister_data_set ($name);
293         }
294         elsif (defined $plugins[$type]) {
295                 lock %{$plugins[$type]};
296                 delete $plugins[$type]->{$name};
297         }
298         else {
299                 ERROR ("Collectd::plugin_unregister: Invalid type.");
300                 return;
301         }
302 }
303
304 sub plugin_flush {
305         my %args = @_;
306
307         my $timeout = -1;
308         my @plugins = ();
309         my @ids     = ();
310
311         DEBUG ("Collectd::plugin_flush:"
312                 . (defined ($args{'timeout'}) ? " timeout = $args{'timeout'}" : "")
313                 . (defined ($args{'plugins'}) ? " plugins = $args{'plugins'}" : "")
314                 . (defined ($args{'identifiers'})
315                         ? " identifiers = $args{'identifiers'}" : ""));
316
317         if (defined ($args{'timeout'}) && ($args{'timeout'} > 0)) {
318                 $timeout = $args{'timeout'};
319         }
320
321         if (defined ($args{'plugins'})) {
322                 if ("ARRAY" eq ref ($args{'plugins'})) {
323                         @plugins = @{$args{'plugins'}};
324                 }
325                 else {
326                         @plugins = ($args{'plugins'});
327                 }
328         }
329         else {
330                 @plugins = (undef);
331         }
332
333         if (defined ($args{'identifiers'})) {
334                 if ("ARRAY" eq ref ($args{'identifiers'})) {
335                         @ids = @{$args{'identifiers'}};
336                 }
337                 else {
338                         @ids = ($args{'identifiers'});
339                 }
340         }
341         else {
342                 @ids = (undef);
343         }
344
345         foreach my $plugin (@plugins) {
346                 foreach my $id (@ids) {
347                         _plugin_flush($plugin, $timeout, $id);
348                 }
349         }
350 }
351
352 sub plugin_flush_one {
353         my $timeout = shift;
354         my $name    = shift;
355
356         WARNING ("Collectd::plugin_flush_one is deprecated - "
357                 . "use Collectd::plugin_flush instead.");
358
359         if (! (defined ($timeout) && defined ($name))) {
360                 ERROR ("Usage: Collectd::plugin_flush_one(timeout, name)");
361                 return;
362         }
363
364         plugin_flush (plugins => $name, timeout => $timeout);
365 }
366
367 sub plugin_flush_all {
368         my $timeout = shift;
369
370         WARNING ("Collectd::plugin_flush_all is deprecated - "
371                 . "use Collectd::plugin_flush instead.");
372
373         if (! defined ($timeout)) {
374                 ERROR ("Usage: Collectd::plugin_flush_all(timeout)");
375                 return;
376         }
377
378         plugin_flush (timeout => $timeout);
379 }
380
381 1;
382
383 # vim: set sw=4 ts=4 tw=78 noexpandtab :
384