77 |
77 |
struct peerd *global_peer;
|
78 |
78 |
static pthread_key_t threadkey;
|
79 |
79 |
|
80 |
|
/*
|
81 |
|
struct thread {
|
82 |
|
struct peerd *peer;
|
83 |
|
pthread_t tid;
|
84 |
|
pthread_cond_t cond;
|
85 |
|
pthread_mutex_t lock;
|
86 |
|
int thread_no;
|
87 |
|
void (*func)(void *arg);
|
88 |
|
void *arg;
|
89 |
|
void *priv;
|
90 |
|
struct xq free_thread_reqs;
|
91 |
|
};
|
92 |
|
*/
|
93 |
|
inline static struct thread* alloc_thread(struct peerd *peer)
|
94 |
|
{
|
95 |
|
xqindex idx = xq_pop_head(&peer->threads, 1);
|
96 |
|
if (idx == Noneidx)
|
97 |
|
return NULL;
|
98 |
|
return peer->thread + idx;
|
99 |
|
}
|
100 |
|
|
101 |
|
inline static void free_thread(struct peerd *peer, struct thread *t)
|
102 |
|
{
|
103 |
|
xqindex idx = t - peer->thread;
|
104 |
|
xq_append_head(&peer->threads, idx, 1);
|
105 |
|
}
|
106 |
|
|
107 |
|
|
108 |
|
inline static void __wake_up_thread(struct thread *t)
|
109 |
|
{
|
110 |
|
pthread_mutex_lock(&t->lock);
|
111 |
|
pthread_cond_signal(&t->cond);
|
112 |
|
pthread_mutex_unlock(&t->lock);
|
113 |
|
}
|
114 |
|
|
115 |
|
inline static void wake_up_thread(struct thread* t)
|
116 |
|
{
|
117 |
|
if (t){
|
118 |
|
__wake_up_thread(t);
|
119 |
|
}
|
120 |
|
}
|
121 |
|
|
122 |
80 |
inline static int wake_up_next_thread(struct peerd *peer)
|
123 |
81 |
{
|
124 |
82 |
return (xseg_signal(peer->xseg, peer->portno_start));
|
... | ... | |
245 |
203 |
inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
|
246 |
204 |
{
|
247 |
205 |
struct peer_req *pr;
|
248 |
|
struct thread *nt;
|
249 |
|
int i;
|
250 |
206 |
xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
|
251 |
207 |
if (idx != Noneidx)
|
252 |
208 |
goto out;
|
253 |
209 |
|
254 |
210 |
/* try to steal from another thread */
|
255 |
211 |
/*
|
|
212 |
int i;
|
|
213 |
struct thread *nt;
|
256 |
214 |
for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) {
|
257 |
215 |
nt = &peer->thread[(t->thread_no + i) % peer->nr_threads];
|
258 |
216 |
if (!xq_count(&nt->free_thread_reqs))
|
... | ... | |
303 |
261 |
int all_peer_reqs_free(struct peerd *peer)
|
304 |
262 |
{
|
305 |
263 |
uint32_t free_reqs = 0;
|
306 |
|
int i;
|
307 |
264 |
#ifdef MT
|
|
265 |
int i;
|
308 |
266 |
for (i = 0; i < peer->nr_threads; i++) {
|
309 |
267 |
free_reqs += xq_count(&peer->thread[i].free_thread_reqs);
|
310 |
268 |
}
|
... | ... | |
551 |
509 |
uint32_t nr_threads = peer->nr_threads;
|
552 |
510 |
//TODO err check
|
553 |
511 |
for (i = 0; i < nr_threads; i++) {
|
554 |
|
peer->thread[i].func = NULL;
|
555 |
|
peer->thread[i].arg = NULL;
|
|
512 |
peer->thread[i].thread_no = i;
|
556 |
513 |
peer->thread[i].peer = peer;
|
557 |
|
pthread_cond_init(&peer->thread[i].cond,NULL);
|
558 |
|
pthread_mutex_init(&peer->thread[i].lock, NULL);
|
559 |
514 |
pthread_create(&peer->thread[i].tid, NULL,
|
560 |
515 |
init_thread_loop, (void *)(peer->thread + i));
|
561 |
516 |
}
|
... | ... | |
617 |
572 |
xseg_init_local_signal(xseg, peer->portno_start);
|
618 |
573 |
//for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
|
619 |
574 |
for (;!(isTerminate() && all_peer_reqs_free(peer));) {
|
620 |
|
#ifdef MT
|
621 |
|
if (t->func) {
|
622 |
|
XSEGLOG2(&lc, D, "%s executes function\n", id);
|
623 |
|
xseg_cancel_wait(xseg, peer->portno_start);
|
624 |
|
t->func(t->arg);
|
625 |
|
t->func = NULL;
|
626 |
|
t->arg = NULL;
|
627 |
|
continue;
|
628 |
|
}
|
629 |
|
#endif
|
630 |
575 |
//Heart of peerd_loop. This loop is common for everyone.
|
631 |
576 |
for(loops = threshold; loops > 0; loops--) {
|
632 |
577 |
if (loops == 1)
|
... | ... | |
704 |
649 |
if (!xq_alloc_empty(&peer->threads, nr_threads))
|
705 |
650 |
goto malloc_fail;
|
706 |
651 |
for (i = 0; i < nr_threads; i++) {
|
707 |
|
peer->thread[i].thread_no = i;
|
708 |
|
peer->thread[i].peer = peer;
|
709 |
652 |
if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops))
|
710 |
653 |
goto malloc_fail;
|
711 |
654 |
}
|