f3f4dcf8c9f4cb16acc99ff24232f7073b5c2b7a
[archipelago] / xseg / peers / user / filed.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <sys/stat.h>
6 #include <unistd.h>
7 #include <string.h>
8 #include <fcntl.h>
9 #include <errno.h>
10 #include <aio.h>
11 #include <signal.h>
12 #include <limits.h>
13 #include <xseg/xseg.h>
14 #include <pthread.h>
15
/*
 * Length limits, in bytes and excluding the terminating NUL, for the backing
 * directory path and for a target (file) name. Buffers holding them are
 * declared one byte larger.
 */
enum {
	MAX_PATH_SIZE = 255,
	MAX_FILENAME_SIZE = 255,
};
18
/*
 * Print the command-line synopsis to stdout.
 * Always returns 1, so main() can simply "return usage();" as a failure status.
 */
static int usage(void)
{
	static const char help[] =
		"Usage: ./filed <path_to_directory> [options]\n"
		"Options: [-p portno]\n"
		"         [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
		"         [-n nr_parallel_ops]\n"
		"         [-v]\n";

	fputs(help, stdout);
	return 1;
}
28
/*
 * Record pairing an fd-cache slot and descriptor with a timestamp, presumably
 * meant for deferred fsync bookkeeping.
 * NOTE(review): not referenced anywhere in this file — confirm it is still
 * needed before extending or removing it.
 */
struct fsync_io {
	unsigned long cacheidx;	/* fd cache slot index (presumably into store->fdcache) */
	int fd;			/* open descriptor */
	uint64_t time;		/* timestamp; units not established here */
};
34
/*
 * Per-io-thread context. filed() creates one thread per entry of store->ios
 * and hands it that entry; the thread serves one request at a time with it.
 */
struct io {
	struct store *store;		/* owning store (set once in filed()) */
	struct xseg_request *req;	/* request being served; NULL once the io
					 * is returned to the free pool */
	ssize_t retval;			/* reset to 0 per request, shown by log_io() */
	long fdcacheidx;		/* fd-cache slot referenced by this io,
					 * -1 when none */
	pthread_cond_t cond;		/* the idle thread sleeps on this ... */
	pthread_mutex_t lock;		/* ... under this lock until woken by
					 * wake_up_next_iothread() */
};
43
44 #define READY (1 << 1)
45
46 struct fdcache_node {
47         volatile int fd;
48         volatile unsigned int ref;
49         volatile unsigned long time;
50         volatile unsigned int flags;
51         pthread_cond_t cond;
52         char target[MAX_FILENAME_SIZE + 1];
53 };
54
55 struct store {
56         struct xseg *xseg;
57         struct xseg_port *xport;
58         uint32_t portno;
59         uint64_t size;
60         struct io *ios;
61         struct xq free_ops;
62         char *free_bufs;
63         long nr_ops;
64         struct sigevent sigevent;
65         int dirfd;
66         uint32_t path_len;
67         uint64_t handled_reqs;
68         unsigned long maxfds;
69         struct fdcache_node *fdcache;
70         pthread_t *iothread;
71         pthread_mutex_t cache_lock;
72         char path[MAX_PATH_SIZE + 1];
73 };
74
/* Set by the -v option; enables per-request tracing. */
static unsigned verbose;

/* Number of SIGIO deliveries seen (filed() installs sigaction_handler). */
static unsigned long sigaction_count;

/*
 * SIGIO handler: it only counts deliveries. The signal number, siginfo and
 * context arguments are ignored.
 */
static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
{
	(void)sig;
	(void)siginfo;
	(void)arg;
	sigaction_count += 1;
}
83
84 static void log_io(char *msg, struct io *io)
85 {
86         char target[64], data[64];
87         /* null terminate name in case of req->target is less than 63 characters,
88          * and next character after name (aka first byte of next buffer) is not
89          * null
90          */
91         unsigned int end = (io->req->targetlen> 63) ? 63 : io->req->targetlen;
92         strncpy(target, io->req->target, end);
93         target[end] = 0;
94         strncpy(data, io->req->data, 63);
95         data[63] = 0;
96
97         fprintf(stderr,
98                 "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u\n"
99                 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
100                 msg,
101                 (unsigned int)io->fdcacheidx, //this is cacheidx not fd
102                 (unsigned int)io->req->op,
103                 (unsigned long long)io->req->offset,
104                 (unsigned long)io->req->size,
105                 (unsigned long)io->retval,
106                 (unsigned int)io->req->state,
107                 (unsigned int)io->req->targetlen, target,
108                 (unsigned long long)io->req->datalen, data);
109 }
110
111 static struct io *alloc_io(struct store *store)
112 {
113         xqindex idx = xq_pop_head(&store->free_ops);
114         if (idx == None)
115                 return NULL;
116         return store->ios + idx;
117 }
118
119 static inline void free_io(struct store *store, struct io *io)
120 {
121         xqindex idx = io - store->ios;
122         io->req = NULL;
123         xq_append_head(&store->free_ops, idx);
124 }
125
126
127 static void complete(struct store *store, struct io *io)
128 {
129         struct xseg_request *req = io->req;
130         req->state |= XS_SERVED;
131         if (verbose)
132                 log_io("complete", io);
133         xseg_respond(store->xseg, req->portno, req);
134         xseg_signal(store->xseg, req->portno);
135         __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
136 }
137
138 static void fail(struct store *store, struct io *io)
139 {
140         struct xseg_request *req = io->req;
141         req->state |= XS_FAILED;
142         if (verbose)
143                 log_io("fail", io);
144         xseg_respond(store->xseg, req->portno, req);
145         xseg_signal(store->xseg, req->portno);
146         if (io->fdcacheidx >= 0) {
147                 __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
148         }
149 }
150
151 static void pending(struct store *store, struct io *io)
152 {
153         io->req->state = XS_PENDING;
154 }
155
156 static void handle_unknown(struct store *store, struct io *io)
157 {
158         struct xseg_request *req = io->req;
159         snprintf(req->data, req->datalen, "unknown request op");
160         fail(store, io);
161 }
162
/* Hook for per-request setup before data transfer; currently does nothing. */
static inline void prepare_io(struct store *store, struct io *io)
{
	(void)store;
	(void)io;
}
166
167
168 static int dir_open(    struct store *store, struct io *io,
169                         char *target, uint32_t targetlen, int mode      )
170 {
171         int fd = -1, r;
172         struct fdcache_node *ce = NULL;
173         long i, lru;
174         uint64_t min;
175         io->fdcacheidx = -1;
176         if (targetlen> MAX_FILENAME_SIZE)
177                 goto out_err;
178
179 start:
180         /* check cache */
181         pthread_mutex_lock(&store->cache_lock);
182 start_locked:
183         lru = -1;
184         min = UINT64_MAX;
185         for (i = 0; i < store->maxfds; i++) {
186                 if (store->fdcache[i].ref == 0 && min > store->fdcache[i].time 
187                                 && (store->fdcache[i].flags & READY)) {
188                         min = store->fdcache[i].time;
189                         lru = i;
190
191                 }
192                 if (!strncmp(store->fdcache[i].target, target, targetlen)) {
193                         if (store->fdcache[i].target[targetlen] == 0) {
194                                 ce = &store->fdcache[i];
195                                 /* if any other io thread is currently opening
196                                  * the file, block until it succeeds or fails
197                                  */
198                                 if (!(ce->flags & READY)) {
199                                         pthread_cond_wait(&ce->cond, &store->cache_lock);
200                                         /* when ready, restart lookup */
201                                         goto start_locked;
202                                 }
203                                 /* if successfully opened */
204                                 if (ce->fd > 0) {
205                                         fd = store->fdcache[i].fd;
206                                         io->fdcacheidx = i;
207                                         goto out;
208                                 }
209                                 /* else open failed for the other io thread, so
210                                  * it should fail for everyone waiting on this
211                                  * file.
212                                  */
213                                 else {
214                                         fd = -1;
215                                         io->fdcacheidx = -1;
216                                         goto out_err_unlock;
217                                 }
218                         }
219                 }
220         }
221         if (lru < 0){
222                 /* all cache entries are currently being used */
223                 pthread_mutex_unlock(&store->cache_lock);
224                 goto start;
225         }
226         if (store->fdcache[lru].ref){
227                 fd = -1;
228                 printf("lru(%ld) ref not 0 (%u)\n", lru, store->fdcache[lru].ref);
229                 goto out_err_unlock;
230         }
231         /* make room for new file */
232         ce = &store->fdcache[lru];
233         /* set name here and state to not ready, for any other requests on the
234          * same target that may follow
235          */
236         strncpy(ce->target, target, targetlen);
237         ce->target[targetlen] = 0;
238         ce->flags &= ~READY;
239         pthread_mutex_unlock(&store->cache_lock);
240
241         if (ce->fd >0){
242                 if (close(ce->fd) < 0){
243                         perror("close");
244                 }
245         }
246         fd = openat(store->dirfd, ce->target, O_RDWR);  
247         if (fd < 0) {
248                 if (errno == ENOENT){
249                         fd = openat(store->dirfd, ce->target, 
250                                         O_RDWR | O_CREAT, 0600);
251                         if (fd >= 0)
252                                 goto new_entry;
253                 }
254                 perror(store->path);
255                 /* insert in cache a negative fd to indicate opening error to
256                  * any other ios waiting for the file to open
257                  */
258         }       
259         /* insert in cache */
260 new_entry:
261         pthread_mutex_lock(&store->cache_lock);
262         ce->fd = fd;
263         ce->ref = 0;
264         ce->flags = READY;
265         pthread_cond_broadcast(&ce->cond);
266         if (fd > 0) {
267                 io->fdcacheidx = lru;
268         }
269         else {
270                 io->fdcacheidx = -1;
271                 goto out_err_unlock;
272         }
273
274 out:
275         store->handled_reqs++;
276         ce->time = store->handled_reqs;
277         __sync_fetch_and_add(&ce->ref, 1);
278         pthread_mutex_unlock(&store->cache_lock);
279 out_err:
280         return fd;
281
282 out_err_unlock:
283         pthread_mutex_unlock(&store->cache_lock);
284         goto out_err;
285 }
286
287 static void handle_read_write(struct store *store, struct io *io)
288 {
289         int r, fd, mode;
290         struct xseg_request *req = io->req;
291
292         if (req->op == X_WRITE)
293                 mode = 1;
294         else
295                 mode = 0;
296         fd = dir_open(store, io, req->target, req->targetlen, mode);
297         if (fd < 0){
298                 perror("dir_open");
299                 fail(store, io);
300                 return;
301         }
302
303         if (req != io->req)
304                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
305         if (!req->size) {
306                 if (req->flags & (XF_FLUSH | XF_FUA)) {
307                         /* No FLUSH/FUA support yet (O_SYNC ?).
308                          * note that with FLUSH/size == 0 
309                          * there will probably be a (uint64_t)-1 offset */
310                         complete(store, io);
311                         return;
312                 } else {
313                         complete(store, io);
314                         return;
315                 }
316         }
317
318
319         prepare_io(store, io);
320
321         switch (req->op) {
322         case X_READ:
323                 while (req->serviced < req->datalen) {
324                         r = pread(fd, req->data + req->serviced, 
325                                         req->datalen - req->serviced,
326                                         req->offset + req->serviced);
327                         if (r < 0) {
328                                 req->datalen = req->serviced;
329                                 perror("pread");
330                         }
331                         else if (r == 0) {
332                                 /* reached end of file. zero out the rest data buffer */
333                                 memset(req->data + req->serviced, 0, req->datalen - req->serviced);
334                                 req->serviced = req->datalen;
335                         }
336                         else {
337                                 req->serviced += r;
338                         }
339                 }
340                 break;
341         case X_WRITE:
342                 while (req->serviced < req->datalen) {
343                         r = pwrite(fd, req->data + req->serviced, 
344                                         req->datalen - req->serviced,
345                                         req->offset + req->serviced);
346                         if (r < 0) {
347                                 req->datalen = req->serviced;
348                         }
349                         else if (r == 0) {
350                                 /* reached end of file. zero out the rest data buffer */
351                                 memset(req->data + req->serviced, 0, req->datalen - req->serviced);
352                                 req->serviced = req->datalen;
353                         }
354                         else {
355                                 req->serviced += r;
356                         }
357                 }
358                 r = fsync(fd);
359                 if (r< 0) {
360                         perror("fsync");
361                         /* if fsync fails, then no bytes serviced correctly */
362                         req->serviced = 0;
363                 }
364                 break;
365         default:
366                 snprintf(req->data, req->datalen,
367                          "wtf, corrupt op %u?\n", req->op);
368                 fail(store, io);
369                 return;
370         }
371
372         if (req->serviced > 0 ) {
373                 complete(store, io);
374         }
375         else {
376                 strerror_r(errno, req->data, req->datalen);
377                 fail(store, io);
378         }
379         return;
380 }
381
382 static void handle_info(struct store *store, struct io *io)
383 {
384         struct xseg_request *req = io->req;
385         struct stat stat;
386         int fd, r;
387         off_t size;
388
389         fd = dir_open(store, io, req->target, req->targetlen, 0);
390         if (fd < 0) {
391                 fail(store, io);
392                 return;
393         }
394         r = fstat(fd, &stat);
395         if (r < 0) {
396                 perror("fstat");
397                 fail(store, io);
398                 return;
399         }
400         size = stat.st_size;
401         *((off_t *) req->data) = size;
402         req->datalen = sizeof(size);
403
404         complete(store, io);
405 }
406
407 static void dispatch(struct store *store, struct io *io)
408 {
409         if (verbose)
410                 printf("io: 0x%p, req: 0x%p, op %u\n",
411                         (void *)io, (void *)io->req, io->req->op);
412         switch (io->req->op) {
413         case X_READ:
414         case X_WRITE:
415                 handle_read_write(store, io); break;
416         case X_INFO:
417                 handle_info(store, io); break;
418         case X_SYNC:
419         default:
420                 handle_unknown(store, io);
421         }
422 }
423
424 static void handle_accepted(struct store *store, struct io *io)
425 {
426         struct xseg_request *req = io->req;
427         req->serviced = 0;
428         req->state = XS_ACCEPTED;
429         io->retval = 0;
430         dispatch(store, io);
431 }
432
433 static struct io* wake_up_next_iothread(struct store *store)
434 {
435         struct io *io = alloc_io(store);
436
437         if (io){        
438                 pthread_mutex_lock(&io->lock);
439                 pthread_cond_signal(&io->cond);
440                 pthread_mutex_unlock(&io->lock);
441         }
442         return io;
443 }
444
445 void *io_loop(void *arg)
446 {
447         struct io *io = (struct io *) arg;
448         struct store *store = io->store;
449         struct xseg *xseg = store->xseg;
450         uint32_t portno = store->portno;
451         struct xseg_request *accepted;
452
453         for (;;) {
454                 accepted = NULL;
455                 accepted = xseg_accept(xseg, portno);
456                 if (accepted) {
457                         io->req = accepted;
458                         wake_up_next_iothread(store);
459                         handle_accepted(store, io);
460                 }
461                 else {
462                         pthread_mutex_lock(&io->lock);
463                         free_io(store, io);
464                         pthread_cond_wait(&io->cond, &io->lock);
465                         pthread_mutex_unlock(&io->lock);
466                 }
467         }
468
469         return NULL;
470 }
471
472 static struct xseg *join(char *spec)
473 {
474         struct xseg_config config;
475         struct xseg *xseg;
476
477         (void)xseg_parse_spec(spec, &config);
478         xseg = xseg_join(config.type, config.name, "posix", NULL);
479         if (xseg)
480                 return xseg;
481
482         fprintf(stderr, "Failed to join xseg, creating it...\n");
483         (void)xseg_create(&config);
484         return xseg_join(config.type, config.name, "posix", NULL);
485 }
486
487 static int filed_loop(struct store *store)
488 {
489         struct xseg *xseg = store->xseg;
490         uint32_t portno = store->portno;
491         struct io *io;
492
493         for (;;) {
494                 io = wake_up_next_iothread(store);
495                 xseg_prepare_wait(xseg, portno);
496                 xseg_wait_signal(xseg, 1000000UL);
497         }
498         return 0;
499 }
500
501 static int filed(       char *path, unsigned long size, uint32_t nr_ops,
502                         char *spec, long portno )
503 {
504         struct stat stat;
505         struct sigaction sa;
506         struct store *store;
507         int r, mode, i;
508         void *status;
509
510         store = malloc(sizeof(struct store));
511         if (!store) {
512                 perror("malloc");
513                 return -1;
514         }
515
516
517         /*
518         r = daemon(1, 1);
519         if (r < 0)
520                 return r;
521                 */
522
523         store->sigevent.sigev_notify = SIGEV_SIGNAL;
524         store->sigevent.sigev_signo = SIGIO;
525         sa.sa_sigaction = sigaction_handler;
526         sa.sa_flags = SA_SIGINFO;
527         if (sigemptyset(&sa.sa_mask))
528                 perror("sigemptyset");
529
530         if (sigaction(SIGIO, &sa, NULL)) {
531                 perror("sigaction");
532                 return -1;
533         }
534
535         store->nr_ops = nr_ops;
536         store->maxfds = 2 * nr_ops;
537
538         store->fdcache = calloc(store->maxfds, sizeof(struct fdcache_node));
539         if(!store->fdcache)
540                 goto malloc_fail;
541
542         store->free_bufs = calloc(store->nr_ops, sizeof(xqindex));
543         if(!store->free_bufs)
544                 goto malloc_fail;
545
546         store->iothread = calloc(store->nr_ops, sizeof(pthread_t));
547         if(!store->iothread)
548                 goto malloc_fail;
549
550         store->ios = calloc(nr_ops, sizeof(struct io));
551         if (!store->ios) {
552 malloc_fail:
553                 perror("malloc");
554                 return -1;
555         }
556
557         for (i = 0; i < nr_ops; i++) {
558                 store->ios[i].store = store;
559                 pthread_cond_init(&store->ios[i].cond, NULL);
560                 pthread_mutex_init(&store->ios[i].lock, NULL);
561         }
562
563         xq_init_seq(&store->free_ops, store->nr_ops, store->nr_ops, store->free_bufs);
564
565         store->handled_reqs = 0;
566         strncpy(store->path, path, MAX_PATH_SIZE);
567         store->path[MAX_PATH_SIZE] = 0;
568
569         store->path_len = strlen(store->path);
570         if (store->path[store->path_len -1] != '/'){
571                 store->path[store->path_len] = '/';
572                 store->path[++store->path_len]= 0;
573         }
574         store->dirfd = open(store->path, O_RDWR);
575         if (!(store->dirfd < 0 && errno == EISDIR)){
576                 fprintf(stderr, "%s is not a directory\n", store->path);
577                 return -1;
578         }
579
580         store->dirfd = open(store->path, O_RDONLY);
581         if (store->dirfd < 0){
582                 perror("Directory open");
583                 return -1;
584         }
585 /*
586         mode = 1;
587         int fd = dir_open(store, ".__tmp", 6, 1);
588         if (fd < 0){
589                 perror("Directory check");
590                 return -1;
591         }
592 */
593         if (xseg_initialize()) {
594                 printf("cannot initialize library\n");
595                 return -1;
596         }
597         store->xseg = join(spec);
598         if (!store->xseg)
599                 return -1;
600
601         store->xport = xseg_bind_port(store->xseg, portno);
602         if (!store->xport) {
603                 printf("cannot bind to port %ld\n", portno);
604                 return -1;
605         }
606
607         store->portno = xseg_portno(store->xseg, store->xport);
608         printf("filed on port %u/%u\n",
609                 store->portno, store->xseg->config.nr_ports);
610
611         for (i = 0; i < nr_ops; i++) {
612                 pthread_cond_init(&store->fdcache[i].cond, NULL);
613                 store->fdcache[i].flags = READY;
614         }
615         for (i = 0; i < nr_ops; i++) {
616                 //TODO error check + cond variable to stop io from starting
617                 //unless all threads are created successfully
618                 pthread_create(store->iothread + i, NULL, io_loop, (void *) (store->ios + i));
619         }
620         pthread_mutex_init(&store->cache_lock, NULL);
621         return filed_loop(store);
622 }
623
624 int main(int argc, char **argv)
625 {
626         char *path, *spec = "";
627         unsigned long size;
628         int i;
629         long portno;
630         uint32_t nr_ops;
631
632         if (argc < 2)
633                 return usage();
634
635         path = argv[1];
636         size = 0;
637         portno = -1;
638         nr_ops = 0;
639
640         for (i = 2; i < argc; i++) {
641                 if (!strcmp(argv[i], "-g") && i + 1 < argc) {
642                         spec = argv[i+1];
643                         i += 1;
644                         continue;
645                 }
646
647                 if (!strcmp(argv[i], "-p") && i + 1 < argc) {
648                         portno = strtoul(argv[i+1], NULL, 10);
649                         i += 1;
650                         continue;
651                 }
652
653                 if (!strcmp(argv[i], "-n") && i + 1 < argc) {
654                         nr_ops = strtoul(argv[i+1], NULL, 10);
655                         i += 1;
656                         continue;
657                 }
658                 if (!strcmp(argv[i], "-v")) {
659                         verbose = 1;
660                         continue;
661                 }
662         }
663
664         if (nr_ops <= 0)
665                 nr_ops = 16;
666
667         return filed(path, size, nr_ops, spec, portno);
668 }
669