make code compile. no validity checks
[archipelago] / xseg / peers / user / sosd.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <sys/stat.h>
6 #include <unistd.h>
7 #include <string.h>
8 #include <fcntl.h>
9 #include <errno.h>
10 #include <signal.h>
11 #include <xseg/xseg.h>
12 #include <util_libs/user/sos/sos.h>
13 #include <sys/time.h>
14
15 #include <signal.h>
16 #include <sys/syscall.h>
17
18 /* maybe add this to store struct */
19 #define objsize (4*1024*1024)
20 #define MAX_VOL_NAME 242
21
22 static int usage(void)
23 {
24         printf("Usage: ./sosd <path_to_disk_image> [options]\n"
25                 "Options: [-p portno]\n"
26                 "         [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
27                 "         [-n nr_parallel_ops]\n");
28         return 1;
29 }
30
31 struct io {
32         struct xseg_request *req;
33         ssize_t retval;
34         struct sos_request sos_req;
35         char objname[MAX_VOL_NAME +1 + 12 + 1];
36         struct timeval start;
37 };
38
39 struct store {
40         struct xseg *xseg;
41         struct xseg_port *xport;
42         uint32_t portno;
43         int fd;
44         uint64_t size;
45         struct io *ios;
46         struct xq free_ops;
47         char *free_bufs;
48         struct xq resubmit_ops;
49         char *resubmit_bufs;
50         long nr_ops;
51         sos_handle_t sos;
52         pid_t pid;
53         sigset_t signal_set;
54 };
55
56 static unsigned int verbose;
57
58 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
59 {
60         return;
61 }
62
63 static void signal_self(struct store *store)
64 {
65         union sigval sigval = {0};
66         pid_t me = store->pid;
67         if (sigqueue(me, SIGIO, sigval) < 0)
68                 perror("sigqueue");
69 }
70
71 static int wait_signal(struct store *store)
72 {
73         int r;
74         siginfo_t siginfo;
75         struct timespec ts;
76         uint32_t usec_timeout = 5000;
77
78         ts.tv_sec = usec_timeout / 1000000;
79         ts.tv_nsec = 1000 * (usec_timeout - ts.tv_sec * 1000000);
80
81         r = sigtimedwait(&store->signal_set, &siginfo, &ts);
82         if (r < 0)
83                 return r;
84
85         return siginfo.si_signo;
86
87 }
88
89 static struct io *alloc_io(struct store *store)
90 {
91         xqindex idx = xq_pop_head(&store->free_ops, 1);
92         if (idx == Noneidx)
93                 return NULL;
94         return store->ios + idx;
95 }
96
97 static inline void free_io(struct store *store, struct io *io)
98 {
99         xqindex idx = io - store->ios;
100         io->req = NULL;
101         xq_append_head(&store->free_ops, idx, 1);
102         /* not the right place. sosd_loop couldn't sleep because of that
103          * needed for flush support. maybe this should go to complete function
104          *
105         signal_self(store);
106         */
107 }
108
109 static void resubmit_io(struct store *store, struct io *io)
110 {
111         xqindex idx = io - store->ios;
112         xq_append_tail(&store->resubmit_ops, idx, 1);
113 }
114
115 static struct io* get_resubmitted_io(struct store *store)
116 {
117         xqindex idx = xq_pop_head(&store->resubmit_ops, 1);
118         if (idx == Noneidx)
119                 return NULL;
120         return store->ios + idx;
121 }
122
123 static void log_io(char *msg, struct io *io)
124 {
125         char target[64], data[64];
126         /* null terminate name in case of req->target is less than 63 characters,
127          * and next character after name (aka first byte of next buffer) is not
128          * null
129          */
130         unsigned int end = (io->req->targetlen> 63) ? 63 : io->req->targetlen;
131         if (verbose) {
132                 strncpy(target, io->req->target, end);
133                 target[end] = 0;
134                 strncpy(data, io->req->data, 63);
135                 data[63] = 0;
136                 printf("%s: sos req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
137                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
138                                 msg,
139                                 (unsigned int)io->sos_req.id,
140                                 (unsigned int)io->req->op,
141                                 (unsigned long long)io->sos_req.offset,
142                                 (unsigned long)io->sos_req.size,
143                                 (unsigned long)io->req->serviced,
144                                 (unsigned long)io->retval,
145                                 (unsigned int)io->req->state,
146                                 (unsigned int)io->req->targetlen, target,
147                                 (unsigned long long)io->req->datalen, data);
148         }
149 }
150
151 static void complete(struct store *store, struct io *io)
152 {
153         struct xseg_request *req = io->req;
154         /*
155         struct timeval end;
156         unsigned long us;
157         gettimeofday(&end, NULL);
158         timersub(&end, &io->start, &end);
159         us = end.tv_sec*1000000 +end.tv_usec;
160         printf("sosd: Request %lu completed after %lu us\n", io->sos_req.id, us);
161         */
162
163         req->state |= XS_SERVED;
164         log_io("complete", io);
165         xseg_respond(store->xseg, req->portno, req);
166         xseg_signal(store->xseg, req->portno);
167         free_io(store, io);
168 }
169
170 static void fail(struct store *store, struct io *io)
171 {
172         struct xseg_request *req = io->req;
173         req->state |= XS_FAILED;
174         log_io("fail", io);
175         xseg_respond(store->xseg, req->portno, req);
176         xseg_signal(store->xseg, req->portno);
177         free_io(store, io);
178 }
179
180 static void pending(struct store *store, struct io *io)
181 {
182         io->req->state = XS_PENDING;
183 }
184
185 static void handle_unknown(struct store *store, struct io *io)
186 {
187         struct xseg_request *req = io->req;
188         snprintf(req->data, req->datalen, "unknown request op");
189         fail(store, io);
190 }
191
192 static int32_t get_sos_op(uint32_t xseg_op)
193 {
194         switch (xseg_op) {
195         case X_READ:
196                 return S_READ;
197         case X_WRITE:
198                 return S_WRITE;
199         default:
200                 return S_NONE;
201         }
202 }
203
204 static uint32_t get_sos_flags(uint32_t xseg_flags)
205 {
206         uint32_t flags = 0;
207         if (xseg_flags & XF_FLUSH) {
208                 flags |= SF_FLUSH;
209         }
210         if (xseg_flags & XF_FUA) {
211                 flags |= SF_FUA;
212         }
213         return flags;
214 }
215
216 static int calculate_sosreq(struct xseg_request *xseg_req, struct sos_request *sos_req)
217 {
218         unsigned int suffix;
219         int r;
220         char *buf;
221
222         /* get object name from offset in volume */
223         buf = sos_req->target;
224         suffix = (unsigned int) ((xseg_req->offset+xseg_req->serviced) / (uint64_t)objsize) ;
225 //      printf("suffix: %u\n", suffix);
226         if (xseg_req->targetlen> MAX_VOL_NAME){
227                 printf("xseg_req targetlen > MAX_VOL_NAME\n");
228                 return -1;
229         }
230         strncpy(buf, xseg_req->target, xseg_req->targetlen);
231         buf[xseg_req->targetlen] = '_';
232         r = snprintf(buf+xseg_req->targetlen+1, 13, "%012u", suffix);
233         if (r >= 13)
234                 return -1;
235
236         //sos_req->target = buf;
237         sos_req->targetlen = xseg_req->targetlen+1+12;
238
239         /* offset should be set to offset in object */
240         sos_req->offset = (xseg_req->offset + xseg_req->serviced) % objsize;
241         /* sos_req offset + sos_req size  < objsize always
242          * request data up to the end of object.
243          */
244         sos_req->size = (xseg_req->datalen - xseg_req->serviced) ;  /* should this be xseg_req->size ? */
245         if (sos_req->size > objsize - sos_req->offset)
246                 sos_req->size = objsize - sos_req->offset;
247         /* this should have been checked before this call */
248         if (xseg_req->serviced < xseg_req->datalen)
249                 sos_req->data = xseg_req->data + xseg_req->serviced;
250         else
251                 return -1;
252 //      printf("name: %s, size: %lu, offset: %lu, data:%s\n", sos_req->target, 
253 //                      sos_req->size, sos_req->offset, sos_req->data);
254         return 0;
255 }
256
257 static void prepare_sosreq(struct store *store, struct io *io)
258 {
259         struct xseg_request *xseg_req = io->req;
260         struct sos_request *sos_req = &io->sos_req;
261         sos_req->flags = get_sos_flags(xseg_req->flags);
262         sos_req->state = S_PENDING;
263         sos_req->retval = 0;
264         sos_req->op = get_sos_op(xseg_req->op);
265         sos_req->priv = store;
266         sos_req->target = io->objname;
267 }
268
269 static inline void prepare_io(struct store *store, struct io *io)
270 {
271         prepare_sosreq(store, io);
272         /* Assign io id to sos_req id. This can be done as an initialization of
273          * ios, to avoid reseting it every time */
274         io->sos_req.id = io - store->ios;
275 }
276
277
278 static void handle_resubmit(struct store *store, struct io *io);
279
280 static void complete_rw(struct store *store, struct io *io)
281 {
282         int r;
283         struct xseg_request *req = io->req;
284         struct sos_request *sos_req = &io->sos_req;
285         if (req->state == XS_ACCEPTED) {
286                 /* should not happen */
287                 fail(store, io);
288                 return;
289         }
290         if (io->retval > 0)
291                 req->serviced += io->retval;
292         else if (io->retval == 0) {
293                 /* reached end of object. zero out rest of data
294                  * requested from this object
295                  */ 
296                 memset(sos_req->data, 0, sos_req->size);
297                 req->serviced += sos_req->size;
298         }
299         else if (io->retval == -2) {
300                 /* object not found. return zeros instead */
301                 memset(sos_req->data, 0, sos_req->size);
302                 req->serviced += sos_req->size;
303         }
304         else {
305                 /* io->retval < 0 */
306                 fail(store, io);
307                 return;
308         }
309         /* request completed ? */
310         if (req->serviced >= req->datalen) {
311                 complete(store, io);
312                 return;
313         }
314
315         if (req != io->req)
316                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
317         if (!req->size) {
318                 /* should not happen */
319                 fail(store, io);
320                 return;
321         }
322
323         switch (req->op) {
324         case X_READ:
325         case X_WRITE:
326                 log_io("resubmitting", io);
327                 resubmit_io(store, io);
328                 signal_self(store);
329                 break;
330         default:
331                 snprintf(req->data, req->datalen,
332                          "wtf, corrupt op %u?\n", req->op);
333                 fail(store, io);
334                 return;
335         }
336 }
337
338 static void handle_read_write(struct store *store, struct io *io)
339 {
340         int r;
341         struct xseg_request *req = io->req;
342         struct sos_request *sos_req = &io->sos_req;
343         struct io *resubmit_io;
344
345         if (req != io->req)
346                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
347
348         prepare_io(store, io);
349         if (!req->size) {
350                 if (req->flags & XF_FLUSH) {
351 #if 0
352                         /* note that with FLUSH/size == 0 
353                          * there will probably be a (uint64_t)-1 offset */
354
355                         /* size must be zero */
356                         sos_req->size = 0;
357                         /* all these should be irrelevant on a flush request */
358                         sos_req->target = 0;
359                         sos_req->targetlen= 0;
360                         sos_req->data = 0;
361                         sos_req->offset = 0;
362                         /* philipgian:
363                          * make sure all pending requests are completed and then
364                          * perform flush request to flush them to disk.
365                          */
366                         while (xq_size(&store->free_ops) != store->nr_ops){
367                                 wait_signal(store);
368                                 /* handle any possible resubmissions */
369                                 resubmit_io = get_resubmitted_io(store);
370                                 while (resubmit_io){
371                                         handle_resubmit(store, resubmit_io);
372                                         resubmit_io = get_resubmitted_io(store);
373                                 }
374                         }
375                         r = sos_submit(store->sos, sos_req);
376                         if (r < 0) 
377                                 fail(store,io);
378                         else {
379                                 complete(store, io);
380                         }
381                         return;
382                 } else {
383                         complete(store, io);
384                         return;
385                 }
386 #else
387                         complete(store, io);
388                         return;
389                 }
390 #endif
391         }
392         r = calculate_sosreq(req, sos_req);     
393         if (r < 0 ) {
394                 fail(store, io);
395                 return;
396         }
397
398         switch (req->op) {
399         case X_READ:
400         case X_WRITE:
401                 //log_io("submit", io);
402                 pending(store, io);
403                 r = sos_submit(store->sos, sos_req);
404                 break;
405         default:
406                 snprintf(req->data, req->datalen,
407                          "wtf, corrupt op %u?\n", req->op);
408                 fail(store, io);
409                 return;
410         }
411
412         if (r) {
413                 strerror_r(errno, req->data, req->datalen);
414                 fail(store, io);
415                 return;
416         }
417 }
418
419 static void handle_returned(struct store *store, struct io *io)
420 {
421         io->retval = io->sos_req.retval;
422         switch (io->req->op){
423                 case X_READ:
424                 case X_WRITE:
425                         complete_rw(store, io);
426                         break;
427                 default:
428                         if (io->sos_req.state & S_FAILED)
429                                 fail(store, io);
430                         else
431                                 complete(store, io);
432         }       
433 }
434
435 /* this is safe for now, as long as callback is only called once.
436  * if callback gets called, then sos_request has been completed and no race
437  * conditions occur.
438  */
439 static int sos_cb(struct sos_request *sos_req, unsigned long event)
440 {
441         struct store *store = (struct store *) sos_req->priv;
442         struct io *io = (struct io*) store->ios + sos_req->id;
443
444         if (event == S_NOTIFY_FAIL){
445                 sos_req->state = S_FAILED;
446         }
447         else if (event == S_NOTIFY_ACK) {
448                 sos_req->state = S_ACKED;
449         }
450         else if (event == S_NOTIFY_COMMIT){
451                 sos_req->state = S_COMMITED;
452         }
453         handle_returned(store, io);
454         return 1;
455 }
456
457 static void handle_info(struct store *store, struct io *io)
458 {
459         struct xseg_request *req = io->req;
460
461         *((uint64_t *) req->data) = store->size;
462         req->serviced = req->datalen = sizeof(store->size);
463         io->retval = req->datalen;
464
465         complete(store, io);
466 }
467
468 static void dispatch(struct store *store, struct io *io)
469 {
470         switch (io->req->op) {
471         case X_READ:
472         case X_WRITE:
473                 handle_read_write(store, io); break;
474         case X_INFO:
475                 handle_info(store, io); break;
476         case X_SYNC:
477         default:
478                 handle_unknown(store, io);
479         }
480 }
481
482 static void handle_resubmit(struct store *store, struct io *io)
483 {
484         dispatch(store, io);
485 }
486
487 static void handle_accepted(struct store *store, struct io *io)
488 {
489         struct xseg_request *req = io->req;
490         req->serviced = 0;
491         req->state = XS_ACCEPTED;
492         io->retval = 0;
493         //log_io("accepted", io);
494         gettimeofday(&io->start, NULL);
495         dispatch(store, io);
496 }
497
498 static int sosd_loop(struct store *store)
499 {
500         struct xseg *xseg = store->xseg;
501         uint32_t portno = store->portno;
502         struct io *io, *resubmit_io;
503         struct xseg_request *accepted;
504
505         for (;;) {
506                 accepted = NULL;
507                 xseg_prepare_wait(xseg, portno);
508                 io = alloc_io(store);
509                 if (io) {
510                         accepted = xseg_accept(xseg, portno);
511                         if (accepted) {
512                                 xseg_cancel_wait(xseg, portno);
513                                 io->req = accepted;
514                                 handle_accepted(store, io);
515                         } else
516                                 free_io(store, io);
517                 }
518                 resubmit_io = get_resubmitted_io(store);
519                 if (resubmit_io){
520                         xseg_cancel_wait(xseg, portno);
521                         handle_resubmit(store, resubmit_io);
522                 }
523                 if (!accepted && !resubmit_io) 
524                         xseg_wait_signal(xseg, 10000);
525         }
526
527         return 0;
528 }
529
530 static struct xseg *join(char *spec)
531 {
532         struct xseg_config config;
533         struct xseg *xseg;
534
535         (void)xseg_parse_spec(spec, &config);
536         xseg = xseg_join(config.type, config.name, "posix", NULL);
537         if (xseg)
538                 return xseg;
539
540         (void)xseg_create(&config);
541         return xseg_join(config.type, config.name, "posix", NULL);
542 }
543
544 static int sosd(char *path, unsigned long size, uint32_t nr_ops,
545                   char *spec, long portno)
546 {
547         struct store *store;
548
549         store = malloc(sizeof(struct store));
550         if (!store) {
551                 perror("malloc");
552                 return -1;
553         }
554
555         store->sos = sos_init(sos_cb);
556         if (!store->sos) {
557                 fprintf(stderr, "SOS init failed\n");
558                 return -1;
559         }
560
561         /*
562         r = daemon(1, 1);
563         if (r < 0)
564                 return r;
565                 */
566
567         store->pid = syscall(SYS_gettid);
568
569         // just a temp solution. 
570         // Make all images 20GB. Maybe use an image header object for a more
571         // permantent solution.
572         store->size=20*1024*1024;
573
574         if (sigemptyset(&store->signal_set))
575                 perror("sigemptyset");
576
577         if (sigaddset(&store->signal_set, SIGIO))
578                 perror("sigaddset");
579
580
581         store->nr_ops = nr_ops;
582         store->free_bufs = calloc(nr_ops, sizeof(xqindex));
583         if (!store->free_bufs)
584                 goto malloc_fail;
585
586         store->resubmit_bufs = calloc(nr_ops, sizeof(xqindex));
587         if (!store->resubmit_bufs)
588                 goto malloc_fail;
589
590         store->ios = calloc(nr_ops, sizeof(struct io));
591         if (!store->ios) {
592 malloc_fail:
593                 perror("malloc");
594                 return -1;
595         }
596
597         xq_init_seq(&store->free_ops, nr_ops, nr_ops, store->free_bufs);
598         xq_init_empty(&store->resubmit_ops, nr_ops, store->resubmit_bufs);
599
600
601         if (xseg_initialize()) {
602                 printf("cannot initialize library\n");
603                 return -1;
604         }
605         store->xseg = join(spec);
606         if (!store->xseg)
607                 return -1;
608
609         store->xport = xseg_bind_port(store->xseg, portno);
610         if (!store->xport) {
611                 printf("cannot bind to port %ld\n", portno);
612                 return -1;
613         }
614
615         store->portno = xseg_portno(store->xseg, store->xport);
616         printf("sosd on port %u/%u\n",
617                 store->portno, store->xseg->config.nr_ports);
618         
619         return sosd_loop(store);
620 }
621
622 int main(int argc, char **argv)
623 {
624         char *path, *spec = "";
625         unsigned long size;
626         int i;
627         long portno;
628         uint32_t nr_ops;
629         unsigned int debug_level = 0;
630
631         if (argc < 2)
632                 return usage();
633
634         path = argv[1];
635         size = 0;
636         portno = -1;
637         nr_ops = 0;
638
639         for (i = 2; i < argc; i++) {
640                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
641                         spec = argv[i+1];
642                         i += 1;
643                         continue;
644                 }
645
646                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
647                         portno = strtoul(argv[i+1], NULL, 10);
648                         i += 1;
649                         continue;
650                 }
651
652                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
653                         nr_ops = strtoul(argv[i+1], NULL, 10);
654                         i += 1;
655                         continue;
656                 }
657                 if (!strcmp(argv[i], "-v") ) {
658                         debug_level++;
659                         continue;
660                 }
661         }
662
663         sos_set_debug_level(debug_level);
664         verbose = debug_level;
665
666         if (nr_ops <= 0)
667                 nr_ops = 16;
668
669         return sosd(path, size, nr_ops, spec, portno);
670 }
671