Merge branch 'collectd-5.6' into collectd-5.7
authorFlorian Forster <octo@collectd.org>
Thu, 21 Sep 2017 15:22:39 +0000 (17:22 +0200)
committerFlorian Forster <octo@collectd.org>
Thu, 21 Sep 2017 15:22:39 +0000 (17:22 +0200)
1  2 
contrib/redhat/collectd.spec
src/network.c
src/ntpd.c
src/perl.c

@@@ -73,7 -73,6 +73,7 @@@
  %define with_gmond 0%{!?_without_gmond:1}
  %define with_gps 0%{!?_without_gps:1}
  %define with_hddtemp 0%{!?_without_hddtemp:1}
 +%define with_hugepages 0%{!?_without_hugepages:1}
  %define with_interface 0%{!?_without_interface:1}
  %define with_ipc 0%{!?_without_ipc:1}
  %define with_ipmi 0%{!?_without_ipmi:1}
  %define with_write_graphite 0%{!?_without_write_graphite:1}
  %define with_write_http 0%{!?_without_write_http:1}
  %define with_write_log 0%{!?_without_write_log:1}
 +%define with_write_prometheus 0%{!?_without_write_prometheus:1}
  %define with_write_redis 0%{!?_without_write_redis:1}
  %define with_write_riemann 0%{!?_without_write_riemann:1}
  %define with_write_sensu 0%{!?_without_write_sensu:1}
  %define with_barometer 0%{!?_without_barometer:0}
  # plugin grpc disabled, requires protobuf-compiler >= 3.0
  %define with_grpc 0%{!?_without_grpc:0}
 +# plugin dpdkstat disabled, requires libdpdk
 +%define with_dpdkstat 0%{!?_without_dpdkstat:0}
  # plugin lpar disabled, requires AIX
  %define with_lpar 0%{!?_without_lpar:0}
 +# plugin intel_rdt disabled, requires intel-cmt-cat
 +%define with_intel_rdt 0%{!?_without_intel_rdt:0}
  # plugin mic disabled, requires Mic
  %define with_mic 0%{!?_without_mic:0}
  # plugin netapp disabled, requires libnetapp
  %define with_redis 0
  %define with_smart 0
  %define with_turbostat 0
 +%define with_write_prometheus 0
  %define with_write_redis 0
  %define with_write_riemann 0
  %endif
  
  Summary:      Statistics collection and monitoring daemon
  Name:         collectd
 -Version:      5.6.1
 -Release:      4%{?dist}
 +Version:      5.7.1
 +Release:      3%{?dist}
  URL:          https://collectd.org
  Source:               https://collectd.org/files/%{name}-%{version}.tar.bz2
  License:      GPLv2
@@@ -462,17 -455,6 +462,17 @@@ The HDDTemp plugin collects the tempera
  provided via SMART and queried by the external hddtemp daemon.
  %endif
  
 +%if %{with_intel_rdt}
 +%package intel_rdt
 +Summary:      Intel RDT plugin for collectd
 +Group:                System Environment/Daemons
 +Requires:     %{name}%{?_isa} = %{version}-%{release}
 +BuildRequires:        intel-cmt-cat
 +%description intel_rdt
 +The intel_rdt plugin collects information provided by monitoring features of
 +Intel Resource Director Technology (Intel(R) RDT).
 +%endif
 +
  %if %{with_ipmi}
  %package ipmi
  Summary:      IPMI plugin for collectd
@@@ -501,8 -483,8 +501,8 @@@ the byte- and packet-counters of select
  Summary:      Java plugin for collectd
  Group:                System Environment/Daemons
  Requires:     %{name}%{?_isa} = %{version}-%{release}
- BuildRequires:        java-devel, jpackage-utils
- Requires:     java, jpackage-utils
+ BuildRequires:        java-devel >= 1.6, jpackage-utils >= 1.6
+ Requires:     java >= 1.6, jpackage-utils >= 1.6
  %description java
  This plugin for collectd allows plugins to be written in Java and executed
  in an embedded JVM.
@@@ -834,17 -816,6 +834,17 @@@ BuildRequires: librdkafka-deve
  The write_kafka plugin sends values to kafka, a distributed messaging system.
  %endif
  
 +%if %{with_write_prometheus}
 +%package write_prometheus
 +Summary:      Write-prometheus plugin for collectd
 +Group:                System Environment/Daemons
 +Requires:     %{name}%{?_isa} = %{version}-%{release}
 +BuildRequires:        libmicrohttpd-devel
 +%description write_prometheus
 +The Write Prometheus plugin exposes collected values using an embedded HTTP
 +server, turning the collectd daemon into a Prometheus exporter.
 +%endif
 +
  %if %{with_write_redis}
  %package write_redis
  Summary:      Write-Redis plugin for collectd
@@@ -1101,12 -1072,6 +1101,12 @@@ Collectd utilitie
  %define _with_drbd --disable-drbd
  %endif
  
 +%if %{with_dpdkstat}
 +%define _with_dpdkstat --enable-dpdkstat
 +%else
 +%define _with_dpdkstat --disable-dpdkstat
 +%endif
 +
  %if %{with_email}
  %define _with_email --enable-email
  %else
  %define _with_hddtemp --disable-hddtemp
  %endif
  
 +%if %{with_hugepages}
 +%define _with_hugepages --enable-hugepages
 +%else
 +%define _with_hugepages --disable-hugepages
 +%endif
 +
 +%if %{with_intel_rdt}
 +%define _with_intel_rdt --enable-intel_rdt
 +%else
 +%define _with_intel_rdt --disable-intel_rdt
 +%endif
 +
  %if %{with_interface}
  %define _with_interface --enable-interface
  %else
  %define _with_write_mongodb --disable-write_mongodb
  %endif
  
 +%if %{with_write_prometheus}
 +%define _with_write_prometheus --enable-write_prometheus
 +%else
 +%define _with_write_prometheus --disable-write_prometheus
 +%endif
 +
  %if %{with_write_redis}
  %define _with_write_redis --enable-write_redis
  %else
        %{?_with_disk} \
        %{?_with_dns} \
        %{?_with_drbd} \
 +      %{?_with_dpdkstat} \
        %{?_with_email} \
        %{?_with_entropy} \
        %{?_with_ethstat} \
        %{?_with_gps} \
        %{?_with_grpc} \
        %{?_with_hddtemp} \
 +      %{?_with_hugepages} \
 +      %{?_with_intel_rdt} \
        %{?_with_interface} \
        %{?_with_ipc} \
        %{?_with_ipmi} \
        %{?_with_write_kafka} \
        %{?_with_write_log} \
        %{?_with_write_mongodb} \
 +      %{?_with_write_prometheus} \
        %{?_with_write_redis} \
        %{?_with_write_riemann} \
        %{?_with_write_sensu} \
  %if %{with_drbd}
  %{_libdir}/%{name}/drbd.so
  %endif
 +%if %{with_dpdkstat}
 +%{_libdir}/%{name}/dpdkstat.so
 +%endif
  %if %{with_ethstat}
  %{_libdir}/%{name}/ethstat.so
  %endif
  %if %{with_fscache}
  %{_libdir}/%{name}/fscache.so
  %endif
 +%if %{with_hugepages}
 +%{_libdir}/%{name}/hugepages.so
 +%endif
  %if %{with_interface}
  %{_libdir}/%{name}/interface.so
  %endif
  %{_libdir}/%{name}/hddtemp.so
  %endif
  
 +%if %{with_intel_rdt}
 +%files intel_rdt
 +%{_libdir}/%{name}/intel_rdt.so
 +%endif
 +
  %if %{with_ipmi}
  %files ipmi
  %{_libdir}/%{name}/ipmi.so
  %{_libdir}/%{name}/write_kafka.so
  %endif
  
 +%if %{with_write_prometheus}
 +%files write_prometheus
 +%{_libdir}/%{name}/write_prometheus.so
 +%endif
 +
  %if %{with_write_redis}
  %files write_redis
  %{_libdir}/%{name}/write_redis.so
  %doc contrib/
  
  %changelog
 -* Sun Mar 05 2017 Ruben Kerkhof <ruben@rubenkerkhof.com> - 5.6.1-4
 +* Sun Mar 05 2017 Ruben Kerkhof <ruben@rubenkerkhof.com> - 5.7.1-2
  - Don't enable XFS support on RHEL6, it is missing for i386
  
 -* Wed Feb 22 2017 Ruben Kerkhof <ruben@rubenkerkhof.com> - 5.6.1-3
 +* Wed Feb 22 2017 Ruben Kerkhof <ruben@rubenkerkhof.com> - 5.7.1-2
  - Enable XFS support in df plugin
 +- Fix bogus date in changelog
 +
 +* Sun Jan 01 2017 Marc Fournier <marc.fournier@camptocamp.com> - 5.7.1-1
 +- New upstream version
 +
 +* Tue Nov 29 2016 Ruben Kerkhof <ruben@rubenkerkhof.com> - 5.7.0-2
 +- Disable redis plugin on RHEL 6, hiredis has been retired from EPEL6
  
 -* Tue Nov 29 2016 Ruben Kerkhof <ruben@rubenkerkhof.com> - 5.6.1-2
 -- Disable redis plugin on RHEL < 7, hiredis has been retired from EPEL6
 +* Mon Oct 10 2016 Marc Fournier <marc.fournier@camptocamp.com> - 5.7.0-1
 +- New PRE-RELEASE version
 +- New plugins enabled by default: hugepages, write_prometheus
 +- New plugins disabled by default: dpdkstat, intel_rdt
  
  * Mon Oct 10 2016 Victor Demonchy <v.demonchy@criteo.com> - 5.6.1-1
  - New upstream version
diff --combined src/network.c
@@@ -292,7 -292,7 +292,7 @@@ static char *send_buffer
  static char *send_buffer_ptr;
  static int send_buffer_fill;
  static cdtime_t send_buffer_last_update;
 -static value_list_t send_buffer_vl = VALUE_LIST_STATIC;
 +static value_list_t send_buffer_vl = VALUE_LIST_INIT;
  static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER;
  
  /* XXX: These counters are incremented from one place only. The spot in which
@@@ -1230,9 -1230,9 +1230,9 @@@ static int parse_part_encr_aes256(socke
                              part_size - buffer_offset,
                              /* in = */ NULL, /* in len = */ 0);
    if (err != 0) {
-     sfree(pea.username);
      ERROR("network plugin: gcry_cipher_decrypt returned: %s. Username: %s",
            gcry_strerror(err), pea.username);
+     sfree(pea.username);
      return (-1);
    }
  
    parse_packet(se, buffer + buffer_offset, payload_len, flags | PP_ENCRYPTED,
                 pea.username);
  
-   /* XXX: Free pea.username?!? */
    /* Update return values */
    *ret_buffer = buffer + part_size;
    *ret_buffer_len = buffer_len - part_size;
@@@ -3086,6 -3084,7 +3084,6 @@@ static int network_stats_read(void) /* 
    vl.values = values;
    vl.values_len = 2;
    vl.time = 0;
 -  sstrncpy(vl.host, hostname_g, sizeof(vl.host));
    sstrncpy(vl.plugin, "network", sizeof(vl.plugin));
  
    /* Octets received / sent */
@@@ -3166,8 -3165,7 +3164,8 @@@ static int network_init(void) 
    if (dispatch_thread_running == 0) {
      int status;
      status = plugin_thread_create(&dispatch_thread_id, NULL /* no attributes */,
 -                                  dispatch_thread, NULL /* no argument */);
 +                                  dispatch_thread, NULL /* no argument */,
 +                                  "network disp");
      if (status != 0) {
        char errbuf[1024];
        ERROR("network: pthread_create failed: %s",
    if (receive_thread_running == 0) {
      int status;
      status = plugin_thread_create(&receive_thread_id, NULL /* no attributes */,
 -                                  receive_thread, NULL /* no argument */);
 +                                  receive_thread, NULL /* no argument */,
 +                                  "network recv");
      if (status != 0) {
        char errbuf[1024];
        ERROR("network: pthread_create failed: %s",
diff --combined src/ntpd.c
@@@ -292,11 -292,16 +292,11 @@@ static int ntpd_config(const char *key
  
  static void ntpd_submit(const char *type, const char *type_inst,
                          gauge_t value) {
 -  value_t values[1];
    value_list_t vl = VALUE_LIST_INIT;
  
 -  values[0].gauge = value;
 -
 -  vl.values = values;
 +  vl.values = &(value_t){.gauge = value};
    vl.values_len = 1;
 -  sstrncpy(vl.host, hostname_g, sizeof(vl.host));
    sstrncpy(vl.plugin, "ntpd", sizeof(vl.plugin));
 -  sstrncpy(vl.plugin_instance, "", sizeof(vl.plugin_instance));
    sstrncpy(vl.type, type, sizeof(vl.type));
    sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance));
  
@@@ -850,9 -855,9 +850,9 @@@ static int ntpd_read(void) 
    }
  
    /* kerninfo -> estimated error */
-   offset_loop = scale_loop * ((gauge_t)ntohl(ik->offset));
+   offset_loop = (gauge_t)((int32_t)ntohl(ik->offset) * scale_loop);
    freq_loop = ntpd_read_fp(ik->freq);
-   offset_error = scale_error * ((gauge_t)ntohl(ik->esterror));
+   offset_error = (gauge_t)((int32_t)ntohl(ik->esterror) * scale_error);
  
    DEBUG("info_kernel:\n"
          "  pll offset    = %.8g\n"
diff --combined src/perl.c
@@@ -22,7 -22,6 +22,7 @@@
   *
   * Authors:
   *   Sebastian Harl <sh at tokkee.org>
 + *   Pavel Rochnyak <pavel2000 ngs.ru>
   **/
  
  /*
@@@ -78,9 -77,8 +78,9 @@@
  #define PLUGIN_LOG 4
  #define PLUGIN_NOTIF 5
  #define PLUGIN_FLUSH 6
 +#define PLUGIN_FLUSH_ALL 7 /* For collectd-5.6 only */
  
 -#define PLUGIN_TYPES 7
 +#define PLUGIN_TYPES 8
  
  #define PLUGIN_CONFIG 254
  #define PLUGIN_DATASET 255
  /* this is defined in DynaLoader.a */
  void boot_DynaLoader(PerlInterpreter *, CV *);
  
 +static XS(Collectd_plugin_register_read);
 +static XS(Collectd_plugin_register_write);
 +static XS(Collectd_plugin_register_log);
 +static XS(Collectd_plugin_register_notification);
 +static XS(Collectd_plugin_register_flush);
 +static XS(Collectd_plugin_unregister_read);
 +static XS(Collectd_plugin_unregister_write);
 +static XS(Collectd_plugin_unregister_log);
 +static XS(Collectd_plugin_unregister_notification);
 +static XS(Collectd_plugin_unregister_flush);
  static XS(Collectd_plugin_register_ds);
  static XS(Collectd_plugin_unregister_ds);
  static XS(Collectd_plugin_dispatch_values);
@@@ -125,14 -113,6 +125,14 @@@ static XS(Collectd_plugin_log)
  static XS(Collectd__fc_register);
  static XS(Collectd_call_by_name);
  
 +static int perl_read(user_data_t *ud);
 +static int perl_write(const data_set_t *ds, const value_list_t *vl,
 +                      user_data_t *user_data);
 +static void perl_log(int level, const char *msg, user_data_t *user_data);
 +static int perl_notify(const notification_t *notif, user_data_t *user_data);
 +static int perl_flush(cdtime_t timeout, const char *identifier,
 +                      user_data_t *user_data);
 +
  /*
   * private data types
   */
@@@ -185,8 -165,6 +185,8 @@@ extern char **environ
   * private variables
   */
  
 +static _Bool register_legacy_flush = 1;
 +
  /* if perl_threads != NULL perl_threads->head must
   * point to the "base" thread */
  static c_ithread_list_t *perl_threads = NULL;
@@@ -203,18 -181,6 +203,18 @@@ static struct 
    char name[64];
    XS((*f));
  } api[] = {
 +    {"Collectd::plugin_register_read", Collectd_plugin_register_read},
 +    {"Collectd::plugin_register_write", Collectd_plugin_register_write},
 +    {"Collectd::plugin_register_log", Collectd_plugin_register_log},
 +    {"Collectd::plugin_register_notification",
 +     Collectd_plugin_register_notification},
 +    {"Collectd::plugin_register_flush", Collectd_plugin_register_flush},
 +    {"Collectd::plugin_unregister_read", Collectd_plugin_unregister_read},
 +    {"Collectd::plugin_unregister_write", Collectd_plugin_unregister_write},
 +    {"Collectd::plugin_unregister_log", Collectd_plugin_unregister_log},
 +    {"Collectd::plugin_unregister_notification",
 +     Collectd_plugin_unregister_notification},
 +    {"Collectd::plugin_unregister_flush", Collectd_plugin_unregister_flush},
      {"Collectd::plugin_register_data_set", Collectd_plugin_register_ds},
      {"Collectd::plugin_unregister_data_set", Collectd_plugin_unregister_ds},
      {"Collectd::plugin_dispatch_values", Collectd_plugin_dispatch_values},
@@@ -1007,7 -973,7 +1007,7 @@@ static int call_pv_locked(pTHX_ const c
      return 0;
    }
  
 -  ret = call_pv(sub_name, G_SCALAR);
 +  ret = call_pv(sub_name, G_SCALAR | G_EVAL);
  
    t->running = old_running;
    return ret;
  /*
   * Call all working functions of the given type.
   */
 -static int pplugin_call_all(pTHX_ int type, ...) {
 +static int pplugin_call(pTHX_ int type, ...) {
    int retvals = 0;
  
    va_list ap;
    int ret = 0;
 +  char *subname;
  
    dSP;
  
  
    PUSHMARK(SP);
  
 -  XPUSHs(sv_2mortal(newSViv((IV)type)));
 +  if (PLUGIN_READ == type) {
 +    subname = va_arg(ap, char *);
 +  } else if (PLUGIN_WRITE == type) {
 +    data_set_t *ds;
 +    value_list_t *vl;
  
 -  if (PLUGIN_WRITE == type) {
 +    AV *pds = newAV();
 +    HV *pvl = newHV();
 +
 +    subname = va_arg(ap, char *);
      /*
       * $_[0] = $plugin_type;
       *
       *   type_instance   => $type_instance
       * };
       */
 -    data_set_t *ds;
 -    value_list_t *vl;
 -
 -    AV *pds = newAV();
 -    HV *pvl = newHV();
 -
      ds = va_arg(ap, data_set_t *);
      vl = va_arg(ap, value_list_t *);
  
      XPUSHs(sv_2mortal(newRV_noinc((SV *)pds)));
      XPUSHs(sv_2mortal(newRV_noinc((SV *)pvl)));
    } else if (PLUGIN_LOG == type) {
 +    subname = va_arg(ap, char *);
      /*
       * $_[0] = $level;
       *
      XPUSHs(sv_2mortal(newSViv(va_arg(ap, int))));
      XPUSHs(sv_2mortal(newSVpv(va_arg(ap, char *), 0)));
    } else if (PLUGIN_NOTIF == type) {
 +    notification_t *n;
 +    HV *notif = newHV();
 +
 +    subname = va_arg(ap, char *);
      /*
       * $_[0] =
       * {
       *   type_instance   => $type_instance
       * };
       */
 -    notification_t *n;
 -    HV *notif = newHV();
 -
      n = va_arg(ap, notification_t *);
  
      if (-1 == notification2hv(aTHX_ n, notif)) {
      XPUSHs(sv_2mortal(newRV_noinc((SV *)notif)));
    } else if (PLUGIN_FLUSH == type) {
      cdtime_t timeout;
 +    subname = va_arg(ap, char *);
 +    /*
 +     * $_[0] = $timeout;
 +     * $_[1] = $identifier;
 +     */
 +    timeout = va_arg(ap, cdtime_t);
  
 +    XPUSHs(sv_2mortal(newSVnv(CDTIME_T_TO_DOUBLE(timeout))));
 +    XPUSHs(sv_2mortal(newSVpv(va_arg(ap, char *), 0)));
 +  } else if (PLUGIN_FLUSH_ALL == type) {
 +    cdtime_t timeout;
 +    subname = "Collectd::plugin_call_all";
      /*
       * $_[0] = $timeout;
       * $_[1] = $identifier;
       */
      timeout = va_arg(ap, cdtime_t);
  
 +    XPUSHs(sv_2mortal(newSViv((IV)PLUGIN_FLUSH)));
      XPUSHs(sv_2mortal(newSVnv(CDTIME_T_TO_DOUBLE(timeout))));
      XPUSHs(sv_2mortal(newSVpv(va_arg(ap, char *), 0)));
 +  } else if (PLUGIN_INIT == type) {
 +    subname = "Collectd::plugin_call_all";
 +    XPUSHs(sv_2mortal(newSViv((IV)type)));
 +  } else if (PLUGIN_SHUTDOWN == type) {
 +    subname = "Collectd::plugin_call_all";
 +    XPUSHs(sv_2mortal(newSViv((IV)type)));
 +  } else { /* Unknown type. Run 'plugin_call_all' and make compiler happy */
 +    subname = "Collectd::plugin_call_all";
 +    XPUSHs(sv_2mortal(newSViv((IV)type)));
    }
  
    PUTBACK;
  
 -  retvals = call_pv_locked(aTHX_ "Collectd::plugin_call_all");
 +  retvals = call_pv_locked(aTHX_ subname);
  
    SPAGAIN;
 -  if (0 < retvals) {
 +  if (SvTRUE(ERRSV)) {
 +    if (PLUGIN_LOG != type)
 +      ERROR("perl: %s error: %s", subname, SvPV_nolen(ERRSV));
 +    ret = -1;
 +  } else if (0 < retvals) {
      SV *tmp = POPs;
      if (!SvTRUE(tmp))
        ret = -1;
  
    va_end(ap);
    return ret;
 -} /* static int pplugin_call_all (int, ...) */
 +} /* static int pplugin_call (int, ...) */
  
  /*
   * collectd's Perl interpreter based thread implementation.
@@@ -1197,10 -1134,6 +1197,10 @@@ static void c_ithread_destroy(c_ithread
    assert(NULL != perl_threads);
  
    PERL_SET_CONTEXT(aTHX);
 +  /* Mark as running to avoid deadlock:
 +     c_ithread_destroy -> log_debug -> perl_log()
 +  */
 +  ithread->running = 1;
    log_debug("Shutting down Perl interpreter %p...", aTHX);
  
  #if COLLECT_DEBUG
@@@ -1414,10 -1347,7 +1414,10 @@@ static int fc_call(pTHX_ int type, int 
    }
  
    SPAGAIN;
 -  if (0 < retvals) {
 +  if (SvTRUE(ERRSV)) {
 +    ERROR("perl: Collectd::fc_call error: %s", SvPV_nolen(ERRSV));
 +    ret = -1;
 +  } else if (0 < retvals) {
      SV *tmp = POPs;
  
      /* the exec callbacks return a status, while
@@@ -1573,163 -1503,6 +1573,163 @@@ static target_proc_t ptarget = {ptarget
   * Exported Perl API.
   */
  
 +static void _plugin_register_generic_userdata(pTHX, int type,
 +                                              const char *desc) {
 +  int ret = 0;
 +  user_data_t userdata;
 +  char *pluginname;
 +
 +  dXSARGS;
 +
 +  if (2 != items) {
 +    log_err("Usage: Collectd::plugin_register_%s(pluginname, subname)", desc);
 +    XSRETURN_EMPTY;
 +  }
 +
 +  if (!SvOK(ST(0))) {
 +    log_err("Collectd::plugin_register_%s(pluginname, subname): "
 +            "Invalid pluginname",
 +            desc);
 +    XSRETURN_EMPTY;
 +  }
 +  if (!SvOK(ST(1))) {
 +    log_err("Collectd::plugin_register_%s(pluginname, subname): "
 +            "Invalid subname",
 +            desc);
 +    XSRETURN_EMPTY;
 +  }
 +
 +  /* Use pluginname as-is to allow flush a single perl plugin */
 +  pluginname = SvPV_nolen(ST(0));
 +
 +  log_debug("Collectd::plugin_register_%s: "
 +            "plugin = \"%s\", sub = \"%s\"",
 +            desc, pluginname, SvPV_nolen(ST(1)));
 +
 +  memset(&userdata, 0, sizeof(userdata));
 +  userdata.data = strdup(SvPV_nolen(ST(1)));
 +  userdata.free_func = free;
 +
 +  if (PLUGIN_READ == type) {
 +    ret = plugin_register_complex_read(
 +        "perl",                                       /* group */
 +        pluginname, perl_read, plugin_get_interval(), /* Default interval */
 +        &userdata);
 +  } else if (PLUGIN_WRITE == type) {
 +    ret = plugin_register_write(pluginname, perl_write, &userdata);
 +  } else if (PLUGIN_LOG == type) {
 +    ret = plugin_register_log(pluginname, perl_log, &userdata);
 +  } else if (PLUGIN_NOTIF == type) {
 +    ret = plugin_register_notification(pluginname, perl_notify, &userdata);
 +  } else if (PLUGIN_FLUSH == type) {
 +    if (1 == register_legacy_flush) { /* For collectd-5.7 only, #1731 */
 +      register_legacy_flush = 0;
 +      ret = plugin_register_flush("perl", perl_flush, /* user_data = */ NULL);
 +    }
 +
 +    if (0 == ret)
 +      ret = plugin_register_flush(pluginname, perl_flush, &userdata);
 +  } else {
 +    ret = -1;
 +  }
 +
 +  if (0 == ret)
 +    XSRETURN_YES;
 +  else {
 +    free(userdata.data);
 +    XSRETURN_EMPTY;
 +  }
 +} /* static void _plugin_register_generic_userdata ( ... ) */
 +
 +/*
 + * Collectd::plugin_register_TYPE (pluginname, subname).
 + *
 + * pluginname:
 + *   name of the perl plugin
 + *
 + * subname:
 + *   name of the plugin's subroutine that does the work
 + */
 +
 +static XS(Collectd_plugin_register_read) {
 +  return _plugin_register_generic_userdata(aTHX, PLUGIN_READ, "read");
 +}
 +
 +static XS(Collectd_plugin_register_write) {
 +  return _plugin_register_generic_userdata(aTHX, PLUGIN_WRITE, "write");
 +}
 +
 +static XS(Collectd_plugin_register_log) {
 +  return _plugin_register_generic_userdata(aTHX, PLUGIN_LOG, "log");
 +}
 +
 +static XS(Collectd_plugin_register_notification) {
 +  return _plugin_register_generic_userdata(aTHX, PLUGIN_NOTIF, "notification");
 +}
 +
 +static XS(Collectd_plugin_register_flush) {
 +  return _plugin_register_generic_userdata(aTHX, PLUGIN_FLUSH, "flush");
 +}
 +
 +typedef int perl_unregister_function_t(const char *name);
 +
 +static void _plugin_unregister_generic(pTHX, perl_unregister_function_t *unreg,
 +                                       const char *desc) {
 +  dXSARGS;
 +
 +  if (1 != items) {
 +    log_err("Usage: Collectd::plugin_unregister_%s(pluginname)", desc);
 +    XSRETURN_EMPTY;
 +  }
 +
 +  if (!SvOK(ST(0))) {
 +    log_err("Collectd::plugin_unregister_%s(pluginname): "
 +            "Invalid pluginname",
 +            desc);
 +    XSRETURN_EMPTY;
 +  }
 +
 +  log_debug("Collectd::plugin_unregister_%s: plugin = \"%s\"", desc,
 +            SvPV_nolen(ST(0)));
 +
 +  unreg(SvPV_nolen(ST(0)));
 +
 +  XSRETURN_EMPTY;
 +
 +  return;
 +} /* static void _plugin_unregister_generic ( ... ) */
 +
 +/*
 + * Collectd::plugin_unregister_TYPE (pluginname).
 + *
 + * TYPE:
 + *   type of callback to be unregistered: read, write, log, notification, flush
 + *
 + * pluginname:
 + *   name of the perl plugin
 + */
 +
 +static XS(Collectd_plugin_unregister_read) {
 +  return _plugin_unregister_generic(aTHX, plugin_unregister_read, "read");
 +}
 +
 +static XS(Collectd_plugin_unregister_write) {
 +  return _plugin_unregister_generic(aTHX, plugin_unregister_write, "write");
 +}
 +
 +static XS(Collectd_plugin_unregister_log) {
 +  return _plugin_unregister_generic(aTHX, plugin_unregister_log, "log");
 +}
 +
 +static XS(Collectd_plugin_unregister_notification) {
 +  return _plugin_unregister_generic(aTHX, plugin_unregister_notification,
 +                                    "notification");
 +}
 +
 +static XS(Collectd_plugin_unregister_flush) {
 +  return _plugin_unregister_generic(aTHX, plugin_unregister_flush, "flush");
 +}
 +
  /*
   * Collectd::plugin_register_data_set (type, dataset).
   *
@@@ -2105,14 -1878,14 +2105,14 @@@ static int perl_init(void) 
    assert(aTHX == perl_threads->head->interp);
    pthread_mutex_lock(&perl_threads->mutex);
  
 -  status = pplugin_call_all(aTHX_ PLUGIN_INIT);
 +  status = pplugin_call(aTHX_ PLUGIN_INIT);
  
    pthread_mutex_unlock(&perl_threads->mutex);
  
    return status;
  } /* static int perl_init (void) */
  
 -static int perl_read(void) {
 +static int perl_read(user_data_t *user_data) {
    dTHX;
  
    if (NULL == perl_threads)
  
    log_debug("perl_read: c_ithread: interp = %p (active threads: %i)", aTHX,
              perl_threads->number_of_threads);
 -  return pplugin_call_all(aTHX_ PLUGIN_READ);
 -} /* static int perl_read (void) */
 +
 +  return pplugin_call(aTHX_ PLUGIN_READ, user_data->data);
 +} /* static int perl_read (user_data_t *user_data) */
  
  static int perl_write(const data_set_t *ds, const value_list_t *vl,
 -                      user_data_t __attribute__((unused)) * user_data) {
 +                      user_data_t *user_data) {
    int status;
    dTHX;
  
  
    log_debug("perl_write: c_ithread: interp = %p (active threads: %i)", aTHX,
              perl_threads->number_of_threads);
 -  status = pplugin_call_all(aTHX_ PLUGIN_WRITE, ds, vl);
 +  status = pplugin_call(aTHX_ PLUGIN_WRITE, user_data->data, ds, vl);
  
    if (aTHX == perl_threads->head->interp)
      pthread_mutex_unlock(&perl_threads->mutex);
    return status;
  } /* static int perl_write (const data_set_t *, const value_list_t *) */
  
 -static void perl_log(int level, const char *msg,
 -                     user_data_t __attribute__((unused)) * user_data) {
 +static void perl_log(int level, const char *msg, user_data_t *user_data) {
    dTHX;
  
    if (NULL == perl_threads)
    if (aTHX == perl_threads->head->interp)
      pthread_mutex_lock(&perl_threads->mutex);
  
 -  pplugin_call_all(aTHX_ PLUGIN_LOG, level, msg);
 +  pplugin_call(aTHX_ PLUGIN_LOG, user_data->data, level, msg);
  
    if (aTHX == perl_threads->head->interp)
      pthread_mutex_unlock(&perl_threads->mutex);
    return;
  } /* static void perl_log (int, const char *) */
  
 -static int perl_notify(const notification_t *notif,
 -                       user_data_t __attribute__((unused)) * user_data) {
 +static int perl_notify(const notification_t *notif, user_data_t *user_data) {
    dTHX;
  
    if (NULL == perl_threads)
  
      aTHX = t->interp;
    }
 -  return pplugin_call_all(aTHX_ PLUGIN_NOTIF, notif);
 +  return pplugin_call(aTHX_ PLUGIN_NOTIF, user_data->data, notif);
  } /* static int perl_notify (const notification_t *) */
  
  static int perl_flush(cdtime_t timeout, const char *identifier,
 -                      user_data_t __attribute__((unused)) * user_data) {
 +                      user_data_t *user_data) {
    dTHX;
  
    if (NULL == perl_threads)
  
      aTHX = t->interp;
    }
 -  return pplugin_call_all(aTHX_ PLUGIN_FLUSH, timeout, identifier);
 +
 +  /* For collectd-5.6 only, #1731 */
 +  if (user_data == NULL || user_data->data == NULL)
 +    return pplugin_call(aTHX_ PLUGIN_FLUSH_ALL, timeout, identifier);
 +
 +  return pplugin_call(aTHX_ PLUGIN_FLUSH, user_data->data, timeout, identifier);
  } /* static int perl_flush (const int) */
  
  static int perl_shutdown(void) {
    dTHX;
  
    plugin_unregister_complex_config("perl");
 +  plugin_unregister_read_group("perl");
  
    if (NULL == perl_threads)
      return 0;
    log_debug("perl_shutdown: c_ithread: interp = %p (active threads: %i)", aTHX,
              perl_threads->number_of_threads);
  
 -  plugin_unregister_log("perl");
 -  plugin_unregister_notification("perl");
    plugin_unregister_init("perl");
 -  plugin_unregister_read("perl");
 -  plugin_unregister_write("perl");
 -  plugin_unregister_flush("perl");
 +  plugin_unregister_flush("perl"); /* For collectd-5.6 only, #1731 */
  
 -  ret = pplugin_call_all(aTHX_ PLUGIN_SHUTDOWN);
 +  ret = pplugin_call(aTHX_ PLUGIN_SHUTDOWN);
  
    pthread_mutex_lock(&perl_threads->mutex);
    t = perl_threads->tail;
@@@ -2476,7 -2248,15 +2476,7 @@@ static int init_pi(int argc, char **arg
  
    perl_run(aTHX);
  
 -  plugin_register_log("perl", perl_log, /* user_data = */ NULL);
 -  plugin_register_notification("perl", perl_notify,
 -                               /* user_data = */ NULL);
    plugin_register_init("perl", perl_init);
 -
 -  plugin_register_read("perl", perl_read);
 -
 -  plugin_register_write("perl", perl_write, /* user_data = */ NULL);
 -  plugin_register_flush("perl", perl_flush, /* user_data = */ NULL);
    plugin_register_shutdown("perl", perl_shutdown);
    return 0;
  } /* static int init_pi (const char **, const int) */
@@@ -2618,6 -2398,12 +2618,12 @@@ static int perl_config_plugin(pTHX_ oco
    char *plugin;
    HV *config;
  
+   if (NULL == perl_threads) {
+     log_err("A `Plugin' block was encountered but no plugin was loaded yet. "
+             "Put the appropriate `LoadPlugin' option in front of it.");
+     return -1;
+   }
    dSP;
  
    if ((1 != ci->values_num) || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
@@@ -2686,8 -2472,6 +2692,8 @@@ static int perl_config(oconfig_item_t *
        current_status = perl_config_includedir(aTHX_ c);
      else if (0 == strcasecmp(c->key, "Plugin"))
        current_status = perl_config_plugin(aTHX_ c);
 +    else if (0 == strcasecmp(c->key, "RegisterLegacyFlush"))
 +      cf_util_get_boolean(c, &register_legacy_flush);
      else {
        log_warn("Ignoring unknown config key \"%s\".", c->key);
        current_status = 0;