Bump version to 0.3.5next
[archipelago] / xseg / peers / user / sosd.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #define _GNU_SOURCE
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 #include <unistd.h>
41 #include <string.h>
42 #include <fcntl.h>
43 #include <errno.h>
44 #include <signal.h>
45 #include <xseg/xseg.h>
46 #include <util_libs/user/sos/sos.h>
47 #include <sys/time.h>
48
49 #include <signal.h>
50 #include <sys/syscall.h>
51
52 /* maybe add this to store struct */
53 #define objsize (4*1024*1024)
54 #define MAX_VOL_NAME 242
55
56 static int usage(void)
57 {
58         printf("Usage: ./sosd <path_to_disk_image> [options]\n"
59                 "Options: [-p portno]\n"
60                 "         [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
61                 "         [-n nr_parallel_ops]\n");
62         return 1;
63 }
64
65 struct io {
66         struct xseg_request *req;
67         ssize_t retval;
68         struct sos_request sos_req;
69         char objname[MAX_VOL_NAME +1 + 12 + 1];
70         struct timeval start;
71 };
72
73 struct store {
74         struct xseg *xseg;
75         struct xseg_port *xport;
76         uint32_t portno;
77         int fd;
78         uint64_t size;
79         struct io *ios;
80         struct xq free_ops;
81         char *free_bufs;
82         struct xq resubmit_ops;
83         char *resubmit_bufs;
84         long nr_ops;
85         sos_handle_t sos;
86         pid_t pid;
87         sigset_t signal_set;
88 };
89
90 static unsigned int verbose;
91
92 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
93 {
94         return;
95 }
96
97 static void signal_self(struct store *store)
98 {
99         union sigval sigval = {0};
100         pid_t me = store->pid;
101         if (sigqueue(me, SIGIO, sigval) < 0)
102                 perror("sigqueue");
103 }
104
105 static int wait_signal(struct store *store)
106 {
107         int r;
108         siginfo_t siginfo;
109         struct timespec ts;
110         uint32_t usec_timeout = 5000;
111
112         ts.tv_sec = usec_timeout / 1000000;
113         ts.tv_nsec = 1000 * (usec_timeout - ts.tv_sec * 1000000);
114
115         r = sigtimedwait(&store->signal_set, &siginfo, &ts);
116         if (r < 0)
117                 return r;
118
119         return siginfo.si_signo;
120
121 }
122
123 static struct io *alloc_io(struct store *store)
124 {
125         xqindex idx = xq_pop_head(&store->free_ops, 1);
126         if (idx == Noneidx)
127                 return NULL;
128         return store->ios + idx;
129 }
130
131 static inline void free_io(struct store *store, struct io *io)
132 {
133         xqindex idx = io - store->ios;
134         io->req = NULL;
135         xq_append_head(&store->free_ops, idx, 1);
136         /* not the right place. sosd_loop couldn't sleep because of that
137          * needed for flush support. maybe this should go to complete function
138          *
139         signal_self(store);
140         */
141 }
142
143 static void resubmit_io(struct store *store, struct io *io)
144 {
145         xqindex idx = io - store->ios;
146         xq_append_tail(&store->resubmit_ops, idx, 1);
147 }
148
149 static struct io* get_resubmitted_io(struct store *store)
150 {
151         xqindex idx = xq_pop_head(&store->resubmit_ops, 1);
152         if (idx == Noneidx)
153                 return NULL;
154         return store->ios + idx;
155 }
156
157 static void log_io(char *msg, struct io *io)
158 {
159         char target[64], data[64];
160         /* null terminate name in case of req->target is less than 63 characters,
161          * and next character after name (aka first byte of next buffer) is not
162          * null
163          */
164         unsigned int end = (io->req->targetlen> 63) ? 63 : io->req->targetlen;
165         if (verbose) {
166                 strncpy(target, io->req->target, end);
167                 target[end] = 0;
168                 strncpy(data, io->req->data, 63);
169                 data[63] = 0;
170                 printf("%s: sos req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
171                                 "target[%u]:'%s', data[%llu]:\n%s------------------\n\n",
172                                 msg,
173                                 (unsigned int)io->sos_req.id,
174                                 (unsigned int)io->req->op,
175                                 (unsigned long long)io->sos_req.offset,
176                                 (unsigned long)io->sos_req.size,
177                                 (unsigned long)io->req->serviced,
178                                 (unsigned long)io->retval,
179                                 (unsigned int)io->req->state,
180                                 (unsigned int)io->req->targetlen, target,
181                                 (unsigned long long)io->req->datalen, data);
182         }
183 }
184
185 static void complete(struct store *store, struct io *io)
186 {
187         struct xseg_request *req = io->req;
188         /*
189         struct timeval end;
190         unsigned long us;
191         gettimeofday(&end, NULL);
192         timersub(&end, &io->start, &end);
193         us = end.tv_sec*1000000 +end.tv_usec;
194         printf("sosd: Request %lu completed after %lu us\n", io->sos_req.id, us);
195         */
196
197         req->state |= XS_SERVED;
198         log_io("complete", io);
199         xseg_respond(store->xseg, req->portno, req);
200         xseg_signal(store->xseg, req->portno);
201         free_io(store, io);
202 }
203
204 static void fail(struct store *store, struct io *io)
205 {
206         struct xseg_request *req = io->req;
207         req->state |= XS_FAILED;
208         log_io("fail", io);
209         xseg_respond(store->xseg, req->portno, req);
210         xseg_signal(store->xseg, req->portno);
211         free_io(store, io);
212 }
213
214 static void pending(struct store *store, struct io *io)
215 {
216         io->req->state = XS_PENDING;
217 }
218
219 static void handle_unknown(struct store *store, struct io *io)
220 {
221         struct xseg_request *req = io->req;
222         snprintf(req->data, req->datalen, "unknown request op");
223         fail(store, io);
224 }
225
226 static int32_t get_sos_op(uint32_t xseg_op)
227 {
228         switch (xseg_op) {
229         case X_READ:
230                 return S_READ;
231         case X_WRITE:
232                 return S_WRITE;
233         default:
234                 return S_NONE;
235         }
236 }
237
238 static uint32_t get_sos_flags(uint32_t xseg_flags)
239 {
240         uint32_t flags = 0;
241         if (xseg_flags & XF_FLUSH) {
242                 flags |= SF_FLUSH;
243         }
244         if (xseg_flags & XF_FUA) {
245                 flags |= SF_FUA;
246         }
247         return flags;
248 }
249
250 static int calculate_sosreq(struct xseg_request *xseg_req, struct sos_request *sos_req)
251 {
252         unsigned int suffix;
253         int r;
254         char *buf;
255
256         /* get object name from offset in volume */
257         buf = sos_req->target;
258         suffix = (unsigned int) ((xseg_req->offset+xseg_req->serviced) / (uint64_t)objsize) ;
259 //      printf("suffix: %u\n", suffix);
260         if (xseg_req->targetlen> MAX_VOL_NAME){
261                 printf("xseg_req targetlen > MAX_VOL_NAME\n");
262                 return -1;
263         }
264         strncpy(buf, xseg_req->target, xseg_req->targetlen);
265         buf[xseg_req->targetlen] = '_';
266         r = snprintf(buf+xseg_req->targetlen+1, 13, "%012u", suffix);
267         if (r >= 13)
268                 return -1;
269
270         //sos_req->target = buf;
271         sos_req->targetlen = xseg_req->targetlen+1+12;
272
273         /* offset should be set to offset in object */
274         sos_req->offset = (xseg_req->offset + xseg_req->serviced) % objsize;
275         /* sos_req offset + sos_req size  < objsize always
276          * request data up to the end of object.
277          */
278         sos_req->size = (xseg_req->datalen - xseg_req->serviced) ;  /* should this be xseg_req->size ? */
279         if (sos_req->size > objsize - sos_req->offset)
280                 sos_req->size = objsize - sos_req->offset;
281         /* this should have been checked before this call */
282         if (xseg_req->serviced < xseg_req->datalen)
283                 sos_req->data = xseg_req->data + xseg_req->serviced;
284         else
285                 return -1;
286 //      printf("name: %s, size: %lu, offset: %lu, data:%s\n", sos_req->target, 
287 //                      sos_req->size, sos_req->offset, sos_req->data);
288         return 0;
289 }
290
291 static void prepare_sosreq(struct store *store, struct io *io)
292 {
293         struct xseg_request *xseg_req = io->req;
294         struct sos_request *sos_req = &io->sos_req;
295         sos_req->flags = get_sos_flags(xseg_req->flags);
296         sos_req->state = S_PENDING;
297         sos_req->retval = 0;
298         sos_req->op = get_sos_op(xseg_req->op);
299         sos_req->priv = store;
300         sos_req->target = io->objname;
301 }
302
303 static inline void prepare_io(struct store *store, struct io *io)
304 {
305         prepare_sosreq(store, io);
306         /* Assign io id to sos_req id. This can be done as an initialization of
307          * ios, to avoid reseting it every time */
308         io->sos_req.id = io - store->ios;
309 }
310
311
312 static void handle_resubmit(struct store *store, struct io *io);
313
314 static void complete_rw(struct store *store, struct io *io)
315 {
316         int r;
317         struct xseg_request *req = io->req;
318         struct sos_request *sos_req = &io->sos_req;
319         if (req->state == XS_ACCEPTED) {
320                 /* should not happen */
321                 fail(store, io);
322                 return;
323         }
324         if (io->retval > 0)
325                 req->serviced += io->retval;
326         else if (io->retval == 0) {
327                 /* reached end of object. zero out rest of data
328                  * requested from this object
329                  */ 
330                 memset(sos_req->data, 0, sos_req->size);
331                 req->serviced += sos_req->size;
332         }
333         else if (io->retval == -2) {
334                 /* object not found. return zeros instead */
335                 memset(sos_req->data, 0, sos_req->size);
336                 req->serviced += sos_req->size;
337         }
338         else {
339                 /* io->retval < 0 */
340                 fail(store, io);
341                 return;
342         }
343         /* request completed ? */
344         if (req->serviced >= req->datalen) {
345                 complete(store, io);
346                 return;
347         }
348
349         if (req != io->req)
350                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
351         if (!req->size) {
352                 /* should not happen */
353                 fail(store, io);
354                 return;
355         }
356
357         switch (req->op) {
358         case X_READ:
359         case X_WRITE:
360                 log_io("resubmitting", io);
361                 resubmit_io(store, io);
362                 signal_self(store);
363                 break;
364         default:
365                 snprintf(req->data, req->datalen,
366                          "wtf, corrupt op %u?\n", req->op);
367                 fail(store, io);
368                 return;
369         }
370 }
371
372 static void handle_read_write(struct store *store, struct io *io)
373 {
374         int r;
375         struct xseg_request *req = io->req;
376         struct sos_request *sos_req = &io->sos_req;
377         struct io *resubmit_io;
378
379         if (req != io->req)
380                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
381
382         prepare_io(store, io);
383         if (!req->size) {
384                 if (req->flags & XF_FLUSH) {
385 #if 0
386                         /* note that with FLUSH/size == 0 
387                          * there will probably be a (uint64_t)-1 offset */
388
389                         /* size must be zero */
390                         sos_req->size = 0;
391                         /* all these should be irrelevant on a flush request */
392                         sos_req->target = 0;
393                         sos_req->targetlen= 0;
394                         sos_req->data = 0;
395                         sos_req->offset = 0;
396                         /* philipgian:
397                          * make sure all pending requests are completed and then
398                          * perform flush request to flush them to disk.
399                          */
400                         while (xq_size(&store->free_ops) != store->nr_ops){
401                                 wait_signal(store);
402                                 /* handle any possible resubmissions */
403                                 resubmit_io = get_resubmitted_io(store);
404                                 while (resubmit_io){
405                                         handle_resubmit(store, resubmit_io);
406                                         resubmit_io = get_resubmitted_io(store);
407                                 }
408                         }
409                         r = sos_submit(store->sos, sos_req);
410                         if (r < 0) 
411                                 fail(store,io);
412                         else {
413                                 complete(store, io);
414                         }
415                         return;
416                 } else {
417                         complete(store, io);
418                         return;
419                 }
420 #else
421                         complete(store, io);
422                         return;
423                 }
424 #endif
425         }
426         r = calculate_sosreq(req, sos_req);     
427         if (r < 0 ) {
428                 fail(store, io);
429                 return;
430         }
431
432         switch (req->op) {
433         case X_READ:
434         case X_WRITE:
435                 //log_io("submit", io);
436                 pending(store, io);
437                 r = sos_submit(store->sos, sos_req);
438                 break;
439         default:
440                 snprintf(req->data, req->datalen,
441                          "wtf, corrupt op %u?\n", req->op);
442                 fail(store, io);
443                 return;
444         }
445
446         if (r) {
447                 strerror_r(errno, req->data, req->datalen);
448                 fail(store, io);
449                 return;
450         }
451 }
452
453 static void handle_returned(struct store *store, struct io *io)
454 {
455         io->retval = io->sos_req.retval;
456         switch (io->req->op){
457                 case X_READ:
458                 case X_WRITE:
459                         complete_rw(store, io);
460                         break;
461                 default:
462                         if (io->sos_req.state & S_FAILED)
463                                 fail(store, io);
464                         else
465                                 complete(store, io);
466         }       
467 }
468
469 /* this is safe for now, as long as callback is only called once.
470  * if callback gets called, then sos_request has been completed and no race
471  * conditions occur.
472  */
473 static int sos_cb(struct sos_request *sos_req, unsigned long event)
474 {
475         struct store *store = (struct store *) sos_req->priv;
476         struct io *io = (struct io*) store->ios + sos_req->id;
477
478         if (event == S_NOTIFY_FAIL){
479                 sos_req->state = S_FAILED;
480         }
481         else if (event == S_NOTIFY_ACK) {
482                 sos_req->state = S_ACKED;
483         }
484         else if (event == S_NOTIFY_COMMIT){
485                 sos_req->state = S_COMMITED;
486         }
487         handle_returned(store, io);
488         return 1;
489 }
490
491 static void handle_info(struct store *store, struct io *io)
492 {
493         struct xseg_request *req = io->req;
494
495         *((uint64_t *) req->data) = store->size;
496         req->serviced = req->datalen = sizeof(store->size);
497         io->retval = req->datalen;
498
499         complete(store, io);
500 }
501
502 static void dispatch(struct store *store, struct io *io)
503 {
504         switch (io->req->op) {
505         case X_READ:
506         case X_WRITE:
507                 handle_read_write(store, io); break;
508         case X_INFO:
509                 handle_info(store, io); break;
510         case X_SYNC:
511         default:
512                 handle_unknown(store, io);
513         }
514 }
515
516 static void handle_resubmit(struct store *store, struct io *io)
517 {
518         dispatch(store, io);
519 }
520
521 static void handle_accepted(struct store *store, struct io *io)
522 {
523         struct xseg_request *req = io->req;
524         req->serviced = 0;
525         req->state = XS_ACCEPTED;
526         io->retval = 0;
527         //log_io("accepted", io);
528         gettimeofday(&io->start, NULL);
529         dispatch(store, io);
530 }
531
532 static int sosd_loop(struct store *store)
533 {
534         struct xseg *xseg = store->xseg;
535         uint32_t portno = store->portno;
536         struct io *io, *resubmit_io;
537         struct xseg_request *accepted;
538
539         for (;;) {
540                 accepted = NULL;
541                 xseg_prepare_wait(xseg, portno);
542                 io = alloc_io(store);
543                 if (io) {
544                         accepted = xseg_accept(xseg, portno);
545                         if (accepted) {
546                                 xseg_cancel_wait(xseg, portno);
547                                 io->req = accepted;
548                                 handle_accepted(store, io);
549                         } else
550                                 free_io(store, io);
551                 }
552                 resubmit_io = get_resubmitted_io(store);
553                 if (resubmit_io){
554                         xseg_cancel_wait(xseg, portno);
555                         handle_resubmit(store, resubmit_io);
556                 }
557                 if (!accepted && !resubmit_io) 
558                         xseg_wait_signal(xseg, 10000);
559         }
560
561         return 0;
562 }
563
564 static struct xseg *join(char *spec)
565 {
566         struct xseg_config config;
567         struct xseg *xseg;
568
569         (void)xseg_parse_spec(spec, &config);
570         xseg = xseg_join(config.type, config.name, "posix", NULL);
571         if (xseg)
572                 return xseg;
573
574         (void)xseg_create(&config);
575         return xseg_join(config.type, config.name, "posix", NULL);
576 }
577
578 static int sosd(char *path, unsigned long size, uint32_t nr_ops,
579                   char *spec, long portno)
580 {
581         struct store *store;
582
583         store = malloc(sizeof(struct store));
584         if (!store) {
585                 perror("malloc");
586                 return -1;
587         }
588
589         store->sos = sos_init(sos_cb);
590         if (!store->sos) {
591                 fprintf(stderr, "SOS init failed\n");
592                 return -1;
593         }
594
595         /*
596         r = daemon(1, 1);
597         if (r < 0)
598                 return r;
599                 */
600
601         store->pid = syscall(SYS_gettid);
602
603         // just a temp solution. 
604         // Make all images 20GB. Maybe use an image header object for a more
605         // permantent solution.
606         store->size=20*1024*1024;
607
608         if (sigemptyset(&store->signal_set))
609                 perror("sigemptyset");
610
611         if (sigaddset(&store->signal_set, SIGIO))
612                 perror("sigaddset");
613
614
615         store->nr_ops = nr_ops;
616         store->free_bufs = calloc(nr_ops, sizeof(xqindex));
617         if (!store->free_bufs)
618                 goto malloc_fail;
619
620         store->resubmit_bufs = calloc(nr_ops, sizeof(xqindex));
621         if (!store->resubmit_bufs)
622                 goto malloc_fail;
623
624         store->ios = calloc(nr_ops, sizeof(struct io));
625         if (!store->ios) {
626 malloc_fail:
627                 perror("malloc");
628                 return -1;
629         }
630
631         xq_init_seq(&store->free_ops, nr_ops, nr_ops, store->free_bufs);
632         xq_init_empty(&store->resubmit_ops, nr_ops, store->resubmit_bufs);
633
634
635         if (xseg_initialize()) {
636                 printf("cannot initialize library\n");
637                 return -1;
638         }
639         store->xseg = join(spec);
640         if (!store->xseg)
641                 return -1;
642
643         store->xport = xseg_bind_port(store->xseg, portno);
644         if (!store->xport) {
645                 printf("cannot bind to port %ld\n", portno);
646                 return -1;
647         }
648
649         store->portno = xseg_portno(store->xseg, store->xport);
650         printf("sosd on port %u/%u\n",
651                 store->portno, store->xseg->config.nr_ports);
652         
653         return sosd_loop(store);
654 }
655
656 int main(int argc, char **argv)
657 {
658         char *path, *spec = "";
659         unsigned long size;
660         int i;
661         long portno;
662         uint32_t nr_ops;
663         unsigned int debug_level = 0;
664
665         if (argc < 2)
666                 return usage();
667
668         path = argv[1];
669         size = 0;
670         portno = -1;
671         nr_ops = 0;
672
673         for (i = 2; i < argc; i++) {
674                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
675                         spec = argv[i+1];
676                         i += 1;
677                         continue;
678                 }
679
680                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
681                         portno = strtoul(argv[i+1], NULL, 10);
682                         i += 1;
683                         continue;
684                 }
685
686                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
687                         nr_ops = strtoul(argv[i+1], NULL, 10);
688                         i += 1;
689                         continue;
690                 }
691                 if (!strcmp(argv[i], "-v") ) {
692                         debug_level++;
693                         continue;
694                 }
695         }
696
697         sos_set_debug_level(debug_level);
698         verbose = debug_level;
699
700         if (nr_ops <= 0)
701                 nr_ops = 16;
702
703         return sosd(path, size, nr_ops, spec, portno);
704 }
705