mapperd: Always sleep before checking request state.
[archipelago] / xseg / peers / user / vlmcd.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 /*
36  * The VoLuMe Composer
37  */
38
39 #define _GNU_SOURCE
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <sys/types.h>
43 #include <sys/stat.h>
44 #include <unistd.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <errno.h>
48 #include <aio.h>
49 #include <signal.h>
50 #include <limits.h>
51 #include <xseg/xseg.h>
52 #include <pthread.h>
53
54 #include <xseg/protocol.h>
55
56 #include "common.h"  /* Please fix me */
57
58 #define MAX_PATH_SIZE 255
59 #define MAX_FILENAME_SIZE 255
60
61 #define DEFAULT_NR_OPS 128
62
63 #define VLMCD_SANITY_CHECKS 1
64
65 /*
66  * Globals, holding command-line arguments
67  */
68 long cmdline_vportno = -1;
69 long cmdline_mportno = -1;
70 long cmdline_bportno = -1;
71 char *cmdline_xseg_spec = NULL;
72 long cmdline_nr_ops = DEFAULT_NR_OPS;
73
74 /*
75  * vlmcd-specific structure,
76  * containing information on a pending I/O operation
77  */
78 /* FIXME: XS_CONCLUDED equals XS_SERVING? */
79 /* FIXME: is it really vlmcd-specific? */
80 enum io_state_enum {
81         ACCEPTED = 0,
82         MAPPING = 1,
83         SERVING = 2,
84         CONCLUDED = 3
85 };
86
87 struct io {
88         enum io_state_enum state;
89         struct xseg_request *vreq;
90         struct xseg_request *mreq;
91         struct xseg_request **breqs;
92         int breqs_len, breq_cnt;
93 };
94
95 struct vlmcd {
96         struct xseg *xseg;
97         struct xseg_port *vport;
98         uint32_t vportno, mportno, bportno;
99
100         int flying;
101         long nr_ops;
102         struct xq free_ops;
103
104         struct xq free_ios;
105         struct io *ios;
106
107 };
108
109 static inline struct io *__io_from_idx(struct vlmcd *vlmcd, xqindex idx)
110 {
111         if (idx >= vlmcd->nr_ops) {
112                 perr(PE, 0, "Internal error: called with idx = %ld > %ld",
113                         (long)idx, vlmcd->nr_ops);
114                 return NULL;
115         }
116
117         return &vlmcd->ios[idx];
118 }
119
120 static inline xqindex __idx_from_io(struct vlmcd *vlmcd, struct io *io)
121 {
122         long idx = io - vlmcd->ios;
123
124         if (idx < 0 || idx >= vlmcd->nr_ops) {
125                 perr(PE, 0, "Internal error: called with io = %p, idx = %ld, "
126                         "nr_ops = %ld",
127                         (void *)io, (long)(io - vlmcd->ios), vlmcd->nr_ops);
128                 return Noneidx;
129         }
130
131         return idx;
132 }
133
134 static inline struct io *alloc_io(struct vlmcd *vlmcd)
135 {
136         xqindex idx = xq_pop_head(&vlmcd->free_ios, 1);
137         if (idx == Noneidx)
138                 return NULL;
139         ++vlmcd->flying;
140 //      perr(PI, 0, "alloc'd io %p, in-flight reqs: %d", (void *)&vlmcd->ios[idx], vlmcd->flying);
141         return &vlmcd->ios[idx];
142 }
143
144 static inline void free_io(struct vlmcd *vlmcd, struct io *io)
145 {
146         /* FIXME: what if xq_append_head() fails? */
147         xq_append_head(&vlmcd->free_ios, __idx_from_io(vlmcd, io), 1);
148         --vlmcd->flying;
149 }
150
151 static int complete(struct vlmcd *vlmcd, struct io *io)
152 {
153         int ret;
154
155         io->vreq->state |= XS_SERVED;
156 //      perr(PI, 0, "completed io %p", (void *)io);
157         ret = xseg_respond(vlmcd->xseg, io->vreq->portno, io->vreq);
158         always_assert(ret != NoSerial);
159         ret = xseg_signal(vlmcd->xseg, io->vreq->portno);
160         always_assert(ret == 0);
161
162         return 0;
163 }
164
165 static int usage(char *argv0)
166 {
167         fprintf(stderr,
168                 "Usage: %s [-p VLMCD_PORT] [-m MAPPERD_PORT]"
169                         "[-b BLOCKD_POART] [-g XSEG_SPEC] [-n NR_OPS]\n\n"
170                 "where:\n"
171                 "\tVLMCD_PORT: xseg port to listen for requests on\n"
172                 "\tMAPPERD_PORT: xseg port where the mapper lives\n"
173                 "\tBLOCKD_PORT: xseg port where blockd/filed/sosd lives\n"
174                 "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
175                         "request_size:extra_size:page_shift'\n"
176                 "\tNR_OPS: number of outstanding xseg requests\n",
177                 argv0);
178
179         return 1;
180 }
181
182 static int safe_atoi(char *s)
183 {
184         long l;
185         char *endp;
186
187         l = strtol(s, &endp, 10);
188         if (s != endp && *endp == '\0')
189                 return l;
190         else
191                 return -1;
192 }
193
194 static void parse_cmdline(int argc, char **argv)
195 {
196         for (;;) {
197                 int c;
198
199                 opterr = 0;
200                 c = getopt(argc, argv, "+:hp:m:b:n:g:");
201                 if (c == -1)
202                         break;
203                 
204                 switch(c) {
205                         case '?':
206                                 perr(PFE, 0, "Unknown option: -%c", optopt);
207                                 break;
208                         case ':':
209                                 perr(PFE, 0, "Option -%c requires an argument",
210                                         optopt);
211                                 break;
212                         case 'h':
213                                 usage(argv[0]);
214                                 exit(0);
215                                 break;
216                         case 'p':
217                                 cmdline_vportno = safe_atoi(optarg);
218                                 break;
219                         case 'm':
220                                 cmdline_mportno = safe_atoi(optarg);
221                                 break;
222                         case 'b':
223                                 cmdline_bportno = safe_atoi(optarg);
224                                 break;
225                         case 'n':
226                                 cmdline_nr_ops = safe_atoi(optarg);
227                                 break;
228                         case 'g':
229                                 /* FIXME: Max length of spec? strdup, eww */
230                                 cmdline_xseg_spec = strdup(optarg);
231                                 if (!cmdline_xseg_spec)
232                                         perr(PFE, 0, "out of memory");
233                                 break;
234                 }
235         }
236
237         argc -= optind;
238         argv += optind;
239
240         /* Sanity check for all arguments */
241         if (cmdline_vportno < 0)
242                 perr(PFE, 0, "no or invalid port specified for vlmcd");
243         if (cmdline_mportno < 0)
244                 perr(PFE, 0, "no or invalid port specified for mapperd");
245         if (cmdline_bportno < 0)
246                 perr(PFE, 0, "no or invalid port specified for blockd/filed/sosd");
247         if (cmdline_nr_ops < 1)
248                 perr(PFE, 0, "specified outstanding request count is invalid");
249         if (!cmdline_xseg_spec)
250                 perr(PFE, 0, "xseg specification is mandatory");
251
252         if (argc)
253                 perr(PFE, 0, "Non-option arguments specified on command line");
254 }
255
256 static struct xseg *join_or_create(char *spec)
257 {
258         struct xseg_config config;
259         struct xseg *xseg;
260
261         (void)xseg_parse_spec(spec, &config);
262         xseg = xseg_join(config.type, config.name, "posix", NULL);
263         if (xseg)
264                 return xseg;
265
266         (void)xseg_create(&config);
267         return xseg_join(config.type, config.name, "posix", NULL);
268 }
269
270 /*
271  * FIXME: What happens if this function fails?
272  * FIXME: How does this function fail? Do we return values from <errno.h>
273  * FIXME: Error reporting: Who prints errors, who prints errno?
274  */
275 static int dispatch(struct vlmcd *vlmcd, struct io *io, struct xseg_request *xreq)
276 {
277         struct xseg *xseg;
278         uint32_t vportno;
279         int i, ret;
280         uint64_t pos;
281
282         always_assert(vlmcd);
283         always_assert(io);
284         xseg = vlmcd->xseg;
285         always_assert(xseg);
286         vportno = vlmcd->vportno;
287
288         /* FIXME: Arguments, sanity checks on them? */
289         switch (io->state) {
290         case ACCEPTED:
291                 /*
292                  * Step 1: Issue a request to the mapper.
293                  */
294                 /* FIXME: xseglog(), strerror(), etc */
295                 /* FIXME: xreq->target a pointer?! why not a field, like * xreq->op? */
296                 always_assert(io->vreq == xreq);
297                 io->vreq->serviced = 0;
298                 io->mreq = xseg_get_request(xseg, vportno);
299                 always_assert(io->mreq);
300                 /*
301                  * FIXME:
302                  * We only care about the length of the target name
303                  * and hope the mapper reply fits in the remaining datalen
304                  * bytes.
305                  */
306                 ret = xseg_prep_request(xseg, io->mreq, io->vreq->targetlen,
307                         io->mreq->bufferlen - io->vreq->targetlen);
308                 always_assert(ret == 0);
309
310                 struct xseg_request *m = io->mreq;
311                 strncpy(m->target, io->vreq->target, m->targetlen);
312                 m->size = io->vreq->size;
313                 m->offset = io->vreq->offset;
314                 m->flags = 0;
315                 m->priv = __idx_from_io(vlmcd, io); /* use the io's idx for tracking */
316                 switch (io->vreq->op) {
317                         case X_READ:  m->op = X_MAPR; break;
318                         case X_WRITE: m->op = X_MAPW; break;
319                         case X_INFO:  m->op = X_INFO; break;
320                         default:
321                                 perr(PFE, 0, "Internal error? io->vreq->op = "
322                                         "%d\n", io->vreq->op);
323                 }
324                 if (m->op == X_INFO) {
325                         ret = xseg_submit(xseg, vlmcd->bportno, io->mreq);
326                         always_assert(ret != NoSerial);
327                         always_assert(xseg_signal(xseg, vlmcd->bportno) == 0);
328                 }
329                 else {
330                         ret = xseg_submit(xseg, vlmcd->mportno, io->mreq);
331                         always_assert(ret != NoSerial);
332                         always_assert(xseg_signal(xseg, vlmcd->mportno) == 0);
333                 }
334
335                 io->state = MAPPING;
336                 break;
337         case MAPPING:
338                 /*
339                  * Step 2. Issue block requests, one per segment
340                  * in the reply from the mapper.
341                  */
342                 /* FIXME */
343                 /* For every mapped segment, issue a request to blockd */
344                 /* FIXME: what if we run out of xseg requests? */
345                 always_assert(xreq == io->mreq);
346                 always_assert(!(xreq->state & XS_FAILED) && xreq->state & XS_SERVED); /* FIXME: This is too harsh */
347                 if (xreq->op == X_INFO) {
348                         *(off_t *)io->vreq->data = *(off_t *)io->mreq->data;
349                         io->vreq->state |= XS_SERVED;
350
351                         ret = xseg_respond(vlmcd->xseg, io->vreq->portno, io->vreq);
352                         always_assert(ret != NoSerial);
353                         ret = xseg_signal(vlmcd->xseg, io->vreq->portno);
354                         always_assert(ret == 0);
355                         io->state = CONCLUDED;
356                         always_assert(xseg_put_request(xseg, vportno, io->mreq) != NoSerial);
357                         free_io(vlmcd, io);
358                 } else {
359                         struct xseg_reply_map *mreply = (void *)io->mreq->data;
360                         always_assert(mreply->cnt > 0);
361                         //perr(PE, 0, "%llu %llu %llu mreply->target = %d\n", mreply->cnt, mreply->segs[0].size, mreply->segs[0].offset, mreply->segs[0].target[0]);
362
363                         io->breqs_len = mreply->cnt;
364                         io->breqs = calloc(io->breqs_len, sizeof(struct xseg_request *));
365                         always_assert(io->breqs);
366                         for (i = 0, pos = 0; i < mreply->cnt; i++) {
367                                 uint64_t datalen, offset, targetlen;
368                                 struct xseg_request *breq;
369
370                                 datalen = mreply->segs[i].size;
371                                 offset = mreply->segs[i].offset;
372                                 targetlen = strlen(mreply->segs[i].target);
373
374                                 breq = xseg_get_request(xseg, vportno);
375                                 always_assert(breq);
376                                 always_assert(datalen + targetlen <= breq->bufferlen);
377
378                                 ret = xseg_prep_request(xseg, breq, targetlen, datalen);
379                                 breq->datalen = datalen;
380                                 breq->offset = offset;
381                                 breq->size = datalen;
382                                 breq->op = io->vreq->op;
383                                 breq->priv = __idx_from_io(vlmcd, io); /* use the io's idx for tracking */
384                                 strncpy(breq->target, mreply->segs[i].target, targetlen);
385                                 /*
386                                  * Get the blocker to place data directly into vreq's
387                                  * buffer. FIXME: Manipulate ->data by hand?
388                                  */
389                                 breq->data = io->vreq->data + pos;
390                                 pos += datalen;
391
392                                 ret = xseg_submit(xseg, vlmcd->bportno, breq);
393                                 always_assert(ret != NoSerial);
394                                 /* possible race? */
395                                 io->breqs[i] = breq;
396                                 always_assert(xseg_signal(xseg, vlmcd->bportno) == 0);
397                         }
398                         io->breq_cnt = i;
399                         ret = xseg_put_request(xseg, vportno, io->mreq);
400                         always_assert(ret == 0);
401
402                         io->state = SERVING;
403                 }
404                 break;
405         case SERVING:
406                 /*
407                  * One of the breqs has been completed.
408                  * Update io and vreq counters, complete vreq when
409                  * all of the data have arrived.
410                  */
411 #if VLMCD_SANITY_CHECKS
412                 for (i = 0; i < io->breqs_len; i++)
413                         if (io->breqs[i] == xreq)
414                                 break;
415                 if (i >= io->breqs_len) {
416                         perr(PE, 0, "Called for xreq = %p, not belonging to io %p",
417                                 (void *)xreq, (void *)io);
418                         always_assert(0);
419                         /* FIXME: how do I handle this? */
420                 }
421 #endif
422                 struct xseg_request *breq = xreq;
423                 always_assert(!(breq->state & XS_FAILED) && breq->state & XS_SERVED);
424                 always_assert(breq->serviced == breq->size);
425                 io->vreq->serviced += breq->serviced;
426                 ret = xseg_put_request(xseg, vportno, breq);
427                 always_assert(ret == 0);
428                 
429                 if (--io->breq_cnt == 0) {
430                         always_assert(io->vreq->serviced == io->vreq->datalen);
431                         complete(vlmcd, io);
432                         io->state = CONCLUDED;
433                         free_io(vlmcd, io);
434                 }
435                 break;
436         case CONCLUDED:
437                 perr(PFE, 0, "Internal error, called for CONCLUDED");
438                 break;
439         default:
440                 perr(PFE, 0, "Internal error, io->state = %d\n", io->state);
441         }
442
443         return 0;
444 }
445
446 static int vlmcd_loop(struct vlmcd *vlmcd)
447 {
448         int ret;
449         struct io *io;
450         struct xseg_request *xreq;
451         struct xseg *xseg = vlmcd->xseg;
452         uint32_t vportno = vlmcd->vportno;
453
454         always_assert(xseg);
455
456         for (;;) {
457                 ret = xseg_prepare_wait(xseg, vportno);
458                 always_assert(ret == 0);
459
460                 io = NULL;
461                 /*
462                  * Accept requests from xseg if under the nr_ops limit,
463                  * and check if any replies have been received.
464                  *
465                  * Use ->priv for tracking, retrieve the relevant io struct
466                  * we reply upon our peers to not have touched -> priv
467                  */
468                 if (vlmcd->flying < vlmcd->nr_ops &&
469                     (xreq = xseg_accept(xseg, vportno))) {
470                         io = alloc_io(vlmcd);
471                         io->vreq = xreq;
472                         io->state = ACCEPTED;
473                 } else {
474                         xreq = xseg_receive(xseg, vportno);
475                         if (xreq) {
476                                 io = __io_from_idx(vlmcd, xreq->priv);
477                                 always_assert(io);
478                                 always_assert(io->state != CONCLUDED);
479                         }
480                 }
481
482                 /* io is the pending io currently being processed */
483                 if (io) {
484                         /* FIXME: WHY cancel_wait() anyway? */
485                         ret = xseg_cancel_wait(xseg, vportno);
486                         always_assert(ret == 0);
487                         dispatch(vlmcd, io, xreq);
488                 } else {
489                         /*
490                          * If things are OK, no timeout should ever be needed.
491                          * Otherwise, it's a vlmcd or xseg bug.
492                          * FIXME: sigtimedwait() with zero-valued timeout?
493                          * FAIL.
494                          */
495                         xseg_wait_signal(xseg, 100000UL);
496                 }
497         }
498
499         return 0;
500 }
501
502 /*
503  * FIXME: Initialize the vlmcd struct based on cmdline_* vars
504  */
505 static int vlmcd_init(struct vlmcd *vlmcd)
506 {
507         int ret;
508
509         vlmcd->vportno = cmdline_vportno;
510         vlmcd->mportno = cmdline_mportno;
511         vlmcd->bportno = cmdline_bportno;
512
513         vlmcd->flying = 0;
514         vlmcd->nr_ops = cmdline_nr_ops;
515         vlmcd->ios = calloc(vlmcd->nr_ops, sizeof(struct io));
516         if (!vlmcd->ios) {
517                 perr(PE, 0, "could not allocate memory [ios]");
518                 ret = -ENOMEM;
519                 goto out;
520         }
521
522         /* FIXME: meaning of arguments to xq_alloc_seq()? */
523         if (!xq_alloc_seq(&vlmcd->free_ios, cmdline_nr_ops, cmdline_nr_ops)) {
524                 perr(PE, 0, "could not allocate memory [free_ios]");
525                 ret = -ENOMEM;
526                 goto out_with_ios;
527         }
528
529         /* FIXME: If xseg library fails, is errno set? */
530         if (xseg_initialize()) {
531                 perr(PE, 0, "could not initialize xseg library");
532                 ret = -EIO;
533                 goto out_with_freeios;
534         }
535
536         if (! (vlmcd->xseg = join_or_create(cmdline_xseg_spec))) {
537                 perr(PE, 0, "could not join or create xseg with spec '%s'\n",
538                         cmdline_xseg_spec);
539                 ret = -EIO;
540                 goto out_with_xseginit;
541         }
542
543         if (! (vlmcd->vport = xseg_bind_port(vlmcd->xseg, vlmcd->vportno))) {
544                 perr(PE, 0, "cannot bind to xseg port %ld", (long)vlmcd->vportno);
545                 ret = -EIO;
546                 goto out_with_xsegjoin;
547         }
548
549         vlmcd->vportno = xseg_portno(vlmcd->xseg, vlmcd->vport);
550         
551         perr(PI, 0, "vlmcd on port %u of %u",
552                 vlmcd->vportno, vlmcd->xseg->config.nr_ports);
553
554         ret = 0;
555         goto out;
556
557 out_with_xsegjoin:
558         xseg_leave(vlmcd->xseg);
559 out_with_xseginit:
560         always_assert(xseg_finalize() == 0);
561 out_with_freeios:
562         xq_free(&vlmcd->free_ios);
563 out_with_ios:
564         free(vlmcd->ios);
565 out:
566         return ret;
567 }
568
569 int main(int argc, char *argv[])
570 {
571         struct vlmcd vlmc;
572
573         init_perr("vlmcd");
574         parse_cmdline(argc, argv);
575
576         perr(PI, 0, "v = %ld, m = %ld, b = %ld, nr_ops = %lu\n",
577                 cmdline_vportno, cmdline_mportno, cmdline_bportno, cmdline_nr_ops);
578
579         if (vlmcd_init(&vlmc) < 0)
580                 perr(PFE, 0, "failed to initialize vlmcd");
581
582         return vlmcd_loop(&vlmc);
583 }