fix filed
[archipelago] / xseg / peers / user / filed.c
#define _GNU_SOURCE
#include <aio.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <pthread.h>
#include <sched.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <xseg/xseg.h>
15
/* Capacity of store->path, not counting the terminating NUL. */
#define MAX_PATH_SIZE 255
/* Longest target name dir_open() accepts, not counting the terminating NUL. */
#define MAX_FILENAME_SIZE 255
18
/*
 * Print the command-line synopsis on stdout.
 *
 * Returns 1, so that main() can hand the result back as its exit status.
 */
static int usage(void)
{
	static const char synopsis[] =
		"Usage: ./filed <path_to_directory> [options]\n"
		"Options: [-p portno]\n"
		"         [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
		"         [-n nr_parallel_ops]\n"
		"         [-v]\n";

	fputs(synopsis, stdout);
	return 1;
}
28
/*
 * Descriptor/time record for an fsync operation.
 * NOTE(review): nothing in the visible part of this file uses this type;
 * the field meanings below are inferred from their names only — confirm.
 */
struct fsync_io {
        /* presumably an index into store->fdcache — TODO confirm */
        unsigned long cacheidx;
        /* file descriptor the record refers to */
        int fd;
        /* presumably a timestamp/sequence number — TODO confirm units */
        uint64_t time;
};
34
/*
 * Per-io-thread state. filed() allocates store->nr_ops of these and hands
 * one to each io_loop() thread.
 */
struct io {
        /* owning store; set once in filed() */
        struct store *store;
        /* request being served; set in io_loop(), cleared by free_io() */
        struct xseg_request *req;
        /* reset to 0 in handle_accepted() and printed by log_io() */
        ssize_t retval;
        /* index of the fdcache entry this io holds a reference on,
         * or -1 when it holds none (maintained by dir_open())
         */
        long fdcacheidx;
        /* io_loop() sleeps on cond (under lock) while the io is idle;
         * wake_up_next_iothread() signals it
         */
        pthread_cond_t cond;
        pthread_mutex_t lock;
};
43
/* fdcache_node.flags bit: cleared by dir_open() while the entry is being
 * (re)opened, set again once the open attempt has finished.
 */
#define READY (1 << 1)

/*
 * One slot of the open-file cache kept in store->fdcache.
 */
struct fdcache_node {
        /* cached descriptor; dir_open() only treats values > 0 as usable */
        volatile int fd;
        /* number of ios currently using the entry: incremented in
         * dir_open(), decremented in complete()/fail()
         */
        volatile unsigned int ref;
        /* value of store->handled_reqs at the last use; the smallest value
         * among unreferenced READY entries is picked for replacement
         */
        volatile unsigned long time;
        /* READY or 0 */
        volatile unsigned int flags;
        /* dir_open() waits here for an entry that is not READY */
        pthread_cond_t cond;
        /* NUL-terminated name of the cached target */
        char target[MAX_FILENAME_SIZE + 1];
};
54
/*
 * Global state of one filed instance; allocated and filled in by filed().
 */
struct store {
        /* segment returned by join() */
        struct xseg *xseg;
        /* port returned by xseg_bind_port() */
        struct xseg_port *xport;
        /* number of xport as reported by xseg_portno(); io threads accept on it */
        uint32_t portno;
        /* NOTE(review): never written or read in the visible code */
        uint64_t size;
        /* array of nr_ops io descriptors, one per io thread */
        struct io *ios;
        /* queue of indices into ios[], see alloc_io()/free_io() */
        struct xq free_ops;
        /* backing storage handed to xq_init_seq() for free_ops */
        char *free_bufs;
        /* number of io threads / io descriptors */
        long nr_ops;
        /* filled with SIGEV_SIGNAL/SIGIO in filed() */
        struct sigevent sigevent;
        /* descriptor of the served directory, used by openat() in dir_open() */
        int dirfd;
        /* strlen(path) */
        uint32_t path_len;
        /* counter bumped by dir_open(); doubles as the cache's LRU clock */
        uint64_t handled_reqs;
        /* number of entries in fdcache (2 * nr_ops) */
        unsigned long maxfds;
        /* open-file cache */
        struct fdcache_node *fdcache;
        /* thread ids of the io threads */
        pthread_t *iothread;
        /* taken by dir_open() around fdcache lookups and updates */
        pthread_mutex_t cache_lock;
        /* served directory, with a trailing '/' */
        char path[MAX_PATH_SIZE + 1];
};
74
/* Non-zero turns on the diagnostics in complete(), fail() and dispatch(). */
static unsigned verbose;

/* Number of times sigaction_handler() has run. */
static unsigned long sigaction_count;

/*
 * Signal handler that filed() installs for SIGIO.
 *
 * It ignores all of its arguments and only counts invocations.
 */
static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
{
	(void)sig;
	(void)siginfo;
	(void)arg;

	sigaction_count += 1;
}
83
84 static void log_io(char *msg, struct io *io)
85 {
86         char target[64], data[64];
87         /* null terminate name in case of req->target is less than 63 characters,
88          * and next character after name (aka first byte of next buffer) is not
89          * null
90          */
91         struct store* store = io->store;
92         struct xseg *xseg = store->xseg;
93         struct xseg_request *req = io->req;
94         char *req_target = xseg_get_target(xseg, req);
95         char *req_data = xseg_get_data(xseg, req);
96
97         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
98
99         strncpy(target, req_target, end);
100         target[end] = 0;
101         strncpy(data, req_data, 63);
102         data[63] = 0;
103
104         fprintf(stderr,
105                 "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u\n"
106                 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
107                 msg,
108                 (unsigned int)io->fdcacheidx, //this is cacheidx not fd
109                 (unsigned int)req->op,
110                 (unsigned long long)req->offset,
111                 (unsigned long)req->size,
112                 (unsigned long)io->retval,
113                 (unsigned int)req->state,
114                 (unsigned int)req->targetlen, target,
115                 (unsigned long long)req->datalen, data);
116 }
117
118 static struct io *alloc_io(struct store *store)
119 {
120         xqindex idx = xq_pop_head(&store->free_ops, 1);
121         if (idx == Noneidx)
122                 return NULL;
123         return store->ios + idx;
124 }
125
126 static inline void free_io(struct store *store, struct io *io)
127 {
128         xqindex idx = io - store->ios;
129         io->req = NULL;
130         xq_append_head(&store->free_ops, idx, 1);
131 }
132
133
134 static void complete(struct store *store, struct io *io)
135 {
136         struct xseg_request *req = io->req;
137         req->state |= XS_SERVED;
138         if (verbose)
139                 log_io("complete", io);
140         xseg_respond(store->xseg, req->portno, req);
141         xseg_signal(store->xseg, req->portno);
142         __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
143 }
144
145 static void fail(struct store *store, struct io *io)
146 {
147         struct xseg_request *req = io->req;
148         req->state |= XS_FAILED;
149         if (verbose)
150                 log_io("fail", io);
151         xseg_respond(store->xseg, req->portno, req);
152         xseg_signal(store->xseg, req->portno);
153         if (io->fdcacheidx >= 0) {
154                 __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
155         }
156 }
157
158 static void pending(struct store *store, struct io *io)
159 {
160         io->req->state = XS_PENDING;
161 }
162
163 static void handle_unknown(struct store *store, struct io *io)
164 {
165         struct xseg *xseg = store->xseg;
166         struct xseg_request *req = io->req;
167         char *data = xseg_get_data(xseg, req);
168         snprintf(data, req->datalen, "unknown request op");
169         fail(store, io);
170 }
171
/*
 * Hook called by handle_read_write() right before the actual I/O.
 * Currently does nothing.
 */
static inline void prepare_io(struct store *store, struct io *io)
{
	(void)store;
	(void)io;
}
175
176
177 static int dir_open(    struct store *store, struct io *io,
178                         char *target, uint32_t targetlen, int mode      )
179 {
180         int fd = -1;
181         struct fdcache_node *ce = NULL;
182         long i, lru;
183         uint64_t min;
184         io->fdcacheidx = -1;
185         if (targetlen> MAX_FILENAME_SIZE)
186                 goto out_err;
187
188 start:
189         /* check cache */
190         pthread_mutex_lock(&store->cache_lock);
191 start_locked:
192         lru = -1;
193         min = UINT64_MAX;
194         for (i = 0; i < store->maxfds; i++) {
195                 if (store->fdcache[i].ref == 0 && min > store->fdcache[i].time 
196                                 && (store->fdcache[i].flags & READY)) {
197                         min = store->fdcache[i].time;
198                         lru = i;
199
200                 }
201                 if (!strncmp(store->fdcache[i].target, target, targetlen)) {
202                         if (store->fdcache[i].target[targetlen] == 0) {
203                                 ce = &store->fdcache[i];
204                                 /* if any other io thread is currently opening
205                                  * the file, block until it succeeds or fails
206                                  */
207                                 if (!(ce->flags & READY)) {
208                                         pthread_cond_wait(&ce->cond, &store->cache_lock);
209                                         /* when ready, restart lookup */
210                                         goto start_locked;
211                                 }
212                                 /* if successfully opened */
213                                 if (ce->fd > 0) {
214                                         fd = store->fdcache[i].fd;
215                                         io->fdcacheidx = i;
216                                         goto out;
217                                 }
218                                 /* else open failed for the other io thread, so
219                                  * it should fail for everyone waiting on this
220                                  * file.
221                                  */
222                                 else {
223                                         fd = -1;
224                                         io->fdcacheidx = -1;
225                                         goto out_err_unlock;
226                                 }
227                         }
228                 }
229         }
230         if (lru < 0){
231                 /* all cache entries are currently being used */
232                 pthread_mutex_unlock(&store->cache_lock);
233                 goto start;
234         }
235         if (store->fdcache[lru].ref){
236                 fd = -1;
237                 printf("lru(%ld) ref not 0 (%u)\n", lru, store->fdcache[lru].ref);
238                 goto out_err_unlock;
239         }
240         /* make room for new file */
241         ce = &store->fdcache[lru];
242         /* set name here and state to not ready, for any other requests on the
243          * same target that may follow
244          */
245         strncpy(ce->target, target, targetlen);
246         ce->target[targetlen] = 0;
247         ce->flags &= ~READY;
248         pthread_mutex_unlock(&store->cache_lock);
249
250         if (ce->fd >0){
251                 if (close(ce->fd) < 0){
252                         perror("close");
253                 }
254         }
255         fd = openat(store->dirfd, ce->target, O_RDWR);  
256         if (fd < 0) {
257                 if (errno == ENOENT){
258                         fd = openat(store->dirfd, ce->target, 
259                                         O_RDWR | O_CREAT, 0600);
260                         if (fd >= 0)
261                                 goto new_entry;
262                 }
263                 perror(store->path);
264                 /* insert in cache a negative fd to indicate opening error to
265                  * any other ios waiting for the file to open
266                  */
267         }       
268         /* insert in cache */
269 new_entry:
270         pthread_mutex_lock(&store->cache_lock);
271         ce->fd = fd;
272         ce->ref = 0;
273         ce->flags = READY;
274         pthread_cond_broadcast(&ce->cond);
275         if (fd > 0) {
276                 io->fdcacheidx = lru;
277         }
278         else {
279                 io->fdcacheidx = -1;
280                 goto out_err_unlock;
281         }
282
283 out:
284         store->handled_reqs++;
285         ce->time = store->handled_reqs;
286         __sync_fetch_and_add(&ce->ref, 1);
287         pthread_mutex_unlock(&store->cache_lock);
288 out_err:
289         return fd;
290
291 out_err_unlock:
292         pthread_mutex_unlock(&store->cache_lock);
293         goto out_err;
294 }
295
296 static void handle_read_write(struct store *store, struct io *io)
297 {
298         int r, fd, mode;
299         struct xseg *xseg = store->xseg;
300         struct xseg_request *req = io->req;
301         char *target = xseg_get_target(xseg, req);
302         char *data = xseg_get_data(xseg, req);
303
304         if (req->op == X_WRITE)
305                 mode = 1;
306         else
307                 mode = 0;
308         fd = dir_open(store, io, target, req->targetlen, mode);
309         if (fd < 0){
310                 perror("dir_open");
311                 fail(store, io);
312                 return;
313         }
314
315         if (req != io->req)
316                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
317         if (!req->size) {
318                 if (req->flags & (XF_FLUSH | XF_FUA)) {
319                         /* No FLUSH/FUA support yet (O_SYNC ?).
320                          * note that with FLUSH/size == 0 
321                          * there will probably be a (uint64_t)-1 offset */
322                         complete(store, io);
323                         return;
324                 } else {
325                         complete(store, io);
326                         return;
327                 }
328         }
329
330
331         prepare_io(store, io);
332
333         switch (req->op) {
334         case X_READ:
335                 while (req->serviced < req->datalen) {
336                         r = pread(fd, data + req->serviced, 
337                                         req->datalen - req->serviced,
338                                         req->offset + req->serviced);
339                         if (r < 0) {
340                                 req->datalen = req->serviced;
341                                 perror("pread");
342                         }
343                         else if (r == 0) {
344                                 /* reached end of file. zero out the rest data buffer */
345                                 memset(data + req->serviced, 0, req->datalen - req->serviced);
346                                 req->serviced = req->datalen;
347                         }
348                         else {
349                                 req->serviced += r;
350                         }
351                 }
352                 break;
353         case X_WRITE:
354                 while (req->serviced < req->datalen) {
355                         r = pwrite(fd, data + req->serviced, 
356                                         req->datalen - req->serviced,
357                                         req->offset + req->serviced);
358                         if (r < 0) {
359                                 req->datalen = req->serviced;
360                         }
361                         else if (r == 0) {
362                                 /* reached end of file. zero out the rest data buffer */
363                                 memset(data + req->serviced, 0, req->datalen - req->serviced);
364                                 req->serviced = req->datalen;
365                         }
366                         else {
367                                 req->serviced += r;
368                         }
369                 }
370                 r = fsync(fd);
371                 if (r< 0) {
372                         perror("fsync");
373                         /* if fsync fails, then no bytes serviced correctly */
374                         req->serviced = 0;
375                 }
376                 break;
377         default:
378                 snprintf(data, req->datalen,
379                          "wtf, corrupt op %u?\n", req->op);
380                 fail(store, io);
381                 return;
382         }
383
384         if (req->serviced > 0 ) {
385                 complete(store, io);
386         }
387         else {
388                 strerror_r(errno, data, req->datalen);
389                 fail(store, io);
390         }
391         return;
392 }
393
394 static void handle_info(struct store *store, struct io *io)
395 {
396         struct xseg *xseg = store->xseg;
397         struct xseg_request *req = io->req;
398         char *target = xseg_get_target(xseg, req);
399         char *data = xseg_get_data(xseg, req);
400         struct stat stat;
401         int fd, r;
402         off_t size;
403
404         fd = dir_open(store, io, target, req->targetlen, 0);
405         if (fd < 0) {
406                 fail(store, io);
407                 return;
408         }
409         r = fstat(fd, &stat);
410         if (r < 0) {
411                 perror("fstat");
412                 fail(store, io);
413                 return;
414         }
415         size = stat.st_size;
416         *((off_t *) data) = size;
417         req->datalen = sizeof(size);
418
419         complete(store, io);
420 }
421
422 static void dispatch(struct store *store, struct io *io)
423 {
424         if (verbose)
425                 printf("io: 0x%p, req: 0x%p, op %u\n",
426                         (void *)io, (void *)io->req, io->req->op);
427         switch (io->req->op) {
428         case X_READ:
429         case X_WRITE:
430                 handle_read_write(store, io); break;
431         case X_INFO:
432                 handle_info(store, io); break;
433         case X_SYNC:
434         default:
435                 handle_unknown(store, io);
436         }
437 }
438
439 static void handle_accepted(struct store *store, struct io *io)
440 {
441         struct xseg_request *req = io->req;
442         req->serviced = 0;
443         req->state = XS_ACCEPTED;
444         io->retval = 0;
445         dispatch(store, io);
446 }
447
448 static struct io* wake_up_next_iothread(struct store *store)
449 {
450         struct io *io = alloc_io(store);
451
452         if (io){        
453                 pthread_mutex_lock(&io->lock);
454                 pthread_cond_signal(&io->cond);
455                 pthread_mutex_unlock(&io->lock);
456         }
457         return io;
458 }
459
460 void *io_loop(void *arg)
461 {
462         struct io *io = (struct io *) arg;
463         struct store *store = io->store;
464         struct xseg *xseg = store->xseg;
465         uint32_t portno = store->portno;
466         struct xseg_request *accepted;
467
468         for (;;) {
469                 accepted = NULL;
470                 accepted = xseg_accept(xseg, portno);
471                 if (accepted) {
472                         io->req = accepted;
473                         wake_up_next_iothread(store);
474                         handle_accepted(store, io);
475                 }
476                 else {
477                         pthread_mutex_lock(&io->lock);
478                         free_io(store, io);
479                         pthread_cond_wait(&io->cond, &io->lock);
480                         pthread_mutex_unlock(&io->lock);
481                 }
482         }
483
484         return NULL;
485 }
486
487 static struct xseg *join(char *spec)
488 {
489         struct xseg_config config;
490         struct xseg *xseg;
491
492         (void)xseg_parse_spec(spec, &config);
493         xseg = xseg_join(config.type, config.name, "posix", NULL);
494         if (xseg)
495                 return xseg;
496
497         fprintf(stderr, "Failed to join xseg, creating it...\n");
498         (void)xseg_create(&config);
499         return xseg_join(config.type, config.name, "posix", NULL);
500 }
501
502 static int filed_loop(struct store *store)
503 {
504         struct xseg *xseg = store->xseg;
505         uint32_t portno = store->portno;
506         struct io *io;
507
508         for (;;) {
509                 io = wake_up_next_iothread(store);
510                 xseg_prepare_wait(xseg, portno);
511                 xseg_wait_signal(xseg, 1000000UL);
512         }
513         return 0;
514 }
515
516 static int filed(       char *path, unsigned long size, uint32_t nr_ops,
517                         char *spec, long portno )
518 {
519         struct sigaction sa;
520         struct store *store;
521         int i;
522
523         store = malloc(sizeof(struct store));
524         if (!store) {
525                 perror("malloc");
526                 return -1;
527         }
528
529
530         /*
531         r = daemon(1, 1);
532         if (r < 0)
533                 return r;
534                 */
535
536         store->sigevent.sigev_notify = SIGEV_SIGNAL;
537         store->sigevent.sigev_signo = SIGIO;
538         sa.sa_sigaction = sigaction_handler;
539         sa.sa_flags = SA_SIGINFO;
540         if (sigemptyset(&sa.sa_mask))
541                 perror("sigemptyset");
542
543         if (sigaction(SIGIO, &sa, NULL)) {
544                 perror("sigaction");
545                 return -1;
546         }
547
548         store->nr_ops = nr_ops;
549         store->maxfds = 2 * nr_ops;
550
551         store->fdcache = calloc(store->maxfds, sizeof(struct fdcache_node));
552         if(!store->fdcache)
553                 goto malloc_fail;
554
555         store->free_bufs = calloc(store->nr_ops, sizeof(xqindex));
556         if(!store->free_bufs)
557                 goto malloc_fail;
558
559         store->iothread = calloc(store->nr_ops, sizeof(pthread_t));
560         if(!store->iothread)
561                 goto malloc_fail;
562
563         store->ios = calloc(nr_ops, sizeof(struct io));
564         if (!store->ios) {
565 malloc_fail:
566                 perror("malloc");
567                 return -1;
568         }
569
570         for (i = 0; i < nr_ops; i++) {
571                 store->ios[i].store = store;
572                 pthread_cond_init(&store->ios[i].cond, NULL);
573                 pthread_mutex_init(&store->ios[i].lock, NULL);
574         }
575
576         xq_init_seq(&store->free_ops, store->nr_ops, store->nr_ops, store->free_bufs);
577
578         store->handled_reqs = 0;
579         strncpy(store->path, path, MAX_PATH_SIZE);
580         store->path[MAX_PATH_SIZE] = 0;
581
582         store->path_len = strlen(store->path);
583         if (store->path[store->path_len -1] != '/'){
584                 store->path[store->path_len] = '/';
585                 store->path[++store->path_len]= 0;
586         }
587         store->dirfd = open(store->path, O_RDWR);
588         if (!(store->dirfd < 0 && errno == EISDIR)){
589                 fprintf(stderr, "%s is not a directory\n", store->path);
590                 return -1;
591         }
592
593         store->dirfd = open(store->path, O_RDONLY);
594         if (store->dirfd < 0){
595                 perror("Directory open");
596                 return -1;
597         }
598 /*
599         mode = 1;
600         int fd = dir_open(store, ".__tmp", 6, 1);
601         if (fd < 0){
602                 perror("Directory check");
603                 return -1;
604         }
605 */
606         if (xseg_initialize()) {
607                 printf("cannot initialize library\n");
608                 return -1;
609         }
610         store->xseg = join(spec);
611         if (!store->xseg)
612                 return -1;
613
614         store->xport = xseg_bind_port(store->xseg, portno);
615         if (!store->xport) {
616                 printf("cannot bind to port %ld\n", portno);
617                 return -1;
618         }
619
620         store->portno = xseg_portno(store->xseg, store->xport);
621         printf("filed on port %u/%u\n",
622                 store->portno, store->xseg->config.nr_ports);
623
624         for (i = 0; i < nr_ops; i++) {
625                 pthread_cond_init(&store->fdcache[i].cond, NULL);
626                 store->fdcache[i].flags = READY;
627         }
628         for (i = 0; i < nr_ops; i++) {
629                 //TODO error check + cond variable to stop io from starting
630                 //unless all threads are created successfully
631                 pthread_create(store->iothread + i, NULL, io_loop, (void *) (store->ios + i));
632         }
633         pthread_mutex_init(&store->cache_lock, NULL);
634         return filed_loop(store);
635 }
636
637 int main(int argc, char **argv)
638 {
639         char *path, *spec = "";
640         unsigned long size;
641         int i;
642         long portno;
643         uint32_t nr_ops;
644
645         if (argc < 2)
646                 return usage();
647
648         path = argv[1];
649         size = 0;
650         portno = -1;
651         nr_ops = 0;
652
653         for (i = 2; i < argc; i++) {
654                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
655                         spec = argv[i+1];
656                         i += 1;
657                         continue;
658                 }
659
660                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
661                         portno = strtoul(argv[i+1], NULL, 10);
662                         i += 1;
663                         continue;
664                 }
665
666                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
667                         nr_ops = strtoul(argv[i+1], NULL, 10);
668                         i += 1;
669                         continue;
670                 }
671                 if (!strcmp(argv[i], "-v")) {
672                         verbose = 1;
673                         continue;
674                 }
675         }
676
677         if (nr_ops <= 0)
678                 nr_ops = 16;
679
680         return filed(path, size, nr_ops, spec, portno);
681 }
682