[ -z "${HOSTNAME}" ] && HOSTNAME=`hostname`
[ -z "${XSEG_HOME}" ] && XSEG_HOME="/root/archip/xseg"
[ -z "${MODULES_DIR}" ] && MODULES_DIR="${XSEG_HOME}/lib/kernel"
- [ -z "${SPEC}" ] && SPEC="segdev:xsegbd:16:1024:12"
+ [ -z "${SPEC}" ] && SPEC="segdev:xsegbd:512:1024:12"
[ -z "${REQS}" ] && REQS=512
- [ -z "${PORTS}" ] && PORTS=16
+ [ -z "${PORTS}" ] && PORTS=512
[ -z "${IMAGES}" ] && IMAGES="/home/user/archip/xseg/peers/user/foo"
[ -z "${MAPS}" ] && MAPS="/home/user/archip/xseg/peers/user/foo"
[ -z "${PITHOS}" ] && PITHOS="/srv/pithos/data/blocks"
[ -z "${CHRDEV_MAJOR}" ] && CHRDEV_MAJOR=60
[ -z "${NR_OPS}" ] && NR_OPS=128
[ -z "${BPORT}" ] && BPORT=0
- [ -z "${VPORT}" ] && VPORT=2
+ [ -z "${VPORT_START}" ] && VPORT_START=4
+ [ -z "${VPORT_END}" ] && VPORT_END=200
[ -z "${MPORT}" ] && MPORT=1
- [ -z "${MBPORT}" ] && MBPORT=3
- [ -z "${VTOOL}" ] && VTOOL=15
+ [ -z "${MBPORT}" ] && MBPORT=2
+ [ -z "${VTOOL}" ] && VTOOL=3
}
}
function spawn_vlmcd {
- pgrep -f "peers/user/mt-vlmcd" || \
- "${XSEG_HOME}/peers/user/mt-vlmcd" -t 1 -p "$VPORT" -bp "$BPORT" -mp "$MPORT" -g \
+ pgrep -f "peers/user/st-vlmcd" || \
+ "${XSEG_HOME}/peers/user/st-vlmcd" -t 1 -sp "$VPORT_START" \
+ -ep "$VPORT_END" -bp "$BPORT" -mp "$MPORT" -g \
"${SPEC}" -n ${NR_OPS} &> "${XSEG_LOGS}/vlmcd-${HOSTNAME}" &
# alloc_requests "$VPORT:0" 128
}
function spawn_mapperdc {
pgrep -f "mt-mapperd" || \
- "${XSEG_HOME}/peers/user/mt-mapperd" -t 1 -p "$MPORT" -bp "$BPORT" -mbp "$MBPORT" -g "${SPEC}"\
+ "${XSEG_HOME}/peers/user/mt-mapperd" -t 1 -sp "$MPORT" -ep "$MPORT"\
+ -bp "$BPORT" -mbp "$MBPORT" -g "${SPEC}"\
-n ${NR_OPS} &> "${XSEG_LOGS}/mapperd-${HOSTNAME}" &
# alloc_requests "$MPORT:0" 128
}
spawn_vlmcd
;;
stop)
- pkill -f peers/user/mt-vlmcd || true
+ pkill -f peers/user/st-vlmcd || true
# free_requests "$VPORT:0" 128 || true
pkill -f peers/user/mt-mapperd || true
# free_requests "$MPORT:0" 128 || true
for (;;) {
xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
- xreq = xseg_receive(xsegbd_dev->xseg, portno);
+ xreq = xseg_receive(xsegbd_dev->xseg, portno, 0);
if (!xreq)
break;
#all: filed xseg sosd vlmcd mapperd
#all: filed xseg vlmcd mapperd
-all: filed xseg mt-sosd dummy mt-vlmcd mapperd mt-mapperd pfiled monitor vlmc-xseg
+all: filed xseg mt-sosd dummy mt-vlmcd mapperd mt-mapperd pfiled monitor vlmc-xseg st-vlmcd
filed: filed.c $(BASE)/xseg/xseg.h
mt-vlmcd: mt-vlmcd.c mpeer.c mpeer.h $(BASE)/xseg/protocol.h
$(CC) $(CFLAGS) -o $@ $< mpeer.c $(INC) -L$(LIB) -lxseg -lpthread
+st-vlmcd: mt-vlmcd.c speer.c mpeer.h $(BASE)/xseg/protocol.h
+ $(CC) $(CFLAGS) -o $@ $< speer.c $(INC) -L$(LIB) -lxseg -lpthread
+
mt-mapperd: mt-mapperd.c mpeer.c mpeer.h $(BASE)/xseg/protocol.h
$(CC) $(CFLAGS) -o $@ $< mpeer.c $(INC) -L$(LIB) -lxseg -lpthread -lgcrypt
$(CC) $(CFLAGS) -o $@ $< $(INC) -L$(LIB) -lxseg
clean:
- rm -f filed xseg sosd vlmcd mapperd mt-sosd dummy monitor mt-mapperd pfiled vlmc-xseg
+ rm -f filed xseg sosd vlmcd mapperd mt-sosd dummy monitor mt-mapperd pfiled vlmc-xseg st-vlmcd
for (;;) {
accepted = NULL;
- accepted = xseg_accept(xseg, portno);
+ accepted = xseg_accept(xseg, portno, 0);
if (accepted) {
io->req = accepted;
wake_up_next_iothread(store);
ret = xseg_prepare_wait(xseg, mportno);
always_assert(ret == 0);
- xreq = xseg_accept(xseg, mportno);
+ xreq = xseg_accept(xseg, mportno, 0);
if (xreq) {
xseg_cancel_wait(xseg, mportno);
/*
int mpause(struct peerd *peer)
{
struct xseg *xseg = peer->xseg;
- struct xseg_port *port = xseg_get_port(xseg, peer->portno);
+ struct xseg_port *port = xseg_get_port(xseg, peer->portno_start);
if (!port)
return -1;
- xlock_acquire(&port->rq_lock, peer->portno);
- xlock_acquire(&port->pq_lock, peer->portno);
+ xlock_acquire(&port->rq_lock, peer->portno_start);
+ xlock_acquire(&port->pq_lock, peer->portno_start);
return 0;
}
int munpause(struct peerd *peer)
{
struct xseg *xseg = peer->xseg;
- struct xseg_port *port = xseg_get_port(xseg, peer->portno);
+ struct xseg_port *port = xseg_get_port(xseg, peer->portno_start);
if (!port)
return -1;
//struct thread *t = alloc_thread(peer);
//wake_up_thread(t);
//return t;
- return (xseg_signal(peer->xseg, peer->portno));
+ return (xseg_signal(peer->xseg, peer->portno_start));
}
struct timeval resp_start, resp_end, resp_accum = {0, 0};
XSEGLOG2(&lc, D, "failing req %u", (unsigned int) (pr - peer->peer_reqs));
req->state |= XS_FAILED;
//xseg_set_req_data(peer->xseg, pr->req, NULL);
- p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC);
+ p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
xseg_signal(peer->xseg, p);
free_peer_req(peer, pr);
wake_up_next_thread(peer);
req->state |= XS_SERVED;
//xseg_set_req_data(peer->xseg, pr->req, NULL);
//gettimeofday(&resp_start, NULL);
- p = xseg_respond(peer->xseg, req, peer->portno, X_ALLOC);
+ p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
//gettimeofday(&resp_end, NULL);
//responds++;
//timersub(&resp_end, &resp_start, &resp_end);
return -1;
//printf("pr: %x , req_data: %x \n", pr, xseg_get_req_data(peer->xseg, req));
//gettimeofday(&sub_start, NULL);
- ret = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ ret = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
//gettimeofday(&sub_end, NULL);
//submits++;
//timersub(&sub_end, &sub_start, &sub_end);
struct thread *t = (struct thread *) arg;
struct peerd *peer = t->peer;
struct xseg *xseg = peer->xseg;
- uint32_t portno = peer->portno;
+ xport portno_start = peer->portno_start;
+ xport portno_end = peer->portno_end;
struct peer_req *pr;
- uint64_t threshold=1000;
+ uint64_t threshold=1;
pid_t pid =syscall(SYS_gettid);
uint64_t loops;
struct xseg_request *accepted, *received;
int r;
+ int change;
+ xport i;
XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
- xseg_init_local_signal(xseg, portno);
+ xseg_init_local_signal(xseg, peer->portno_start);
for (;;) {
if (t->func) {
XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
- xseg_cancel_wait(xseg, portno);
+ xseg_cancel_wait(xseg, peer->portno_start);
t->func(t->arg);
t->func = NULL;
t->arg = NULL;
}
for(loops= threshold; loops > 0; loops--) {
- accepted = NULL;
- received = NULL;
- if (loops == 1)
- xseg_prepare_wait(xseg, portno);
-
-// if (xq_count(&peer->xport->request_queue)){
- pr = alloc_peer_req(peer);
- if (pr) {
- accepted = xseg_accept(xseg, peer->portno);
- if (accepted) {
- XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
- pr->req = accepted;
- xseg_cancel_wait(xseg, portno);
- wake_up_next_thread(peer);
- handle_accepted(peer, pr, accepted);
- loops = threshold;
+ do {
+ if (loops == 1)
+ xseg_prepare_wait(xseg, peer->portno_start);
+ change = 0;
+ for (i = portno_start; i <= portno_end; i++) {
+ accepted = NULL;
+ received = NULL;
+ pr = alloc_peer_req(peer);
+ if (pr) {
+ accepted = xseg_accept(xseg, i, X_NONBLOCK);
+ if (accepted) {
+ XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
+ pr->req = accepted;
+ pr->portno = i;
+ xseg_cancel_wait(xseg, i);
+ wake_up_next_thread(peer);
+ handle_accepted(peer, pr, accepted);
+ change = 1;
+ }
+ else {
+ free_peer_req(peer, pr);
+ }
}
- else {
- free_peer_req(peer, pr);
+ received = xseg_receive(xseg, i, X_NONBLOCK);
+ if (received) {
+ //printf("received req id: %u\n", received - xseg->requests);
+ //print_req(peer->xseg, received);
+ r = xseg_get_req_data(xseg, received, (void **) &pr);
+ if (r < 0 || !pr){
+ //FIXME what to do here ?
+ XSEGLOG2(&lc, W, "Received request with no pr data\n");
+ xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
+ //FIXME if fails, put req
+ }
+ xseg_cancel_wait(xseg, i);
+ wake_up_next_thread(peer);
+ handle_received(peer, pr, received);
+ change = 1;
}
}
-// }
-// if (xq_count(&peer->xport->reply_queue)){
- received = xseg_receive(xseg, peer->portno);
- if (received) {
- //printf("received req id: %u\n", received - xseg->requests);
- //print_req(peer->xseg, received);
- r = xseg_get_req_data(xseg, received, (void **) &pr);
- if (r < 0 || !pr){
- //FIXME what to do here ?
- XSEGLOG2(&lc, W, "Received request with no pr data\n");
- xseg_respond(peer->xseg, received, peer->portno, X_ALLOC);
- //if fails, put req
- }
- //fail(peer, received);
- //assert pr->req == received;
- xseg_cancel_wait(xseg, portno);
- wake_up_next_thread(peer);
- handle_received(peer, pr, received);
+ if (change)
loops = threshold;
- }
-// }
+ }while (change);
}
XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
xseg_wait_signal(xseg, 10000000UL);
- xseg_cancel_wait(xseg, portno);
+ xseg_cancel_wait(xseg, peer->portno_start);
XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
}
return NULL;
return 0;
}
-static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno, uint32_t nr_threads, uint32_t defer_portno)
+static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
+ long portno_end, uint32_t nr_threads, uint32_t defer_portno)
{
int i;
struct peerd *peer;
if (!peer->xseg)
return NULL;
- peer->xport = xseg_bind_port(peer->xseg, portno, NULL);
- if (!peer->xport){
- printf("cannot bind to port %ld\n", portno);
+ peer->portno_start = (xport) portno_start;
+ peer->portno_end= (xport) portno_end;
+ peer->port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
+ if (!peer->port){
+ printf("cannot bind to port %ld\n", peer->portno_start);
return NULL;
}
- printf("%lx\n", (unsigned long) peer->xport);
- peer->portno = xseg_portno(peer->xseg, peer->xport);
- printf("Peer on port %u/%u\n", peer->portno,
- peer->xseg->config.nr_ports);
+
+ xport p;
+ for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
+ struct xseg_port *tmp;
+ tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, peer->port));
+ if (!tmp){
+ printf("cannot bind to port %ld\n", p);
+ return NULL;
+ }
+ }
+
+ printf("Peer on ports %u-%u\n", peer->portno_start,
+ peer->portno_end);
for (i = 0; i < nr_ops; i++) {
peer->peer_reqs[i].peer = peer;
//parse args
char *spec = "";
int i, r;
- long portno = -1;
+ long portno_start = -1, portno_end = -1;
//set defaults here
uint32_t nr_ops = 16;
uint32_t nr_threads = 16 ;
continue;
}
- if (!strcmp(argv[i], "-p") && i + 1 < argc) {
- portno = strtoul(argv[i+1], NULL, 10);
+ if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
+ portno_start = strtoul(argv[i+1], NULL, 10);
+ i += 1;
+ continue;
+ }
+
+ if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
+ portno_end = strtoul(argv[i+1], NULL, 10);
i += 1;
continue;
}
verbose = debug_level;
//TODO err check
- peer = peerd_init(nr_ops, spec, portno, nr_threads, defer_portno);
+ peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
if (!peer)
return -1;
r = custom_peer_init(peer, argc, argv);
#include <stddef.h>
+#include <xseg/xseg.h>
/* main mpeer structs */
struct peer_req {
struct peerd *peer;
struct xseg_request *req;
ssize_t retval;
+ xport portno;
void *priv;
};
struct peerd {
struct xseg *xseg;
- struct xseg_port *xport;
- uint32_t portno;
+ struct xseg_port *port;
+ xport portno_start;
+ xport portno_end;
long nr_ops;
uint32_t nr_threads;
uint32_t defer_portno;
goto out_hash;
- req = xseg_get_request(peer->xseg, peer->portno, mapper->mbportno, X_ALLOC);
+ req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
if (!req){
XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
m->volume);
m->volume);
goto out_put;
}
- p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort){
XSEGLOG2(&lc, E, "Cannot submit request for map %s",
m->volume);
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_fail:
remove_map(mapper, m);
{
void *dummy;
struct mapperd *mapper = __get_mapperd(peer);
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->mbportno, X_ALLOC);
if (!req){
XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
mn->object, map->volume, (unsigned long long) mn->objectidx);
goto out_put;
}
- xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort){
XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
"(Map: %s [%llu]",
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
"(Map: %s [%llu]",
struct mapperd *mapper = __get_mapperd(peer);
struct map_node *mn;
uint64_t i, pos, max_objidx = calc_map_obj(map);
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->mbportno, X_ALLOC);
if (!req){
XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
map->volume);
goto out_put;
}
- xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort){
XSEGLOG2(&lc, E, "Cannot submit request for map %s",
map->volume);
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
return -1;
if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE))
goto copyup_zeroblock;
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->bportno, X_ALLOC);
if (!req)
goto out_err;
if (r<0)
goto out_put;
r = __set_copyup_node(mio, req, mn);
- p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort) {
goto out_unset;
}
r = __set_copyup_node(mio, req, NULL);
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
return -1;
if (r < 0)
goto out_fail;
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
map->flags &= ~MF_MAP_LOADING;
XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
uint64_t qsize = xq_count(&map->pending);
out_fail:
XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
map->flags &= ~MF_MAP_LOADING;
while((idx = __xq_pop_head(&map->pending)) != Noneidx){
struct peer_req *preq = (struct peer_req *) idx;
strncpy(buf, target, req->targetlen);
buf[req->targetlen] = 0;
XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
goto out_fail;
}
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
map->flags &= ~MF_MAP_WRITING;
XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
uint64_t qsize = xq_count(&map->pending);
out_fail:
XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
map->flags &= ~MF_MAP_WRITING;
while((idx = __xq_pop_head(&map->pending)) != Noneidx){
struct peer_req *preq = (struct peer_req *) idx;
strncpy(buf, target, req->targetlen);
buf[req->targetlen] = 0;
XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
goto out_fail;
}
mn->flags |= MF_OBJECT_WRITING;
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
return 0;
out_fail:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
__set_copyup_node(mio, req, NULL);
while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
struct peer_req * preq = (struct peer_req *) idx;
strncpy(mn->object, tmp.object, tmp.objectlen);
mn->object[tmp.objectlen] = 0;
mn->objectlen = tmp.objectlen;
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
uint64_t qsize = xq_count(&mn->pending);
out_fail:
XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
struct peer_req *preq = (struct peer_req *) idx;
fail(peer, preq);
out_err:
XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
return MF_PENDING;
}
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->bportno, X_ALLOC);
if (!req)
goto out_err;
if (r < 0)
goto out_put;
__set_copyup_node(mio, req, mn);
- xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort)
goto out_unset;
r = xseg_signal(peer->xseg, p);
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
return -1;
void *dummy;
struct mapperd *mapper = __get_mapperd(peer);
struct mapper_io *mio = __get_mapper_io(pr);
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->mbportno, X_ALLOC);
if (!req)
goto out_err;
if (r < 0)
goto out_put;
__set_copyup_node(mio, req, NULL);
- xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort)
goto out_unset;
r = xseg_signal(peer->xseg, p);
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
return -1;
//map block delete
map = find_map(mapper, target, req->targetlen);
if (!map) {
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
handle_map_delete(peer, pr, map, err);
//object delete
map = mn->map;
if (!map) {
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
handle_object_delete(peer, pr, mn, err);
}
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return 0;
}
return 0;
}
vio->err = 0; //reset error state
- vio->mreq = xseg_get_request(peer->xseg, peer->portno,
+ vio->mreq = xseg_get_request(peer->xseg, pr->portno,
vlmc->mportno, X_ALLOC);
if (!vio->mreq)
goto out_err;
}
xseg_set_req_data(peer->xseg, vio->mreq, pr);
__set_vio_state(vio, MAPPING);
- p = xseg_submit(peer->xseg, vio->mreq, peer->portno, X_ALLOC);
+ p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
if (p == NoPort)
goto out_unset;
r = xseg_signal(peer->xseg, p);
out_unset:
xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
out_put:
- xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+ xseg_put_request(peer->xseg, vio->mreq, pr->portno);
out_err:
__set_vio_state(vio, CONCLUDED);
fail(peer, pr);
/* FIXME shouldn's XS_FAILED be sufficient ?? */
if (vio->mreq->state & XS_FAILED && !(vio->mreq->state & XS_SERVED)){
fprintf(stderr, "req %lx (op: %d) failed\n", vio->mreq, vio->mreq->op);
- xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+ xseg_put_request(peer->xseg, vio->mreq, pr->portno);
vio->mreq = NULL;
__set_vio_state(vio, CONCLUDED);
fail(peer, pr);
struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
char *data = xseg_get_data(peer->xseg, pr->req);
*(off_t *)data = xinfo->size;
- xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+ xseg_put_request(peer->xseg, vio->mreq, pr->portno);
vio->mreq = NULL;
__set_vio_state(vio, CONCLUDED);
complete(peer, pr);
} else if (vio->mreq->op == X_CLOSE) {
- struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
- xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+ xseg_put_request(peer->xseg, vio->mreq, pr->portno);
vio->mreq = NULL;
__set_vio_state(vio, CONCLUDED);
complete(peer, pr);
struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
if (!mreply->cnt){
printf("foo2\n");
- xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+ xseg_put_request(peer->xseg, vio->mreq, pr->portno);
vio->mreq = NULL;
__set_vio_state(vio, CONCLUDED);
fail(peer, pr);
vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
if (!vio->breqs) {
printf("foo3\n");
- xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+ xseg_put_request(peer->xseg, vio->mreq, pr->portno);
vio->mreq = NULL;
__set_vio_state(vio, CONCLUDED);
fail(peer, pr);
datalen = mreply->segs[i].size;
offset = mreply->segs[i].offset;
targetlen = mreply->segs[i].targetlen;
- breq = xseg_get_request(peer->xseg, peer->portno, vlmc->bportno, X_ALLOC);
+ breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
if (!breq) {
vio->err = 1;
break;
r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
if (r < 0) {
vio->err = 1;
- xseg_put_request(peer->xseg, breq, peer->portno);
+ xseg_put_request(peer->xseg, breq, pr->portno);
break;
}
breq->offset = offset;
target = xseg_get_target(peer->xseg, breq);
if (!target) {
vio->err = 1;
- xseg_put_request(peer->xseg, breq, peer->portno);
+ xseg_put_request(peer->xseg, breq, pr->portno);
break;
}
strncpy(target, mreply->segs[i].target, targetlen);
r = xseg_set_req_data(peer->xseg, breq, pr);
if (r<0) {
vio->err = 1;
- xseg_put_request(peer->xseg, breq, peer->portno);
+ xseg_put_request(peer->xseg, breq, pr->portno);
break;
}
// this should work, right ?
breq->data = pr->req->data + pos;
pos += datalen;
- p = xseg_submit(peer->xseg, breq, peer->portno, X_ALLOC);
+ p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
if (p == NoPort){
void *dummy;
vio->err = 1;
xseg_get_req_data(peer->xseg, breq, &dummy);
- xseg_put_request(peer->xseg, breq, peer->portno);
+ xseg_put_request(peer->xseg, breq, pr->portno);
break;
}
r = xseg_signal(peer->xseg, p);
vio->breqs[i] = breq;
}
vio->breq_cnt = i;
- xseg_put_request(peer->xseg, vio->mreq, peer->portno);
+ xseg_put_request(peer->xseg, vio->mreq, pr->portno);
vio->mreq = NULL;
if (i == 0) {
printf("foo4\n");
//assert breq->serviced == breq->size
__sync_fetch_and_add(&pr->req->serviced, breq->serviced);
}
- xseg_put_request(peer->xseg, breq, peer->portno);
+ xseg_put_request(peer->xseg, breq, pr->portno);
if (!__sync_sub_and_fetch(&vio->breq_cnt, 1)) {
__set_vio_state(vio, CONCLUDED);
for (;;) {
accepted = NULL;
- accepted = xseg_accept(xseg, portno);
+ accepted = xseg_accept(xseg, portno, 0);
if (accepted) {
io->req = accepted;
wake_up_next_iothread(pfiled);
--- /dev/null
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <xseg/xseg.h>
+#include <pthread.h>
+#include <mpeer.h>
+#include <sys/syscall.h>
+#include <sys/time.h>
+#include <signal.h>
+
+/*
+ * REARRANGE places the caller's format string first, appends a "%s" to it
+ * and moves the function name to the head of the argument list. Combined
+ * with LOG below, the leading "%s: " consumes __func__ and the trailing "%s"
+ * consumes the empty string LOG adds as the last argument.
+ */
+#define REARRANGE(__fun_name__, __format__, ...) __format__ "%s", __fun_name__, ##__VA_ARGS__
+/* Print "<function>: <message>" to stderr when level <= verbose. */
+#define LOG(level, ...) \
+ do { \
+ if (level <= verbose) { \
+ fprintf(stderr, "%s: " REARRANGE( __func__ , ## __VA_ARGS__, "" )); \
+ } \
+ }while (0)
+
+
+unsigned int verbose = 0;
+struct log_ctx lc;
+
+/* Per-worker bookkeeping; workers are addressed as entries of peer->thread. */
+struct thread {
+ struct peerd *peer; /* peer this worker belongs to */
+ pthread_t tid; /* thread id; joined in peerd_loop */
+ pthread_cond_t cond; /* signalled by __wake_up_thread */
+ pthread_mutex_t lock; /* held while cond is signalled */
+ void (*func)(void *arg); /* one-shot job run by thread_loop, then reset to NULL */
+ void *arg; /* argument handed to func */
+};
+
+
+/* Non-zero when the peer has a defer port configured (defer_portno != NoPort). */
+inline int canDefer(struct peerd *peer)
+{
+	return peer->defer_portno != NoPort;
+}
+
+/*
+ * Debugging aid: dump a request's header fields, its target name and the
+ * beginning of its payload to stdout.
+ *
+ * At most 63 bytes of the target and of the data are shown. Both copies are
+ * bounded by the lengths recorded in the request (targetlen / datalen), so
+ * bytes lying past a short payload are neither read nor printed. A missing
+ * target or data buffer is printed as an empty string.
+ */
+void print_req(struct xseg *xseg, struct xseg_request *req)
+{
+	char target[64], data[64];
+	char *req_target = xseg_get_target(xseg, req);
+	char *req_data = xseg_get_data(xseg, req);
+	unsigned int tlen = 0, dlen = 0;
+
+	if (req_target)
+		tlen = (req->targetlen > 63) ? 63 : req->targetlen;
+	if (req_data)
+		dlen = (req->datalen > 63) ? 63 : (unsigned int) req->datalen;
+
+	/* strncpy does not terminate on truncation; terminate explicitly */
+	if (tlen)
+		strncpy(target, req_target, tlen);
+	target[tlen] = 0;
+	if (dlen)
+		strncpy(data, req_data, dlen);
+	data[dlen] = 0;
+
+	printf("req id:%lu, op:%u %llu:%lu serviced: %lu, reqstate: %u\n"
+		"src: %u, st: %u, dst: %u dt: %u\n"
+		"target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
+		(unsigned long)(req),
+		(unsigned int)req->op,
+		(unsigned long long)req->offset,
+		(unsigned long)req->size,
+		(unsigned long)req->serviced,
+		(unsigned int)req->state,
+		(unsigned int)req->src_portno,
+		(unsigned int)req->src_transit_portno,
+		(unsigned int)req->dst_portno,
+		(unsigned int)req->dst_transit_portno,
+		(unsigned int)req->targetlen, target,
+		(unsigned long long)req->datalen, data);
+}
+/*
+ * When verbose is non-zero, print msg followed by a summary of the peer
+ * request: its index in peer->peer_reqs, the request header fields, the
+ * target name and the beginning of the payload.
+ *
+ * The target is not guaranteed to be NUL-terminated inside the request
+ * buffer, so it is copied out and terminated here. The payload copy is
+ * bounded by req->datalen as well, so nothing beyond a short payload is read
+ * or printed. A missing target or data buffer is shown as an empty string.
+ */
+void log_pr(char *msg, struct peer_req *pr)
+{
+	char target[64], data[64];
+	struct peerd *peer = pr->peer;
+	struct xseg *xseg = pr->peer->xseg;
+	char *req_target = xseg_get_target(xseg, pr->req);
+	char *req_data = xseg_get_data(xseg, pr->req);
+	unsigned int tlen = 0, dlen = 0;
+
+	if (!verbose)
+		return;
+
+	if (req_target)
+		tlen = (pr->req->targetlen > 63) ? 63 : pr->req->targetlen;
+	if (req_data)
+		dlen = (pr->req->datalen > 63) ? 63 : (unsigned int) pr->req->datalen;
+
+	if (tlen)
+		strncpy(target, req_target, tlen);
+	target[tlen] = 0;
+	if (dlen)
+		strncpy(data, req_data, dlen);
+	data[dlen] = 0;
+
+	printf("%s: req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
+		"target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
+		msg,
+		(unsigned int)(pr - peer->peer_reqs),
+		(unsigned int)pr->req->op,
+		(unsigned long long)pr->req->offset,
+		(unsigned long)pr->req->size,
+		(unsigned long)pr->req->serviced,
+		(unsigned long)pr->retval,
+		(unsigned int)pr->req->state,
+		(unsigned int)pr->req->targetlen, target,
+		(unsigned long long)pr->req->datalen, data);
+}
+
+/*
+ * Pop a free slot index from peer->free_reqs and return the matching entry
+ * of peer->peer_reqs, or NULL when the queue yields Noneidx.
+ */
+inline struct peer_req *alloc_peer_req(struct peerd *peer)
+{
+	xqindex slot = xq_pop_head(&peer->free_reqs, 1);
+
+	return (slot != Noneidx) ? &peer->peer_reqs[slot] : NULL;
+}
+
+/*
+ * Give a peer request back to the pool: drop its request pointer and put its
+ * index (position within peer->peer_reqs) at the head of peer->free_reqs.
+ */
+inline void free_peer_req(struct peerd *peer, struct peer_req *pr)
+{
+	xqindex slot = pr - peer->peer_reqs;
+
+	pr->req = NULL;
+	xq_append_head(&peer->free_reqs, slot, 1);
+}
+
+/*
+ * Pop a worker index from peer->threads and return the matching entry of
+ * peer->thread, or NULL when the queue yields Noneidx.
+ */
+inline static struct thread* alloc_thread(struct peerd *peer)
+{
+	xqindex slot = xq_pop_head(&peer->threads, 1);
+
+	return (slot != Noneidx) ? &peer->thread[slot] : NULL;
+}
+
+/* Put the worker's index (position within peer->thread) back on peer->threads. */
+inline static void free_thread(struct peerd *peer, struct thread *t)
+{
+	xq_append_head(&peer->threads, (xqindex) (t - peer->thread), 1);
+}
+
+
+/* Signal the worker's condition variable while holding its mutex. */
+inline static void __wake_up_thread(struct thread *t)
+{
+	pthread_mutex_t *guard = &t->lock;
+
+	pthread_mutex_lock(guard);
+	pthread_cond_signal(&t->cond);
+	pthread_mutex_unlock(guard);
+}
+
+/* NULL-tolerant wrapper around __wake_up_thread(). */
+inline static void wake_up_thread(struct thread* t)
+{
+	if (!t)
+		return;
+	__wake_up_thread(t);
+}
+
+/*
+ * Signal the peer's first port (portno_start) and hand back whatever
+ * xseg_signal() returned.
+ */
+inline static int wake_up_next_thread(struct peerd *peer)
+{
+	struct xseg *xseg = peer->xseg;
+	xport first = peer->portno_start;
+
+	return xseg_signal(xseg, first);
+}
+
+/*
+ * Timing scratch values and running totals for respond calls.
+ * NOTE(review): nothing visible in this file updates them - confirm whether
+ * the instrumentation is still wanted.
+ */
+struct timeval resp_start, resp_end, resp_accum = {0, 0};
+uint64_t responds = 0;
+
+/*
+ * Print the accumulated respond time and call count to stdout.
+ * The timeval members are cast to unsigned long so that the arguments match
+ * the %lu conversions whatever the widths of time_t / suseconds_t are.
+ */
+void get_responds_stats(void){
+	printf("Time waiting respond %lu.%06lu sec for %llu times.\n",
+			(unsigned long) resp_accum.tv_sec,
+			(unsigned long) resp_accum.tv_usec,
+			(long long unsigned int) responds);
+}
+
+/*
+ * Fail a peer request: flag its xseg request XS_FAILED, respond through the
+ * port recorded in pr->portno, signal the port returned by xseg_respond(),
+ * recycle pr and wake the peer.
+ *
+ * If xseg_respond() yields NoPort (presumably its failure value, as with
+ * xseg_submit() - confirm), the failure is logged and no signal is sent to
+ * the invalid port. pr is released either way.
+ * FIXME: on a failed respond the xseg request itself is not put back.
+ */
+void fail(struct peerd *peer, struct peer_req *pr)
+{
+	struct xseg_request *req = pr->req;
+	unsigned int idx = (unsigned int) (pr - peer->peer_reqs);
+	xport p;
+
+	XSEGLOG2(&lc, D, "failing req %u", idx);
+	req->state |= XS_FAILED;
+	p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
+	if (p == NoPort)
+		XSEGLOG2(&lc, E, "Cannot respond to failed req %u", idx);
+	else
+		xseg_signal(peer->xseg, p);
+	free_peer_req(peer, pr);
+	wake_up_next_thread(peer);
+}
+
+/*
+ * Complete a peer request: flag its xseg request XS_SERVED, respond through
+ * the port recorded in pr->portno, signal the port returned by
+ * xseg_respond(), recycle pr and wake the peer.
+ *
+ * If xseg_respond() yields NoPort (presumably its failure value, as with
+ * xseg_submit() - confirm), the failure is logged and no signal is sent to
+ * the invalid port. pr is released either way.
+ * FIXME: on a failed respond the xseg request itself is not put back.
+ */
+void complete(struct peerd *peer, struct peer_req *pr)
+{
+	struct xseg_request *req = pr->req;
+	unsigned int idx = (unsigned int) (pr - peer->peer_reqs);
+	xport p;
+
+	req->state |= XS_SERVED;
+	p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
+	if (p == NoPort)
+		XSEGLOG2(&lc, E, "Cannot respond to completed req %u", idx);
+	else
+		xseg_signal(peer->xseg, p);
+	free_peer_req(peer, pr);
+	wake_up_next_thread(peer);
+}
+
+/* Overwrite the request state with XS_PENDING; the peer argument is unused. */
+void pending(struct peerd *peer, struct peer_req *pr)
+{
+	struct xseg_request *req = pr->req;
+
+	req->state = XS_PENDING;
+}
+
+/*
+ * Entry point for a freshly accepted request: reset the serviced counter of
+ * pr->req and the peer request's retval, then hand over to dispatch().
+ */
+static void handle_accepted(struct peerd *peer, struct peer_req *pr,
+				struct xseg_request *req)
+{
+	XSEGLOG2(&lc, D, "Handle accepted");
+	pr->req->serviced = 0;
+	pr->retval = 0;
+	dispatch(peer, pr, req);
+}
+
+/*
+ * Entry point for a received reply: log it and hand the reply, together with
+ * the peer request it belongs to, over to dispatch().
+ */
+static void handle_received(struct peerd *peer, struct peer_req *pr,
+				struct xseg_request *req)
+{
+	XSEGLOG2(&lc, D, "Handle received \n");
+	dispatch(peer, pr, req);
+}
+/*
+ * Timing scratch values and running totals for submit calls.
+ * NOTE(review): nothing visible in this file updates them - confirm whether
+ * the instrumentation is still wanted.
+ */
+struct timeval sub_start, sub_end, sub_accum = {0, 0};
+uint64_t submits = 0;
+
+/*
+ * Print the accumulated submit time and call count to stdout.
+ * The timeval members are cast to unsigned long so that the arguments match
+ * the %lu conversions whatever the widths of time_t / suseconds_t are.
+ */
+void get_submits_stats(void){
+	printf("Time waiting submit %lu.%06lu sec for %llu times.\n",
+			(unsigned long) sub_accum.tv_sec,
+			(unsigned long) sub_accum.tv_usec,
+			(long long unsigned int) submits);
+}
+
+/*
+ * Attach pr to its xseg request as request data, submit the request through
+ * the port recorded in pr->portno and signal the port xseg_submit() returns.
+ *
+ * Returns 0 on success, -1 when the request data cannot be set or the
+ * submission yields NoPort.
+ *
+ * The status of xseg_set_req_data() is kept in a signed int: it was stored
+ * in a uint32_t before, which made the "< 0" test always false and let a
+ * failure go unnoticed.
+ */
+int submit_peer_req(struct peerd *peer, struct peer_req *pr)
+{
+	struct xseg_request *req = pr->req;
+	xport p;
+	int r;
+
+	//TODO small function with error checking
+	XSEGLOG2 (&lc, D, "submitting peer req %u\n", (unsigned int)(pr - peer->peer_reqs));
+	r = xseg_set_req_data(peer->xseg, req, (void *)(pr));
+	if (r < 0)
+		return -1;
+
+	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
+	if (p == NoPort)
+		return -1;
+
+	xseg_signal(peer->xseg, p);
+	return 0;
+}
+
+/*
+ * Hand a one-shot job to an idle worker: store func/arg in the worker taken
+ * from the pool and wake it. Returns 0 when a worker was available, -1
+ * otherwise.
+ */
+int thread_execute(struct peerd *peer, void (*func)(void *arg), void *arg)
+{
+	struct thread *worker = alloc_thread(peer);
+
+	/* no idle worker left; we could hijack a thread */
+	if (!worker)
+		return -1;
+
+	worker->func = func;
+	worker->arg = arg;
+	wake_up_thread(worker);
+	return 0;
+}
+
+/*
+ * Worker main loop (pthread start routine; arg is the worker's struct thread).
+ *
+ * Each pass first runs a job stored in t->func, if any. Otherwise it sweeps
+ * every port in [portno_start, portno_end]:
+ *   - a new request is accepted into a freshly allocated peer_req, the port
+ *     it arrived on is recorded in pr->portno, and it is dispatched;
+ *   - a reply is received, the peer_req stored as its request data is looked
+ *     up, and it is dispatched.
+ * Sweeps repeat for as long as one of them found work. After an idle sweep
+ * the thread sleeps in xseg_wait_signal(); waits are prepared and cancelled
+ * on the first port.
+ *
+ * A reply that carries no peer_req is answered with xseg_respond() and then
+ * skipped. It used to fall through to handle_received() with a NULL or stale
+ * pr, which dispatch() would then have dereferenced.
+ *
+ * The outer loop never terminates; the trailing return only satisfies the
+ * pthread start-routine signature.
+ */
+static void* thread_loop(void *arg)
+{
+	struct thread *t = (struct thread *) arg;
+	struct peerd *peer = t->peer;
+	struct xseg *xseg = peer->xseg;
+	xport portno_start = peer->portno_start;
+	xport portno_end = peer->portno_end;
+	struct peer_req *pr;
+	uint64_t threshold=1;
+	pid_t pid =syscall(SYS_gettid);
+	uint64_t loops;
+	struct xseg_request *accepted, *received;
+	int r;
+	int change;
+	xport i;
+
+	XSEGLOG2(&lc, D, "thread %u\n", (unsigned int) (t- peer->thread));
+
+	XSEGLOG2(&lc, I, "Thread %u has tid %u.\n", (unsigned int) (t- peer->thread), pid);
+	xseg_init_local_signal(xseg, peer->portno_start);
+	for (;;) {
+		if (t->func) {
+			XSEGLOG2(&lc, D, "Thread %u executes function\n", (unsigned int) (t- peer->thread));
+			xseg_cancel_wait(xseg, peer->portno_start);
+			t->func(t->arg);
+			t->func = NULL;
+			t->arg = NULL;
+			continue;
+		}
+
+		for(loops= threshold; loops > 0; loops--) {
+			do {
+				/* last spin before sleeping: announce the wait first */
+				if (loops == 1)
+					xseg_prepare_wait(xseg, peer->portno_start);
+				change = 0;
+				for (i = portno_start; i <= portno_end; i++) {
+					accepted = NULL;
+					received = NULL;
+					pr = alloc_peer_req(peer);
+					if (pr) {
+						accepted = xseg_accept(xseg, i, X_NONBLOCK);
+						if (accepted) {
+							XSEGLOG2(&lc, D, "Thread %u accepted\n", (unsigned int) (t- peer->thread));
+							pr->req = accepted;
+							pr->portno = i;
+							xseg_cancel_wait(xseg, i);
+							handle_accepted(peer, pr, accepted);
+							change = 1;
+						}
+						else {
+							free_peer_req(peer, pr);
+						}
+					}
+
+					received = xseg_receive(xseg, i, X_NONBLOCK);
+					if (!received)
+						continue;
+
+					xseg_cancel_wait(xseg, i);
+					change = 1;
+					/* do not let a failed lookup leave the accept-side pr behind */
+					pr = NULL;
+					r = xseg_get_req_data(xseg, received, (void **) &pr);
+					if (r < 0 || !pr){
+						//FIXME what to do here ?
+						XSEGLOG2(&lc, W, "Received request with no pr data\n");
+						xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
+						//FIXME if fails, put req
+						/* nothing to dispatch without a peer_req */
+						continue;
+					}
+					handle_received(peer, pr, received);
+				}
+				if (change)
+					loops = threshold;
+			}while (change);
+		}
+		XSEGLOG2(&lc, I, "Thread %u goes to sleep\n", (unsigned int) (t- peer->thread));
+		xseg_wait_signal(xseg, 10000000UL);
+		xseg_cancel_wait(xseg, peer->portno_start);
+		XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread));
+	}
+	return NULL;
+}
+
+ /*
+  * Placeholder for deferring a request to peer->defer_portno.
+  *
+  * The body is currently empty, so calling this function does nothing.
+  * The intended steps (submit pr->req on the defer port, signal that port,
+  * release pr) are kept below as commented-out code.
+  */
+ void defer_request(struct peerd *peer, struct peer_req *pr)
+ {
+ // assert canDefer(peer);
+// xseg_submit(peer->xseg, peer->defer_portno, pr->req);
+// xseg_signal(peer->xseg, peer->defer_portno);
+// free_peer_req(peer, pr);
+ }
+
+ /*
+  * Main-thread body: run the optional interactive function, then block
+  * until the worker threads have terminated.
+  *
+  * Each worker is joined exactly once, because joining a thread that has
+  * already been joined is undefined. As long as the workers keep running
+  * the first join never returns, so the main thread simply stays blocked.
+  *
+  * Returns 0 once all workers were joined, -1 if any pthread_join() failed.
+  */
+ static int peerd_loop(struct peerd *peer)
+ {
+     uint32_t i;
+     int ret = 0;
+
+     if (peer->interactive_func)
+         peer->interactive_func();
+
+     for (i = 0; i < peer->nr_threads; i++) {
+         if (pthread_join(peer->thread[i].tid, NULL))
+             ret = -1;
+     }
+     return ret;
+ }
+
+ /*
+  * Join the segment described by spec, creating it first if the initial
+  * join attempt fails.
+  *
+  * The results of xseg_parse_spec() and xseg_create() are deliberately
+  * ignored; the only outcome reported is that of the final xseg_join(),
+  * which is returned as-is (NULL when the segment could not be joined).
+  */
+ static struct xseg *join(char *spec)
+ {
+     struct xseg_config cfg;
+     struct xseg *seg;
+
+     (void)xseg_parse_spec(spec, &cfg);
+
+     seg = xseg_join(cfg.type, cfg.name, "posix", NULL);
+     if (!seg) {
+         /* nothing to join yet: try to create the segment, then retry */
+         (void)xseg_create(&cfg);
+         seg = xseg_join(cfg.type, cfg.name, "posix", NULL);
+     }
+     return seg;
+ }
+
+ /*
+  * Initialise and start the peer->nr_threads worker threads, each running
+  * thread_loop() on its own entry of peer->thread.
+  *
+  * All fields of an entry are set up before its thread is created, so the
+  * new thread never observes a half-initialised entry and its func/arg are
+  * not overwritten behind its back.
+  *
+  * Returns 0 on success, -1 as soon as a condition variable, mutex or
+  * thread cannot be created. Threads started before the failure are left
+  * running.
+  */
+ int peerd_start_threads(struct peerd *peer)
+ {
+     uint32_t i;
+     uint32_t nr_threads = peer->nr_threads;
+
+     for (i = 0; i < nr_threads; i++) {
+         struct thread *t = peer->thread + i;
+
+         t->peer = peer;
+         t->func = NULL;
+         t->arg = NULL;
+
+         if (pthread_cond_init(&t->cond, NULL)) {
+             XSEGLOG2(&lc, W, "Cannot init cond of thread %u\n", (unsigned int) i);
+             return -1;
+         }
+         if (pthread_mutex_init(&t->lock, NULL)) {
+             XSEGLOG2(&lc, W, "Cannot init lock of thread %u\n", (unsigned int) i);
+             return -1;
+         }
+         if (pthread_create(&t->tid, NULL, thread_loop, (void *) t)) {
+             XSEGLOG2(&lc, W, "Cannot create thread %u\n", (unsigned int) i);
+             return -1;
+         }
+     }
+     return 0;
+ }
+
+ /*
+  * Allocate and set up a peer descriptor.
+  *
+  * nr_ops        number of peer_req slots to allocate
+  * spec          segment spec string handed to join()
+  * portno_start  first port of the range served by this peer
+  * portno_end    last port of the range (inclusive)
+  * nr_threads    number of struct thread entries to allocate
+  * defer_portno  stored in peer->defer_portno
+  *
+  * The first port is bound on its own; every further port in the range is
+  * bound with the signal descriptor of the first one passed as the third
+  * argument of xseg_bind_port().
+  *
+  * Returns the new peer, or NULL on failure. On failure the peer structure
+  * and its thread / peer_req arrays are freed.
+  * NOTE(review): the xq queues, the joined segment and any ports already
+  * bound are not released, because no release calls for them are visible
+  * in this file. The only caller exits when NULL is returned.
+  */
+ static struct peerd* peerd_init(uint32_t nr_ops, char* spec, long portno_start,
+             long portno_end, uint32_t nr_threads, uint32_t defer_portno)
+ {
+     uint32_t i;
+     xport p;
+     struct peerd *peer;
+
+     peer = malloc(sizeof(struct peerd));
+     if (!peer) {
+         perror("malloc");
+         return NULL;
+     }
+     peer->nr_ops = nr_ops;
+     peer->defer_portno = defer_portno;
+     peer->nr_threads = nr_threads;
+     /* keep the error path safe: free(NULL) is a no-op */
+     peer->thread = NULL;
+     peer->peer_reqs = NULL;
+
+     peer->thread = calloc(nr_threads, sizeof(struct thread));
+     if (!peer->thread)
+         goto malloc_fail;
+     peer->peer_reqs = calloc(nr_ops, sizeof(struct peer_req));
+     if (!peer->peer_reqs)
+         goto malloc_fail;
+
+     if (!xq_alloc_seq(&peer->free_reqs, nr_ops, nr_ops))
+         goto malloc_fail;
+     if (!xq_alloc_empty(&peer->threads, nr_threads))
+         goto malloc_fail;
+
+     if (xseg_initialize()){
+         printf("cannot initialize library\n");
+         goto fail;
+     }
+     peer->xseg = join(spec);
+     if (!peer->xseg)
+         goto fail;
+
+     peer->portno_start = (xport) portno_start;
+     peer->portno_end = (xport) portno_end;
+     peer->port = xseg_bind_port(peer->xseg, peer->portno_start, NULL);
+     if (!peer->port){
+         /* print the caller's long value so the argument matches %ld */
+         printf("cannot bind to port %ld\n", portno_start);
+         goto fail;
+     }
+
+     for (p = peer->portno_start + 1; p <= peer->portno_end; p++) {
+         struct xseg_port *tmp;
+         tmp = xseg_bind_port(peer->xseg, p, (void *)xseg_get_signal_desc(peer->xseg, peer->port));
+         if (!tmp){
+             printf("cannot bind to port %u\n", (unsigned int) p);
+             goto fail;
+         }
+     }
+
+     printf("Peer on ports %u-%u\n", peer->portno_start,
+             peer->portno_end);
+
+     for (i = 0; i < nr_ops; i++) {
+         peer->peer_reqs[i].peer = peer;
+         peer->peer_reqs[i].req = NULL;
+         peer->peer_reqs[i].retval = 0;
+         peer->peer_reqs[i].priv = NULL;
+     }
+     peer->interactive_func = NULL;
+     return peer;
+
+ malloc_fail:
+     perror("malloc");
+ fail:
+     free(peer->peer_reqs);
+     free(peer->thread);
+     free(peer);
+     return NULL;
+ }
+
+
+ /*
+  * Peer daemon entry point.
+  *
+  * Options parsed here (each takes one value):
+  *   -g  spec          segment spec
+  *   -sp port          first port of the served range (required)
+  *   -ep port          last port of the served range (required)
+  *   -n  nr_ops        number of peer requests (default 16)
+  *   -v  level         debug level (default 0)
+  *   -dp port          port to defer blocking requests to
+  *   -l  file          log file
+  * Anything else is skipped here; the full argv is also passed to
+  * custom_peer_init(). No -t option is parsed, so nr_threads keeps its
+  * default of 1.
+  *
+  * Returns -1 on any setup failure, otherwise the result of peerd_loop().
+  */
+ int main(int argc, char *argv[])
+ {
+     struct peerd *peer = NULL;
+     //parse args
+     char *spec = "";
+     int i, r;
+     long portno_start = -1, portno_end = -1;
+     //set defaults here
+     uint32_t nr_ops = 16;
+     uint32_t nr_threads = 1 ;
+     unsigned int debug_level = 0;
+     uint32_t defer_portno = NoPort;
+     char *logfile = NULL;
+
+     //TODO print messages on arg parsing error
+
+     for (i = 1; i < argc; i++) {
+         if (!strcmp(argv[i], "-g") && i + 1 < argc) {
+             spec = argv[i+1];
+             i += 1;
+             continue;
+         }
+
+         if (!strcmp(argv[i], "-sp") && i + 1 < argc) {
+             portno_start = strtoul(argv[i+1], NULL, 10);
+             i += 1;
+             continue;
+         }
+
+         if (!strcmp(argv[i], "-ep") && i + 1 < argc) {
+             portno_end = strtoul(argv[i+1], NULL, 10);
+             i += 1;
+             continue;
+         }
+
+         if (!strcmp(argv[i], "-n") && i + 1 < argc) {
+             nr_ops = strtoul(argv[i+1], NULL, 10);
+             i += 1;
+             continue;
+         }
+         if (!strcmp(argv[i], "-v") && i + 1 < argc ) {
+             debug_level = atoi(argv[i+1]);
+             i += 1;
+             continue;
+         }
+         if (!strcmp(argv[i], "-dp") && i + 1 < argc ) {
+             defer_portno = strtoul(argv[i+1], NULL, 10);
+             i += 1;
+             continue;
+         }
+         if (!strcmp(argv[i], "-l") && i + 1 < argc ) {
+             logfile = argv[i+1];
+             i += 1;
+             continue;
+         }
+
+     }
+     init_logctx(&lc, argv[0], debug_level, logfile);
+     XSEGLOG2(&lc, D, "Main thread has tid %ld.\n", syscall(SYS_gettid));
+
+     verbose = debug_level;
+
+     /*
+      * Both ports default to -1. Reject a missing or negative value here
+      * instead of letting it be cast to xport inside peerd_init().
+      */
+     if (portno_start < 0 || portno_end < 0) {
+         fprintf(stderr, "%s: -sp and -ep are required and must be non-negative\n",
+                 argv[0]);
+         return -1;
+     }
+     /* an inverted range would leave the workers with no port to poll */
+     if (portno_end < portno_start) {
+         fprintf(stderr, "%s: end port %ld is lower than start port %ld\n",
+                 argv[0], portno_end, portno_start);
+         return -1;
+     }
+
+     peer = peerd_init(nr_ops, spec, portno_start, portno_end, nr_threads, defer_portno);
+     if (!peer)
+         return -1;
+     r = custom_peer_init(peer, argc, argv);
+     if (r < 0)
+         return -1;
+     r = peerd_start_threads(peer);
+     if (r < 0)
+         return -1;
+     return peerd_loop(peer);
+ }
struct xseg_request *rec;
xseg_prepare_wait(xseg, srcport);
while(1) {
- rec = xseg_receive(xseg, srcport);
+ rec = xseg_receive(xseg, srcport, 0);
if (rec) {
if (rec != expected_req) {
fprintf(stderr, "Unknown received req. Putting req.\n");
active = 0;
//FIXME
- req = xseg_accept(xseg, portno1);
+ req = xseg_accept(xseg, portno1, 0);
if (req) {
xseg_submit(xseg, req, portno2, X_ALLOC);
log_req(logfd, portno1, portno2, LOG_ACCEPT, method, req);
active += 1;
}
- req = xseg_accept(xseg, portno2);
+ req = xseg_accept(xseg, portno2, 0);
if (req) {
xseg_submit(xseg, req, portno1, X_ALLOC);
log_req(logfd, portno2, portno1, LOG_ACCEPT, method, req);
active += 1;
}
- req = xseg_receive(xseg, portno1);
+ req = xseg_receive(xseg, portno1, 0);
if (req) {
xseg_respond(xseg, req, portno2, X_ALLOC);
log_req(logfd, portno1, portno2, LOG_RECEIVE, method, req);
active += 1;
}
- req = xseg_receive(xseg, portno2);
+ req = xseg_receive(xseg, portno2, 0);
if (req) {
xseg_respond(xseg, req, portno1, X_ALLOC);
log_req(logfd, portno2, portno1, LOG_RECEIVE, method, req);
}
}
- received = xseg_receive(xseg, srcport);
+ received = xseg_receive(xseg, srcport, 0);
if (received) {
xseg_cancel_wait(xseg, srcport);
nr_received += 1;
}
}
- received = xseg_receive(xseg, srcport);
+ received = xseg_receive(xseg, srcport, 0);
if (received) {
xseg_cancel_wait(xseg, srcport);
nr_received += 1;
perror("Cannot signal peer");
}
}
- received = xseg_receive(xseg, srcport);
+ received = xseg_receive(xseg, srcport, 0);
if (received) {
xseg_cancel_wait(xseg, srcport);
--nr_flying;
struct xseg_request *req;
for (;;) {
- req = xseg_accept(xseg, dstport);
+ req = xseg_accept(xseg, dstport, 0);
if (!req)
break;
if (xseg_put_request(xseg, req, srcport))
for (; nr--;) {
xseg_prepare_wait(xseg, srcport);
- req = xseg_accept(xseg, srcport);
+ req = xseg_accept(xseg, srcport, 0);
if (req) {
req_target = xseg_get_target(xseg, req);
req_data = xseg_get_data(xseg, req);
init_local_signal();
for (;;) {
- req = xseg_receive(xseg, srcport);
+ req = xseg_receive(xseg, srcport, 0);
if (req) {
handle_reply(req);
nr--;
struct xseg_request *req;
for (;;) {
- req = xseg_receive(xseg, dstport);
+ req = xseg_receive(xseg, dstport, 0);
if (!req)
break;
fprintf(stderr, "request: %08llx%08llx\n"
} while(0)
/* general purpose xflags */
-#define X_ALLOC ((uint32_t) (1 << 0))
-#define X_LOCAL ((uint32_t) (1 << 1))
+#define X_ALLOC ((uint32_t) (1 << 0))
+#define X_LOCAL ((uint32_t) (1 << 1))
+#define X_NONBLOCK ((uint32_t) (1 << 2))
typedef uint64_t xpointer;
port = prev + 1
fd = os.open(XSEGBD_SYSFS + "add", os.O_WRONLY)
- os.write(fd, "%s %d:%d:%d" % (name, port, VPORT, REQS))
+ os.write(fd, "%s %d:%d:%d" % (name, port, port + VPORT_START-4, REQS))
os.close(fd)
except Exception, reason:
print >> sys.stderr, reason
}
-struct xseg_request *xseg_receive(struct xseg *xseg, xport portno)
+struct xseg_request *xseg_receive(struct xseg *xseg, xport portno, uint32_t flags)
{
xqindex xqi;
xserial serial = NoSerial;
if (!port)
return NULL;
retry:
- xlock_acquire(&port->pq_lock, portno);
+ if (flags & X_NONBLOCK) {
+ if (!xlock_try_lock(&port->pq_lock, portno))
+ return NULL;
+ } else {
+ xlock_acquire(&port->pq_lock, portno);
+ }
q = XPTR_TAKE(port->reply_queue, xseg->segment);
xqi = __xq_pop_head(q);
xlock_release(&port->pq_lock);
return req;
}
-struct xseg_request *xseg_accept(struct xseg *xseg, xport portno)
+struct xseg_request *xseg_accept(struct xseg *xseg, xport portno, uint32_t flags)
{
xqindex xqi;
struct xq *q;
struct xseg_port *port = xseg_get_port(xseg, portno);
if (!port)
return NULL;
- xlock_acquire(&port->rq_lock, portno);
+ if (flags & X_NONBLOCK) {
+ if (!xlock_try_lock(&port->rq_lock, portno))
+ return NULL;
+ } else {
+ xlock_acquire(&port->rq_lock, portno);
+ }
+
q = XPTR_TAKE(port->request_queue, xseg->segment);
xqi = __xq_pop_head(q);
xlock_release(&port->rq_lock);
uint32_t flags );
struct xseg_request * xseg_receive ( struct xseg * xseg,
- xport portno );
+ xport portno,
+ uint32_t flags );
/* \___________________/ \_________/ */
/* ___________________ _________ */
/* / \ / \ */
struct xseg_request * xseg_accept ( struct xseg * xseg,
- xport portno );
+ xport portno,
+ uint32_t flags );
xport xseg_respond ( struct xseg * xseg,
struct xseg_request * xreq,