blockd: Fix cli option/argument for nr_ops
[archipelago] / xseg / peers / blockd.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <sys/stat.h>
6 #include <unistd.h>
7 #include <string.h>
8 #include <fcntl.h>
9 #include <errno.h>
10 #include <aio.h>
11 #include <signal.h>
12
13 #include <xseg/xseg.h>
14
15 #define TARGET_NAMELEN 128
16
17 static int usage(void)
18 {
19         printf("Usage: ./blockd <path_to_disk_image> [options]\n"
20                 "Options: [-p portno]\n"
21                 "         [-s image size in bytes]\n"
22                 "         [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
23                 "         [-n nr_parallel_ops]\n");
24         return 1;
25 }
26
27 struct io {
28         struct aiocb cb;
29         struct xseg_request *req;
30         ssize_t retval;
31 };
32
33 struct store {
34         struct xseg *xseg;
35         struct xseg_port *xport;
36         uint32_t portno;
37         int fd;
38         char name[TARGET_NAMELEN];
39         uint32_t namesize;
40         uint64_t size;
41         struct io *ios;
42         struct xq free_ops;
43         char *free_bufs;
44         struct xq pending_ops;
45         char *pending_bufs;
46         long nr_ops;
47         struct sigevent sigevent;
48 };
49
50 static unsigned long sigaction_count;
51
52 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
53 {
54         sigaction_count ++;
55 }
56
57 static struct io *alloc_io(struct store *store)
58 {
59         xqindex idx = xq_pop_head(&store->free_ops);
60         if (idx == None)
61                 return NULL;
62         return store->ios + idx;
63 }
64
65 static inline void free_io(struct store *store, struct io *io)
66 {
67         xqindex idx = io - store->ios;
68         io->req = NULL;
69         xq_append_head(&store->free_ops, idx);
70 }
71
72 static inline void pending_io(struct store *store, struct io *io)
73 {
74         xqindex idx = io - store->ios;
75         xq_append_tail(&store->pending_ops, idx);
76 }
77
78 static inline struct io *get_pending_io(struct store *store)
79 {
80         xqindex idx = xq_pop_head(&store->pending_ops);
81         if (idx == None)
82                 return NULL;
83         return store->ios + idx;
84 }
85
86 static void log_io(char *msg, struct io *io)
87 {
88         char name[64], data[64];
89         /* null terminate name in case of req->name is less than 63 characters,
90          * and next character after name (aka first byte of next buffer) is not
91          * null
92          */
93         unsigned int end = (io->req->namesize > 63) ? 63 : io->req->namesize;
94         strncpy(name, io->req->name, end);
95         name[end] = 0;
96         strncpy(data, io->req->data, 63);
97         data[63] = 0;
98         printf("%s: fd:%u, op:%u %llu:%lu retval: %lu, reqstate: %u\n"
99                 "name[%u]:'%s', data[%llu]:\n%s------------------\n\n",
100                 msg,
101                 (unsigned int)io->cb.aio_fildes,
102                 (unsigned int)io->req->op,
103                 (unsigned long long)io->cb.aio_offset,
104                 (unsigned long)io->cb.aio_nbytes,
105                 (unsigned long)io->retval,
106                 (unsigned int)io->req->state,
107                 (unsigned int)io->req->namesize, name,
108                 (unsigned long long)io->req->datasize, data);
109 }
110
111 static void complete(struct store *store, struct io *io)
112 {
113         struct xseg_request *req = io->req;
114         req->state |= XS_SERVED;
115         log_io("complete", io);
116         xseg_respond(store->xseg, req->portno, req);
117         xseg_signal(store->xseg, req->portno);
118         free_io(store, io);
119 }
120
121 static void fail(struct store *store, struct io *io)
122 {
123         struct xseg_request *req = io->req;
124         req->state |= XS_FAILED;
125         log_io("fail", io);
126         xseg_respond(store->xseg, req->portno, req);
127         xseg_signal(store->xseg, req->portno);
128         free_io(store, io);
129 }
130
131 static void pending(struct store *store, struct io *io)
132 {
133         io->req->state = XS_PENDING;
134         pending_io(store, io);
135 }
136
137 static void handle_unknown(struct store *store, struct io *io)
138 {
139         struct xseg_request *req = io->req;
140         snprintf(req->data, req->datasize, "unknown request op");
141         fail(store, io);
142 }
143
144 static inline void prepare_io(struct store *store, struct io *io)
145 {
146         io->cb.aio_fildes = store->fd;
147         io->cb.aio_sigevent = store->sigevent;
148         /* cb->aio_sigevent.sigev_value.sival_int = fd; */
149 }
150
151 static void handle_read_write(struct store *store, struct io *io)
152 {
153         int r;
154         struct xseg_request *req = io->req;
155         struct aiocb *cb = &io->cb;
156
157         if (req->state != XS_ACCEPTED) {
158                 if (io->retval > 0)
159                         req->serviced += io->retval;
160                 else
161                         req->datasize = req->serviced;
162
163                 if (req->serviced >= req->datasize) {
164                         complete(store, io);
165                         return;
166                 }
167         }
168
169         if (req != io->req)
170                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
171         if (!req->size) {
172                 if (req->flags & (XF_FLUSH | XF_FUA)) {
173                         /* for now, no FLUSH/FUA support.
174                          * note that with FLUSH/size == 0 
175                          * there will probably be a (uint64_t)-1 offset */
176                         complete(store, io);
177                         return;
178                 } else {
179                         complete(store, io);
180                         return;
181                 }
182         }
183
184         prepare_io(store, io);
185         cb->aio_buf = req->data + req->serviced;
186         cb->aio_nbytes = req->datasize - req->serviced;
187         cb->aio_offset = req->offset + req->serviced;
188
189         switch (req->op) {
190         case X_READ:
191                 r = aio_read(cb);
192                 break;
193         case X_WRITE:
194                 r = aio_write(cb);
195                 break;
196         default:
197                 snprintf(req->data, req->datasize,
198                          "wtf, corrupt op %u?\n", req->op);
199                 fail(store, io);
200                 return;
201         }
202
203         if (r) {
204                 strerror_r(errno, req->data, req->datasize);
205                 fail(store, io);
206                 return;
207         }
208
209         pending(store, io);
210 }
211
212 static void handle_info(struct store *store, struct io *io)
213 {
214         struct xseg_request *req = io->req;
215
216         if (req->namesize != store->namesize ||
217                 strncmp(req->name, store->name, store->namesize)) {
218
219                 fail(store, io);
220                 return;
221         }
222
223         *((uint64_t *) req->data) = store->size;
224         req->serviced = req->datasize = sizeof(store->size);
225         io->retval = io->cb.aio_offset = io->cb.aio_nbytes = req->datasize;
226
227         complete(store, io);
228 }
229
230 static void dispatch(struct store *store, struct io *io)
231 {
232         switch (io->req->op) {
233         case X_READ:
234         case X_WRITE:
235                 handle_read_write(store, io); break;
236         case X_INFO:
237                 handle_info(store, io); break;
238         case X_SYNC:
239         default:
240                 handle_unknown(store, io);
241         }
242 }
243
244 static void handle_pending(struct store *store, struct io *io)
245 {
246         int r = aio_error(&io->cb);
247         if (r == EINPROGRESS) {
248                 pending(store, io);
249                 return;
250         }
251
252         io->retval = aio_return(&io->cb);
253         if (r) {
254                 fail(store, io);
255                 return;
256         }
257
258         dispatch(store, io);
259 }
260
261 static void handle_accepted(struct store *store, struct io *io)
262 {
263         struct xseg_request *req = io->req;
264         req->serviced = 0;
265         req->state = XS_ACCEPTED;
266         io->retval = 0;
267         dispatch(store, io);
268 }
269
270 static int blockd_loop(struct store *store)
271 {
272         struct xseg *xseg = store->xseg;
273         uint32_t portno = store->portno;
274         struct io *io;
275         struct xseg_request *accepted;
276
277         for (;;) {
278                 accepted = NULL;
279                 xseg_prepare_wait(xseg, portno);
280                 io = alloc_io(store);
281                 if (io) {
282                         accepted = xseg_accept(xseg, portno);
283                         if (accepted) {
284                                 xseg_cancel_wait(xseg, portno);
285                                 io->req = accepted;
286                                 handle_accepted(store, io);
287                         } else
288                                 free_io(store, io);
289                 }
290
291                 io = get_pending_io(store);
292                 if (io) {
293                         xseg_cancel_wait(xseg, portno);
294                         handle_pending(store, io);
295                 }
296
297                 if (!io && !accepted) 
298                         xseg_wait_signal(xseg, portno, 10000);
299         }
300
301         return 0;
302 }
303
304 static struct xseg *join(char *spec)
305 {
306         struct xseg_config config;
307         struct xseg *xseg;
308
309         (void)xseg_parse_spec(spec, &config);
310         xseg = xseg_join(config.type, config.name);
311         if (xseg)
312                 return xseg;
313
314         (void)xseg_create(&config);
315         return xseg_join(config.type, config.name);
316 }
317
318 static int blockd(char *path, off_t size, uint32_t nr_ops,
319                   char *spec, long portno)
320 {
321         struct stat stat;
322         struct sigaction sa;
323         struct store *store;
324         int r;
325
326         store = malloc(sizeof(struct store));
327         if (!store) {
328                 perror("malloc");
329                 return -1;
330         }
331
332         strncpy(store->name, path, TARGET_NAMELEN);
333         store->name[TARGET_NAMELEN - 1] = '\0';
334         store->namesize = strlen(store->name);
335
336         store->fd = open(path, O_RDWR);
337         while (store->fd < 0) {
338                 if (errno == ENOENT && size)
339                         store->fd = open(path, O_RDWR | O_CREAT, 0600);
340                         if (store->fd >= 0)
341                                 break;
342                 perror(path);
343                 return store->fd;
344         }
345         
346         if (size == 0) {
347                 r = fstat(store->fd, &stat);
348                 if (r < 0) {
349                         perror(path);
350                         return r;
351                 }
352                 size = (uint64_t) stat.st_size;
353                 if (size == 0) {
354                         fprintf(stderr, "size cannot be zero\n");
355                         return -1;
356                 }
357         }
358
359         lseek(store->fd, size-1, SEEK_SET);
360         if (write(store->fd, &r, 1) != 1) {
361                 perror("write");
362                 return -1;
363         }
364
365         store->size = size;
366
367         /*
368         r = daemon(1, 1);
369         if (r < 0)
370                 return r;
371                 */
372
373         store->sigevent.sigev_notify = SIGEV_SIGNAL;
374         store->sigevent.sigev_signo = SIGIO;
375         sa.sa_sigaction = sigaction_handler;
376         sa.sa_flags = SA_SIGINFO;
377         if (sigemptyset(&sa.sa_mask))
378                 perror("sigemptyset");
379
380         if (sigaction(SIGIO, &sa, NULL)) {
381                 perror("sigaction");
382                 return -1;
383         }
384
385         store->nr_ops = nr_ops;
386         store->free_bufs = calloc(nr_ops, sizeof(xqindex));
387         if (!store->free_bufs)
388                 goto malloc_fail;
389
390         store->pending_bufs = calloc(nr_ops, sizeof(xqindex));
391         if (!store->pending_bufs)
392                 goto malloc_fail;
393
394         store->ios = calloc(nr_ops, sizeof(struct io));
395         if (!store->ios) {
396 malloc_fail:
397                 perror("malloc");
398                 return -1;
399         }
400
401         xq_init_seq(&store->free_ops, nr_ops, nr_ops, store->free_bufs);
402         xq_init_empty(&store->pending_ops, nr_ops, store->pending_bufs);
403
404         if (xseg_initialize("posix")) {
405                 printf("cannot initialize library\n");
406                 return -1;
407         }
408         store->xseg = join(spec);
409         if (!store->xseg)
410                 return -1;
411
412         store->xport = xseg_bind_port(store->xseg, portno);
413         if (!store->xport) {
414                 printf("cannot bind to port %ld\n", portno);
415                 return -1;
416         }
417
418         store->portno = xseg_portno(store->xseg, store->xport);
419         printf("blockd on port %u/%u\n",
420                 store->portno, store->xseg->config.nr_ports);
421
422         return blockd_loop(store);
423 }
424
425 int main(int argc, char **argv)
426 {
427         char *path, *spec = "";
428         uint64_t size;
429         int i;
430         long portno;
431         uint32_t nr_ops;
432
433         if (argc < 2)
434                 return usage();
435
436         path = argv[1];
437         size = 0;
438         portno = -1;
439         nr_ops = 0;
440
441         for (i = 2; i < argc; i++) {
442                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
443                         spec = argv[i+1];
444                         i += 1;
445                         continue;
446                 }
447
448                 if (!strcmp(argv[i], "-s") && i + 1 < argc) {
449                         size = strtoull(argv[i+1], NULL, 10);
450                         i += 1;
451                         continue;
452                 }
453
454                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
455                         portno = strtoul(argv[i+1], NULL, 10);
456                         i += 1;
457                         continue;
458                 }
459
460                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
461                         nr_ops = strtoul(argv[i+1], NULL, 10);
462                         i += 1;
463                         continue;
464                 }
465         }
466
467         if (nr_ops <= 0)
468                 nr_ops = 16;
469
470         return blockd(path, size, nr_ops, spec, portno);
471 }
472