X-Git-Url: https://code.grnet.gr/git/archipelago/blobdiff_plain/a7594f44e8b5f02a6f0dd692d10bdd162376e368..d637512167df785919c588b478fe55f1f281f701:/xseg/peers/user/peer.c diff --git a/xseg/peers/user/peer.c b/xseg/peers/user/peer.c index 555bb3f..a369a02 100644 --- a/xseg/peers/user/peer.c +++ b/xseg/peers/user/peer.c @@ -43,7 +43,6 @@ #include #include #include - #ifdef MT #include #endif @@ -52,6 +51,12 @@ #include #ifdef MT +#ifdef ST_THREADS +#error "MT and ST_THREADS defines are mutually exclusive" +#endif +#endif + +#ifdef MT #define PEER_TYPE "pthread" #else #define PEER_TYPE "posix" @@ -70,45 +75,7 @@ uint32_t ta = 0; #ifdef MT struct peerd *global_peer; - -struct thread { - struct peerd *peer; - pthread_t tid; - pthread_cond_t cond; - pthread_mutex_t lock; - void (*func)(void *arg); - void *arg; -}; - - -inline static struct thread* alloc_thread(struct peerd *peer) -{ - xqindex idx = xq_pop_head(&peer->threads, 1); - if (idx == Noneidx) - return NULL; - return peer->thread + idx; -} - -inline static void free_thread(struct peerd *peer, struct thread *t) -{ - xqindex idx = t - peer->thread; - xq_append_head(&peer->threads, idx, 1); -} - - -inline static void __wake_up_thread(struct thread *t) -{ - pthread_mutex_lock(&t->lock); - pthread_cond_signal(&t->cond); - pthread_mutex_unlock(&t->lock); -} - -inline static void wake_up_thread(struct thread* t) -{ - if (t){ - __wake_up_thread(t); - } -} +static pthread_key_t threadkey; inline static int wake_up_next_thread(struct peerd *peer) { @@ -116,21 +83,10 @@ inline static int wake_up_next_thread(struct peerd *peer) } #endif - -static inline int isTerminate() -{ -/* ta doesn't need to be taken into account, because the main loops - * doesn't check the terminated flag if ta is not 0. +/* + * extern is needed if this function is going to be called by another file + * such as bench-xseg.c */ - /* -#ifdef ST_THREADS - return (!ta & terminated); -#else - return terminated; -#endif - */ - return terminated; -} void signal_handler(int signal) { @@ -194,7 +150,7 @@ void print_req(struct xseg *xseg, struct xseg_request *req) strncpy(data, req_data, 63); data[63] = 0; printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n" - "src: %u, st: %u, dst: %u dt: %u\n" + "src: %u, transit: %u, dst: %u effective dst: %u\n" "target[%u]:'%s', data[%llu]:\n%s------------------\n\n", (unsigned long)(req), (unsigned int)req->op, @@ -203,9 +159,9 @@ void print_req(struct xseg *xseg, struct xseg_request *req) (unsigned long)req->serviced, (unsigned int)req->state, (unsigned int)req->src_portno, - (unsigned int)req->src_transit_portno, + (unsigned int)req->transit_portno, (unsigned int)req->dst_portno, - (unsigned int)req->dst_transit_portno, + (unsigned int)req->effective_dst_portno, (unsigned int)req->targetlen, target, (unsigned long long)req->datalen, data); } @@ -243,6 +199,39 @@ void log_pr(char *msg, struct peer_req *pr) } } +#ifdef MT +inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t) +{ + struct peer_req *pr; + xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no); + if (idx != Noneidx) + goto out; + + /* try to steal from another thread */ + /* + int i; + struct thread *nt; + for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) { + nt = &peer->thread[(t->thread_no + i) % peer->nr_threads]; + if (!xq_count(&nt->free_thread_reqs)) + continue; + idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no); + if (idx != Noneidx) + goto out; + } + */ + return NULL; +out: + pr = peer->peer_reqs + idx; + pr->thread_no = t - peer->thread; + return pr; +} +#else +/* + * free_reqs is a queue that simply contains pointer offsets to the peer_reqs + * queue. If a pointer from peer_reqs is popped, we are certain that the + * associated memory in peer_reqs is free to use + */ inline struct peer_req *alloc_peer_req(struct peerd *peer) { xqindex idx = xq_pop_head(&peer->free_reqs, 1); @@ -250,12 +239,39 @@ inline struct peer_req *alloc_peer_req(struct peerd *peer) return NULL; return peer->peer_reqs + idx; } +#endif inline void free_peer_req(struct peerd *peer, struct peer_req *pr) { xqindex idx = pr - peer->peer_reqs; pr->req = NULL; +#ifdef MT + struct thread *t = &peer->thread[pr->thread_no]; + xq_append_head(&t->free_thread_reqs, idx, 1); +#else xq_append_head(&peer->free_reqs, idx, 1); +#endif +} + +/* + * Count all free reqs in peer. + * Racy, if multithreaded, but the sum should monotonicly increase when checked + * after a termination signal is catched. + */ +int all_peer_reqs_free(struct peerd *peer) +{ + uint32_t free_reqs = 0; +#ifdef MT + int i; + for (i = 0; i < peer->nr_threads; i++) { + free_reqs += xq_count(&peer->thread[i].free_thread_reqs); + } +#else + free_reqs = xq_count(&peer->free_reqs); +#endif + if (free_reqs == peer->nr_ops) + return 1; + return 0; } struct timeval resp_start, resp_end, resp_accum = {0, 0}; @@ -271,11 +287,13 @@ void fail(struct peerd *peer, struct peer_req *pr) { struct xseg_request *req = pr->req; uint32_t p; - XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs)); - req->state |= XS_FAILED; - //xseg_set_req_data(peer->xseg, pr->req, NULL); - p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC); - xseg_signal(peer->xseg, p); + if (req){ + XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs)); + req->state |= XS_FAILED; + //xseg_set_req_data(peer->xseg, pr->req, NULL); + p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC); + xseg_signal(peer->xseg, p); + } free_peer_req(peer, pr); #ifdef MT wake_up_next_thread(peer); @@ -287,23 +305,25 @@ void complete(struct peerd *peer, struct peer_req *pr) { struct xseg_request *req = pr->req; uint32_t p; - req->state |= XS_SERVED; - //xseg_set_req_data(peer->xseg, pr->req, NULL); - //gettimeofday(&resp_start, NULL); - p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC); - //gettimeofday(&resp_end, NULL); - //responds++; - //timersub(&resp_end, &resp_start, &resp_end); - //timeradd(&resp_end, &resp_accum, &resp_accum); - //printf("xseg_signal: %u\n", p); - xseg_signal(peer->xseg, p); + if (req){ + req->state |= XS_SERVED; + //xseg_set_req_data(peer->xseg, pr->req, NULL); + //gettimeofday(&resp_start, NULL); + p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC); + //gettimeofday(&resp_end, NULL); + //responds++; + //timersub(&resp_end, &resp_start, &resp_end); + //timeradd(&resp_end, &resp_accum, &resp_accum); + //printf("xseg_signal: %u\n", p); + xseg_signal(peer->xseg, p); + } free_peer_req(peer, pr); #ifdef MT wake_up_next_thread(peer); #endif } -static void handle_accepted(struct peerd *peer, struct peer_req *pr, +static void handle_accepted(struct peerd *peer, struct peer_req *pr, struct xseg_request *req) { struct xseg_request *xreq = pr->req; @@ -355,7 +375,11 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr) return 0; } -static int check_ports(struct peerd *peer) +#ifdef MT +int check_ports(struct peerd *peer, struct thread *t) +#else +int check_ports(struct peerd *peer) +#endif { struct xseg *xseg = peer->xseg; xport portno_start = peer->portno_start; @@ -369,7 +393,11 @@ static int check_ports(struct peerd *peer) accepted = NULL; received = NULL; if (!isTerminate()) { +#ifdef MT + pr = alloc_peer_req(peer, t); +#else pr = alloc_peer_req(peer); +#endif if (pr) { accepted = xseg_accept(xseg, i, X_NONBLOCK); if (accepted) { @@ -410,19 +438,6 @@ static int check_ports(struct peerd *peer) } #ifdef MT -int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg) -{ - struct thread *t = alloc_thread(peer); - if (t) { - t->func = func; - t->arg = arg; - wake_up_thread(t); - return 0; - } else - // we could hijack a thread - return -1; -} - static void* thread_loop(void *arg) { struct thread *t = (struct thread *) arg; @@ -435,23 +450,13 @@ static void* thread_loop(void *arg) uint64_t threshold=1000/(1 + portno_end - portno_start); XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread)); - XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid); xseg_init_local_signal(xseg, peer->portno_start); for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) { - if (t->func) { - XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread)); - xseg_cancel_wait(xseg, peer->portno_start); - t->func(t->arg); - t->func = NULL; - t->arg = NULL; - continue; - } - - for(loops= threshold; loops > 0; loops--) { + for(loops = threshold; loops > 0; loops--) { if (loops == 1) xseg_prepare_wait(xseg, peer->portno_start); - if (check_ports(peer)) + if (check_ports(peer, t)) loops = threshold; } XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread)); @@ -464,83 +469,151 @@ static void* thread_loop(void *arg) return NULL; } +void *init_thread_loop(void *arg) +{ + struct thread *t = (struct thread *) arg; + struct peerd *peer = t->peer; + char *thread_id; + int i; + + /* + * We need an identifier for every thread that will spin in peerd_loop. + * The following code is a way to create a string of this format: + * "Thread " + * minus the null terminator. What we do is we create this string with + * snprintf and then resize it to exclude the null terminator with + * realloc. Finally, the result string is passed to the (void *arg) field + * of struct thread. + * + * Since the highest thread number can't be more than 5 digits, using 13 + * chars should be more than enough. + */ + thread_id = malloc(13 * sizeof(char)); + snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread); + for (i = 0; thread_id[i]; i++) {} + t->arg = (void *)realloc(thread_id, i-1); + pthread_setspecific(threadkey, t); + + //Start thread loop + (void)peer->peerd_loop(t); + + wake_up_next_thread(peer); + custom_peer_finalize(peer); + + return NULL; +} + int peerd_start_threads(struct peerd *peer) { int i; uint32_t nr_threads = peer->nr_threads; //TODO err check for (i = 0; i < nr_threads; i++) { + peer->thread[i].thread_no = i; peer->thread[i].peer = peer; - pthread_cond_init(&peer->thread[i].cond,NULL); - pthread_mutex_init(&peer->thread[i].lock, NULL); - pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i)); - peer->thread[i].func = NULL; - peer->thread[i].arg = NULL; + pthread_create(&peer->thread[i].tid, NULL, + init_thread_loop, (void *)(peer->thread + i)); + } + if (peer->interactive_func) + peer->interactive_func(); + for (i = 0; i < nr_threads; i++) { + pthread_join(peer->thread[i].tid, NULL); } + return 0; } #endif - -void defer_request(struct peerd *peer, struct peer_req *pr) +int defer_request(struct peerd *peer, struct peer_req *pr) { - // assert canDefer(peer); -// xseg_submit(peer->xseg, peer->defer_portno, pr->req); -// xseg_signal(peer->xseg, peer->defer_portno); -// free_peer_req(peer, pr); + int r; + xport p; + if (!canDefer(peer)){ + XSEGLOG2(&lc, E, "Peer cannot defer requests"); + return -1; + } + p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno, + X_ALLOC); + if (p == NoPort){ + XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req); + return -1; + } + r = xseg_signal(peer->xseg, p); + if (r < 0) { + XSEGLOG2(&lc, W, "Cannot signal port %lu", p); + } + free_peer_req(peer, pr); + return 0; } -static int peerd_loop(struct peerd *peer) +/* + * generic_peerd_loop is a general-purpose port-checker loop that is + * suitable both for multi-threaded and single-threaded peers. + */ +static int generic_peerd_loop(void *arg) { #ifdef MT - int i; - if (peer->interactive_func) - peer->interactive_func(); - for (i = 0; i < peer->nr_threads; i++) { - pthread_join(peer->thread[i].tid, NULL); - } + struct thread *t = (struct thread *) arg; + struct peerd *peer = t->peer; + char *id = t->arg; #else + struct peerd *peer = (struct peerd *) arg; + char id[4] = {'P','e','e','r'}; +#endif struct xseg *xseg = peer->xseg; xport portno_start = peer->portno_start; xport portno_end = peer->portno_end; + pid_t pid = syscall(SYS_gettid); uint64_t threshold=1000/(1 + portno_end - portno_start); - pid_t pid =syscall(SYS_gettid); uint64_t loops; - XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid); + XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid); xseg_init_local_signal(xseg, peer->portno_start); - for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) { - for(loops= threshold; loops > 0; loops--) { + //for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) { + for (;!(isTerminate() && all_peer_reqs_free(peer));) { + //Heart of peerd_loop. This loop is common for everyone. + for(loops = threshold; loops > 0; loops--) { if (loops == 1) xseg_prepare_wait(xseg, peer->portno_start); +#ifdef MT + if (check_ports(peer, t)) +#else if (check_ports(peer)) +#endif loops = threshold; } #ifdef ST_THREADS if (ta){ st_sleep(0); - } else { -#endif - XSEGLOG2(&lc, I, "Peer goes to sleep\n"); - xseg_wait_signal(xseg, 10000000UL); - xseg_cancel_wait(xseg, peer->portno_start); - XSEGLOG2(&lc, I, "Peer woke up\n"); -#ifdef ST_THREADS + continue; } #endif + XSEGLOG2(&lc, I, "%s goes to sleep\n", id); + xseg_wait_signal(xseg, 10000000UL); + xseg_cancel_wait(xseg, peer->portno_start); + XSEGLOG2(&lc, I, "%s woke up\n", id); } + return 0; +} + +static int init_peerd_loop(struct peerd *peer) +{ + struct xseg *xseg = peer->xseg; + + peer->peerd_loop(peer); custom_peer_finalize(peer); xseg_quit_local_signal(xseg, peer->portno_start); -#endif + return 0; } #ifdef ST_THREADS static void * st_peerd_loop(void *peer) { - peerd_loop(peer); - return 0; + struct peerd *peerd = peer; + + return init_peerd_loop(peer); } #endif @@ -559,11 +632,13 @@ static struct xseg *join(char *spec) } static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start, - long portno_end, uint32_t nr_threads, uint32_t defer_portno) + long portno_end, uint32_t nr_threads, xport defer_portno) { int i; struct peerd *peer; struct xseg_port *port; + void *sd = NULL; + xport p; #ifdef ST_THREADS st_init(); @@ -580,6 +655,20 @@ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start, peer->thread = calloc(nr_threads, sizeof(struct thread)); if (!peer->thread) goto malloc_fail; + if (!xq_alloc_empty(&peer->threads, nr_threads)) + goto malloc_fail; + for (i = 0; i < nr_threads; i++) { + if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops)) + goto malloc_fail; + } + for (i = 0; i < nr_ops; i++) { + __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i); + } + + pthread_key_create(&threadkey, NULL); +#else + if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops)) + goto malloc_fail; #endif peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req)); if (!peer->peer_reqs){ @@ -587,41 +676,33 @@ malloc_fail: perror("malloc"); return NULL; } - - if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops)) - goto malloc_fail; -#ifdef MT - if (!xq_alloc_empty(&peer->threads, nr_threads)) - goto malloc_fail; -#endif if (xseg_initialize()){ printf("cannot initialize library\n"); return NULL; } peer->xseg = join(spec); - if (!peer->xseg) + if (!peer->xseg) return NULL; peer->portno_start = (xport) portno_start; peer->portno_end= (xport) portno_end; - port = xseg_bind_port(peer->xseg, peer->portno_start, NULL); - if (!port){ - printf("cannot bind to port %u\n", (unsigned int) peer->portno_start); - return NULL; - } - xport p; - for (p = peer->portno_start + 1; p <= peer->portno_end; p++) { - struct xseg_port *tmp; - tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port)); - if (!tmp){ + /* + * Start binding ports from portno_start to portno_end. + * The first port we bind will have its signal_desc initialized by xseg + * and the same signal_desc will be used for all the other ports. + */ + for (p = peer->portno_start; p <= peer->portno_end; p++) { + port = xseg_bind_port(peer->xseg, p, sd); + if (!port){ printf("cannot bind to port %u\n", (unsigned int) p); return NULL; } + if (p == peer->portno_start) + sd = xseg_get_signal_desc(peer->xseg, port); } - printf("Peer on ports %u-%u\n", peer->portno_start, - peer->portno_end); + printf("Peer on ports %u-%u\n", peer->portno_start, peer->portno_end); for (i = 0; i < nr_ops; i++) { peer->peer_reqs[i].peer = peer; @@ -633,6 +714,10 @@ malloc_fail: peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check #endif } + + //Plug default peerd_loop. This can change later on by custom_peer_init. + peer->peerd_loop = generic_peerd_loop; + #ifdef MT peer->interactive_func = NULL; #endif @@ -682,7 +767,7 @@ int pidfile_read(char *path, pid_t *pid) int pidfile_open(char *path, pid_t *old_pid) { //nfs version > 3 - int fd = open(path, O_CREAT|O_EXCL|O_WRONLY); + int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR); if (fd < 0){ if (errno == EEXIST) pidfile_read(path, old_pid); @@ -725,7 +810,7 @@ int main(int argc, char *argv[]) uint32_t nr_ops = 16; uint32_t nr_threads = 1; unsigned int debug_level = 0; - uint32_t defer_portno = NoPort; + xport defer_portno = NoPort; pid_t old_pid; int pid_fd = -1; @@ -751,7 +836,7 @@ int main(int argc, char *argv[]) #ifdef MT READ_ARG_ULONG("-t", nr_threads); #endif -// READ_ARG_ULONG("-dp", defer_portno); + READ_ARG_ULONG("-dp", defer_portno); READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN); READ_ARG_BOOL("-d", daemonize); READ_ARG_BOOL("-h", help); @@ -819,16 +904,14 @@ int main(int argc, char *argv[]) r = custom_peer_init(peer, argc, argv); if (r < 0) goto out; -#ifdef MT +#if defined(MT) //TODO err check peerd_start_threads(peer); -#endif - -#ifdef ST_THREADS +#elif defined(ST_THREADS) st_thread_t st = st_thread_create(st_peerd_loop, peer, 1, 0); r = st_thread_join(st, NULL); #else - r = peerd_loop(peer); + r = init_peerd_loop(peer); #endif out: if (pid_fd > 0)