fix bugs in vlmc tool
[archipelago] / xseg / peers / user / vlmcd.c
1 /*
2  * The VoLuMe Composer
3  */
4
5 #define _GNU_SOURCE
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <unistd.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <errno.h>
14 #include <aio.h>
15 #include <signal.h>
16 #include <limits.h>
17 #include <xseg/xseg.h>
18 #include <pthread.h>
19
20 #include <xseg/protocol.h>
21
22 #include "common.h"  /* Please fix me */
23
24 #define MAX_PATH_SIZE 255
25 #define MAX_FILENAME_SIZE 255
26
27 #define DEFAULT_NR_OPS 128
28
29 #define VLMCD_SANITY_CHECKS 1
30
31 /*
32  * Globals, holding command-line arguments
33  */
34 long cmdline_vportno = -1;
35 long cmdline_mportno = -1;
36 long cmdline_bportno = -1;
37 char *cmdline_xseg_spec = NULL;
38 long cmdline_nr_ops = DEFAULT_NR_OPS;
39
/*
 * State of a pending I/O operation (see struct io), which is the
 * vlmcd-specific bookkeeping kept for every accepted request.
 * dispatch() moves an operation through these states in order.
 */
/* FIXME: XS_CONCLUDED equals XS_SERVING? */
/* FIXME: is it really vlmcd-specific? */
enum io_state_enum {
        ACCEPTED = 0,   /* volume request accepted, nothing issued yet */
        MAPPING,        /* request sent out, waiting for the reply to mreq */
        SERVING,        /* block requests issued, waiting for completions */
        CONCLUDED       /* volume request answered, io released */
};
52
53 struct io {
54         enum io_state_enum state;
55         struct xseg_request *vreq;
56         struct xseg_request *mreq;
57         struct xseg_request **breqs;
58         int breqs_len, breq_cnt;
59 };
60
61 struct vlmcd {
62         struct xseg *xseg;
63         struct xseg_port *vport;
64         uint32_t vportno, mportno, bportno;
65
66         int flying;
67         long nr_ops;
68         struct xq free_ops;
69
70         struct xq free_ios;
71         struct io *ios;
72
73 };
74
75 static inline struct io *__io_from_idx(struct vlmcd *vlmcd, xqindex idx)
76 {
77         if (idx >= vlmcd->nr_ops) {
78                 perr(PE, 0, "Internal error: called with idx = %ld > %ld",
79                         (long)idx, vlmcd->nr_ops);
80                 return NULL;
81         }
82
83         return &vlmcd->ios[idx];
84 }
85
86 static inline xqindex __idx_from_io(struct vlmcd *vlmcd, struct io *io)
87 {
88         long idx = io - vlmcd->ios;
89
90         if (idx < 0 || idx >= vlmcd->nr_ops) {
91                 perr(PE, 0, "Internal error: called with io = %p, idx = %ld, "
92                         "nr_ops = %ld",
93                         (void *)io, (long)(io - vlmcd->ios), vlmcd->nr_ops);
94                 return Noneidx;
95         }
96
97         return idx;
98 }
99
100 static inline struct io *alloc_io(struct vlmcd *vlmcd)
101 {
102         xqindex idx = xq_pop_head(&vlmcd->free_ios, 1);
103         if (idx == Noneidx)
104                 return NULL;
105         ++vlmcd->flying;
106 //      perr(PI, 0, "alloc'd io %p, in-flight reqs: %d", (void *)&vlmcd->ios[idx], vlmcd->flying);
107         return &vlmcd->ios[idx];
108 }
109
110 static inline void free_io(struct vlmcd *vlmcd, struct io *io)
111 {
112         /* FIXME: what if xq_append_head() fails? */
113         xq_append_head(&vlmcd->free_ios, __idx_from_io(vlmcd, io), 1);
114         --vlmcd->flying;
115 }
116
117 static int complete(struct vlmcd *vlmcd, struct io *io)
118 {
119         int ret;
120
121         io->vreq->state |= XS_SERVED;
122 //      perr(PI, 0, "completed io %p", (void *)io);
123         ret = xseg_respond(vlmcd->xseg, io->vreq->portno, io->vreq);
124         always_assert(ret != NoSerial);
125         ret = xseg_signal(vlmcd->xseg, io->vreq->portno);
126         always_assert(ret == 0);
127
128         return 0;
129 }
130
/*
 * Print the command-line synopsis on stderr.
 *
 * @argv0: program name to show in the synopsis
 *
 * Always returns 1.
 */
static int usage(char *argv0)
{
        fprintf(stderr,
                "Usage: %s [-p VLMCD_PORT] [-m MAPPERD_PORT] "
                        "[-b BLOCKD_PORT] [-g XSEG_SPEC] [-n NR_OPS]\n\n"
                "where:\n"
                "\tVLMCD_PORT: xseg port to listen for requests on\n"
                "\tMAPPERD_PORT: xseg port where the mapper lives\n"
                "\tBLOCKD_PORT: xseg port where blockd/filed/sosd lives\n"
                "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
                        "request_size:extra_size:page_shift'\n"
                "\tNR_OPS: number of outstanding xseg requests\n",
                argv0);

        return 1;
}
147
/*
 * Convert a decimal string to a non-negative int.
 *
 * @s: string to convert; the whole string must be consumed
 *
 * Returns the parsed value, or -1 when s is NULL, empty, has trailing
 * characters, is negative, or does not fit in an int. Every caller in
 * this file treats a negative result as an invalid argument.
 */
static int safe_atoi(char *s)
{
        long l;
        char *endp;

        if (!s)
                return -1;

        /* strtol() reports overflow only through errno */
        errno = 0;
        l = strtol(s, &endp, 10);
        if (s == endp || *endp != '\0')
                return -1;

        /* Reject values the int return type would silently truncate */
        if (errno == ERANGE || l < 0 || l > INT_MAX)
                return -1;

        return (int)l;
}
159
160 static void parse_cmdline(int argc, char **argv)
161 {
162         for (;;) {
163                 int c;
164
165                 opterr = 0;
166                 c = getopt(argc, argv, "+:hp:m:b:n:g:");
167                 if (c == -1)
168                         break;
169                 
170                 switch(c) {
171                         case '?':
172                                 perr(PFE, 0, "Unknown option: -%c", optopt);
173                                 break;
174                         case ':':
175                                 perr(PFE, 0, "Option -%c requires an argument",
176                                         optopt);
177                                 break;
178                         case 'h':
179                                 usage(argv[0]);
180                                 exit(0);
181                                 break;
182                         case 'p':
183                                 cmdline_vportno = safe_atoi(optarg);
184                                 break;
185                         case 'm':
186                                 cmdline_mportno = safe_atoi(optarg);
187                                 break;
188                         case 'b':
189                                 cmdline_bportno = safe_atoi(optarg);
190                                 break;
191                         case 'n':
192                                 cmdline_nr_ops = safe_atoi(optarg);
193                                 break;
194                         case 'g':
195                                 /* FIXME: Max length of spec? strdup, eww */
196                                 cmdline_xseg_spec = strdup(optarg);
197                                 if (!cmdline_xseg_spec)
198                                         perr(PFE, 0, "out of memory");
199                                 break;
200                 }
201         }
202
203         argc -= optind;
204         argv += optind;
205
206         /* Sanity check for all arguments */
207         if (cmdline_vportno < 0)
208                 perr(PFE, 0, "no or invalid port specified for vlmcd");
209         if (cmdline_mportno < 0)
210                 perr(PFE, 0, "no or invalid port specified for mapperd");
211         if (cmdline_bportno < 0)
212                 perr(PFE, 0, "no or invalid port specified for blockd/filed/sosd");
213         if (cmdline_nr_ops < 1)
214                 perr(PFE, 0, "specified outstanding request count is invalid");
215         if (!cmdline_xseg_spec)
216                 perr(PFE, 0, "xseg specification is mandatory");
217
218         if (argc)
219                 perr(PFE, 0, "Non-option arguments specified on command line");
220 }
221
222 static struct xseg *join_or_create(char *spec)
223 {
224         struct xseg_config config;
225         struct xseg *xseg;
226
227         (void)xseg_parse_spec(spec, &config);
228         xseg = xseg_join(config.type, config.name, "posix", NULL);
229         if (xseg)
230                 return xseg;
231
232         (void)xseg_create(&config);
233         return xseg_join(config.type, config.name, "posix", NULL);
234 }
235
236 /*
237  * FIXME: What happens if this function fails?
238  * FIXME: How does this function fail? Do we return values from <errno.h>
239  * FIXME: Error reporting: Who prints errors, who prints errno?
240  */
241 static int dispatch(struct vlmcd *vlmcd, struct io *io, struct xseg_request *xreq)
242 {
243         struct xseg *xseg;
244         uint32_t vportno;
245         int i, ret;
246         uint64_t pos;
247
248         always_assert(vlmcd);
249         always_assert(io);
250         xseg = vlmcd->xseg;
251         always_assert(xseg);
252         vportno = vlmcd->vportno;
253
254         /* FIXME: Arguments, sanity checks on them? */
255         switch (io->state) {
256         case ACCEPTED:
257                 /*
258                  * Step 1: Issue a request to the mapper.
259                  */
260                 /* FIXME: xseglog(), strerror(), etc */
261                 /* FIXME: xreq->target a pointer?! why not a field, like * xreq->op? */
262                 always_assert(io->vreq == xreq);
263                 io->vreq->serviced = 0;
264                 io->mreq = xseg_get_request(xseg, vportno);
265                 always_assert(io->mreq);
266                 /*
267                  * FIXME:
268                  * We only care about the length of the target name
269                  * and hope the mapper reply fits in the remaining datalen
270                  * bytes.
271                  */
272                 ret = xseg_prep_request(xseg, io->mreq, io->vreq->targetlen,
273                         io->mreq->bufferlen - io->vreq->targetlen);
274                 always_assert(ret == 0);
275
276                 struct xseg_request *m = io->mreq;
277                 strncpy(m->target, io->vreq->target, m->targetlen);
278                 m->size = io->vreq->size;
279                 m->offset = io->vreq->offset;
280                 m->flags = 0;
281                 m->priv = __idx_from_io(vlmcd, io); /* use the io's idx for tracking */
282                 switch (io->vreq->op) {
283                         case X_READ:  m->op = X_MAPR; break;
284                         case X_WRITE: m->op = X_MAPW; break;
285                         case X_INFO:  m->op = X_INFO; break;
286                         default:
287                                 perr(PFE, 0, "Internal error? io->vreq->op = "
288                                         "%d\n", io->vreq->op);
289                 }
290                 if (m->op == X_INFO) {
291                         ret = xseg_submit(xseg, vlmcd->bportno, io->mreq);
292                         always_assert(ret != NoSerial);
293                         always_assert(xseg_signal(xseg, vlmcd->bportno) == 0);
294                 }
295                 else {
296                         ret = xseg_submit(xseg, vlmcd->mportno, io->mreq);
297                         always_assert(ret != NoSerial);
298                         always_assert(xseg_signal(xseg, vlmcd->mportno) == 0);
299                 }
300
301                 io->state = MAPPING;
302                 break;
303         case MAPPING:
304                 /*
305                  * Step 2. Issue block requests, one per segment
306                  * in the reply from the mapper.
307                  */
308                 /* FIXME */
309                 /* For every mapped segment, issue a request to blockd */
310                 /* FIXME: what if we run out of xseg requests? */
311                 always_assert(xreq == io->mreq);
312                 always_assert(!(xreq->state & XS_FAILED) && xreq->state & XS_SERVED); /* FIXME: This is too harsh */
313                 if (xreq->op == X_INFO) {
314                         *(off_t *)io->vreq->data = *(off_t *)io->mreq->data;
315                         io->vreq->state |= XS_SERVED;
316
317                         ret = xseg_respond(vlmcd->xseg, io->vreq->portno, io->vreq);
318                         always_assert(ret != NoSerial);
319                         ret = xseg_signal(vlmcd->xseg, io->vreq->portno);
320                         always_assert(ret == 0);
321                         io->state = CONCLUDED;
322                         always_assert(xseg_put_request(xseg, vportno, io->mreq) != NoSerial);
323                         free_io(vlmcd, io);
324                 } else {
325                         struct xseg_reply_map *mreply = (void *)io->mreq->data;
326                         always_assert(mreply->cnt > 0);
327                         //perr(PE, 0, "%llu %llu %llu mreply->target = %d\n", mreply->cnt, mreply->segs[0].size, mreply->segs[0].offset, mreply->segs[0].target[0]);
328
329                         io->breqs_len = mreply->cnt;
330                         io->breqs = calloc(io->breqs_len, sizeof(struct xseg_request *));
331                         always_assert(io->breqs);
332                         for (i = 0, pos = 0; i < mreply->cnt; i++) {
333                                 uint64_t datalen, offset, targetlen;
334                                 struct xseg_request *breq;
335
336                                 datalen = mreply->segs[i].size;
337                                 offset = mreply->segs[i].offset;
338                                 targetlen = strlen(mreply->segs[i].target);
339
340                                 breq = xseg_get_request(xseg, vportno);
341                                 always_assert(breq);
342                                 always_assert(datalen + targetlen <= breq->bufferlen);
343
344                                 ret = xseg_prep_request(xseg, breq, targetlen, datalen);
345                                 breq->datalen = datalen;
346                                 breq->offset = offset;
347                                 breq->size = datalen;
348                                 breq->op = io->vreq->op;
349                                 breq->priv = __idx_from_io(vlmcd, io); /* use the io's idx for tracking */
350                                 strncpy(breq->target, mreply->segs[i].target, targetlen);
351                                 /*
352                                  * Get the blocker to place data directly into vreq's
353                                  * buffer. FIXME: Manipulate ->data by hand?
354                                  */
355                                 breq->data = io->vreq->data + pos;
356                                 pos += datalen;
357
358                                 ret = xseg_submit(xseg, vlmcd->bportno, breq);
359                                 always_assert(ret != NoSerial);
360                                 /* possible race? */
361                                 io->breqs[i] = breq;
362                                 always_assert(xseg_signal(xseg, vlmcd->bportno) == 0);
363                         }
364                         io->breq_cnt = i;
365                         ret = xseg_put_request(xseg, vportno, io->mreq);
366                         always_assert(ret == 0);
367
368                         io->state = SERVING;
369                 }
370                 break;
371         case SERVING:
372                 /*
373                  * One of the breqs has been completed.
374                  * Update io and vreq counters, complete vreq when
375                  * all of the data have arrived.
376                  */
377 #if VLMCD_SANITY_CHECKS
378                 for (i = 0; i < io->breqs_len; i++)
379                         if (io->breqs[i] == xreq)
380                                 break;
381                 if (i >= io->breqs_len) {
382                         perr(PE, 0, "Called for xreq = %p, not belonging to io %p",
383                                 (void *)xreq, (void *)io);
384                         always_assert(0);
385                         /* FIXME: how do I handle this? */
386                 }
387 #endif
388                 struct xseg_request *breq = xreq;
389                 always_assert(!(breq->state & XS_FAILED) && breq->state & XS_SERVED);
390                 always_assert(breq->serviced == breq->size);
391                 io->vreq->serviced += breq->serviced;
392                 ret = xseg_put_request(xseg, vportno, breq);
393                 always_assert(ret == 0);
394                 
395                 if (--io->breq_cnt == 0) {
396                         always_assert(io->vreq->serviced == io->vreq->datalen);
397                         complete(vlmcd, io);
398                         io->state = CONCLUDED;
399                         free_io(vlmcd, io);
400                 }
401                 break;
402         case CONCLUDED:
403                 perr(PFE, 0, "Internal error, called for CONCLUDED");
404                 break;
405         default:
406                 perr(PFE, 0, "Internal error, io->state = %d\n", io->state);
407         }
408
409         return 0;
410 }
411
412 static int vlmcd_loop(struct vlmcd *vlmcd)
413 {
414         int ret;
415         struct io *io;
416         struct xseg_request *xreq;
417         struct xseg *xseg = vlmcd->xseg;
418         uint32_t vportno = vlmcd->vportno;
419
420         always_assert(xseg);
421
422         for (;;) {
423                 ret = xseg_prepare_wait(xseg, vportno);
424                 always_assert(ret == 0);
425
426                 io = NULL;
427                 /*
428                  * Accept requests from xseg if under the nr_ops limit,
429                  * and check if any replies have been received.
430                  *
431                  * Use ->priv for tracking, retrieve the relevant io struct
432                  * we reply upon our peers to not have touched -> priv
433                  */
434                 if (vlmcd->flying < vlmcd->nr_ops &&
435                     (xreq = xseg_accept(xseg, vportno))) {
436                         io = alloc_io(vlmcd);
437                         io->vreq = xreq;
438                         io->state = ACCEPTED;
439                 } else {
440                         xreq = xseg_receive(xseg, vportno);
441                         if (xreq) {
442                                 io = __io_from_idx(vlmcd, xreq->priv);
443                                 always_assert(io);
444                                 always_assert(io->state != CONCLUDED);
445                         }
446                 }
447
448                 /* io is the pending io currently being processed */
449                 if (io) {
450                         /* FIXME: WHY cancel_wait() anyway? */
451                         ret = xseg_cancel_wait(xseg, vportno);
452                         always_assert(ret == 0);
453                         dispatch(vlmcd, io, xreq);
454                 } else {
455                         /*
456                          * If things are OK, no timeout should ever be needed.
457                          * Otherwise, it's a vlmcd or xseg bug.
458                          * FIXME: sigtimedwait() with zero-valued timeout?
459                          * FAIL.
460                          */
461                         xseg_wait_signal(xseg, 100000UL);
462                 }
463         }
464
465         return 0;
466 }
467
468 /*
469  * FIXME: Initialize the vlmcd struct based on cmdline_* vars
470  */
471 static int vlmcd_init(struct vlmcd *vlmcd)
472 {
473         int ret;
474
475         vlmcd->vportno = cmdline_vportno;
476         vlmcd->mportno = cmdline_mportno;
477         vlmcd->bportno = cmdline_bportno;
478
479         vlmcd->flying = 0;
480         vlmcd->nr_ops = cmdline_nr_ops;
481         vlmcd->ios = calloc(vlmcd->nr_ops, sizeof(struct io));
482         if (!vlmcd->ios) {
483                 perr(PE, 0, "could not allocate memory [ios]");
484                 ret = -ENOMEM;
485                 goto out;
486         }
487
488         /* FIXME: meaning of arguments to xq_alloc_seq()? */
489         if (!xq_alloc_seq(&vlmcd->free_ios, cmdline_nr_ops, cmdline_nr_ops)) {
490                 perr(PE, 0, "could not allocate memory [free_ios]");
491                 ret = -ENOMEM;
492                 goto out_with_ios;
493         }
494
495         /* FIXME: If xseg library fails, is errno set? */
496         if (xseg_initialize()) {
497                 perr(PE, 0, "could not initialize xseg library");
498                 ret = -EIO;
499                 goto out_with_freeios;
500         }
501
502         if (! (vlmcd->xseg = join_or_create(cmdline_xseg_spec))) {
503                 perr(PE, 0, "could not join or create xseg with spec '%s'\n",
504                         cmdline_xseg_spec);
505                 ret = -EIO;
506                 goto out_with_xseginit;
507         }
508
509         if (! (vlmcd->vport = xseg_bind_port(vlmcd->xseg, vlmcd->vportno))) {
510                 perr(PE, 0, "cannot bind to xseg port %ld", (long)vlmcd->vportno);
511                 ret = -EIO;
512                 goto out_with_xsegjoin;
513         }
514
515         vlmcd->vportno = xseg_portno(vlmcd->xseg, vlmcd->vport);
516         
517         perr(PI, 0, "vlmcd on port %u of %u",
518                 vlmcd->vportno, vlmcd->xseg->config.nr_ports);
519
520         ret = 0;
521         goto out;
522
523 out_with_xsegjoin:
524         xseg_leave(vlmcd->xseg);
525 out_with_xseginit:
526         always_assert(xseg_finalize() == 0);
527 out_with_freeios:
528         xq_free(&vlmcd->free_ios);
529 out_with_ios:
530         free(vlmcd->ios);
531 out:
532         return ret;
533 }
534
535 int main(int argc, char *argv[])
536 {
537         struct vlmcd vlmc;
538
539         init_perr("vlmcd");
540         parse_cmdline(argc, argv);
541
542         perr(PI, 0, "v = %ld, m = %ld, b = %ld, nr_ops = %lu\n",
543                 cmdline_vportno, cmdline_mportno, cmdline_bportno, cmdline_nr_ops);
544
545         if (vlmcd_init(&vlmc) < 0)
546                 perr(PFE, 0, "failed to initialize vlmcd");
547
548         return vlmcd_loop(&vlmc);
549 }