Bump version to 0.3.5next
[archipelago] / xseg / peers / user / speer.c
index 343866d..3a2f145 100644 (file)
@@ -1,37 +1,56 @@
+/*
+ * Copyright 2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
 #define _GNU_SOURCE
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/types.h>
 #include <unistd.h>
 #include <xseg/xseg.h>
-#include <pthread.h>
-#include <mpeer.h>
+#include <speer.h>
 #include <sys/syscall.h>
 #include <sys/time.h>
 #include <signal.h>
 
-#define REARRANGE(__fun_name__, __format__, ...) __format__ "%s", __fun_name__, ##__VA_ARGS__
-#define LOG(level, ...)                                              \
-               do {                                                               \
-                       if (level <=  verbose) {                           \
-                               fprintf(stderr, "%s: "  REARRANGE( __func__ , ## __VA_ARGS__, "" )); \
-                       }                                                          \
-               }while (0)
-
+#ifdef ST_THREADS
+#include <st.h>
+uint32_t ta = 0;
+#endif
 
-unsigned int verbose = 0;
+unsigned int verbose;
 struct log_ctx lc;
 
-struct thread {
-       struct peerd *peer;
-       pthread_t tid;
-       pthread_cond_t cond;
-       pthread_mutex_t lock;
-       void (*func)(void *arg);
-       void *arg;
-};
-
-
 inline int canDefer(struct peerd *peer)
 {
        return !(peer->defer_portno == NoPort);
@@ -67,6 +86,7 @@ void print_req(struct xseg *xseg, struct xseg_request *req)
                                (unsigned long long)req->datalen, data);
        }
 }
+
 void log_pr(char *msg, struct peer_req *pr)
 {
        char target[64], data[64];
@@ -115,48 +135,10 @@ inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
        xq_append_head(&peer->free_reqs, idx, 1);
 }
 
-inline static struct thread* alloc_thread(struct peerd *peer)
-{
-       xqindex idx = xq_pop_head(&peer->threads, 1);
-       if (idx == Noneidx)
-               return NULL;
-       return peer->thread + idx;
-}
-
-inline static void free_thread(struct peerd *peer, struct thread *t)
-{
-       xqindex idx = t - peer->thread;
-       xq_append_head(&peer->threads, idx, 1);
-}
-
-
-inline static void __wake_up_thread(struct thread *t)
-{
-       pthread_mutex_lock(&t->lock);
-       pthread_cond_signal(&t->cond);
-       pthread_mutex_unlock(&t->lock);
-}
-
-inline static void wake_up_thread(struct thread* t)
-{
-       if (t){
-               __wake_up_thread(t);
-       }
-}
-
-inline static int wake_up_next_thread(struct peerd *peer)
-{
-       //struct thread *t = alloc_thread(peer);
-       //wake_up_thread(t);
-       //return t;
-       return (xseg_signal(peer->xseg, peer->portno_start));
-}
-
 struct timeval resp_start, resp_end, resp_accum = {0, 0};
 uint64_t responds = 0;
 void get_responds_stats(){
                printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
-                               //(unsigned int)(t - peer->thread),
                                resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
 }
 
@@ -172,7 +154,6 @@ void fail(struct peerd *peer, struct peer_req *pr)
        if (xseg_signal(peer->xseg, p) < 0)
                XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
        free_peer_req(peer, pr);
-       wake_up_next_thread(peer);
 }
 
 //FIXME error check
@@ -192,12 +173,6 @@ void complete(struct peerd *peer, struct peer_req *pr)
        if (xseg_signal(peer->xseg, p) < 0)
                XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
        free_peer_req(peer, pr);
-       wake_up_next_thread(peer);
-}
-
-void pending(struct peerd *peer, struct peer_req *pr)
-{
-               pr->req->state = XS_PENDING;
 }
 
 static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
@@ -207,25 +182,23 @@ static void handle_accepted(struct peerd *peer, struct peer_req *pr,
        //assert xreq == req;
        XSEGLOG2(&lc, D, "Handle accepted");
        xreq->serviced = 0;
-       //xreq->state = XS_ACCEPTED;
        pr->retval = 0;
-       dispatch(peer, pr, req);
+       dispatch(peer, pr, req, dispatch_accept);
 }
 
 static void handle_received(struct peerd *peer, struct peer_req *pr,
                                struct xseg_request *req)
 {
        //struct xseg_request *req = pr->req;
-       //assert req->state != XS_ACCEPTED;
        XSEGLOG2(&lc, D, "Handle received \n");
-       dispatch(peer, pr, req);
+       dispatch(peer, pr, req, dispatch_receive);
 
 }
+
 struct timeval sub_start, sub_end, sub_accum = {0, 0};
 uint64_t submits = 0;
 void get_submits_stats(){
                printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
-                               //(unsigned int)(t - peer->thread),
                                sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
 }
 
@@ -234,7 +207,6 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr)
        uint32_t ret;
        struct xseg_request *req = pr->req;
        // assert req->portno == peer->portno ?
-       //TODO small function with error checking
        XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
        ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
        if (ret < 0)
@@ -252,98 +224,91 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr)
        return 0;
 }
 
-int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
+static int check_ports(struct peerd *peer) /* Polls every port once without blocking; returns 1 if a request was handled, 0 otherwise. */
 {
-       struct thread *t = alloc_thread(peer);
-       if (t) {
-               t->func = func;
-               t->arg = arg;
-               wake_up_thread(t);
-               return 0;
-       } else
-               // we could hijack a thread
-               return -1;
+       struct xseg *xseg = peer->xseg;
+       xport portno_start = peer->portno_start;
+       xport portno_end = peer->portno_end;
+       struct xseg_request *accepted, *received;
+       struct peer_req *pr;
+       xport i;
+       int  r, c = 0;
+
+       for (i = portno_start; i <= portno_end; i++) {
+               accepted = NULL;
+               received = NULL;
+               pr = alloc_peer_req(peer); /* if no peer_req is available, accept is skipped and only receive is tried */
+               if (pr) {
+                       accepted = xseg_accept(xseg, i, X_NONBLOCK);
+                       if (accepted) {
+                               pr->req = accepted;
+                               pr->portno = i;
+                               xseg_cancel_wait(xseg, i);
+                               handle_accepted(peer, pr, accepted);
+                               c = 1;
+                       }
+                       else {
+                               free_peer_req(peer, pr); /* nothing to accept: give the unused peer_req back */
+                       }
+               }
+               received = xseg_receive(xseg, i, X_NONBLOCK);
+               if (received) {
+                       r =  xseg_get_req_data(xseg, received, (void **) &pr); /* fetch the peer_req stored by submit_peer_req() */
+                       if (r < 0 || !pr){ /* no peer_req attached: respond to the request instead of dispatching it */
+                               XSEGLOG2(&lc, W, "Received request with no pr data\n");
+                               xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
+                               if (p == NoPort){
+                                       XSEGLOG2(&lc, W, "Could not respond stale request");
+                                       xseg_put_request(xseg, received, portno_start);
+                                       continue;
+                               } else if (xseg_signal(xseg, p) < 0) { /* same check as fail()/complete() */
+                                       XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
+                               }
+                       } else {
+                               //maybe perform sanity check for pr
+                               xseg_cancel_wait(xseg, i);
+                               handle_received(peer, pr, received);
+                               c = 1;
+                       }
+               }
+       }
+
+       return c;
 }
 
-static void* thread_loop(void *arg)
+static int peerd_loop(struct peerd *peer) /* Single-threaded service loop for the peer's port range; the for(;;) below has no exit. */
 {
-       struct thread *t = (struct thread *) arg;
-       struct peerd *peer = t->peer;
        struct xseg *xseg = peer->xseg;
        xport portno_start = peer->portno_start;
        xport portno_end = peer->portno_end;
-       struct peer_req *pr;
-       uint64_t threshold=1;
+       uint64_t threshold = (portno_end >= portno_start && portno_end - portno_start < 1000) ? 1000/(1 + portno_end - portno_start) : 1; /* never 0, never divides by 0 */
        pid_t pid =syscall(SYS_gettid);
        uint64_t loops;
-       struct xseg_request *accepted, *received;
-       int r;
-       int change;
-       xport i;
-               
-       XSEGLOG2(&lc, D, "thread %u\n",  (unsigned int) (t- peer->thread));
-
-       XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
+       /* Poll the ports until roughly `threshold` consecutive passes find nothing, then yield (ST_THREADS with ta set) or wait for a signal. */
+       XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
        xseg_init_local_signal(xseg, peer->portno_start);
        for (;;) {
-               if (t->func) {
-                       XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
-                       xseg_cancel_wait(xseg, peer->portno_start);
-                       t->func(t->arg);
-                       t->func = NULL;
-                       t->arg = NULL;
-                       continue;
-               }
-
                for(loops= threshold; loops > 0; loops--) {
-                       do {
-                               if (loops == 1)
-                                       xseg_prepare_wait(xseg, peer->portno_start);
-                               change = 0;
-                               for (i = portno_start; i <= portno_end; i++) {
-                                       accepted = NULL;
-                                       received = NULL;
-                                       pr = alloc_peer_req(peer);
-                                       if (pr) {
-                                               accepted = xseg_accept(xseg, i, X_NONBLOCK);
-                                               if (accepted) {
-                                                       XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
-                                                       pr->req = accepted;
-                                                       pr->portno = i;
-                                                       xseg_cancel_wait(xseg, i);
-                                                       handle_accepted(peer, pr, accepted);
-                                                       change = 1;
-                                               }
-                                               else {
-                                                       free_peer_req(peer, pr);
-                                               }
-                                       }
-                                       received = xseg_receive(xseg, i, X_NONBLOCK);
-                                       if (received) {
-                                               //printf("received req id: %u\n", received - xseg->requests);
-                                               //print_req(peer->xseg, received);
-                                               r =  xseg_get_req_data(xseg, received, (void **) &pr);
-                                               if (r < 0 || !pr){
-                                                       //FIXME what to do here ?
-                                                       XSEGLOG2(&lc, W, "Received request with no pr data\n");
-                                                       xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
-                                                       //FIXME if fails, put req
-                                               }
-                                               xseg_cancel_wait(xseg, i);
-                                               handle_received(peer, pr, received);
-                                               change = 1;
-                                       }
-                               }
-                               if (change)
-                                       loops = threshold;
-                       }while (change);
+                       if (loops == 1)
+                               xseg_prepare_wait(xseg, peer->portno_start);
+                       if (check_ports(peer))
+                               loops = threshold; /* a request was handled: restart the idle countdown */
                }
-               XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
-               xseg_wait_signal(xseg, 10000000UL);
-               xseg_cancel_wait(xseg, peer->portno_start);
-               XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
+#ifdef ST_THREADS
+               if (ta){
+                       st_sleep(0);
+               } else {
+#endif
+                       XSEGLOG2(&lc, I, "Peer goes to sleep\n");
+                       xseg_wait_signal(xseg, 10000000UL);
+                       xseg_cancel_wait(xseg, peer->portno_start);
+                       XSEGLOG2(&lc, I, "Peer woke up\n");
+#ifdef ST_THREADS
+               }
+#endif
        }
-       return NULL;
+       xseg_quit_local_signal(xseg, peer->portno_start); /* not reached: the loop above never breaks */
        return 0;
 }
 
 void defer_request(struct peerd *peer, struct peer_req *pr)
@@ -354,16 +319,6 @@ void defer_request(struct peerd *peer, struct peer_req *pr)
 //     free_peer_req(peer, pr);
 }
 
-static int peerd_loop(struct peerd *peer) 
-{
-       if (peer->interactive_func)
-               peer->interactive_func();
-       for (;;) {
-               pthread_join(peer->thread[0].tid, NULL);
-       }
-       return 0;
-}
-
 static struct xseg *join(char *spec)
 {
        struct xseg_config config;
@@ -378,28 +333,16 @@ static struct xseg *join(char *spec)
        return xseg_join(config.type, config.name, "posix", NULL);
 }
 
-int peerd_start_threads(struct peerd *peer)
-{
-       int i;
-       uint32_t nr_threads = peer->nr_threads;
-       //TODO err check
-       for (i = 0; i < nr_threads; i++) {
-               peer->thread[i].peer = peer;
-               pthread_cond_init(&peer->thread[i].cond,NULL);
-               pthread_mutex_init(&peer->thread[i].lock, NULL);
-               pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
-               peer->thread[i].func = NULL;
-               peer->thread[i].arg = NULL;
-
-       }
-       return 0;
-}
-
 static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
-                       long portno_end, uint32_t nr_threads, uint32_t defer_portno)
+                       long portno_end, uint32_t defer_portno)
 {
        int i;
        struct peerd *peer;
+       struct xseg_port *port;
+
+#ifdef ST_THREADS
+       st_init();
+#endif
        peer = malloc(sizeof(struct peerd));
        if (!peer) {
                perror("malloc");
@@ -407,11 +350,7 @@ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
        }
        peer->nr_ops = nr_ops;
        peer->defer_portno = defer_portno;
-       peer->nr_threads = nr_threads;
 
-       peer->thread = calloc(nr_threads, sizeof(struct thread));
-       if (!peer->thread)
-               goto malloc_fail;
        peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
        if (!peer->peer_reqs){
 malloc_fail:
@@ -421,8 +360,6 @@ malloc_fail:
 
        if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
                goto malloc_fail;
-       if (!xq_alloc_empty(&peer->threads, nr_threads))
-               goto malloc_fail;
 
        if (xseg_initialize()){
                printf("cannot initialize library\n");
@@ -434,8 +371,8 @@ malloc_fail:
 
        peer->portno_start = (xport) portno_start;
        peer->portno_end= (xport) portno_end;
-       peer->port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
-       if (!peer->port){
+       port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
+       if (!port){
                printf("cannot bind to port %ld\n", peer->portno_start);
                return NULL;
        }
@@ -443,7 +380,7 @@ malloc_fail:
        xport p;
        for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
                struct xseg_port *tmp;
-               tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, peer->port));
+               tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
                if (!tmp){
                        printf("cannot bind to port %ld\n", p);
                        return NULL;
@@ -458,8 +395,11 @@ malloc_fail:
                peer->peer_reqs[i].req = NULL;
                peer->peer_reqs[i].retval = 0;
                peer->peer_reqs[i].priv = NULL;
+               peer->peer_reqs[i].portno = NoPort;
+#ifdef ST_THREADS
+               peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
+#endif
        }
-       peer->interactive_func = NULL;
        return peer;
 }
 
@@ -470,15 +410,14 @@ int main(int argc, char *argv[])
        //parse args
        char *spec = "";
        int i, r;
-       long portno_start = -1, portno_end = -1;
+       long portno_start = -1, portno_end = -1, portno = -1;;
        //set defaults here
        uint32_t nr_ops = 16;
-       uint32_t nr_threads = 1 ;
        unsigned int debug_level = 0;
        uint32_t defer_portno = NoPort;
        char *logfile = NULL;
 
-       //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
+       //capture here -g spec, -n nr_ops, -p portno, -v verbose level
        // -dp xseg_portno to defer blocking requests
        // -l log file ?
        //TODO print messages on arg parsing error
@@ -501,6 +440,12 @@ int main(int argc, char *argv[])
                        i += 1;
                        continue;
                }
+               
+               if (!strcmp(argv[i], "-p") && i + 1 < argc) {
+                       portno = strtoul(argv[i+1], NULL, 10);
+                       i += 1;
+                       continue;
+               }
 
                if (!strcmp(argv[i], "-n") && i + 1 < argc) {
                        nr_ops = strtoul(argv[i+1], NULL, 10);
@@ -525,18 +470,25 @@ int main(int argc, char *argv[])
 
        }
        init_logctx(&lc, argv[0], debug_level, logfile);
-       XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
-       
        //TODO perform argument sanity checks
        verbose = debug_level;
 
+       if (portno != -1) {
+               portno_start = portno;
+               portno_end = portno;
+       }
+
        //TODO err check
-       peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
+       peer = peerd_init(nr_ops, spec, portno_start, portno_end, defer_portno);
        if (!peer)
                return -1;
        r = custom_peer_init(peer, argc, argv);
        if (r < 0)
                return -1;
-       peerd_start_threads(peer);
+#ifdef ST_THREADS
+       st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
+       return st_thread_join(st, NULL);
+#else
        return peerd_loop(peer);
+#endif
 }