#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
-
#ifdef MT
#include <pthread.h>
#endif
#include <peer.h>
#ifdef MT
+#ifdef ST_THREADS
+#error "MT and ST_THREADS defines are mutually exclusive"
+#endif
+#endif
+
+#ifdef MT
#define PEER_TYPE "pthread"
#else
#define PEER_TYPE "posix"
#ifdef MT
struct peerd *global_peer;
-
-struct thread {
- struct peerd *peer;
- pthread_t tid;
- pthread_cond_t cond;
- pthread_mutex_t lock;
- void (*func)(void *arg);
- void *arg;
-};
-
-
-inline static struct thread* alloc_thread(struct peerd *peer)
-{
- xqindex idx = xq_pop_head(&peer->threads, 1);
- if (idx == Noneidx)
- return NULL;
- return peer->thread + idx;
-}
-
-inline static void free_thread(struct peerd *peer, struct thread *t)
-{
- xqindex idx = t - peer->thread;
- xq_append_head(&peer->threads, idx, 1);
-}
-
-
-inline static void __wake_up_thread(struct thread *t)
-{
- pthread_mutex_lock(&t->lock);
- pthread_cond_signal(&t->cond);
- pthread_mutex_unlock(&t->lock);
-}
-
-inline static void wake_up_thread(struct thread* t)
-{
- if (t){
- __wake_up_thread(t);
- }
-}
+static pthread_key_t threadkey;
inline static int wake_up_next_thread(struct peerd *peer)
{
}
#endif
-
-static inline int isTerminate()
-{
-/* ta doesn't need to be taken into account, because the main loops
- * doesn't check the terminated flag if ta is not 0.
+/*
+ * extern is needed if this function is going to be called by another file
+ * such as bench-xseg.c
*/
- /*
-#ifdef ST_THREADS
- return (!ta & terminated);
-#else
- return terminated;
-#endif
- */
- return terminated;
-}
void signal_handler(int signal)
{
strncpy(data, req_data, 63);
data[63] = 0;
printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
- "src: %u, st: %u, dst: %u dt: %u\n"
+ "src: %u, transit: %u, dst: %u effective dst: %u\n"
"target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
(unsigned long)(req),
(unsigned int)req->op,
(unsigned long)req->serviced,
(unsigned int)req->state,
(unsigned int)req->src_portno,
- (unsigned int)req->src_transit_portno,
+ (unsigned int)req->transit_portno,
(unsigned int)req->dst_portno,
- (unsigned int)req->dst_transit_portno,
+ (unsigned int)req->effective_dst_portno,
(unsigned int)req->targetlen, target,
(unsigned long long)req->datalen, data);
}
}
}
+#ifdef MT
+inline struct peer_req *alloc_peer_req(struct peerd *peer, struct thread *t)
+{
+ struct peer_req *pr;
+ xqindex idx = xq_pop_head(&t->free_thread_reqs, t->thread_no);
+ if (idx != Noneidx)
+ goto out;
+
+ /* try to steal from another thread */
+ /*
+ int i;
+ struct thread *nt;
+ for (i = t->thread_no + 1; i < (t->thread_no + peer->nr_threads); i++) {
+ nt = &peer->thread[(t->thread_no + i) % peer->nr_threads];
+ if (!xq_count(&nt->free_thread_reqs))
+ continue;
+ idx = xq_pop_head(&nt->free_thread_reqs, t->thread_no);
+ if (idx != Noneidx)
+ goto out;
+ }
+ */
+ return NULL;
+out:
+ pr = peer->peer_reqs + idx;
+ pr->thread_no = t - peer->thread;
+ return pr;
+}
+#else
+/*
+ * free_reqs is a queue that simply contains pointer offsets to the peer_reqs
+ * queue. If a pointer from peer_reqs is popped, we are certain that the
+ * associated memory in peer_reqs is free to use
+ */
inline struct peer_req *alloc_peer_req(struct peerd *peer)
{
xqindex idx = xq_pop_head(&peer->free_reqs, 1);
return NULL;
return peer->peer_reqs + idx;
}
+#endif
inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
{
xqindex idx = pr - peer->peer_reqs;
pr->req = NULL;
+#ifdef MT
+ struct thread *t = &peer->thread[pr->thread_no];
+ xq_append_head(&t->free_thread_reqs, idx, 1);
+#else
xq_append_head(&peer->free_reqs, idx, 1);
+#endif
+}
+
+/*
+ * Count all free reqs in peer.
+ * Racy, if multithreaded, but the sum should monotonicly increase when checked
+ * after a termination signal is catched.
+ */
+int all_peer_reqs_free(struct peerd *peer)
+{
+ uint32_t free_reqs = 0;
+#ifdef MT
+ int i;
+ for (i = 0; i < peer->nr_threads; i++) {
+ free_reqs += xq_count(&peer->thread[i].free_thread_reqs);
+ }
+#else
+ free_reqs = xq_count(&peer->free_reqs);
+#endif
+ if (free_reqs == peer->nr_ops)
+ return 1;
+ return 0;
}
struct timeval resp_start, resp_end, resp_accum = {0, 0};
{
struct xseg_request *req = pr->req;
uint32_t p;
- XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
- req->state |= XS_FAILED;
- //xseg_set_req_data(peer->xseg, pr->req, NULL);
- p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
- xseg_signal(peer->xseg, p);
+ if (req){
+ XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
+ req->state |= XS_FAILED;
+ //xseg_set_req_data(peer->xseg, pr->req, NULL);
+ p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
+ xseg_signal(peer->xseg, p);
+ }
free_peer_req(peer, pr);
#ifdef MT
wake_up_next_thread(peer);
{
struct xseg_request *req = pr->req;
uint32_t p;
- req->state |= XS_SERVED;
- //xseg_set_req_data(peer->xseg, pr->req, NULL);
- //gettimeofday(&resp_start, NULL);
- p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
- //gettimeofday(&resp_end, NULL);
- //responds++;
- //timersub(&resp_end, &resp_start, &resp_end);
- //timeradd(&resp_end, &resp_accum, &resp_accum);
- //printf("xseg_signal: %u\n", p);
- xseg_signal(peer->xseg, p);
+ if (req){
+ req->state |= XS_SERVED;
+ //xseg_set_req_data(peer->xseg, pr->req, NULL);
+ //gettimeofday(&resp_start, NULL);
+ p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
+ //gettimeofday(&resp_end, NULL);
+ //responds++;
+ //timersub(&resp_end, &resp_start, &resp_end);
+ //timeradd(&resp_end, &resp_accum, &resp_accum);
+ //printf("xseg_signal: %u\n", p);
+ xseg_signal(peer->xseg, p);
+ }
free_peer_req(peer, pr);
#ifdef MT
wake_up_next_thread(peer);
#endif
}
-static void handle_accepted(struct peerd *peer, struct peer_req *pr,
+static void handle_accepted(struct peerd *peer, struct peer_req *pr,
struct xseg_request *req)
{
struct xseg_request *xreq = pr->req;
return 0;
}
-static int check_ports(struct peerd *peer)
+#ifdef MT
+int check_ports(struct peerd *peer, struct thread *t)
+#else
+int check_ports(struct peerd *peer)
+#endif
{
struct xseg *xseg = peer->xseg;
xport portno_start = peer->portno_start;
accepted = NULL;
received = NULL;
if (!isTerminate()) {
+#ifdef MT
+ pr = alloc_peer_req(peer, t);
+#else
pr = alloc_peer_req(peer);
+#endif
if (pr) {
accepted = xseg_accept(xseg, i, X_NONBLOCK);
if (accepted) {
}
#ifdef MT
-int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
-{
- struct thread *t = alloc_thread(peer);
- if (t) {
- t->func = func;
- t->arg = arg;
- wake_up_thread(t);
- return 0;
- } else
- // we could hijack a thread
- return -1;
-}
-
static void* thread_loop(void *arg)
{
struct thread *t = (struct thread *) arg;
uint64_t threshold=1000/(1 + portno_end - portno_start);
XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
-
XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
xseg_init_local_signal(xseg, peer->portno_start);
for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
- if (t->func) {
- XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
- xseg_cancel_wait(xseg, peer->portno_start);
- t->func(t->arg);
- t->func = NULL;
- t->arg = NULL;
- continue;
- }
-
- for(loops= threshold; loops > 0; loops--) {
+ for(loops = threshold; loops > 0; loops--) {
if (loops == 1)
xseg_prepare_wait(xseg, peer->portno_start);
- if (check_ports(peer))
+ if (check_ports(peer, t))
loops = threshold;
}
XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
return NULL;
}
+void *init_thread_loop(void *arg)
+{
+ struct thread *t = (struct thread *) arg;
+ struct peerd *peer = t->peer;
+ char *thread_id;
+ int i;
+
+ /*
+ * We need an identifier for every thread that will spin in peerd_loop.
+ * The following code is a way to create a string of this format:
+ * "Thread <num>"
+ * minus the null terminator. What we do is we create this string with
+ * snprintf and then resize it to exclude the null terminator with
+ * realloc. Finally, the result string is passed to the (void *arg) field
+ * of struct thread.
+ *
+ * Since the highest thread number can't be more than 5 digits, using 13
+ * chars should be more than enough.
+ */
+ thread_id = malloc(13 * sizeof(char));
+ snprintf(thread_id, 13, "Thread %ld", t - t->peer->thread);
+ for (i = 0; thread_id[i]; i++) {}
+ t->arg = (void *)realloc(thread_id, i-1);
+ pthread_setspecific(threadkey, t);
+
+ //Start thread loop
+ (void)peer->peerd_loop(t);
+
+ wake_up_next_thread(peer);
+ custom_peer_finalize(peer);
+
+ return NULL;
+}
+
int peerd_start_threads(struct peerd *peer)
{
int i;
uint32_t nr_threads = peer->nr_threads;
//TODO err check
for (i = 0; i < nr_threads; i++) {
+ peer->thread[i].thread_no = i;
peer->thread[i].peer = peer;
- pthread_cond_init(&peer->thread[i].cond,NULL);
- pthread_mutex_init(&peer->thread[i].lock, NULL);
- pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i));
- peer->thread[i].func = NULL;
- peer->thread[i].arg = NULL;
+ pthread_create(&peer->thread[i].tid, NULL,
+ init_thread_loop, (void *)(peer->thread + i));
+ }
+ if (peer->interactive_func)
+ peer->interactive_func();
+ for (i = 0; i < nr_threads; i++) {
+ pthread_join(peer->thread[i].tid, NULL);
}
+
return 0;
}
#endif
-
-void defer_request(struct peerd *peer, struct peer_req *pr)
+int defer_request(struct peerd *peer, struct peer_req *pr)
{
- // assert canDefer(peer);
-// xseg_submit(peer->xseg, peer->defer_portno, pr->req);
-// xseg_signal(peer->xseg, peer->defer_portno);
-// free_peer_req(peer, pr);
+ int r;
+ xport p;
+ if (!canDefer(peer)){
+ XSEGLOG2(&lc, E, "Peer cannot defer requests");
+ return -1;
+ }
+ p = xseg_forward(peer->xseg, pr->req, peer->defer_portno, pr->portno,
+ X_ALLOC);
+ if (p == NoPort){
+ XSEGLOG2(&lc, E, "Cannot defer request %lx", pr->req);
+ return -1;
+ }
+ r = xseg_signal(peer->xseg, p);
+ if (r < 0) {
+ XSEGLOG2(&lc, W, "Cannot signal port %lu", p);
+ }
+ free_peer_req(peer, pr);
+ return 0;
}
-static int peerd_loop(struct peerd *peer)
+/*
+ * generic_peerd_loop is a general-purpose port-checker loop that is
+ * suitable both for multi-threaded and single-threaded peers.
+ */
+static int generic_peerd_loop(void *arg)
{
#ifdef MT
- int i;
- if (peer->interactive_func)
- peer->interactive_func();
- for (i = 0; i < peer->nr_threads; i++) {
- pthread_join(peer->thread[i].tid, NULL);
- }
+ struct thread *t = (struct thread *) arg;
+ struct peerd *peer = t->peer;
+ char *id = t->arg;
#else
+ struct peerd *peer = (struct peerd *) arg;
+ char id[4] = {'P','e','e','r'};
+#endif
struct xseg *xseg = peer->xseg;
xport portno_start = peer->portno_start;
xport portno_end = peer->portno_end;
+ pid_t pid = syscall(SYS_gettid);
uint64_t threshold=1000/(1 + portno_end - portno_start);
- pid_t pid =syscall(SYS_gettid);
uint64_t loops;
- XSEGLOG2(&lc, I, "Peer has tid %u.\n", pid);
+ XSEGLOG2(&lc, I, "%s has tid %u.\n", id, pid);
xseg_init_local_signal(xseg, peer->portno_start);
- for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
- for(loops= threshold; loops > 0; loops--) {
+ //for (;!(isTerminate() && xq_count(&peer->free_reqs) == peer->nr_ops);) {
+ for (;!(isTerminate() && all_peer_reqs_free(peer));) {
+ //Heart of peerd_loop. This loop is common for everyone.
+ for(loops = threshold; loops > 0; loops--) {
if (loops == 1)
xseg_prepare_wait(xseg, peer->portno_start);
+#ifdef MT
+ if (check_ports(peer, t))
+#else
if (check_ports(peer))
+#endif
loops = threshold;
}
#ifdef ST_THREADS
if (ta){
st_sleep(0);
- } else {
-#endif
- XSEGLOG2(&lc, I, "Peer goes to sleep\n");
- xseg_wait_signal(xseg, 10000000UL);
- xseg_cancel_wait(xseg, peer->portno_start);
- XSEGLOG2(&lc, I, "Peer woke up\n");
-#ifdef ST_THREADS
+ continue;
}
#endif
+ XSEGLOG2(&lc, I, "%s goes to sleep\n", id);
+ xseg_wait_signal(xseg, 10000000UL);
+ xseg_cancel_wait(xseg, peer->portno_start);
+ XSEGLOG2(&lc, I, "%s woke up\n", id);
}
+ return 0;
+}
+
+static int init_peerd_loop(struct peerd *peer)
+{
+ struct xseg *xseg = peer->xseg;
+
+ peer->peerd_loop(peer);
custom_peer_finalize(peer);
xseg_quit_local_signal(xseg, peer->portno_start);
-#endif
+
return 0;
}
#ifdef ST_THREADS
static void * st_peerd_loop(void *peer)
{
- peerd_loop(peer);
- return 0;
+ struct peerd *peerd = peer;
+
+ return init_peerd_loop(peer);
}
#endif
}
static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
- long portno_end, uint32_t nr_threads, uint32_t defer_portno)
+ long portno_end, uint32_t nr_threads, xport defer_portno)
{
int i;
struct peerd *peer;
struct xseg_port *port;
+ void *sd = NULL;
+ xport p;
#ifdef ST_THREADS
st_init();
peer->thread = calloc(nr_threads, sizeof(struct thread));
if (!peer->thread)
goto malloc_fail;
+ if (!xq_alloc_empty(&peer->threads, nr_threads))
+ goto malloc_fail;
+ for (i = 0; i < nr_threads; i++) {
+ if (!xq_alloc_empty(&peer->thread[i].free_thread_reqs, nr_ops))
+ goto malloc_fail;
+ }
+ for (i = 0; i < nr_ops; i++) {
+ __xq_append_head(&peer->thread[i % nr_threads].free_thread_reqs, (xqindex)i);
+ }
+
+ pthread_key_create(&threadkey, NULL);
+#else
+ if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
+ goto malloc_fail;
#endif
peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
if (!peer->peer_reqs){
perror("malloc");
return NULL;
}
-
- if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
- goto malloc_fail;
-#ifdef MT
- if (!xq_alloc_empty(&peer->threads, nr_threads))
- goto malloc_fail;
-#endif
if (xseg_initialize()){
printf("cannot initialize library\n");
return NULL;
}
peer->xseg = join(spec);
- if (!peer->xseg)
+ if (!peer->xseg)
return NULL;
peer->portno_start = (xport) portno_start;
peer->portno_end= (xport) portno_end;
- port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
- if (!port){
- printf("cannot bind to port %u\n", (unsigned int) peer->portno_start);
- return NULL;
- }
- xport p;
- for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
- struct xseg_port *tmp;
- tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
- if (!tmp){
+ /*
+ * Start binding ports from portno_start to portno_end.
+ * The first port we bind will have its signal_desc initialized by xseg
+ * and the same signal_desc will be used for all the other ports.
+ */
+ for (p = peer->portno_start; p <= peer->portno_end; p++) {
+ port = xseg_bind_port(peer->xseg, p, sd);
+ if (!port){
printf("cannot bind to port %u\n", (unsigned int) p);
return NULL;
}
+ if (p == peer->portno_start)
+ sd = xseg_get_signal_desc(peer->xseg, port);
}
- printf("Peer on ports %u-%u\n", peer->portno_start,
- peer->portno_end);
+ printf("Peer on ports %u-%u\n", peer->portno_start, peer->portno_end);
for (i = 0; i < nr_ops; i++) {
peer->peer_reqs[i].peer = peer;
peer->peer_reqs[i].cond = st_cond_new(); //FIXME err check
#endif
}
+
+ //Plug default peerd_loop. This can change later on by custom_peer_init.
+ peer->peerd_loop = generic_peerd_loop;
+
#ifdef MT
peer->interactive_func = NULL;
#endif
int pidfile_open(char *path, pid_t *old_pid)
{
//nfs version > 3
- int fd = open(path, O_CREAT|O_EXCL|O_WRONLY);
+ int fd = open(path, O_CREAT|O_EXCL|O_WRONLY, S_IWUSR);
if (fd < 0){
if (errno == EEXIST)
pidfile_read(path, old_pid);
uint32_t nr_ops = 16;
uint32_t nr_threads = 1;
unsigned int debug_level = 0;
- uint32_t defer_portno = NoPort;
+ xport defer_portno = NoPort;
pid_t old_pid;
int pid_fd = -1;
#ifdef MT
READ_ARG_ULONG("-t", nr_threads);
#endif
-// READ_ARG_ULONG("-dp", defer_portno);
+ READ_ARG_ULONG("-dp", defer_portno);
READ_ARG_STRING("-l", logfile, MAX_LOGFILE_LEN);
READ_ARG_BOOL("-d", daemonize);
READ_ARG_BOOL("-h", help);
r = custom_peer_init(peer, argc, argv);
if (r < 0)
goto out;
-#ifdef MT
+#if defined(MT)
//TODO err check
peerd_start_threads(peer);
-#endif
-
-#ifdef ST_THREADS
+#elif defined(ST_THREADS)
st_thread_t st = st_thread_create(st_peerd_loop, peer, 1, 0);
r = st_thread_join(st, NULL);
#else
- r = peerd_loop(peer);
+ r = init_peerd_loop(peer);
#endif
out:
if (pid_fd > 0)