Complete unification of thread_loop/peerd_loop
[archipelago] / xseg / peers / user / peer.c
index 4150271..f3ab4eb 100644 (file)
@@ -1,26 +1,66 @@
+/*
+ * Copyright 2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
 #define _GNU_SOURCE
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/types.h>
 #include <unistd.h>
-#include <xseg/xseg.h>
-#include <peer.h>
 #include <sys/syscall.h>
 #include <sys/time.h>
 #include <signal.h>
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <errno.h>
+
 #ifdef MT
 #include <pthread.h>
 #endif
 
+#include <xseg/xseg.h>
+#include <peer.h>
+
 #ifdef MT
 #define PEER_TYPE "pthread"
 #else
 #define PEER_TYPE "posix"
 #endif
 
+//FIXME this should not be defined here probably
+#define MAX_SPEC_LEN 128
+#define MAX_PIDFILE_LEN 512
+
 volatile unsigned int terminated = 0;
 unsigned int verbose = 0;
 struct log_ctx lc;
@@ -29,16 +69,18 @@ uint32_t ta = 0;
 #endif
 
 #ifdef MT
+struct peerd *global_peer;
+
+/*
+ * Per-thread bookkeeping for multi-threaded (MT) peers.
+ */
 struct thread {
+       /* peer this thread belongs to; assigned in peerd_start_threads() */
        struct peerd *peer;
+       /* pthread handle, filled in by pthread_create() */
        pthread_t tid;
+       /* initialised in peerd_start_threads() */
        pthread_cond_t cond;
+       /* initialised in peerd_start_threads() */
        pthread_mutex_t lock;
+       /* NOTE(review): no reader or writer of thread_no is visible here */
+       int thread_no;
+       /* when non-NULL, the peer loop calls func(arg) once, then clears both */
        void (*func)(void *arg);
+       /* argument for func; init_thread_loop() also stores the thread id here */
        void *arg;
 };
 
-
 inline static struct thread* alloc_thread(struct peerd *peer)
 {
        xqindex idx = xq_pop_head(&peer->threads, 1);
@@ -74,31 +116,33 @@ inline static int wake_up_next_thread(struct peerd *peer)
 }
 #endif
 
+/*
+ * Termination signal handler: logs the event, raises the global
+ * `terminated` flag and, in MT builds, calls wake_up_next_thread() on
+ * global_peer.
+ *
+ * The function is deliberately not static: it must keep external linkage
+ * if it is going to be called by another file, such as bench-xseg.c.
+ *
+ * NOTE(review): XSEGLOG2() and wake_up_next_thread() execute in signal
+ * context -- confirm that both are async-signal-safe.
+ */
 
-static inline int isTerminate()
+void signal_handler(int signal)
 {
-/* ta doesn't need to be taken into account, because the main loops
- * doesn't check the terminated flag if ta is not 0.
- *
-#ifdef ST_THREADS
-       return (!ta & terminated);
-#else
-       return terminated;
+       XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
+       terminated = 1;
+#ifdef MT
+       wake_up_next_thread(global_peer);
 #endif
-       */
-       return terminated;
 }
 
-void signal_handler(int signal)
+/*
+ * Signal handler that setup_signals() installs for SIGUSR1: logs the
+ * event and calls renew_logctx() on the global log context with the
+ * current `verbose` level and the REOPEN_FILE flag.
+ */
+void renew_logfile(int signal)
 {
-       XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
-       terminated = 1;
+       XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
+       renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
 }
 
-static int setup_signals()
+static int setup_signals(struct peerd *peer)
 {
        int r;
        struct sigaction sa;
+#ifdef MT
+       global_peer = peer;
+#endif
        sigemptyset(&sa.sa_mask);
        sa.sa_flags = 0;
        sa.sa_handler = signal_handler;
@@ -111,6 +155,12 @@ static int setup_signals()
        r = sigaction(SIGQUIT, &sa, NULL);
        if (r < 0)
                return r;
+
+       sa.sa_handler = renew_logfile;
+       r = sigaction(SIGUSR1, &sa, NULL);
+       if (r < 0)
+               return r;
+
        return r;
 }
 
@@ -133,7 +183,7 @@ void print_req(struct xseg *xseg, struct xseg_request *req)
                strncpy(data, req_data, 63);
                data[63] = 0;
                printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
-                               "src: %u, st: %u, dst: %u dt: %u\n"
+                               "src: %u, transit: %u, dst: %u effective dst: %u\n"
                                "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
                                (unsigned long)(req),
                                (unsigned int)req->op,
@@ -142,9 +192,9 @@ void print_req(struct xseg *xseg, struct xseg_request *req)
                                (unsigned long)req->serviced,
                                (unsigned int)req->state,
                                (unsigned int)req->src_portno,
-                               (unsigned int)req->src_transit_portno,
+                               (unsigned int)req->transit_portno,
                                (unsigned int)req->dst_portno,
-                               (unsigned int)req->dst_transit_portno,
+                               (unsigned int)req->effective_dst_portno,
                                (unsigned int)req->targetlen, target,
                                (unsigned long long)req->datalen, data);
        }
@@ -242,7 +292,7 @@ void complete(struct peerd *peer, struct peer_req *pr)
 #endif
 }
 
-static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
+static void handle_accepted(struct peerd *peer, struct peer_req *pr,
                                struct xseg_request *req)
 {
        struct xseg_request *xreq = pr->req;
@@ -294,7 +344,7 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr)
        return 0;
 }
 
-static int check_ports(struct peerd *peer)
+int check_ports(struct peerd *peer)
 {
        struct xseg *xseg = peer->xseg;
        xport portno_start = peer->portno_start;
@@ -349,19 +399,6 @@ static int check_ports(struct peerd *peer)
 }
 
 #ifdef MT
-int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
-{
-       struct thread *t = alloc_thread(peer);
-       if (t) {
-               t->func = func;
-               t->arg = arg;
-               wake_up_thread(t);
-               return 0;
-       } else
-               // we could hijack a thread
-               return -1;
-}
-
 static void* thread_loop(void *arg)
 {
        struct thread *t = (struct thread *) arg;
@@ -372,22 +409,12 @@ static void* thread_loop(void *arg)
        pid_t pid =syscall(SYS_gettid);
        uint64_t loops;
        uint64_t threshold=1000/(1 + portno_end - portno_start);
-       
-       XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
 
+       XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
        XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
        xseg_init_local_signal(xseg, peer->portno_start);
        for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
-               if (t->func) {
-                       XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
-                       xseg_cancel_wait(xseg, peer->portno_start);
-                       t->func(t->arg);
-                       t->func = NULL;
-                       t->arg = NULL;
-                       continue;
-               }
-
-               for(loops= threshold; loops > 0; loops--) {
+               for(loops =  threshold; loops > 0; loops--) {
                        if (loops == 1)
                                xseg_prepare_wait(xseg, peer->portno_start);
                        if (check_ports(peer))
@@ -398,6 +425,41 @@ static void* thread_loop(void *arg)
                xseg_cancel_wait(xseg, peer->portno_start);
                XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
        }
+       wake_up_next_thread(peer);
+       custom_peer_finalize(peer);
+       return NULL;
+}
+
+/*
+ * Start routine of every worker thread created by peerd_start_threads().
+ *
+ * Builds a "Thread <num>" identifier, stores it in t->arg and then runs
+ * peer->peerd_loop(t). When the loop returns, wake_up_next_thread() and
+ * custom_peer_finalize() are called.
+ *
+ * arg: the struct thread describing this thread.
+ * Always returns NULL.
+ */
+void *init_thread_loop(void *arg)
+{
+       /* shared identifier used when the per-thread one cannot be allocated */
+       static char fallback_id[] = "Thread";
+       struct thread *t = (struct thread *) arg;
+       struct peerd *peer = t->peer;
+       /* room for "Thread " + up to 5 digits + the NUL terminator */
+       const size_t id_size = sizeof("Thread 99999");
+       char *thread_id;
+
+       /*
+        * We need an identifier for every thread that will spin in peerd_loop.
+        * It has this format:
+        *              "Thread <num>"
+        * and is handed over through the (void *arg) field of struct thread.
+        *
+        * The string must stay NUL-terminated: the peer loop prints it with
+        * "%s". snprintf() always terminates its output and truncates if the
+        * thread number ever needs more digits than expected.
+        */
+       thread_id = malloc(id_size);
+       if (thread_id)
+               snprintf(thread_id, id_size, "Thread %td", t - peer->thread);
+       else
+               thread_id = fallback_id;
+       t->arg = thread_id;
+
+       //Start thread loop
+       (void)peer->peerd_loop(t);
+
+       wake_up_next_thread(peer);
+       custom_peer_finalize(peer);
+
        return NULL;
 }
 
@@ -407,67 +469,109 @@ int peerd_start_threads(struct peerd *peer)
        uint32_t nr_threads = peer->nr_threads;
        //TODO err check
        for (i = 0; i < nr_threads; i++) {
+               peer->thread[i].func = NULL;
+               peer->thread[i].arg = NULL;
                peer->thread[i].peer = peer;
                pthread_cond_init(&peer->thread[i].cond,NULL);
                pthread_mutex_init(&peer->thread[i].lock, NULL);
-               pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
-               peer->thread[i].func = NULL;
-               peer->thread[i].arg = NULL;
+               pthread_create(&peer->thread[i].tid, NULL,
+                                       init_thread_loop, (void *)(peer->thread + i));
+       }
 
+       if (peer->interactive_func)
+               peer->interactive_func();
+       for (i = 0; i < nr_threads; i++) {
+               pthread_join(peer->thread[i].tid, NULL);
        }
+
        return 0;
 }
 #endif
 
-void defer_request(struct peerd *peer, struct peer_req *pr)
+/*
+ * Forward pr->req to the peer's defer port and release the peer request.
+ *
+ * Returns 0 once the request has been forwarded (a failure to signal the
+ * target port is only logged as a warning), or -1 when the peer cannot
+ * defer or xseg_forward() returns NoPort. On -1 the peer request is not
+ * freed.
+ */
+int defer_request(struct peerd *peer, struct peer_req *pr)
 {
-       // assert canDefer(peer);
-//     xseg_submit(peer->xseg, peer->defer_portno, pr->req);
-//     xseg_signal(peer->xseg, peer->defer_portno);
-//     free_peer_req(peer, pr);
+       int r;
+       xport p;
+       if (!canDefer(peer)){
+               XSEGLOG2(&lc, E, "Peer cannot defer requests");
+               return -1;
+       }
+       p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
+                       X_ALLOC);
+       if (p == NoPort){
+               /* pr->req is a pointer: cast it to match the %lx conversion */
+               XSEGLOG2(&lc, E, "Cannot defer request %lx",
+                               (unsigned long) pr->req);
+               return -1;
+       }
+       r = xseg_signal(peer->xseg, p);
+       if (r < 0) {
+               /* xport is not an unsigned long: cast it to match %lu */
+               XSEGLOG2(&lc, W, "Cannot signal port %lu", (unsigned long) p);
+       }
+       free_peer_req(peer, pr);
+       return 0;
 }
 
-static int peerd_loop(struct peerd *peer) 
+/*
+ * generic_peerd_loop is a general-purpose port-checker loop that is
+ * suitable both for multi-threaded and single-threaded peers.
+ *
+ * arg is a struct thread * in MT builds and a struct peerd * otherwise.
+ * The loop keeps calling check_ports() until `threshold` consecutive calls
+ * return 0, then sleeps in xseg_wait_signal(). It ends once isTerminate()
+ * holds and xq_count(&peer->free_reqs) equals peer->nr_ops.
+ *
+ * Always returns 0.
+ */
+static int generic_peerd_loop(void *arg)
 {
 #ifdef MT
-       int i;
-       if (peer->interactive_func)
-               peer->interactive_func();
-       for (i = 0; i < peer->nr_threads; i++) {
-               pthread_join(peer->thread[i].tid, NULL);
-       }
+       struct thread *t = (struct thread *) arg;
+       struct peerd *peer = t->peer;
+       /* log prefix stored in t->arg by init_thread_loop() */
+       char *id = t->arg;
 #else
+       struct peerd *peer = (struct peerd *) arg;
+       /* must be NUL-terminated: it is printed with "%s" below */
+       char id[] = "Peer";
+#endif
        struct xseg *xseg = peer->xseg;
        xport portno_start = peer->portno_start;
        xport portno_end = peer->portno_end;
+       pid_t pid = syscall(SYS_gettid);
        uint64_t threshold=1000/(1 + portno_end - portno_start);
-       pid_t pid =syscall(SYS_gettid);
        uint64_t loops;
-       
-       XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
+
+       XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
        xseg_init_local_signal(xseg, peer->portno_start);
        for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
-               for(loops= threshold; loops > 0; loops--) {
-                       if (loops == 1)
-                               xseg_prepare_wait(xseg, peer->portno_start);
+#ifdef MT
+               if (t->func) {
+                       XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
+                       xseg_cancel_wait(xseg, peer->portno_start);
+                       t->func(t->arg);
+                       t->func = NULL;
+                       t->arg = NULL;
+                       continue;
+               }
+#endif
+               //Heart of peerd_loop. This loop is common for everyone.
+               for(loops = threshold; loops > 0; loops--) {
+                       /*
+                        * Arm the wait before the final check_ports() pass, as
+                        * thread_loop does, presumably so that a request which
+                        * arrives between that pass and the sleep is not missed.
+                        */
+                       if (loops == 1)
+                               xseg_prepare_wait(xseg, peer->portno_start);
                        if (check_ports(peer))
                                loops = threshold;
                }
 #ifdef ST_THREADS
                if (ta){
                        st_sleep(0);
-               } else {
-#endif
-                       XSEGLOG2(&lc, I, "Peer goes to sleep\n");
-                       xseg_wait_signal(xseg, 10000000UL);
-                       xseg_cancel_wait(xseg, peer->portno_start);
-                       XSEGLOG2(&lc, I, "Peer woke up\n");
-#ifdef ST_THREADS
+                       continue;
                }
 #endif
+               XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
+               xseg_wait_signal(xseg, 10000000UL);
+               xseg_cancel_wait(xseg, peer->portno_start);
+               XSEGLOG2(&lc, I, "%s woke up\n", id);
        }
+       return 0;
+}
+
+/*
+ * Entry point used by main() in non-MT builds: runs peer->peerd_loop(peer)
+ * to completion, then calls custom_peer_finalize() and
+ * xseg_quit_local_signal() for peer->portno_start.
+ *
+ * The loop's return value is discarded; this function always returns 0.
+ */
+static int init_peerd_loop(struct peerd *peer)
+{
+       struct xseg *xseg = peer->xseg;
+
+       peer->peerd_loop(peer);
+       custom_peer_finalize(peer);
        xseg_quit_local_signal(xseg, peer->portno_start);
-#endif
+
        return 0;
 }
 
@@ -486,7 +590,7 @@ static struct xseg *join(char *spec)
 }
 
 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
-                       long portno_end, uint32_t nr_threads, uint32_t defer_portno)
+                       long portno_end, uint32_t nr_threads, xport defer_portno)
 {
        int i;
        struct peerd *peer;
@@ -526,14 +630,14 @@ malloc_fail:
                return NULL;
        }
        peer->xseg = join(spec);
-       if (!peer->xseg) 
+       if (!peer->xseg)
                return NULL;
 
        peer->portno_start = (xport) portno_start;
        peer->portno_end= (xport) portno_end;
        port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
        if (!port){
-               printf("cannot bind to port %ld\n", peer->portno_start);
+               printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
                return NULL;
        }
 
@@ -542,7 +646,7 @@ malloc_fail:
                struct xseg_port *tmp;
                tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
                if (!tmp){
-                       printf("cannot bind to port %ld\n", p);
+                       printf("cannot bind to port %u\n", (unsigned int) p);
                        return NULL;
                }
        }
@@ -556,6 +660,10 @@ malloc_fail:
                peer->peer_reqs[i].retval = 0;
                peer->peer_reqs[i].priv = NULL;
                peer->peer_reqs[i].portno = NoPort;
+
+       //Plug default peerd_loop. This can change later on by custom_peer_init.
+       peer->peerd_loop = generic_peerd_loop;
+
 #ifdef ST_THREADS
                peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
 #endif
@@ -575,11 +683,11 @@ int pidfile_remove(char *path, int fd)
+/*
+ * Write the calling thread's id (SYS_gettid), formatted as decimal text,
+ * at offset 0 of pid_fd.
+ *
+ * Returns the result of write(): the number of bytes written, or -1 on
+ * error. main() calls this even when no pidfile was requested, i.e. with
+ * pid_fd == -1.
+ */
 int pidfile_write(int pid_fd)
 {
        char buf[16];
-       snprintf(buf, sizeof(buf), "%d", syscall(SYS_gettid));
+       snprintf(buf, sizeof(buf), "%ld", syscall(SYS_gettid));
        buf[15] = 0;
-       
+
        lseek(pid_fd, 0, SEEK_SET);
-       int ret = write(pid_fd, buf, 16);
+       int ret = write(pid_fd, buf, strlen(buf));
        return ret;
 }
 
@@ -609,101 +717,97 @@ int pidfile_read(char *path, pid_t *pid)
+/*
+ * Exclusively create the pidfile at `path` and open it for writing.
+ *
+ * Returns the new descriptor, or a negative value when open() fails. If
+ * the failure is EEXIST, pidfile_read(path, old_pid) is called
+ * (presumably to fetch the pid recorded in the existing file); on every
+ * other failure *old_pid is left at 0.
+ */
 int pidfile_open(char *path, pid_t *old_pid)
 {
        //nfs version > 3
-       int fd = open(path, O_CREAT|O_EXCL|O_WRONLY);
+       int fd;
+
+       /*
+        * main() inspects *old_pid after any failure, not only EEXIST, so
+        * make sure it never reads an indeterminate value.
+        */
+       if (old_pid)
+               *old_pid = 0;
+
+       /*
+        * NOTE(review): S_IWUSR creates a write-only file; confirm that
+        * pidfile_read() can still open it when not running as root.
+        */
+       fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR);
        if (fd < 0){
-               if (errno == -EEXIST)
+               if (errno == EEXIST)
                        pidfile_read(path, old_pid);
        }
        return fd;
 }
 
+/*
+ * Print the command-line help to stderr: the general peer options parsed
+ * by main(), followed by whatever custom_peer_usage() prints.
+ *
+ * argv0: program name shown on the "Usage:" line.
+ */
+void usage(char *argv0)
+{
+       fprintf(stderr, "Usage: %s [general options] [custom peer options]\n\n", argv0);
+       /* Keep this table in sync with the READ_ARG_* list in main(). */
+       fprintf(stderr, "General peer options:\n"
+               "  Option      | Default | \n"
+               "  --------------------------------------------\n"
+               "    -g        | None    | Segment spec to join\n"
+               "    -sp       | NoPort  | Start portno to bind\n"
+               "    -ep       | NoPort  | End portno to bind\n"
+               "    -p        | NoPort  | Portno to bind\n"
+               "    -n        | 16      | Number of ops\n"
+               "    -v        | 0       | Verbosity level\n"
+               "    -l        | None    | Logfile \n"
+               "    -d        | No      | Daemonize \n"
+               "    --pidfile | None    | Pidfile \n"
+               "    -dp       | NoPort  | Portno to defer blocking requests\n"
+               "    -h,--help | No      | Print this help and exit\n"
+#ifdef MT
+               "    -t        | 1       | Number of threads \n"
+#endif
+               "\n"
+              );
+       custom_peer_usage();
+}
+
 int main(int argc, char *argv[])
 {
        struct peerd *peer = NULL;
        //parse args
-       char *spec = "";
-       int i, r, daemonize = 0;
+       int r;
        long portno_start = -1, portno_end = -1, portno = -1;
+
        //set defaults here
+       int daemonize = 0, help = 0;
        uint32_t nr_ops = 16;
-       uint32_t nr_threads = 16 ;
+       uint32_t nr_threads = 1;
        unsigned int debug_level = 0;
-       uint32_t defer_portno = NoPort;
-       char *logfile = NULL;
-       char *pidfile = NULL;
+       xport defer_portno = NoPort;
        pid_t old_pid;
        int pid_fd = -1;
 
+       char spec[MAX_SPEC_LEN + 1];
+       char logfile[MAX_LOGFILE_LEN + 1];
+       char pidfile[MAX_PIDFILE_LEN + 1];
+
+       logfile[0] = 0;
+       pidfile[0] = 0;
+       spec[0] = 0;
+
        //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
        // -dp xseg_portno to defer blocking requests
        // -l log file ?
        //TODO print messages on arg parsing error
-       
-       for (i = 1; i < argc; i++) {
-               if (!strcmp(argv[i], "-g") && i + 1 < argc) {
-                       spec = argv[i+1];
-                       i += 1;
-                       continue;
-               }
-
-               if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
-                       portno_start = strtoul(argv[i+1], NULL, 10);
-                       i += 1;
-                       continue;
-               }
-               
-               if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
-                       portno_end = strtoul(argv[i+1], NULL, 10);
-                       i += 1;
-                       continue;
-               }
-
-               if (!strcmp(argv[i], "-p") && i + 1 < argc) {
-                       portno = strtoul(argv[i+1], NULL, 10);
-                       i += 1;
-                       continue;
-               }
-
-               if (!strcmp(argv[i], "-n") && i + 1 < argc) {
-                       nr_ops = strtoul(argv[i+1], NULL, 10);
-                       i += 1;
-                       continue;
-               }
-               if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
-                       debug_level = atoi(argv[i+1]);
-                       i += 1;
-                       continue;
-               }
-               if (!strcmp(argv[i], "-t") && i + 1 < argc ) {
-                       nr_threads = strtoul(argv[i+1], NULL, 10);
-                       i += 1;
-                       continue;
-               }
-               if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
-                       defer_portno = strtoul(argv[i+1], NULL, 10);
-                       i += 1;
-                       continue;
-               }
-               if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
-                       logfile = argv[i+1];
-                       i += 1;
-                       continue;
-               }
-               if (!strcmp(argv[i], "-d")) {
-                       daemonize = 1;
-                       continue;
-               }
-               if (!strcmp(argv[i], "--pidfile") && i + 1 < argc ) {
-                       pidfile = argv[i+1];
-                       i += 1;
-                       continue;
-               }
+       BEGIN_READ_ARGS(argc, argv);
+       READ_ARG_STRING("-g", spec, MAX_SPEC_LEN);
+       READ_ARG_ULONG("-sp", portno_start);
+       READ_ARG_ULONG("-ep", portno_end);
+       READ_ARG_ULONG("-p", portno);
+       READ_ARG_ULONG("-n", nr_ops);
+       READ_ARG_ULONG("-v", debug_level);
+#ifdef MT
+       READ_ARG_ULONG("-t", nr_threads);
+#endif
+       READ_ARG_ULONG("-dp", defer_portno);
+       READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
+       READ_ARG_BOOL("-d", daemonize);
+       READ_ARG_BOOL("-h", help);
+       READ_ARG_BOOL("--help", help);
+       READ_ARG_STRING("--pidfile", pidfile, MAX_PIDFILE_LEN);
+       END_READ_ARGS();
+
+       if (help){
+               usage(argv[0]);
+               return 0;
+       }
 
+       r = init_logctx(&lc, argv[0], debug_level, logfile,
+                       REDIRECT_STDOUT|REDIRECT_STDERR);
+       if (r < 0){
+               XSEGLOG("Cannot initialize logging to logfile");
+               return -1;
        }
-       init_logctx(&lc, argv[0], debug_level, logfile);
        XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
-       
-       if (pidfile){
+
+       if (pidfile[0]){
                pid_fd = pidfile_open(pidfile, &old_pid);
                if (pid_fd < 0) {
                        if (old_pid) {
@@ -714,44 +818,49 @@ int main(int argc, char *argv[])
                        return -1;
                }
        }
-       
+
        if (daemonize){
                if (daemon(0, 1) < 0){
                        XSEGLOG2(&lc, E, "Cannot daemonize");
-                       if (pid_fd > 0)
-                               pidfile_remove(pidfile, pid_fd);
-                       return -1;
-
+                       r = -1;
+                       goto out;
                }
        }
 
        pidfile_write(pid_fd);
-       
+
        //TODO perform argument sanity checks
        verbose = debug_level;
        if (portno != -1) {
                portno_start = portno;
                portno_end = portno;
        }
+       if (portno_start == -1 || portno_end == -1){
+               XSEGLOG2(&lc, E, "Portno or {portno_start, portno_end} must be supplied");
+               usage(argv[0]);
+               r = -1;
+               goto out;
+       }
 
-       setup_signals();
-       //TODO err check
        peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
-       if (!peer)
-               return -1;
+       if (!peer){
+               r = -1;
+               goto out;
+       }
+       setup_signals(peer);
        r = custom_peer_init(peer, argc, argv);
        if (r < 0)
-               return -1;
-#ifdef MT
+               goto out;
+#if defined(MT)
+       //TODO err check
        peerd_start_threads(peer);
-#endif
-
-#ifdef ST_THREADS
-       st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
+#elif defined(ST_THREADS)
+       st_thread_t st = st_thread_create(init_peerd_loop, peer, 1, 0);
        r = st_thread_join(st, NULL);
 #else
-       r = peerd_loop(peer);
+       r = init_peerd_loop(peer);
 #endif
+out:
        if (pid_fd > 0)
                pidfile_remove(pidfile, pid_fd);
        return r;