Remove debug output from the launch script.
[archipelago] / xseg / peers / user / filed.c
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/sendfile.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <errno.h>
#include <aio.h>
#include <signal.h>
#include <limits.h>
#include <sched.h>
#include <pthread.h>
#include <xseg/xseg.h>
#include <xseg/protocol.h>
17
18
/* Capacity limits, excluding the terminating NUL: the store directory path
 * and a single target file name inside that directory.
 */
enum {
	MAX_PATH_SIZE = 255,
	MAX_FILENAME_SIZE = 255,
};
21
/* Print the command-line synopsis to stdout.
 * Always returns 1 so that main() can simply `return usage();`.
 */
static int usage(void)
{
	static const char synopsis[] =
		"Usage: ./filed <path_to_directory> [options]\n"
		"Options: [-p portno]\n"
		"         [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
		"         [-n nr_parallel_ops]\n"
		"         [-v]\n";

	fputs(synopsis, stdout);
	return 1;
}
31
/* Apparently meant to describe a deferred fsync on a cached file.
 * NOTE(review): nothing in this file references this struct; it looks
 * unused — confirm before removing.
 */
struct fsync_io {
	unsigned long cacheidx;	/* presumably an index into store->fdcache */
	int fd;
	uint64_t time;
};
37
/* Per-worker I/O context. filed() allocates nr_ops of these in store->ios and
 * starts one io_loop() thread per entry.
 */
struct io {
	struct store *store;		/* back-pointer to the owning store */
	struct xseg_request *req;	/* request being served; NULL once free_io() runs */
	ssize_t retval;			/* reset in handle_accepted(), printed by log_io() */
	long fdcacheidx;		/* pinned store->fdcache slot, -1 if none */
	pthread_cond_t cond;		/* signalled by wake_up_next_iothread() */
	pthread_mutex_t lock;		/* guards the sleep/wake handshake on cond */
};
46
/* fdcache_node.flags bit: the slot's open has finished (successfully or not).
 * dir_open() clears it while a worker is opening the file for this slot.
 */
#define READY (1 << 1)

/* One slot of the shared open-file cache (store->fdcache).
 * Lookups, insertions and evictions happen under store->cache_lock; ref is
 * additionally decremented without the lock via __sync_fetch_and_sub().
 * NOTE(review): volatile is not a thread-synchronization primitive; the
 * accesses that matter rely on cache_lock and the __sync builtins.
 */
struct fdcache_node {
	volatile int fd;		/* cached descriptor; dir_open() treats only fd > 0 as usable */
	volatile unsigned int ref;	/* requests currently using the slot; evictable only at 0 */
	volatile unsigned long time;	/* LRU stamp copied from store->handled_reqs */
	volatile unsigned int flags;	/* READY when no open is in progress */
	pthread_cond_t cond;		/* broadcast by dir_open() once an open finishes */
	char target[MAX_FILENAME_SIZE + 1];	/* NUL-terminated name of the cached file */
};
57
/* Global state of one filed instance (allocated once by filed()). */
struct store {
	struct xseg *xseg;		/* joined segment */
	struct xseg_port *xport;	/* port bound by filed() */
	uint32_t portno;		/* number of xport */
	uint64_t size;			/* NOTE(review): never assigned in this file */
	struct io *ios;			/* nr_ops worker contexts */
	struct xq free_ops;		/* indices of idle entries in ios */
	char *free_bufs;		/* backing storage handed to xq_init_seq() for free_ops */
	long nr_ops;			/* number of workers / ios */
	struct sigevent sigevent;	/* set up in filed(); NOTE(review): not used elsewhere here */
	int dirfd;			/* directory holding the target files (used with openat) */
	uint32_t path_len;		/* strlen(path), including the trailing '/' */
	uint64_t handled_reqs;		/* bumped on each successful dir_open(); LRU clock */
	unsigned long maxfds;		/* number of fdcache slots */
	struct fdcache_node *fdcache;	/* open-file cache, maxfds slots */
	pthread_t *iothread;		/* handles of the nr_ops worker threads */
	pthread_mutex_t cache_lock;	/* protects fdcache lookups and updates */
	char path[MAX_PATH_SIZE + 1];	/* directory path, '/'-terminated */
};
77
78 static unsigned verbose;
79
80 static unsigned long sigaction_count;
81
82 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
83 {
84         sigaction_count++;
85 }
86
87 static void log_io(char *msg, struct io *io)
88 {
89         char target[64], data[64];
90         /* null terminate name in case of req->target is less than 63 characters,
91          * and next character after name (aka first byte of next buffer) is not
92          * null
93          */
94         struct store* store = io->store;
95         struct xseg *xseg = store->xseg;
96         struct xseg_request *req = io->req;
97         char *req_target = xseg_get_target(xseg, req);
98         char *req_data = xseg_get_data(xseg, req);
99
100         unsigned int end = (req->targetlen> 63) ? 63 : req->targetlen;
101
102         strncpy(target, req_target, end);
103         target[end] = 0;
104         strncpy(data, req_data, 63);
105         data[63] = 0;
106
107         fprintf(stderr,
108                 "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u\n"
109                 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
110                 msg,
111                 (unsigned int)io->fdcacheidx, //this is cacheidx not fd
112                 (unsigned int)req->op,
113                 (unsigned long long)req->offset,
114                 (unsigned long)req->size,
115                 (unsigned long)io->retval,
116                 (unsigned int)req->state,
117                 (unsigned int)req->targetlen, target,
118                 (unsigned long long)req->datalen, data);
119 }
120
121 static struct io *alloc_io(struct store *store)
122 {
123         xqindex idx = xq_pop_head(&store->free_ops, 1);
124         if (idx == Noneidx)
125                 return NULL;
126         return store->ios + idx;
127 }
128
129 static inline void free_io(struct store *store, struct io *io)
130 {
131         xqindex idx = io - store->ios;
132         io->req = NULL;
133         xq_append_head(&store->free_ops, idx, 1);
134 }
135
136
137 static void complete(struct store *store, struct io *io)
138 {
139         struct xseg_request *req = io->req;
140         xport p;
141         req->state |= XS_SERVED;
142         if (verbose)
143                 log_io("complete", io);
144         while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
145                 ;
146         xseg_signal(store->xseg, p);
147         __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
148 }
149
150 static void fail(struct store *store, struct io *io)
151 {
152         struct xseg_request *req = io->req;
153         xport p;
154         req->state |= XS_FAILED;
155         if (verbose)
156                 log_io("fail", io);
157         while ((p = xseg_respond(store->xseg, req, store->portno, X_ALLOC)) == NoPort)
158                 ;
159         xseg_signal(store->xseg, p);
160         if (io->fdcacheidx >= 0) {
161                 __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
162         }
163 }
164
165 static void pending(struct store *store, struct io *io)
166 {
167         io->req->state = XS_PENDING;
168 }
169
170 static void handle_unknown(struct store *store, struct io *io)
171 {
172         struct xseg *xseg = store->xseg;
173         struct xseg_request *req = io->req;
174         char *data = xseg_get_data(xseg, req);
175         snprintf(data, req->datalen, "unknown request op");
176         fail(store, io);
177 }
178
/* Per-request setup hook run before a read/write is issued; currently a
 * no-op.
 */
static inline void prepare_io(struct store *store, struct io *io)
{
	(void)store;
	(void)io;
}
182
183
184 static int dir_open(    struct store *store, struct io *io,
185                         char *target, uint32_t targetlen, int mode      )
186 {
187         int fd = -1;
188         struct fdcache_node *ce = NULL;
189         long i, lru;
190         uint64_t min;
191         io->fdcacheidx = -1;
192         if (targetlen> MAX_FILENAME_SIZE)
193                 goto out_err;
194
195 start:
196         /* check cache */
197         pthread_mutex_lock(&store->cache_lock);
198 start_locked:
199         lru = -1;
200         min = UINT64_MAX;
201         for (i = 0; i < store->maxfds; i++) {
202                 if (store->fdcache[i].ref == 0 && min > store->fdcache[i].time 
203                                 && (store->fdcache[i].flags & READY)) {
204                         min = store->fdcache[i].time;
205                         lru = i;
206
207                 }
208                 if (!strncmp(store->fdcache[i].target, target, targetlen)) {
209                         if (store->fdcache[i].target[targetlen] == 0) {
210                                 ce = &store->fdcache[i];
211                                 /* if any other io thread is currently opening
212                                  * the file, block until it succeeds or fails
213                                  */
214                                 if (!(ce->flags & READY)) {
215                                         pthread_cond_wait(&ce->cond, &store->cache_lock);
216                                         /* when ready, restart lookup */
217                                         goto start_locked;
218                                 }
219                                 /* if successfully opened */
220                                 if (ce->fd > 0) {
221                                         fd = store->fdcache[i].fd;
222                                         io->fdcacheidx = i;
223                                         goto out;
224                                 }
225                                 /* else open failed for the other io thread, so
226                                  * it should fail for everyone waiting on this
227                                  * file.
228                                  */
229                                 else {
230                                         fd = -1;
231                                         io->fdcacheidx = -1;
232                                         goto out_err_unlock;
233                                 }
234                         }
235                 }
236         }
237         if (lru < 0){
238                 /* all cache entries are currently being used */
239                 pthread_mutex_unlock(&store->cache_lock);
240                 goto start;
241         }
242         if (store->fdcache[lru].ref){
243                 fd = -1;
244                 printf("lru(%ld) ref not 0 (%u)\n", lru, store->fdcache[lru].ref);
245                 goto out_err_unlock;
246         }
247         /* make room for new file */
248         ce = &store->fdcache[lru];
249         /* set name here and state to not ready, for any other requests on the
250          * same target that may follow
251          */
252         strncpy(ce->target, target, targetlen);
253         ce->target[targetlen] = 0;
254         ce->flags &= ~READY;
255         pthread_mutex_unlock(&store->cache_lock);
256
257         if (ce->fd >0){
258                 if (close(ce->fd) < 0){
259                         perror("close");
260                 }
261         }
262         fd = openat(store->dirfd, ce->target, O_RDWR);  
263         if (fd < 0) {
264                 if (errno == ENOENT){
265                         fd = openat(store->dirfd, ce->target, 
266                                         O_RDWR | O_CREAT, 0600);
267                         if (fd >= 0)
268                                 goto new_entry;
269                 }
270                 perror(store->path);
271                 /* insert in cache a negative fd to indicate opening error to
272                  * any other ios waiting for the file to open
273                  */
274         }       
275         /* insert in cache */
276 new_entry:
277         pthread_mutex_lock(&store->cache_lock);
278         ce->fd = fd;
279         ce->ref = 0;
280         ce->flags = READY;
281         pthread_cond_broadcast(&ce->cond);
282         if (fd > 0) {
283                 io->fdcacheidx = lru;
284         }
285         else {
286                 io->fdcacheidx = -1;
287                 goto out_err_unlock;
288         }
289
290 out:
291         store->handled_reqs++;
292         ce->time = store->handled_reqs;
293         __sync_fetch_and_add(&ce->ref, 1);
294         pthread_mutex_unlock(&store->cache_lock);
295 out_err:
296         return fd;
297
298 out_err_unlock:
299         pthread_mutex_unlock(&store->cache_lock);
300         goto out_err;
301 }
302
303 static void handle_read_write(struct store *store, struct io *io)
304 {
305         int r, fd, mode;
306         struct xseg *xseg = store->xseg;
307         struct xseg_request *req = io->req;
308         char *target = xseg_get_target(xseg, req);
309         char *data = xseg_get_data(xseg, req);
310
311         if (req->op == X_WRITE)
312                 mode = 1;
313         else
314                 mode = 0;
315         fd = dir_open(store, io, target, req->targetlen, mode);
316         if (fd < 0){
317                 perror("dir_open");
318                 fail(store, io);
319                 return;
320         }
321
322         if (req != io->req)
323                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
324         if (!req->size) {
325                 if (req->flags & (XF_FLUSH | XF_FUA)) {
326                         /* No FLUSH/FUA support yet (O_SYNC ?).
327                          * note that with FLUSH/size == 0 
328                          * there will probably be a (uint64_t)-1 offset */
329                         complete(store, io);
330                         return;
331                 } else {
332                         complete(store, io);
333                         return;
334                 }
335         }
336
337
338         prepare_io(store, io);
339
340         switch (req->op) {
341         case X_READ:
342                 while (req->serviced < req->datalen) {
343                         r = pread(fd, data + req->serviced, 
344                                         req->datalen - req->serviced,
345                                         req->offset + req->serviced);
346                         if (r < 0) {
347                                 req->datalen = req->serviced;
348                                 perror("pread");
349                         }
350                         else if (r == 0) {
351                                 /* reached end of file. zero out the rest data buffer */
352                                 memset(data + req->serviced, 0, req->datalen - req->serviced);
353                                 req->serviced = req->datalen;
354                         }
355                         else {
356                                 req->serviced += r;
357                         }
358                 }
359                 break;
360         case X_WRITE:
361                 while (req->serviced < req->datalen) {
362                         r = pwrite(fd, data + req->serviced, 
363                                         req->datalen - req->serviced,
364                                         req->offset + req->serviced);
365                         if (r < 0) {
366                                 req->datalen = req->serviced;
367                         }
368                         else if (r == 0) {
369                                 /* reached end of file. zero out the rest data buffer */
370                                 memset(data + req->serviced, 0, req->datalen - req->serviced);
371                                 req->serviced = req->datalen;
372                         }
373                         else {
374                                 req->serviced += r;
375                         }
376                 }
377                 r = fsync(fd);
378                 if (r< 0) {
379                         perror("fsync");
380                         /* if fsync fails, then no bytes serviced correctly */
381                         req->serviced = 0;
382                 }
383                 break;
384         default:
385                 snprintf(data, req->datalen,
386                          "wtf, corrupt op %u?\n", req->op);
387                 fail(store, io);
388                 return;
389         }
390
391         if (req->serviced > 0 ) {
392                 complete(store, io);
393         }
394         else {
395                 strerror_r(errno, data, req->datalen);
396                 fail(store, io);
397         }
398         return;
399 }
400
401 static void handle_info(struct store *store, struct io *io)
402 {
403         struct xseg *xseg = store->xseg;
404         struct xseg_request *req = io->req;
405         char *target = xseg_get_target(xseg, req);
406         char *data = xseg_get_data(xseg, req);
407         struct stat stat;
408         int fd, r;
409         off_t size;
410
411         fd = dir_open(store, io, target, req->targetlen, 0);
412         if (fd < 0) {
413                 fail(store, io);
414                 return;
415         }
416         r = fstat(fd, &stat);
417         if (r < 0) {
418                 perror("fstat");
419                 fail(store, io);
420                 return;
421         }
422         size = stat.st_size;
423         *((off_t *) data) = size;
424         req->datalen = sizeof(size);
425
426         complete(store, io);
427 }
428
429 static void handle_copy(struct store *store, struct io *io)
430 {
431         struct xseg_request *req = io->req;
432         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(store->xseg, req);
433         struct stat st;
434         int n, src, dst;
435         char buf[XSEG_MAX_TARGETLEN+1];
436         char *target = xseg_get_target(store->xseg, req);
437
438         dst = dir_open(store, io, target, req->targetlen, 1);
439         if (dst < 0) {
440                 fprintf(stderr, "fail in dst\n");
441                 fail(store, io);
442                 return;
443         }
444
445         strncpy(buf, xcopy->target, xcopy->targetlen);
446         buf[xcopy->targetlen] = 0;
447         src = openat(store->dirfd, buf, O_RDWR);        
448         if (src < 0) {
449                 if (errno == ENOENT){
450                         src = openat(store->dirfd, buf, 
451                                         O_RDWR | O_CREAT, 0600);
452                         if (src < 0 ) {
453                                 fprintf(stderr, "fail in src\n");
454                                 fail(store, io);
455                                 return;
456                         }       
457                 } else {
458                         fprintf(stderr, "fail in src\n");
459                         fail(store, io);
460                         return;
461                 }
462         }
463
464         fstat(src, &st);
465         n = sendfile(dst, src, 0, st.st_size);
466         if (n != st.st_size) {
467                 fprintf(stderr, "fail in copy\n");
468                 fail(store, io);
469                 goto out;
470         }
471
472         if (n < 0) {
473                 fprintf(stderr, "fail in cp\n");
474                 fail(store, io);
475                 goto out;
476         }
477
478         complete(store, io);
479
480 out:
481         close(src);
482 }
483
484 static void handle_delete(struct store *store, struct io *io)
485 {
486         struct xseg_request *req = io->req;
487         int fd;
488         char *target = xseg_get_target(store->xseg, req);
489         
490         fd = dir_open(store, io, target, req->targetlen, 0);
491         if (fd < 0) {
492                 fprintf(stderr, "fail in dir_open\n");
493                 fail(store, io);
494                 return;
495         }
496
497         /* 'invalidate' cache entry */
498         if (io->fdcacheidx >= 0) {
499                 store->fdcache[io->fdcacheidx].fd = -1;
500         }
501
502         close(fd);
503         char buf[MAX_FILENAME_SIZE + 1];
504         strncpy(buf, target, req->targetlen);
505         buf[req->targetlen] = 0;
506         unlinkat(store->dirfd, buf, 0);
507
508         complete(store, io);
509
510         return;
511 }
512
513 static void dispatch(struct store *store, struct io *io)
514 {
515         if (verbose)
516                 printf("io: 0x%p, req: 0x%p, op %u\n",
517                         (void *)io, (void *)io->req, io->req->op);
518         switch (io->req->op) {
519         case X_READ:
520         case X_WRITE:
521                 handle_read_write(store, io); break;
522         case X_INFO:
523                 handle_info(store, io); break;
524         case X_DELETE:
525                 handle_delete(store, io); break;
526         case X_COPY:
527                 handle_copy(store, io); break;
528         case X_SYNC:
529         default:
530                 handle_unknown(store, io);
531         }
532 }
533
534 static void handle_accepted(struct store *store, struct io *io)
535 {
536         struct xseg_request *req = io->req;
537         req->serviced = 0;
538         req->state = XS_ACCEPTED;
539         io->retval = 0;
540         dispatch(store, io);
541 }
542
543 static struct io* wake_up_next_iothread(struct store *store)
544 {
545         struct io *io = alloc_io(store);
546
547         if (io){        
548                 pthread_mutex_lock(&io->lock);
549                 pthread_cond_signal(&io->cond);
550                 pthread_mutex_unlock(&io->lock);
551         }
552         return io;
553 }
554
555 void *io_loop(void *arg)
556 {
557         struct io *io = (struct io *) arg;
558         struct store *store = io->store;
559         struct xseg *xseg = store->xseg;
560         uint32_t portno = store->portno;
561         struct xseg_request *accepted;
562
563         for (;;) {
564                 accepted = NULL;
565                 accepted = xseg_accept(xseg, portno);
566                 if (accepted) {
567                         io->req = accepted;
568                         wake_up_next_iothread(store);
569                         handle_accepted(store, io);
570                 }
571                 else {
572                         pthread_mutex_lock(&io->lock);
573                         free_io(store, io);
574                         pthread_cond_wait(&io->cond, &io->lock);
575                         pthread_mutex_unlock(&io->lock);
576                 }
577         }
578
579         return NULL;
580 }
581
582 static struct xseg *join(char *spec)
583 {
584         struct xseg_config config;
585         struct xseg *xseg;
586
587         (void)xseg_parse_spec(spec, &config);
588         xseg = xseg_join(config.type, config.name, "posix", NULL);
589         if (xseg)
590                 return xseg;
591
592         fprintf(stderr, "Failed to join xseg, creating it...\n");
593         (void)xseg_create(&config);
594         return xseg_join(config.type, config.name, "posix", NULL);
595 }
596
597 static int filed_loop(struct store *store)
598 {
599         struct xseg *xseg = store->xseg;
600         uint32_t portno = store->portno;
601         struct io *io;
602
603         for (;;) {
604                 io = wake_up_next_iothread(store);
605                 xseg_prepare_wait(xseg, portno);
606                 xseg_wait_signal(xseg, 1000000UL);
607         }
608         return 0;
609 }
610
611 static int filed(       char *path, unsigned long size, uint32_t nr_ops,
612                         char *spec, long portno )
613 {
614         struct sigaction sa;
615         struct store *store;
616         int i;
617
618         store = malloc(sizeof(struct store));
619         if (!store) {
620                 perror("malloc");
621                 return -1;
622         }
623
624
625         /*
626         r = daemon(1, 1);
627         if (r < 0)
628                 return r;
629                 */
630
631         store->sigevent.sigev_notify = SIGEV_SIGNAL;
632         store->sigevent.sigev_signo = SIGIO;
633         sa.sa_sigaction = sigaction_handler;
634         sa.sa_flags = SA_SIGINFO;
635         if (sigemptyset(&sa.sa_mask))
636                 perror("sigemptyset");
637
638         if (sigaction(SIGIO, &sa, NULL)) {
639                 perror("sigaction");
640                 return -1;
641         }
642
643         store->nr_ops = nr_ops;
644         store->maxfds = 2 * nr_ops;
645
646         store->fdcache = calloc(store->maxfds, sizeof(struct fdcache_node));
647         if(!store->fdcache)
648                 goto malloc_fail;
649
650         store->free_bufs = calloc(store->nr_ops, sizeof(xqindex));
651         if(!store->free_bufs)
652                 goto malloc_fail;
653
654         store->iothread = calloc(store->nr_ops, sizeof(pthread_t));
655         if(!store->iothread)
656                 goto malloc_fail;
657
658         store->ios = calloc(nr_ops, sizeof(struct io));
659         if (!store->ios) {
660 malloc_fail:
661                 perror("malloc");
662                 return -1;
663         }
664
665         for (i = 0; i < nr_ops; i++) {
666                 store->ios[i].store = store;
667                 pthread_cond_init(&store->ios[i].cond, NULL);
668                 pthread_mutex_init(&store->ios[i].lock, NULL);
669         }
670
671         xq_init_seq(&store->free_ops, store->nr_ops, store->nr_ops, store->free_bufs);
672
673         store->handled_reqs = 0;
674         strncpy(store->path, path, MAX_PATH_SIZE);
675         store->path[MAX_PATH_SIZE] = 0;
676
677         store->path_len = strlen(store->path);
678         if (store->path[store->path_len -1] != '/'){
679                 store->path[store->path_len] = '/';
680                 store->path[++store->path_len]= 0;
681         }
682         store->dirfd = open(store->path, O_RDWR);
683         if (!(store->dirfd < 0 && errno == EISDIR)){
684                 fprintf(stderr, "%s is not a directory\n", store->path);
685                 return -1;
686         }
687
688         store->dirfd = open(store->path, O_RDONLY);
689         if (store->dirfd < 0){
690                 perror("Directory open");
691                 return -1;
692         }
693 /*
694         mode = 1;
695         int fd = dir_open(store, ".__tmp", 6, 1);
696         if (fd < 0){
697                 perror("Directory check");
698                 return -1;
699         }
700 */
701         if (xseg_initialize()) {
702                 printf("cannot initialize library\n");
703                 return -1;
704         }
705         store->xseg = join(spec);
706         if (!store->xseg)
707                 return -1;
708
709         store->xport = xseg_bind_port(store->xseg, portno);
710         if (!store->xport) {
711                 printf("cannot bind to port %ld\n", portno);
712                 return -1;
713         }
714
715         store->portno = xseg_portno(store->xseg, store->xport);
716         printf("filed on port %u/%u\n",
717                 store->portno, store->xseg->config.nr_ports);
718
719         if (xseg_init_local_signal(store->xseg, store->portno) < 0){
720                 printf("cannot int local signals\n");
721                 return -1;
722         }
723
724         for (i = 0; i < nr_ops; i++) {
725                 pthread_cond_init(&store->fdcache[i].cond, NULL);
726                 store->fdcache[i].flags = READY;
727         }
728         for (i = 0; i < nr_ops; i++) {
729                 //TODO error check + cond variable to stop io from starting
730                 //unless all threads are created successfully
731                 pthread_create(store->iothread + i, NULL, io_loop, (void *) (store->ios + i));
732         }
733         pthread_mutex_init(&store->cache_lock, NULL);
734         return filed_loop(store);
735 }
736
737 int main(int argc, char **argv)
738 {
739         char *path, *spec = "";
740         unsigned long size;
741         int i;
742         long portno;
743         uint32_t nr_ops;
744
745         if (argc < 2)
746                 return usage();
747
748         path = argv[1];
749         size = 0;
750         portno = -1;
751         nr_ops = 0;
752
753         for (i = 2; i < argc; i++) {
754                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
755                         spec = argv[i+1];
756                         i += 1;
757                         continue;
758                 }
759
760                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
761                         portno = strtoul(argv[i+1], NULL, 10);
762                         i += 1;
763                         continue;
764                 }
765
766                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
767                         nr_ops = strtoul(argv[i+1], NULL, 10);
768                         i += 1;
769                         continue;
770                 }
771                 if (!strcmp(argv[i], "-v")) {
772                         verbose = 1;
773                         continue;
774                 }
775         }
776
777         if (nr_ops <= 0)
778                 nr_ops = 16;
779
780         return filed(path, size, nr_ops, spec, portno);
781 }
782