add support for delete in filed
[archipelago] / xseg / peers / user / filed.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <sys/stat.h>
6 #include <unistd.h>
7 #include <string.h>
8 #include <fcntl.h>
9 #include <errno.h>
10 #include <aio.h>
11 #include <signal.h>
12 #include <limits.h>
13 #include <xseg/xseg.h>
14 #include <pthread.h>
15 #include <xseg/protocol.h>
16 #include <sys/sendfile.h>
17
18
19 #define MAX_PATH_SIZE 255
20 #define MAX_FILENAME_SIZE 255
21
/*
 * Print the command-line synopsis on stdout.
 *
 * Returns 1, so the result can be used directly as the process exit status.
 */
static int usage(void)
{
        static const char help[] =
                "Usage: ./filed <path_to_directory> [options]\n"
                "Options: [-p portno]\n"
                "         [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
                "         [-n nr_parallel_ops]\n"
                "         [-v]\n";

        fputs(help, stdout);
        return 1;
}
31
/*
 * Bookkeeping record for an fsync operation.
 *
 * NOTE(review): nothing in the visible part of this file references this
 * struct; the field notes below are inferred from the names only -- confirm
 * before relying on them.
 */
struct fsync_io {
        unsigned long cacheidx; /* presumably an index into the fd cache */
        int fd;                 /* presumably the descriptor to be synced */
        uint64_t time;          /* presumably a timestamp/request counter */
};
37
/*
 * Per-thread I/O context. filed() allocates one per io thread and passes it
 * to io_loop() as the thread argument.
 */
struct io {
        struct store *store;            /* owning store, set once in filed() */
        struct xseg_request *req;       /* request being served; NULL after free_io() */
        ssize_t retval;                 /* reset to 0 in handle_accepted(); printed by log_io() */
        long fdcacheidx;                /* index into store->fdcache set by dir_open(); -1 if none held */
        pthread_cond_t cond;            /* io_loop() sleeps here; wake_up_next_iothread() signals it */
        pthread_mutex_t lock;           /* mutex paired with cond */
};
46
47 #define READY (1 << 1)
48
/*
 * One slot of the open-file-descriptor cache managed by dir_open().
 */
struct fdcache_node {
        volatile int fd;                /* cached descriptor; dir_open() stores a negative value if the open failed */
        volatile unsigned int ref;      /* users of this slot: +1 in dir_open(), -1 in complete()/fail() */
        volatile unsigned long time;    /* store->handled_reqs at last use; smallest value is evicted first */
        volatile unsigned int flags;    /* READY is cleared while a thread is (re)opening the slot */
        pthread_cond_t cond;            /* broadcast by dir_open() when the open attempt has finished */
        char target[MAX_FILENAME_SIZE + 1];     /* NUL-terminated name the slot is bound to */
};
57
/*
 * Global state of one filed instance: the xseg segment/port it serves, the
 * directory backing the objects, the pool of io contexts and the fd cache.
 */
struct store {
        struct xseg *xseg;              /* segment returned by join() */
        struct xseg_port *xport;        /* port returned by xseg_bind_port() */
        uint32_t portno;                /* number of xport, from xseg_portno() */
        uint64_t size;                  /* not assigned anywhere in the visible code */
        struct io *ios;                 /* array of nr_ops io contexts */
        struct xq free_ops;             /* queue of idle io indices (alloc_io pops, free_io appends) */
        char *free_bufs;                /* backing storage handed to xq_init_seq() for free_ops */
        long nr_ops;                    /* number of io contexts / io threads */
        struct sigevent sigevent;       /* set up for SIGIO in filed(); not otherwise used in visible code */
        int dirfd;                      /* descriptor of the served directory; base for openat()/unlinkat() */
        uint32_t path_len;              /* strlen(path) */
        uint64_t handled_reqs;          /* bumped on every successful dir_open(); serves as the LRU clock */
        unsigned long maxfds;           /* number of entries in fdcache (2 * nr_ops) */
        struct fdcache_node *fdcache;   /* descriptor cache, see dir_open() */
        pthread_t *iothread;            /* handles of the io threads */
        pthread_mutex_t cache_lock;     /* held by dir_open() while it searches/updates fdcache */
        char path[MAX_PATH_SIZE + 1];   /* directory path, with a trailing '/' appended if missing */
};
77
78 static unsigned verbose;
79
80 static unsigned long sigaction_count;
81
82 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
83 {
84         sigaction_count++;
85 }
86
87 static void log_io(char *msg, struct io *io)
88 {
89         char target[64], data[64];
90         /* null terminate name in case of req->target is less than 63 characters,
91          * and next character after name (aka first byte of next buffer) is not
92          * null
93          */
94         struct store* store = io->store;
95         struct xseg *xseg = store->xseg;
96         struct xseg_request *req = io->req;
97         char *req_target = xseg_get_target(xseg, req);
98         char *req_data = xseg_get_data(xseg, req);
99
100         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
101
102         strncpy(target, req_target, end);
103         target[end] = 0;
104         strncpy(data, req_data, 63);
105         data[63] = 0;
106
107         fprintf(stderr,
108                 "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u\n"
109                 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
110                 msg,
111                 (unsigned int)io->fdcacheidx, //this is cacheidx not fd
112                 (unsigned int)req->op,
113                 (unsigned long long)req->offset,
114                 (unsigned long)req->size,
115                 (unsigned long)io->retval,
116                 (unsigned int)req->state,
117                 (unsigned int)req->targetlen, target,
118                 (unsigned long long)req->datalen, data);
119 }
120
121 static struct io *alloc_io(struct store *store)
122 {
123         xqindex idx = xq_pop_head(&store->free_ops, 1);
124         if (idx == Noneidx)
125                 return NULL;
126         return store->ios + idx;
127 }
128
129 static inline void free_io(struct store *store, struct io *io)
130 {
131         xqindex idx = io - store->ios;
132         io->req = NULL;
133         xq_append_head(&store->free_ops, idx, 1);
134 }
135
136
137 static void complete(struct store *store, struct io *io)
138 {
139         struct xseg_request *req = io->req;
140         xport p;
141         req->state |= XS_SERVED;
142         if (verbose)
143                 log_io("complete", io);
144         while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
145                 ;
146         xseg_signal(store->xseg, p);
147         __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
148 }
149
150 static void fail(struct store *store, struct io *io)
151 {
152         struct xseg_request *req = io->req;
153         xport p;
154         req->state |= XS_FAILED;
155         if (verbose)
156                 log_io("fail", io);
157         while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
158                 ;
159         xseg_signal(store->xseg, p);
160         if (io->fdcacheidx >= 0) {
161                 __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
162         }
163 }
164
165 static void pending(struct store *store, struct io *io)
166 {
167         io->req->state = XS_PENDING;
168 }
169
170 static void handle_unknown(struct store *store, struct io *io)
171 {
172         struct xseg *xseg = store->xseg;
173         struct xseg_request *req = io->req;
174         char *data = xseg_get_data(xseg, req);
175         snprintf(data, req->datalen, "unknown request op");
176         fail(store, io);
177 }
178
/*
 * Hook called by handle_read_write() right before the read/write loop.
 * Intentionally empty: no per-request preparation is performed.
 */
static inline void prepare_io(struct store *store, struct io *io)
{
}
182
183
184 static int dir_open(    struct store *store, struct io *io,
185                         char *target, uint32_t targetlen, int mode      )
186 {
187         int fd = -1;
188         struct fdcache_node *ce = NULL;
189         long i, lru;
190         uint64_t min;
191         io->fdcacheidx = -1;
192         if (targetlen> MAX_FILENAME_SIZE)
193                 goto out_err;
194
195 start:
196         /* check cache */
197         pthread_mutex_lock(&store->cache_lock);
198 start_locked:
199         lru = -1;
200         min = UINT64_MAX;
201         for (i = 0; i < store->maxfds; i++) {
202                 if (store->fdcache[i].ref == 0 && min > store->fdcache[i].time 
203                                 && (store->fdcache[i].flags & READY)) {
204                         min = store->fdcache[i].time;
205                         lru = i;
206
207                 }
208                 if (!strncmp(store->fdcache[i].target, target, targetlen)) {
209                         if (store->fdcache[i].target[targetlen] == 0) {
210                                 ce = &store->fdcache[i];
211                                 /* if any other io thread is currently opening
212                                  * the file, block until it succeeds or fails
213                                  */
214                                 if (!(ce->flags & READY)) {
215                                         pthread_cond_wait(&ce->cond, &store->cache_lock);
216                                         /* when ready, restart lookup */
217                                         goto start_locked;
218                                 }
219                                 /* if successfully opened */
220                                 if (ce->fd > 0) {
221                                         fd = store->fdcache[i].fd;
222                                         io->fdcacheidx = i;
223                                         goto out;
224                                 }
225                                 /* else open failed for the other io thread, so
226                                  * it should fail for everyone waiting on this
227                                  * file.
228                                  */
229                                 else {
230                                         fd = -1;
231                                         io->fdcacheidx = -1;
232                                         goto out_err_unlock;
233                                 }
234                         }
235                 }
236         }
237         if (lru < 0){
238                 /* all cache entries are currently being used */
239                 pthread_mutex_unlock(&store->cache_lock);
240                 goto start;
241         }
242         if (store->fdcache[lru].ref){
243                 fd = -1;
244                 printf("lru(%ld) ref not 0 (%u)\n", lru, store->fdcache[lru].ref);
245                 goto out_err_unlock;
246         }
247         /* make room for new file */
248         ce = &store->fdcache[lru];
249         /* set name here and state to not ready, for any other requests on the
250          * same target that may follow
251          */
252         strncpy(ce->target, target, targetlen);
253         ce->target[targetlen] = 0;
254         ce->flags &= ~READY;
255         pthread_mutex_unlock(&store->cache_lock);
256
257         if (ce->fd >0){
258                 if (close(ce->fd) < 0){
259                         perror("close");
260                 }
261         }
262         fd = openat(store->dirfd, ce->target, O_RDWR);  
263         if (fd < 0) {
264                 if (errno == ENOENT){
265                         fd = openat(store->dirfd, ce->target, 
266                                         O_RDWR | O_CREAT, 0600);
267                         if (fd >= 0)
268                                 goto new_entry;
269                 }
270                 perror(store->path);
271                 /* insert in cache a negative fd to indicate opening error to
272                  * any other ios waiting for the file to open
273                  */
274         }       
275         /* insert in cache */
276 new_entry:
277         pthread_mutex_lock(&store->cache_lock);
278         ce->fd = fd;
279         ce->ref = 0;
280         ce->flags = READY;
281         pthread_cond_broadcast(&ce->cond);
282         if (fd > 0) {
283                 io->fdcacheidx = lru;
284         }
285         else {
286                 io->fdcacheidx = -1;
287                 goto out_err_unlock;
288         }
289
290 out:
291         store->handled_reqs++;
292         ce->time = store->handled_reqs;
293         __sync_fetch_and_add(&ce->ref, 1);
294         pthread_mutex_unlock(&store->cache_lock);
295 out_err:
296         return fd;
297
298 out_err_unlock:
299         pthread_mutex_unlock(&store->cache_lock);
300         goto out_err;
301 }
302
303 static void handle_read_write(struct store *store, struct io *io)
304 {
305         int r, fd, mode;
306         struct xseg *xseg = store->xseg;
307         struct xseg_request *req = io->req;
308         char *target = xseg_get_target(xseg, req);
309         char *data = xseg_get_data(xseg, req);
310
311         if (req->op == X_WRITE)
312                 mode = 1;
313         else
314                 mode = 0;
315         fd = dir_open(store, io, target, req->targetlen, mode);
316         if (fd < 0){
317                 perror("dir_open");
318                 fail(store, io);
319                 return;
320         }
321
322         if (req != io->req)
323                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
324         if (!req->size) {
325                 if (req->flags & (XF_FLUSH | XF_FUA)) {
326                         /* No FLUSH/FUA support yet (O_SYNC ?).
327                          * note that with FLUSH/size == 0 
328                          * there will probably be a (uint64_t)-1 offset */
329                         complete(store, io);
330                         return;
331                 } else {
332                         complete(store, io);
333                         return;
334                 }
335         }
336
337
338         prepare_io(store, io);
339
340         switch (req->op) {
341         case X_READ:
342                 while (req->serviced < req->datalen) {
343                         r = pread(fd, data + req->serviced, 
344                                         req->datalen - req->serviced,
345                                         req->offset + req->serviced);
346                         if (r < 0) {
347                                 req->datalen = req->serviced;
348                                 perror("pread");
349                         }
350                         else if (r == 0) {
351                                 /* reached end of file. zero out the rest data buffer */
352                                 memset(data + req->serviced, 0, req->datalen - req->serviced);
353                                 req->serviced = req->datalen;
354                         }
355                         else {
356                                 req->serviced += r;
357                         }
358                 }
359                 break;
360         case X_WRITE:
361                 while (req->serviced < req->datalen) {
362                         r = pwrite(fd, data + req->serviced, 
363                                         req->datalen - req->serviced,
364                                         req->offset + req->serviced);
365                         if (r < 0) {
366                                 req->datalen = req->serviced;
367                         }
368                         else if (r == 0) {
369                                 /* reached end of file. zero out the rest data buffer */
370                                 memset(data + req->serviced, 0, req->datalen - req->serviced);
371                                 req->serviced = req->datalen;
372                         }
373                         else {
374                                 req->serviced += r;
375                         }
376                 }
377                 r = fsync(fd);
378                 if (r< 0) {
379                         perror("fsync");
380                         /* if fsync fails, then no bytes serviced correctly */
381                         req->serviced = 0;
382                 }
383                 break;
384         default:
385                 snprintf(data, req->datalen,
386                          "wtf, corrupt op %u?\n", req->op);
387                 fail(store, io);
388                 return;
389         }
390
391         if (req->serviced > 0 ) {
392                 complete(store, io);
393         }
394         else {
395                 strerror_r(errno, data, req->datalen);
396                 fail(store, io);
397         }
398         return;
399 }
400
401 static void handle_info(struct store *store, struct io *io)
402 {
403         struct xseg *xseg = store->xseg;
404         struct xseg_request *req = io->req;
405         char *target = xseg_get_target(xseg, req);
406         char *data = xseg_get_data(xseg, req);
407         struct stat stat;
408         int fd, r;
409         off_t size;
410
411         fd = dir_open(store, io, target, req->targetlen, 0);
412         if (fd < 0) {
413                 fail(store, io);
414                 return;
415         }
416         r = fstat(fd, &stat);
417         if (r < 0) {
418                 perror("fstat");
419                 fail(store, io);
420                 return;
421         }
422         size = stat.st_size;
423         *((off_t *) data) = size;
424         req->datalen = sizeof(size);
425
426         complete(store, io);
427 }
428
429 static void handle_copy(struct store *store, struct io *io)
430 {
431         struct xseg_request *req = io->req;
432         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(store->xseg, req);
433         struct stat st;
434         int n, src, dst;
435         char *target = xseg_get_target(store->xseg, req);
436
437         dst = dir_open(store, io, target, req->targetlen, 1);
438         if (dst < 0) {
439                 fprintf(stderr, "fail in dst\n");
440                 fail(store, io);
441                 free(buf);
442                 return;
443         }
444
445         src = openat(store->dirfd, xcopy->target, O_RDWR);      
446         if (src < 0) {
447                 fprintf(stderr, "fail in src\n");
448                 fail(store, io);
449                 free(buf);
450                 return;
451         }
452
453         fstat(src, &st);
454         n = sendfile(dst, src, 0, st.st_size);
455         if (n != st.st_size) {
456                 fprintf(stderr, "fail in copy\n");
457                 fail(store, io);
458                 goto out;
459         }
460
461         if (n < 0) {
462                 fprintf(stderr, "fail in cp\n");
463                 fail(store, io);
464                 goto out;
465         }
466
467         complete(store, io);
468
469 out:
470         free(buf);
471         close(src);
472 }
473
474 static void handle_delete(struct store *store, struct io *io)
475 {
476         struct xseg_request *req = io->req;
477         int fd;
478         char *target = xseg_get_target(store->xseg, req);
479         
480         fd = dir_open(store, io, target, req->targetlen, 0);
481         if (fd < 0) {
482                 fprintf(stderr, "fail in dir_open\n");
483                 fail(store, io);
484                 return;
485         }
486
487         /* 'invalidate' cache entry */
488         if (io->fdcacheidx >= 0) {
489                 store->fdcache[io->fdcacheidx].fd = -1;
490         }
491
492         close(fd);
493         char buf[MAX_FILENAME_SIZE + 1];
494         strncpy(buf, target, req->targetlen);
495         buf[req->targetlen] = 0;
496         unlinkat(store->dirfd, buf, 0);
497
498         complete(pfiled, io);
499
500         return;
501 }
502
503 static void dispatch(struct store *store, struct io *io)
504 {
505         if (verbose)
506                 printf("io: 0x%p, req: 0x%p, op %u\n",
507                         (void *)io, (void *)io->req, io->req->op);
508         switch (io->req->op) {
509         case X_READ:
510         case X_WRITE:
511                 handle_read_write(store, io); break;
512         case X_INFO:
513                 handle_info(store, io); break;
514         case X_DELETE:
515                 handle_delete(store, io); break;
516         case X_COPY:
517                 handle_copy(store, io); break;
518         case X_SYNC:
519         default:
520                 handle_unknown(store, io);
521         }
522 }
523
524 static void handle_accepted(struct store *store, struct io *io)
525 {
526         struct xseg_request *req = io->req;
527         req->serviced = 0;
528         req->state = XS_ACCEPTED;
529         io->retval = 0;
530         dispatch(store, io);
531 }
532
533 static struct io* wake_up_next_iothread(struct store *store)
534 {
535         struct io *io = alloc_io(store);
536
537         if (io){        
538                 pthread_mutex_lock(&io->lock);
539                 pthread_cond_signal(&io->cond);
540                 pthread_mutex_unlock(&io->lock);
541         }
542         return io;
543 }
544
545 void *io_loop(void *arg)
546 {
547         struct io *io = (struct io *) arg;
548         struct store *store = io->store;
549         struct xseg *xseg = store->xseg;
550         uint32_t portno = store->portno;
551         struct xseg_request *accepted;
552
553         for (;;) {
554                 accepted = NULL;
555                 accepted = xseg_accept(xseg, portno);
556                 if (accepted) {
557                         io->req = accepted;
558                         wake_up_next_iothread(store);
559                         handle_accepted(store, io);
560                 }
561                 else {
562                         pthread_mutex_lock(&io->lock);
563                         free_io(store, io);
564                         pthread_cond_wait(&io->cond, &io->lock);
565                         pthread_mutex_unlock(&io->lock);
566                 }
567         }
568
569         return NULL;
570 }
571
572 static struct xseg *join(char *spec)
573 {
574         struct xseg_config config;
575         struct xseg *xseg;
576
577         (void)xseg_parse_spec(spec, &config);
578         xseg = xseg_join(config.type, config.name, "posix", NULL);
579         if (xseg)
580                 return xseg;
581
582         fprintf(stderr, "Failed to join xseg, creating it...\n");
583         (void)xseg_create(&config);
584         return xseg_join(config.type, config.name, "posix", NULL);
585 }
586
587 static int filed_loop(struct store *store)
588 {
589         struct xseg *xseg = store->xseg;
590         uint32_t portno = store->portno;
591         struct io *io;
592
593         for (;;) {
594                 io = wake_up_next_iothread(store);
595                 xseg_prepare_wait(xseg, portno);
596                 xseg_wait_signal(xseg, 1000000UL);
597         }
598         return 0;
599 }
600
601 static int filed(       char *path, unsigned long size, uint32_t nr_ops,
602                         char *spec, long portno )
603 {
604         struct sigaction sa;
605         struct store *store;
606         int i;
607
608         store = malloc(sizeof(struct store));
609         if (!store) {
610                 perror("malloc");
611                 return -1;
612         }
613
614
615         /*
616         r = daemon(1, 1);
617         if (r < 0)
618                 return r;
619                 */
620
621         store->sigevent.sigev_notify = SIGEV_SIGNAL;
622         store->sigevent.sigev_signo = SIGIO;
623         sa.sa_sigaction = sigaction_handler;
624         sa.sa_flags = SA_SIGINFO;
625         if (sigemptyset(&sa.sa_mask))
626                 perror("sigemptyset");
627
628         if (sigaction(SIGIO, &sa, NULL)) {
629                 perror("sigaction");
630                 return -1;
631         }
632
633         store->nr_ops = nr_ops;
634         store->maxfds = 2 * nr_ops;
635
636         store->fdcache = calloc(store->maxfds, sizeof(struct fdcache_node));
637         if(!store->fdcache)
638                 goto malloc_fail;
639
640         store->free_bufs = calloc(store->nr_ops, sizeof(xqindex));
641         if(!store->free_bufs)
642                 goto malloc_fail;
643
644         store->iothread = calloc(store->nr_ops, sizeof(pthread_t));
645         if(!store->iothread)
646                 goto malloc_fail;
647
648         store->ios = calloc(nr_ops, sizeof(struct io));
649         if (!store->ios) {
650 malloc_fail:
651                 perror("malloc");
652                 return -1;
653         }
654
655         for (i = 0; i < nr_ops; i++) {
656                 store->ios[i].store = store;
657                 pthread_cond_init(&store->ios[i].cond, NULL);
658                 pthread_mutex_init(&store->ios[i].lock, NULL);
659         }
660
661         xq_init_seq(&store->free_ops, store->nr_ops, store->nr_ops, store->free_bufs);
662
663         store->handled_reqs = 0;
664         strncpy(store->path, path, MAX_PATH_SIZE);
665         store->path[MAX_PATH_SIZE] = 0;
666
667         store->path_len = strlen(store->path);
668         if (store->path[store->path_len -1] != '/'){
669                 store->path[store->path_len] = '/';
670                 store->path[++store->path_len]= 0;
671         }
672         store->dirfd = open(store->path, O_RDWR);
673         if (!(store->dirfd < 0 && errno == EISDIR)){
674                 fprintf(stderr, "%s is not a directory\n", store->path);
675                 return -1;
676         }
677
678         store->dirfd = open(store->path, O_RDONLY);
679         if (store->dirfd < 0){
680                 perror("Directory open");
681                 return -1;
682         }
683 /*
684         mode = 1;
685         int fd = dir_open(store, ".__tmp", 6, 1);
686         if (fd < 0){
687                 perror("Directory check");
688                 return -1;
689         }
690 */
691         if (xseg_initialize()) {
692                 printf("cannot initialize library\n");
693                 return -1;
694         }
695         store->xseg = join(spec);
696         if (!store->xseg)
697                 return -1;
698
699         store->xport = xseg_bind_port(store->xseg, portno);
700         if (!store->xport) {
701                 printf("cannot bind to port %ld\n", portno);
702                 return -1;
703         }
704
705         store->portno = xseg_portno(store->xseg, store->xport);
706         printf("filed on port %u/%u\n",
707                 store->portno, store->xseg->config.nr_ports);
708
709         if (xseg_init_local_signal(store->xseg, store->portno) < 0){
710                 printf("cannot int local signals\n");
711                 return -1;
712         }
713
714         for (i = 0; i < nr_ops; i++) {
715                 pthread_cond_init(&store->fdcache[i].cond, NULL);
716                 store->fdcache[i].flags = READY;
717         }
718         for (i = 0; i < nr_ops; i++) {
719                 //TODO error check + cond variable to stop io from starting
720                 //unless all threads are created successfully
721                 pthread_create(store->iothread + i, NULL, io_loop, (void *) (store->ios + i));
722         }
723         pthread_mutex_init(&store->cache_lock, NULL);
724         return filed_loop(store);
725 }
726
727 int main(int argc, char **argv)
728 {
729         char *path, *spec = "";
730         unsigned long size;
731         int i;
732         long portno;
733         uint32_t nr_ops;
734
735         if (argc < 2)
736                 return usage();
737
738         path = argv[1];
739         size = 0;
740         portno = -1;
741         nr_ops = 0;
742
743         for (i = 2; i < argc; i++) {
744                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
745                         spec = argv[i+1];
746                         i += 1;
747                         continue;
748                 }
749
750                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
751                         portno = strtoul(argv[i+1], NULL, 10);
752                         i += 1;
753                         continue;
754                 }
755
756                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
757                         nr_ops = strtoul(argv[i+1], NULL, 10);
758                         i += 1;
759                         continue;
760                 }
761                 if (!strcmp(argv[i], "-v")) {
762                         verbose = 1;
763                         continue;
764                 }
765         }
766
767         if (nr_ops <= 0)
768                 nr_ops = 16;
769
770         return filed(path, size, nr_ops, spec, portno);
771 }
772