Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / vlmcd.c @ a7fb19e1

History | View | Annotate | Download (15 kB)

1 4a131321 Vangelis Koukis
/*
2 6830c9ff Filippos Giannakos
 * Copyright 2012 GRNET S.A. All rights reserved.
3 6830c9ff Filippos Giannakos
 *
4 6830c9ff Filippos Giannakos
 * Redistribution and use in source and binary forms, with or
5 6830c9ff Filippos Giannakos
 * without modification, are permitted provided that the following
6 6830c9ff Filippos Giannakos
 * conditions are met:
7 6830c9ff Filippos Giannakos
 *
8 6830c9ff Filippos Giannakos
 *   1. Redistributions of source code must retain the above
9 6830c9ff Filippos Giannakos
 *      copyright notice, this list of conditions and the following
10 6830c9ff Filippos Giannakos
 *      disclaimer.
11 6830c9ff Filippos Giannakos
 *   2. Redistributions in binary form must reproduce the above
12 6830c9ff Filippos Giannakos
 *      copyright notice, this list of conditions and the following
13 6830c9ff Filippos Giannakos
 *      disclaimer in the documentation and/or other materials
14 6830c9ff Filippos Giannakos
 *      provided with the distribution.
15 6830c9ff Filippos Giannakos
 *
16 6830c9ff Filippos Giannakos
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 6830c9ff Filippos Giannakos
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 6830c9ff Filippos Giannakos
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 6830c9ff Filippos Giannakos
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 6830c9ff Filippos Giannakos
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 6830c9ff Filippos Giannakos
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 6830c9ff Filippos Giannakos
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 6830c9ff Filippos Giannakos
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 6830c9ff Filippos Giannakos
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 6830c9ff Filippos Giannakos
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 6830c9ff Filippos Giannakos
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 6830c9ff Filippos Giannakos
 * POSSIBILITY OF SUCH DAMAGE.
28 6830c9ff Filippos Giannakos
 *
29 6830c9ff Filippos Giannakos
 * The views and conclusions contained in the software and
30 6830c9ff Filippos Giannakos
 * documentation are those of the authors and should not be
31 6830c9ff Filippos Giannakos
 * interpreted as representing official policies, either expressed
32 6830c9ff Filippos Giannakos
 * or implied, of GRNET S.A.
33 6830c9ff Filippos Giannakos
 */
34 6830c9ff Filippos Giannakos
35 6830c9ff Filippos Giannakos
/*
36 4a131321 Vangelis Koukis
 * The VoLuMe Composer
37 4a131321 Vangelis Koukis
 */
38 4a131321 Vangelis Koukis
39 4a131321 Vangelis Koukis
#define _GNU_SOURCE
40 4a131321 Vangelis Koukis
#include <stdio.h>
41 4a131321 Vangelis Koukis
#include <stdlib.h>
42 4a131321 Vangelis Koukis
#include <sys/types.h>
43 4a131321 Vangelis Koukis
#include <sys/stat.h>
44 4a131321 Vangelis Koukis
#include <unistd.h>
45 4a131321 Vangelis Koukis
#include <string.h>
46 4a131321 Vangelis Koukis
#include <fcntl.h>
47 4a131321 Vangelis Koukis
#include <errno.h>
48 4a131321 Vangelis Koukis
#include <aio.h>
49 4a131321 Vangelis Koukis
#include <signal.h>
50 4a131321 Vangelis Koukis
#include <limits.h>
51 4a131321 Vangelis Koukis
#include <xseg/xseg.h>
52 4a131321 Vangelis Koukis
#include <pthread.h>
53 4a131321 Vangelis Koukis
54 d813051b Stratos Psomadakis
#include <xseg/protocol.h>
55 d813051b Stratos Psomadakis
56 4a131321 Vangelis Koukis
#include "common.h"  /* Please fix me */
57 4a131321 Vangelis Koukis
58 4a131321 Vangelis Koukis
#define MAX_PATH_SIZE 255
59 4a131321 Vangelis Koukis
#define MAX_FILENAME_SIZE 255
60 4a131321 Vangelis Koukis
61 d813051b Stratos Psomadakis
#define DEFAULT_NR_OPS 128
62 d813051b Stratos Psomadakis
63 d813051b Stratos Psomadakis
#define VLMCD_SANITY_CHECKS 1
64 4a131321 Vangelis Koukis
65 4a131321 Vangelis Koukis
/*
66 4a131321 Vangelis Koukis
 * Globals, holding command-line arguments
67 4a131321 Vangelis Koukis
 */
68 d813051b Stratos Psomadakis
long cmdline_vportno = -1;
69 d813051b Stratos Psomadakis
long cmdline_mportno = -1;
70 d813051b Stratos Psomadakis
long cmdline_bportno = -1;
71 d813051b Stratos Psomadakis
char *cmdline_xseg_spec = NULL;
72 4a131321 Vangelis Koukis
long cmdline_nr_ops = DEFAULT_NR_OPS;
73 4a131321 Vangelis Koukis
74 4a131321 Vangelis Koukis
/*
75 4a131321 Vangelis Koukis
 * vlmcd-specific structure,
76 4a131321 Vangelis Koukis
 * containing information on a pending I/O operation
77 4a131321 Vangelis Koukis
 */
78 4a131321 Vangelis Koukis
/* FIXME: XS_CONCLUDED equals XS_SERVING? */
79 4a131321 Vangelis Koukis
/* FIXME: is it really vlmcd-specific? */
80 4a131321 Vangelis Koukis
enum io_state_enum {
81 4a131321 Vangelis Koukis
        ACCEPTED = 0,
82 d813051b Stratos Psomadakis
        MAPPING = 1,
83 d813051b Stratos Psomadakis
        SERVING = 2,
84 4a131321 Vangelis Koukis
        CONCLUDED = 3
85 3eb3f140 Vangelis Koukis
};
86 4a131321 Vangelis Koukis
87 4a131321 Vangelis Koukis
struct io {
88 4a131321 Vangelis Koukis
        enum io_state_enum state;
89 d813051b Stratos Psomadakis
        struct xseg_request *vreq;
90 d813051b Stratos Psomadakis
        struct xseg_request *mreq;
91 d813051b Stratos Psomadakis
        struct xseg_request **breqs;
92 d813051b Stratos Psomadakis
        int breqs_len, breq_cnt;
93 d813051b Stratos Psomadakis
};
94 d813051b Stratos Psomadakis
95 d813051b Stratos Psomadakis
struct vlmcd {
96 d813051b Stratos Psomadakis
        struct xseg *xseg;
97 d813051b Stratos Psomadakis
        struct xseg_port *vport;
98 d813051b Stratos Psomadakis
        uint32_t vportno, mportno, bportno;
99 d813051b Stratos Psomadakis
100 d813051b Stratos Psomadakis
        int flying;
101 d813051b Stratos Psomadakis
        long nr_ops;
102 d813051b Stratos Psomadakis
        struct xq free_ops;
103 d813051b Stratos Psomadakis
104 d813051b Stratos Psomadakis
        struct xq free_ios;
105 d813051b Stratos Psomadakis
        struct io *ios;
106 d813051b Stratos Psomadakis
107 3eb3f140 Vangelis Koukis
};
108 4a131321 Vangelis Koukis
109 d813051b Stratos Psomadakis
static inline struct io *__io_from_idx(struct vlmcd *vlmcd, xqindex idx)
110 d813051b Stratos Psomadakis
{
111 d813051b Stratos Psomadakis
        if (idx >= vlmcd->nr_ops) {
112 d813051b Stratos Psomadakis
                perr(PE, 0, "Internal error: called with idx = %ld > %ld",
113 d813051b Stratos Psomadakis
                        (long)idx, vlmcd->nr_ops);
114 d813051b Stratos Psomadakis
                return NULL;
115 d813051b Stratos Psomadakis
        }
116 d813051b Stratos Psomadakis
117 d813051b Stratos Psomadakis
        return &vlmcd->ios[idx];
118 d813051b Stratos Psomadakis
}
119 d813051b Stratos Psomadakis
120 d813051b Stratos Psomadakis
static inline xqindex __idx_from_io(struct vlmcd *vlmcd, struct io *io)
121 d813051b Stratos Psomadakis
{
122 d813051b Stratos Psomadakis
        long idx = io - vlmcd->ios;
123 d813051b Stratos Psomadakis
124 d813051b Stratos Psomadakis
        if (idx < 0 || idx >= vlmcd->nr_ops) {
125 d813051b Stratos Psomadakis
                perr(PE, 0, "Internal error: called with io = %p, idx = %ld, "
126 d813051b Stratos Psomadakis
                        "nr_ops = %ld",
127 d813051b Stratos Psomadakis
                        (void *)io, (long)(io - vlmcd->ios), vlmcd->nr_ops);
128 d813051b Stratos Psomadakis
                return Noneidx;
129 d813051b Stratos Psomadakis
        }
130 d813051b Stratos Psomadakis
131 d813051b Stratos Psomadakis
        return idx;
132 d813051b Stratos Psomadakis
}
133 d813051b Stratos Psomadakis
134 d813051b Stratos Psomadakis
static inline struct io *alloc_io(struct vlmcd *vlmcd)
135 d813051b Stratos Psomadakis
{
136 164d1586 Filippos Giannakos
        xqindex idx = xq_pop_head(&vlmcd->free_ios, 1);
137 d813051b Stratos Psomadakis
        if (idx == Noneidx)
138 d813051b Stratos Psomadakis
                return NULL;
139 d813051b Stratos Psomadakis
        ++vlmcd->flying;
140 d813051b Stratos Psomadakis
//        perr(PI, 0, "alloc'd io %p, in-flight reqs: %d", (void *)&vlmcd->ios[idx], vlmcd->flying);
141 d813051b Stratos Psomadakis
        return &vlmcd->ios[idx];
142 d813051b Stratos Psomadakis
}
143 d813051b Stratos Psomadakis
144 d813051b Stratos Psomadakis
static inline void free_io(struct vlmcd *vlmcd, struct io *io)
145 d813051b Stratos Psomadakis
{
146 d813051b Stratos Psomadakis
        /* FIXME: what if xq_append_head() fails? */
147 164d1586 Filippos Giannakos
        xq_append_head(&vlmcd->free_ios, __idx_from_io(vlmcd, io), 1);
148 d813051b Stratos Psomadakis
        --vlmcd->flying;
149 d813051b Stratos Psomadakis
}
150 d813051b Stratos Psomadakis
151 d813051b Stratos Psomadakis
static int complete(struct vlmcd *vlmcd, struct io *io)
152 d813051b Stratos Psomadakis
{
153 d813051b Stratos Psomadakis
        int ret;
154 d813051b Stratos Psomadakis
155 d813051b Stratos Psomadakis
        io->vreq->state |= XS_SERVED;
156 d813051b Stratos Psomadakis
//        perr(PI, 0, "completed io %p", (void *)io);
157 d813051b Stratos Psomadakis
        ret = xseg_respond(vlmcd->xseg, io->vreq->portno, io->vreq);
158 d813051b Stratos Psomadakis
        always_assert(ret != NoSerial);
159 d813051b Stratos Psomadakis
        ret = xseg_signal(vlmcd->xseg, io->vreq->portno);
160 d813051b Stratos Psomadakis
        always_assert(ret == 0);
161 d813051b Stratos Psomadakis
162 d813051b Stratos Psomadakis
        return 0;
163 d813051b Stratos Psomadakis
}
164 d813051b Stratos Psomadakis
165 4a131321 Vangelis Koukis
static int usage(char *argv0)
166 4a131321 Vangelis Koukis
{
167 4a131321 Vangelis Koukis
        fprintf(stderr,
168 4a131321 Vangelis Koukis
                "Usage: %s [-p VLMCD_PORT] [-m MAPPERD_PORT]"
169 d813051b Stratos Psomadakis
                        "[-b BLOCKD_POART] [-g XSEG_SPEC] [-n NR_OPS]\n\n"
170 4a131321 Vangelis Koukis
                "where:\n"
171 4a131321 Vangelis Koukis
                "\tVLMCD_PORT: xseg port to listen for requests on\n"
172 4a131321 Vangelis Koukis
                "\tMAPPERD_PORT: xseg port where the mapper lives\n"
173 4a131321 Vangelis Koukis
                "\tBLOCKD_PORT: xseg port where blockd/filed/sosd lives\n"
174 d813051b Stratos Psomadakis
                "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
175 4a131321 Vangelis Koukis
                        "request_size:extra_size:page_shift'\n"
176 4a131321 Vangelis Koukis
                "\tNR_OPS: number of outstanding xseg requests\n",
177 4a131321 Vangelis Koukis
                argv0);
178 4a131321 Vangelis Koukis
179 4a131321 Vangelis Koukis
        return 1;
180 4a131321 Vangelis Koukis
}
181 4a131321 Vangelis Koukis
182 4a131321 Vangelis Koukis
static int safe_atoi(char *s)
183 4a131321 Vangelis Koukis
{
184 4a131321 Vangelis Koukis
        long l;
185 4a131321 Vangelis Koukis
        char *endp;
186 4a131321 Vangelis Koukis
187 4a131321 Vangelis Koukis
        l = strtol(s, &endp, 10);
188 4a131321 Vangelis Koukis
        if (s != endp && *endp == '\0')
189 4a131321 Vangelis Koukis
                return l;
190 4a131321 Vangelis Koukis
        else
191 4a131321 Vangelis Koukis
                return -1;
192 4a131321 Vangelis Koukis
}
193 4a131321 Vangelis Koukis
194 4a131321 Vangelis Koukis
static void parse_cmdline(int argc, char **argv)
195 4a131321 Vangelis Koukis
{
196 4a131321 Vangelis Koukis
        for (;;) {
197 4a131321 Vangelis Koukis
                int c;
198 4a131321 Vangelis Koukis
199 4a131321 Vangelis Koukis
                opterr = 0;
200 d813051b Stratos Psomadakis
                c = getopt(argc, argv, "+:hp:m:b:n:g:");
201 4a131321 Vangelis Koukis
                if (c == -1)
202 4a131321 Vangelis Koukis
                        break;
203 4a131321 Vangelis Koukis
                
204 4a131321 Vangelis Koukis
                switch(c) {
205 4a131321 Vangelis Koukis
                        case '?':
206 4a131321 Vangelis Koukis
                                perr(PFE, 0, "Unknown option: -%c", optopt);
207 4a131321 Vangelis Koukis
                                break;
208 4a131321 Vangelis Koukis
                        case ':':
209 4a131321 Vangelis Koukis
                                perr(PFE, 0, "Option -%c requires an argument",
210 4a131321 Vangelis Koukis
                                        optopt);
211 4a131321 Vangelis Koukis
                                break;
212 4a131321 Vangelis Koukis
                        case 'h':
213 4a131321 Vangelis Koukis
                                usage(argv[0]);
214 4a131321 Vangelis Koukis
                                exit(0);
215 4a131321 Vangelis Koukis
                                break;
216 4a131321 Vangelis Koukis
                        case 'p':
217 d813051b Stratos Psomadakis
                                cmdline_vportno = safe_atoi(optarg);
218 4a131321 Vangelis Koukis
                                break;
219 4a131321 Vangelis Koukis
                        case 'm':
220 d813051b Stratos Psomadakis
                                cmdline_mportno = safe_atoi(optarg);
221 4a131321 Vangelis Koukis
                                break;
222 4a131321 Vangelis Koukis
                        case 'b':
223 d813051b Stratos Psomadakis
                                cmdline_bportno = safe_atoi(optarg);
224 4a131321 Vangelis Koukis
                                break;
225 4a131321 Vangelis Koukis
                        case 'n':
226 4a131321 Vangelis Koukis
                                cmdline_nr_ops = safe_atoi(optarg);
227 4a131321 Vangelis Koukis
                                break;
228 d813051b Stratos Psomadakis
                        case 'g':
229 d813051b Stratos Psomadakis
                                /* FIXME: Max length of spec? strdup, eww */
230 d813051b Stratos Psomadakis
                                cmdline_xseg_spec = strdup(optarg);
231 d813051b Stratos Psomadakis
                                if (!cmdline_xseg_spec)
232 d813051b Stratos Psomadakis
                                        perr(PFE, 0, "out of memory");
233 d813051b Stratos Psomadakis
                                break;
234 4a131321 Vangelis Koukis
                }
235 4a131321 Vangelis Koukis
        }
236 4a131321 Vangelis Koukis
237 4a131321 Vangelis Koukis
        argc -= optind;
238 4a131321 Vangelis Koukis
        argv += optind;
239 4a131321 Vangelis Koukis
240 4a131321 Vangelis Koukis
        /* Sanity check for all arguments */
241 d813051b Stratos Psomadakis
        if (cmdline_vportno < 0)
242 4a131321 Vangelis Koukis
                perr(PFE, 0, "no or invalid port specified for vlmcd");
243 d813051b Stratos Psomadakis
        if (cmdline_mportno < 0)
244 4a131321 Vangelis Koukis
                perr(PFE, 0, "no or invalid port specified for mapperd");
245 d813051b Stratos Psomadakis
        if (cmdline_bportno < 0)
246 4a131321 Vangelis Koukis
                perr(PFE, 0, "no or invalid port specified for blockd/filed/sosd");
247 4a131321 Vangelis Koukis
        if (cmdline_nr_ops < 1)
248 4a131321 Vangelis Koukis
                perr(PFE, 0, "specified outstanding request count is invalid");
249 d813051b Stratos Psomadakis
        if (!cmdline_xseg_spec)
250 d813051b Stratos Psomadakis
                perr(PFE, 0, "xseg specification is mandatory");
251 4a131321 Vangelis Koukis
252 4a131321 Vangelis Koukis
        if (argc)
253 4a131321 Vangelis Koukis
                perr(PFE, 0, "Non-option arguments specified on command line");
254 4a131321 Vangelis Koukis
}
255 4a131321 Vangelis Koukis
256 d813051b Stratos Psomadakis
static struct xseg *join_or_create(char *spec)
257 4a131321 Vangelis Koukis
{
258 4a131321 Vangelis Koukis
        struct xseg_config config;
259 4a131321 Vangelis Koukis
        struct xseg *xseg;
260 4a131321 Vangelis Koukis
261 4a131321 Vangelis Koukis
        (void)xseg_parse_spec(spec, &config);
262 d813051b Stratos Psomadakis
        xseg = xseg_join(config.type, config.name, "posix", NULL);
263 4a131321 Vangelis Koukis
        if (xseg)
264 4a131321 Vangelis Koukis
                return xseg;
265 4a131321 Vangelis Koukis
266 4a131321 Vangelis Koukis
        (void)xseg_create(&config);
267 d813051b Stratos Psomadakis
        return xseg_join(config.type, config.name, "posix", NULL);
268 4a131321 Vangelis Koukis
}
269 4a131321 Vangelis Koukis
270 d813051b Stratos Psomadakis
/*
271 d813051b Stratos Psomadakis
 * FIXME: What happens if this function fails?
272 d813051b Stratos Psomadakis
 * FIXME: How does this function fail? Do we return values from <errno.h>
273 d813051b Stratos Psomadakis
 * FIXME: Error reporting: Who prints errors, who prints errno?
274 d813051b Stratos Psomadakis
 */
275 d813051b Stratos Psomadakis
static int dispatch(struct vlmcd *vlmcd, struct io *io, struct xseg_request *xreq)
276 4a131321 Vangelis Koukis
{
277 d813051b Stratos Psomadakis
        struct xseg *xseg;
278 d813051b Stratos Psomadakis
        uint32_t vportno;
279 d813051b Stratos Psomadakis
        int i, ret;
280 d813051b Stratos Psomadakis
        uint64_t pos;
281 d813051b Stratos Psomadakis
282 d813051b Stratos Psomadakis
        always_assert(vlmcd);
283 d813051b Stratos Psomadakis
        always_assert(io);
284 d813051b Stratos Psomadakis
        xseg = vlmcd->xseg;
285 d813051b Stratos Psomadakis
        always_assert(xseg);
286 d813051b Stratos Psomadakis
        vportno = vlmcd->vportno;
287 d813051b Stratos Psomadakis
288 4a131321 Vangelis Koukis
        /* FIXME: Arguments, sanity checks on them? */
289 4a131321 Vangelis Koukis
        switch (io->state) {
290 4a131321 Vangelis Koukis
        case ACCEPTED:
291 4a131321 Vangelis Koukis
                /*
292 d813051b Stratos Psomadakis
                 * Step 1: Issue a request to the mapper.
293 4a131321 Vangelis Koukis
                 */
294 d813051b Stratos Psomadakis
                /* FIXME: xseglog(), strerror(), etc */
295 d813051b Stratos Psomadakis
                /* FIXME: xreq->target a pointer?! why not a field, like * xreq->op? */
296 d813051b Stratos Psomadakis
                always_assert(io->vreq == xreq);
297 d813051b Stratos Psomadakis
                io->vreq->serviced = 0;
298 d813051b Stratos Psomadakis
                io->mreq = xseg_get_request(xseg, vportno);
299 d813051b Stratos Psomadakis
                always_assert(io->mreq);
300 d813051b Stratos Psomadakis
                /*
301 d813051b Stratos Psomadakis
                 * FIXME:
302 d813051b Stratos Psomadakis
                 * We only care about the length of the target name
303 d813051b Stratos Psomadakis
                 * and hope the mapper reply fits in the remaining datalen
304 d813051b Stratos Psomadakis
                 * bytes.
305 d813051b Stratos Psomadakis
                 */
306 79681fdc Filippos Giannakos
                ret = xseg_prep_request(xseg, io->mreq, io->vreq->targetlen,
307 d813051b Stratos Psomadakis
                        io->mreq->bufferlen - io->vreq->targetlen);
308 d813051b Stratos Psomadakis
                always_assert(ret == 0);
309 d813051b Stratos Psomadakis
310 d813051b Stratos Psomadakis
                struct xseg_request *m = io->mreq;
311 d813051b Stratos Psomadakis
                strncpy(m->target, io->vreq->target, m->targetlen);
312 d813051b Stratos Psomadakis
                m->size = io->vreq->size;
313 d813051b Stratos Psomadakis
                m->offset = io->vreq->offset;
314 d813051b Stratos Psomadakis
                m->flags = 0;
315 d813051b Stratos Psomadakis
                m->priv = __idx_from_io(vlmcd, io); /* use the io's idx for tracking */
316 d813051b Stratos Psomadakis
                switch (io->vreq->op) {
317 d813051b Stratos Psomadakis
                        case X_READ:  m->op = X_MAPR; break;
318 d813051b Stratos Psomadakis
                        case X_WRITE: m->op = X_MAPW; break;
319 d813051b Stratos Psomadakis
                        case X_INFO:  m->op = X_INFO; break;
320 d813051b Stratos Psomadakis
                        default:
321 d813051b Stratos Psomadakis
                                perr(PFE, 0, "Internal error? io->vreq->op = "
322 d813051b Stratos Psomadakis
                                        "%d\n", io->vreq->op);
323 d813051b Stratos Psomadakis
                }
324 d813051b Stratos Psomadakis
                if (m->op == X_INFO) {
325 d813051b Stratos Psomadakis
                        ret = xseg_submit(xseg, vlmcd->bportno, io->mreq);
326 d813051b Stratos Psomadakis
                        always_assert(ret != NoSerial);
327 d813051b Stratos Psomadakis
                        always_assert(xseg_signal(xseg, vlmcd->bportno) == 0);
328 d813051b Stratos Psomadakis
                }
329 d813051b Stratos Psomadakis
                else {
330 d813051b Stratos Psomadakis
                        ret = xseg_submit(xseg, vlmcd->mportno, io->mreq);
331 d813051b Stratos Psomadakis
                        always_assert(ret != NoSerial);
332 d813051b Stratos Psomadakis
                        always_assert(xseg_signal(xseg, vlmcd->mportno) == 0);
333 d813051b Stratos Psomadakis
                }
334 d813051b Stratos Psomadakis
335 d813051b Stratos Psomadakis
                io->state = MAPPING;
336 4a131321 Vangelis Koukis
                break;
337 d813051b Stratos Psomadakis
        case MAPPING:
338 4a131321 Vangelis Koukis
                /*
339 d813051b Stratos Psomadakis
                 * Step 2. Issue block requests, one per segment
340 d813051b Stratos Psomadakis
                 * in the reply from the mapper.
341 4a131321 Vangelis Koukis
                 */
342 d813051b Stratos Psomadakis
                /* FIXME */
343 d813051b Stratos Psomadakis
                /* For every mapped segment, issue a request to blockd */
344 d813051b Stratos Psomadakis
                /* FIXME: what if we run out of xseg requests? */
345 d813051b Stratos Psomadakis
                always_assert(xreq == io->mreq);
346 d813051b Stratos Psomadakis
                always_assert(!(xreq->state & XS_FAILED) && xreq->state & XS_SERVED); /* FIXME: This is too harsh */
347 d813051b Stratos Psomadakis
                if (xreq->op == X_INFO) {
348 d813051b Stratos Psomadakis
                        *(off_t *)io->vreq->data = *(off_t *)io->mreq->data;
349 d813051b Stratos Psomadakis
                        io->vreq->state |= XS_SERVED;
350 d813051b Stratos Psomadakis
351 d813051b Stratos Psomadakis
                        ret = xseg_respond(vlmcd->xseg, io->vreq->portno, io->vreq);
352 d813051b Stratos Psomadakis
                        always_assert(ret != NoSerial);
353 d813051b Stratos Psomadakis
                        ret = xseg_signal(vlmcd->xseg, io->vreq->portno);
354 d813051b Stratos Psomadakis
                        always_assert(ret == 0);
355 d813051b Stratos Psomadakis
                        io->state = CONCLUDED;
356 d813051b Stratos Psomadakis
                        always_assert(xseg_put_request(xseg, vportno, io->mreq) != NoSerial);
357 d813051b Stratos Psomadakis
                        free_io(vlmcd, io);
358 d813051b Stratos Psomadakis
                } else {
359 d813051b Stratos Psomadakis
                        struct xseg_reply_map *mreply = (void *)io->mreq->data;
360 d813051b Stratos Psomadakis
                        always_assert(mreply->cnt > 0);
361 d813051b Stratos Psomadakis
                        //perr(PE, 0, "%llu %llu %llu mreply->target = %d\n", mreply->cnt, mreply->segs[0].size, mreply->segs[0].offset, mreply->segs[0].target[0]);
362 d813051b Stratos Psomadakis
363 d813051b Stratos Psomadakis
                        io->breqs_len = mreply->cnt;
364 d813051b Stratos Psomadakis
                        io->breqs = calloc(io->breqs_len, sizeof(struct xseg_request *));
365 d813051b Stratos Psomadakis
                        always_assert(io->breqs);
366 d813051b Stratos Psomadakis
                        for (i = 0, pos = 0; i < mreply->cnt; i++) {
367 d813051b Stratos Psomadakis
                                uint64_t datalen, offset, targetlen;
368 d813051b Stratos Psomadakis
                                struct xseg_request *breq;
369 d813051b Stratos Psomadakis
370 d813051b Stratos Psomadakis
                                datalen = mreply->segs[i].size;
371 d813051b Stratos Psomadakis
                                offset = mreply->segs[i].offset;
372 d813051b Stratos Psomadakis
                                targetlen = strlen(mreply->segs[i].target);
373 d813051b Stratos Psomadakis
374 d813051b Stratos Psomadakis
                                breq = xseg_get_request(xseg, vportno);
375 d813051b Stratos Psomadakis
                                always_assert(breq);
376 d813051b Stratos Psomadakis
                                always_assert(datalen + targetlen <= breq->bufferlen);
377 d813051b Stratos Psomadakis
378 79681fdc Filippos Giannakos
                                ret = xseg_prep_request(xseg, breq, targetlen, datalen);
379 d813051b Stratos Psomadakis
                                breq->datalen = datalen;
380 d813051b Stratos Psomadakis
                                breq->offset = offset;
381 d813051b Stratos Psomadakis
                                breq->size = datalen;
382 d813051b Stratos Psomadakis
                                breq->op = io->vreq->op;
383 d813051b Stratos Psomadakis
                                breq->priv = __idx_from_io(vlmcd, io); /* use the io's idx for tracking */
384 d813051b Stratos Psomadakis
                                strncpy(breq->target, mreply->segs[i].target, targetlen);
385 d813051b Stratos Psomadakis
                                /*
386 d813051b Stratos Psomadakis
                                 * Get the blocker to place data directly into vreq's
387 d813051b Stratos Psomadakis
                                 * buffer. FIXME: Manipulate ->data by hand?
388 d813051b Stratos Psomadakis
                                 */
389 d813051b Stratos Psomadakis
                                breq->data = io->vreq->data + pos;
390 d813051b Stratos Psomadakis
                                pos += datalen;
391 d813051b Stratos Psomadakis
392 d813051b Stratos Psomadakis
                                ret = xseg_submit(xseg, vlmcd->bportno, breq);
393 d813051b Stratos Psomadakis
                                always_assert(ret != NoSerial);
394 d813051b Stratos Psomadakis
                                /* possible race? */
395 d813051b Stratos Psomadakis
                                io->breqs[i] = breq;
396 d813051b Stratos Psomadakis
                                always_assert(xseg_signal(xseg, vlmcd->bportno) == 0);
397 d813051b Stratos Psomadakis
                        }
398 d813051b Stratos Psomadakis
                        io->breq_cnt = i;
399 d813051b Stratos Psomadakis
                        ret = xseg_put_request(xseg, vportno, io->mreq);
400 d813051b Stratos Psomadakis
                        always_assert(ret == 0);
401 d813051b Stratos Psomadakis
402 d813051b Stratos Psomadakis
                        io->state = SERVING;
403 d813051b Stratos Psomadakis
                }
404 4a131321 Vangelis Koukis
                break;
405 d813051b Stratos Psomadakis
        case SERVING:
406 4a131321 Vangelis Koukis
                /*
407 d813051b Stratos Psomadakis
                 * One of the breqs has been completed.
408 d813051b Stratos Psomadakis
                 * Update io and vreq counters, complete vreq when
409 d813051b Stratos Psomadakis
                 * all of the data have arrived.
410 4a131321 Vangelis Koukis
                 */
411 d813051b Stratos Psomadakis
#if VLMCD_SANITY_CHECKS
412 d813051b Stratos Psomadakis
                for (i = 0; i < io->breqs_len; i++)
413 d813051b Stratos Psomadakis
                        if (io->breqs[i] == xreq)
414 d813051b Stratos Psomadakis
                                break;
415 d813051b Stratos Psomadakis
                if (i >= io->breqs_len) {
416 d813051b Stratos Psomadakis
                        perr(PE, 0, "Called for xreq = %p, not belonging to io %p",
417 d813051b Stratos Psomadakis
                                (void *)xreq, (void *)io);
418 d813051b Stratos Psomadakis
                        always_assert(0);
419 d813051b Stratos Psomadakis
                        /* FIXME: how do I handle this? */
420 d813051b Stratos Psomadakis
                }
421 d813051b Stratos Psomadakis
#endif
422 d813051b Stratos Psomadakis
                struct xseg_request *breq = xreq;
423 d813051b Stratos Psomadakis
                always_assert(!(breq->state & XS_FAILED) && breq->state & XS_SERVED);
424 d813051b Stratos Psomadakis
                always_assert(breq->serviced == breq->size);
425 d813051b Stratos Psomadakis
                io->vreq->serviced += breq->serviced;
426 d813051b Stratos Psomadakis
                ret = xseg_put_request(xseg, vportno, breq);
427 d813051b Stratos Psomadakis
                always_assert(ret == 0);
428 d813051b Stratos Psomadakis
                
429 d813051b Stratos Psomadakis
                if (--io->breq_cnt == 0) {
430 d813051b Stratos Psomadakis
                        always_assert(io->vreq->serviced == io->vreq->datalen);
431 d813051b Stratos Psomadakis
                        complete(vlmcd, io);
432 d813051b Stratos Psomadakis
                        io->state = CONCLUDED;
433 d813051b Stratos Psomadakis
                        free_io(vlmcd, io);
434 d813051b Stratos Psomadakis
                }
435 d813051b Stratos Psomadakis
                break;
436 d813051b Stratos Psomadakis
        case CONCLUDED:
437 d813051b Stratos Psomadakis
                perr(PFE, 0, "Internal error, called for CONCLUDED");
438 d813051b Stratos Psomadakis
                break;
439 d813051b Stratos Psomadakis
        default:
440 d813051b Stratos Psomadakis
                perr(PFE, 0, "Internal error, io->state = %d\n", io->state);
441 4a131321 Vangelis Koukis
        }
442 4a131321 Vangelis Koukis
443 4a131321 Vangelis Koukis
        return 0;
444 4a131321 Vangelis Koukis
}
445 4a131321 Vangelis Koukis
446 d813051b Stratos Psomadakis
static int vlmcd_loop(struct vlmcd *vlmcd)
447 4a131321 Vangelis Koukis
{
448 d813051b Stratos Psomadakis
        int ret;
449 d813051b Stratos Psomadakis
        struct io *io;
450 d813051b Stratos Psomadakis
        struct xseg_request *xreq;
451 d813051b Stratos Psomadakis
        struct xseg *xseg = vlmcd->xseg;
452 d813051b Stratos Psomadakis
        uint32_t vportno = vlmcd->vportno;
453 d813051b Stratos Psomadakis
454 d813051b Stratos Psomadakis
        always_assert(xseg);
455 4a131321 Vangelis Koukis
456 4a131321 Vangelis Koukis
        for (;;) {
457 d813051b Stratos Psomadakis
                ret = xseg_prepare_wait(xseg, vportno);
458 d813051b Stratos Psomadakis
                always_assert(ret == 0);
459 4a131321 Vangelis Koukis
460 4a131321 Vangelis Koukis
                io = NULL;
461 d813051b Stratos Psomadakis
                /*
462 d813051b Stratos Psomadakis
                 * Accept requests from xseg if under the nr_ops limit,
463 d813051b Stratos Psomadakis
                 * and check if any replies have been received.
464 d813051b Stratos Psomadakis
                 *
465 d813051b Stratos Psomadakis
                 * Use ->priv for tracking, retrieve the relevant io struct
466 d813051b Stratos Psomadakis
                 * we reply upon our peers to not have touched -> priv
467 d813051b Stratos Psomadakis
                 */
468 d813051b Stratos Psomadakis
                if (vlmcd->flying < vlmcd->nr_ops &&
469 d813051b Stratos Psomadakis
                    (xreq = xseg_accept(xseg, vportno))) {
470 d813051b Stratos Psomadakis
                        io = alloc_io(vlmcd);
471 d813051b Stratos Psomadakis
                        io->vreq = xreq;
472 d813051b Stratos Psomadakis
                        io->state = ACCEPTED;
473 d813051b Stratos Psomadakis
                } else {
474 d813051b Stratos Psomadakis
                        xreq = xseg_receive(xseg, vportno);
475 d813051b Stratos Psomadakis
                        if (xreq) {
476 d813051b Stratos Psomadakis
                                io = __io_from_idx(vlmcd, xreq->priv);
477 d813051b Stratos Psomadakis
                                always_assert(io);
478 d813051b Stratos Psomadakis
                                always_assert(io->state != CONCLUDED);
479 4a131321 Vangelis Koukis
                        }
480 4a131321 Vangelis Koukis
                }
481 4a131321 Vangelis Koukis
482 4a131321 Vangelis Koukis
                /* io is the pending io currently being processed */
483 4a131321 Vangelis Koukis
                if (io) {
484 d813051b Stratos Psomadakis
                        /* FIXME: WHY cancel_wait() anyway? */
485 d813051b Stratos Psomadakis
                        ret = xseg_cancel_wait(xseg, vportno);
486 d813051b Stratos Psomadakis
                        always_assert(ret == 0);
487 d813051b Stratos Psomadakis
                        dispatch(vlmcd, io, xreq);
488 4a131321 Vangelis Koukis
                } else {
489 4a131321 Vangelis Koukis
                        /*
490 4a131321 Vangelis Koukis
                          * If things are OK, no timeout should ever be needed.
491 4a131321 Vangelis Koukis
                          * Otherwise, it's a vlmcd or xseg bug.
492 d813051b Stratos Psomadakis
                         * FIXME: sigtimedwait() with zero-valued timeout?
493 d813051b Stratos Psomadakis
                         * FAIL.
494 4a131321 Vangelis Koukis
                          */
495 d813051b Stratos Psomadakis
                        xseg_wait_signal(xseg, 100000UL);
496 4a131321 Vangelis Koukis
                }
497 4a131321 Vangelis Koukis
        }
498 4a131321 Vangelis Koukis
499 4a131321 Vangelis Koukis
        return 0;
500 4a131321 Vangelis Koukis
}
501 4a131321 Vangelis Koukis
502 d813051b Stratos Psomadakis
/*
503 d813051b Stratos Psomadakis
 * FIXME: Initialize the vlmcd struct based on cmdline_* vars
504 d813051b Stratos Psomadakis
 */
505 d813051b Stratos Psomadakis
static int vlmcd_init(struct vlmcd *vlmcd)
506 4a131321 Vangelis Koukis
{
507 d813051b Stratos Psomadakis
        int ret;
508 d813051b Stratos Psomadakis
509 d813051b Stratos Psomadakis
        vlmcd->vportno = cmdline_vportno;
510 d813051b Stratos Psomadakis
        vlmcd->mportno = cmdline_mportno;
511 d813051b Stratos Psomadakis
        vlmcd->bportno = cmdline_bportno;
512 d813051b Stratos Psomadakis
513 d813051b Stratos Psomadakis
        vlmcd->flying = 0;
514 d813051b Stratos Psomadakis
        vlmcd->nr_ops = cmdline_nr_ops;
515 d813051b Stratos Psomadakis
        vlmcd->ios = calloc(vlmcd->nr_ops, sizeof(struct io));
516 d813051b Stratos Psomadakis
        if (!vlmcd->ios) {
517 d813051b Stratos Psomadakis
                perr(PE, 0, "could not allocate memory [ios]");
518 d813051b Stratos Psomadakis
                ret = -ENOMEM;
519 d813051b Stratos Psomadakis
                goto out;
520 d813051b Stratos Psomadakis
        }
521 d813051b Stratos Psomadakis
522 d813051b Stratos Psomadakis
        /* FIXME: meaning of arguments to xq_alloc_seq()? */
523 d813051b Stratos Psomadakis
        if (!xq_alloc_seq(&vlmcd->free_ios, cmdline_nr_ops, cmdline_nr_ops)) {
524 d813051b Stratos Psomadakis
                perr(PE, 0, "could not allocate memory [free_ios]");
525 d813051b Stratos Psomadakis
                ret = -ENOMEM;
526 d813051b Stratos Psomadakis
                goto out_with_ios;
527 d813051b Stratos Psomadakis
        }
528 4a131321 Vangelis Koukis
529 4a131321 Vangelis Koukis
        /* FIXME: If xseg library fails, is errno set? */
530 d813051b Stratos Psomadakis
        if (xseg_initialize()) {
531 4a131321 Vangelis Koukis
                perr(PE, 0, "could not initialize xseg library");
532 d813051b Stratos Psomadakis
                ret = -EIO;
533 d813051b Stratos Psomadakis
                goto out_with_freeios;
534 4a131321 Vangelis Koukis
        }
535 4a131321 Vangelis Koukis
536 d813051b Stratos Psomadakis
        if (! (vlmcd->xseg = join_or_create(cmdline_xseg_spec))) {
537 4a131321 Vangelis Koukis
                perr(PE, 0, "could not join or create xseg with spec '%s'\n",
538 d813051b Stratos Psomadakis
                        cmdline_xseg_spec);
539 d813051b Stratos Psomadakis
                ret = -EIO;
540 d813051b Stratos Psomadakis
                goto out_with_xseginit;
541 4a131321 Vangelis Koukis
        }
542 4a131321 Vangelis Koukis
543 d813051b Stratos Psomadakis
        if (! (vlmcd->vport = xseg_bind_port(vlmcd->xseg, vlmcd->vportno))) {
544 d813051b Stratos Psomadakis
                perr(PE, 0, "cannot bind to xseg port %ld", (long)vlmcd->vportno);
545 d813051b Stratos Psomadakis
                ret = -EIO;
546 d813051b Stratos Psomadakis
                goto out_with_xsegjoin;
547 d813051b Stratos Psomadakis
        }
548 4a131321 Vangelis Koukis
549 d813051b Stratos Psomadakis
        vlmcd->vportno = xseg_portno(vlmcd->xseg, vlmcd->vport);
550 4a131321 Vangelis Koukis
        
551 4a131321 Vangelis Koukis
        perr(PI, 0, "vlmcd on port %u of %u",
552 d813051b Stratos Psomadakis
                vlmcd->vportno, vlmcd->xseg->config.nr_ports);
553 d813051b Stratos Psomadakis
554 d813051b Stratos Psomadakis
        ret = 0;
555 d813051b Stratos Psomadakis
        goto out;
556 d813051b Stratos Psomadakis
557 d813051b Stratos Psomadakis
out_with_xsegjoin:
558 d813051b Stratos Psomadakis
        xseg_leave(vlmcd->xseg);
559 d813051b Stratos Psomadakis
out_with_xseginit:
560 d813051b Stratos Psomadakis
        always_assert(xseg_finalize() == 0);
561 d813051b Stratos Psomadakis
out_with_freeios:
562 d813051b Stratos Psomadakis
        xq_free(&vlmcd->free_ios);
563 d813051b Stratos Psomadakis
out_with_ios:
564 d813051b Stratos Psomadakis
        free(vlmcd->ios);
565 d813051b Stratos Psomadakis
out:
566 d813051b Stratos Psomadakis
        return ret;
567 4a131321 Vangelis Koukis
}
568 4a131321 Vangelis Koukis
569 4a131321 Vangelis Koukis
int main(int argc, char *argv[])
570 4a131321 Vangelis Koukis
{
571 d813051b Stratos Psomadakis
        struct vlmcd vlmc;
572 d813051b Stratos Psomadakis
573 4a131321 Vangelis Koukis
        init_perr("vlmcd");
574 4a131321 Vangelis Koukis
        parse_cmdline(argc, argv);
575 4a131321 Vangelis Koukis
576 3eb3f140 Vangelis Koukis
        perr(PI, 0, "v = %ld, m = %ld, b = %ld, nr_ops = %lu\n",
577 d813051b Stratos Psomadakis
                cmdline_vportno, cmdline_mportno, cmdline_bportno, cmdline_nr_ops);
578 4a131321 Vangelis Koukis
579 d813051b Stratos Psomadakis
        if (vlmcd_init(&vlmc) < 0)
580 d813051b Stratos Psomadakis
                perr(PFE, 0, "failed to initialize vlmcd");
581 4a131321 Vangelis Koukis
582 d813051b Stratos Psomadakis
        return vlmcd_loop(&vlmc);
583 4a131321 Vangelis Koukis
}