struct peerd *global_peer;
static pthread_key_t threadkey;
-/*
-struct thread {
- struct peerd *peer;
- pthread_t tid;
- pthread_cond_t cond;
- pthread_mutex_t lock;
- int thread_no;
- void (*func)(void *arg);
- void *arg;
- void *priv;
- struct xq free_thread_reqs;
-};
-*/
-inline static struct thread* alloc_thread(struct peerd *peer)
-{
- xqindex idx = xq_pop_head(&peer->threads, 1);
- if (idx == Noneidx)
- return NULL;
- return peer->thread + idx;
-}
-
-inline static void free_thread(struct peerd *peer, struct thread *t)
-{
- xqindex idx = t - peer->thread;
- xq_append_head(&peer->threads, idx, 1);
-}
-
-
-inline static void __wake_up_thread(struct thread *t)
-{
- pthread_mutex_lock(&t->lock);
- pthread_cond_signal(&t->cond);
- pthread_mutex_unlock(&t->lock);
-}
-
-inline static void wake_up_thread(struct thread* t)
-{
- if (t){
- __wake_up_thread(t);
- }
-}
-
inline static int wake_up_next_thread(struct peerd *peer)
{
return (xseg_signal(peer->xseg, peer->portno_start));
inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
{
struct peer_req *pr;
- struct thread *nt;
- int i;
xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
if (idx != Noneidx)
goto out;
/* try to steal from another thread */
/*
+ int i;
+ struct thread *nt;
for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) {
nt = &peer->thread[(t->thread_no + i) % peer->nr_threads];
if (!xq_count(&nt->free_thread_reqs))
int all_peer_reqs_free(struct peerd *peer)
{
uint32_t free_reqs = 0;
- int i;
#ifdef MT
+ int i;
for (i = 0; i < peer->nr_threads; i++) {
free_reqs += xq_count(&peer->thread[i].free_thread_reqs);
}
uint32_t nr_threads = peer->nr_threads;
//TODO err check
for (i = 0; i < nr_threads; i++) {
- peer->thread[i].func = NULL;
- peer->thread[i].arg = NULL;
+ peer->thread[i].thread_no = i;
peer->thread[i].peer = peer;
- pthread_cond_init(&peer->thread[i].cond,NULL);
- pthread_mutex_init(&peer->thread[i].lock, NULL);
pthread_create(&peer->thread[i].tid, NULL,
init_thread_loop, (void *)(peer->thread + i));
}
xseg_init_local_signal(xseg, peer->portno_start);
//for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
for (;!(isTerminate() && all_peer_reqs_free(peer));) {
-#ifdef MT
- if (t->func) {
- XSEGLOG2(&lc, D, "%s executes function\n", id);
- xseg_cancel_wait(xseg, peer->portno_start);
- t->func(t->arg);
- t->func = NULL;
- t->arg = NULL;
- continue;
- }
-#endif
//Heart of peerd_loop. This loop is common for everyone.
for(loops = threshold; loops > 0; loops--) {
if (loops == 1)
if (!xq_alloc_empty(&peer->threads, nr_threads))
goto malloc_fail;
for (i = 0; i < nr_threads; i++) {
- peer->thread[i].thread_no = i;
- peer->thread[i].peer = peer;
if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops))
goto malloc_fail;
}