#include <sys/time.h>
#include <signal.h>
-#define REARRANGE(__fun_name__, __format__, ...) __format__ "%s", __fun_name__, ##__VA_ARGS__
-#define LOG(level, ...) \
- do { \
- if (level <= verbose) { \
- fprintf(stderr, "%s: " REARRANGE( __func__ , ## __VA_ARGS__, "" )); \
- } \
- }while (0)
-
-
unsigned int verbose = 0;
+struct log_ctx lc;
struct thread {
struct peerd *peer;
};
+/*
+ * Take one thread slot from the head of peer->threads.
+ *
+ * Returns the matching element of peer->thread, or NULL when the pop
+ * yields Noneidx.
+ */
+inline static struct thread* alloc_thread(struct peerd *peer)
+{
+	const xqindex slot = xq_pop_head(&peer->threads, 1);
+
+	return (slot != Noneidx) ? &peer->thread[slot] : NULL;
+}
+
+/*
+ * Hand thread t back to peer->threads. The queued value is t's offset
+ * within the peer->thread array.
+ */
+inline static void free_thread(struct peerd *peer, struct thread *t)
+{
+	xq_append_head(&peer->threads, (xqindex) (t - peer->thread), 1);
+}
+
+
+/*
+ * Signal t->cond while holding t->lock. t must not be NULL; use
+ * wake_up_thread() when it may be.
+ */
+inline static void __wake_up_thread(struct thread *t)
+{
+	pthread_mutex_t *guard = &t->lock;
+
+	pthread_mutex_lock(guard);
+	pthread_cond_signal(&t->cond);
+	pthread_mutex_unlock(guard);
+}
+
+/* NULL-tolerant wrapper around __wake_up_thread(): does nothing for NULL. */
+inline static void wake_up_thread(struct thread* t)
+{
+	if (!t)
+		return;
+
+	__wake_up_thread(t);
+}
+
+/*
+ * Signal the peer's first port (peer->portno_start) on its segment and
+ * return whatever xseg_signal() returns. No particular thread is picked
+ * here; the call does not go through alloc_thread()/wake_up_thread().
+ */
+inline static int wake_up_next_thread(struct peerd *peer)
+{
+	struct xseg *segment = peer->xseg;
+
+	return xseg_signal(segment, peer->portno_start);
+}
+
inline int canDefer(struct peerd *peer)
{
return !(peer->defer_portno == NoPort);
req_target = xseg_get_target(xseg, pr->req);
req_data = xseg_get_data(xseg, pr->req);
/* null terminate name in case of req->target is less than 63 characters,
- * * and next character after name (aka first byte of next buffer) is not
- * * null
- * */
+ * and next character after name (aka first byte of next buffer) is not
+ * null
+ */
unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen;
if (verbose) {
strncpy(target, req_target, end);
xq_append_head(&peer->free_reqs, idx, 1);
}
-inline static struct thread* alloc_thread(struct peerd *peer)
-{
- xqindex idx = xq_pop_head(&peer->threads, 1);
- if (idx == Noneidx)
- return NULL;
- return peer->thread + idx;
-}
-
-inline static void free_thread(struct peerd *peer, struct thread *t)
-{
- xqindex idx = t - peer->thread;
- xq_append_head(&peer->threads, idx, 1);
-}
-
-
-inline static void __wake_up_thread(struct thread *t)
-{
- pthread_mutex_lock(&t->lock);
- pthread_cond_signal(&t->cond);
- pthread_mutex_unlock(&t->lock);
-}
-
-inline static void wake_up_thread(struct thread* t)
-{
- if (t){
- __wake_up_thread(t);
- }
-}
-
-inline static int wake_up_next_thread(struct peerd *peer)
-{
- //struct thread *t = alloc_thread(peer);
- //wake_up_thread(t);
- //return t;
- return (xseg_signal(peer->xseg, peer->portno));
-}
-
struct timeval resp_start, resp_end, resp_accum = {0, 0};
uint64_t responds = 0;
void get_responds_stats(){
printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
//(unsigned int)(t - peer->thread),
- resp_accum.tv_sec, resp_accum.tv_usec, responds);
+ resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds);
}
//FIXME error check
{
struct xseg_request *req = pr->req;
uint32_t p;
- LOG(5, "failing req %u\n", (unsigned int) (pr - peer->peer_reqs));
+ XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
req->state |= XS_FAILED;
//xseg_set_req_data(peer->xseg, pr->req, NULL);
- p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC);
+ p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
xseg_signal(peer->xseg, p);
free_peer_req(peer, pr);
wake_up_next_thread(peer);
req->state |= XS_SERVED;
//xseg_set_req_data(peer->xseg, pr->req, NULL);
//gettimeofday(&resp_start, NULL);
- p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC);
+ p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
//gettimeofday(&resp_end, NULL);
//responds++;
//timersub(&resp_end, &resp_start, &resp_end);
wake_up_next_thread(peer);
}
-void pending(struct peerd *peer, struct peer_req *pr)
+static void handle_accepted(struct peerd *peer, struct peer_req *pr,
+ struct xseg_request *req) /* Handle a newly accepted request: clear the
+ * serviced counter of pr->req and pr->retval, then pass peer, pr and req
+ * to dispatch() with the dispatch_accept reason. req is expected to be
+ * the same request as pr->req (see the assert note below). */
{
- pr->req->state = XS_PENDING;
-}
-
-static void handle_accepted(struct peerd *peer, struct peer_req *pr)
-{
- struct xseg_request *req = pr->req;
- LOG(4, "Handle accepted \n");
- req->serviced = 0;
- req->state = XS_ACCEPTED;
+ struct xseg_request *xreq = pr->req;
+ //assert xreq == req;
+ XSEGLOG2(&lc, D, "Handle accepted");
+ xreq->serviced = 0; /* nothing of this request has been serviced yet */
+ //xreq->state = XS_ACCEPTED;
pr->retval = 0;
- dispatch(peer, pr);
+ dispatch(peer, pr, req, dispatch_accept);
}
-static void handle_received(struct peerd *peer, struct peer_req *pr)
+static void handle_received(struct peerd *peer, struct peer_req *pr,
+ struct xseg_request *req) /* Handle a request that came back on a port:
+ * log it and pass peer, pr and req to dispatch() with the
+ * dispatch_receive reason. No request or pr field is modified here. */
{
//struct xseg_request *req = pr->req;
//assert req->state != XS_ACCEPTED;
- LOG(4, "Handle received \n");
- dispatch(peer, pr);
+ XSEGLOG2(&lc, D, "Handle received \n");
+ dispatch(peer, pr, req, dispatch_receive);
}
struct timeval sub_start, sub_end, sub_accum = {0, 0};
void get_submits_stats(){
printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
//(unsigned int)(t - peer->thread),
- sub_accum.tv_sec, sub_accum.tv_usec, submits);
+ sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits);
}
int submit_peer_req(struct peerd *peer, struct peer_req *pr)
struct xseg_request *req = pr->req;
// assert req->portno == peer->portno ?
//TODO small function with error checking
- LOG (5, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
+ XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
ret = xseg_set_req_data(peer->xseg, req, (void *)(pr));
if (ret < 0)
return -1;
//printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
//gettimeofday(&sub_start, NULL);
- ret = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
//gettimeofday(&sub_end, NULL);
//submits++;
//timersub(&sub_end, &sub_start, &sub_end);
return -1;
}
+/*
+ * Poll every port in [peer->portno_start, peer->portno_end] once, using
+ * non-blocking accept and receive.
+ *
+ * For each port:
+ *  - a peer_req is allocated up front; if a request can be accepted it is
+ *    bound to that peer_req (together with the port number) and passed to
+ *    handle_accepted(), otherwise the peer_req is freed again;
+ *  - a received request is passed to handle_received() with the peer_req
+ *    stored as its request data. A received request without such data is
+ *    responded from portno_start; if that fails it is released with
+ *    xseg_put_request().
+ *
+ * Returns 1 if at least one request reached handle_accepted() or
+ * handle_received(), 0 otherwise (requests without pr data do not count).
+ */
+static int check_ports(struct peerd *peer)
+{
+	struct xseg *xseg = peer->xseg;
+	const xport first = peer->portno_start;
+	const xport last = peer->portno_end;
+	struct peer_req *pr;
+	int progressed = 0;
+	xport port;
+
+	for (port = first; port <= last; port++) {
+		struct xseg_request *req;
+
+		/* Accept side: only try when a peer_req slot is available. */
+		pr = alloc_peer_req(peer);
+		if (pr) {
+			req = xseg_accept(xseg, port, X_NONBLOCK);
+			if (!req) {
+				free_peer_req(peer, pr);
+			} else {
+				pr->req = req;
+				pr->portno = port;
+				xseg_cancel_wait(xseg, port);
+				handle_accepted(peer, pr, req);
+				progressed = 1;
+			}
+		}
+
+		/* Receive side. */
+		req = xseg_receive(xseg, port, X_NONBLOCK);
+		if (!req)
+			continue;
+
+		if (xseg_get_req_data(xseg, req, (void **) &pr) >= 0 && pr) {
+			//maybe perform sanity check for pr
+			xseg_cancel_wait(xseg, port);
+			handle_received(peer, pr, req);
+			progressed = 1;
+			continue;
+		}
+
+		/* No peer_req attached to this request: bounce it back. */
+		XSEGLOG2(&lc, W, "Received request with no pr data\n");
+		xport reply_port = xseg_respond(peer->xseg, req, peer->portno_start, X_ALLOC);
+		if (reply_port == NoPort) {
+			XSEGLOG2(&lc, W, "Could not respond stale request");
+			xseg_put_request(xseg, req, first);
+		} else {
+			xseg_signal(xseg, reply_port);
+		}
+	}
+
+	return progressed;
+}
+
static void* thread_loop(void *arg)
{
struct thread *t = (struct thread *) arg;
struct peerd *peer = t->peer;
struct xseg *xseg = peer->xseg;
- uint32_t portno = peer->portno;
- struct peer_req *pr;
- uint64_t threshold=1000;
+ xport portno_start = peer->portno_start;
+ xport portno_end = peer->portno_end;
pid_t pid =syscall(SYS_gettid);
uint64_t loops;
- struct xseg_request *accepted, *received;
- int r;
-
- printf("thread %u\n", (unsigned int) (t- peer->thread));
+ uint64_t threshold=1000/(1 + portno_end - portno_start);
+
+ XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
- LOG(0, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
- xseg_init_signal(xseg, portno);
+ XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
+ xseg_init_local_signal(xseg, peer->portno_start);
for (;;) {
if (t->func) {
- LOG(5, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
- xseg_cancel_wait(xseg, portno);
+ XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
+ xseg_cancel_wait(xseg, peer->portno_start);
t->func(t->arg);
t->func = NULL;
t->arg = NULL;
}
for(loops= threshold; loops > 0; loops--) {
- accepted = NULL;
- received = NULL;
if (loops == 1)
- xseg_prepare_wait(xseg, portno);
-
-// if (xq_count(&peer->xport->request_queue)){
- pr = alloc_peer_req(peer);
- if (pr) {
- accepted = xseg_accept(xseg, peer->portno);
- LOG(5, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
- if (accepted) {
- pr->req = accepted;
- xseg_cancel_wait(xseg, portno);
- wake_up_next_thread(peer);
- handle_accepted(peer, pr);
- loops = threshold;
- }
- else {
- free_peer_req(peer, pr);
- }
- }
-// }
-// if (xq_count(&peer->xport->reply_queue)){
- received = xseg_receive(xseg, peer->portno);
- if (received) {
- //printf("received req id: %u\n", received - xseg->requests);
- //print_req(peer->xseg, received);
- r = xseg_get_req_data(xseg, received, (void **) &pr);
- if (r < 0 || !pr){
- //FIXME what to do here ?
- LOG(0, "Received request with no pr data\n");
- xseg_respond(peer->xseg, received, peer->portno, X_ALLOC);
- }
- //fail(peer, received);
- //assert pr->req == received;
- xseg_cancel_wait(xseg, portno);
- wake_up_next_thread(peer);
- handle_received(peer, pr);
- loops = threshold;
- }
-// }
+ xseg_prepare_wait(xseg, peer->portno_start);
+ if (check_ports(peer))
+ loops = threshold;
}
- LOG(1, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
+ XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
xseg_wait_signal(xseg, 10000000UL);
- xseg_cancel_wait(xseg, portno);
- LOG(1, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
+ xseg_cancel_wait(xseg, peer->portno_start);
+ XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
}
return NULL;
}
return 0;
}
-static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno, uint32_t nr_threads, uint32_t defer_portno)
+static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
+ long portno_end, uint32_t nr_threads, uint32_t defer_portno)
{
int i;
struct peerd *peer;
+ struct xseg_port *port;
peer = malloc(sizeof(struct peerd));
if (!peer) {
perror("malloc");
if (!peer->xseg)
return NULL;
- peer->xport = xseg_bind_port(peer->xseg, portno);
- if (!peer->xport){
- printf("cannot bind to port %ld\n", portno);
+ peer->portno_start = (xport) portno_start;
+ peer->portno_end= (xport) portno_end;
+ port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
+ if (!port){
+ printf("cannot bind to port %ld\n", peer->portno_start);
return NULL;
}
- printf("%lx\n", (unsigned long) peer->xport);
- peer->portno = xseg_portno(peer->xseg, peer->xport);
- printf("Peer on port %u/%u\n", peer->portno,
- peer->xseg->config.nr_ports);
+
+ xport p;
+ for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
+ struct xseg_port *tmp;
+ tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, port));
+ if (!tmp){
+ printf("cannot bind to port %ld\n", p);
+ return NULL;
+ }
+ }
+
+ printf("Peer on ports %u-%u\n", peer->portno_start,
+ peer->portno_end);
for (i = 0; i < nr_ops; i++) {
peer->peer_reqs[i].peer = peer;
peer->peer_reqs[i].req = NULL;
peer->peer_reqs[i].retval = 0;
peer->peer_reqs[i].priv = NULL;
+ peer->peer_reqs[i].portno = NoPort;
}
peer->interactive_func = NULL;
- peerd_start_threads(peer);
return peer;
}
-int main(int argc, const char *argv[])
+int main(int argc, char *argv[])
{
struct peerd *peer = NULL;
//parse args
char *spec = "";
int i, r;
- long portno = -1;
+ long portno_start = -1, portno_end = -1, portno = -1;
//set defaults here
uint32_t nr_ops = 16;
uint32_t nr_threads = 16 ;
unsigned int debug_level = 0;
uint32_t defer_portno = NoPort;
-
+ char *logfile = NULL;
+
//capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level
// -dp xseg_portno to defer blocking requests
- //maybe -l log file ?
+ // -l log file ?
//TODO print messages on arg parsing error
- LOG(5, "Main thread has tid %ld.\n", syscall(SYS_gettid));
for (i = 1; i < argc; i++) {
if (!strcmp(argv[i], "-g") && i + 1 < argc) {
continue;
}
+ if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
+ portno_start = strtoul(argv[i+1], NULL, 10);
+ i += 1;
+ continue;
+ }
+
+ if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
+ portno_end = strtoul(argv[i+1], NULL, 10);
+ i += 1;
+ continue;
+ }
+
if (!strcmp(argv[i], "-p") && i + 1 < argc) {
portno = strtoul(argv[i+1], NULL, 10);
i += 1;
i += 1;
continue;
}
+ if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
+ logfile = argv[i+1];
+ i += 1;
+ continue;
+ }
}
+ init_logctx(&lc, argv[0], debug_level, logfile);
+ XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
//TODO perform argument sanity checks
verbose = debug_level;
+ if (portno != -1) {
+ portno_start = portno;
+ portno_end = portno;
+ }
//TODO err check
- peer = peerd_init(nr_ops, spec, portno, nr_threads, defer_portno);
+ peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
+ if (!peer)
+ return -1;
r = custom_peer_init(peer, argc, argv);
-// peerd_start_threads(peer);
+ if (r < 0)
+ return -1;
+ peerd_start_threads(peer);
return peerd_loop(peer);
}