replace blockd names with sosd
[archipelago] / xseg / peers / sosd.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <sys/stat.h>
6 #include <unistd.h>
7 #include <string.h>
8 #include <fcntl.h>
9 #include <errno.h>
10 #include <signal.h>
11 #include <xseg/xseg.h>
12 #include <sos/sos.h>
13 #include <sys/time.h>
14
15 #include <signal.h>
16 #include <sys/syscall.h>
17
18 /* maybe add this to store struct */
19 #define objsize (4*1024*1024)
20 #define MAX_VOL_NAME 242
21
22 static int usage(void)
23 {
24         printf("Usage: ./sosd <path_to_disk_image> [options]\n"
25                 "Options: [-p portno]\n"
26                 "         [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
27                 "         [-n nr_parallel_ops]\n");
28         return 1;
29 }
30
31 struct io {
32         struct xseg_request *req;
33         ssize_t retval;
34         struct sos_request sos_req;
35         char objname[MAX_VOL_NAME +1 + 12 + 1];
36         struct timeval start;
37 };
38
39 struct store {
40         struct xseg *xseg;
41         struct xseg_port *xport;
42         uint32_t portno;
43         int fd;
44         uint64_t size;
45         struct io *ios;
46         struct xq free_ops;
47         char *free_bufs;
48         struct xq resubmit_ops;
49         char *resubmit_bufs;
50         long nr_ops;
51         sos_handle_t sos;
52         pid_t pid;
53         sigset_t signal_set;
54 };
55
56
57
58 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
59 {
60         return;
61 }
62
63 static void signal_self(struct store *store)
64 {
65         union sigval sigval = {0};
66         pid_t me = store->pid;
67         if (sigqueue(me, SIGIO, sigval) < 0)
68                 perror("sigqueue");
69 }
70
71 static int wait_signal(struct store *store)
72 {
73         int r;
74         siginfo_t siginfo;
75         struct timespec ts;
76         uint32_t usec_timeout = 5000;
77
78         ts.tv_sec = usec_timeout / 1000000;
79         ts.tv_nsec = 1000 * (usec_timeout - ts.tv_sec * 1000000);
80
81         r = sigtimedwait(&store->signal_set, &siginfo, &ts);
82         if (r < 0)
83                 return r;
84
85         return siginfo.si_signo;
86
87 }
88
89 static struct io *alloc_io(struct store *store)
90 {
91         xqindex idx = xq_pop_head(&store->free_ops);
92         if (idx == None)
93                 return NULL;
94         return store->ios + idx;
95 }
96
97 static inline void free_io(struct store *store, struct io *io)
98 {
99         xqindex idx = io - store->ios;
100         io->req = NULL;
101         xq_append_head(&store->free_ops, idx);
102         signal_self(store);
103 }
104
105 static void resubmit_io(struct store *store, struct io *io)
106 {
107         xqindex idx = io - store->ios;
108         xq_append_tail(&store->resubmit_ops, idx);
109 }
110
111 static struct io* get_resubmitted_io(struct store *store)
112 {
113         xqindex idx = xq_pop_head(&store->resubmit_ops);
114         if (idx == None)
115                 return NULL;
116         return store->ios + idx;
117 }
118
119 static void log_io(char *msg, struct io *io)
120 {
121         char name[64], data[64];
122         /* null terminate name in case of req->name is less than 63 characters,
123          * and next character after name (aka first byte of next buffer) is not
124          * null
125          */
126         unsigned int end = (io->req->namesize > 63) ? 63 : io->req->namesize;
127         strncpy(name, io->req->name, end);
128         name[end] = 0;
129         strncpy(data, io->req->data, 63);
130         data[63] = 0;
131         printf("%s: sos req id:%u, op:%u %llu:%lu serviced: %lu, retval: %lu, reqstate: %u\n"
132                 "name[%u]:'%s', data[%llu]:\n%s------------------\n\n",
133                 msg,
134                 (unsigned int)io->sos_req.id,
135                 (unsigned int)io->req->op,
136                 (unsigned long long)io->sos_req.offset,
137                 (unsigned long)io->sos_req.size,
138                 (unsigned long)io->req->serviced,
139                 (unsigned long)io->retval,
140                 (unsigned int)io->req->state,
141                 (unsigned int)io->req->namesize, name,
142                 (unsigned long long)io->req->datasize, data);
143 }
144
145 static void complete(struct store *store, struct io *io)
146 {
147         struct xseg_request *req = io->req;
148         /*
149         struct timeval end;
150         unsigned long us;
151         gettimeofday(&end, NULL);
152         timersub(&end, &io->start, &end);
153         us = end.tv_sec*1000000 +end.tv_usec;
154         printf("sosd: Request %lu completed after %lu us\n", io->sos_req.id, us);
155         */
156
157         req->state |= XS_SERVED;
158         log_io("complete", io);
159         xseg_respond(store->xseg, req->portno, req);
160         xseg_signal(store->xseg, req->portno);
161         free_io(store, io);
162 }
163
164 static void fail(struct store *store, struct io *io)
165 {
166         struct xseg_request *req = io->req;
167         req->state |= XS_ERROR;
168         log_io("fail", io);
169         xseg_respond(store->xseg, req->portno, req);
170         xseg_signal(store->xseg, req->portno);
171         free_io(store, io);
172 }
173
174 static void pending(struct store *store, struct io *io)
175 {
176         io->req->state = XS_PENDING;
177 }
178
179 static void handle_unknown(struct store *store, struct io *io)
180 {
181         struct xseg_request *req = io->req;
182         snprintf(req->data, req->datasize, "unknown request op");
183         fail(store, io);
184 }
185
186 static int32_t get_sos_op(uint32_t xseg_op)
187 {
188         switch (xseg_op) {
189         case X_READ:
190                 return S_READ;
191         case X_WRITE:
192                 return S_WRITE;
193         default:
194                 return S_NONE;
195         }
196 }
197
198 static uint32_t get_sos_flags(uint32_t xseg_flags)
199 {
200         uint32_t flags = 0;
201         if (xseg_flags & XF_FLUSH) {
202                 flags |= SF_FLUSH;
203         }
204         if (xseg_flags & XF_FUA) {
205                 flags |= SF_FUA;
206         }
207         return flags;
208 }
209
210 static int calculate_sosreq(struct xseg_request *xseg_req, struct sos_request *sos_req)
211 {
212         unsigned int suffix;
213         int r;
214         char *buf;
215
216         /* get object name from offset in volume */
217         buf = sos_req->name;
218         suffix = (unsigned int) ((xseg_req->offset+xseg_req->serviced) / (uint64_t)objsize) ;
219 //      printf("suffix: %u\n", suffix);
220         if (xseg_req->namesize > MAX_VOL_NAME){
221                 printf("xseg_req namesize > MAX_VOL_NAME\n");
222                 return -1;
223         }
224         strncpy(buf, xseg_req->name, xseg_req->namesize);
225         buf[xseg_req->namesize] = '_';
226         r = snprintf(buf+xseg_req->namesize+1, 13, "%012u", suffix);
227         if (r >= 13)
228                 return -1;
229
230         //sos_req->name = buf;
231         sos_req->namesize = xseg_req->namesize+1+12;
232
233         /* offset should be set to offset in object */
234         sos_req->offset = (xseg_req->offset + xseg_req->serviced) % objsize;
235         /* sos_req offset + sos_req size  < objsize always
236          * request data up to the end of object.
237          */
238         sos_req->size = (xseg_req->datasize - xseg_req->serviced) ;  /* should this be xseg_req->size ? */
239         if (sos_req->size > objsize - sos_req->offset)
240                 sos_req->size = objsize - sos_req->offset;
241         /* this should have been checked before this call */
242         if (xseg_req->serviced < xseg_req->datasize)
243                 sos_req->data = xseg_req->data + xseg_req->serviced;
244         else
245                 return -1;
246 //      printf("name: %s, size: %lu, offset: %lu, data:%s\n", sos_req->name, 
247 //                      sos_req->size, sos_req->offset, sos_req->data);
248         return 0;
249 }
250
251 static void prepare_sosreq(struct store *store, struct io *io)
252 {
253         struct xseg_request *xseg_req = io->req;
254         struct sos_request *sos_req = &io->sos_req;
255         sos_req->flags = get_sos_flags(xseg_req->flags);
256         sos_req->state = S_PENDING;
257         sos_req->retval = 0;
258         sos_req->op = get_sos_op(xseg_req->op);
259         sos_req->priv = store;
260         sos_req->name = io->objname;
261 }
262
263 static inline void prepare_io(struct store *store, struct io *io)
264 {
265         prepare_sosreq(store, io);
266         /* Assign io id to sos_req id. This can be done as an initialization of
267          * ios, to avoid reseting it every time */
268         io->sos_req.id = io - store->ios;
269 }
270
271
272 static void handle_resubmit(struct store *store, struct io *io);
273
274 static void complete_rw(struct store *store, struct io *io)
275 {
276         int r;
277         struct xseg_request *req = io->req;
278         struct sos_request *sos_req = &io->sos_req;
279         if (req->state == XS_ACCEPTED) {
280                 /* should not happen */
281                 fail(store, io);
282                 return;
283         }
284         if (io->retval > 0)
285                 req->serviced += io->retval;
286         else if (io->retval == 0) {
287                 /* reached end of object. zero out rest of data
288                  * requested from this object
289                  */ 
290                 memset(sos_req->data, 0, sos_req->size);
291                 req->serviced += sos_req->size;
292         }
293         else if (io->retval == -2) {
294                 /* object not found. return zeros instead */
295                 memset(sos_req->data, 0, sos_req->size);
296                 req->serviced += sos_req->size;
297         }
298         else {
299                 /* io->retval < 0 */
300                 fail(store, io);
301                 return;
302         }
303         /* request completed ? */
304         if (req->serviced >= req->datasize) {
305                 complete(store, io);
306                 return;
307         }
308
309         if (req != io->req)
310                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
311         if (!req->size) {
312                 /* should not happen */
313                 fail(store, io);
314                 return;
315         }
316
317         switch (req->op) {
318         case X_READ:
319         case X_WRITE:
320                 log_io("resubmitting", io);
321                 resubmit_io(store, io);
322                 signal_self(store);
323                 break;
324         default:
325                 snprintf(req->data, req->datasize,
326                          "wtf, corrupt op %u?\n", req->op);
327                 fail(store, io);
328                 return;
329         }
330 }
331
332 static void handle_read_write(struct store *store, struct io *io)
333 {
334         int r;
335         struct xseg_request *req = io->req;
336         struct sos_request *sos_req = &io->sos_req;
337         struct io *resubmit_io;
338
339         if (req != io->req)
340                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
341
342         prepare_io(store, io);
343         if (!req->size) {
344                 if (req->flags & XF_FLUSH) {
345                         /* note that with FLUSH/size == 0 
346                          * there will probably be a (uint64_t)-1 offset */
347
348                         /* size must be zero */
349                         sos_req->size = 0;
350                         /* all these should be irrelevant on a flush request */
351                         sos_req->name = 0;
352                         sos_req->namesize = 0;
353                         sos_req->data = 0;
354                         sos_req->offset = 0;
355                         /* philipgian:
356                          * make sure all pending requests are completed and then
357                          * perform flush request to flush them to disk.
358                          */
359                         while (xq_size(&store->free_ops) != store->nr_ops){
360                                 wait_signal(store);
361                                 /* handle any possible resubmissions */
362                                 resubmit_io = get_resubmitted_io(store);
363                                 while (resubmit_io){
364                                         handle_resubmit(store, resubmit_io);
365                                         resubmit_io = get_resubmitted_io(store);
366                                 }
367                         }
368                         r = sos_submit(store->sos, sos_req);
369                         if (r < 0) 
370                                 fail(store,io);
371                         else {
372                                 complete(store, io);
373                         }
374                         return;
375                 } else {
376                         complete(store, io);
377                         return;
378                 }
379         }
380         r = calculate_sosreq(req, sos_req);     
381         if (r < 0 ) {
382                 fail(store, io);
383                 return;
384         }
385
386         switch (req->op) {
387         case X_READ:
388         case X_WRITE:
389                 //log_io("submit", io);
390                 pending(store, io);
391                 r = sos_submit(store->sos, sos_req);
392                 break;
393         default:
394                 snprintf(req->data, req->datasize,
395                          "wtf, corrupt op %u?\n", req->op);
396                 fail(store, io);
397                 return;
398         }
399
400         if (r) {
401                 strerror_r(errno, req->data, req->datasize);
402                 fail(store, io);
403                 return;
404         }
405 }
406
407 static void handle_returned(struct store *store, struct io *io)
408 {
409         io->retval = io->sos_req.retval;
410         switch (io->req->op){
411                 case X_READ:
412                 case X_WRITE:
413                         complete_rw(store, io);
414                         break;
415                 default:
416                         if (io->sos_req.state & S_FAILED)
417                                 fail(store, io);
418                         else
419                                 complete(store, io);
420         }       
421 }
422
423 /* this is safe for now, as long as callback is only called once.
424  * if callback gets called, then sos_request has been completed and no race
425  * conditions occur.
426  */
427 static int sos_cb(struct sos_request *sos_req, unsigned long event)
428 {
429         struct store *store = (struct store *) sos_req->priv;
430         struct io *io = (struct io*) store->ios + sos_req->id;
431
432         if (event == S_NOTIFY_FAIL){
433                 sos_req->state = S_FAILED;
434         }
435         else if (event == S_NOTIFY_ACK) {
436                 sos_req->state = S_ACKED;
437         }
438         else if (event == S_NOTIFY_COMMIT){
439                 sos_req->state = S_COMMITED;
440         }
441         handle_returned(store, io);
442         return 1;
443 }
444
445
446 static void dispatch(struct store *store, struct io *io)
447 {
448         switch (io->req->op) {
449         case X_READ:
450         case X_WRITE:
451                 handle_read_write(store, io); break;
452         case X_SYNC:
453         default:
454                 handle_unknown(store, io);
455         }
456 }
457
458 static void handle_resubmit(struct store *store, struct io *io)
459 {
460         dispatch(store, io);
461 }
462
463 static void handle_accepted(struct store *store, struct io *io)
464 {
465         struct xseg_request *req = io->req;
466         req->serviced = 0;
467         req->state = XS_ACCEPTED;
468         io->retval = 0;
469         //log_io("accepted", io);
470         gettimeofday(&io->start, NULL);
471         dispatch(store, io);
472 }
473
474 static int sosd_loop(struct store *store)
475 {
476         struct xseg *xseg = store->xseg;
477         uint32_t portno = store->portno;
478         struct io *io, *resubmit_io;
479         struct xseg_request *accepted;
480
481         for (;;) {
482                 accepted = NULL;
483                 xseg_prepare_wait(xseg, portno);
484                 io = alloc_io(store);
485                 if (io) {
486                         accepted = xseg_accept(xseg, portno);
487                         if (accepted) {
488                                 xseg_cancel_wait(xseg, portno);
489                                 io->req = accepted;
490                                 handle_accepted(store, io);
491                         } else
492                                 free_io(store, io);
493                 }
494                 resubmit_io = get_resubmitted_io(store);
495                 if (resubmit_io){
496                         xseg_cancel_wait(xseg, portno);
497                         handle_resubmit(store, resubmit_io);
498                 }
499                 if (!accepted && !resubmit_io) 
500                         xseg_wait_signal(xseg, portno, 10000);
501         }
502
503         return 0;
504 }
505
506 static struct xseg *join(char *spec)
507 {
508         struct xseg_config config;
509         struct xseg *xseg;
510
511         (void)xseg_parse_spec(spec, &config);
512         xseg = xseg_join(config.type, config.name);
513         if (xseg)
514                 return xseg;
515
516         (void)xseg_create(&config);
517         return xseg_join(config.type, config.name);
518 }
519
520 static int sosd(char *path, unsigned long size, uint32_t nr_ops,
521                   char *spec, long portno)
522 {
523         struct store *store;
524
525         store = malloc(sizeof(struct store));
526         if (!store) {
527                 perror("malloc");
528                 return -1;
529         }
530
531         store->sos = sos_init(sos_cb);
532         if (!store->sos) {
533                 fprintf(stderr, "SOS init failed\n");
534                 return -1;
535         }
536
537         /*
538         r = daemon(1, 1);
539         if (r < 0)
540                 return r;
541                 */
542
543         store->pid = syscall(SYS_gettid);
544
545         if (sigemptyset(&store->signal_set))
546                 perror("sigemptyset");
547
548         if (sigaddset(&store->signal_set, SIGIO))
549                 perror("sigaddset");
550
551
552         store->nr_ops = nr_ops;
553         store->free_bufs = calloc(nr_ops, sizeof(xqindex));
554         if (!store->free_bufs)
555                 goto malloc_fail;
556
557         store->resubmit_bufs = calloc(nr_ops, sizeof(xqindex));
558         if (!store->resubmit_bufs)
559                 goto malloc_fail;
560
561         store->ios = calloc(nr_ops, sizeof(struct io));
562         if (!store->ios) {
563 malloc_fail:
564                 perror("malloc");
565                 return -1;
566         }
567
568         xq_init_seq(&store->free_ops, nr_ops, nr_ops, store->free_bufs);
569         xq_init_empty(&store->resubmit_ops, nr_ops, store->resubmit_bufs);
570
571
572         if (xseg_initialize("posix")) {
573                 printf("cannot initialize library\n");
574                 return -1;
575         }
576         store->xseg = join(spec);
577         if (!store->xseg)
578                 return -1;
579
580         store->xport = xseg_bind_port(store->xseg, portno);
581         if (!store->xport) {
582                 printf("cannot bind to port %ld\n", portno);
583                 return -1;
584         }
585
586         store->portno = xseg_portno(store->xseg, store->xport);
587         printf("sosd on port %u/%u\n",
588                 store->portno, store->xseg->config.nr_ports);
589         
590         return sosd_loop(store);
591 }
592
593 int main(int argc, char **argv)
594 {
595         char *path, *spec = "";
596         unsigned long size;
597         int i;
598         long portno;
599         uint32_t nr_ops;
600         unsigned int debug_level = 0;
601
602         if (argc < 2)
603                 return usage();
604
605         path = argv[1];
606         size = 0;
607         portno = -1;
608         nr_ops = 0;
609
610         for (i = 2; i < argc; i++) {
611                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
612                         spec = argv[i+1];
613                         i += 1;
614                         continue;
615                 }
616
617                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
618                         portno = strtoul(argv[i+1], NULL, 10);
619                         i += 1;
620                         continue;
621                 }
622
623                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
624                         nr_ops = strtoul(argv[i+1], NULL, 10);
625                         i += 1;
626                         continue;
627                 }
628                 if (!strcmp(argv[i], "-v") ) {
629                         debug_level++;
630                         continue;
631                 }
632         }
633
634         sos_set_debug_level(debug_level);
635
636         if (nr_ops <= 0)
637                 nr_ops = 16;
638
639         return sosd(path, size, nr_ops, spec, portno);
640 }
641