Revision cb5cf301

b/xseg/peers/user/peer.c
77 77
struct peerd *global_peer;
78 78
static pthread_key_t threadkey;
79 79

  
80
/*
81
struct thread {
82
	struct peerd *peer;
83
	pthread_t tid;
84
	pthread_cond_t cond;
85
	pthread_mutex_t lock;
86
	int thread_no;
87
	void (*func)(void *arg);
88
	void *arg;
89
	void *priv;
90
	struct xq free_thread_reqs;
91
};
92
*/
93
inline static struct thread* alloc_thread(struct peerd *peer)
94
{
95
	xqindex idx = xq_pop_head(&peer->threads, 1);
96
	if (idx == Noneidx)
97
		return NULL;
98
	return peer->thread + idx;
99
}
100

  
101
inline static void free_thread(struct peerd *peer, struct thread *t)
102
{
103
	xqindex idx = t - peer->thread;
104
	xq_append_head(&peer->threads, idx, 1);
105
}
106

  
107

  
108
inline static void __wake_up_thread(struct thread *t)
109
{
110
	pthread_mutex_lock(&t->lock);
111
	pthread_cond_signal(&t->cond);
112
	pthread_mutex_unlock(&t->lock);
113
}
114

  
115
inline static void wake_up_thread(struct thread* t)
116
{
117
	if (t){
118
		__wake_up_thread(t);
119
	}
120
}
121

  
122 80
inline static int wake_up_next_thread(struct peerd *peer)
123 81
{
124 82
	return (xseg_signal(peer->xseg, peer->portno_start));
......
245 203
inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
246 204
{
247 205
	struct peer_req *pr;
248
	struct thread *nt;
249
	int i;
250 206
	xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
251 207
	if (idx != Noneidx)
252 208
		goto out;
253 209

  
254 210
	/* try to steal from another thread */
255 211
	/*
212
	int i;
213
	struct thread *nt;
256 214
	for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) {
257 215
		nt = &peer->thread[(t->thread_no + i) % peer->nr_threads];
258 216
		if (!xq_count(&nt->free_thread_reqs))
......
303 261
int all_peer_reqs_free(struct peerd *peer)
304 262
{
305 263
	uint32_t free_reqs = 0;
306
	int i;
307 264
#ifdef MT
265
	int i;
308 266
	for (i = 0; i < peer->nr_threads; i++) {
309 267
		free_reqs += xq_count(&peer->thread[i].free_thread_reqs);
310 268
	}
......
551 509
	uint32_t nr_threads = peer->nr_threads;
552 510
	//TODO err check
553 511
	for (i = 0; i < nr_threads; i++) {
554
		peer->thread[i].func = NULL;
555
		peer->thread[i].arg = NULL;
512
		peer->thread[i].thread_no = i;
556 513
		peer->thread[i].peer = peer;
557
		pthread_cond_init(&peer->thread[i].cond,NULL);
558
		pthread_mutex_init(&peer->thread[i].lock, NULL);
559 514
		pthread_create(&peer->thread[i].tid, NULL,
560 515
					init_thread_loop, (void *)(peer->thread + i));
561 516
	}
......
617 572
	xseg_init_local_signal(xseg, peer->portno_start);
618 573
	//for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
619 574
	for (;!(isTerminate() && all_peer_reqs_free(peer));) {
620
#ifdef MT
621
		if (t->func) {
622
			XSEGLOG2(&lc, D, "%s executes function\n", id);
623
			xseg_cancel_wait(xseg, peer->portno_start);
624
			t->func(t->arg);
625
			t->func = NULL;
626
			t->arg = NULL;
627
			continue;
628
		}
629
#endif
630 575
		//Heart of peerd_loop. This loop is common for everyone.
631 576
		for(loops = threshold; loops > 0; loops--) {
632 577
			if (loops == 1)
......
704 649
	if (!xq_alloc_empty(&peer->threads, nr_threads))
705 650
		goto malloc_fail;
706 651
	for (i = 0; i < nr_threads; i++) {
707
		peer->thread[i].thread_no = i;
708
		peer->thread[i].peer = peer;
709 652
		if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops))
710 653
			goto malloc_fail;
711 654
	}
b/xseg/peers/user/peer.h
89 89
};
90 90

  
91 91
struct thread {
92
	struct peerd *peer;
93 92
	pthread_t tid;
94
	pthread_cond_t cond;
95
	pthread_mutex_t lock;
93
	struct peerd *peer;
96 94
	int thread_no;
97
	void (*func)(void *arg);
98
	void *arg;
99
	void *priv;
100 95
	struct xq free_thread_reqs;
96
	void *priv;
97
	void *arg;
101 98
};
102 99

  
103 100
struct peerd {

Also available in: Unified diff