2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
42 #include <sys/types.h>
51 #include <xseg/xseg.h>
54 #include <xseg/protocol.h>
56 #include "common.h" /* Please fix me */
58 #define MAX_PATH_SIZE 255
59 #define MAX_FILENAME_SIZE 255
61 #define DEFAULT_NR_OPS 128
63 #define VLMCD_SANITY_CHECKS 1
66 * Globals, holding command-line arguments
/*
 * Sentinel values (-1 / NULL) mean "not supplied"; parse_cmdline() fills
 * these in from getopt and aborts via perr(PFE, ...) if a mandatory one
 * is still at its sentinel afterwards.
 */
68 long cmdline_vportno = -1;            /* -p: xseg port vlmcd listens on */
69 long cmdline_mportno = -1;            /* -m: xseg port of the mapper peer */
70 long cmdline_bportno = -1;            /* -b: xseg port of blockd/filed/sosd */
71 char *cmdline_xseg_spec = NULL;       /* -g: xseg spec string (strdup'd, owned here) */
72 long cmdline_nr_ops = DEFAULT_NR_OPS; /* -n: max outstanding xseg requests */
75 * vlmcd-specific structure,
76 * containing information on a pending I/O operation
78 /* FIXME: XS_CONCLUDED equals XS_SERVING? */
79 /* FIXME: is it really vlmcd-specific? */
/*
 * NOTE(review): the struct declaration lines themselves are not visible in
 * this view; the first group of fields below belongs to the per-request
 * "struct io" record, the second group to "struct vlmcd" (daemon state) --
 * inferred from usage in dispatch()/vlmcd_loop(); confirm against full file.
 */
88 enum io_state_enum state;     /* lifecycle: ACCEPTED -> ... -> CONCLUDED */
89 struct xseg_request *vreq;    /* the original client request we must answer */
90 struct xseg_request *mreq;    /* in-flight request to the mapper */
91 struct xseg_request **breqs;  /* calloc'd array of per-segment block requests */
92 int breqs_len, breq_cnt;      /* array length / block replies still pending */
97 struct xseg_port *vport;      /* bound port object for vportno */
98 uint32_t vportno, mportno, bportno; /* our port, mapper's port, blocker's port */
/*
 * Map a free_ios queue index back to its io slot in vlmcd->ios.
 * Logs an internal error for an out-of-range idx (the error-path return
 * is not visible in this view -- presumably NULL; confirm in full file).
 */
109 static inline struct io *__io_from_idx(struct vlmcd *vlmcd, xqindex idx)
111 if (idx >= vlmcd->nr_ops) {
112 perr(PE, 0, "Internal error: called with idx = %ld > %ld",
113 (long)idx, vlmcd->nr_ops)
117 return &vlmcd->ios[idx];
/*
 * Inverse of __io_from_idx(): recover the queue index of an io slot via
 * pointer arithmetic on the vlmcd->ios array. Bounds-checks the result;
 * the error-path return value is not visible in this view.
 */
120 static inline xqindex __idx_from_io(struct vlmcd *vlmcd, struct io *io)
122 long idx = io - vlmcd->ios;
124 if (idx < 0 || idx >= vlmcd->nr_ops) {
125 perr(PE, 0, "Internal error: called with io = %p, idx = %ld, "
127 (void *)io, (long)(io - vlmcd->ios), vlmcd->nr_ops);
/*
 * Pop a free slot index off free_ios and return the corresponding io.
 * The pop-failure (queue empty) handling is not visible in this view;
 * callers are throttled by the nr_ops check in vlmcd_loop(), which is
 * presumably what keeps the queue from underflowing -- confirm.
 */
134 static inline struct io *alloc_io(struct vlmcd *vlmcd)
136 xqindex idx = xq_pop_head(&vlmcd->free_ios, 1);
140 // perr(PI, 0, "alloc'd io %p, in-flight reqs: %d", (void *)&vlmcd->ios[idx], vlmcd->flying);
141 return &vlmcd->ios[idx];
/*
 * Return an io slot to the free_ios queue. The append's return value is
 * ignored (see the FIXME); since every index pushed was previously popped,
 * the queue should always have room -- but that invariant is unchecked.
 */
144 static inline void free_io(struct vlmcd *vlmcd, struct io *io)
146 /* FIXME: what if xq_append_head() fails? */
147 xq_append_head(&vlmcd->free_ios, __idx_from_io(vlmcd, io), 1);
/*
 * Mark the client's request (io->vreq) as served, hand it back to the
 * originating port via xseg_respond(), and signal that port so the peer
 * wakes up. Failures are treated as fatal internal errors (asserts).
 */
151 static int complete(struct vlmcd *vlmcd, struct io *io)
155 io->vreq->state |= XS_SERVED;
156 // perr(PI, 0, "completed io %p", (void *)io);
157 ret = xseg_respond(vlmcd->xseg, io->vreq->portno, io->vreq);
158 always_assert(ret != NoSerial);
159 ret = xseg_signal(vlmcd->xseg, io->vreq->portno);
160 always_assert(ret == 0);
/*
 * Print command-line usage to the user.
 * NOTE(review): two string defects here (left untouched in this
 * comment-only pass, since they are runtime strings):
 *   1. "BLOCKD_POART" is a typo for "BLOCKD_PORT" (the description
 *      below spells it correctly).
 *   2. The literal ending "[-m MAPPERD_PORT]" and the one starting
 *      "[-b ..." concatenate with no separating space, so the synopsis
 *      prints "...MAPPERD_PORT][-b...".
 */
165 static int usage(char *argv0)
168 "Usage: %s [-p VLMCD_PORT] [-m MAPPERD_PORT]"
169 "[-b BLOCKD_POART] [-g XSEG_SPEC] [-n NR_OPS]\n\n"
171 "\tVLMCD_PORT: xseg port to listen for requests on\n"
172 "\tMAPPERD_PORT: xseg port where the mapper lives\n"
173 "\tBLOCKD_PORT: xseg port where blockd/filed/sosd lives\n"
174 "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
175 "request_size:extra_size:page_shift'\n"
176 "\tNR_OPS: number of outstanding xseg requests\n",
/*
 * Parse a full decimal string with strtol(); a value is accepted only if
 * at least one digit was consumed and the string was consumed entirely.
 * NOTE(review): errno is not reset to 0 before strtol(), so ERANGE
 * overflow cannot be reliably detected; the long result is also narrowed
 * to int by the (not visible here) return. The failure-path return value
 * is not visible in this view -- callers treat negatives as invalid.
 */
182 static int safe_atoi(char *s)
187 l = strtol(s, &endp, 10);
188 if (s != endp && *endp == '\0')
/*
 * Parse command-line options into the cmdline_* globals, then sanity-check
 * them, aborting via perr(PFE, ...) on any missing/invalid value.
 * getopt string "+:hp:m:b:n:g:": leading '+' stops at the first non-option
 * argument; leading ':' makes getopt return ':' (instead of printing its
 * own message) when an option is missing its argument.
 */
194 static void parse_cmdline(int argc, char **argv)
200 c = getopt(argc, argv, "+:hp:m:b:n:g:");
206 perr(PFE, 0, "Unknown option: -%c", optopt);
209 perr(PFE, 0, "Option -%c requires an argument",
/* Numeric options all funnel through safe_atoi(); a parse failure
 * yields a value the sanity checks below reject. */
217 cmdline_vportno = safe_atoi(optarg);
220 cmdline_mportno = safe_atoi(optarg);
223 cmdline_bportno = safe_atoi(optarg);
226 cmdline_nr_ops = safe_atoi(optarg);
229 /* FIXME: Max length of spec? strdup, eww */
230 cmdline_xseg_spec = strdup(optarg);
231 if (!cmdline_xseg_spec)
232 perr(PFE, 0, "out of memory");
240 /* Sanity check for all arguments */
241 if (cmdline_vportno < 0)
242 perr(PFE, 0, "no or invalid port specified for vlmcd");
243 if (cmdline_mportno < 0)
244 perr(PFE, 0, "no or invalid port specified for mapperd");
245 if (cmdline_bportno < 0)
246 perr(PFE, 0, "no or invalid port specified for blockd/filed/sosd");
247 if (cmdline_nr_ops < 1)
248 perr(PFE, 0, "specified outstanding request count is invalid");
249 if (!cmdline_xseg_spec)
250 perr(PFE, 0, "xseg specification is mandatory");
/* Leftover non-option arguments are an error (the guarding condition
 * is not visible in this view; presumably optind < argc). */
253 perr(PFE, 0, "Non-option arguments specified on command line");
/*
 * Join the xseg segment described by spec, creating it first if the
 * initial join fails. Returns the joined segment, or whatever the second
 * xseg_join() returns on failure (presumably NULL -- caller checks).
 * NOTE(review): the return values of xseg_parse_spec() and xseg_create()
 * are explicitly discarded; a malformed spec is only detected indirectly
 * when the joins fail.
 */
256 static struct xseg *join_or_create(char *spec)
258 struct xseg_config config;
261 (void)xseg_parse_spec(spec, &config);
262 xseg = xseg_join(config.type, config.name, "posix", NULL);
266 (void)xseg_create(&config);
267 return xseg_join(config.type, config.name, "posix", NULL);
271 * FIXME: What happens if this function fails?
272 * FIXME: How does this function fail? Do we return values from <errno.h>
273 * FIXME: Error reporting: Who prints errors, who prints errno?
/*
 * State machine driving one client I/O (io) forward when request xreq
 * arrives for it. Visible states:
 *   ACCEPTED (step 1): forward the client request to the mapper (or, for
 *     X_INFO, straight to the blocker).
 *   mapper reply (step 2): fan the mapped segments out as one block
 *     request each.
 *   block reply (step 3): account each completed breq; when all have
 *     arrived, complete the client request.
 * (The switch/case labels themselves are not visible in this view.)
 */
275 static int dispatch(struct vlmcd *vlmcd, struct io *io, struct xseg_request *xreq)
282 always_assert(vlmcd);
286 vportno = vlmcd->vportno;
288 /* FIXME: Arguments, sanity checks on them? */
292 * Step 1: Issue a request to the mapper.
294 /* FIXME: xseglog(), strerror(), etc */
295 /* FIXME: xreq->target a pointer?! why not a field, like * xreq->op? */
296 always_assert(io->vreq == xreq);
297 io->vreq->serviced = 0;
298 io->mreq = xseg_get_request(xseg, vportno);
299 always_assert(io->mreq);
302 * We only care about the length of the target name
303 * and hope the mapper reply fits in the remaining datalen
306 ret = xseg_prep_request(xseg, io->mreq, io->vreq->targetlen,
307 io->mreq->bufferlen - io->vreq->targetlen);
308 always_assert(ret == 0);
310 struct xseg_request *m = io->mreq;
/* NOTE(review): strncpy with exactly targetlen bytes leaves m->target
 * un-terminated if the source is >= targetlen long; safe only if xseg
 * targets are length-delimited rather than NUL-terminated -- confirm. */
311 strncpy(m->target, io->vreq->target, m->targetlen);
312 m->size = io->vreq->size;
313 m->offset = io->vreq->offset;
315 m->priv = __idx_from_io(vlmcd, io); /* use the io's idx for tracking */
316 switch (io->vreq->op) {
317 case X_READ: m->op = X_MAPR; break;
318 case X_WRITE: m->op = X_MAPW; break;
319 case X_INFO: m->op = X_INFO; break;
321 perr(PFE, 0, "Internal error? io->vreq->op = "
322 "%d\n", io->vreq->op);
/* X_INFO bypasses the mapper and goes straight to the blocker. */
324 if (m->op == X_INFO) {
325 ret = xseg_submit(xseg, vlmcd->bportno, io->mreq);
326 always_assert(ret != NoSerial);
327 always_assert(xseg_signal(xseg, vlmcd->bportno) == 0);
330 ret = xseg_submit(xseg, vlmcd->mportno, io->mreq);
331 always_assert(ret != NoSerial);
332 always_assert(xseg_signal(xseg, vlmcd->mportno) == 0);
339 * Step 2. Issue block requests, one per segment
340 * in the reply from the mapper.
343 /* For every mapped segment, issue a request to blockd */
344 /* FIXME: what if we run out of xseg requests? */
345 always_assert(xreq == io->mreq);
346 always_assert(!(xreq->state & XS_FAILED) && xreq->state & XS_SERVED); /* FIXME: This is too harsh */
347 if (xreq->op == X_INFO) {
/* X_INFO short-circuit: copy the size out of the blocker's reply
 * and answer the client directly. */
348 *(off_t *)io->vreq->data = *(off_t *)io->mreq->data;
349 io->vreq->state |= XS_SERVED;
351 ret = xseg_respond(vlmcd->xseg, io->vreq->portno, io->vreq);
352 always_assert(ret != NoSerial);
353 ret = xseg_signal(vlmcd->xseg, io->vreq->portno);
354 always_assert(ret == 0);
355 io->state = CONCLUDED;
/* NOTE(review): here xseg_put_request is checked against NoSerial,
 * but at the end of step 2 and in step 3 it is checked == 0; one of
 * these conventions is presumably wrong -- unify after checking the
 * xseg API's documented return value. */
356 always_assert(xseg_put_request(xseg, vportno, io->mreq) != NoSerial);
359 struct xseg_reply_map *mreply = (void *)io->mreq->data;
360 always_assert(mreply->cnt > 0);
361 //perr(PE, 0, "%llu %llu %llu mreply->target = %d\n", mreply->cnt, mreply->segs[0].size, mreply->segs[0].offset, mreply->segs[0].target[0]);
363 io->breqs_len = mreply->cnt;
364 io->breqs = calloc(io->breqs_len, sizeof(struct xseg_request *));
365 always_assert(io->breqs);
366 for (i = 0, pos = 0; i < mreply->cnt; i++) {
367 uint64_t datalen, offset, targetlen;
368 struct xseg_request *breq;
370 datalen = mreply->segs[i].size;
371 offset = mreply->segs[i].offset;
372 targetlen = strlen(mreply->segs[i].target);
374 breq = xseg_get_request(xseg, vportno);
376 always_assert(datalen + targetlen <= breq->bufferlen);
/* NOTE(review): ret from xseg_prep_request is assigned but, in the
 * visible lines, never checked here -- unlike the step-1 call. */
378 ret = xseg_prep_request(xseg, breq, targetlen, datalen);
379 breq->datalen = datalen;
380 breq->offset = offset;
381 breq->size = datalen;
382 breq->op = io->vreq->op;
383 breq->priv = __idx_from_io(vlmcd, io); /* use the io's idx for tracking */
384 strncpy(breq->target, mreply->segs[i].target, targetlen);
386 * Get the blocker to place data directly into vreq's
387 * buffer. FIXME: Manipulate ->data by hand?
389 breq->data = io->vreq->data + pos;
392 ret = xseg_submit(xseg, vlmcd->bportno, breq);
393 always_assert(ret != NoSerial);
396 always_assert(xseg_signal(xseg, vlmcd->bportno) == 0);
/* Mapper request no longer needed once all breqs are in flight. */
399 ret = xseg_put_request(xseg, vportno, io->mreq);
400 always_assert(ret == 0);
407 * One of the breqs has been completed.
408 * Update io and vreq counters, complete vreq when
409 * all of the data have arrived.
411 #if VLMCD_SANITY_CHECKS
/* Paranoid check: the completed xreq must be one of ours. */
412 for (i = 0; i < io->breqs_len; i++)
413 if (io->breqs[i] == xreq)
415 if (i >= io->breqs_len) {
416 perr(PE, 0, "Called for xreq = %p, not belonging to io %p",
417 (void *)xreq, (void *)io);
419 /* FIXME: how do I handle this? */
422 struct xseg_request *breq = xreq;
423 always_assert(!(breq->state & XS_FAILED) && breq->state & XS_SERVED);
424 always_assert(breq->serviced == breq->size);
425 io->vreq->serviced += breq->serviced;
426 ret = xseg_put_request(xseg, vportno, breq);
427 always_assert(ret == 0);
/* Last outstanding block reply => the whole client request is done. */
429 if (--io->breq_cnt == 0) {
430 always_assert(io->vreq->serviced == io->vreq->datalen);
432 io->state = CONCLUDED;
437 perr(PFE, 0, "Internal error, called for CONCLUDED");
440 perr(PFE, 0, "Internal error, io->state = %d\n", io->state);
/*
 * Main event loop: accept new client requests (while under the nr_ops
 * in-flight limit) and receive replies from peers, routing each to
 * dispatch(); sleeps on the xseg signal when nothing is pending.
 * The loop construct itself is not visible in this view.
 */
446 static int vlmcd_loop(struct vlmcd *vlmcd)
450 struct xseg_request *xreq;
451 struct xseg *xseg = vlmcd->xseg;
452 uint32_t vportno = vlmcd->vportno;
457 ret = xseg_prepare_wait(xseg, vportno);
458 always_assert(ret == 0);
462 * Accept requests from xseg if under the nr_ops limit,
463 * and check if any replies have been received.
465 * Use ->priv for tracking, retrieve the relevant io struct
466 * we reply upon our peers to not have touched -> priv
468 if (vlmcd->flying < vlmcd->nr_ops &&
469 (xreq = xseg_accept(xseg, vportno))) {
/* Brand-new client request: allocate an io and start its lifecycle. */
470 io = alloc_io(vlmcd);
472 io->state = ACCEPTED;
474 xreq = xseg_receive(xseg, vportno);
/* A reply: recover the owning io from the idx we stashed in ->priv. */
476 io = __io_from_idx(vlmcd, xreq->priv);
478 always_assert(io->state != CONCLUDED);
482 /* io is the pending io currently being processed */
484 /* FIXME: WHY cancel_wait() anyway? */
485 ret = xseg_cancel_wait(xseg, vportno);
486 always_assert(ret == 0);
487 dispatch(vlmcd, io, xreq);
490 * If things are OK, no timeout should ever be needed.
491 * Otherwise, it's a vlmcd or xseg bug.
492 * FIXME: sigtimedwait() with zero-valued timeout?
495 xseg_wait_signal(xseg, 100000UL);
503 * FIXME: Initialize the vlmcd struct based on cmdline_* vars
/*
 * Initialize daemon state from the cmdline_* globals: allocate the io
 * pool and its free-slot queue, initialize/join the xseg segment, and
 * bind our port. Uses goto-style unwinding on failure (labels are at the
 * bottom; the gotos target progressively earlier acquisitions). Returns
 * <0 on failure (per main()'s check); success return not visible here.
 */
505 static int vlmcd_init(struct vlmcd *vlmcd)
509 vlmcd->vportno = cmdline_vportno;
510 vlmcd->mportno = cmdline_mportno;
511 vlmcd->bportno = cmdline_bportno;
514 vlmcd->nr_ops = cmdline_nr_ops;
515 vlmcd->ios = calloc(vlmcd->nr_ops, sizeof(struct io));
517 perr(PE, 0, "could not allocate memory [ios]");
522 /* FIXME: meaning of arguments to xq_alloc_seq()? */
/* Pre-fill free_ios with indices 0..nr_ops-1 (sequential alloc). */
523 if (!xq_alloc_seq(&vlmcd->free_ios, cmdline_nr_ops, cmdline_nr_ops)) {
524 perr(PE, 0, "could not allocate memory [free_ios]");
529 /* FIXME: If xseg library fails, is errno set? */
530 if (xseg_initialize()) {
531 perr(PE, 0, "could not initialize xseg library");
533 goto out_with_freeios;
536 if (! (vlmcd->xseg = join_or_create(cmdline_xseg_spec))) {
537 perr(PE, 0, "could not join or create xseg with spec '%s'\n",
540 goto out_with_xseginit;
543 if (! (vlmcd->vport = xseg_bind_port(vlmcd->xseg, vlmcd->vportno))) {
544 perr(PE, 0, "cannot bind to xseg port %ld", (long)vlmcd->vportno);
546 goto out_with_xsegjoin;
/* Re-read the port number from the bound port (it is authoritative). */
549 vlmcd->vportno = xseg_portno(vlmcd->xseg, vlmcd->vport);
551 perr(PI, 0, "vlmcd on port %u of %u",
552 vlmcd->vportno, vlmcd->xseg->config.nr_ports);
/* Error-unwinding path (labels not visible in this view). */
558 xseg_leave(vlmcd->xseg);
560 always_assert(xseg_finalize() == 0);
562 xq_free(&vlmcd->free_ios);
/*
 * Entry point: parse arguments, initialize daemon state (fatal on
 * failure), then run the event loop forever; the loop's return value
 * becomes the process exit status.
 */
569 int main(int argc, char *argv[])
574 parse_cmdline(argc, argv);
576 perr(PI, 0, "v = %ld, m = %ld, b = %ld, nr_ops = %lu\n",
577 cmdline_vportno, cmdline_mportno, cmdline_bportno, cmdline_nr_ops);
579 if (vlmcd_init(&vlmc) < 0)
580 perr(PFE, 0, "failed to initialize vlmcd");
582 return vlmcd_loop(&vlmc);