pthread_t tid;
pthread_cond_t cond;
pthread_mutex_t lock;
+ int thread_no;
void (*func)(void *arg);
void *arg;
};
#endif
}
/*
 * NOTE(review): only the signature and the first statement of this function
 * are visible in this excerpt; the signature change below appears to be
 * whitespace-only -- confirm against the full file.
 */
-static void handle_accepted(struct peerd *peer, struct peer_req *pr,
+static void handle_accepted(struct peerd *peer, struct peer_req *pr,
struct xseg_request *req)
{
struct xseg_request *xreq = pr->req; /* request stored in pr; a separate variable from the req parameter */
}
#ifdef MT
-static int peerd_loop(void *arg);
-
/*
 * Receives its struct thread through the void * argument and returns NULL.
 * NOTE(review): this excerpt may omit statements between the cast and the
 * return -- confirm against the full file.
 */
static void* thread_loop(void *arg)
{
struct thread *t = (struct thread *) arg; /* arg is this thread's descriptor */
return NULL;
}
+/*
+ * init_thread_loop - start routine handed to pthread_create() for each
+ * peer->thread[] slot.
+ * @arg: the struct thread slot that belongs to the new thread.
+ *
+ * Builds the label "Thread <num>", where <num> is the slot's index inside
+ * peer->thread[], stores it in t->arg and then runs peer->peerd_loop(t).
+ * Once the loop returns it calls wake_up_next_thread() and
+ * custom_peer_finalize().
+ *
+ * Always returns NULL; the value returned by the loop is discarded.
+ */
+void *init_thread_loop(void *arg)
+{
+	struct thread *t = (struct thread *) arg;
+	struct peerd *peer = t->peer;
+	/*
+	 * "Thread " is 7 chars; with a thread number of at most 5 digits and
+	 * the terminating NUL, 13 bytes are enough. The label is consumed as a
+	 * C string (it is printed with %s), so the terminator must stay. The
+	 * buffer lives in this frame, which outlives the peerd_loop call, so no
+	 * heap allocation (and no allocation failure path) is needed.
+	 */
+	char thread_id[13];
+
+	/* %td is the conversion for ptrdiff_t, the type of a pointer difference. */
+	snprintf(thread_id, sizeof(thread_id), "Thread %td", t - peer->thread);
+	t->arg = thread_id;
+
+	//Start thread loop
+	(void)peer->peerd_loop(t);
+
+	/* The label dies with this frame: do not leave a dangling pointer. */
+	if (t->arg == thread_id)
+		t->arg = NULL;
+
+	wake_up_next_thread(peer);
+	custom_peer_finalize(peer);
+
+	return NULL;
+}
+
/*
 * Create peer->nr_threads worker threads, each one starting in
 * init_thread_loop() with its own peer->thread[i] slot as argument, and then
 * block until every thread has been joined. When peer->interactive_func is
 * set it is called after the threads have been created and before the joins.
 *
 * Always returns 0: the results of the pthread_* calls are not checked.
 */
int peerd_start_threads(struct peerd *peer)
{
int i;
uint32_t nr_threads = peer->nr_threads;
//TODO err check
for (i = 0; i < nr_threads; i++) {
+ peer->thread[i].func = NULL; //cleared before pthread_create: the new thread writes t->arg itself in init_thread_loop
+ peer->thread[i].arg = NULL;
peer->thread[i].peer = peer;
pthread_cond_init(&peer->thread[i].cond,NULL);
pthread_mutex_init(&peer->thread[i].lock, NULL);
- if (peer->custom_peerd_loop)
- pthread_create(&peer->thread[i].tid, NULL, peer->custom_peerd_loop, (void *)(peer->thread + i));
- else
- pthread_create(&peer->thread[i].tid, NULL, peerd_loop, (void *)(peer->thread + i));
- peer->thread[i].func = NULL;
- peer->thread[i].arg = NULL;
+ pthread_create(&peer->thread[i].tid, NULL,
+ init_thread_loop, (void *)(peer->thread + i)); //NOTE(review): return value unchecked; a failed create leaves tid unset for the join below
}
+ if (peer->interactive_func)
+ peer->interactive_func(); //runs in the calling thread while the workers are alive
for (i = 0; i < nr_threads; i++) {
pthread_join(peer->thread[i].tid, NULL);
}
- //?: Is this re-ordering acceptable?
- if (peer->interactive_func)
- peer->interactive_func();
return 0;
}
#endif
-
/*
 * NOTE(review): as shown here the function only declares r and returns 0;
 * the deferral logic is presumably outside this excerpt -- confirm against
 * the full file.
 */
int defer_request(struct peerd *peer, struct peer_req *pr)
{
int r;
return 0;
}
-static int peerd_loop(void *arg)
+/*
+ * generic_peerd_loop is a general-purpose port-checker loop that is
+ * suitable both for multi-threaded and single-threaded peers.
+ *
+ * arg is a struct thread * in MT builds and a struct peerd * otherwise.
+ * The loop keeps calling check_ports() and, once `threshold` consecutive
+ * calls return 0, waits in xseg_wait_signal(). It ends when isTerminate()
+ * is true and xq_count(&peer->free_reqs) equals peer->nr_ops.
+ *
+ * Always returns 0.
+ */
+static int generic_peerd_loop(void *arg)
{
#ifdef MT
struct thread *t = (struct thread *) arg;
struct peerd *peer = t->peer;
+ char *id = t->arg;
#else
struct peerd *peer = (struct peerd *) arg;
+ char id[] = "Peer"; //must be NUL-terminated: id is printed with %s below
#endif
struct xseg *xseg = peer->xseg;
xport portno_start = peer->portno_start;
xport portno_end = peer->portno_end;
- pid_t pid =syscall(SYS_gettid);
+ pid_t pid = syscall(SYS_gettid);
uint64_t threshold=1000/(1 + portno_end - portno_start);
uint64_t loops;
- XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
+ XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
xseg_init_local_signal(xseg, peer->portno_start);
for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
- for(loops= threshold; loops > 0; loops--) {
- if (loops == 1)
- xseg_prepare_wait(xseg, peer->portno_start);
+#ifdef MT
+ if (t->func) {
+ XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
+ xseg_cancel_wait(xseg, peer->portno_start);
+ t->func(t->arg);
+ t->func = NULL;
+ t->arg = NULL;
+ continue;
+ }
+#endif
+ //Heart of peerd_loop. This loop is common for everyone.
+ for(loops = threshold; loops > 0; loops--) {
if (check_ports(peer))
loops = threshold;
}
+ xseg_prepare_wait(xseg, peer->portno_start); //NOTE(review): now called after the last check_ports() instead of before it -- confirm no wakeup can be missed
#ifdef ST_THREADS
if (ta){
st_sleep(0);
continue;
}
#endif
- XSEGLOG2(&lc, I, "Peer goes to sleep\n");
+ XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
xseg_wait_signal(xseg, 10000000UL);
xseg_cancel_wait(xseg, peer->portno_start);
- XSEGLOG2(&lc, I, "Peer woke up\n");
+ XSEGLOG2(&lc, I, "%s woke up\n", id);
}
-#ifdef MT
- wake_up_next_thread(peer);
- custom_peer_finalize(peer);
-#else
+ return 0;
+}
+
+/*
+ * init_peerd_loop - run the peer's loop in the calling context and clean up
+ * afterwards.
+ *
+ * Calls peer->peerd_loop(peer), then custom_peer_finalize() and
+ * xseg_quit_local_signal() for peer->portno_start.
+ *
+ * Returns whatever peer->peerd_loop() returned.
+ */
+static int init_peerd_loop(struct peerd *peer)
+{
+ struct xseg *xseg = peer->xseg;
+ int r;
+
+ //Keep the loop's result so that a failing custom loop is not reported as success
+ r = peer->peerd_loop(peer);
custom_peer_finalize(peer);
xseg_quit_local_signal(xseg, peer->portno_start);
-#endif
+
-return 0;
+ return r;
}
peer->peer_reqs[i].retval = 0;
peer->peer_reqs[i].priv = NULL;
peer->peer_reqs[i].portno = NoPort;
+
+ //Plug default peerd_loop. This can change later on by custom_peer_init.
+ peer->peerd_loop = generic_peerd_loop;
+
#ifdef ST_THREADS
peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
#endif
r = custom_peer_init(peer, argc, argv);
if (r < 0)
goto out;
-#ifdef MT
+#if defined(MT)
//TODO err check
peerd_start_threads(peer);
-#endif
-
-#ifdef ST_THREADS
- st_thread_t st = st_thread_create(peerd_loop, peer, 1, 0);
+#elif defined(ST_THREADS)
+ st_thread_t st = st_thread_create(init_peerd_loop, peer, 1, 0);
r = st_thread_join(st, NULL);
#else
- if (peer->custom_peerd_loop)
- r = peer->custom_peerd_loop(peer);
- else
- r = peerd_loop(peer);
+ r = init_peerd_loop(peer);
#endif
out:
if (pid_fd > 0)