#include <xseg/xseg.h>
#include <pthread.h>
+#include <xseg/protocol.h>
+
#include "common.h" /* Please fix me */
#define MAX_PATH_SIZE 255
#define MAX_FILENAME_SIZE 255
-#define DEFAULT_NR_OPS 16
+/* Default for cmdline_nr_ops when -n is not given. */
+#define DEFAULT_NR_OPS 128
+
+/*
+ * When non-zero, dispatch() verifies that a reply handled in the SERVING
+ * state belongs to the io's breqs array before using it.
+ */
+#define VLMCD_SANITY_CHECKS 1
/*
* Globals, holding command-line arguments
*/
-long cmdline_vport = -1;
-long cmdline_mport = -1;
-long cmdline_bport = -1;
+/* Port numbers from -p, -m and -b; a negative value is rejected as "no or invalid port". */
+long cmdline_vportno = -1;
+long cmdline_mportno = -1;
+long cmdline_bportno = -1;
+/* xseg spec from -g (a strdup()ed copy of optarg); must be given. */
+char *cmdline_xseg_spec = NULL;
long cmdline_nr_ops = DEFAULT_NR_OPS;
/*
/* FIXME: is it really vlmcd-specific? */
+/*
+ * Lifecycle of an io, as driven by vlmcd_loop() and dispatch():
+ *   ACCEPTED  - a request was just accepted on the vlmcd port (io->vreq set)
+ *   MAPPING   - io->mreq has been submitted and its reply is pending
+ *   SERVING   - the block requests in io->breqs have been submitted and
+ *               their replies are pending
+ *   CONCLUDED - io->vreq has been responded to; the io goes back to the
+ *               free list
+ */
enum io_state_enum {
ACCEPTED = 0,
- PENDING_MAPPER_REPLY = 1,
- PENDING_BLOCKD_REPLY = 2,
+ MAPPING = 1,
+ SERVING = 2,
CONCLUDED = 3
};
+/*
+ * State for one volume request in flight: the request accepted from the
+ * client plus the requests issued on its behalf.
+ */
struct io {
enum io_state_enum state;
- struct xseg_request *accepted_req;
- struct xseg_request *mapper_req;
-
- /* FIXME */
+ /* vreq: request accepted on the vlmcd port; mreq: request issued for it in the ACCEPTED step */
+ struct xseg_request *vreq;
+ struct xseg_request *mreq;
+ /* breqs: calloc()ed array of breqs_len block requests, one per mapped segment */
+ struct xseg_request **breqs;
+ /* breq_cnt: block requests whose replies are still outstanding */
+ int breqs_len, breq_cnt;
+};
+
+/* Daemon-wide state, filled in by vlmcd_init(). */
+struct vlmcd {
+ struct xseg *xseg;
+ struct xseg_port *vport;
+ uint32_t vportno, mportno, bportno;
+
+ /* flying: ios currently allocated; nr_ops: upper bound on flying and length of ios */
+ int flying;
+ long nr_ops;
+ /* NOTE(review): free_ops is not referenced by the code visible here */
+ struct xq free_ops;
+
+ /* free_ios: queue of unused indices into ios */
+ struct xq free_ios;
+ struct io *ios;
+
};
+/*
+ * Translate an index (as stored in the free_ios queue or in a request's
+ * ->priv field) into the corresponding io slot.
+ *
+ * Returns a pointer into vlmcd->ios, or NULL after logging an error when
+ * idx is not below vlmcd->nr_ops.
+ */
+static inline struct io *__io_from_idx(struct vlmcd *vlmcd, xqindex idx)
+{
+	if (idx < vlmcd->nr_ops)
+		return &vlmcd->ios[idx];
+
+	/* Valid indices are [0, nr_ops), so report the violated bound as ">=". */
+	perr(PE, 0, "Internal error: called with idx = %ld >= %ld",
+	     (long)idx, vlmcd->nr_ops);
+	return NULL;
+}
+
+/*
+ * Translate an io pointer back into its index within vlmcd->ios.
+ *
+ * Returns the index, or Noneidx after logging an error when io does not
+ * fall inside the nr_ops-sized array.
+ */
+static inline xqindex __idx_from_io(struct vlmcd *vlmcd, struct io *io)
+{
+	long pos = io - vlmcd->ios;
+
+	if (pos >= 0 && pos < vlmcd->nr_ops)
+		return pos;
+
+	perr(PE, 0, "Internal error: called with io = %p, idx = %ld, "
+	     "nr_ops = %ld",
+	     (void *)io, pos, vlmcd->nr_ops);
+	return Noneidx;
+}
+
+/*
+ * Take an unused io off the free queue and count it as in flight.
+ * Returns NULL when the free queue yields Noneidx.
+ */
+static inline struct io *alloc_io(struct vlmcd *vlmcd)
+{
+	struct io *io = NULL;
+	xqindex slot = xq_pop_head(&vlmcd->free_ios);
+
+	if (slot != Noneidx) {
+		vlmcd->flying++;
+		io = vlmcd->ios + slot;
+	}
+
+	return io;
+}
+
+/*
+ * Return an io to the free queue and drop it from the in-flight count.
+ *
+ * An io that does not belong to vlmcd->ios is left alone: its index
+ * resolves to Noneidx, and queueing that sentinel would later make
+ * alloc_io() report exhaustion while slots are still available.
+ */
+static inline void free_io(struct vlmcd *vlmcd, struct io *io)
+{
+	xqindex idx = __idx_from_io(vlmcd, io);
+
+	/* __idx_from_io() has already logged the error. */
+	if (idx == Noneidx)
+		return;
+
+	/* FIXME: what if xq_append_head() fails? */
+	xq_append_head(&vlmcd->free_ios, idx);
+	--vlmcd->flying;
+}
+
+/*
+ * Finish the client's request: flag io->vreq as served, respond on the
+ * port recorded in the request and signal that port.
+ * Both xseg calls are asserted to succeed; always returns 0.
+ */
+static int complete(struct vlmcd *vlmcd, struct io *io)
+{
+	struct xseg *xseg = vlmcd->xseg;
+	struct xseg_request *vreq = io->vreq;
+	int rc;
+
+	vreq->state |= XS_SERVED;
+
+	rc = xseg_respond(xseg, vreq->portno, vreq);
+	always_assert(rc != NoSerial);
+
+	rc = xseg_signal(xseg, vreq->portno);
+	always_assert(rc == 0);
+
+	return 0;
+}
+
static int usage(char *argv0)
{
fprintf(stderr,
"Usage: %s [-p VLMCD_PORT] [-m MAPPERD_PORT]"
- "[-b BLOCKD_PORT] [-g XSEG_SPEC] [-n NR_OPS]\n\n"
+ "[-b BLOCKD_PORT] [-g XSEG_SPEC] [-n NR_OPS]\n\n"
"where:\n"
"\tVLMCD_PORT: xseg port to listen for requests on\n"
"\tMAPPERD_PORT: xseg port where the mapper lives\n"
"\tBLOCKD_PORT: xseg port where blockd/filed/sosd lives\n"
- "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
+ "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
"request_size:extra_size:page_shift'\n"
"\tNR_OPS: number of outstanding xseg requests\n",
argv0);
int c;
opterr = 0;
- c = getopt(argc, argv, "+:hp:m:b:n:");
+ c = getopt(argc, argv, "+:hp:m:b:n:g:");
if (c == -1)
break;
exit(0);
break;
case 'p':
- cmdline_vport = safe_atoi(optarg);
+ cmdline_vportno = safe_atoi(optarg);
break;
case 'm':
- cmdline_mport = safe_atoi(optarg);
+ cmdline_mportno = safe_atoi(optarg);
break;
case 'b':
- cmdline_bport = safe_atoi(optarg);
+ cmdline_bportno = safe_atoi(optarg);
break;
case 'n':
cmdline_nr_ops = safe_atoi(optarg);
break;
+ case 'g':
+ /* FIXME: Max length of spec? strdup, eww */
+ cmdline_xseg_spec = strdup(optarg);
+ if (!cmdline_xseg_spec)
+ perr(PFE, 0, "out of memory");
+ break;
}
}
argv += optind;
/* Sanity check for all arguments */
- if (cmdline_vport < 0)
+ if (cmdline_vportno < 0)
perr(PFE, 0, "no or invalid port specified for vlmcd");
- if (cmdline_mport < 0)
+ if (cmdline_mportno < 0)
perr(PFE, 0, "no or invalid port specified for mapperd");
- if (cmdline_bport < 0)
+ if (cmdline_bportno < 0)
perr(PFE, 0, "no or invalid port specified for blockd/filed/sosd");
if (cmdline_nr_ops < 1)
perr(PFE, 0, "specified outstanding request count is invalid");
+ if (!cmdline_xseg_spec)
+ perr(PFE, 0, "xseg specification is mandatory");
if (argc)
perr(PFE, 0, "Non-option arguments specified on command line");
}
-static struct xseg *join(char *spec)
+/*
+ * Join the segment described by spec; if the first join fails, try to
+ * create the segment and join again.
+ *
+ * Returns the joined segment, or whatever the second xseg_join() call
+ * returns. The results of xseg_parse_spec() and xseg_create() are
+ * discarded on purpose, so their failures only surface as a failed join.
+ */
+static struct xseg *join_or_create(char *spec)
{
struct xseg_config config;
struct xseg *xseg;
(void)xseg_parse_spec(spec, &config);
- xseg = xseg_join(config.type, config.name);
+ xseg = xseg_join(config.type, config.name, "posix", NULL);
if (xseg)
return xseg;
(void)xseg_create(&config);
- return xseg_join(config.type, config.name);
+ return xseg_join(config.type, config.name, "posix", NULL);
}
-static int dispatch(struct io *io)
+/*
+ * Advance one io through its state machine in response to xreq.
+ *
+ *   ACCEPTED: xreq is the client's request (io->vreq). Build io->mreq from
+ *             it and submit it to the mapper port, or to the blocker port
+ *             for X_INFO.
+ *   MAPPING:  xreq is the reply to io->mreq. For X_INFO copy the result
+ *             into vreq and conclude; otherwise issue one block request
+ *             per mapped segment, pointing each at its slice of vreq's
+ *             data buffer.
+ *   SERVING:  xreq is the reply to one block request. Account for it and
+ *             complete vreq once the last one has arrived.
+ *
+ * Always returns 0; every failure is caught by always_assert() or reported
+ * through perr(PFE, ...) (presumably fatal - confirm in common.h).
+ *
+ * FIXME: What happens if this function fails?
+ * FIXME: How does this function fail? Do we return values from <errno.h>
+ * FIXME: Error reporting: Who prints errors, who prints errno?
+ */
+static int dispatch(struct vlmcd *vlmcd, struct io *io, struct xseg_request *xreq)
{
+ struct xseg *xseg;
+ uint32_t vportno;
+ int i, ret;
+ uint64_t pos;
+
+ always_assert(vlmcd);
+ always_assert(io);
+ xseg = vlmcd->xseg;
+ always_assert(xseg);
+ vportno = vlmcd->vportno;
+
/* FIXME: Arguments, sanity checks on them? */
switch (io->state) {
case ACCEPTED:
/*
- * Just accepted a new request,
- * construct and submit a request to
- * the mapper port.
+ * Step 1: Issue a request to the mapper.
*/
- /* FIXME: Do I need to lock io? */
- req_mapper = FIXME;
- xseg_submit(FIXME);
- io->state = PENDING_MAPPER_REPLY;
+ /* FIXME: xseglog(), strerror(), etc */
+ /* FIXME: xreq->target a pointer?! why not a field, like * xreq->op? */
+ always_assert(io->vreq == xreq);
+ io->vreq->serviced = 0;
+ io->mreq = xseg_get_request(xseg, vportno);
+ always_assert(io->mreq);
+ /*
+ * FIXME:
+ * We only care about the length of the target name
+ * and hope the mapper reply fits in the remaining datalen
+ * bytes.
+ */
+ ret = xseg_prep_request(io->mreq, io->vreq->targetlen,
+ io->mreq->bufferlen - io->vreq->targetlen);
+ always_assert(ret == 0);
+
+ struct xseg_request *m = io->mreq;
+ strncpy(m->target, io->vreq->target, m->targetlen);
+ m->size = io->vreq->size;
+ m->offset = io->vreq->offset;
+ m->flags = 0;
+ m->priv = __idx_from_io(vlmcd, io); /* use the io's idx for tracking */
+ switch (io->vreq->op) {
+ case X_READ: m->op = X_MAPR; break;
+ case X_WRITE: m->op = X_MAPW; break;
+ case X_INFO: m->op = X_INFO; break;
+ default:
+ perr(PFE, 0, "Internal error? io->vreq->op = "
+ "%d\n", io->vreq->op);
+ }
+ if (m->op == X_INFO) {
+ ret = xseg_submit(xseg, vlmcd->bportno, io->mreq);
+ always_assert(ret != NoSerial);
+ always_assert(xseg_signal(xseg, vlmcd->bportno) == 0);
+ }
+ else {
+ ret = xseg_submit(xseg, vlmcd->mportno, io->mreq);
+ always_assert(ret != NoSerial);
+ always_assert(xseg_signal(xseg, vlmcd->mportno) == 0);
+ }
+
+ io->state = MAPPING;
break;
- case PENDING_MAPPER_REPLY:
+ case MAPPING:
/*
- * Now that we know where the requested
- * volume block maps, use the blocker to fetch it.
+ * Step 2. Issue block requests, one per segment
+ * in the reply from the mapper.
*/
- xseg_free_request(req_mapper);
- req_blocker = FIXME;
- xseg_submit(FIXME);
- io->state = PENDING_BLOCKER_REPLY;
+ /* FIXME */
+ /* For every mapped segment, issue a request to blockd */
+ /* FIXME: what if we run out of xseg requests? */
+ always_assert(xreq == io->mreq);
+ always_assert(!(xreq->state & XS_FAILED) && xreq->state & XS_SERVED); /* FIXME: This is too harsh */
+ if (xreq->op == X_INFO) {
+ *(off_t *)io->vreq->data = *(off_t *)io->mreq->data;
+ io->vreq->state |= XS_SERVED;
+
+ ret = xseg_respond(vlmcd->xseg, io->vreq->portno, io->vreq);
+ always_assert(ret != NoSerial);
+ ret = xseg_signal(vlmcd->xseg, io->vreq->portno);
+ always_assert(ret == 0);
+ io->state = CONCLUDED;
+ always_assert(xseg_put_request(xseg, vportno, io->mreq) != NoSerial);
+ free_io(vlmcd, io);
+ } else {
+ struct xseg_reply_map *mreply = (void *)io->mreq->data;
+ always_assert(mreply->cnt > 0);
+ //perr(PE, 0, "%llu %llu %llu mreply->target = %d\n", mreply->cnt, mreply->segs[0].size, mreply->segs[0].offset, mreply->segs[0].target[0]);
+
+ io->breqs_len = mreply->cnt;
+ io->breqs = calloc(io->breqs_len, sizeof(struct xseg_request *));
+ always_assert(io->breqs);
+ for (i = 0, pos = 0; i < mreply->cnt; i++) {
+ uint64_t datalen, offset, targetlen;
+ struct xseg_request *breq;
+
+ datalen = mreply->segs[i].size;
+ offset = mreply->segs[i].offset;
+ targetlen = strlen(mreply->segs[i].target);
+
+ breq = xseg_get_request(xseg, vportno);
+ always_assert(breq);
+ always_assert(datalen + targetlen <= breq->bufferlen);
+
+ ret = xseg_prep_request(breq, targetlen, datalen);
+ /* Checked like the mapper request above; the result used to be silently dropped. */
+ always_assert(ret == 0);
+ breq->datalen = datalen;
+ breq->offset = offset;
+ breq->size = datalen;
+ breq->op = io->vreq->op;
+ breq->priv = __idx_from_io(vlmcd, io); /* use the io's idx for tracking */
+ strncpy(breq->target, mreply->segs[i].target, targetlen);
+ /*
+ * Get the blocker to place data directly into vreq's
+ * buffer. FIXME: Manipulate ->data by hand?
+ */
+ breq->data = io->vreq->data + pos;
+ pos += datalen;
+
+ ret = xseg_submit(xseg, vlmcd->bportno, breq);
+ always_assert(ret != NoSerial);
+ /* possible race? */
+ io->breqs[i] = breq;
+ always_assert(xseg_signal(xseg, vlmcd->bportno) == 0);
+ }
+ io->breq_cnt = i;
+ ret = xseg_put_request(xseg, vportno, io->mreq);
+ always_assert(ret == 0);
+
+ io->state = SERVING;
+ }
break;
- case PENDING_BLOCKER_REPLY:
+ case SERVING:
/*
- * Now that the block is in place,
- * complete the original request.
+ * One of the breqs has been completed.
+ * Update io and vreq counters, complete vreq when
+ * all of the data have arrived.
*/
- complete(io); /* FIXME: bogus */
+#if VLMCD_SANITY_CHECKS
+ for (i = 0; i < io->breqs_len; i++)
+ if (io->breqs[i] == xreq)
+ break;
+ if (i >= io->breqs_len) {
+ perr(PE, 0, "Called for xreq = %p, not belonging to io %p",
+ (void *)xreq, (void *)io);
+ always_assert(0);
+ /* FIXME: how do I handle this? */
+ }
+#endif
+ struct xseg_request *breq = xreq;
+ always_assert(!(breq->state & XS_FAILED) && breq->state & XS_SERVED);
+ always_assert(breq->serviced == breq->size);
+ io->vreq->serviced += breq->serviced;
+ ret = xseg_put_request(xseg, vportno, breq);
+ always_assert(ret == 0);
+
+ if (--io->breq_cnt == 0) {
+ always_assert(io->vreq->serviced == io->vreq->datalen);
+ complete(vlmcd, io);
+ io->state = CONCLUDED;
+ /*
+ * The breqs array was calloc()ed in the MAPPING step and is
+ * owned by this io; release it before the io is recycled,
+ * otherwise every read/write leaks one array.
+ */
+ free(io->breqs);
+ io->breqs = NULL;
+ io->breqs_len = 0;
+ free_io(vlmcd, io);
+ }
+ break;
+ case CONCLUDED:
+ perr(PFE, 0, "Internal error, called for CONCLUDED");
+ break;
+ default:
+ perr(PFE, 0, "Internal error, io->state = %d\n", io->state);
}
return 0;
}
-static int vlmcd_loop(FIXME)
+/*
+ * Main event loop. Each iteration either accepts a new request (while
+ * fewer than nr_ops ios are in flight) or receives a reply to a request
+ * issued earlier, then hands the io to dispatch(). When there is nothing
+ * to do it waits for a signal.
+ *
+ * The loop has no exit condition, so the trailing return is not reached.
+ */
+static int vlmcd_loop(struct vlmcd *vlmcd)
{
- /* Create nr_ops threads */
- /* Have all threads call xseg_accept() */
+ int ret;
+ struct io *io;
+ struct xseg_request *xreq;
+ struct xseg *xseg = vlmcd->xseg;
+ uint32_t vportno = vlmcd->vportno;
+
+ always_assert(xseg);
for (;;) {
- prepare_wait(xseg, portno);
+ ret = xseg_prepare_wait(xseg, vportno);
+ always_assert(ret == 0);
- /* only accept a new request if there's room */
io = NULL;
- if (remaining) {
- if (xseg_accept()) {
- /* allocate a new pending io structure */
- io = alloc_io();
- req->priv = io;
+ /*
+ * Accept requests from xseg if under the nr_ops limit,
+ * and check if any replies have been received.
+ *
+ * Use ->priv for tracking, retrieve the relevant io struct
+ * we rely upon our peers to not have touched ->priv
+ */
+ if (vlmcd->flying < vlmcd->nr_ops &&
+ (xreq = xseg_accept(xseg, vportno))) {
+ io = alloc_io(vlmcd);
+ /*
+ * flying < nr_ops should guarantee a free io, but alloc_io()
+ * can still return NULL; fail loudly instead of dereferencing it.
+ */
+ always_assert(io);
+ io->vreq = xreq;
+ io->state = ACCEPTED;
+ } else {
+ xreq = xseg_receive(xseg, vportno);
+ if (xreq) {
+ io = __io_from_idx(vlmcd, xreq->priv);
+ always_assert(io);
+ always_assert(io->state != CONCLUDED);
}
}
- /* has a reply arrived? */
- if (xseg_receive()) {
- io = req->priv;
- }
-
/* io is the pending io currently being processed */
if (io) {
- cancel_wait(xseg, portno);
- dispatch(io);
+ /* FIXME: WHY cancel_wait() anyway? */
+ ret = xseg_cancel_wait(xseg, vportno);
+ always_assert(ret == 0);
+ dispatch(vlmcd, io, xreq);
} else {
/*
- * FIXME: I don't want a timeout.
* If things are OK, no timeout should ever be needed.
* Otherwise, it's a vlmcd or xseg bug.
+ * FIXME: sigtimedwait() with zero-valued timeout?
+ * FAIL.
*/
- xseg_wait_signal(xseg, portno, 0);
+ xseg_wait_signal(xseg, 100000UL);
}
}
return 0;
}
-static int vlmcd(FIXME)
+/*
+ * FIXME: Initialize the vlmcd struct based on cmdline_* vars
+ *
+ * Copies the port numbers and nr_ops from the command-line globals,
+ * allocates the io array and the free-io queue, initializes the xseg
+ * library, joins (or creates) the segment and binds the vlmcd port.
+ *
+ * Returns 0 on success. On failure returns -ENOMEM or -EIO after undoing
+ * the steps that had already succeeded.
+ */
+static int vlmcd_init(struct vlmcd *vlmcd)
{
- struct xseg *xseg;
+ int ret;
+
+ vlmcd->vportno = cmdline_vportno;
+ vlmcd->mportno = cmdline_mportno;
+ vlmcd->bportno = cmdline_bportno;
+
+ vlmcd->flying = 0;
+ vlmcd->nr_ops = cmdline_nr_ops;
+ vlmcd->ios = calloc(vlmcd->nr_ops, sizeof(struct io));
+ if (!vlmcd->ios) {
+ perr(PE, 0, "could not allocate memory [ios]");
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ /* FIXME: meaning of arguments to xq_alloc_seq()? */
+ if (!xq_alloc_seq(&vlmcd->free_ios, cmdline_nr_ops, cmdline_nr_ops)) {
+ perr(PE, 0, "could not allocate memory [free_ios]");
+ ret = -ENOMEM;
+ goto out_with_ios;
+ }
/* FIXME: If xseg library fails, is errno set? */
- if (xseg_initialize("posix")) {
+ if (xseg_initialize()) {
perr(PE, 0, "could not initialize xseg library");
- return -1;
+ ret = -EIO;
+ goto out_with_freeios;
}
- if (! (xseg = join(spec))) {
+ if (! (vlmcd->xseg = join_or_create(cmdline_xseg_spec))) {
perr(PE, 0, "could not join or create xseg with spec '%s'\n",
- spec);
- return -1;
+ cmdline_xseg_spec);
+ ret = -EIO;
+ goto out_with_xseginit;
}
- if (! (xport = xseg_bind_port(xseg, portno))) {
- perr(PE, 0, "cannot bind to xseg port %ld", portno);
- return 1;
+ if (! (vlmcd->vport = xseg_bind_port(vlmcd->xseg, vlmcd->vportno))) {
+ perr(PE, 0, "cannot bind to xseg port %ld", (long)vlmcd->vportno);
+ ret = -EIO;
+ goto out_with_xsegjoin;
+ }
- portno = xseg_portno(xseg, xport);
+ /* Re-read the port number from the bound port rather than trusting the requested one. */
+ vlmcd->vportno = xseg_portno(vlmcd->xseg, vlmcd->vport);
perr(PI, 0, "vlmcd on port %u of %u",
- portno, xseg->config.nr_ports);
-
- return vlmcd_loop(FIXME);
+ vlmcd->vportno, vlmcd->xseg->config.nr_ports);
+
+ /* Success: jump over the unwinding labels below. */
+ ret = 0;
+ goto out;
+
+/* Error unwinding: each label undoes one step and falls through to the next. */
+out_with_xsegjoin:
+ xseg_leave(vlmcd->xseg);
+out_with_xseginit:
+ always_assert(xseg_finalize() == 0);
+out_with_freeios:
+ xq_free(&vlmcd->free_ios);
+out_with_ios:
+ free(vlmcd->ios);
+out:
+ return ret;
}
+/*
+ * Entry point: parse the command line, log the resulting settings,
+ * initialize the daemon state and enter the event loop.
+ * Returns whatever vlmcd_loop() returns.
+ */
int main(int argc, char *argv[])
{
+ struct vlmcd vlmc;
+
init_perr("vlmcd");
parse_cmdline(argc, argv);
- perr(PI, 0, "v = %ld, m = %ld, b = %ld, nr_ops = %lu\n",
- cmdline_vport, cmdline_mport, cmdline_bport, cmdline_nr_ops);
+ /* cmdline_nr_ops is a (signed) long, so it is printed with %ld, not %lu. */
+ perr(PI, 0, "v = %ld, m = %ld, b = %ld, nr_ops = %ld\n",
+ cmdline_vportno, cmdline_mportno, cmdline_bportno, cmdline_nr_ops);
- return 0;
-#if 0
- if (nr_ops <= 0)
- nr_ops = 16;
+ if (vlmcd_init(&vlmc) < 0)
+ perr(PFE, 0, "failed to initialize vlmcd");
- return filed(path, size, nr_ops, spec, portno);
-#endif
+ return vlmcd_loop(&vlmc);
}
-