X-Git-Url: https://code.grnet.gr/git/archipelago/blobdiff_plain/b782cd3d1774de6bca36c39fa30548a333af20b8..de4c09bba20f45504f08f0697c177891655a99c4:/xseg/peers/user/peer.c

Review notes (free text ahead of the patch, ignored by patch tools):
 * The line structure was rebuilt from a copy whose newlines and indentation
   had been collapsed. Tab indentation, the position of blank context lines
   and the trailing whitespace on whitespace-only "-" lines are best-effort
   reconstructions chosen to satisfy each hunk's line counts; verify against
   the repository before applying.
 * Three added lines and one added comment differ from the upstream commit:
   - init_thread_loop(): realloc() now keeps length + 1 bytes (was i-1, which
     cut off the last character and the terminator of a string that is later
     printed with "%s"); the pointer difference is cast to long for "%ld";
     the comment describes the corrected behaviour.
   - generic_peerd_loop(): the non-MT identifier is the terminated string
     "Peer" instead of an unterminated char[4].
   Added-line counts are unchanged, so the hunk headers remain valid.

diff --git a/xseg/peers/user/peer.c b/xseg/peers/user/peer.c
index c2a2926..f3ab4eb 100644
--- a/xseg/peers/user/peer.c
+++ b/xseg/peers/user/peer.c
@@ -76,11 +76,11 @@ struct thread {
 	pthread_t tid;
 	pthread_cond_t cond;
 	pthread_mutex_t lock;
+	int thread_no;
 	void (*func)(void *arg);
 	void *arg;
 };
 
-
 inline static struct thread* alloc_thread(struct peerd *peer)
 {
 	xqindex idx = xq_pop_head(&peer->threads, 1);
@@ -116,21 +116,10 @@ inline static int wake_up_next_thread(struct peerd *peer)
 }
 #endif
 
-
-static inline int isTerminate()
-{
-/* ta doesn't need to be taken into account, because the main loops
- * doesn't check the terminated flag if ta is not 0.
+/*
+ * extern is needed if this function is going to be called by another file
+ * such as bench-xseg.c
  */
-	/*
-#ifdef ST_THREADS
-	return (!ta & terminated);
-#else
-	return terminated;
-#endif
-	*/
-	return terminated;
-}
 
 void signal_handler(int signal)
 {
@@ -303,7 +292,7 @@ void complete(struct peerd *peer, struct peer_req *pr)
 #endif
 }
 
-static void handle_accepted(struct peerd *peer, struct peer_req *pr, 
+static void handle_accepted(struct peerd *peer, struct peer_req *pr,
 				struct xseg_request *req)
 {
 	struct xseg_request *xreq = pr->req;
@@ -355,7 +344,7 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr)
 	return 0;
 }
 
-static int check_ports(struct peerd *peer)
+int check_ports(struct peerd *peer)
 {
 	struct xseg *xseg = peer->xseg;
 	xport portno_start = peer->portno_start;
@@ -410,19 +399,6 @@ static int check_ports(struct peerd *peer)
 }
 
 #ifdef MT
-int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
-{
-	struct thread *t = alloc_thread(peer);
-	if (t) {
-		t->func = func;
-		t->arg = arg;
-		wake_up_thread(t);
-		return 0;
-	} else
-		// we could hijack a thread
-		return -1;
-}
-
 static void* thread_loop(void *arg)
 {
 	struct thread *t = (struct thread *) arg;
@@ -435,20 +411,10 @@ static void* thread_loop(void *arg)
 	uint64_t threshold=1000/(1 + portno_end - portno_start);
 
 	XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
-	XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
 
 	xseg_init_local_signal(xseg, peer->portno_start);
 	for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
-		if (t->func) {
-			XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
-			xseg_cancel_wait(xseg, peer->portno_start);
-			t->func(t->arg);
-			t->func = NULL;
-			t->arg = NULL;
-			continue;
-		}
-
-		for(loops= threshold; loops > 0; loops--) {
+		for(loops = threshold; loops > 0; loops--) {
 			if (loops == 1)
 				xseg_prepare_wait(xseg, peer->portno_start);
 			if (check_ports(peer))
@@ -464,25 +430,64 @@ static void* thread_loop(void *arg)
 	return NULL;
 }
 
+void *init_thread_loop(void *arg)
+{
+	struct thread *t = (struct thread *) arg;
+	struct peerd *peer = t->peer;
+	char *thread_id;
+	int i;
+
+	/*
+	 * We need an identifier for every thread that will spin in peerd_loop.
+	 * The following code is a way to create a string of this format:
+	 * "Thread N"
+	 * including the null terminator, since generic_peerd_loop prints it
+	 * with "%s". What we do is we create this string with snprintf and then
+	 * shrink it to its exact size (length + 1) with realloc. Finally, the
+	 * result string is passed to the (void *arg) field of struct thread.
+	 *
+	 * Since the highest thread number can't be more than 5 digits, using 13
+	 * chars should be more than enough.
+	 */
+	thread_id = malloc(13 * sizeof(char));
+	snprintf(thread_id, 13, "Thread %ld", (long)(t - t->peer->thread));
+	for (i = 0; thread_id[i]; i++) {}
+	t->arg = (void *)realloc(thread_id, i+1);
+
+	//Start thread loop
+	(void)peer->peerd_loop(t);
+
+	wake_up_next_thread(peer);
+	custom_peer_finalize(peer);
+
+	return NULL;
+}
+
 int peerd_start_threads(struct peerd *peer)
 {
 	int i;
 	uint32_t nr_threads = peer->nr_threads;
 
 	//TODO err check
 	for (i = 0; i < nr_threads; i++) {
+		peer->thread[i].func = NULL;
+		peer->thread[i].arg = NULL;
 		peer->thread[i].peer = peer;
 		pthread_cond_init(&peer->thread[i].cond,NULL);
 		pthread_mutex_init(&peer->thread[i].lock, NULL);
-		pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
-		peer->thread[i].func = NULL;
-		peer->thread[i].arg = NULL;
+		pthread_create(&peer->thread[i].tid, NULL,
+				init_thread_loop, (void *)(peer->thread + i));
+	}
+	if (peer->interactive_func)
+		peer->interactive_func();
+	for (i = 0; i < nr_threads; i++) {
+		pthread_join(peer->thread[i].tid, NULL);
 	}
+
 	return 0;
 }
 #endif
 
-
 int defer_request(struct peerd *peer, struct peer_req *pr)
 {
 	int r;
@@ -505,48 +510,68 @@ int defer_request(struct peerd *peer, struct peer_req *pr)
 	return 0;
 }
 
-static int peerd_loop(struct peerd *peer)
+/*
+ * generic_peerd_loop is a general-purpose port-checker loop that is
+ * suitable both for multi-threaded and single-threaded peers.
+ */
+static int generic_peerd_loop(void *arg)
 {
 #ifdef MT
-	int i;
-	if (peer->interactive_func)
-		peer->interactive_func();
-	for (i = 0; i < peer->nr_threads; i++) {
-		pthread_join(peer->thread[i].tid, NULL);
-	}
+	struct thread *t = (struct thread *) arg;
+	struct peerd *peer = t->peer;
+	char *id = t->arg;
 #else
+	struct peerd *peer = (struct peerd *) arg;
+	char id[] = "Peer";
+#endif
 	struct xseg *xseg = peer->xseg;
 	xport portno_start = peer->portno_start;
 	xport portno_end = peer->portno_end;
+	pid_t pid = syscall(SYS_gettid);
 	uint64_t threshold=1000/(1 + portno_end - portno_start);
-	pid_t pid =syscall(SYS_gettid);
 	uint64_t loops;
 
-	XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
+	XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
 	xseg_init_local_signal(xseg, peer->portno_start);
 	for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
-		for(loops= threshold; loops > 0; loops--) {
-			if (loops == 1)
-				xseg_prepare_wait(xseg, peer->portno_start);
+#ifdef MT
+		if (t->func) {
+			XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
+			xseg_cancel_wait(xseg, peer->portno_start);
+			t->func(t->arg);
+			t->func = NULL;
+			t->arg = NULL;
+			continue;
+		}
+#endif
+		//Heart of peerd_loop. This loop is common for everyone.
+		for(loops = threshold; loops > 0; loops--) {
 			if (check_ports(peer))
 				loops = threshold;
 		}
+		xseg_prepare_wait(xseg, peer->portno_start);
 #ifdef ST_THREADS
 		if (ta){
 			st_sleep(0);
-		} else {
-#endif
-			XSEGLOG2(&lc, I, "Peer goes to sleep\n");
-			xseg_wait_signal(xseg, 10000000UL);
-			xseg_cancel_wait(xseg, peer->portno_start);
-			XSEGLOG2(&lc, I, "Peer woke up\n");
-#ifdef ST_THREADS
+			continue;
 		}
 #endif
+		XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
+		xseg_wait_signal(xseg, 10000000UL);
+		xseg_cancel_wait(xseg, peer->portno_start);
+		XSEGLOG2(&lc, I, "%s woke up\n", id);
 	}
+	return 0;
+}
+
+static int init_peerd_loop(struct peerd *peer)
+{
+	struct xseg *xseg = peer->xseg;
+
+	peer->peerd_loop(peer);
 	custom_peer_finalize(peer);
 	xseg_quit_local_signal(xseg, peer->portno_start);
-#endif
+
 	return 0;
 }
 
@@ -605,7 +630,7 @@ malloc_fail:
 		return NULL;
 	}
 	peer->xseg = join(spec);
-	if (!peer->xseg) 
+	if (!peer->xseg)
 		return NULL;
 
 	peer->portno_start = (xport) portno_start;
@@ -635,6 +660,10 @@ malloc_fail:
 		peer->peer_reqs[i].retval = 0;
 		peer->peer_reqs[i].priv = NULL;
 		peer->peer_reqs[i].portno = NoPort;
+
+		//Plug default peerd_loop. This can change later on by custom_peer_init.
+		peer->peerd_loop = generic_peerd_loop;
+
 #ifdef ST_THREADS
 		peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
 #endif
@@ -822,16 +851,14 @@ int main(int argc, char *argv[])
 	r = custom_peer_init(peer, argc, argv);
 	if (r < 0)
 		goto out;
-#ifdef MT
+#if defined(MT)
 	//TODO err check
 	peerd_start_threads(peer);
-#endif
-
-#ifdef ST_THREADS
-	st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
+#elif defined(ST_THREADS)
+	st_thread_t st = st_thread_create(init_peerd_loop, peer, 1, 0);
 	r = st_thread_join(st, NULL);
 #else
-	r = peerd_loop(peer);
+	r = init_peerd_loop(peer);
 #endif
 out:
 	if (pid_fd > 0)