Merge branch 'develop' into debian-develop
[archipelago] / src / peer.c
similarity index 87%
rename from xseg/peers/user/peer.c
rename to src/peer.c
index 0d45d71..7da6b5c 100644 (file)
 #include <unistd.h>
 #include <sys/syscall.h>
 #include <sys/time.h>
+#include <sys/resource.h>
 #include <signal.h>
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <errno.h>
+#include <sched.h>
 #ifdef MT
 #include <pthread.h>
 #endif
@@ -58,6 +60,8 @@
 
 #ifdef MT
 #define PEER_TYPE "pthread"
+#elif defined(FD)
+#define PEER_TYPE "posixfd"
 #else
 #define PEER_TYPE "posix"
 #endif
 //FIXME this should not be defined here probably
 #define MAX_SPEC_LEN 128
 #define MAX_PIDFILE_LEN 512
+#define MAX_CPUS_LEN 512
 
+/* Define the cpus on which the threads/process will be pinned */
+struct cpu_list {
+       int cpus[48];
+       int len;
+};
+
+struct cpu_list cpu_list;
 volatile unsigned int terminated = 0;
 unsigned int verbose = 0;
 struct log_ctx lc;
@@ -97,6 +109,23 @@ void signal_handler(int signal)
 #endif
 }
 
+/*
+ * We want to both print the backtrace and dump the core. To do so, we fork a
+ * process that prints its stack trace, that should be the same as the parents.
+ * Then we wait for 1 sec and we abort. The reason we don't abort immediately
+ * is because it may interrupt the printing of the backtrace.
+ */
+void segv_handler(int signal)
+{
+       if (fork() == 0) {
+               xseg_printtrace();
+               _exit(1);
+       }
+
+       sleep(1);
+       abort();
+}
+
 void renew_logfile(int signal)
 {
 //     XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
@@ -107,6 +136,7 @@ static int setup_signals(struct peerd *peer)
 {
        int r;
        struct sigaction sa;
+       struct rlimit rlim;
 #ifdef MT
        global_peer = peer;
 #endif
@@ -123,6 +153,24 @@ static int setup_signals(struct peerd *peer)
        if (r < 0)
                return r;
 
+       /*
+        * Get the current limits for core files and raise them to the largest
+        * value possible
+        */
+       if (getrlimit(RLIMIT_CORE, &rlim) < 0)
+               return r;
+
+       rlim.rlim_cur = rlim.rlim_max;
+
+       if (setrlimit(RLIMIT_CORE, &rlim) < 0)
+               return r;
+
+       /* Install handler for segfaults */
+       sa.sa_handler = segv_handler;
+       r = sigaction(SIGSEGV, &sa, NULL);
+       if (r < 0)
+               return r;
+
        sa.sa_handler = renew_logfile;
        r = sigaction(SIGUSR1, &sa, NULL);
        if (r < 0)
@@ -460,7 +508,7 @@ static void* thread_loop(void *arg)
                                loops = threshold;
                }
                XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
-               xseg_wait_signal(xseg, 10000000UL);
+               xseg_wait_signal(xseg, peer->sd, 10000000UL);
                xseg_cancel_wait(xseg, peer->portno_start);
                XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
        }
@@ -473,9 +521,25 @@ void *init_thread_loop(void *arg)
 {
        struct thread *t = (struct thread *) arg;
        struct peerd *peer = t->peer;
+       pthread_t thread = pthread_self();
+       long int thread_num = t - t->peer->thread;
        char *thread_id;
        int i;
 
+       cpu_set_t mask;
+       int r;
+
+       if (cpu_list.len) {
+               CPU_ZERO(&mask);
+               CPU_SET(cpu_list.cpus[thread_num], &mask);
+               r = pthread_setaffinity_np(thread, sizeof(mask), &mask);
+               if (r < 0) {
+                       perror("sched_setaffinity");
+                       return NULL;
+               }
+       }
+
+
        /*
         * We need an identifier for every thread that will spin in peerd_loop.
         * The following code is a way to create a string of this format:
@@ -489,7 +553,7 @@ void *init_thread_loop(void *arg)
         * chars should be more than enough.
         */
        thread_id = malloc(13 * sizeof(char));
-       snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
+       snprintf(thread_id, 13, "Thread %ld", thread_num);
        for (i = 0; thread_id[i]; i++) {}
        t->arg = (void *)realloc(thread_id, i-1);
        pthread_setspecific(threadkey, t);
@@ -565,7 +629,9 @@ static int generic_peerd_loop(void *arg)
        xport portno_start = peer->portno_start;
        xport portno_end = peer->portno_end;
        pid_t pid = syscall(SYS_gettid);
-       uint64_t threshold=1000/(1 + portno_end - portno_start);
+       uint64_t threshold = peer->threshold;
+       threshold /= (1 + portno_end - portno_start);
+       threshold += 1;
        uint64_t loops;
 
        XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
@@ -590,7 +656,7 @@ static int generic_peerd_loop(void *arg)
                }
 #endif
                XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
-               xseg_wait_signal(xseg, 10000000UL);
+               xseg_wait_signal(xseg, peer->sd, 10000000UL);
                xseg_cancel_wait(xseg, peer->portno_start);
                XSEGLOG2(&lc, I, "%s woke up\n", id);
        }
@@ -600,6 +666,19 @@ static int generic_peerd_loop(void *arg)
 static int init_peerd_loop(struct peerd *peer)
 {
        struct xseg *xseg = peer->xseg;
+       cpu_set_t mask;
+       int r;
+
+       if (cpu_list.len) {
+               CPU_ZERO(&mask);
+               CPU_SET(cpu_list.cpus[0], &mask);
+
+               r = sched_setaffinity(0, sizeof(mask), &mask);
+               if (r < 0) {
+                       perror("sched_setaffinity");
+                       return -1;
+               }
+       }
 
        peer->peerd_loop(peer);
        custom_peer_finalize(peer);
@@ -632,7 +711,8 @@ static struct xseg *join(char *spec)
 }
 
 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
-                       long portno_end, uint32_t nr_threads, xport defer_portno)
+                       long portno_end, uint32_t nr_threads, xport defer_portno,
+                       uint64_t threshold)
 {
        int i;
        struct peerd *peer;
@@ -650,6 +730,7 @@ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
        }
        peer->nr_ops = nr_ops;
        peer->defer_portno = defer_portno;
+       peer->threshold = threshold;
 #ifdef MT
        peer->nr_threads = nr_threads;
        peer->thread = calloc(nr_threads, sizeof(struct thread));
@@ -669,6 +750,8 @@ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
 #else
        if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
                goto malloc_fail;
+       if (peer->free_reqs.size < peer->nr_ops)
+               peer->nr_ops = peer->free_reqs.size;
 #endif
        peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
        if (!peer->peer_reqs){
@@ -692,14 +775,15 @@ malloc_fail:
         * The first port we bind will have its signal_desc initialized by xseg
         * and the same signal_desc will be used for all the other ports.
         */
+       peer->sd = NULL;
        for (p = peer->portno_start; p <= peer->portno_end; p++) {
-               port = xseg_bind_port(peer->xseg, p, sd);
+               port = xseg_bind_port(peer->xseg, p, peer->sd);
                if (!port){
                        printf("cannot bind to port %u\n", (unsigned int) p);
                        return NULL;
                }
                if (p == peer->portno_start)
-                       sd = xseg_get_signal_desc(peer->xseg, port);
+                       peer->sd = xseg_get_signal_desc(peer->xseg, port);
        }
 
        printf("Peer on ports  %u-%u\n", peer->portno_start, peer->portno_end);
@@ -775,6 +859,23 @@ int pidfile_open(char *path, pid_t *old_pid)
        return fd;
 }
 
+int get_cpu_list(char *cpus, struct cpu_list *cpu_list)
+{
+       char *tok, *rem;
+       int i = 0;
+
+       while ((tok = strtok(cpus, ",")) != NULL) {
+               cpu_list->cpus[i++] = strtol(tok, &rem, 10);
+               if (strlen(rem) > 0)    /* Not a number */
+                       return -1;
+               cpus = NULL;
+       }
+
+       cpu_list->len = i;
+
+       return 0;
+}
+
 void usage(char *argv0)
 {
        fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
@@ -793,6 +894,8 @@ void usage(char *argv0)
 #ifdef MT
                "    -t        | No      | Number of threads \n"
 #endif
+               "    --cpus    | No      | Coma-separated list of CPUs\n"
+               "              |         | to pin the process or threads\n"
                "\n"
               );
        custom_peer_usage();
@@ -809,6 +912,7 @@ int main(int argc, char *argv[])
        int daemonize = 0, help = 0;
        uint32_t nr_ops = 16;
        uint32_t nr_threads = 1;
+       uint64_t threshold = 1000;
        unsigned int debug_level = 0;
        xport defer_portno = NoPort;
        pid_t old_pid;
@@ -817,10 +921,12 @@ int main(int argc, char *argv[])
        char spec[MAX_SPEC_LEN + 1];
        char logfile[MAX_LOGFILE_LEN + 1];
        char pidfile[MAX_PIDFILE_LEN + 1];
+       char cpus[MAX_CPUS_LEN + 1];
 
        logfile[0] = 0;
        pidfile[0] = 0;
        spec[0] = 0;
+       cpus[0] = 0;
 
        //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
        // -dp xseg_portno to defer blocking requests
@@ -841,6 +947,8 @@ int main(int argc, char *argv[])
        READ_ARG_BOOL("-d", daemonize);
        READ_ARG_BOOL("-h", help);
        READ_ARG_BOOL("--help", help);
+       READ_ARG_ULONG("--threshold", threshold);
+       READ_ARG_STRING("--cpus", cpus, MAX_CPUS_LEN);
        READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
        END_READ_ARGS();
 
@@ -882,6 +990,30 @@ int main(int argc, char *argv[])
 
        pidfile_write(pid_fd);
 
+       if (cpus[0]) {
+               r = get_cpu_list(cpus, &cpu_list);
+
+               if (r < 0) {
+                       XSEGLOG2(&lc, E, "--cpus %s: Invalid input", cpus);
+                       goto out;
+               }
+#ifdef MT
+               if (nr_threads != cpu_list.len) {
+                       XSEGLOG2(&lc, E, "--cpus %s: Number of "
+                                       "threads (%d) and CPUs don't match",
+                                       cpus, nr_threads);
+                       goto out;
+               }
+#else
+               if (cpu_list.len != 1) {
+                       XSEGLOG2(&lc, E, "--cpus %s: Too many CPUs for a "
+                                       "single process", cpus);
+                       goto out;
+               }
+#endif
+       }
+
+
        //TODO perform argument sanity checks
        verbose = debug_level;
        if (portno != -1) {
@@ -895,7 +1027,7 @@ int main(int argc, char *argv[])
                goto out;
        }
 
-       peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
+       peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno, threshold);
        if (!peer){
                r = -1;
                goto out;