Revision cb5cf301 — diff of xseg/peers/user/peer.c
77 | 77 |
struct peerd *global_peer; |
78 | 78 |
static pthread_key_t threadkey; |
79 | 79 |
|
80 |
/* |
|
81 |
struct thread { |
|
82 |
struct peerd *peer; |
|
83 |
pthread_t tid; |
|
84 |
pthread_cond_t cond; |
|
85 |
pthread_mutex_t lock; |
|
86 |
int thread_no; |
|
87 |
void (*func)(void *arg); |
|
88 |
void *arg; |
|
89 |
void *priv; |
|
90 |
struct xq free_thread_reqs; |
|
91 |
}; |
|
92 |
*/ |
|
93 |
inline static struct thread* alloc_thread(struct peerd *peer) |
|
94 |
{ |
|
95 |
xqindex idx = xq_pop_head(&peer->threads, 1); |
|
96 |
if (idx == Noneidx) |
|
97 |
return NULL; |
|
98 |
return peer->thread + idx; |
|
99 |
} |
|
100 |
|
|
101 |
inline static void free_thread(struct peerd *peer, struct thread *t) |
|
102 |
{ |
|
103 |
xqindex idx = t - peer->thread; |
|
104 |
xq_append_head(&peer->threads, idx, 1); |
|
105 |
} |
|
106 |
|
|
107 |
|
|
108 |
/*
 * Wake the worker associated with t by signaling its condition variable.
 *
 * The signal is delivered while holding t->lock, so the waiter is either
 * not yet blocked (it still holds the lock) or fully parked inside
 * pthread_cond_wait() — the wakeup cannot slip between the waiter's
 * predicate check and its wait.  NOTE(review): this assumes the waiter
 * checks its wakeup predicate under t->lock — confirm at the wait site.
 */
inline static void __wake_up_thread(struct thread *t)
{
	pthread_mutex_lock(&t->lock);
	pthread_cond_signal(&t->cond);
	pthread_mutex_unlock(&t->lock);
}
|
114 |
|
|
115 |
/*
 * NULL-safe wrapper around __wake_up_thread(): wake t if it exists,
 * do nothing for a NULL thread pointer.
 */
inline static void wake_up_thread(struct thread* t)
{
	if (!t)
		return;
	__wake_up_thread(t);
}
|
121 |
|
|
122 | 80 |
inline static int wake_up_next_thread(struct peerd *peer) |
123 | 81 |
{ |
124 | 82 |
return (xseg_signal(peer->xseg, peer->portno_start)); |
... | ... | |
245 | 203 |
inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t) |
246 | 204 |
{ |
247 | 205 |
struct peer_req *pr; |
248 |
struct thread *nt; |
|
249 |
int i; |
|
250 | 206 |
xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no); |
251 | 207 |
if (idx != Noneidx) |
252 | 208 |
goto out; |
253 | 209 |
|
254 | 210 |
/* try to steal from another thread */ |
255 | 211 |
/* |
212 |
int i; |
|
213 |
struct thread *nt; |
|
256 | 214 |
for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) { |
257 | 215 |
nt = &peer->thread[(t->thread_no + i) % peer->nr_threads]; |
258 | 216 |
if (!xq_count(&nt->free_thread_reqs)) |
... | ... | |
303 | 261 |
int all_peer_reqs_free(struct peerd *peer) |
304 | 262 |
{ |
305 | 263 |
uint32_t free_reqs = 0; |
306 |
int i; |
|
307 | 264 |
#ifdef MT |
265 |
int i; |
|
308 | 266 |
for (i = 0; i < peer->nr_threads; i++) { |
309 | 267 |
free_reqs += xq_count(&peer->thread[i].free_thread_reqs); |
310 | 268 |
} |
... | ... | |
551 | 509 |
uint32_t nr_threads = peer->nr_threads; |
552 | 510 |
//TODO err check |
553 | 511 |
for (i = 0; i < nr_threads; i++) { |
554 |
peer->thread[i].func = NULL; |
|
555 |
peer->thread[i].arg = NULL; |
|
512 |
peer->thread[i].thread_no = i; |
|
556 | 513 |
peer->thread[i].peer = peer; |
557 |
pthread_cond_init(&peer->thread[i].cond,NULL); |
|
558 |
pthread_mutex_init(&peer->thread[i].lock, NULL); |
|
559 | 514 |
pthread_create(&peer->thread[i].tid, NULL, |
560 | 515 |
init_thread_loop, (void *)(peer->thread + i)); |
561 | 516 |
} |
... | ... | |
617 | 572 |
xseg_init_local_signal(xseg, peer->portno_start); |
618 | 573 |
//for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) { |
619 | 574 |
for (;!(isTerminate() && all_peer_reqs_free(peer));) { |
620 |
#ifdef MT |
|
621 |
if (t->func) { |
|
622 |
XSEGLOG2(&lc, D, "%s executes function\n", id); |
|
623 |
xseg_cancel_wait(xseg, peer->portno_start); |
|
624 |
t->func(t->arg); |
|
625 |
t->func = NULL; |
|
626 |
t->arg = NULL; |
|
627 |
continue; |
|
628 |
} |
|
629 |
#endif |
|
630 | 575 |
//Heart of peerd_loop. This loop is common for everyone. |
631 | 576 |
for(loops = threshold; loops > 0; loops--) { |
632 | 577 |
if (loops == 1) |
... | ... | |
704 | 649 |
if (!xq_alloc_empty(&peer->threads, nr_threads)) |
705 | 650 |
goto malloc_fail; |
706 | 651 |
for (i = 0; i < nr_threads; i++) { |
707 |
peer->thread[i].thread_no = i; |
|
708 |
peer->thread[i].peer = peer; |
|
709 | 652 |
if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops)) |
710 | 653 |
goto malloc_fail; |
711 | 654 |
} |
Also available in: Unified diff