From: Filippos Giannakos Date: Thu, 14 Mar 2013 09:27:34 +0000 (+0200) Subject: Fix peer termination. X-Git-Tag: 0.3~21^2~3 X-Git-Url: https://code.grnet.gr/git/archipelago/commitdiff_plain/840248c47b3f50b1f6132fd50f4886d02c1e28ee Fix peer termination. Peer checks if all peer requests are free in all thread, and then proceeds to gracefull shutdown. Also introduce pthread specific variable, holding the struct thread for each thread. --- diff --git a/xseg/peers/user/peer.c b/xseg/peers/user/peer.c index 5021849..f082a8b 100644 --- a/xseg/peers/user/peer.c +++ b/xseg/peers/user/peer.c @@ -69,6 +69,7 @@ uint32_t ta = 0; #ifdef MT struct peerd *global_peer; +static pthread_key_t threadkey; /* struct thread { @@ -239,14 +240,23 @@ inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t) { struct peer_req *pr; struct thread *nt; + int i; xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no); if (idx != Noneidx) goto out; - /* try to steal from the next thread */ - nt = &peer->thread[(t->thread_no + 1) % peer->nr_threads]; - idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no); - if (idx == Noneidx) - return NULL; + + /* try to steal from another thread */ + /* + for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) { + nt = &peer->thread[(t->thread_no + i) % peer->nr_threads]; + if (!xq_count(&nt->free_thread_reqs)) + continue; + idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no); + if (idx != Noneidx) + goto out; + } + */ + return NULL; out: pr = peer->peer_reqs + idx; pr->thread_no = t - peer->thread; @@ -279,6 +289,27 @@ inline void free_peer_req(struct peerd *peer, struct peer_req *pr) #endif } +/* + * Count all free reqs in peer. + * Racy, if multithreaded, but the sum should monotonicly increase when checked + * after a termination signal is catched. + */ +int all_peer_reqs_free(struct peerd *peer) +{ + uint32_t free_reqs = 0; + int i; +#ifdef MT + for (i = 0; i < peer->nr_threads; i++) { + free_reqs += xq_count(&peer->thread[i].free_thread_reqs); + } +#else + free_reqs = xq_count(&peer->free_reqs); +#endif + if (free_reqs == peer->nr_ops) + return 1; + return 0; +} + struct timeval resp_start, resp_end, resp_accum = {0, 0}; uint64_t responds = 0; void get_responds_stats(){ @@ -397,10 +428,7 @@ int check_ports(struct peerd *peer) for (i = portno_start; i <= portno_end; i++) { accepted = NULL; received = NULL; - //Shouldn't we just leave? if (!isTerminate()) { - //Better way than alloc/free all the time? - //Cache the allocated peer_req? #ifdef MT pr = alloc_peer_req(peer, t); #else @@ -500,6 +528,7 @@ void *init_thread_loop(void *arg) snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread); for (i = 0; thread_id[i]; i++) {} t->arg = (void *)realloc(thread_id, i-1); + pthread_setspecific(threadkey, t); //Start thread loop (void)peer->peerd_loop(t); @@ -580,7 +609,8 @@ static int generic_peerd_loop(void *arg) XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid); xseg_init_local_signal(xseg, peer->portno_start); - for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) { + //for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) { + for (;!(isTerminate() && all_peer_reqs_free(peer));) { #ifdef MT if (t->func) { XSEGLOG2(&lc, D, "%s executes function\n", id); @@ -669,6 +699,7 @@ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start, goto malloc_fail; for (i = 0; i < nr_threads; i++) { peer->thread[i].thread_no = i; + peer->thread[i].peer = peer; if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops/nr_threads)) goto malloc_fail; } @@ -676,6 +707,7 @@ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start, __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i); } + pthread_key_create(&threadkey, NULL); #else if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops)) goto malloc_fail; diff --git a/xseg/peers/user/peer.h b/xseg/peers/user/peer.h index 3ea8a91..77abb83 100644 --- a/xseg/peers/user/peer.h +++ b/xseg/peers/user/peer.h @@ -115,6 +115,7 @@ struct peerd { struct thread *thread; struct xq threads; void (*interactive_func)(void); +#else #endif }; @@ -136,6 +137,7 @@ void get_submits_stats(); void get_responds_stats(); void usage(); void print_req(struct xseg *xseg, struct xseg_request *req); +int all_peer_reqs_free(struct peerd *peer); #ifdef MT int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg);