replace blockd names with sosd
[archipelago] / xseg / peers / blockd.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <sys/stat.h>
6 #include <unistd.h>
7 #include <string.h>
8 #include <fcntl.h>
9 #include <errno.h>
10 #include <aio.h>
11 #include <signal.h>
12
13 #include <xseg/xseg.h>
14
/*
 * Print the command-line synopsis on stdout.
 *
 * Always returns 1, so main() can hand the result straight back as its
 * exit status.
 */
static int usage(void)
{
        static const char synopsis[] =
                "Usage: ./blockd <path_to_disk_image> [options]\n"
                "Options: [-p portno]\n"
                "         [-s image size in bytes]\n"
                "         [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
                "         [-n nr_parallel_ops]\n";

        fputs(synopsis, stdout);
        return 1;
}
24
/*
 * One I/O slot: pairs the xseg request being served with the POSIX AIO
 * control block used to carry it out.
 */
struct io {
        struct aiocb cb;                /* control block handed to aio_read()/aio_write() */
        struct xseg_request *req;       /* request bound to this slot; reset to NULL by free_io() */
        ssize_t retval;                 /* aio_return() result of the last submission; 0 right after accept */
};
30
/*
 * Per-process state of the block store: the xseg segment/port it serves,
 * the backing file, and the pool of I/O slots with its two index queues.
 */
struct store {
        struct xseg *xseg;              /* segment obtained from join() */
        struct xseg_port *xport;        /* port returned by xseg_bind_port() */
        uint32_t portno;                /* port number as reported by xseg_portno() */
        int fd;                         /* descriptor of the backing file */
        uint64_t size;                  /* presumably the image size in bytes -- TODO confirm where it is maintained */
        struct io *ios;                 /* array of nr_ops I/O slots */
        struct xq free_ops;             /* queue of indices into ios[] that are currently unused */
        char *free_bufs;                /* storage handed to xq_init_seq() for free_ops */
        struct xq pending_ops;          /* queue of indices into ios[] with AIO submitted */
        char *pending_bufs;             /* storage handed to xq_init_empty() for pending_ops */
        long nr_ops;                    /* number of I/O slots allocated */
        struct sigevent sigevent;       /* template copied into every aiocb by prepare_io() */
};
45
/*
 * Number of times the signal handler below has run.
 *
 * Declared volatile because it is modified asynchronously from a signal
 * handler; without it the compiler may cache or elide accesses.  It stays
 * unsigned so that wrap-around is well defined.
 */
static volatile unsigned long sigaction_count;

/*
 * SA_SIGINFO-style signal handler: only counts deliveries.
 *
 * The handler deliberately does nothing else; none of its arguments are
 * needed, so they are explicitly marked unused.
 */
static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
{
        (void)sig;
        (void)siginfo;
        (void)arg;

        sigaction_count++;
}
52
53 static struct io *alloc_io(struct store *store)
54 {
55         xqindex idx = xq_pop_head(&store->free_ops);
56         if (idx == None)
57                 return NULL;
58         return store->ios + idx;
59 }
60
61 static inline void free_io(struct store *store, struct io *io)
62 {
63         xqindex idx = io - store->ios;
64         io->req = NULL;
65         xq_append_head(&store->free_ops, idx);
66 }
67
68 static inline void pending_io(struct store *store, struct io *io)
69 {
70         xqindex idx = io - store->ios;
71         xq_append_head(&store->pending_ops, idx);
72 }
73
74 static inline struct io *get_pending_io(struct store *store)
75 {
76         xqindex idx = xq_pop_head(&store->pending_ops);
77         if (idx == None)
78                 return NULL;
79         return store->ios + idx;
80 }
81
82 static void log_io(char *msg, struct io *io)
83 {
84         char name[64], data[64];
85         /* null terminate name in case of req->name is less than 63 characters,
86          * and next character after name (aka first byte of next buffer) is not
87          * null
88          */
89         unsigned int end = (io->req->namesize > 63) ? 63 : io->req->namesize;
90         strncpy(name, io->req->name, end);
91         name[end] = 0;
92         strncpy(data, io->req->data, 63);
93         data[63] = 0;
94         printf("%s: fd:%u, op:%u %llu:%lu retval: %lu, reqstate: %u\n"
95                 "name[%u]:'%s', data[%llu]:\n%s------------------\n\n",
96                 msg,
97                 (unsigned int)io->cb.aio_fildes,
98                 (unsigned int)io->req->op,
99                 (unsigned long long)io->cb.aio_offset,
100                 (unsigned long)io->cb.aio_nbytes,
101                 (unsigned long)io->retval,
102                 (unsigned int)io->req->state,
103                 (unsigned int)io->req->namesize, name,
104                 (unsigned long long)io->req->datasize, data);
105 }
106
107 static void complete(struct store *store, struct io *io)
108 {
109         struct xseg_request *req = io->req;
110         req->state |= XS_SERVED;
111         log_io("complete", io);
112         xseg_respond(store->xseg, req->portno, req);
113         xseg_signal(store->xseg, req->portno);
114         free_io(store, io);
115 }
116
117 static void fail(struct store *store, struct io *io)
118 {
119         struct xseg_request *req = io->req;
120         req->state |= XS_ERROR;
121         log_io("fail", io);
122         xseg_respond(store->xseg, req->portno, req);
123         xseg_signal(store->xseg, req->portno);
124         free_io(store, io);
125 }
126
127 static void pending(struct store *store, struct io *io)
128 {
129         io->req->state = XS_PENDING;
130         pending_io(store, io);
131 }
132
133 static void handle_unknown(struct store *store, struct io *io)
134 {
135         struct xseg_request *req = io->req;
136         snprintf(req->data, req->datasize, "unknown request op");
137         fail(store, io);
138 }
139
140 static inline void prepare_io(struct store *store, struct io *io)
141 {
142         io->cb.aio_fildes = store->fd;
143         io->cb.aio_sigevent = store->sigevent;
144         /* cb->aio_sigevent.sigev_value.sival_int = fd; */
145 }
146
147 static void handle_read_write(struct store *store, struct io *io)
148 {
149         int r;
150         struct xseg_request *req = io->req;
151         struct aiocb *cb = &io->cb;
152
153         if (req->state != XS_ACCEPTED) {
154                 if (io->retval > 0)
155                         req->serviced += io->retval;
156                 else
157                         req->datasize = req->serviced;
158
159                 if (req->serviced >= req->datasize) {
160                         complete(store, io);
161                         return;
162                 }
163         }
164
165         if (req != io->req)
166                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
167         if (!req->size) {
168                 if (req->flags & (XSEG_FLUSH | XSEG_FUA)) {
169                         /* for now, no FLUSH/FUA support.
170                          * note that with FLUSH/size == 0 
171                          * there will probably be a (uint64_t)-1 offset */
172                         complete(store, io);
173                         return;
174                 } else {
175                         complete(store, io);
176                         return;
177                 }
178         }
179
180         prepare_io(store, io);
181         cb->aio_buf = req->data + req->serviced;
182         cb->aio_nbytes = req->datasize - req->serviced;
183         cb->aio_offset = req->offset + req->serviced;
184
185         switch (req->op) {
186         case X_READ:
187                 r = aio_read(cb);
188                 break;
189         case X_WRITE:
190                 r = aio_write(cb);
191                 break;
192         default:
193                 snprintf(req->data, req->datasize,
194                          "wtf, corrupt op %u?\n", req->op);
195                 fail(store, io);
196                 return;
197         }
198
199         if (r) {
200                 strerror_r(errno, req->data, req->datasize);
201                 fail(store, io);
202                 return;
203         }
204
205         pending(store, io);
206 }
207
208 static void dispatch(struct store *store, struct io *io)
209 {
210         switch (io->req->op) {
211         case X_READ:
212         case X_WRITE:
213                 handle_read_write(store, io); break;
214         case X_SYNC:
215         default:
216                 handle_unknown(store, io);
217         }
218 }
219
220 static void handle_pending(struct store *store, struct io *io)
221 {
222         int r = aio_error(&io->cb);
223         if (r == EINPROGRESS) {
224                 pending(store, io);
225                 return;
226         }
227
228         io->retval = aio_return(&io->cb);
229         if (r) {
230                 fail(store, io);
231                 return;
232         }
233
234         dispatch(store, io);
235 }
236
237 static void handle_accepted(struct store *store, struct io *io)
238 {
239         struct xseg_request *req = io->req;
240         req->serviced = 0;
241         req->state = XS_ACCEPTED;
242         io->retval = 0;
243         dispatch(store, io);
244 }
245
246 static int blockd_loop(struct store *store)
247 {
248         struct xseg *xseg = store->xseg;
249         uint32_t portno = store->portno;
250         struct io *io;
251         struct xseg_request *accepted;
252
253         for (;;) {
254                 accepted = NULL;
255                 xseg_prepare_wait(xseg, portno);
256                 io = alloc_io(store);
257                 if (io) {
258                         accepted = xseg_accept(xseg, portno);
259                         if (accepted) {
260                                 xseg_cancel_wait(xseg, portno);
261                                 io->req = accepted;
262                                 handle_accepted(store, io);
263                         } else
264                                 free_io(store, io);
265                 }
266
267                 io = get_pending_io(store);
268                 if (io) {
269                         xseg_cancel_wait(xseg, portno);
270                         handle_pending(store, io);
271                 }
272
273                 if (!io && !accepted) 
274                         xseg_wait_signal(xseg, portno, 10000);
275         }
276
277         return 0;
278 }
279
280 static struct xseg *join(char *spec)
281 {
282         struct xseg_config config;
283         struct xseg *xseg;
284
285         (void)xseg_parse_spec(spec, &config);
286         xseg = xseg_join(config.type, config.name);
287         if (xseg)
288                 return xseg;
289
290         (void)xseg_create(&config);
291         return xseg_join(config.type, config.name);
292 }
293
294 static int blockd(char *path, unsigned long size, uint32_t nr_ops,
295                   char *spec, long portno)
296 {
297         struct stat stat;
298         struct sigaction sa;
299         struct store *store;
300         int r;
301
302         store = malloc(sizeof(struct store));
303         if (!store) {
304                 perror("malloc");
305                 return -1;
306         }
307
308         store->fd = open(path, O_RDWR);
309         while (store->fd < 0) {
310                 if (errno == ENOENT && size)
311                         store->fd = open(path, O_RDWR | O_CREAT, 0600);
312                         if (store->fd >= 0)
313                                 break;
314                 perror(path);
315                 return store->fd;
316         }
317         
318         if (size == 0) {
319                 r = fstat(store->fd, &stat);
320                 if (r < 0) {
321                         perror(path);
322                         return r;
323                 }
324                 size = stat.st_size;
325                 if (size == 0) {
326                         fprintf(stderr, "size cannot be zero\n");
327                         return -1;
328                 }
329         }
330
331         lseek(store->fd, size-1, SEEK_SET);
332         if (write(store->fd, &r, 1) != 1) {
333                 perror("write");
334                 return -1;
335         }
336
337         /*
338         r = daemon(1, 1);
339         if (r < 0)
340                 return r;
341                 */
342
343         store->sigevent.sigev_notify = SIGEV_SIGNAL;
344         store->sigevent.sigev_signo = SIGIO;
345         sa.sa_sigaction = sigaction_handler;
346         sa.sa_flags = SA_SIGINFO;
347         if (sigemptyset(&sa.sa_mask))
348                 perror("sigemptyset");
349
350         if (sigaction(SIGIO, &sa, NULL)) {
351                 perror("sigaction");
352                 return -1;
353         }
354
355         store->nr_ops = nr_ops;
356         store->free_bufs = calloc(nr_ops, sizeof(xqindex));
357         if (!store->free_bufs)
358                 goto malloc_fail;
359
360         store->pending_bufs = calloc(nr_ops, sizeof(xqindex));
361         if (!store->pending_bufs)
362                 goto malloc_fail;
363
364         store->ios = calloc(nr_ops, sizeof(struct io));
365         if (!store->ios) {
366 malloc_fail:
367                 perror("malloc");
368                 return -1;
369         }
370
371         xq_init_seq(&store->free_ops, nr_ops, nr_ops, store->free_bufs);
372         xq_init_empty(&store->pending_ops, nr_ops, store->pending_bufs);
373
374         if (xseg_initialize("posix")) {
375                 printf("cannot initialize library\n");
376                 return -1;
377         }
378         store->xseg = join(spec);
379         if (!store->xseg)
380                 return -1;
381
382         store->xport = xseg_bind_port(store->xseg, portno);
383         if (!store->xport) {
384                 printf("cannot bind to port %ld\n", portno);
385                 return -1;
386         }
387
388         store->portno = xseg_portno(store->xseg, store->xport);
389         printf("blockd on port %u/%u\n",
390                 store->portno, store->xseg->config.nr_ports);
391
392         return blockd_loop(store);
393 }
394
/*
 * Entry point: parse the command line and start the block store.
 *
 * argv[1] is the backing file; it is followed by any number of
 * "<option> <value>" pairs:
 *   -g spec     segment specification
 *   -s size     image size in bytes
 *   -p portno   port to bind
 *   -n nr_ops   number of parallel operations (defaults to 16)
 * Unrecognised arguments, and an option without a value, are ignored.
 */
int main(int argc, char **argv)
{
        char *path, *spec = "";
        unsigned long size = 0;
        long portno = -1;
        uint32_t nr_ops = 0;
        int i;

        if (argc < 2)
                return usage();

        path = argv[1];

        /* Every option takes a value, so the last argument can never
         * start a valid pair. */
        for (i = 2; i + 1 < argc; i++) {
                const char *opt = argv[i];
                char *val = argv[i + 1];

                if (!strcmp(opt, "-g"))
                        spec = val;
                else if (!strcmp(opt, "-s"))
                        size = strtoul(val, NULL, 10);
                else if (!strcmp(opt, "-p"))
                        portno = strtoul(val, NULL, 10);
                else if (!strcmp(opt, "-n"))
                        nr_ops = strtoul(val, NULL, 10);
                else
                        continue;

                i++;    /* skip the value that was just consumed */
        }

        if (nr_ops == 0)
                nr_ops = 16;

        return blockd(path, size, nr_ops, spec, portno);
}
442