From cc21de66882be4b7295b3a0ea35c7a948a675095 Mon Sep 17 00:00:00 2001 From: Giannakos Filippos Date: Fri, 5 Oct 2012 19:25:24 +0300 Subject: [PATCH] added speer skeletor --- xseg/launch | 21 +- xseg/peers/kernel/xsegbd.c | 2 +- xseg/peers/user/Makefile | 7 +- xseg/peers/user/filed.c | 2 +- xseg/peers/user/mapperd.c | 2 +- xseg/peers/user/monitor.c | 8 +- xseg/peers/user/mpeer.c | 140 ++++++----- xseg/peers/user/mpeer.h | 7 +- xseg/peers/user/mt-mapperd.c | 64 ++--- xseg/peers/user/mt-vlmcd.c | 33 ++- xseg/peers/user/pfiled.c | 2 +- xseg/peers/user/speer.c | 540 ++++++++++++++++++++++++++++++++++++++++++ xseg/peers/user/vlmc-tool.c | 2 +- xseg/peers/user/xseg-tool.c | 22 +- xseg/sys/util.h | 5 +- xseg/tools/vlmc | 2 +- xseg/xseg/xseg.c | 19 +- xseg/xseg/xseg.h | 6 +- 18 files changed, 734 insertions(+), 150 deletions(-) create mode 100644 xseg/peers/user/speer.c diff --git a/xseg/launch b/xseg/launch index 1c0ebeb..77da522 100755 --- a/xseg/launch +++ b/xseg/launch @@ -26,9 +26,9 @@ function parse_config { [ -z "${HOSTNAME}" ] && HOSTNAME=`hostname` [ -z "${XSEG_HOME}" ] && XSEG_HOME="/root/archip/xseg" [ -z "${MODULES_DIR}" ] && MODULES_DIR="${XSEG_HOME}/lib/kernel" - [ -z "${SPEC}" ] && SPEC="segdev:xsegbd:16:1024:12" + [ -z "${SPEC}" ] && SPEC="segdev:xsegbd:512:1024:12" [ -z "${REQS}" ] && REQS=512 - [ -z "${PORTS}" ] && PORTS=16 + [ -z "${PORTS}" ] && PORTS=512 [ -z "${IMAGES}" ] && IMAGES="/home/user/archip/xseg/peers/user/foo" [ -z "${MAPS}" ] && MAPS="/home/user/archip/xseg/peers/user/foo" [ -z "${PITHOS}" ] && PITHOS="/srv/pithos/data/blocks" @@ -42,10 +42,11 @@ function parse_config { [ -z "${CHRDEV_MAJOR}" ] && CHRDEV_MAJOR=60 [ -z "${NR_OPS}" ] && NR_OPS=128 [ -z "${BPORT}" ] && BPORT=0 - [ -z "${VPORT}" ] && VPORT=2 + [ -z "${VPORT_START}" ] && VPORT_START=4 + [ -z "${VPORT_END}" ] && VPORT_END=200 [ -z "${MPORT}" ] && MPORT=1 - [ -z "${MBPORT}" ] && MBPORT=3 - [ -z "${VTOOL}" ] && VTOOL=15 + [ -z "${MBPORT}" ] && MBPORT=2 + [ -z "${VTOOL}" ] && VTOOL=3 } @@ -116,15 +117,17 @@ function spawn_pfiledm { } function spawn_vlmcd { - pgrep -f "peers/user/mt-vlmcd" || \ - "${XSEG_HOME}/peers/user/mt-vlmcd" -t 1 -p "$VPORT" -bp "$BPORT" -mp "$MPORT" -g \ + pgrep -f "peers/user/st-vlmcd" || \ + "${XSEG_HOME}/peers/user/st-vlmcd" -t 1 -sp "$VPORT_START" \ + -ep "$VPORT_END" -bp "$BPORT" -mp "$MPORT" -g \ "${SPEC}" -n ${NR_OPS} &> "${XSEG_LOGS}/vlmcd-${HOSTNAME}" & # alloc_requests "$VPORT:0" 128 } function spawn_mapperdc { pgrep -f "mt-mapperd" || \ - "${XSEG_HOME}/peers/user/mt-mapperd" -t 1 -p "$MPORT" -bp "$BPORT" -mbp "$MBPORT" -g "${SPEC}"\ + "${XSEG_HOME}/peers/user/mt-mapperd" -t 1 -sp "$MPORT" -ep "$MPORT"\ + -bp "$BPORT" -mbp "$MBPORT" -g "${SPEC}"\ -n ${NR_OPS} &> "${XSEG_LOGS}/mapperd-${HOSTNAME}" & # alloc_requests "$MPORT:0" 128 } @@ -291,7 +294,7 @@ doublemap) spawn_vlmcd ;; stop) - pkill -f peers/user/mt-vlmcd || true + pkill -f peers/user/st-vlmcd || true # free_requests "$VPORT:0" 128 || true pkill -f peers/user/mt-mapperd || true # free_requests "$MPORT:0" 128 || true diff --git a/xseg/peers/kernel/xsegbd.c b/xseg/peers/kernel/xsegbd.c index 3a1479c..0e7feb9 100644 --- a/xseg/peers/kernel/xsegbd.c +++ b/xseg/peers/kernel/xsegbd.c @@ -673,7 +673,7 @@ static void xseg_callback(xport portno) for (;;) { xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno); - xreq = xseg_receive(xsegbd_dev->xseg, portno); + xreq = xseg_receive(xsegbd_dev->xseg, portno, 0); if (!xreq) break; diff --git a/xseg/peers/user/Makefile b/xseg/peers/user/Makefile index 4d65688..e556e2d 100644 --- a/xseg/peers/user/Makefile +++ b/xseg/peers/user/Makefile @@ -6,7 +6,7 @@ default: all #all: filed xseg sosd vlmcd mapperd #all: filed xseg vlmcd mapperd -all: filed xseg mt-sosd dummy mt-vlmcd mapperd mt-mapperd pfiled monitor vlmc-xseg +all: filed xseg mt-sosd dummy mt-vlmcd mapperd mt-mapperd pfiled monitor vlmc-xseg st-vlmcd filed: filed.c $(BASE)/xseg/xseg.h @@ -36,6 +36,9 @@ monitor: monitor.c mpeer.c mpeer.h mt-vlmcd: mt-vlmcd.c mpeer.c mpeer.h $(BASE)/xseg/protocol.h $(CC) $(CFLAGS) -o $@ $< mpeer.c $(INC) -L$(LIB) -lxseg -lpthread +st-vlmcd: mt-vlmcd.c speer.c mpeer.h $(BASE)/xseg/protocol.h + $(CC) $(CFLAGS) -o $@ $< speer.c $(INC) -L$(LIB) -lxseg -lpthread + mt-mapperd: mt-mapperd.c mpeer.c mpeer.h $(BASE)/xseg/protocol.h $(CC) $(CFLAGS) -o $@ $< mpeer.c $(INC) -L$(LIB) -lxseg -lpthread -lgcrypt @@ -46,4 +49,4 @@ vlmc-xseg: vlmc-tool.c $(BASE)/xseg/xseg.h $(CC) $(CFLAGS) -o $@ $< $(INC) -L$(LIB) -lxseg clean: - rm -f filed xseg sosd vlmcd mapperd mt-sosd dummy monitor mt-mapperd pfiled vlmc-xseg + rm -f filed xseg sosd vlmcd mapperd mt-sosd dummy monitor mt-mapperd pfiled vlmc-xseg st-vlmcd diff --git a/xseg/peers/user/filed.c b/xseg/peers/user/filed.c index 3db4dae..b9666d4 100644 --- a/xseg/peers/user/filed.c +++ b/xseg/peers/user/filed.c @@ -562,7 +562,7 @@ void *io_loop(void *arg) for (;;) { accepted = NULL; - accepted = xseg_accept(xseg, portno); + accepted = xseg_accept(xseg, portno, 0); if (accepted) { io->req = accepted; wake_up_next_iothread(store); diff --git a/xseg/peers/user/mapperd.c b/xseg/peers/user/mapperd.c index e74f16b..a58e838 100644 --- a/xseg/peers/user/mapperd.c +++ b/xseg/peers/user/mapperd.c @@ -157,7 +157,7 @@ static int mapperd_loop(struct mapperd *mapperd) ret = xseg_prepare_wait(xseg, mportno); always_assert(ret == 0); - xreq = xseg_accept(xseg, mportno); + xreq = xseg_accept(xseg, mportno, 0); if (xreq) { xseg_cancel_wait(xseg, mportno); /* diff --git a/xseg/peers/user/monitor.c b/xseg/peers/user/monitor.c index 70cc8b4..282ff30 100644 --- a/xseg/peers/user/monitor.c +++ b/xseg/peers/user/monitor.c @@ -64,19 +64,19 @@ int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *xreq) int mpause(struct peerd *peer) { struct xseg *xseg = peer->xseg; - struct xseg_port *port = xseg_get_port(xseg, peer->portno); + struct xseg_port *port = xseg_get_port(xseg, peer->portno_start); if (!port) return -1; - xlock_acquire(&port->rq_lock, peer->portno); - xlock_acquire(&port->pq_lock, peer->portno); + xlock_acquire(&port->rq_lock, peer->portno_start); + xlock_acquire(&port->pq_lock, peer->portno_start); return 0; } int munpause(struct peerd *peer) { struct xseg *xseg = peer->xseg; - struct xseg_port *port = xseg_get_port(xseg, peer->portno); + struct xseg_port *port = xseg_get_port(xseg, peer->portno_start); if (!port) return -1; diff --git a/xseg/peers/user/mpeer.c b/xseg/peers/user/mpeer.c index 830c073..b0905c5 100644 --- a/xseg/peers/user/mpeer.c +++ b/xseg/peers/user/mpeer.c @@ -149,7 +149,7 @@ inline static int wake_up_next_thread(struct peerd *peer) //struct thread *t = alloc_thread(peer); //wake_up_thread(t); //return t; - return (xseg_signal(peer->xseg, peer->portno)); + return (xseg_signal(peer->xseg, peer->portno_start)); } struct timeval resp_start, resp_end, resp_accum = {0, 0}; @@ -168,7 +168,7 @@ void fail(struct peerd *peer, struct peer_req *pr) XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs)); req->state |= XS_FAILED; //xseg_set_req_data(peer->xseg, pr->req, NULL); - p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC); + p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC); xseg_signal(peer->xseg, p); free_peer_req(peer, pr); wake_up_next_thread(peer); @@ -182,7 +182,7 @@ void complete(struct peerd *peer, struct peer_req *pr) req->state |= XS_SERVED; //xseg_set_req_data(peer->xseg, pr->req, NULL); //gettimeofday(&resp_start, NULL); - p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC); + p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC); //gettimeofday(&resp_end, NULL); //responds++; //timersub(&resp_end, &resp_start, &resp_end); @@ -239,7 +239,7 @@ int submit_peer_req(struct peerd *peer, struct peer_req *pr) return -1; //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req)); //gettimeofday(&sub_start, NULL); - ret = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC); + ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC); //gettimeofday(&sub_end, NULL); //submits++; //timersub(&sub_end, &sub_start, &sub_end); @@ -268,22 +268,25 @@ static void* thread_loop(void *arg) struct thread *t = (struct thread *) arg; struct peerd *peer = t->peer; struct xseg *xseg = peer->xseg; - uint32_t portno = peer->portno; + xport portno_start = peer->portno_start; + xport portno_end = peer->portno_end; struct peer_req *pr; - uint64_t threshold=1000; + uint64_t threshold=1; pid_t pid =syscall(SYS_gettid); uint64_t loops; struct xseg_request *accepted, *received; int r; + int change; + xport i; XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread)); XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid); - xseg_init_local_signal(xseg, portno); + xseg_init_local_signal(xseg, peer->portno_start); for (;;) { if (t->func) { XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread)); - xseg_cancel_wait(xseg, portno); + xseg_cancel_wait(xseg, peer->portno_start); t->func(t->arg); t->func = NULL; t->arg = NULL; @@ -291,52 +294,53 @@ static void* thread_loop(void *arg) } for(loops= threshold; loops > 0; loops--) { - accepted = NULL; - received = NULL; - if (loops == 1) - xseg_prepare_wait(xseg, portno); - -// if (xq_count(&peer->xport->request_queue)){ - pr = alloc_peer_req(peer); - if (pr) { - accepted = xseg_accept(xseg, peer->portno); - if (accepted) { - XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread)); - pr->req = accepted; - xseg_cancel_wait(xseg, portno); - wake_up_next_thread(peer); - handle_accepted(peer, pr, accepted); - loops = threshold; + do { + if (loops == 1) + xseg_prepare_wait(xseg, peer->portno_start); + change = 0; + for (i = portno_start; i <= portno_end; i++) { + accepted = NULL; + received = NULL; + pr = alloc_peer_req(peer); + if (pr) { + accepted = xseg_accept(xseg, i, X_NONBLOCK); + if (accepted) { + XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread)); + pr->req = accepted; + pr->portno = i; + xseg_cancel_wait(xseg, i); + wake_up_next_thread(peer); + handle_accepted(peer, pr, accepted); + change = 1; + } + else { + free_peer_req(peer, pr); + } } - else { - free_peer_req(peer, pr); + received = xseg_receive(xseg, i, X_NONBLOCK); + if (received) { + //printf("received req id: %u\n", received - xseg->requests); + //print_req(peer->xseg, received); + r = xseg_get_req_data(xseg, received, (void **) &pr); + if (r < 0 || !pr){ + //FIXME what to do here ? + XSEGLOG2(&lc, W, "Received request with no pr data\n"); + xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC); + //FIXME if fails, put req + } + xseg_cancel_wait(xseg, i); + wake_up_next_thread(peer); + handle_received(peer, pr, received); + change = 1; } } -// } -// if (xq_count(&peer->xport->reply_queue)){ - received = xseg_receive(xseg, peer->portno); - if (received) { - //printf("received req id: %u\n", received - xseg->requests); - //print_req(peer->xseg, received); - r = xseg_get_req_data(xseg, received, (void **) &pr); - if (r < 0 || !pr){ - //FIXME what to do here ? - XSEGLOG2(&lc, W, "Received request with no pr data\n"); - xseg_respond(peer->xseg, received, peer->portno, X_ALLOC); - //if fails, put req - } - //fail(peer, received); - //assert pr->req == received; - xseg_cancel_wait(xseg, portno); - wake_up_next_thread(peer); - handle_received(peer, pr, received); + if (change) loops = threshold; - } -// } + }while (change); } XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread)); xseg_wait_signal(xseg, 10000000UL); - xseg_cancel_wait(xseg, portno); + xseg_cancel_wait(xseg, peer->portno_start); XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread)); } return NULL; @@ -391,7 +395,8 @@ int peerd_start_threads(struct peerd *peer) return 0; } -static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno, uint32_t nr_threads, uint32_t defer_portno) +static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start, + long portno_end, uint32_t nr_threads, uint32_t defer_portno) { int i; struct peerd *peer; @@ -427,15 +432,26 @@ malloc_fail: if (!peer->xseg) return NULL; - peer->xport = xseg_bind_port(peer->xseg, portno, NULL); - if (!peer->xport){ - printf("cannot bind to port %ld\n", portno); + peer->portno_start = (xport) portno_start; + peer->portno_end= (xport) portno_end; + peer->port = xseg_bind_port(peer->xseg, peer->portno_start, NULL); + if (!peer->port){ + printf("cannot bind to port %ld\n", peer->portno_start); return NULL; } - printf("%lx\n", (unsigned long) peer->xport); - peer->portno = xseg_portno(peer->xseg, peer->xport); - printf("Peer on port %u/%u\n", peer->portno, - peer->xseg->config.nr_ports); + + xport p; + for (p = peer->portno_start + 1; p <= peer->portno_end; p++) { + struct xseg_port *tmp; + tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, peer->port)); + if (!tmp){ + printf("cannot bind to port %ld\n", p); + return NULL; + } + } + + printf("Peer on ports %u-%u\n", peer->portno_start, + peer->portno_end); for (i = 0; i < nr_ops; i++) { peer->peer_reqs[i].peer = peer; @@ -454,7 +470,7 @@ int main(int argc, char *argv[]) //parse args char *spec = ""; int i, r; - long portno = -1; + long portno_start = -1, portno_end = -1; //set defaults here uint32_t nr_ops = 16; uint32_t nr_threads = 16 ; @@ -474,8 +490,14 @@ int main(int argc, char *argv[]) continue; } - if (!strcmp(argv[i], "-p") && i + 1 < argc) { - portno = strtoul(argv[i+1], NULL, 10); + if (!strcmp(argv[i], "-sp") && i + 1 < argc) { + portno_start = strtoul(argv[i+1], NULL, 10); + i += 1; + continue; + } + + if (!strcmp(argv[i], "-ep") && i + 1 < argc) { + portno_end = strtoul(argv[i+1], NULL, 10); i += 1; continue; } @@ -514,7 +536,7 @@ int main(int argc, char *argv[]) verbose = debug_level; //TODO err check - peer = peerd_init(nr_ops, spec, portno, nr_threads, defer_portno); + peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno); if (!peer) return -1; r = custom_peer_init(peer, argc, argv); diff --git a/xseg/peers/user/mpeer.h b/xseg/peers/user/mpeer.h index 6226303..da4f6ed 100644 --- a/xseg/peers/user/mpeer.h +++ b/xseg/peers/user/mpeer.h @@ -1,16 +1,19 @@ #include +#include /* main mpeer structs */ struct peer_req { struct peerd *peer; struct xseg_request *req; ssize_t retval; + xport portno; void *priv; }; struct peerd { struct xseg *xseg; - struct xseg_port *xport; - uint32_t portno; + struct xseg_port *port; + xport portno_start; + xport portno_end; long nr_ops; uint32_t nr_threads; uint32_t defer_portno; diff --git a/xseg/peers/user/mt-mapperd.c b/xseg/peers/user/mt-mapperd.c index f51719d..289f8cd 100644 --- a/xseg/peers/user/mt-mapperd.c +++ b/xseg/peers/user/mt-mapperd.c @@ -245,7 +245,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target, goto out_hash; - req = xseg_get_request(peer->xseg, peer->portno, mapper->mbportno, X_ALLOC); + req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC); if (!req){ XSEGLOG2(&lc, E, "Cannot allocate request for map %s", m->volume); @@ -272,7 +272,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target, m->volume); goto out_put; } - p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC); + p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC); if (p == NoPort){ XSEGLOG2(&lc, E, "Cannot submit request for map %s", m->volume); @@ -286,7 +286,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target, out_unset: xseg_get_req_data(peer->xseg, req, &dummy); out_put: - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); out_fail: remove_map(mapper, m); @@ -423,7 +423,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr, { void *dummy; struct mapperd *mapper = __get_mapperd(peer); - struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, + struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC); if (!req){ XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t" @@ -453,7 +453,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr, mn->object, map->volume, (unsigned long long) mn->objectidx); goto out_put; } - xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC); + xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC); if (p == NoPort){ XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t" "(Map: %s [%llu]", @@ -473,7 +473,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr, out_unset: xseg_get_req_data(peer->xseg, req, &dummy); out_put: - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); out_err: XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t" "(Map: %s [%llu]", @@ -487,7 +487,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map) struct mapperd *mapper = __get_mapperd(peer); struct map_node *mn; uint64_t i, pos, max_objidx = calc_map_obj(map); - struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, + struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC); if (!req){ XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume); @@ -526,7 +526,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map) map->volume); goto out_put; } - xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC); + xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC); if (p == NoPort){ XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume); @@ -543,7 +543,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map) out_unset: xseg_get_req_data(peer->xseg, req, &dummy); out_put: - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); out_err: XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume); return -1; @@ -684,7 +684,7 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE)) goto copyup_zeroblock; - struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, + struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, mapper->bportno, X_ALLOC); if (!req) goto out_err; @@ -707,7 +707,7 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re if (r<0) goto out_put; r = __set_copyup_node(mio, req, mn); - p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC); + p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC); if (p == NoPort) { goto out_unset; } @@ -722,7 +722,7 @@ out_unset: r = __set_copyup_node(mio, req, NULL); xseg_get_req_data(peer->xseg, req, &dummy); out_put: - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); out_err: XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target); return -1; @@ -776,7 +776,7 @@ static int handle_mapread(struct peerd *peer, struct peer_req *pr, if (r < 0) goto out_fail; - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); map->flags &= ~MF_MAP_LOADING; XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume); uint64_t qsize = xq_count(&map->pending); @@ -789,7 +789,7 @@ static int handle_mapread(struct peerd *peer, struct peer_req *pr, out_fail: XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume); - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); map->flags &= ~MF_MAP_LOADING; while((idx = __xq_pop_head(&map->pending)) != Noneidx){ struct peer_req *preq = (struct peer_req *) idx; @@ -804,7 +804,7 @@ out_err: strncpy(buf, target, req->targetlen); buf[req->targetlen] = 0; XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf); - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); return -1; } @@ -828,7 +828,7 @@ static int handle_mapwrite(struct peerd *peer, struct peer_req *pr, goto out_fail; } - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); map->flags &= ~MF_MAP_WRITING; XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume); uint64_t qsize = xq_count(&map->pending); @@ -842,7 +842,7 @@ static int handle_mapwrite(struct peerd *peer, struct peer_req *pr, out_fail: XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume); - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); map->flags &= ~MF_MAP_WRITING; while((idx = __xq_pop_head(&map->pending)) != Noneidx){ struct peer_req *preq = (struct peer_req *) idx; @@ -857,7 +857,7 @@ out_err: strncpy(buf, target, req->targetlen); buf[req->targetlen] = 0; XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf); - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); return -1; } @@ -1245,12 +1245,12 @@ static int handle_copyup(struct peerd *peer, struct peer_req *pr, goto out_fail; } mn->flags |= MF_OBJECT_WRITING; - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object); return 0; out_fail: - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); __set_copyup_node(mio, req, NULL); while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){ struct peer_req * preq = (struct peer_req *) idx; @@ -1297,7 +1297,7 @@ static int handle_objectwrite(struct peerd *peer, struct peer_req *pr, strncpy(mn->object, tmp.object, tmp.objectlen); mn->object[tmp.objectlen] = 0; mn->objectlen = tmp.objectlen; - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object); uint64_t qsize = xq_count(&mn->pending); @@ -1310,7 +1310,7 @@ static int handle_objectwrite(struct peerd *peer, struct peer_req *pr, out_fail: XSEGLOG2(&lc, E, "Write of object %s failed", mn->object); - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); while((idx = __xq_pop_head(&mn->pending)) != Noneidx){ struct peer_req *preq = (struct peer_req *) idx; fail(peer, preq); @@ -1319,7 +1319,7 @@ out_fail: out_err: XSEGLOG2(&lc, E, "Cannot find map node. Failure!"); - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); return -1; } @@ -1455,7 +1455,7 @@ static int delete_object(struct peerd *peer, struct peer_req *pr, return MF_PENDING; } - struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, + struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, mapper->bportno, X_ALLOC); if (!req) goto out_err; @@ -1472,7 +1472,7 @@ static int delete_object(struct peerd *peer, struct peer_req *pr, if (r < 0) goto out_put; __set_copyup_node(mio, req, mn); - xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC); + xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC); if (p == NoPort) goto out_unset; r = xseg_signal(peer->xseg, p); @@ -1483,7 +1483,7 @@ static int delete_object(struct peerd *peer, struct peer_req *pr, out_unset: xseg_get_req_data(peer->xseg, req, &dummy); out_put: - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); out_err: XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object); return -1; @@ -1595,7 +1595,7 @@ static int delete_map(struct peerd *peer, struct peer_req *pr, void *dummy; struct mapperd *mapper = __get_mapperd(peer); struct mapper_io *mio = __get_mapper_io(pr); - struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, + struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC); if (!req) goto out_err; @@ -1612,7 +1612,7 @@ static int delete_map(struct peerd *peer, struct peer_req *pr, if (r < 0) goto out_put; __set_copyup_node(mio, req, NULL); - xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC); + xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC); if (p == NoPort) goto out_unset; r = xseg_signal(peer->xseg, p); @@ -1623,7 +1623,7 @@ static int delete_map(struct peerd *peer, struct peer_req *pr, out_unset: xseg_get_req_data(peer->xseg, req, &dummy); out_put: - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); out_err: XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume); return -1; @@ -1695,7 +1695,7 @@ static int handle_delete(struct peerd *peer, struct peer_req *pr, //map block delete map = find_map(mapper, target, req->targetlen); if (!map) { - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); return -1; } handle_map_delete(peer, pr, map, err); @@ -1703,12 +1703,12 @@ static int handle_delete(struct peerd *peer, struct peer_req *pr, //object delete map = mn->map; if (!map) { - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); return -1; } handle_object_delete(peer, pr, mn, err); } - xseg_put_request(peer->xseg, req, peer->portno); + xseg_put_request(peer->xseg, req, pr->portno); return 0; } diff --git a/xseg/peers/user/mt-vlmcd.c b/xseg/peers/user/mt-vlmcd.c index 4c32209..d63947a 100644 --- a/xseg/peers/user/mt-vlmcd.c +++ b/xseg/peers/user/mt-vlmcd.c @@ -74,7 +74,7 @@ static int handle_accepted(struct peerd *peer, struct peer_req *pr, return 0; } vio->err = 0; //reset error state - vio->mreq = xseg_get_request(peer->xseg, peer->portno, + vio->mreq = xseg_get_request(peer->xseg, pr->portno, vlmc->mportno, X_ALLOC); if (!vio->mreq) goto out_err; @@ -104,7 +104,7 @@ static int handle_accepted(struct peerd *peer, struct peer_req *pr, } xseg_set_req_data(peer->xseg, vio->mreq, pr); __set_vio_state(vio, MAPPING); - p = xseg_submit(peer->xseg, vio->mreq, peer->portno, X_ALLOC); + p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC); if (p == NoPort) goto out_unset; r = xseg_signal(peer->xseg, p); @@ -118,7 +118,7 @@ static int handle_accepted(struct peerd *peer, struct peer_req *pr, out_unset: xseg_get_req_data(peer->xseg, vio->mreq, &dummy); out_put: - xseg_put_request(peer->xseg, vio->mreq, peer->portno); + xseg_put_request(peer->xseg, vio->mreq, pr->portno); out_err: __set_vio_state(vio, CONCLUDED); fail(peer, pr); @@ -146,7 +146,7 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr, /* FIXME shouldn's XS_FAILED be sufficient ?? */ if (vio->mreq->state & XS_FAILED && !(vio->mreq->state & XS_SERVED)){ fprintf(stderr, "req %lx (op: %d) failed\n", vio->mreq, vio->mreq->op); - xseg_put_request(peer->xseg, vio->mreq, peer->portno); + xseg_put_request(peer->xseg, vio->mreq, pr->portno); vio->mreq = NULL; __set_vio_state(vio, CONCLUDED); fail(peer, pr); @@ -154,13 +154,12 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr, struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq); char *data = xseg_get_data(peer->xseg, pr->req); *(off_t *)data = xinfo->size; - xseg_put_request(peer->xseg, vio->mreq, peer->portno); + xseg_put_request(peer->xseg, vio->mreq, pr->portno); vio->mreq = NULL; __set_vio_state(vio, CONCLUDED); complete(peer, pr); } else if (vio->mreq->op == X_CLOSE) { - struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq); - xseg_put_request(peer->xseg, vio->mreq, peer->portno); + xseg_put_request(peer->xseg, vio->mreq, pr->portno); vio->mreq = NULL; __set_vio_state(vio, CONCLUDED); complete(peer, pr); @@ -168,7 +167,7 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr, struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq); if (!mreply->cnt){ printf("foo2\n"); - xseg_put_request(peer->xseg, vio->mreq, peer->portno); + xseg_put_request(peer->xseg, vio->mreq, pr->portno); vio->mreq = NULL; __set_vio_state(vio, CONCLUDED); fail(peer, pr); @@ -178,7 +177,7 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr, vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *)); if (!vio->breqs) { printf("foo3\n"); - xseg_put_request(peer->xseg, vio->mreq, peer->portno); + xseg_put_request(peer->xseg, vio->mreq, pr->portno); vio->mreq = NULL; __set_vio_state(vio, CONCLUDED); fail(peer, pr); @@ -190,7 +189,7 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr, datalen = mreply->segs[i].size; offset = mreply->segs[i].offset; targetlen = mreply->segs[i].targetlen; - breq = xseg_get_request(peer->xseg, peer->portno, vlmc->bportno, X_ALLOC); + breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC); if (!breq) { vio->err = 1; break; @@ -198,7 +197,7 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr, r = xseg_prep_request(peer->xseg, breq, targetlen, datalen); if (r < 0) { vio->err = 1; - xseg_put_request(peer->xseg, breq, peer->portno); + xseg_put_request(peer->xseg, breq, pr->portno); break; } breq->offset = offset; @@ -207,26 +206,26 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr, target = xseg_get_target(peer->xseg, breq); if (!target) { vio->err = 1; - xseg_put_request(peer->xseg, breq, peer->portno); + xseg_put_request(peer->xseg, breq, pr->portno); break; } strncpy(target, mreply->segs[i].target, targetlen); r = xseg_set_req_data(peer->xseg, breq, pr); if (r<0) { vio->err = 1; - xseg_put_request(peer->xseg, breq, peer->portno); + xseg_put_request(peer->xseg, breq, pr->portno); break; } // this should work, right ? breq->data = pr->req->data + pos; pos += datalen; - p = xseg_submit(peer->xseg, breq, peer->portno, X_ALLOC); + p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC); if (p == NoPort){ void *dummy; vio->err = 1; xseg_get_req_data(peer->xseg, breq, &dummy); - xseg_put_request(peer->xseg, breq, peer->portno); + xseg_put_request(peer->xseg, breq, pr->portno); break; } r = xseg_signal(peer->xseg, p); @@ -236,7 +235,7 @@ static int handle_mapping(struct peerd *peer, struct peer_req *pr, vio->breqs[i] = breq; } vio->breq_cnt = i; - xseg_put_request(peer->xseg, vio->mreq, peer->portno); + xseg_put_request(peer->xseg, vio->mreq, pr->portno); vio->mreq = NULL; if (i == 0) { printf("foo4\n"); @@ -269,7 +268,7 @@ static int handle_serving(struct peerd *peer, struct peer_req *pr, //assert breq->serviced == breq->size __sync_fetch_and_add(&pr->req->serviced, breq->serviced); } - xseg_put_request(peer->xseg, breq, peer->portno); + xseg_put_request(peer->xseg, breq, pr->portno); if (!__sync_sub_and_fetch(&vio->breq_cnt, 1)) { __set_vio_state(vio, CONCLUDED); diff --git a/xseg/peers/user/pfiled.c b/xseg/peers/user/pfiled.c index c80599d..51a9d38 100644 --- a/xseg/peers/user/pfiled.c +++ b/xseg/peers/user/pfiled.c @@ -618,7 +618,7 @@ void *io_loop(void *arg) for (;;) { accepted = NULL; - accepted = xseg_accept(xseg, portno); + accepted = xseg_accept(xseg, portno, 0); if (accepted) { io->req = accepted; wake_up_next_iothread(pfiled); diff --git a/xseg/peers/user/speer.c b/xseg/peers/user/speer.c new file mode 100644 index 0000000..c84b4e9 --- /dev/null +++ b/xseg/peers/user/speer.c @@ -0,0 +1,540 @@ +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define REARRANGE(__fun_name__, __format__, ...) __format__ "%s", __fun_name__, ##__VA_ARGS__ +#define LOG(level, ...) \ + do { \ + if (level <= verbose) { \ + fprintf(stderr, "%s: " REARRANGE( __func__ , ## __VA_ARGS__, "" )); \ + } \ + }while (0) + + +unsigned int verbose = 0; +struct log_ctx lc; + +struct thread { + struct peerd *peer; + pthread_t tid; + pthread_cond_t cond; + pthread_mutex_t lock; + void (*func)(void *arg); + void *arg; +}; + + +inline int canDefer(struct peerd *peer) +{ + return !(peer->defer_portno == NoPort); +} + +void print_req(struct xseg *xseg, struct xseg_request *req) +{ + char target[64], data[64]; + char *req_target, *req_data; + unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen; + req_target = xseg_get_target(xseg, req); + req_data = xseg_get_data(xseg, req); + + if (1) { + strncpy(target, req_target, end); + target[end] = 0; + strncpy(data, req_data, 63); + data[63] = 0; + printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n" + "src: %u, st: %u, dst: %u dt: %u\n" + "target[%u]:'%s', data[%llu]:\n%s------------------\n\n", + (unsigned long)(req), + (unsigned int)req->op, + (unsigned long long)req->offset, + (unsigned long)req->size, + (unsigned long)req->serviced, + (unsigned int)req->state, + (unsigned int)req->src_portno, + (unsigned int)req->src_transit_portno, + (unsigned int)req->dst_portno, + (unsigned int)req->dst_transit_portno, + (unsigned int)req->targetlen, target, + (unsigned long long)req->datalen, data); + } +} +void log_pr(char *msg, struct peer_req *pr) +{ + char target[64], data[64]; + char *req_target, *req_data; + struct peerd *peer = pr->peer; + struct xseg *xseg = pr->peer->xseg; + req_target = xseg_get_target(xseg, pr->req); + req_data = xseg_get_data(xseg, pr->req); + /* null terminate name in case of req->target is less than 63 characters, + * and next character after name (aka first byte of next buffer) is not + * null + */ + unsigned int end = (pr->req->targetlen> 63) ? 63 : pr->req->targetlen; + if (verbose) { + strncpy(target, req_target, end); + target[end] = 0; + strncpy(data, req_data, 63); + data[63] = 0; + printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n" + "target[%u]:'%s', data[%llu]:\n%s------------------\n\n", + msg, + (unsigned int)(pr - peer->peer_reqs), + (unsigned int)pr->req->op, + (unsigned long long)pr->req->offset, + (unsigned long)pr->req->size, + (unsigned long)pr->req->serviced, + (unsigned long)pr->retval, + (unsigned int)pr->req->state, + (unsigned int)pr->req->targetlen, target, + (unsigned long long)pr->req->datalen, data); + } +} + +inline struct peer_req *alloc_peer_req(struct peerd *peer) +{ + xqindex idx = xq_pop_head(&peer->free_reqs, 1); + if (idx == Noneidx) + return NULL; + return peer->peer_reqs + idx; +} + +inline void free_peer_req(struct peerd *peer, struct peer_req *pr) +{ + xqindex idx = pr - peer->peer_reqs; + pr->req = NULL; + xq_append_head(&peer->free_reqs, idx, 1); +} + +inline static struct thread* alloc_thread(struct peerd *peer) +{ + xqindex idx = xq_pop_head(&peer->threads, 1); + if (idx == Noneidx) + return NULL; + return peer->thread + idx; +} + +inline static void free_thread(struct peerd *peer, struct thread *t) +{ + xqindex idx = t - peer->thread; + xq_append_head(&peer->threads, idx, 1); +} + + +inline static void __wake_up_thread(struct thread *t) +{ + pthread_mutex_lock(&t->lock); + pthread_cond_signal(&t->cond); + pthread_mutex_unlock(&t->lock); +} + +inline static void wake_up_thread(struct thread* t) +{ + if (t){ + __wake_up_thread(t); + } +} + +inline static int wake_up_next_thread(struct peerd *peer) +{ + //struct thread *t = alloc_thread(peer); + //wake_up_thread(t); + //return t; + return (xseg_signal(peer->xseg, peer->portno_start)); +} + +struct timeval resp_start, resp_end, resp_accum = {0, 0}; +uint64_t responds = 0; +void get_responds_stats(){ + printf("Time waiting respond %lu.%06lu sec for %llu times.\n", + //(unsigned int)(t - peer->thread), + resp_accum.tv_sec, resp_accum.tv_usec, (long long unsigned int) responds); +} + +//FIXME error check +void fail(struct peerd *peer, struct peer_req *pr) +{ + struct xseg_request *req = pr->req; + uint32_t p; + XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs)); + req->state |= XS_FAILED; + //xseg_set_req_data(peer->xseg, pr->req, NULL); + p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC); + xseg_signal(peer->xseg, p); + free_peer_req(peer, pr); + wake_up_next_thread(peer); +} + +//FIXME error check +void complete(struct peerd *peer, struct peer_req *pr) +{ + struct xseg_request *req = pr->req; + uint32_t p; + req->state |= XS_SERVED; + //xseg_set_req_data(peer->xseg, pr->req, NULL); + //gettimeofday(&resp_start, NULL); + p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC); + //gettimeofday(&resp_end, NULL); + //responds++; + //timersub(&resp_end, &resp_start, &resp_end); + //timeradd(&resp_end, &resp_accum, &resp_accum); + //printf("xseg_signal: %u\n", p); + xseg_signal(peer->xseg, p); + free_peer_req(peer, pr); + wake_up_next_thread(peer); +} + +void pending(struct peerd *peer, struct peer_req *pr) +{ + pr->req->state = XS_PENDING; +} + +static void handle_accepted(struct peerd *peer, struct peer_req *pr, + struct xseg_request *req) +{ + struct xseg_request *xreq = pr->req; + //assert xreq == req; + XSEGLOG2(&lc, D, "Handle accepted"); + xreq->serviced = 0; + //xreq->state = XS_ACCEPTED; + pr->retval = 0; + dispatch(peer, pr, req); +} + +static void handle_received(struct peerd *peer, struct peer_req *pr, + struct xseg_request *req) +{ + //struct xseg_request *req = pr->req; + //assert req->state != XS_ACCEPTED; + XSEGLOG2(&lc, D, "Handle received \n"); + dispatch(peer, pr, req); + +} +struct timeval sub_start, sub_end, sub_accum = {0, 0}; +uint64_t submits = 0; +void get_submits_stats(){ + printf("Time waiting submit %lu.%06lu sec for %llu times.\n", + //(unsigned int)(t - peer->thread), + sub_accum.tv_sec, sub_accum.tv_usec, (long long unsigned int) submits); +} + +int submit_peer_req(struct peerd *peer, struct peer_req *pr) +{ + uint32_t ret; + struct xseg_request *req = pr->req; + // assert req->portno == peer->portno ? + //TODO small function with error checking + XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs)); + ret = xseg_set_req_data(peer->xseg, req, (void *)(pr)); + if (ret < 0) + return -1; + //printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req)); + //gettimeofday(&sub_start, NULL); + ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC); + //gettimeofday(&sub_end, NULL); + //submits++; + //timersub(&sub_end, &sub_start, &sub_end); + //timeradd(&sub_end, &sub_accum, &sub_accum); + if (ret == NoPort) + return -1; + xseg_signal(peer->xseg, ret); + return 0; +} + +int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg) +{ + struct thread *t = alloc_thread(peer); + if (t) { + t->func = func; + t->arg = arg; + wake_up_thread(t); + return 0; + } else + // we could hijack a thread + return -1; +} + +static void* thread_loop(void *arg) +{ + struct thread *t = (struct thread *) arg; + struct peerd *peer = t->peer; + struct xseg *xseg = peer->xseg; + xport portno_start = peer->portno_start; + xport portno_end = peer->portno_end; + struct peer_req *pr; + uint64_t threshold=1; + pid_t pid =syscall(SYS_gettid); + uint64_t loops; + struct xseg_request *accepted, *received; + int r; + int change; + xport i; + + XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread)); + + XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid); + xseg_init_local_signal(xseg, peer->portno_start); + for (;;) { + if (t->func) { + XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread)); + xseg_cancel_wait(xseg, peer->portno_start); + t->func(t->arg); + t->func = NULL; + t->arg = NULL; + continue; + } + + for(loops= threshold; loops > 0; loops--) { + do { + if (loops == 1) + xseg_prepare_wait(xseg, peer->portno_start); + change = 0; + for (i = portno_start; i <= portno_end; i++) { + accepted = NULL; + received = NULL; + pr = alloc_peer_req(peer); + if (pr) { + accepted = xseg_accept(xseg, i, X_NONBLOCK); + if (accepted) { + XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread)); + pr->req = accepted; + pr->portno = i; + xseg_cancel_wait(xseg, i); + handle_accepted(peer, pr, accepted); + change = 1; + } + else { + free_peer_req(peer, pr); + } + } + received = xseg_receive(xseg, i, X_NONBLOCK); + if (received) { + //printf("received req id: %u\n", received - xseg->requests); + //print_req(peer->xseg, received); + r = xseg_get_req_data(xseg, received, (void **) &pr); + if (r < 0 || !pr){ + //FIXME what to do here ? + XSEGLOG2(&lc, W, "Received request with no pr data\n"); + xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC); + //FIXME if fails, put req + } + xseg_cancel_wait(xseg, i); + handle_received(peer, pr, received); + change = 1; + } + } + if (change) + loops = threshold; + }while (change); + } + XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread)); + xseg_wait_signal(xseg, 10000000UL); + xseg_cancel_wait(xseg, peer->portno_start); + XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread)); + } + return NULL; +} + +void defer_request(struct peerd *peer, struct peer_req *pr) +{ + // assert canDefer(peer); +// xseg_submit(peer->xseg, peer->defer_portno, pr->req); +// xseg_signal(peer->xseg, peer->defer_portno); +// free_peer_req(peer, pr); +} + +static int peerd_loop(struct peerd *peer) +{ + if (peer->interactive_func) + peer->interactive_func(); + for (;;) { + pthread_join(peer->thread[0].tid, NULL); + } + return 0; +} + +static struct xseg *join(char *spec) +{ + struct xseg_config config; + struct xseg *xseg; + + (void)xseg_parse_spec(spec, &config); + xseg = xseg_join(config.type, config.name, "posix", NULL); + if (xseg) + return xseg; + + (void)xseg_create(&config); + return xseg_join(config.type, config.name, "posix", NULL); +} + +int peerd_start_threads(struct peerd *peer) +{ + int i; + uint32_t nr_threads = peer->nr_threads; + //TODO err check + for (i = 0; i < nr_threads; i++) { + peer->thread[i].peer = peer; + pthread_cond_init(&peer->thread[i].cond,NULL); + pthread_mutex_init(&peer->thread[i].lock, NULL); + pthread_create(&peer->thread[i].tid, NULL, thread_loop, (void *)(peer->thread + i)); + peer->thread[i].func = NULL; + peer->thread[i].arg = NULL; + + } + return 0; +} + +static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start, + long portno_end, uint32_t nr_threads, uint32_t defer_portno) +{ + int i; + struct peerd *peer; + peer = malloc(sizeof(struct peerd)); + if (!peer) { + perror("malloc"); + return NULL; + } + peer->nr_ops = nr_ops; + peer->defer_portno = defer_portno; + peer->nr_threads = nr_threads; + + peer->thread = calloc(nr_threads, sizeof(struct thread)); + if (!peer->thread) + goto malloc_fail; + peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req)); + if (!peer->peer_reqs){ +malloc_fail: + perror("malloc"); + return NULL; + } + + if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops)) + goto malloc_fail; + if (!xq_alloc_empty(&peer->threads, nr_threads)) + goto malloc_fail; + + if (xseg_initialize()){ + printf("cannot initialize library\n"); + return NULL; + } + peer->xseg = join(spec); + if (!peer->xseg) + return NULL; + + peer->portno_start = (xport) portno_start; + peer->portno_end= (xport) portno_end; + peer->port = xseg_bind_port(peer->xseg, peer->portno_start, NULL); + if (!peer->port){ + printf("cannot bind to port %ld\n", peer->portno_start); + return NULL; + } + + xport p; + for (p = peer->portno_start + 1; p <= peer->portno_end; p++) { + struct xseg_port *tmp; + tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, peer->port)); + if (!tmp){ + printf("cannot bind to port %ld\n", p); + return NULL; + } + } + + printf("Peer on ports %u-%u\n", peer->portno_start, + peer->portno_end); + + for (i = 0; i < nr_ops; i++) { + peer->peer_reqs[i].peer = peer; + peer->peer_reqs[i].req = NULL; + peer->peer_reqs[i].retval = 0; + peer->peer_reqs[i].priv = NULL; + } + peer->interactive_func = NULL; + return peer; +} + + +int main(int argc, char *argv[]) +{ + struct peerd *peer = NULL; + //parse args + char *spec = ""; + int i, r; + long portno_start = -1, portno_end = -1; + //set defaults here + uint32_t nr_ops = 16; + uint32_t nr_threads = 1 ; + unsigned int debug_level = 0; + uint32_t defer_portno = NoPort; + char *logfile = NULL; + + //capture here -g spec, -n nr_ops, -p portno, -t nr_threads -v verbose level + // -dp xseg_portno to defer blocking requests + // -l log file ? + //TODO print messages on arg parsing error + + for (i = 1; i < argc; i++) { + if (!strcmp(argv[i], "-g") && i + 1 < argc) { + spec = argv[i+1]; + i += 1; + continue; + } + + if (!strcmp(argv[i], "-sp") && i + 1 < argc) { + portno_start = strtoul(argv[i+1], NULL, 10); + i += 1; + continue; + } + + if (!strcmp(argv[i], "-ep") && i + 1 < argc) { + portno_end = strtoul(argv[i+1], NULL, 10); + i += 1; + continue; + } + + if (!strcmp(argv[i], "-n") && i + 1 < argc) { + nr_ops = strtoul(argv[i+1], NULL, 10); + i += 1; + continue; + } + if (!strcmp(argv[i], "-v") && i + 1 < argc ) { + debug_level = atoi(argv[i+1]); + i += 1; + continue; + } + if (!strcmp(argv[i], "-dp") && i + 1 < argc ) { + defer_portno = strtoul(argv[i+1], NULL, 10); + i += 1; + continue; + } + if (!strcmp(argv[i], "-l") && i + 1 < argc ) { + logfile = argv[i+1]; + i += 1; + continue; + } + + } + init_logctx(&lc, argv[0], debug_level, logfile); + XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid)); + + //TODO perform argument sanity checks + verbose = debug_level; + + //TODO err check + peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno); + if (!peer) + return -1; + r = custom_peer_init(peer, argc, argv); + if (r < 0) + return -1; + peerd_start_threads(peer); + return peerd_loop(peer); +} diff --git a/xseg/peers/user/vlmc-tool.c b/xseg/peers/user/vlmc-tool.c index 967df1c..d4bdf18 100644 --- a/xseg/peers/user/vlmc-tool.c +++ b/xseg/peers/user/vlmc-tool.c @@ -79,7 +79,7 @@ int wait_reply(struct xseg_request *expected_req) struct xseg_request *rec; xseg_prepare_wait(xseg, srcport); while(1) { - rec = xseg_receive(xseg, srcport); + rec = xseg_receive(xseg, srcport, 0); if (rec) { if (rec != expected_req) { fprintf(stderr, "Unknown received req. Putting req.\n"); diff --git a/xseg/peers/user/xseg-tool.c b/xseg/peers/user/xseg-tool.c index b18a869..74ce401 100644 --- a/xseg/peers/user/xseg-tool.c +++ b/xseg/peers/user/xseg-tool.c @@ -510,28 +510,28 @@ int cmd_bridge(uint32_t portno1, uint32_t portno2, char *logfile, char *how) active = 0; //FIXME - req = xseg_accept(xseg, portno1); + req = xseg_accept(xseg, portno1, 0); if (req) { xseg_submit(xseg, req, portno2, X_ALLOC); log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req); active += 1; } - req = xseg_accept(xseg, portno2); + req = xseg_accept(xseg, portno2, 0); if (req) { xseg_submit(xseg, req, portno1, X_ALLOC); log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req); active += 1; } - req = xseg_receive(xseg, portno1); + req = xseg_receive(xseg, portno1, 0); if (req) { xseg_respond(xseg, req, portno2, X_ALLOC); log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req); active += 1; } - req = xseg_receive(xseg, portno2); + req = xseg_receive(xseg, portno2, 0); if (req) { xseg_respond(xseg, req, portno1, X_ALLOC); log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req); @@ -632,7 +632,7 @@ int cmd_rndwrite(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksiz } } - received = xseg_receive(xseg, srcport); + received = xseg_receive(xseg, srcport, 0); if (received) { xseg_cancel_wait(xseg, srcport); nr_received += 1; @@ -738,7 +738,7 @@ int cmd_rndread(long loops, int32_t seed, uint32_t targetlen, uint32_t chunksize } } - received = xseg_receive(xseg, srcport); + received = xseg_receive(xseg, srcport, 0); if (received) { xseg_cancel_wait(xseg, srcport); nr_received += 1; @@ -832,7 +832,7 @@ int cmd_submit_reqs(long loops, long concurrent_reqs, int op) perror("Cannot signal peer"); } } - received = xseg_receive(xseg, srcport); + received = xseg_receive(xseg, srcport, 0); if (received) { xseg_cancel_wait(xseg, srcport); --nr_flying; @@ -955,7 +955,7 @@ int cmd_put_requests(void) struct xseg_request *req; for (;;) { - req = xseg_accept(xseg, dstport); + req = xseg_accept(xseg, dstport, 0); if (!req) break; if (xseg_put_request(xseg, req, srcport)) @@ -975,7 +975,7 @@ int cmd_finish(unsigned long nr, int fail) for (; nr--;) { xseg_prepare_wait(xseg, srcport); - req = xseg_accept(xseg, srcport); + req = xseg_accept(xseg, srcport, 0); if (req) { req_target = xseg_get_target(xseg, req); req_data = xseg_get_data(xseg, req); @@ -1054,7 +1054,7 @@ int cmd_wait(uint32_t nr) init_local_signal(); for (;;) { - req = xseg_receive(xseg, srcport); + req = xseg_receive(xseg, srcport, 0); if (req) { handle_reply(req); nr--; @@ -1081,7 +1081,7 @@ int cmd_put_replies(void) struct xseg_request *req; for (;;) { - req = xseg_receive(xseg, dstport); + req = xseg_receive(xseg, dstport, 0); if (!req) break; fprintf(stderr, "request: %08llx%08llx\n" diff --git a/xseg/sys/util.h b/xseg/sys/util.h index 52e04d7..f1abc46 100644 --- a/xseg/sys/util.h +++ b/xseg/sys/util.h @@ -20,8 +20,9 @@ } while(0) /* general purpose xflags */ -#define X_ALLOC ((uint32_t) (1 << 0)) -#define X_LOCAL ((uint32_t) (1 << 1)) +#define X_ALLOC ((uint32_t) (1 << 0)) +#define X_LOCAL ((uint32_t) (1 << 1)) +#define X_NONBLOCK ((uint32_t) (1 << 2)) typedef uint64_t xpointer; diff --git a/xseg/tools/vlmc b/xseg/tools/vlmc index 3922608..d62d062 100755 --- a/xseg/tools/vlmc +++ b/xseg/tools/vlmc @@ -75,7 +75,7 @@ def vlmc_map(args): port = prev + 1 fd = os.open(XSEGBD_SYSFS + "add", os.O_WRONLY) - os.write(fd, "%s %d:%d:%d" % (name, port, VPORT, REQS)) + os.write(fd, "%s %d:%d:%d" % (name, port, port + VPORT_START-4, REQS)) os.close(fd) except Exception, reason: print >> sys.stderr, reason diff --git a/xseg/xseg/xseg.c b/xseg/xseg/xseg.c index 8bb6220..fc3f207 100644 --- a/xseg/xseg/xseg.c +++ b/xseg/xseg/xseg.c @@ -1334,7 +1334,7 @@ out: } -struct xseg_request *xseg_receive(struct xseg *xseg, xport portno) +struct xseg_request *xseg_receive(struct xseg *xseg, xport portno, uint32_t flags) { xqindex xqi; xserial serial = NoSerial; @@ -1344,7 +1344,12 @@ struct xseg_request *xseg_receive(struct xseg *xseg, xport portno) if (!port) return NULL; retry: - xlock_acquire(&port->pq_lock, portno); + if (flags & X_NONBLOCK) { + if (!xlock_try_lock(&port->pq_lock, portno)) + return NULL; + } else { + xlock_acquire(&port->pq_lock, portno); + } q = XPTR_TAKE(port->reply_queue, xseg->segment); xqi = __xq_pop_head(q); xlock_release(&port->pq_lock); @@ -1365,7 +1370,7 @@ retry: return req; } -struct xseg_request *xseg_accept(struct xseg *xseg, xport portno) +struct xseg_request *xseg_accept(struct xseg *xseg, xport portno, uint32_t flags) { xqindex xqi; struct xq *q; @@ -1373,7 +1378,13 @@ struct xseg_request *xseg_accept(struct xseg *xseg, xport portno) struct xseg_port *port = xseg_get_port(xseg, portno); if (!port) return NULL; - xlock_acquire(&port->rq_lock, portno); + if (flags & X_NONBLOCK) { + if (!xlock_try_lock(&port->rq_lock, portno)) + return NULL; + } else { + xlock_acquire(&port->rq_lock, portno); + } + q = XPTR_TAKE(port->request_queue, xseg->segment); xqi = __xq_pop_head(q); xlock_release(&port->rq_lock); diff --git a/xseg/xseg/xseg.h b/xseg/xseg/xseg.h index c8e85dc..745ab77 100644 --- a/xseg/xseg/xseg.h +++ b/xseg/xseg/xseg.h @@ -337,13 +337,15 @@ struct xseg_request * xseg_get_request ( struct xseg * xseg, uint32_t flags ); struct xseg_request * xseg_receive ( struct xseg * xseg, - xport portno ); + xport portno, + uint32_t flags ); /* \___________________/ \_________/ */ /* ___________________ _________ */ /* / \ / \ */ struct xseg_request * xseg_accept ( struct xseg * xseg, - xport portno ); + xport portno, + uint32_t flags ); xport xseg_respond ( struct xseg * xseg, struct xseg_request * xreq, -- 1.7.10.4