#ifdef MT
struct peerd *global_peer;
+/*
struct thread {
struct peerd *peer;
pthread_t tid;
void (*func)(void *arg);
void *arg;
void *priv;
+ struct xq free_thread_reqs;
};
-
+*/
inline static struct thread* alloc_thread(struct peerd *peer)
{
xqindex idx = xq_pop_head(&peer->threads, 1);
}
}
+#ifdef MT
+inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
+{ /*
+  * MT build: take a free peer_req for thread t.
+  *
+  * t's own free_thread_reqs queue is tried first. If it is empty, one
+  * attempt is made on the queue of the next thread in peer->thread[]
+  * (wrapping at peer->nr_threads). Returns a pointer into
+  * peer->peer_reqs, or NULL if both pops come back with Noneidx.
+  *
+  * The request is tagged with t's index, and free_peer_req() uses that
+  * tag to pick the queue it goes back to, so a stolen request ends up in
+  * t's queue rather than the one it was taken from.
+  * NOTE(review): confirm each per-thread queue has capacity for that.
+  */
+ struct peer_req *pr;
+ struct thread *nt;
+ xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
+ if (idx != Noneidx)
+ goto out;
+ /* try to steal from the next thread */
+ nt = &peer->thread[(t->thread_no + 1) % peer->nr_threads]; /* with one thread, nt is t itself */
+ idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
+ if (idx == Noneidx)
+ return NULL;
+out:
+ pr = peer->peer_reqs + idx; /* idx is an offset into the peer_reqs array */
+ pr->thread_no = t - peer->thread; /* t's position in peer->thread[] */
+ return pr;
+}
+#else
/*
* free_reqs is a queue that simply contains pointer offsets to the peer_reqs
* queue. If a pointer from peer_reqs is popped, we are certain that the
return NULL;
return peer->peer_reqs + idx;
}
+#endif
inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
{
xqindex idx = pr - peer->peer_reqs;
pr->req = NULL;
+#ifdef MT /* MT: the request goes back to a per-thread free queue */
+ struct thread *t = &peer->thread[pr->thread_no]; /* thread index stored by the MT alloc_peer_req() */
+ xq_append_head(&t->free_thread_reqs, idx, 1); /* NOTE(review): literal 1 here, while the MT allocator passes t->thread_no to xq_pop_head -- confirm intended; outcome of the append is not checked */
+#else /* !MT: single free queue shared by the whole peer */
xq_append_head(&peer->free_reqs, idx, 1);
+#endif /* MT */
}
struct timeval resp_start, resp_end, resp_accum = {0, 0};
return 0;
}
+#ifdef MT
+int check_ports(struct peerd *peer, struct thread *t)
+#else
int check_ports(struct peerd *peer)
+#endif
{
struct xseg *xseg = peer->xseg;
xport portno_start = peer->portno_start;
if (!isTerminate()) {
//Better way than alloc/free all the time?
//Cache the allocated peer_req?
+#ifdef MT
+ pr = alloc_peer_req(peer, t);
+#else
pr = alloc_peer_req(peer);
+#endif
if (pr) {
accepted = xseg_accept(xseg, i, X_NONBLOCK);
if (accepted) {
for(loops = threshold; loops > 0; loops--) {
if (loops == 1)
xseg_prepare_wait(xseg, peer->portno_start);
- if (check_ports(peer))
+ if (check_ports(peer, t))
loops = threshold;
}
XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
for(loops = threshold; loops > 0; loops--) {
if (loops == 1)
xseg_prepare_wait(xseg, peer->portno_start);
+#ifdef MT
+ if (check_ports(peer, t))
+#else
if (check_ports(peer))
+#endif
loops = threshold;
}
#ifdef ST_THREADS
peer->thread = calloc(nr_threads, sizeof(struct thread));
if (!peer->thread)
goto malloc_fail;
+ if (!xq_alloc_empty(&peer->threads, nr_threads))
+ goto malloc_fail;
+ for (i = 0; i < nr_threads; i++) {
+ peer->thread[i].thread_no = i;
+ if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops/nr_threads))
+ goto malloc_fail;
+ }
+ for (i = 0; i < nr_ops; i++) {
+ __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
+ }
+
+#else
+ if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
+ goto malloc_fail;
#endif
peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
if (!peer->peer_reqs){
perror("malloc");
return NULL;
}
-
- if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
- goto malloc_fail;
-#ifdef MT
- if (!xq_alloc_empty(&peer->threads, nr_threads))
- goto malloc_fail;
-#endif
if (xseg_initialize()){
printf("cannot initialize library\n");
return NULL;
peer->interactive_func = NULL;
#endif
return peer;
+
}
int pidfile_remove(char *path, int fd)
#ifdef ST_THREADS
st_cond_t cond;
#endif
+#ifdef MT
+ int thread_no;
+#endif
+};
+
+struct thread { /*
+  * Per-thread state. The peer keeps an array of these in peer->thread[],
+  * one entry per worker thread.
+  */
+ struct peerd *peer; /* presumably the peerd this thread belongs to -- confirm; assignment not visible here */
+ pthread_t tid; /* pthread handle; creation site not visible here */
+ pthread_cond_t cond; /* presumably paired with lock to sleep/wake the thread -- confirm */
+ pthread_mutex_t lock; /* see cond */
+ int thread_no; /* index of this entry in peer->thread[], assigned at init */
+ void (*func)(void *arg); /* presumably the work handed over by thread_execute() -- confirm */
+ void *arg; /* argument for func */
+ void *priv; /* opaque pointer; use not visible here */
+ struct xq free_thread_reqs; /* free peer_req indices (offsets into peer->peer_reqs) held by this thread */
 };
struct peerd {
void pending(struct peerd *peer, struct peer_req *req);
void log_pr(char *msg, struct peer_req *pr);
int canDefer(struct peerd *peer);
-int submit_peer_req(struct peerd *peer, struct peer_req *pr);
-struct peer_req *alloc_peer_req(struct peerd *peer);
void free_peer_req(struct peerd *peer, struct peer_req *pr);
+int submit_peer_req(struct peerd *peer, struct peer_req *pr);
void get_submits_stats();
void get_responds_stats();
void usage();
void print_req(struct xseg *xseg, struct xseg_request *req);
-int check_ports(struct peerd *peer);
#ifdef MT
int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg);
+struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t); /* MT: t is the calling thread, whose free queue is tried first */
+int check_ports(struct peerd *peer, struct thread *t); /* MT: t is forwarded to alloc_peer_req() */
+#else /* !MT: same entry points without a thread argument */
+struct peer_req *alloc_peer_req(struct peerd *peer);
+int check_ports(struct peerd *peer);
#endif
static inline struct peerd * __get_peerd(void * custom_peerd)