Complete unification of thread_loop/peerd_loop
[archipelago] / xseg / peers / user / peer.c
index c2a2926..f3ab4eb 100644 (file)
@@ -76,11 +76,11 @@ struct thread {
        pthread_t tid;
        pthread_cond_t cond;
        pthread_mutex_t lock;
+       int thread_no;
        void (*func)(void *arg);
        void *arg;
 };
 
-
 inline static struct thread* alloc_thread(struct peerd *peer)
 {
        xqindex idx = xq_pop_head(&peer->threads, 1);
@@ -116,21 +116,10 @@ inline static int wake_up_next_thread(struct peerd *peer)
 }
 #endif
 
-
-static inline int isTerminate()
-{
-/* ta doesn't need to be taken into account, because the main loops
- * doesn't check the terminated flag if ta is not 0.
+/*
+ * extern is needed if this function is going to be called by another file
+ * such as bench-xseg.c
  */
-       /*
-#ifdef ST_THREADS
-       return (!ta & terminated);
-#else
-       return terminated;
-#endif
-       */
-       return terminated;
-}
 
 void signal_handler(int signal)
 {
@@ -303,7 +292,7 @@ void complete(struct peerd *peer, struct peer_req *pr)
 #endif
 }
 
-static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
+static void handle_accepted(struct peerd *peer, struct peer_req *pr,
                                struct xseg_request *req)
 {
        struct xseg_request *xreq = pr->req;
@@ -355,7 +344,7 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr)
        return 0;
 }
 
-static int check_ports(struct peerd *peer)
+int check_ports(struct peerd *peer)
 {
        struct xseg *xseg = peer->xseg;
        xport portno_start = peer->portno_start;
@@ -410,19 +399,6 @@ static int check_ports(struct peerd *peer)
 }
 
 #ifdef MT
-int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
-{
-       struct thread *t = alloc_thread(peer);
-       if (t) {
-               t->func = func;
-               t->arg = arg;
-               wake_up_thread(t);
-               return 0;
-       } else
-               // we could hijack a thread
-               return -1;
-}
-
 static void* thread_loop(void *arg)
 {
        struct thread *t = (struct thread *) arg;
@@ -435,20 +411,10 @@ static void* thread_loop(void *arg)
        uint64_t threshold=1000/(1 + portno_end - portno_start);
 
        XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
-
        XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
        xseg_init_local_signal(xseg, peer->portno_start);
        for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
-               if (t->func) {
-                       XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
-                       xseg_cancel_wait(xseg, peer->portno_start);
-                       t->func(t->arg);
-                       t->func = NULL;
-                       t->arg = NULL;
-                       continue;
-               }
-
-               for(loops= threshold; loops > 0; loops--) {
+               for(loops =  threshold; loops > 0; loops--) {
                        if (loops == 1)
                                xseg_prepare_wait(xseg, peer->portno_start);
                        if (check_ports(peer))
@@ -464,25 +430,64 @@ static void* thread_loop(void *arg)
        return NULL;
 }
 
+void *init_thread_loop(void *arg)
+{
+       struct thread *t = (struct thread *) arg;
+       struct peerd *peer = t->peer;
+       char *thread_id;
+       int i;
+
+       /*
+        * We need an identifier for every thread that will spin in peerd_loop.
+        * The following code is a way to create a string of this format:
+        *              "Thread <num>"
+        * minus the null terminator. What we do is we create this string with
+        * snprintf and then resize it to exclude the null terminator with
+        * realloc. Finally, the result string is passed to the (void *arg) field
+        * of struct thread.
+        *
+        * Since the highest thread number can't be more than 5 digits, using 13
+        * chars should be more than enough.
+        */
+       thread_id = malloc(13 * sizeof(char));
+       snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
+       for (i = 0; thread_id[i]; i++) {}
+       t->arg = (void *)realloc(thread_id, i-1);
+
+       //Start thread loop
+       (void)peer->peerd_loop(t);
+
+       wake_up_next_thread(peer);
+       custom_peer_finalize(peer);
+
+       return NULL;
+}
+
 int peerd_start_threads(struct peerd *peer)
 {
        int i;
        uint32_t nr_threads = peer->nr_threads;
        //TODO err check
        for (i = 0; i < nr_threads; i++) {
+               peer->thread[i].func = NULL;
+               peer->thread[i].arg = NULL;
                peer->thread[i].peer = peer;
                pthread_cond_init(&peer->thread[i].cond,NULL);
                pthread_mutex_init(&peer->thread[i].lock, NULL);
-               pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
-               peer->thread[i].func = NULL;
-               peer->thread[i].arg = NULL;
+               pthread_create(&peer->thread[i].tid, NULL,
+                                       init_thread_loop, (void *)(peer->thread + i));
+       }
 
+       if (peer->interactive_func)
+               peer->interactive_func();
+       for (i = 0; i < nr_threads; i++) {
+               pthread_join(peer->thread[i].tid, NULL);
        }
+
        return 0;
 }
 #endif
 
-
 int defer_request(struct peerd *peer, struct peer_req *pr)
 {
        int r;
@@ -505,48 +510,68 @@ int defer_request(struct peerd *peer, struct peer_req *pr)
        return 0;
 }
 
-static int peerd_loop(struct peerd *peer) 
+/*
+ * generic_peerd_loop is a general-purpose port-checker loop that is
+ * suitable both for multi-threaded and single-threaded peers.
+ */
+static int generic_peerd_loop(void *arg)
 {
 #ifdef MT
-       int i;
-       if (peer->interactive_func)
-               peer->interactive_func();
-       for (i = 0; i < peer->nr_threads; i++) {
-               pthread_join(peer->thread[i].tid, NULL);
-       }
+       struct thread *t = (struct thread *) arg;
+       struct peerd *peer = t->peer;
+       char *id = t->arg;
 #else
+       struct peerd *peer = (struct peerd *) arg;
+       char id[4] = {'P','e','e','r'};
+#endif
        struct xseg *xseg = peer->xseg;
        xport portno_start = peer->portno_start;
        xport portno_end = peer->portno_end;
+       pid_t pid = syscall(SYS_gettid);
        uint64_t threshold=1000/(1 + portno_end - portno_start);
-       pid_t pid =syscall(SYS_gettid);
        uint64_t loops;
 
-       XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
+       XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
        xseg_init_local_signal(xseg, peer->portno_start);
        for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
-               for(loops= threshold; loops > 0; loops--) {
-                       if (loops == 1)
-                               xseg_prepare_wait(xseg, peer->portno_start);
+#ifdef MT
+               if (t->func) {
+                       XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
+                       xseg_cancel_wait(xseg, peer->portno_start);
+                       t->func(t->arg);
+                       t->func = NULL;
+                       t->arg = NULL;
+                       continue;
+               }
+#endif
+               //Heart of peerd_loop. This loop is common for everyone.
+               for(loops = threshold; loops > 0; loops--) {
                        if (check_ports(peer))
                                loops = threshold;
                }
+               xseg_prepare_wait(xseg, peer->portno_start);
 #ifdef ST_THREADS
                if (ta){
                        st_sleep(0);
-               } else {
-#endif
-                       XSEGLOG2(&lc, I, "Peer goes to sleep\n");
-                       xseg_wait_signal(xseg, 10000000UL);
-                       xseg_cancel_wait(xseg, peer->portno_start);
-                       XSEGLOG2(&lc, I, "Peer woke up\n");
-#ifdef ST_THREADS
+                       continue;
                }
 #endif
+               XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
+               xseg_wait_signal(xseg, 10000000UL);
+               xseg_cancel_wait(xseg, peer->portno_start);
+               XSEGLOG2(&lc, I, "%s woke up\n", id);
        }
+       return 0;
+}
+
+static int init_peerd_loop(struct peerd *peer)
+{
+       struct xseg *xseg = peer->xseg;
+
+       peer->peerd_loop(peer);
        custom_peer_finalize(peer);
        xseg_quit_local_signal(xseg, peer->portno_start);
-#endif
+
        return 0;
 }
 
@@ -605,7 +630,7 @@ malloc_fail:
                return NULL;
        }
        peer->xseg = join(spec);
-       if (!peer->xseg) 
+       if (!peer->xseg)
                return NULL;
 
        peer->portno_start = (xport) portno_start;
@@ -635,6 +660,10 @@ malloc_fail:
                peer->peer_reqs[i].retval = 0;
                peer->peer_reqs[i].priv = NULL;
                peer->peer_reqs[i].portno = NoPort;
+
+       //Plug default peerd_loop. This can change later on by custom_peer_init.
+       peer->peerd_loop = generic_peerd_loop;
+
 #ifdef ST_THREADS
                peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
 #endif
@@ -822,16 +851,14 @@ int main(int argc, char *argv[])
        r = custom_peer_init(peer, argc, argv);
        if (r < 0)
                goto out;
-#ifdef MT
+#if defined(MT)
        //TODO err check
        peerd_start_threads(peer);
-#endif
-
-#ifdef ST_THREADS
-       st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
+#elif defined(ST_THREADS)
+       st_thread_t st = st_thread_create(init_peerd_loop, peer, 1, 0);
        r = st_thread_join(st, NULL);
 #else
-       r = peerd_loop(peer);
+       r = init_peerd_loop(peer);
 #endif
 out:
        if (pid_fd > 0)