Merge branch 'develop'
[archipelago] / xseg / peers / user / peer.c
index 555bb3f..a369a02 100644 (file)
@@ -43,7 +43,6 @@
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <errno.h>
-
 #ifdef MT
 #include <pthread.h>
 #endif
 #include <peer.h>
 
 #ifdef MT
+#ifdef ST_THREADS
+#error "MT and ST_THREADS defines are mutually exclusive"
+#endif
+#endif
+
+#ifdef MT
 #define PEER_TYPE "pthread"
 #else
 #define PEER_TYPE "posix"
@@ -70,45 +75,7 @@ uint32_t ta = 0;
 
 #ifdef MT
 struct peerd *global_peer;
-
-struct thread {
-       struct peerd *peer;
-       pthread_t tid;
-       pthread_cond_t cond;
-       pthread_mutex_t lock;
-       void (*func)(void *arg);
-       void *arg;
-};
-
-
-inline static struct thread* alloc_thread(struct peerd *peer)
-{
-       xqindex idx = xq_pop_head(&peer->threads, 1);
-       if (idx == Noneidx)
-               return NULL;
-       return peer->thread + idx;
-}
-
-inline static void free_thread(struct peerd *peer, struct thread *t)
-{
-       xqindex idx = t - peer->thread;
-       xq_append_head(&peer->threads, idx, 1);
-}
-
-
-inline static void __wake_up_thread(struct thread *t)
-{
-       pthread_mutex_lock(&t->lock);
-       pthread_cond_signal(&t->cond);
-       pthread_mutex_unlock(&t->lock);
-}
-
-inline static void wake_up_thread(struct thread* t)
-{
-       if (t){
-               __wake_up_thread(t);
-       }
-}
+static pthread_key_t threadkey;
 
 inline static int wake_up_next_thread(struct peerd *peer)
 {
@@ -116,21 +83,10 @@ inline static int wake_up_next_thread(struct peerd *peer)
 }
 #endif
 
-
-static inline int isTerminate()
-{
-/* ta doesn't need to be taken into account, because the main loops
- * doesn't check the terminated flag if ta is not 0.
+/*
+ * extern is needed if this function is going to be called by another file
+ * such as bench-xseg.c
  */
-       /*
-#ifdef ST_THREADS
-       return (!ta & terminated);
-#else
-       return terminated;
-#endif
-       */
-       return terminated;
-}
 
 void signal_handler(int signal)
 {
@@ -194,7 +150,7 @@ void print_req(struct xseg *xseg, struct xseg_request *req)
                strncpy(data, req_data, 63);
                data[63] = 0;
                printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
-                               "src: %u, st: %u, dst: %u dt: %u\n"
+                               "src: %u, transit: %u, dst: %u effective dst: %u\n"
                                "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
                                (unsigned long)(req),
                                (unsigned int)req->op,
@@ -203,9 +159,9 @@ void print_req(struct xseg *xseg, struct xseg_request *req)
                                (unsigned long)req->serviced,
                                (unsigned int)req->state,
                                (unsigned int)req->src_portno,
-                               (unsigned int)req->src_transit_portno,
+                               (unsigned int)req->transit_portno,
                                (unsigned int)req->dst_portno,
-                               (unsigned int)req->dst_transit_portno,
+                               (unsigned int)req->effective_dst_portno,
                                (unsigned int)req->targetlen, target,
                                (unsigned long long)req->datalen, data);
        }
@@ -243,6 +199,39 @@ void log_pr(char *msg, struct peer_req *pr)
        }
 }
 
+#ifdef MT
+inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
+{
+       struct peer_req *pr;
+       xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
+       if (idx != Noneidx)
+               goto out;
+
+       /* try to steal from another thread */
+       /*
+       int i;
+       struct thread *nt;
+       for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) {
+               nt = &peer->thread[(t->thread_no + i) % peer->nr_threads];
+               if (!xq_count(&nt->free_thread_reqs))
+                               continue;
+               idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
+               if (idx != Noneidx)
+                       goto out;
+       }
+       */
+       return NULL;
+out:
+       pr = peer->peer_reqs + idx;
+       pr->thread_no = t - peer->thread;
+       return pr;
+}
+#else
+/*
+ * free_reqs is a queue that simply contains pointer offsets to the peer_reqs
+ * queue. If a pointer from peer_reqs is popped, we are certain that the
+ * associated memory in peer_reqs is free to use
+ */
 inline struct peer_req *alloc_peer_req(struct peerd *peer)
 {
        xqindex idx = xq_pop_head(&peer->free_reqs, 1);
@@ -250,12 +239,39 @@ inline struct peer_req *alloc_peer_req(struct peerd *peer)
                return NULL;
        return peer->peer_reqs + idx;
 }
+#endif
 
 inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
 {
        xqindex idx = pr - peer->peer_reqs;
        pr->req = NULL;
+#ifdef MT
+       struct thread *t = &peer->thread[pr->thread_no];
+       xq_append_head(&t->free_thread_reqs, idx, 1);
+#else
        xq_append_head(&peer->free_reqs, idx, 1);
+#endif
+}
+
+/*
+ * Count all free reqs in peer.
+ * Racy, if multithreaded, but the sum should monotonicly increase when checked
+ * after a termination signal is catched.
+ */
+int all_peer_reqs_free(struct peerd *peer)
+{
+       uint32_t free_reqs = 0;
+#ifdef MT
+       int i;
+       for (i = 0; i < peer->nr_threads; i++) {
+               free_reqs += xq_count(&peer->thread[i].free_thread_reqs);
+       }
+#else
+       free_reqs = xq_count(&peer->free_reqs);
+#endif
+       if (free_reqs == peer->nr_ops)
+               return 1;
+       return 0;
 }
 
 struct timeval resp_start, resp_end, resp_accum = {0, 0};
@@ -271,11 +287,13 @@ void fail(struct peerd *peer, struct peer_req *pr)
 {
        struct xseg_request *req = pr->req;
        uint32_t p;
-       XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
-       req->state |= XS_FAILED;
-       //xseg_set_req_data(peer->xseg, pr->req, NULL);
-       p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
-       xseg_signal(peer->xseg, p);
+       if (req){
+               XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
+               req->state |= XS_FAILED;
+               //xseg_set_req_data(peer->xseg, pr->req, NULL);
+               p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
+               xseg_signal(peer->xseg, p);
+       }
        free_peer_req(peer, pr);
 #ifdef MT
        wake_up_next_thread(peer);
@@ -287,23 +305,25 @@ void complete(struct peerd *peer, struct peer_req *pr)
 {
        struct xseg_request *req = pr->req;
        uint32_t p;
-       req->state |= XS_SERVED;
-       //xseg_set_req_data(peer->xseg, pr->req, NULL);
-       //gettimeofday(&resp_start, NULL);
-       p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
-       //gettimeofday(&resp_end, NULL);
-       //responds++;
-       //timersub(&resp_end, &resp_start, &resp_end);
-       //timeradd(&resp_end, &resp_accum, &resp_accum);
-       //printf("xseg_signal: %u\n", p);
-       xseg_signal(peer->xseg, p);
+       if (req){
+               req->state |= XS_SERVED;
+               //xseg_set_req_data(peer->xseg, pr->req, NULL);
+               //gettimeofday(&resp_start, NULL);
+               p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
+               //gettimeofday(&resp_end, NULL);
+               //responds++;
+               //timersub(&resp_end, &resp_start, &resp_end);
+               //timeradd(&resp_end, &resp_accum, &resp_accum);
+               //printf("xseg_signal: %u\n", p);
+               xseg_signal(peer->xseg, p);
+       }
        free_peer_req(peer, pr);
 #ifdef MT
        wake_up_next_thread(peer);
 #endif
 }
 
-static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
+static void handle_accepted(struct peerd *peer, struct peer_req *pr,
                                struct xseg_request *req)
 {
        struct xseg_request *xreq = pr->req;
@@ -355,7 +375,11 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr)
        return 0;
 }
 
-static int check_ports(struct peerd *peer)
+#ifdef MT
+int check_ports(struct peerd *peer, struct thread *t)
+#else
+int check_ports(struct peerd *peer)
+#endif
 {
        struct xseg *xseg = peer->xseg;
        xport portno_start = peer->portno_start;
@@ -369,7 +393,11 @@ static int check_ports(struct peerd *peer)
                accepted = NULL;
                received = NULL;
                if (!isTerminate()) {
+#ifdef MT
+                       pr = alloc_peer_req(peer, t); 
+#else
                        pr = alloc_peer_req(peer);
+#endif
                        if (pr) {
                                accepted = xseg_accept(xseg, i, X_NONBLOCK);
                                if (accepted) {
@@ -410,19 +438,6 @@ static int check_ports(struct peerd *peer)
 }
 
 #ifdef MT
-int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
-{
-       struct thread *t = alloc_thread(peer);
-       if (t) {
-               t->func = func;
-               t->arg = arg;
-               wake_up_thread(t);
-               return 0;
-       } else
-               // we could hijack a thread
-               return -1;
-}
-
 static void* thread_loop(void *arg)
 {
        struct thread *t = (struct thread *) arg;
@@ -435,23 +450,13 @@ static void* thread_loop(void *arg)
        uint64_t threshold=1000/(1 + portno_end - portno_start);
 
        XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
-
        XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
        xseg_init_local_signal(xseg, peer->portno_start);
        for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
-               if (t->func) {
-                       XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
-                       xseg_cancel_wait(xseg, peer->portno_start);
-                       t->func(t->arg);
-                       t->func = NULL;
-                       t->arg = NULL;
-                       continue;
-               }
-
-               for(loops= threshold; loops > 0; loops--) {
+               for(loops =  threshold; loops > 0; loops--) {
                        if (loops == 1)
                                xseg_prepare_wait(xseg, peer->portno_start);
-                       if (check_ports(peer))
+                       if (check_ports(peer, t))
                                loops = threshold;
                }
                XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
@@ -464,83 +469,151 @@ static void* thread_loop(void *arg)
        return NULL;
 }
 
+void *init_thread_loop(void *arg)
+{
+       struct thread *t = (struct thread *) arg;
+       struct peerd *peer = t->peer;
+       char *thread_id;
+       int i;
+
+       /*
+        * We need an identifier for every thread that will spin in peerd_loop.
+        * The following code is a way to create a string of this format:
+        *              "Thread <num>"
+        * minus the null terminator. What we do is we create this string with
+        * snprintf and then resize it to exclude the null terminator with
+        * realloc. Finally, the result string is passed to the (void *arg) field
+        * of struct thread.
+        *
+        * Since the highest thread number can't be more than 5 digits, using 13
+        * chars should be more than enough.
+        */
+       thread_id = malloc(13 * sizeof(char));
+       snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
+       for (i = 0; thread_id[i]; i++) {}
+       t->arg = (void *)realloc(thread_id, i-1);
+       pthread_setspecific(threadkey, t);
+
+       //Start thread loop
+       (void)peer->peerd_loop(t);
+
+       wake_up_next_thread(peer);
+       custom_peer_finalize(peer);
+
+       return NULL;
+}
+
 int peerd_start_threads(struct peerd *peer)
 {
        int i;
        uint32_t nr_threads = peer->nr_threads;
        //TODO err check
        for (i = 0; i < nr_threads; i++) {
+               peer->thread[i].thread_no = i;
                peer->thread[i].peer = peer;
-               pthread_cond_init(&peer->thread[i].cond,NULL);
-               pthread_mutex_init(&peer->thread[i].lock, NULL);
-               pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
-               peer->thread[i].func = NULL;
-               peer->thread[i].arg = NULL;
+               pthread_create(&peer->thread[i].tid, NULL,
+                                       init_thread_loop, (void *)(peer->thread + i));
+       }
 
+       if (peer->interactive_func)
+               peer->interactive_func();
+       for (i = 0; i < nr_threads; i++) {
+               pthread_join(peer->thread[i].tid, NULL);
        }
+
        return 0;
 }
 #endif
 
-
-void defer_request(struct peerd *peer, struct peer_req *pr)
+int defer_request(struct peerd *peer, struct peer_req *pr)
 {
-       // assert canDefer(peer);
-//     xseg_submit(peer->xseg, peer->defer_portno, pr->req);
-//     xseg_signal(peer->xseg, peer->defer_portno);
-//     free_peer_req(peer, pr);
+       int r;
+       xport p;
+       if (!canDefer(peer)){
+               XSEGLOG2(&lc, E, "Peer cannot defer requests");
+               return -1;
+       }
+       p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
+                       X_ALLOC);
+       if (p == NoPort){
+               XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
+               return -1;
+       }
+       r = xseg_signal(peer->xseg, p);
+       if (r < 0) {
+               XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
+       }
+       free_peer_req(peer, pr);
+       return 0;
 }
 
-static int peerd_loop(struct peerd *peer) 
+/*
+ * generic_peerd_loop is a general-purpose port-checker loop that is
+ * suitable both for multi-threaded and single-threaded peers.
+ */
+static int generic_peerd_loop(void *arg)
 {
 #ifdef MT
-       int i;
-       if (peer->interactive_func)
-               peer->interactive_func();
-       for (i = 0; i < peer->nr_threads; i++) {
-               pthread_join(peer->thread[i].tid, NULL);
-       }
+       struct thread *t = (struct thread *) arg;
+       struct peerd *peer = t->peer;
+       char *id = t->arg;
 #else
+       struct peerd *peer = (struct peerd *) arg;
+       char id[4] = {'P','e','e','r'};
+#endif
        struct xseg *xseg = peer->xseg;
        xport portno_start = peer->portno_start;
        xport portno_end = peer->portno_end;
+       pid_t pid = syscall(SYS_gettid);
        uint64_t threshold=1000/(1 + portno_end - portno_start);
-       pid_t pid =syscall(SYS_gettid);
        uint64_t loops;
 
-       XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
+       XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
        xseg_init_local_signal(xseg, peer->portno_start);
-       for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
-               for(loops= threshold; loops > 0; loops--) {
+       //for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
+       for (;!(isTerminate() && all_peer_reqs_free(peer));) {
+               //Heart of peerd_loop. This loop is common for everyone.
+               for(loops = threshold; loops > 0; loops--) {
                        if (loops == 1)
                                xseg_prepare_wait(xseg, peer->portno_start);
+#ifdef MT
+                       if (check_ports(peer, t))
+#else
                        if (check_ports(peer))
+#endif
                                loops = threshold;
                }
 #ifdef ST_THREADS
                if (ta){
                        st_sleep(0);
-               } else {
-#endif
-                       XSEGLOG2(&lc, I, "Peer goes to sleep\n");
-                       xseg_wait_signal(xseg, 10000000UL);
-                       xseg_cancel_wait(xseg, peer->portno_start);
-                       XSEGLOG2(&lc, I, "Peer woke up\n");
-#ifdef ST_THREADS
+                       continue;
                }
 #endif
+               XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
+               xseg_wait_signal(xseg, 10000000UL);
+               xseg_cancel_wait(xseg, peer->portno_start);
+               XSEGLOG2(&lc, I, "%s woke up\n", id);
        }
+       return 0;
+}
+
+static int init_peerd_loop(struct peerd *peer)
+{
+       struct xseg *xseg = peer->xseg;
+
+       peer->peerd_loop(peer);
        custom_peer_finalize(peer);
        xseg_quit_local_signal(xseg, peer->portno_start);
-#endif
+
        return 0;
 }
 
 #ifdef ST_THREADS
 static void * st_peerd_loop(void *peer)
 {
-       peerd_loop(peer);
-       return 0;
+       struct peerd *peerd = peer;
+
+       return init_peerd_loop(peer);
 }
 #endif
 
@@ -559,11 +632,13 @@ static struct xseg *join(char *spec)
 }
 
 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
-                       long portno_end, uint32_t nr_threads, uint32_t defer_portno)
+                       long portno_end, uint32_t nr_threads, xport defer_portno)
 {
        int i;
        struct peerd *peer;
        struct xseg_port *port;
+       void *sd = NULL;
+       xport p;
 
 #ifdef ST_THREADS
        st_init();
@@ -580,6 +655,20 @@ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
        peer->thread = calloc(nr_threads, sizeof(struct thread));
        if (!peer->thread)
                goto malloc_fail;
+       if (!xq_alloc_empty(&peer->threads, nr_threads))
+               goto malloc_fail;
+       for (i = 0; i < nr_threads; i++) {
+               if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops))
+                       goto malloc_fail;
+       }
+       for (i = 0; i < nr_ops; i++) {
+               __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
+       }
+
+       pthread_key_create(&threadkey, NULL);
+#else
+       if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
+               goto malloc_fail;
 #endif
        peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
        if (!peer->peer_reqs){
@@ -587,41 +676,33 @@ malloc_fail:
                perror("malloc");
                return NULL;
        }
-
-       if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
-               goto malloc_fail;
-#ifdef MT
-       if (!xq_alloc_empty(&peer->threads, nr_threads))
-               goto malloc_fail;
-#endif
        if (xseg_initialize()){
                printf("cannot initialize library\n");
                return NULL;
        }
        peer->xseg = join(spec);
-       if (!peer->xseg) 
+       if (!peer->xseg)
                return NULL;
 
        peer->portno_start = (xport) portno_start;
        peer->portno_end= (xport) portno_end;
-       port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
-       if (!port){
-               printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
-               return NULL;
-       }
 
-       xport p;
-       for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
-               struct xseg_port *tmp;
-               tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
-               if (!tmp){
+       /*
+        * Start binding ports from portno_start to portno_end.
+        * The first port we bind will have its signal_desc initialized by xseg
+        * and the same signal_desc will be used for all the other ports.
+        */
+       for (p = peer->portno_start; p <= peer->portno_end; p++) {
+               port = xseg_bind_port(peer->xseg, p, sd);
+               if (!port){
                        printf("cannot bind to port %u\n", (unsigned int) p);
                        return NULL;
                }
+               if (p == peer->portno_start)
+                       sd = xseg_get_signal_desc(peer->xseg, port);
        }
 
-       printf("Peer on ports  %u-%u\n", peer->portno_start,
-                       peer->portno_end);
+       printf("Peer on ports  %u-%u\n", peer->portno_start, peer->portno_end);
 
        for (i = 0; i < nr_ops; i++) {
                peer->peer_reqs[i].peer = peer;
@@ -633,6 +714,10 @@ malloc_fail:
                peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
 #endif
        }
+
+       //Plug default peerd_loop. This can change later on by custom_peer_init.
+       peer->peerd_loop = generic_peerd_loop;
+
 #ifdef MT
        peer->interactive_func = NULL;
 #endif
@@ -682,7 +767,7 @@ int pidfile_read(char *path, pid_t *pid)
 int pidfile_open(char *path, pid_t *old_pid)
 {
        //nfs version > 3
-       int fd = open(path, O_CREAT|O_EXCL|O_WRONLY);
+       int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR);
        if (fd < 0){
                if (errno == EEXIST)
                        pidfile_read(path, old_pid);
@@ -725,7 +810,7 @@ int main(int argc, char *argv[])
        uint32_t nr_ops = 16;
        uint32_t nr_threads = 1;
        unsigned int debug_level = 0;
-       uint32_t defer_portno = NoPort;
+       xport defer_portno = NoPort;
        pid_t old_pid;
        int pid_fd = -1;
 
@@ -751,7 +836,7 @@ int main(int argc, char *argv[])
 #ifdef MT
        READ_ARG_ULONG("-t", nr_threads);
 #endif
-//     READ_ARG_ULONG("-dp", defer_portno);
+       READ_ARG_ULONG("-dp", defer_portno);
        READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
        READ_ARG_BOOL("-d", daemonize);
        READ_ARG_BOOL("-h", help);
@@ -819,16 +904,14 @@ int main(int argc, char *argv[])
        r = custom_peer_init(peer, argc, argv);
        if (r < 0)
                goto out;
-#ifdef MT
+#if defined(MT)
        //TODO err check
        peerd_start_threads(peer);
-#endif
-
-#ifdef ST_THREADS
+#elif defined(ST_THREADS)
        st_thread_t st = st_thread_create(st_peerd_loop, peer, 1, 0);
        r = st_thread_join(st, NULL);
 #else
-       r = peerd_loop(peer);
+       r = init_peerd_loop(peer);
 #endif
 out:
        if (pid_fd > 0)