Make peers close stdin when entering daemon mode.
[archipelago] / xseg / peers / user / peer.c
index fd79a27..b20e1b0 100644 (file)
@@ -43,7 +43,6 @@
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <errno.h>
-
 #ifdef MT
 #include <pthread.h>
 #endif
 #include <peer.h>
 
 #ifdef MT
+#ifdef ST_THREADS
+#error "MT and ST_THREADS defines are mutually exclusive"
+#endif
+#endif
+
+#ifdef MT
 #define PEER_TYPE "pthread"
 #else
 #define PEER_TYPE "posix"
@@ -70,46 +75,7 @@ uint32_t ta = 0;
 
 #ifdef MT
 struct peerd *global_peer;
-
-struct thread {
-       struct peerd *peer;
-       pthread_t tid;
-       pthread_cond_t cond;
-       pthread_mutex_t lock;
-       int thread_no;
-       void (*func)(void *arg);
-       void *arg;
-       void *priv;
-};
-
-inline static struct thread* alloc_thread(struct peerd *peer)
-{
-       xqindex idx = xq_pop_head(&peer->threads, 1);
-       if (idx == Noneidx)
-               return NULL;
-       return peer->thread + idx;
-}
-
-inline static void free_thread(struct peerd *peer, struct thread *t)
-{
-       xqindex idx = t - peer->thread;
-       xq_append_head(&peer->threads, idx, 1);
-}
-
-
-inline static void __wake_up_thread(struct thread *t)
-{
-       pthread_mutex_lock(&t->lock);
-       pthread_cond_signal(&t->cond);
-       pthread_mutex_unlock(&t->lock);
-}
-
-inline static void wake_up_thread(struct thread* t)
-{
-       if (t){
-               __wake_up_thread(t);
-       }
-}
+static pthread_key_t threadkey;
 
 inline static int wake_up_next_thread(struct peerd *peer)
 {
@@ -124,7 +90,7 @@ inline static int wake_up_next_thread(struct peerd *peer)
 
 void signal_handler(int signal)
 {
-       XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
+//     XSEGLOG2(&lc, I, "Caught signal. Terminating gracefully");
        terminated = 1;
 #ifdef MT
        wake_up_next_thread(global_peer);
@@ -133,7 +99,7 @@ void signal_handler(int signal)
 
 void renew_logfile(int signal)
 {
-       XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
+//     XSEGLOG2(&lc, I, "Caught signal. Renewing logfile");
        renew_logctx(&lc, NULL, verbose, NULL, REOPEN_FILE);
 }
 
@@ -233,6 +199,34 @@ void log_pr(char *msg, struct peer_req *pr)
        }
 }
 
+#ifdef MT
+inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
+{
+       struct peer_req *pr;
+       xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
+       if (idx != Noneidx)
+               goto out;
+
+       /* try to steal from another thread */
+       /*
+       int i;
+       struct thread *nt;
+       for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) {
+               nt = &peer->thread[(t->thread_no + i) % peer->nr_threads];
+               if (!xq_count(&nt->free_thread_reqs))
+                               continue;
+               idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
+               if (idx != Noneidx)
+                       goto out;
+       }
+       */
+       return NULL;
+out:
+       pr = peer->peer_reqs + idx;
+       pr->thread_no = t - peer->thread;
+       return pr;
+}
+#else
 /*
  * free_reqs is a queue that simply contains pointer offsets to the peer_reqs
  * queue. If a pointer from peer_reqs is popped, we are certain that the
@@ -245,12 +239,39 @@ inline struct peer_req *alloc_peer_req(struct peerd *peer)
                return NULL;
        return peer->peer_reqs + idx;
 }
+#endif
 
 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
 {
        xqindex idx = pr - peer->peer_reqs;
        pr->req = NULL;
+#ifdef MT
+       struct thread *t = &peer->thread[pr->thread_no];
+       xq_append_head(&t->free_thread_reqs, idx, 1);
+#else
        xq_append_head(&peer->free_reqs, idx, 1);
+#endif
+}
+
+/*
+ * Count all free reqs in peer.
+ * Racy, if multithreaded, but the sum should monotonicly increase when checked
+ * after a termination signal is catched.
+ */
+int all_peer_reqs_free(struct peerd *peer)
+{
+       uint32_t free_reqs = 0;
+#ifdef MT
+       int i;
+       for (i = 0; i < peer->nr_threads; i++) {
+               free_reqs += xq_count(&peer->thread[i].free_thread_reqs);
+       }
+#else
+       free_reqs = xq_count(&peer->free_reqs);
+#endif
+       if (free_reqs == peer->nr_ops)
+               return 1;
+       return 0;
 }
 
 struct timeval resp_start, resp_end, resp_accum = {0, 0};
@@ -266,11 +287,13 @@ void fail(struct peerd *peer, struct peer_req *pr)
 {
        struct xseg_request *req = pr->req;
        uint32_t p;
-       XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
-       req->state |= XS_FAILED;
-       //xseg_set_req_data(peer->xseg, pr->req, NULL);
-       p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
-       xseg_signal(peer->xseg, p);
+       if (req){
+               XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
+               req->state |= XS_FAILED;
+               //xseg_set_req_data(peer->xseg, pr->req, NULL);
+               p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
+               xseg_signal(peer->xseg, p);
+       }
        free_peer_req(peer, pr);
 #ifdef MT
        wake_up_next_thread(peer);
@@ -282,20 +305,18 @@ void complete(struct peerd *peer, struct peer_req *pr)
 {
        struct xseg_request *req = pr->req;
        uint32_t p;
-       int r;
-
-       req->state |= XS_SERVED;
-       //xseg_set_req_data(peer->xseg, pr->req, NULL);
-       //gettimeofday(&resp_start, NULL);
-       p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
-       //gettimeofday(&resp_end, NULL);
-       //responds++;
-       //timersub(&resp_end, &resp_start, &resp_end);
-       //timeradd(&resp_end, &resp_accum, &resp_accum);
-       //printf("xseg_signal: %u\n", p);
-       r = xseg_signal(peer->xseg, p);
-       if (r < 0)
-               XSEGLOG2(&lc, W, "Cannot signal destination peer (reason %d)\n", r);
+       if (req){
+               req->state |= XS_SERVED;
+               //xseg_set_req_data(peer->xseg, pr->req, NULL);
+               //gettimeofday(&resp_start, NULL);
+               p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
+               //gettimeofday(&resp_end, NULL);
+               //responds++;
+               //timersub(&resp_end, &resp_start, &resp_end);
+               //timeradd(&resp_end, &resp_accum, &resp_accum);
+               //printf("xseg_signal: %u\n", p);
+               xseg_signal(peer->xseg, p);
+       }
        free_peer_req(peer, pr);
 #ifdef MT
        wake_up_next_thread(peer);
@@ -354,7 +375,11 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr)
        return 0;
 }
 
+#ifdef MT
+int check_ports(struct peerd *peer, struct thread *t)
+#else
 int check_ports(struct peerd *peer)
+#endif
 {
        struct xseg *xseg = peer->xseg;
        xport portno_start = peer->portno_start;
@@ -367,11 +392,12 @@ int check_ports(struct peerd *peer)
        for (i = portno_start; i <= portno_end; i++) {
                accepted = NULL;
                received = NULL;
-               //Shouldn't we just leave?
                if (!isTerminate()) {
-                       //Better way than alloc/free all the time?
-                       //Cache the allocated peer_req?
+#ifdef MT
+                       pr = alloc_peer_req(peer, t); 
+#else
                        pr = alloc_peer_req(peer);
+#endif
                        if (pr) {
                                accepted = xseg_accept(xseg, i, X_NONBLOCK);
                                if (accepted) {
@@ -430,7 +456,7 @@ static void* thread_loop(void *arg)
                for(loops =  threshold; loops > 0; loops--) {
                        if (loops == 1)
                                xseg_prepare_wait(xseg, peer->portno_start);
-                       if (check_ports(peer))
+                       if (check_ports(peer, t))
                                loops = threshold;
                }
                XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
@@ -466,6 +492,7 @@ void *init_thread_loop(void *arg)
        snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
        for (i = 0; thread_id[i]; i++) {}
        t->arg = (void *)realloc(thread_id, i-1);
+       pthread_setspecific(threadkey, t);
 
        //Start thread loop
        (void)peer->peerd_loop(t);
@@ -482,11 +509,8 @@ int peerd_start_threads(struct peerd *peer)
        uint32_t nr_threads = peer->nr_threads;
        //TODO err check
        for (i = 0; i < nr_threads; i++) {
-               peer->thread[i].func = NULL;
-               peer->thread[i].arg = NULL;
+               peer->thread[i].thread_no = i;
                peer->thread[i].peer = peer;
-               pthread_cond_init(&peer->thread[i].cond,NULL);
-               pthread_mutex_init(&peer->thread[i].lock, NULL);
                pthread_create(&peer->thread[i].tid, NULL,
                                        init_thread_loop, (void *)(peer->thread + i));
        }
@@ -546,22 +570,17 @@ static int generic_peerd_loop(void *arg)
 
        XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
        xseg_init_local_signal(xseg, peer->portno_start);
-       for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
-#ifdef MT
-               if (t->func) {
-                       XSEGLOG2(&lc, D, "%s executes function\n", id);
-                       xseg_cancel_wait(xseg, peer->portno_start);
-                       t->func(t->arg);
-                       t->func = NULL;
-                       t->arg = NULL;
-                       continue;
-               }
-#endif
+       //for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
+       for (;!(isTerminate() && all_peer_reqs_free(peer));) {
                //Heart of peerd_loop. This loop is common for everyone.
                for(loops = threshold; loops > 0; loops--) {
                        if (loops == 1)
                                xseg_prepare_wait(xseg, peer->portno_start);
+#ifdef MT
+                       if (check_ports(peer, t))
+#else
                        if (check_ports(peer))
+#endif
                                loops = threshold;
                }
 #ifdef ST_THREADS
@@ -627,6 +646,20 @@ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
        peer->thread = calloc(nr_threads, sizeof(struct thread));
        if (!peer->thread)
                goto malloc_fail;
+       if (!xq_alloc_empty(&peer->threads, nr_threads))
+               goto malloc_fail;
+       for (i = 0; i < nr_threads; i++) {
+               if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops))
+                       goto malloc_fail;
+       }
+       for (i = 0; i < nr_ops; i++) {
+               __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
+       }
+
+       pthread_key_create(&threadkey, NULL);
+#else
+       if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
+               goto malloc_fail;
 #endif
        peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
        if (!peer->peer_reqs){
@@ -634,13 +667,6 @@ malloc_fail:
                perror("malloc");
                return NULL;
        }
-
-       if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
-               goto malloc_fail;
-#ifdef MT
-       if (!xq_alloc_empty(&peer->threads, nr_threads))
-               goto malloc_fail;
-#endif
        if (xseg_initialize()){
                printf("cannot initialize library\n");
                return NULL;
@@ -675,14 +701,14 @@ malloc_fail:
                peer->peer_reqs[i].retval = 0;
                peer->peer_reqs[i].priv = NULL;
                peer->peer_reqs[i].portno = NoPort;
-
-       //Plug default peerd_loop. This can change later on by custom_peer_init.
-       peer->peerd_loop = generic_peerd_loop;
-
 #ifdef ST_THREADS
                peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
 #endif
        }
+
+       //Plug default peerd_loop. This can change later on by custom_peer_init.
+       peer->peerd_loop = generic_peerd_loop;
+
 #ifdef MT
        peer->interactive_func = NULL;
 #endif
@@ -835,6 +861,9 @@ int main(int argc, char *argv[])
        }
 
        if (daemonize){
+               if (close(STDIN_FILENO)){
+                       XSEGLOG2(&lc, W, "Could not close stdin");
+               }
                if (daemon(0, 1) < 0){
                        XSEGLOG2(&lc, E, "Cannot daemonize");
                        r = -1;