Peer checks if all peer requests are free in all thread, and then proceeds to
gracefull shutdown.
Also introduce pthread specific variable, holding the struct thread for each
thread.
#ifdef MT
struct peerd *global_peer;
#ifdef MT
struct peerd *global_peer;
+static pthread_key_t threadkey;
{
struct peer_req *pr;
struct thread *nt;
{
struct peer_req *pr;
struct thread *nt;
xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
if (idx != Noneidx)
goto out;
xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
if (idx != Noneidx)
goto out;
- /* try to steal from the next thread */
- nt = &peer->thread[(t->thread_no + 1) % peer->nr_threads];
- idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
- if (idx == Noneidx)
- return NULL;
+
+ /* try to steal from another thread */
+ /*
+ for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) {
+ nt = &peer->thread[(t->thread_no + i) % peer->nr_threads];
+ if (!xq_count(&nt->free_thread_reqs))
+ continue;
+ idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
+ if (idx != Noneidx)
+ goto out;
+ }
+ */
+ return NULL;
out:
pr = peer->peer_reqs + idx;
pr->thread_no = t - peer->thread;
out:
pr = peer->peer_reqs + idx;
pr->thread_no = t - peer->thread;
+/*
+ * Count all free reqs in peer.
+ * Racy, if multithreaded, but the sum should monotonicly increase when checked
+ * after a termination signal is catched.
+ */
+int all_peer_reqs_free(struct peerd *peer)
+{
+ uint32_t free_reqs = 0;
+ int i;
+#ifdef MT
+ for (i = 0; i < peer->nr_threads; i++) {
+ free_reqs += xq_count(&peer->thread[i].free_thread_reqs);
+ }
+#else
+ free_reqs = xq_count(&peer->free_reqs);
+#endif
+ if (free_reqs == peer->nr_ops)
+ return 1;
+ return 0;
+}
+
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;
void get_responds_stats(){
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;
void get_responds_stats(){
for (i = portno_start; i <= portno_end; i++) {
accepted = NULL;
received = NULL;
for (i = portno_start; i <= portno_end; i++) {
accepted = NULL;
received = NULL;
- //Shouldn't we just leave?
- //Better way than alloc/free all the time?
- //Cache the allocated peer_req?
#ifdef MT
pr = alloc_peer_req(peer, t);
#else
#ifdef MT
pr = alloc_peer_req(peer, t);
#else
snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
for (i = 0; thread_id[i]; i++) {}
t->arg = (void *)realloc(thread_id, i-1);
snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
for (i = 0; thread_id[i]; i++) {}
t->arg = (void *)realloc(thread_id, i-1);
+ pthread_setspecific(threadkey, t);
//Start thread loop
(void)peer->peerd_loop(t);
//Start thread loop
(void)peer->peerd_loop(t);
XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
xseg_init_local_signal(xseg, peer->portno_start);
XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
xseg_init_local_signal(xseg, peer->portno_start);
- for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
+ //for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
+ for (;!(isTerminate() && all_peer_reqs_free(peer));) {
#ifdef MT
if (t->func) {
XSEGLOG2(&lc, D, "%s executes function\n", id);
#ifdef MT
if (t->func) {
XSEGLOG2(&lc, D, "%s executes function\n", id);
goto malloc_fail;
for (i = 0; i < nr_threads; i++) {
peer->thread[i].thread_no = i;
goto malloc_fail;
for (i = 0; i < nr_threads; i++) {
peer->thread[i].thread_no = i;
+ peer->thread[i].peer = peer;
if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops/nr_threads))
goto malloc_fail;
}
if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops/nr_threads))
goto malloc_fail;
}
__xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
}
__xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
}
+ pthread_key_create(&threadkey, NULL);
#else
if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
goto malloc_fail;
#else
if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
goto malloc_fail;
struct thread *thread;
struct xq threads;
void (*interactive_func)(void);
struct thread *thread;
struct xq threads;
void (*interactive_func)(void);
void get_responds_stats();
void usage();
void print_req(struct xseg *xseg, struct xseg_request *req);
void get_responds_stats();
void usage();
void print_req(struct xseg *xseg, struct xseg_request *req);
+int all_peer_reqs_free(struct peerd *peer);
#ifdef MT
int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg);
#ifdef MT
int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg);