17 #include <xseg/xseg.h>
20 #include <xseg/protocol.h>
22 #include "common.h" /* Please fix me */
24 #define MAX_PATH_SIZE 255
25 #define MAX_FILENAME_SIZE 255
27 #define DEFAULT_NR_OPS 128
29 #define VLMCD_SANITY_CHECKS 1
32 * Globals, holding command-line arguments
34 long cmdline_vportno = -1;
35 long cmdline_mportno = -1;
36 long cmdline_bportno = -1;
37 char *cmdline_xseg_spec = NULL;
38 long cmdline_nr_ops = DEFAULT_NR_OPS;
41 * vlmcd-specific structure,
42 * containing information on a pending I/O operation
44 /* FIXME: XS_CONCLUDED equals XS_SERVING? */
45 /* FIXME: is it really vlmcd-specific? */
54 enum io_state_enum state;
55 struct xseg_request *vreq;
56 struct xseg_request *mreq;
57 struct xseg_request **breqs;
58 int breqs_len, breq_cnt;
63 struct xseg_port *vport;
64 uint32_t vportno, mportno, bportno;
/*
 * Map a free-list index back to its struct io slot in vlmcd->ios.
 * Used to recover the io associated with an xseg request via ->priv.
 * NOTE(review): lines between the perr() and the return are not visible
 * in this view; presumably the out-of-range branch returns NULL — confirm.
 */
75 static inline struct io *__io_from_idx(struct vlmcd *vlmcd, xqindex idx)
77 	if (idx >= vlmcd->nr_ops) {
78 		perr(PE, 0, "Internal error: called with idx = %ld > %ld",
79 			(long)idx, vlmcd->nr_ops);
	/* In-range index: hand back the corresponding preallocated slot. */
83 	return &vlmcd->ios[idx];
/*
 * Inverse of __io_from_idx(): derive the index of io within the
 * vlmcd->ios array by pointer arithmetic. The index is stashed in
 * request ->priv fields so replies can be routed back to their io.
 * NOTE(review): the success-path return statement is not visible here;
 * presumably it returns (xqindex)idx — confirm against the full file.
 */
86 static inline xqindex __idx_from_io(struct vlmcd *vlmcd, struct io *io)
88 	long idx = io - vlmcd->ios;
90 	if (idx < 0 || idx >= vlmcd->nr_ops) {
91 		perr(PE, 0, "Internal error: called with io = %p, idx = %ld, "
93 			(void *)io, (long)(io - vlmcd->ios), vlmcd->nr_ops);
/*
 * Allocate an io slot by popping an index off the free_ios queue.
 * NOTE(review): the lines checking whether xq_pop_head() returned
 * Noneidx (queue empty) are not visible in this view — the caller in
 * vlmcd_loop() bounds allocations with the flying < nr_ops check,
 * which should keep the queue non-empty; confirm in the full file.
 */
100 static inline struct io *alloc_io(struct vlmcd *vlmcd)
102 	xqindex idx = xq_pop_head(&vlmcd->free_ios, 1);
106 	// perr(PI, 0, "alloc'd io %p, in-flight reqs: %d", (void *)&vlmcd->ios[idx], vlmcd->flying);
107 	return &vlmcd->ios[idx];
/*
 * Return an io slot to the free_ios queue (inverse of alloc_io()).
 * The xq_append_head() return value is ignored; as the existing FIXME
 * notes, a failure here would silently leak the slot.
 */
110 static inline void free_io(struct vlmcd *vlmcd, struct io *io)
112 	/* FIXME: what if xq_append_head() fails? */
113 	xq_append_head(&vlmcd->free_ios, __idx_from_io(vlmcd, io), 1);
/*
 * Complete a client (vlmc) request: mark it XS_SERVED, hand it back to
 * the originating port with xseg_respond(), and signal that port so the
 * peer wakes up. Failures are treated as fatal via always_assert().
 * NOTE(review): the declaration of `ret` and the function's return
 * statement fall in lines not visible in this view.
 */
117 static int complete(struct vlmcd *vlmcd, struct io *io)
121 	io->vreq->state |= XS_SERVED;
122 	// perr(PI, 0, "completed io %p", (void *)io);
123 	ret = xseg_respond(vlmcd->xseg, io->vreq->portno, io->vreq);
124 	always_assert(ret != NoSerial);
	/* Wake the peer on the originating port so it sees the reply. */
125 	ret = xseg_signal(vlmcd->xseg, io->vreq->portno);
126 	always_assert(ret == 0);
/*
 * Print command-line usage for the daemon.
 * NOTE(review): the option help spells "BLOCKD_POART" in the synopsis
 * line while the description below says "BLOCKD_PORT" — the synopsis
 * is a typo and should read "BLOCKD_PORT". Also, the synopsis lacks a
 * space before "[-b", so the two string fragments concatenate as
 * "[-m MAPPERD_PORT][-b ...". Both are user-facing string fixes that
 * cannot be applied here because the enclosing fprintf()/return lines
 * are not visible in this view.
 */
131 static int usage(char *argv0)
134 		"Usage: %s [-p VLMCD_PORT] [-m MAPPERD_PORT]"
135 		"[-b BLOCKD_POART] [-g XSEG_SPEC] [-n NR_OPS]\n\n"
137 		"\tVLMCD_PORT: xseg port to listen for requests on\n"
138 		"\tMAPPERD_PORT: xseg port where the mapper lives\n"
139 		"\tBLOCKD_PORT: xseg port where blockd/filed/sosd lives\n"
140 		"\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
141 		"request_size:extra_size:page_shift'\n"
142 		"\tNR_OPS: number of outstanding xseg requests\n",
/*
 * Parse a decimal integer, accepting only fully-consumed, non-empty
 * input (s != endp && *endp == '\0'). The failure branch is not visible
 * in this view; callers treat negative results as "invalid", so it
 * presumably returns -1 — confirm.
 * NOTE(review): errno is not checked, so ERANGE overflow saturates at
 * LONG_MIN/LONG_MAX instead of being reported, and the long value is
 * returned through an int — both narrow the valid range silently.
 */
148 static int safe_atoi(char *s)
153 	l = strtol(s, &endp, 10);
154 	if (s != endp && *endp == '\0')
/*
 * Parse command-line options into the cmdline_* globals and validate
 * them. Any invalid or missing mandatory option is fatal (perr with
 * PFE aborts, judging by the unconditional use after failed checks).
 * Options: -p vlmcd port, -m mapper port, -b blockd port,
 *          -n outstanding-request count, -g xseg spec string (strdup'd).
 * The leading "+:" in the optstring requests POSIX-style processing
 * and ':' (missing-argument) reporting from getopt.
 * NOTE(review): the switch's case labels and loop structure fall in
 * lines not visible in this view.
 */
160 static void parse_cmdline(int argc, char **argv)
166 		c = getopt(argc, argv, "+:hp:m:b:n:g:");
172 			perr(PFE, 0, "Unknown option: -%c", optopt);
175 			perr(PFE, 0, "Option -%c requires an argument",
183 			cmdline_vportno = safe_atoi(optarg);
186 			cmdline_mportno = safe_atoi(optarg);
189 			cmdline_bportno = safe_atoi(optarg);
192 			cmdline_nr_ops = safe_atoi(optarg);
195 			/* FIXME: Max length of spec? strdup, eww */
196 			cmdline_xseg_spec = strdup(optarg);
197 			if (!cmdline_xseg_spec)
198 				perr(PFE, 0, "out of memory");
206 	/* Sanity check for all arguments */
207 	if (cmdline_vportno < 0)
208 		perr(PFE, 0, "no or invalid port specified for vlmcd");
209 	if (cmdline_mportno < 0)
210 		perr(PFE, 0, "no or invalid port specified for mapperd");
211 	if (cmdline_bportno < 0)
212 		perr(PFE, 0, "no or invalid port specified for blockd/filed/sosd");
213 	if (cmdline_nr_ops < 1)
214 		perr(PFE, 0, "specified outstanding request count is invalid");
215 	if (!cmdline_xseg_spec)
216 		perr(PFE, 0, "xseg specification is mandatory");
	/* Presumably guarded by an optind < argc check (line not visible). */
219 		perr(PFE, 0, "Non-option arguments specified on command line");
/*
 * Join an existing xseg segment described by spec, or create it first
 * if joining fails, then join the newly created segment.
 * NOTE(review): the returns of xseg_parse_spec() and xseg_create() are
 * deliberately ignored (cast to void); a malformed spec therefore
 * surfaces only as a NULL from the final xseg_join(). The intermediate
 * check on the first join's result is in lines not visible here.
 */
222 static struct xseg *join_or_create(char *spec)
224 	struct xseg_config config;
227 	(void)xseg_parse_spec(spec, &config);
228 	xseg = xseg_join(config.type, config.name, "posix", NULL);
	/* First join failed: create the segment, then join it. */
232 	(void)xseg_create(&config);
233 	return xseg_join(config.type, config.name, "posix", NULL);
237 * FIXME: What happens if this function fails?
238 * FIXME: How does this function fail? Do we return values from <errno.h>
239 * FIXME: Error reporting: Who prints errors, who prints errno?
/*
 * Advance the state machine for one io, driven by the request xreq that
 * just arrived for it (a new client request, a mapper reply, or a
 * block-daemon reply). The visible code falls into three phases that
 * correspond to a switch on io->state whose case labels are not visible
 * in this view (presumably ACCEPTED, then the post-mapper state, then
 * the per-segment-reply state, plus CONCLUDED/default error cases):
 *
 *   Phase 1 (new request accepted): forward it to the mapper as a
 *   X_MAPR/X_MAPW request (or route X_INFO straight to the blocker).
 *   Phase 2 (mapper replied): issue one block request per mapped
 *   segment in the mapper's reply.
 *   Phase 3 (block reply): account a completed breq against the client
 *   request and conclude when all segments have been served.
 */
241 static int dispatch(struct vlmcd *vlmcd, struct io *io, struct xseg_request *xreq)
248 	always_assert(vlmcd);
252 	vportno = vlmcd->vportno;
254 	/* FIXME: Arguments, sanity checks on them? */
	/*
	 * Phase 1: new client request (io->vreq == xreq). Build a mapper
	 * request describing the same target/size/offset.
	 */
258 	 * Step 1: Issue a request to the mapper.
260 	/* FIXME: xseglog(), strerror(), etc */
261 	/* FIXME: xreq->target a pointer?! why not a field, like * xreq->op? */
262 		always_assert(io->vreq == xreq);
263 		io->vreq->serviced = 0;
264 		io->mreq = xseg_get_request(xseg, vportno);
265 		always_assert(io->mreq);
	/*
	 * Size the mapper request: target name up front, and the rest of
	 * the buffer left as datalen for the mapper's reply to land in.
	 */
268 	 * We only care about the length of the target name
269 	 * and hope the mapper reply fits in the remaining datalen
272 		ret = xseg_prep_request(xseg, io->mreq, io->vreq->targetlen,
273 				io->mreq->bufferlen - io->vreq->targetlen);
274 		always_assert(ret == 0);
276 		struct xseg_request *m = io->mreq;
		/* NOTE(review): strncpy with exactly targetlen bytes — relies on
		 * the receiver honoring targetlen rather than NUL termination. */
277 		strncpy(m->target, io->vreq->target, m->targetlen);
278 		m->size = io->vreq->size;
279 		m->offset = io->vreq->offset;
281 		m->priv = __idx_from_io(vlmcd, io); /* use the io's idx for tracking */
		/* Translate the client op into the corresponding mapper op. */
282 		switch (io->vreq->op) {
283 		case X_READ: m->op = X_MAPR; break;
284 		case X_WRITE: m->op = X_MAPW; break;
285 		case X_INFO: m->op = X_INFO; break;
287 			perr(PFE, 0, "Internal error? io->vreq->op = "
288 				"%d\n", io->vreq->op);
		/* X_INFO bypasses the mapper and goes straight to the blocker. */
290 		if (m->op == X_INFO) {
291 			ret = xseg_submit(xseg, vlmcd->bportno, io->mreq);
292 			always_assert(ret != NoSerial);
293 			always_assert(xseg_signal(xseg, vlmcd->bportno) == 0);
296 			ret = xseg_submit(xseg, vlmcd->mportno, io->mreq);
297 			always_assert(ret != NoSerial);
298 			always_assert(xseg_signal(xseg, vlmcd->mportno) == 0);
	/*
	 * Phase 2: the mapper replied (xreq == io->mreq). Fan out one
	 * block request per mapped segment.
	 */
305 	 * Step 2. Issue block requests, one per segment
306 	 * in the reply from the mapper.
309 	/* For every mapped segment, issue a request to blockd */
310 	/* FIXME: what if we run out of xseg requests? */
311 		always_assert(xreq == io->mreq);
312 		always_assert(!(xreq->state & XS_FAILED) && xreq->state & XS_SERVED); /* FIXME: This is too harsh */
		/* X_INFO reply: copy the size answer straight back to the client. */
313 		if (xreq->op == X_INFO) {
			/* NOTE(review): off_t punning of the data buffer — assumes
			 * both sides agree on off_t width/alignment; confirm. */
314 			*(off_t *)io->vreq->data = *(off_t *)io->mreq->data;
315 			io->vreq->state |= XS_SERVED;
317 			ret = xseg_respond(vlmcd->xseg, io->vreq->portno, io->vreq);
318 			always_assert(ret != NoSerial);
319 			ret = xseg_signal(vlmcd->xseg, io->vreq->portno);
320 			always_assert(ret == 0);
321 			io->state = CONCLUDED;
322 			always_assert(xseg_put_request(xseg, vportno, io->mreq) != NoSerial);
325 			struct xseg_reply_map *mreply = (void *)io->mreq->data;
326 			always_assert(mreply->cnt > 0);
327 			//perr(PE, 0, "%llu %llu %llu mreply->target = %d\n", mreply->cnt, mreply->segs[0].size, mreply->segs[0].offset, mreply->segs[0].target[0]);
			/* Track the block requests so Phase 3 can match replies. */
329 			io->breqs_len = mreply->cnt;
330 			io->breqs = calloc(io->breqs_len, sizeof(struct xseg_request *));
331 			always_assert(io->breqs);
332 			for (i = 0, pos = 0; i < mreply->cnt; i++) {
333 				uint64_t datalen, offset, targetlen;
334 				struct xseg_request *breq;
336 				datalen = mreply->segs[i].size;
337 				offset = mreply->segs[i].offset;
338 				targetlen = strlen(mreply->segs[i].target);
340 				breq = xseg_get_request(xseg, vportno);
342 				always_assert(datalen + targetlen <= breq->bufferlen);
344 				ret = xseg_prep_request(xseg, breq, targetlen, datalen);
345 				breq->datalen = datalen;
346 				breq->offset = offset;
347 				breq->size = datalen;
348 				breq->op = io->vreq->op;
349 				breq->priv = __idx_from_io(vlmcd, io); /* use the io's idx for tracking */
350 				strncpy(breq->target, mreply->segs[i].target, targetlen);
		/* Zero-copy: point the block request at the right offset of the
		 * client's buffer so data lands there directly. */
352 				 * Get the blocker to place data directly into vreq's
353 				 * buffer. FIXME: Manipulate ->data by hand?
355 				breq->data = io->vreq->data + pos;
358 				ret = xseg_submit(xseg, vlmcd->bportno, breq);
359 				always_assert(ret != NoSerial);
362 			always_assert(xseg_signal(xseg, vlmcd->bportno) == 0);
		/* The mapper request is no longer needed; recycle it. */
365 			ret = xseg_put_request(xseg, vportno, io->mreq);
366 			always_assert(ret == 0);
	/*
	 * Phase 3: a block request completed. Fold its byte count into the
	 * client request; conclude once every breq has come back.
	 */
373 	 * One of the breqs has been completed.
374 	 * Update io and vreq counters, complete vreq when
375 	 * all of the data have arrived.
		/* Optional sanity pass: xreq must be one of this io's breqs. */
377 #if VLMCD_SANITY_CHECKS
378 		for (i = 0; i < io->breqs_len; i++)
379 			if (io->breqs[i] == xreq)
381 		if (i >= io->breqs_len) {
382 			perr(PE, 0, "Called for xreq = %p, not belonging to io %p",
383 				(void *)xreq, (void *)io);
385 			/* FIXME: how do I handle this? */
388 		struct xseg_request *breq = xreq;
389 		always_assert(!(breq->state & XS_FAILED) && breq->state & XS_SERVED);
390 		always_assert(breq->serviced == breq->size);
391 		io->vreq->serviced += breq->serviced;
392 		ret = xseg_put_request(xseg, vportno, breq);
393 		always_assert(ret == 0);
		/* Last outstanding segment: the whole client request is done. */
395 		if (--io->breq_cnt == 0) {
396 			always_assert(io->vreq->serviced == io->vreq->datalen);
398 			io->state = CONCLUDED;
	/* Remaining states are internal errors (fatal). */
403 		perr(PFE, 0, "Internal error, called for CONCLUDED");
406 		perr(PFE, 0, "Internal error, io->state = %d\n", io->state);
/*
 * Main event loop: alternately accept new client requests (bounded by
 * nr_ops in flight) and receive replies from peers, dispatching each
 * to the per-io state machine, and sleeping on the port signal when
 * there is no work. The enclosing for(;;) and the else-branches of the
 * accept/receive logic fall in lines not visible in this view.
 */
412 static int vlmcd_loop(struct vlmcd *vlmcd)
416 	struct xseg_request *xreq;
417 	struct xseg *xseg = vlmcd->xseg;
418 	uint32_t vportno = vlmcd->vportno;
	/* Arm the wait before polling, to avoid a lost-wakeup window. */
423 		ret = xseg_prepare_wait(xseg, vportno);
424 		always_assert(ret == 0);
428 		 * Accept requests from xseg if under the nr_ops limit,
429 		 * and check if any replies have been received.
431 		 * Use ->priv for tracking, retrieve the relevant io struct
432 		 * we rely upon our peers to not have touched -> priv
434 		if (vlmcd->flying < vlmcd->nr_ops &&
435 		    (xreq = xseg_accept(xseg, vportno))) {
			/* New client request: bind it to a fresh io. */
436 			io = alloc_io(vlmcd);
438 			io->state = ACCEPTED;
			/* Otherwise, check for a reply and map it back via ->priv. */
440 			xreq = xseg_receive(xseg, vportno);
442 				io = __io_from_idx(vlmcd, xreq->priv);
444 				always_assert(io->state != CONCLUDED);
448 		/* io is the pending io currently being processed */
450 			/* FIXME: WHY cancel_wait() anyway? */
451 			ret = xseg_cancel_wait(xseg, vportno);
452 			always_assert(ret == 0);
453 			dispatch(vlmcd, io, xreq);
456 			 * If things are OK, no timeout should ever be needed.
457 			 * Otherwise, it's a vlmcd or xseg bug.
458 			 * FIXME: sigtimedwait() with zero-valued timeout?
461 			xseg_wait_signal(xseg, 100000UL);
469 * FIXME: Initialize the vlmcd struct based on cmdline_* vars
/*
 * Initialize the daemon state from the cmdline_* globals: copy port
 * numbers, allocate the io slot array and its free-index queue, bring
 * up the xseg library, join (or create) the shared segment, and bind
 * the vlmcd port. Uses the idiomatic goto-cleanup ladder on failure;
 * the intermediate goto targets and the success return fall in lines
 * not visible in this view. Returns <0 on failure (per the caller's
 * `vlmcd_init(&vlmc) < 0` check in main()).
 */
471 static int vlmcd_init(struct vlmcd *vlmcd)
475 	vlmcd->vportno = cmdline_vportno;
476 	vlmcd->mportno = cmdline_mportno;
477 	vlmcd->bportno = cmdline_bportno;
480 	vlmcd->nr_ops = cmdline_nr_ops;
481 	vlmcd->ios = calloc(vlmcd->nr_ops, sizeof(struct io));
483 		perr(PE, 0, "could not allocate memory [ios]");
488 	/* FIXME: meaning of arguments to xq_alloc_seq()? */
489 	if (!xq_alloc_seq(&vlmcd->free_ios, cmdline_nr_ops, cmdline_nr_ops)) {
490 		perr(PE, 0, "could not allocate memory [free_ios]");
495 	/* FIXME: If xseg library fails, is errno set? */
496 	if (xseg_initialize()) {
497 		perr(PE, 0, "could not initialize xseg library");
499 		goto out_with_freeios;
502 	if (! (vlmcd->xseg = join_or_create(cmdline_xseg_spec))) {
503 		perr(PE, 0, "could not join or create xseg with spec '%s'\n",
506 		goto out_with_xseginit;
509 	if (! (vlmcd->vport = xseg_bind_port(vlmcd->xseg, vlmcd->vportno))) {
510 		perr(PE, 0, "cannot bind to xseg port %ld", (long)vlmcd->vportno);
512 		goto out_with_xsegjoin;
	/* Record the port number actually assigned by the segment. */
515 	vlmcd->vportno = xseg_portno(vlmcd->xseg, vlmcd->vport);
517 	perr(PI, 0, "vlmcd on port %u of %u",
518 		vlmcd->vportno, vlmcd->xseg->config.nr_ports);
	/* Error-unwind ladder: release resources in reverse order. */
524 	xseg_leave(vlmcd->xseg);
526 	always_assert(xseg_finalize() == 0);
528 	xq_free(&vlmcd->free_ios);
/*
 * Entry point: parse the command line into the cmdline_* globals, log
 * the effective settings, initialize the daemon (fatal on failure),
 * and run the event loop forever. The declaration of the local vlmc
 * struct falls in lines not visible in this view.
 */
535 int main(int argc, char *argv[])
540 	parse_cmdline(argc, argv);
542 	perr(PI, 0, "v = %ld, m = %ld, b = %ld, nr_ops = %lu\n",
543 		cmdline_vportno, cmdline_mportno, cmdline_bportno, cmdline_nr_ops);
545 	if (vlmcd_init(&vlmc) < 0)
546 		perr(PFE, 0, "failed to initialize vlmcd");
548 	return vlmcd_loop(&vlmc);