X-Git-Url: https://code.grnet.gr/git/archipelago/blobdiff_plain/37758262535f1b2704668689192ba665637c25aa..96b50cd3a6566375063e7c408c4067b07493617d:/xseg/peers/user/mpeer.c diff --git a/xseg/peers/user/mpeer.c b/xseg/peers/user/mpeer.c index ff9a494..e12ef76 100644 --- a/xseg/peers/user/mpeer.c +++ b/xseg/peers/user/mpeer.c @@ -10,16 +10,8 @@ #include #include -#define REARRANGE(__fun_name__, __format__, ...) __format__ "%s", __fun_name__, ##__VA_ARGS__ -#define LOG(level, ...) \ - do { \ - if (level <= verbose) { \ - fprintf(stderr, "%s: " REARRANGE( __func__ , ## __VA_ARGS__, "" )); \ - } \ - }while (0) - - unsigned int verbose = 0; +struct log_ctx lc; struct thread { struct peerd *peer; @@ -31,6 +23,43 @@ struct thread { }; +inline static struct thread* alloc_thread(struct peerd *peer) +{ + xqindex idx = xq_pop_head(&peer->threads, 1); + if (idx == Noneidx) + return NULL; + return peer->thread + idx; +} + +inline static void free_thread(struct peerd *peer, struct thread *t) +{ + xqindex idx = t - peer->thread; + xq_append_head(&peer->threads, idx, 1); +} + + +inline static void __wake_up_thread(struct thread *t) +{ + pthread_mutex_lock(&t->lock); + pthread_cond_signal(&t->cond); + pthread_mutex_unlock(&t->lock); +} + +inline static void wake_up_thread(struct thread* t) +{ + if (t){ + __wake_up_thread(t); + } +} + +inline static int wake_up_next_thread(struct peerd *peer) +{ + //struct thread *t = alloc_thread(peer); + //wake_up_thread(t); + //return t; + return (xseg_signal(peer->xseg, peer->portno_start)); +} + inline int canDefer(struct peerd *peer) { return !(peer->defer_portno == NoPort); @@ -75,9 +104,9 @@ void log_pr(char *msg, struct peer_req *pr) req_target = xseg_get_target(xseg, pr->req); req_data = xseg_get_data(xseg, pr->req); /* null terminate name in case of req->target is less than 63 characters, - * * and next character after name (aka first byte of next buffer) is not - * * null - * */ + * and next character after name (aka first byte of next buffer) is not + * null + */ unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen; if (verbose) { strncpy(target, req_target, end); @@ -114,49 +143,12 @@ inline void free_peer_req(struct peerd *peer, struct peer_req *pr) xq_append_head(&peer->free_reqs, idx, 1); } -inline static struct thread* alloc_thread(struct peerd *peer) -{ - xqindex idx = xq_pop_head(&peer->threads, 1); - if (idx == Noneidx) - return NULL; - return peer->thread + idx; -} - -inline static void free_thread(struct peerd *peer, struct thread *t) -{ - xqindex idx = t - peer->thread; - xq_append_head(&peer->threads, idx, 1); -} - - -inline static void __wake_up_thread(struct thread *t) -{ - pthread_mutex_lock(&t->lock); - pthread_cond_signal(&t->cond); - pthread_mutex_unlock(&t->lock); -} - -inline static void wake_up_thread(struct thread* t) -{ - if (t){ - __wake_up_thread(t); - } -} - -inline static int wake_up_next_thread(struct peerd *peer) -{ - //struct thread *t = alloc_thread(peer); - //wake_up_thread(t); - //return t; - return (xseg_signal(peer->xseg, peer->portno)); -} - struct timeval resp_start, resp_end, resp_accum = {0, 0}; uint64_t responds = 0; void get_responds_stats(){ printf("Time waiting respond %lu.%06lu sec for %llu times.\n", //(unsigned int)(t - peer->thread), - resp_accum.tv_sec, resp_accum.tv_usec, responds); + resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds); } //FIXME error check @@ -164,10 +156,10 @@ void fail(struct peerd *peer, struct peer_req *pr) { struct xseg_request *req = pr->req; uint32_t p; - LOG(5, "failing req %u\n", (unsigned int) (pr - peer->peer_reqs)); + XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs)); req->state |= XS_FAILED; //xseg_set_req_data(peer->xseg, pr->req, NULL); - p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC); + p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC); xseg_signal(peer->xseg, p); free_peer_req(peer, pr); wake_up_next_thread(peer); @@ -181,7 +173,7 @@ void complete(struct peerd *peer, struct peer_req *pr) req->state |= XS_SERVED; //xseg_set_req_data(peer->xseg, pr->req, NULL); //gettimeofday(&resp_start, NULL); - p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC); + p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC); //gettimeofday(&resp_end, NULL); //responds++; //timersub(&resp_end, &resp_start, &resp_end); @@ -192,27 +184,25 @@ void complete(struct peerd *peer, struct peer_req *pr) wake_up_next_thread(peer); } -void pending(struct peerd *peer, struct peer_req *pr) +static void handle_accepted(struct peerd *peer, struct peer_req *pr, + struct xseg_request *req) { - pr->req->state = XS_PENDING; -} - -static void handle_accepted(struct peerd *peer, struct peer_req *pr) -{ - struct xseg_request *req = pr->req; - LOG(4, "Handle accepted \n"); - req->serviced = 0; - req->state = XS_ACCEPTED; + struct xseg_request *xreq = pr->req; + //assert xreq == req; + XSEGLOG2(&lc, D, "Handle accepted"); + xreq->serviced = 0; + //xreq->state = XS_ACCEPTED; pr->retval = 0; - dispatch(peer, pr); + dispatch(peer, pr, req, dispatch_accept); } -static void handle_received(struct peerd *peer, struct peer_req *pr) +static void handle_received(struct peerd *peer, struct peer_req *pr, + struct xseg_request *req) { //struct xseg_request *req = pr->req; //assert req->state != XS_ACCEPTED; - LOG(4, "Handle received \n"); - dispatch(peer, pr); + XSEGLOG2(&lc, D, "Handle received \n"); + dispatch(peer, pr, req, dispatch_receive); } struct timeval sub_start, sub_end, sub_accum = {0, 0}; @@ -220,7 +210,7 @@ uint64_t submits = 0; void get_submits_stats(){ printf("Time waiting submit %lu.%06lu sec for %llu times.\n", //(unsigned int)(t - peer->thread), - sub_accum.tv_sec, sub_accum.tv_usec, submits); + sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits); } int submit_peer_req(struct peerd *peer, struct peer_req *pr) @@ -229,13 +219,13 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr) struct xseg_request *req = pr->req; // assert req->portno == peer->portno ? //TODO small function with error checking - LOG (5, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs)); + XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs)); ret = xseg_set_req_data(peer->xseg, req, (void *)(pr)); if (ret < 0) return -1; //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req)); //gettimeofday(&sub_start, NULL); - ret = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC); + ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC); //gettimeofday(&sub_end, NULL); //submits++; //timersub(&sub_end, &sub_start, &sub_end); @@ -259,27 +249,77 @@ int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg) return -1; } +static int check_ports(struct peerd *peer) +{ + struct xseg *xseg = peer->xseg; + xport portno_start = peer->portno_start; + xport portno_end = peer->portno_end; + struct xseg_request *accepted, *received; + struct peer_req *pr; + xport i; + int r, c = 0; + + for (i = portno_start; i <= portno_end; i++) { + accepted = NULL; + received = NULL; + pr = alloc_peer_req(peer); + if (pr) { + accepted = xseg_accept(xseg, i, X_NONBLOCK); + if (accepted) { + pr->req = accepted; + pr->portno = i; + xseg_cancel_wait(xseg, i); + handle_accepted(peer, pr, accepted); + c = 1; + } + else { + free_peer_req(peer, pr); + } + } + received = xseg_receive(xseg, i, X_NONBLOCK); + if (received) { + r = xseg_get_req_data(xseg, received, (void **) &pr); + if (r < 0 || !pr){ + XSEGLOG2(&lc, W, "Received request with no pr data\n"); + xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC); + if (p == NoPort){ + XSEGLOG2(&lc, W, "Could not respond stale request"); + xseg_put_request(xseg, received, portno_start); + continue; + } else { + xseg_signal(xseg, p); + } + } else { + //maybe perform sanity check for pr + xseg_cancel_wait(xseg, i); + handle_received(peer, pr, received); + c = 1; + } + } + } + + return c; +} + static void* thread_loop(void *arg) { struct thread *t = (struct thread *) arg; struct peerd *peer = t->peer; struct xseg *xseg = peer->xseg; - uint32_t portno = peer->portno; - struct peer_req *pr; - uint64_t threshold=1000; + xport portno_start = peer->portno_start; + xport portno_end = peer->portno_end; pid_t pid =syscall(SYS_gettid); uint64_t loops; - struct xseg_request *accepted, *received; - int r; - - printf("thread %u\n", (unsigned int) (t- peer->thread)); + uint64_t threshold=1000/(1 + portno_end - portno_start); + + XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread)); - LOG(0, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid); - xseg_init_signal(xseg, portno); + XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid); + xseg_init_local_signal(xseg, peer->portno_start); for (;;) { if (t->func) { - LOG(5, "Thread %u executes function\n", (unsigned int) (t- peer->thread)); - xseg_cancel_wait(xseg, portno); + XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread)); + xseg_cancel_wait(xseg, peer->portno_start); t->func(t->arg); t->func = NULL; t->arg = NULL; @@ -287,52 +327,15 @@ static void* thread_loop(void *arg) } for(loops= threshold; loops > 0; loops--) { - accepted = NULL; - received = NULL; if (loops == 1) - xseg_prepare_wait(xseg, portno); - -// if (xq_count(&peer->xport->request_queue)){ - pr = alloc_peer_req(peer); - if (pr) { - accepted = xseg_accept(xseg, peer->portno); - LOG(5, "Thread %u accepted\n", (unsigned int) (t- peer->thread)); - if (accepted) { - pr->req = accepted; - xseg_cancel_wait(xseg, portno); - wake_up_next_thread(peer); - handle_accepted(peer, pr); - loops = threshold; - } - else { - free_peer_req(peer, pr); - } - } -// } -// if (xq_count(&peer->xport->reply_queue)){ - received = xseg_receive(xseg, peer->portno); - if (received) { - //printf("received req id: %u\n", received - xseg->requests); - //print_req(peer->xseg, received); - r = xseg_get_req_data(xseg, received, (void **) &pr); - if (r < 0 || !pr){ - //FIXME what to do here ? - LOG(0, "Received request with no pr data\n"); - xseg_respond(peer->xseg, received, peer->portno, X_ALLOC); - } - //fail(peer, received); - //assert pr->req == received; - xseg_cancel_wait(xseg, portno); - wake_up_next_thread(peer); - handle_received(peer, pr); - loops = threshold; - } -// } + xseg_prepare_wait(xseg, peer->portno_start); + if (check_ports(peer)) + loops = threshold; } - LOG(1, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread)); + XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread)); xseg_wait_signal(xseg, 10000000UL); - xseg_cancel_wait(xseg, portno); - LOG(1, "Thread %u woke up\n", (unsigned int) (t- peer->thread)); + xseg_cancel_wait(xseg, peer->portno_start); + XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread)); } return NULL; } @@ -386,10 +389,12 @@ int peerd_start_threads(struct peerd *peer) return 0; } -static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno, uint32_t nr_threads, uint32_t defer_portno) +static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start, + long portno_end, uint32_t nr_threads, uint32_t defer_portno) { int i; struct peerd *peer; + struct xseg_port *port; peer = malloc(sizeof(struct peerd)); if (!peer) { perror("malloc"); @@ -422,46 +427,57 @@ malloc_fail: if (!peer->xseg) return NULL; - peer->xport = xseg_bind_port(peer->xseg, portno); - if (!peer->xport){ - printf("cannot bind to port %ld\n", portno); + peer->portno_start = (xport) portno_start; + peer->portno_end= (xport) portno_end; + port = xseg_bind_port(peer->xseg, peer->portno_start, NULL); + if (!port){ + printf("cannot bind to port %ld\n", peer->portno_start); return NULL; } - printf("%lx\n", (unsigned long) peer->xport); - peer->portno = xseg_portno(peer->xseg, peer->xport); - printf("Peer on port %u/%u\n", peer->portno, - peer->xseg->config.nr_ports); + + xport p; + for (p = peer->portno_start + 1; p <= peer->portno_end; p++) { + struct xseg_port *tmp; + tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port)); + if (!tmp){ + printf("cannot bind to port %ld\n", p); + return NULL; + } + } + + printf("Peer on ports %u-%u\n", peer->portno_start, + peer->portno_end); for (i = 0; i < nr_ops; i++) { peer->peer_reqs[i].peer = peer; peer->peer_reqs[i].req = NULL; peer->peer_reqs[i].retval = 0; peer->peer_reqs[i].priv = NULL; + peer->peer_reqs[i].portno = NoPort; } peer->interactive_func = NULL; - peerd_start_threads(peer); return peer; } -int main(int argc, const char *argv[]) +int main(int argc, char *argv[]) { struct peerd *peer = NULL; //parse args char *spec = ""; int i, r; - long portno = -1; + long portno_start = -1, portno_end = -1, portno = -1; //set defaults here uint32_t nr_ops = 16; uint32_t nr_threads = 16 ; unsigned int debug_level = 0; uint32_t defer_portno = NoPort; - + char *logfile = NULL; + //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level // -dp xseg_portno to defer blocking requests - //maybe -l log file ? + // -l log file ? //TODO print messages on arg parsing error - LOG(5, "Main thread has tid %ld.\n", syscall(SYS_gettid)); for (i = 1; i < argc; i++) { if (!strcmp(argv[i], "-g") && i + 1 < argc) { @@ -470,6 +486,18 @@ int main(int argc, const char *argv[]) continue; } + if (!strcmp(argv[i], "-sp") && i + 1 < argc) { + portno_start = strtoul(argv[i+1], NULL, 10); + i += 1; + continue; + } + + if (!strcmp(argv[i], "-ep") && i + 1 < argc) { + portno_end = strtoul(argv[i+1], NULL, 10); + i += 1; + continue; + } + if (!strcmp(argv[i], "-p") && i + 1 < argc) { portno = strtoul(argv[i+1], NULL, 10); i += 1; @@ -496,15 +524,30 @@ int main(int argc, const char *argv[]) i += 1; continue; } + if (!strcmp(argv[i], "-l") && i + 1 < argc ) { + logfile = argv[i+1]; + i += 1; + continue; + } } + init_logctx(&lc, argv[0], debug_level, logfile); + XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid)); //TODO perform argument sanity checks verbose = debug_level; + if (portno != -1) { + portno_start = portno; + portno_end = portno; + } //TODO err check - peer = peerd_init(nr_ops, spec, portno, nr_threads, defer_portno); + peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno); + if (!peer) + return -1; r = custom_peer_init(peer, argc, argv); -// peerd_start_threads(peer); + if (r < 0) + return -1; + peerd_start_threads(peer); return peerd_loop(peer); }