Implement concurrency.
authorFlorian Forster <ff@octo.it>
Thu, 11 Jul 2013 11:22:39 +0000 (13:22 +0200)
committerFlorian Forster <ff@octo.it>
Thu, 11 Jul 2013 11:22:39 +0000 (13:22 +0200)
src/Makefile.am
src/statsd-tg.c

index fe34a05..1189f8e 100644 (file)
@@ -1,3 +1,5 @@
 bin_PROGRAMS = statsd-tg
 
 statsd_tg_SOURCES = statsd-tg.c
+statsd_tg_CFLAGS = $(AM_CFLAGS) -pthread
+statsd_tg_LDADD = -lrt
index 8815a8b..3f792f7 100644 (file)
@@ -31,6 +31,8 @@
 #include <signal.h>
 #include <errno.h>
 #include <assert.h>
+#include <time.h>
+#include <pthread.h>
 
 #include <sys/types.h>
 #include <sys/socket.h>
@@ -57,11 +59,13 @@ static int conf_set_size     = DEF_SET_SIZE;
 static const char *conf_node = DEF_NODE;
 static const char *conf_service = DEF_SERVICE;
 
-static int sock = -1;
+static int conf_threads_num = 1;
 
 static struct sigaction sigint_action;
 static struct sigaction sigterm_action;
 
+static unsigned long long events_sent = 0;
+pthread_mutex_t events_sent_lock = PTHREAD_MUTEX_INITIALIZER;
 static _Bool loop = 1;
 
 __attribute__((noreturn))
@@ -101,6 +105,7 @@ static int sock_open (void) /* {{{ */
   struct addrinfo ai_hints;
   struct addrinfo *ai_list = NULL;
   struct addrinfo *ai_ptr;
+  int sock;
 
   int status;
 
@@ -147,17 +152,17 @@ static int sock_open (void) /* {{{ */
     exit (EXIT_FAILURE);
   }
 
-  return (0);
+  return (sock);
 } /* }}} int sock_open */
 
-static int send_random_event (void) /* {{{ */
+static int send_random_event (int sock, unsigned short seed[static 3]) /* {{{ */
 {
   long conf_num_total = conf_num_counters + conf_num_timers
       + conf_num_gauges + conf_num_sets;
   /* Not completely fair, but good enough for our use-case. */
-  long rnd = lrand48 () % conf_num_total;
+  long rnd = nrand48 (seed) % conf_num_total;
 
-  long value = lrand48 ();
+  long value = nrand48 (seed);
   char *type;
 
   char buffer[1024];
@@ -199,7 +204,7 @@ static int send_random_event (void) /* {{{ */
   status = send (sock, buffer, (size_t) buffer_size, /* flags = */ 0);
   if (status < 0)
   {
-    fprintf (stderr, "send failed: %s", strerror (errno));
+    fprintf (stderr, "send failed: %s\n", strerror (errno));
     return (-1);
   }
 
@@ -239,6 +244,10 @@ static int read_options (int argc, char **argv) /* {{{ */
 {
   int opt;
 
+#ifdef _SC_NPROCESSORS_ONLN
+  conf_threads_num = (int) sysconf (_SC_NPROCESSORS_ONLN);
+#endif
+
   while ((opt = getopt (argc, argv, "c:t:g:s:S:d:D:h")) != -1)
   {
     switch (opt)
@@ -271,6 +280,10 @@ static int read_options (int argc, char **argv) /* {{{ */
         conf_service = optarg;
         break;
 
+      case 'T':
+        get_integer_opt (optarg, &conf_threads_num);
+        break;
+
       case 'h':
         exit_usage (EXIT_SUCCESS);
 
@@ -282,8 +295,76 @@ static int read_options (int argc, char **argv) /* {{{ */
   return (0);
 } /* }}} int read_options */
 
+static void *send_thread (void *args __attribute__((unused))) /* {{{ */
+{
+  int sock;
+  unsigned short seed[3];
+  struct timespec ts;
+
+  unsigned long long local_events_sent = 0;
+
+  clock_gettime (CLOCK_REALTIME, &ts);
+  seed[2] = (unsigned short) (ts.tv_nsec);
+  seed[1] = (unsigned short) (ts.tv_nsec >> 16);
+  seed[0] = (unsigned short) (ts.tv_sec);
+
+  sock = sock_open ();
+
+  while (loop)
+  {
+    send_random_event (sock, seed);
+    local_events_sent++;
+  }
+
+  close (sock);
+
+  pthread_mutex_lock (&events_sent_lock);
+  events_sent += local_events_sent;
+  pthread_mutex_unlock (&events_sent_lock);
+
+  return (NULL);
+} /* }}} void *send_thread */
+
+static void run_threads (void) /* {{{ */
+{
+  pthread_t threads[conf_threads_num];
+  int i;
+
+  for (i = 0; i < conf_threads_num; i++)
+  {
+    int status;
+
+    status = pthread_create (&threads[i], /* attr = */ NULL,
+        send_thread, /* args = */ NULL);
+    if (status != 0)
+    {
+      fprintf (stderr, "pthread_create failed.");
+      abort ();
+    }
+  }
+
+  for (i = 0; i < conf_threads_num; i++)
+    pthread_join (threads[i], /* retval = */ NULL);
+} /* }}} void run_threads */
+
+static double timespec_diff (struct timespec const *ts0, /* {{{ */
+    struct timespec const *ts1)
+{
+  time_t diff_sec;
+  long diff_nsec;
+
+  diff_sec = ts1->tv_sec - ts0->tv_sec;
+  diff_nsec += ts1->tv_nsec - ts0->tv_nsec;
+
+  return ((double) diff_sec) + (((double) diff_nsec) / 1.0e9);
+} /* }}} double timespec_diff */
+
 int main (int argc, char **argv) /* {{{ */
 {
+  struct timespec ts_begin;
+  struct timespec ts_end;
+  double runtime;
+
   read_options (argc, argv);
 
   sigint_action.sa_handler = signal_handler;
@@ -292,15 +373,13 @@ int main (int argc, char **argv) /* {{{ */
   sigterm_action.sa_handler = signal_handler;
   sigaction (SIGTERM, &sigterm_action, /* old = */ NULL);
 
-  sock_open ();
+  clock_gettime (CLOCK_MONOTONIC, &ts_begin);
+  run_threads ();
+  clock_gettime (CLOCK_MONOTONIC, &ts_end);
 
-  while (loop)
-  {
-    send_random_event ();
-  }
-
-  close (sock);
-  sock = -1;
+  runtime = timespec_diff (&ts_begin, &ts_end);
+  printf ("Sent %llu events in %.0fs (%.0f events/s).\n",
+      events_sent, runtime, ((double) events_sent) / runtime);
 
   exit (EXIT_SUCCESS);
   return (0);