make some field types arch-neutral
[archipelago] / xseg / peers / blockd.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <sys/stat.h>
6 #include <unistd.h>
7 #include <string.h>
8 #include <fcntl.h>
9 #include <errno.h>
10 #include <aio.h>
11 #include <signal.h>
12
13 #include <xseg/xseg.h>
14
/*
 * Print the command-line synopsis on stdout.
 *
 * Always returns 1, so main() can hand the result straight back as the
 * process exit status.
 */
static int usage(void)
{
        static const char synopsis[] =
                "Usage: ./blockd <path_to_disk_image> [options]\n"
                "Options: [-p portno]\n"
                "         [-s image size in bytes]\n"
                "         [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
                "         [-n nr_parallel_ops]\n";

        fputs(synopsis, stdout);
        return 1;
}
24
/*
 * One slot of the store's I/O table: couples a POSIX AIO control block
 * with the xseg request it is currently serving.
 */
struct io {
        /* control block submitted through aio_read()/aio_write() */
        struct aiocb cb;
        /* request being served by this slot; free_io() resets it to NULL */
        struct xseg_request *req;
        /* aio_return() result stored by handle_pending(); zeroed on accept */
        ssize_t retval;
};
30
/*
 * Complete state of one blockd instance: the xseg attachment, the backing
 * image file and the bookkeeping for the table of in-flight operations.
 */
struct store {
        /* segment handle obtained from join() */
        struct xseg *xseg;
        /* port handle returned by xseg_bind_port() */
        struct xseg_port *xport;
        /* port number as reported by xseg_portno() for xport */
        uint32_t portno;
        /* descriptor of the backing image, opened O_RDWR */
        int fd;
        /* image size in bytes; this is what handle_info() reports */
        off_t size;
        /* table of nr_ops I/O slots; queue entries are indices into it */
        struct io *ios;
        /* queue of indices of currently unused slots */
        struct xq free_ops;
        /* storage handed to xq_init_seq() for free_ops */
        char *free_bufs;
        /* queue of indices of slots with an AIO operation in flight */
        struct xq pending_ops;
        /* storage handed to xq_init_empty() for pending_ops */
        char *pending_bufs;
        /* number of slots in ios */
        long nr_ops;
        /* notification template copied into every aiocb by prepare_io() */
        struct sigevent sigevent;
};
45
/*
 * Number of times sigaction_handler() has run.  The counter is modified
 * asynchronously from signal context, so it is declared volatile to keep
 * the compiler from caching or eliding the accesses.  Nothing in this file
 * reads it; it is purely diagnostic.
 */
static volatile unsigned long sigaction_count;

/*
 * Handler installed for SIGIO by blockd().
 *
 * It does nothing beyond counting invocations; the three arguments
 * required by the SA_SIGINFO handler signature are intentionally unused.
 */
static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
{
        (void)sig;
        (void)siginfo;
        (void)arg;

        sigaction_count++;
}
52
53 static struct io *alloc_io(struct store *store)
54 {
55         xqindex idx = xq_pop_head(&store->free_ops);
56         if (idx == None)
57                 return NULL;
58         return store->ios + idx;
59 }
60
61 static inline void free_io(struct store *store, struct io *io)
62 {
63         xqindex idx = io - store->ios;
64         io->req = NULL;
65         xq_append_head(&store->free_ops, idx);
66 }
67
68 static inline void pending_io(struct store *store, struct io *io)
69 {
70         xqindex idx = io - store->ios;
71         xq_append_tail(&store->pending_ops, idx);
72 }
73
74 static inline struct io *get_pending_io(struct store *store)
75 {
76         xqindex idx = xq_pop_head(&store->pending_ops);
77         if (idx == None)
78                 return NULL;
79         return store->ios + idx;
80 }
81
82 static void log_io(char *msg, struct io *io)
83 {
84         char name[64], data[64];
85         /* null terminate name in case of req->name is less than 63 characters,
86          * and next character after name (aka first byte of next buffer) is not
87          * null
88          */
89         unsigned int end = (io->req->namesize > 63) ? 63 : io->req->namesize;
90         strncpy(name, io->req->name, end);
91         name[end] = 0;
92         strncpy(data, io->req->data, 63);
93         data[63] = 0;
94         printf("%s: fd:%u, op:%u %llu:%lu retval: %lu, reqstate: %u\n"
95                 "name[%u]:'%s', data[%llu]:\n%s------------------\n\n",
96                 msg,
97                 (unsigned int)io->cb.aio_fildes,
98                 (unsigned int)io->req->op,
99                 (unsigned long long)io->cb.aio_offset,
100                 (unsigned long)io->cb.aio_nbytes,
101                 (unsigned long)io->retval,
102                 (unsigned int)io->req->state,
103                 (unsigned int)io->req->namesize, name,
104                 (unsigned long long)io->req->datasize, data);
105 }
106
107 static void complete(struct store *store, struct io *io)
108 {
109         struct xseg_request *req = io->req;
110         req->state |= XS_SERVED;
111         log_io("complete", io);
112         xseg_respond(store->xseg, req->portno, req);
113         xseg_signal(store->xseg, req->portno);
114         free_io(store, io);
115 }
116
117 static void fail(struct store *store, struct io *io)
118 {
119         struct xseg_request *req = io->req;
120         req->state |= XS_ERROR;
121         log_io("fail", io);
122         xseg_respond(store->xseg, req->portno, req);
123         xseg_signal(store->xseg, req->portno);
124         free_io(store, io);
125 }
126
127 static void pending(struct store *store, struct io *io)
128 {
129         io->req->state = XS_PENDING;
130         pending_io(store, io);
131 }
132
133 static void handle_unknown(struct store *store, struct io *io)
134 {
135         struct xseg_request *req = io->req;
136         snprintf(req->data, req->datasize, "unknown request op");
137         fail(store, io);
138 }
139
140 static inline void prepare_io(struct store *store, struct io *io)
141 {
142         io->cb.aio_fildes = store->fd;
143         io->cb.aio_sigevent = store->sigevent;
144         /* cb->aio_sigevent.sigev_value.sival_int = fd; */
145 }
146
147 static void handle_read_write(struct store *store, struct io *io)
148 {
149         int r;
150         struct xseg_request *req = io->req;
151         struct aiocb *cb = &io->cb;
152
153         if (req->state != XS_ACCEPTED) {
154                 if (io->retval > 0)
155                         req->serviced += io->retval;
156                 else
157                         req->datasize = req->serviced;
158
159                 if (req->serviced >= req->datasize) {
160                         complete(store, io);
161                         return;
162                 }
163         }
164
165         if (req != io->req)
166                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
167         if (!req->size) {
168                 if (req->flags & (XF_FLUSH | XF_FUA)) {
169                         /* for now, no FLUSH/FUA support.
170                          * note that with FLUSH/size == 0 
171                          * there will probably be a (uint64_t)-1 offset */
172                         complete(store, io);
173                         return;
174                 } else {
175                         complete(store, io);
176                         return;
177                 }
178         }
179
180         prepare_io(store, io);
181         cb->aio_buf = req->data + req->serviced;
182         cb->aio_nbytes = req->datasize - req->serviced;
183         cb->aio_offset = req->offset + req->serviced;
184
185         switch (req->op) {
186         case X_READ:
187                 r = aio_read(cb);
188                 break;
189         case X_WRITE:
190                 r = aio_write(cb);
191                 break;
192         default:
193                 snprintf(req->data, req->datasize,
194                          "wtf, corrupt op %u?\n", req->op);
195                 fail(store, io);
196                 return;
197         }
198
199         if (r) {
200                 strerror_r(errno, req->data, req->datasize);
201                 fail(store, io);
202                 return;
203         }
204
205         pending(store, io);
206 }
207
208 static void handle_info(struct store *store, struct io *io)
209 {
210         struct xseg_request *req = io->req;
211
212         *((off_t *) req->data) = store->size;
213         req->datasize = sizeof(store->size);
214
215         complete(store, io);
216 }
217
218 static void dispatch(struct store *store, struct io *io)
219 {
220         switch (io->req->op) {
221         case X_READ:
222         case X_WRITE:
223                 handle_read_write(store, io); break;
224         case X_INFO:
225                 handle_info(store, io); break;
226         case X_SYNC:
227         default:
228                 handle_unknown(store, io);
229         }
230 }
231
232 static void handle_pending(struct store *store, struct io *io)
233 {
234         int r = aio_error(&io->cb);
235         if (r == EINPROGRESS) {
236                 pending(store, io);
237                 return;
238         }
239
240         io->retval = aio_return(&io->cb);
241         if (r) {
242                 fail(store, io);
243                 return;
244         }
245
246         dispatch(store, io);
247 }
248
249 static void handle_accepted(struct store *store, struct io *io)
250 {
251         struct xseg_request *req = io->req;
252         req->serviced = 0;
253         req->state = XS_ACCEPTED;
254         io->retval = 0;
255         dispatch(store, io);
256 }
257
258 static int blockd_loop(struct store *store)
259 {
260         struct xseg *xseg = store->xseg;
261         uint32_t portno = store->portno;
262         struct io *io;
263         struct xseg_request *accepted;
264
265         for (;;) {
266                 accepted = NULL;
267                 xseg_prepare_wait(xseg, portno);
268                 io = alloc_io(store);
269                 if (io) {
270                         accepted = xseg_accept(xseg, portno);
271                         if (accepted) {
272                                 xseg_cancel_wait(xseg, portno);
273                                 io->req = accepted;
274                                 handle_accepted(store, io);
275                         } else
276                                 free_io(store, io);
277                 }
278
279                 io = get_pending_io(store);
280                 if (io) {
281                         xseg_cancel_wait(xseg, portno);
282                         handle_pending(store, io);
283                 }
284
285                 if (!io && !accepted) 
286                         xseg_wait_signal(xseg, portno, 10000);
287         }
288
289         return 0;
290 }
291
292 static struct xseg *join(char *spec)
293 {
294         struct xseg_config config;
295         struct xseg *xseg;
296
297         (void)xseg_parse_spec(spec, &config);
298         xseg = xseg_join(config.type, config.name);
299         if (xseg)
300                 return xseg;
301
302         (void)xseg_create(&config);
303         return xseg_join(config.type, config.name);
304 }
305
306 static int blockd(char *path, off_t size, uint32_t nr_ops,
307                   char *spec, long portno)
308 {
309         struct stat stat;
310         struct sigaction sa;
311         struct store *store;
312         int r;
313
314         store = malloc(sizeof(struct store));
315         if (!store) {
316                 perror("malloc");
317                 return -1;
318         }
319
320         store->fd = open(path, O_RDWR);
321         while (store->fd < 0) {
322                 if (errno == ENOENT && size)
323                         store->fd = open(path, O_RDWR | O_CREAT, 0600);
324                         if (store->fd >= 0)
325                                 break;
326                 perror(path);
327                 return store->fd;
328         }
329         
330         if (size == 0) {
331                 r = fstat(store->fd, &stat);
332                 if (r < 0) {
333                         perror(path);
334                         return r;
335                 }
336                 size = stat.st_size;
337                 if (size == 0) {
338                         fprintf(stderr, "size cannot be zero\n");
339                         return -1;
340                 }
341         }
342
343         lseek(store->fd, size-1, SEEK_SET);
344         if (write(store->fd, &r, 1) != 1) {
345                 perror("write");
346                 return -1;
347         }
348
349         store->size = size;
350
351         /*
352         r = daemon(1, 1);
353         if (r < 0)
354                 return r;
355                 */
356
357         store->sigevent.sigev_notify = SIGEV_SIGNAL;
358         store->sigevent.sigev_signo = SIGIO;
359         sa.sa_sigaction = sigaction_handler;
360         sa.sa_flags = SA_SIGINFO;
361         if (sigemptyset(&sa.sa_mask))
362                 perror("sigemptyset");
363
364         if (sigaction(SIGIO, &sa, NULL)) {
365                 perror("sigaction");
366                 return -1;
367         }
368
369         store->nr_ops = nr_ops;
370         store->free_bufs = calloc(nr_ops, sizeof(xqindex));
371         if (!store->free_bufs)
372                 goto malloc_fail;
373
374         store->pending_bufs = calloc(nr_ops, sizeof(xqindex));
375         if (!store->pending_bufs)
376                 goto malloc_fail;
377
378         store->ios = calloc(nr_ops, sizeof(struct io));
379         if (!store->ios) {
380 malloc_fail:
381                 perror("malloc");
382                 return -1;
383         }
384
385         xq_init_seq(&store->free_ops, nr_ops, nr_ops, store->free_bufs);
386         xq_init_empty(&store->pending_ops, nr_ops, store->pending_bufs);
387
388         if (xseg_initialize("posix")) {
389                 printf("cannot initialize library\n");
390                 return -1;
391         }
392         store->xseg = join(spec);
393         if (!store->xseg)
394                 return -1;
395
396         store->xport = xseg_bind_port(store->xseg, portno);
397         if (!store->xport) {
398                 printf("cannot bind to port %ld\n", portno);
399                 return -1;
400         }
401
402         store->portno = xseg_portno(store->xseg, store->xport);
403         printf("blockd on port %u/%u\n",
404                 store->portno, store->xseg->config.nr_ports);
405
406         return blockd_loop(store);
407 }
408
/*
 * Entry point: blockd <path_to_disk_image> [options]
 *
 * Recognised options, each taking one argument:
 *   -g spec    segment specification (default "")
 *   -s size    image size in bytes (default 0: use the file's size)
 *   -p portno  port to bind to (default -1)
 *   -n nr_ops  number of parallel operations (default 16, also used
 *              when 0 is given)
 * Arguments that match none of these, or an option missing its value, are
 * skipped silently.
 *
 * Returns usage()'s value when no image path is given, otherwise the
 * result of blockd().
 */
int main(int argc, char **argv)
{
        char *path, *spec = "";
        off_t size;
        int i;
        long portno;
        uint32_t nr_ops;

        if (argc < 2)
                return usage();

        path = argv[1];
        size = 0;
        portno = -1;
        nr_ops = 0;

        for (i = 2; i < argc; i++) {
                if (!strcmp(argv[i], "-g") && i + 1 < argc) {
                        spec = argv[i+1];
                        i += 1;
                        continue;
                }

                if (!strcmp(argv[i], "-s") && i + 1 < argc) {
                        size = strtoull(argv[i+1], NULL, 10);
                        i += 1;
                        continue;
                }

                if (!strcmp(argv[i], "-p") && i + 1 < argc) {
                        portno = strtoul(argv[i+1], NULL, 10);
                        i += 1;
                        continue;
                }

                /* documented in usage() as -n; it used to test "-p" again */
                if (!strcmp(argv[i], "-n") && i + 1 < argc) {
                        nr_ops = strtoul(argv[i+1], NULL, 10);
                        i += 1;
                        continue;
                }
        }

        if (nr_ops == 0)
                nr_ops = 16;

        return blockd(path, size, nr_ops, spec, portno);
}
456