X-Git-Url: https://code.grnet.gr/git/archipelago/blobdiff_plain/cc21de66882be4b7295b3a0ea35c7a948a675095..9d3f16da0fa4dc47cb42e96a55be80555398ca8a:/xseg/peers/user/speer.c?ds=sidebyside diff --git a/xseg/peers/user/speer.c b/xseg/peers/user/speer.c index c84b4e9..4d1b599 100644 --- a/xseg/peers/user/speer.c +++ b/xseg/peers/user/speer.c @@ -4,33 +4,15 @@ #include #include #include -#include -#include +#include #include #include #include +#include -#define REARRANGE(__fun_name__, __format__, ...) __format__ "%s", __fun_name__, ##__VA_ARGS__ -#define LOG(level, ...) \ - do { \ - if (level <= verbose) { \ - fprintf(stderr, "%s: " REARRANGE( __func__ , ## __VA_ARGS__, "" )); \ - } \ - }while (0) - - -unsigned int verbose = 0; +unsigned int verbose; struct log_ctx lc; - -struct thread { - struct peerd *peer; - pthread_t tid; - pthread_cond_t cond; - pthread_mutex_t lock; - void (*func)(void *arg); - void *arg; -}; - +uint32_t ta = 0; inline int canDefer(struct peerd *peer) { @@ -67,6 +49,7 @@ void print_req(struct xseg *xseg, struct xseg_request *req) (unsigned long long)req->datalen, data); } } + void log_pr(char *msg, struct peer_req *pr) { char target[64], data[64]; @@ -115,48 +98,10 @@ inline void free_peer_req(struct peerd *peer, struct peer_req *pr) xq_append_head(&peer->free_reqs, idx, 1); } -inline static struct thread* alloc_thread(struct peerd *peer) -{ - xqindex idx = xq_pop_head(&peer->threads, 1); - if (idx == Noneidx) - return NULL; - return peer->thread + idx; -} - -inline static void free_thread(struct peerd *peer, struct thread *t) -{ - xqindex idx = t - peer->thread; - xq_append_head(&peer->threads, idx, 1); -} - - -inline static void __wake_up_thread(struct thread *t) -{ - pthread_mutex_lock(&t->lock); - pthread_cond_signal(&t->cond); - pthread_mutex_unlock(&t->lock); -} - -inline static void wake_up_thread(struct thread* t) -{ - if (t){ - __wake_up_thread(t); - } -} - -inline static int wake_up_next_thread(struct peerd *peer) -{ - //struct thread *t = alloc_thread(peer); - //wake_up_thread(t); - //return t; - return (xseg_signal(peer->xseg, peer->portno_start)); -} - struct timeval resp_start, resp_end, resp_accum = {0, 0}; uint64_t responds = 0; void get_responds_stats(){ printf("Time waiting respond %lu.%06lu sec for %llu times.\n", - //(unsigned int)(t - peer->thread), resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds); } @@ -169,9 +114,9 @@ void fail(struct peerd *peer, struct peer_req *pr) req->state |= XS_FAILED; //xseg_set_req_data(peer->xseg, pr->req, NULL); p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC); - xseg_signal(peer->xseg, p); + if (xseg_signal(peer->xseg, p) < 0) + XSEGLOG2(&lc, W, "Cannot signal portno %u", p); free_peer_req(peer, pr); - wake_up_next_thread(peer); } //FIXME error check @@ -188,9 +133,9 @@ void complete(struct peerd *peer, struct peer_req *pr) //timersub(&resp_end, &resp_start, &resp_end); //timeradd(&resp_end, &resp_accum, &resp_accum); //printf("xseg_signal: %u\n", p); - xseg_signal(peer->xseg, p); + if (xseg_signal(peer->xseg, p) < 0) + XSEGLOG2(&lc, W, "Cannot signal portno %u", p); free_peer_req(peer, pr); - wake_up_next_thread(peer); } void pending(struct peerd *peer, struct peer_req *pr) @@ -207,7 +152,7 @@ static void handle_accepted(struct peerd *peer, struct peer_req *pr, xreq->serviced = 0; //xreq->state = XS_ACCEPTED; pr->retval = 0; - dispatch(peer, pr, req); + dispatch(peer, pr, req, dispatch_accept); } static void handle_received(struct peerd *peer, struct peer_req *pr, @@ -216,14 +161,14 @@ static void handle_received(struct peerd *peer, struct peer_req *pr, //struct xseg_request *req = pr->req; //assert req->state != XS_ACCEPTED; XSEGLOG2(&lc, D, "Handle received \n"); - dispatch(peer, pr, req); + dispatch(peer, pr, req, dispatch_receive); } + struct timeval sub_start, sub_end, sub_accum = {0, 0}; uint64_t submits = 0; void get_submits_stats(){ printf("Time waiting submit %lu.%06lu sec for %llu times.\n", - //(unsigned int)(t - peer->thread), sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits); } @@ -232,7 +177,6 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr) uint32_t ret; struct xseg_request *req = pr->req; // assert req->portno == peer->portno ? - //TODO small function with error checking XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs)); ret = xseg_set_req_data(peer->xseg, req, (void *)(pr)); if (ret < 0) @@ -250,98 +194,87 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr) return 0; } -int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg) +static int check_ports(struct peerd *peer) { - struct thread *t = alloc_thread(peer); - if (t) { - t->func = func; - t->arg = arg; - wake_up_thread(t); - return 0; - } else - // we could hijack a thread - return -1; + struct xseg *xseg = peer->xseg; + xport portno_start = peer->portno_start; + xport portno_end = peer->portno_end; + struct xseg_request *accepted, *received; + struct peer_req *pr; + xport i; + int r, c = 0; + + for (i = portno_start; i <= portno_end; i++) { + accepted = NULL; + received = NULL; + pr = alloc_peer_req(peer); + if (pr) { + accepted = xseg_accept(xseg, i, X_NONBLOCK); + if (accepted) { + pr->req = accepted; + pr->portno = i; + xseg_cancel_wait(xseg, i); + handle_accepted(peer, pr, accepted); + c = 1; + } + else { + free_peer_req(peer, pr); + } + } + received = xseg_receive(xseg, i, X_NONBLOCK); + if (received) { + r = xseg_get_req_data(xseg, received, (void **) &pr); + if (r < 0 || !pr){ + XSEGLOG2(&lc, W, "Received request with no pr data\n"); + xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC); + if (p == NoPort){ + XSEGLOG2(&lc, W, "Could not respond stale request"); + xseg_put_request(xseg, received, portno_start); + continue; + } else { + xseg_signal(xseg, p); + } + } else { + //maybe perform sanity check for pr + xseg_cancel_wait(xseg, i); + handle_received(peer, pr, received); + c = 1; + } + } + } + + return c; } -static void* thread_loop(void *arg) +static int peerd_loop(struct peerd *peer) { - struct thread *t = (struct thread *) arg; - struct peerd *peer = t->peer; struct xseg *xseg = peer->xseg; xport portno_start = peer->portno_start; xport portno_end = peer->portno_end; - struct peer_req *pr; - uint64_t threshold=1; + uint64_t threshold=1000/(1 + portno_end - portno_start); pid_t pid =syscall(SYS_gettid); uint64_t loops; - struct xseg_request *accepted, *received; - int r; - int change; - xport i; - - XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread)); - - XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid); + + XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid); xseg_init_local_signal(xseg, peer->portno_start); for (;;) { - if (t->func) { - XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread)); - xseg_cancel_wait(xseg, peer->portno_start); - t->func(t->arg); - t->func = NULL; - t->arg = NULL; - continue; - } - for(loops= threshold; loops > 0; loops--) { - do { - if (loops == 1) - xseg_prepare_wait(xseg, peer->portno_start); - change = 0; - for (i = portno_start; i <= portno_end; i++) { - accepted = NULL; - received = NULL; - pr = alloc_peer_req(peer); - if (pr) { - accepted = xseg_accept(xseg, i, X_NONBLOCK); - if (accepted) { - XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread)); - pr->req = accepted; - pr->portno = i; - xseg_cancel_wait(xseg, i); - handle_accepted(peer, pr, accepted); - change = 1; - } - else { - free_peer_req(peer, pr); - } - } - received = xseg_receive(xseg, i, X_NONBLOCK); - if (received) { - //printf("received req id: %u\n", received - xseg->requests); - //print_req(peer->xseg, received); - r = xseg_get_req_data(xseg, received, (void **) &pr); - if (r < 0 || !pr){ - //FIXME what to do here ? - XSEGLOG2(&lc, W, "Received request with no pr data\n"); - xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC); - //FIXME if fails, put req - } - xseg_cancel_wait(xseg, i); - handle_received(peer, pr, received); - change = 1; - } - } - if (change) - loops = threshold; - }while (change); + if (loops == 1) + xseg_prepare_wait(xseg, peer->portno_start); + if (check_ports(peer)) + loops = threshold; + } + if (ta){ + st_sleep(0); + } else { + XSEGLOG2(&lc, I, "Peer goes to sleep\n"); + xseg_wait_signal(xseg, 10000000UL); + xseg_cancel_wait(xseg, peer->portno_start); + XSEGLOG2(&lc, I, "Peer woke up\n"); } - XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread)); - xseg_wait_signal(xseg, 10000000UL); - xseg_cancel_wait(xseg, peer->portno_start); - XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread)); } - return NULL; + xseg_quit_local_signal(xseg, peer->portno_start); + return 0; } void defer_request(struct peerd *peer, struct peer_req *pr) @@ -352,16 +285,6 @@ void defer_request(struct peerd *peer, struct peer_req *pr) // free_peer_req(peer, pr); } -static int peerd_loop(struct peerd *peer) -{ - if (peer->interactive_func) - peer->interactive_func(); - for (;;) { - pthread_join(peer->thread[0].tid, NULL); - } - return 0; -} - static struct xseg *join(char *spec) { struct xseg_config config; @@ -376,28 +299,15 @@ static struct xseg *join(char *spec) return xseg_join(config.type, config.name, "posix", NULL); } -int peerd_start_threads(struct peerd *peer) -{ - int i; - uint32_t nr_threads = peer->nr_threads; - //TODO err check - for (i = 0; i < nr_threads; i++) { - peer->thread[i].peer = peer; - pthread_cond_init(&peer->thread[i].cond,NULL); - pthread_mutex_init(&peer->thread[i].lock, NULL); - pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i)); - peer->thread[i].func = NULL; - peer->thread[i].arg = NULL; - - } - return 0; -} - static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start, - long portno_end, uint32_t nr_threads, uint32_t defer_portno) + long portno_end, uint32_t defer_portno) { int i; struct peerd *peer; + struct xseg_port *port; + + st_init(); + peer = malloc(sizeof(struct peerd)); if (!peer) { perror("malloc"); @@ -405,11 +315,7 @@ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start, } peer->nr_ops = nr_ops; peer->defer_portno = defer_portno; - peer->nr_threads = nr_threads; - peer->thread = calloc(nr_threads, sizeof(struct thread)); - if (!peer->thread) - goto malloc_fail; peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req)); if (!peer->peer_reqs){ malloc_fail: @@ -419,8 +325,6 @@ malloc_fail: if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops)) goto malloc_fail; - if (!xq_alloc_empty(&peer->threads, nr_threads)) - goto malloc_fail; if (xseg_initialize()){ printf("cannot initialize library\n"); @@ -432,8 +336,8 @@ malloc_fail: peer->portno_start = (xport) portno_start; peer->portno_end= (xport) portno_end; - peer->port = xseg_bind_port(peer->xseg, peer->portno_start, NULL); - if (!peer->port){ + port = xseg_bind_port(peer->xseg, peer->portno_start, NULL); + if (!port){ printf("cannot bind to port %ld\n", peer->portno_start); return NULL; } @@ -441,7 +345,7 @@ malloc_fail: xport p; for (p = peer->portno_start + 1; p <= peer->portno_end; p++) { struct xseg_port *tmp; - tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, peer->port)); + tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port)); if (!tmp){ printf("cannot bind to port %ld\n", p); return NULL; @@ -456,8 +360,9 @@ malloc_fail: peer->peer_reqs[i].req = NULL; peer->peer_reqs[i].retval = 0; peer->peer_reqs[i].priv = NULL; + peer->peer_reqs[i].portno = NoPort; + peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check } - peer->interactive_func = NULL; return peer; } @@ -468,15 +373,14 @@ int main(int argc, char *argv[]) //parse args char *spec = ""; int i, r; - long portno_start = -1, portno_end = -1; + long portno_start = -1, portno_end = -1, portno = -1;; //set defaults here uint32_t nr_ops = 16; - uint32_t nr_threads = 1 ; unsigned int debug_level = 0; uint32_t defer_portno = NoPort; char *logfile = NULL; - //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level + //capture here -g spec, -n nr_ops, -p portno, -v verbose level // -dp xseg_portno to defer blocking requests // -l log file ? //TODO print messages on arg parsing error @@ -499,6 +403,12 @@ int main(int argc, char *argv[]) i += 1; continue; } + + if (!strcmp(argv[i], "-p") && i + 1 < argc) { + portno = strtoul(argv[i+1], NULL, 10); + i += 1; + continue; + } if (!strcmp(argv[i], "-n") && i + 1 < argc) { nr_ops = strtoul(argv[i+1], NULL, 10); @@ -523,18 +433,22 @@ int main(int argc, char *argv[]) } init_logctx(&lc, argv[0], debug_level, logfile); - XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid)); - //TODO perform argument sanity checks verbose = debug_level; + if (portno != -1) { + portno_start = portno; + portno_end = portno; + } + //TODO err check - peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno); + peer = peerd_init(nr_ops, spec, portno_start, portno_end, defer_portno); if (!peer) return -1; r = custom_peer_init(peer, argc, argv); if (r < 0) return -1; - peerd_start_threads(peer); - return peerd_loop(peer); + st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0); + return st_thread_join(st, NULL); +// return peerd_loop(peer); }