Fix peer termination.
authorFilippos Giannakos <philipgian@grnet.gr>
Thu, 14 Mar 2013 09:27:34 +0000 (11:27 +0200)
committerFilippos Giannakos <philipgian@grnet.gr>
Thu, 14 Mar 2013 16:00:21 +0000 (18:00 +0200)
Peer checks if all peer requests are free in all thread, and then proceeds to
gracefull shutdown.

Also introduce pthread specific variable, holding the struct thread for each
thread.

xseg/peers/user/peer.c
xseg/peers/user/peer.h

index 5021849..f082a8b 100644 (file)
@@ -69,6 +69,7 @@ uint32_t ta = 0;
 
 #ifdef MT
 struct peerd *global_peer;
+static pthread_key_t threadkey;
 
 /*
 struct thread {
@@ -239,14 +240,23 @@ inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
 {
        struct peer_req *pr;
        struct thread *nt;
+       int i;
        xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
        if (idx != Noneidx)
                goto out;
-       /* try to steal from the next thread */
-       nt = &peer->thread[(t->thread_no + 1) % peer->nr_threads];
-       idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
-       if (idx == Noneidx)
-               return NULL;
+
+       /* try to steal from another thread */
+       /*
+       for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) {
+               nt = &peer->thread[(t->thread_no + i) % peer->nr_threads];
+               if (!xq_count(&nt->free_thread_reqs))
+                               continue;
+               idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
+               if (idx != Noneidx)
+                       goto out;
+       }
+       */
+       return NULL;
 out:
        pr = peer->peer_reqs + idx;
        pr->thread_no = t - peer->thread;
@@ -279,6 +289,27 @@ inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
 #endif
 }
 
+/*
+ * Count all free reqs in peer.
+ * Racy, if multithreaded, but the sum should monotonicly increase when checked
+ * after a termination signal is catched.
+ */
+int all_peer_reqs_free(struct peerd *peer)
+{
+       uint32_t free_reqs = 0;
+       int i;
+#ifdef MT
+       for (i = 0; i < peer->nr_threads; i++) {
+               free_reqs += xq_count(&peer->thread[i].free_thread_reqs);
+       }
+#else
+       free_reqs = xq_count(&peer->free_reqs);
+#endif
+       if (free_reqs == peer->nr_ops)
+               return 1;
+       return 0;
+}
+
 struct timeval resp_start, resp_end, resp_accum = {0, 0};
 uint64_t responds = 0;
 void get_responds_stats(){
@@ -397,10 +428,7 @@ int check_ports(struct peerd *peer)
        for (i = portno_start; i <= portno_end; i++) {
                accepted = NULL;
                received = NULL;
-               //Shouldn't we just leave?
                if (!isTerminate()) {
-                       //Better way than alloc/free all the time?
-                       //Cache the allocated peer_req?
 #ifdef MT
                        pr = alloc_peer_req(peer, t); 
 #else
@@ -500,6 +528,7 @@ void *init_thread_loop(void *arg)
        snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
        for (i = 0; thread_id[i]; i++) {}
        t->arg = (void *)realloc(thread_id, i-1);
+       pthread_setspecific(threadkey, t);
 
        //Start thread loop
        (void)peer->peerd_loop(t);
@@ -580,7 +609,8 @@ static int generic_peerd_loop(void *arg)
 
        XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
        xseg_init_local_signal(xseg, peer->portno_start);
-       for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
+       //for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
+       for (;!(isTerminate() && all_peer_reqs_free(peer));) {
 #ifdef MT
                if (t->func) {
                        XSEGLOG2(&lc, D, "%s executes function\n", id);
@@ -669,6 +699,7 @@ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
                goto malloc_fail;
        for (i = 0; i < nr_threads; i++) {
                peer->thread[i].thread_no = i;
+               peer->thread[i].peer = peer;
                if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops/nr_threads))
                        goto malloc_fail;
        }
@@ -676,6 +707,7 @@ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
                __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
        }
 
+       pthread_key_create(&threadkey, NULL);
 #else
        if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
                goto malloc_fail;
index 3ea8a91..77abb83 100644 (file)
@@ -115,6 +115,7 @@ struct peerd {
        struct thread *thread;
        struct xq threads;
        void (*interactive_func)(void);
+#else
 #endif
 };
 
@@ -136,6 +137,7 @@ void get_submits_stats();
 void get_responds_stats();
 void usage();
 void print_req(struct xseg *xseg, struct xseg_request *req);
+int all_peer_reqs_free(struct peerd *peer);
 
 #ifdef MT
 int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg);