blockd: Make request logging optional
[archipelago] / xseg / peers / filed.c
1 #define _GNU_SOURCE
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <sys/types.h>
5 #include <sys/stat.h>
6 #include <unistd.h>
7 #include <string.h>
8 #include <fcntl.h>
9 #include <errno.h>
10 #include <aio.h>
11 #include <signal.h>
12 #include <limits.h>
13 #include <xseg/xseg.h>
14 #include <pthread.h>
15
/*
 * Size limits, excluding the terminating NUL: buffers sized with these
 * constants are declared with one extra byte.
 */
enum {
        MAX_PATH_SIZE = 255,            /* longest store directory path */
        MAX_FILENAME_SIZE = 255         /* longest object (file) name */
};
18
/*
 * Print the command-line synopsis on stdout.
 *
 * Always returns 1, so callers can "return usage();" to leave with a
 * failure status.
 */
static int usage(void)
{
        static const char synopsis[] =
                "Usage: ./filed <path_to_directory> [options]\n"
                "Options: [-p portno]\n"
                "         [-g type:name:nr_ports:nr_requests:request_size:extra_size:page_shift]\n"
                "         [-n nr_parallel_ops]\n";

        fputs(synopsis, stdout);
        return 1;
}
27
/*
 * Bookkeeping record for an fsync operation.
 * NOTE(review): nothing in this file references this type; the field
 * meanings below are inferred from their names only - confirm before use.
 */
struct fsync_io {
        unsigned long cacheidx; /* presumably an index into the fd cache */
        int fd;                 /* presumably the descriptor to sync */
        uint64_t time;          /* presumably a timestamp/sequence number */
};
33
/*
 * Per-worker I/O slot. filed() allocates one per worker thread and hands
 * its address to io_loop() as the thread argument.
 */
struct io {
        struct store *store;            /* owning store, set once in filed() */
        struct xseg_request *req;       /* request being served; NULL once the slot is freed */
        ssize_t retval;                 /* reset to 0 when a request is accepted */
        long fdcacheidx;                /* fd cache entry pinned by dir_open(), -1 if none */
        pthread_cond_t cond;            /* the worker sleeps on this while idle */
        pthread_mutex_t lock;           /* mutex paired with cond */
};
42
43 #define READY (1 << 1)
44
45 struct fdcache_node {
46         volatile int fd;
47         volatile unsigned int ref;
48         volatile unsigned long time;
49         volatile unsigned int flags;
50         pthread_cond_t cond;
51         char name[MAX_FILENAME_SIZE + 1];
52 };
53
54 struct store {
55         struct xseg *xseg;
56         struct xseg_port *xport;
57         uint32_t portno;
58         uint64_t size;
59         struct io *ios;
60         struct xq free_ops;
61         char *free_bufs;
62         long nr_ops;
63         struct sigevent sigevent;
64         int dirfd;
65         uint32_t path_len;
66         uint64_t handled_reqs;
67         unsigned long maxfds;
68         struct fdcache_node *fdcache;
69         pthread_t *iothread;
70         pthread_mutex_t cache_lock;
71         char path[MAX_PATH_SIZE + 1];
72 };
73
/*
 * Number of signals delivered to sigaction_handler(). Declared
 * volatile sig_atomic_t because it is modified from a signal handler.
 * It is only a diagnostic counter; nothing in this file reads it.
 */
static volatile sig_atomic_t sigaction_count;

/*
 * SIGIO handler installed by filed(): only counts deliveries.
 * All three arguments are ignored.
 */
static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
{
        (void)sig;
        (void)siginfo;
        (void)arg;
        sigaction_count++;
}
80
/*
 * Dump a one-request trace record on stderr.
 *
 * Logging is optional: it is compiled in only when FILED_LOG_IO is
 * defined (e.g. -DFILED_LOG_IO). Without it the function does nothing,
 * so the request buffers are not touched on the hot path.
 *
 * msg: short tag for the event ("complete", "fail", ...)
 * io:  slot whose request is described; io->req must be valid
 */
static void log_io(char *msg, struct io *io)
{
#ifdef FILED_LOG_IO
        char name[64], data[64];
        struct xseg_request *req = io->req;
        /*
         * Neither buffer is guaranteed to be NUL-terminated, so never look
         * past namesize/datasize and terminate the local copies ourselves.
         */
        unsigned int name_len = (req->namesize > 63) ? 63 : req->namesize;
        unsigned int data_len = (req->datasize > 63) ? 63 : req->datasize;

        strncpy(name, req->name, name_len);
        name[name_len] = 0;
        strncpy(data, req->data, data_len);
        data[data_len] = 0;

        fprintf(stderr,
                "%s: fd:%u, op:%u offset: %llu size: %lu retval: %ld, reqstate: %u\n"
                "name[%u]: '%s', data[%llu]:\n%s------------------\n\n",
                msg,
                (unsigned int)io->fdcacheidx, /* this is the cache index, not the fd */
                (unsigned int)req->op,
                (unsigned long long)req->offset,
                (unsigned long)req->size,
                (long)io->retval,       /* signed, so errors do not show up as huge numbers */
                (unsigned int)req->state,
                (unsigned int)req->namesize, name,
                (unsigned long long)req->datasize, data);
#else
        (void)msg;
        (void)io;
#endif
}
109
110 static struct io *alloc_io(struct store *store)
111 {
112         xqindex idx = xq_pop_head(&store->free_ops);
113         if (idx == None)
114                 return NULL;
115         return store->ios + idx;
116 }
117
118 static inline void free_io(struct store *store, struct io *io)
119 {
120         xqindex idx = io - store->ios;
121         io->req = NULL;
122         xq_append_head(&store->free_ops, idx);
123 }
124
125
126 static void complete(struct store *store, struct io *io)
127 {
128         struct xseg_request *req = io->req;
129         req->state |= XS_SERVED;
130         log_io("complete", io);
131         xseg_respond(store->xseg, req->portno, req);
132         xseg_signal(store->xseg, req->portno);
133         __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
134 }
135
136 static void fail(struct store *store, struct io *io)
137 {
138         struct xseg_request *req = io->req;
139         req->state |= XS_FAILED;
140         log_io("fail", io);
141         xseg_respond(store->xseg, req->portno, req);
142         xseg_signal(store->xseg, req->portno);
143         if (io->fdcacheidx >= 0) {
144                 __sync_fetch_and_sub(&store->fdcache[io->fdcacheidx].ref, 1);
145         }
146 }
147
148 static void pending(struct store *store, struct io *io)
149 {
150         io->req->state = XS_PENDING;
151 }
152
153 static void handle_unknown(struct store *store, struct io *io)
154 {
155         struct xseg_request *req = io->req;
156         snprintf(req->data, req->datasize, "unknown request op");
157         fail(store, io);
158 }
159
/*
 * Hook called by handle_read_write() before the transfer starts.
 * Intentionally empty at present; both arguments are ignored.
 */
static inline void prepare_io(struct store *store, struct io *io)
{
        (void)store;
        (void)io;
}
163
164
165 static int dir_open(    struct store *store, struct io *io,
166                         char *name, uint32_t namesize, int mode )
167 {
168         int fd = -1, r;
169         struct fdcache_node *ce = NULL;
170         long i, lru;
171         uint64_t min;
172         io->fdcacheidx = -1;
173         if (namesize > MAX_FILENAME_SIZE)
174                 goto out_err;
175
176 start:
177         /* check cache */
178         pthread_mutex_lock(&store->cache_lock);
179 start_locked:
180         lru = -1;
181         min = UINT64_MAX;
182         for (i = 0; i < store->maxfds; i++) {
183                 if (store->fdcache[i].ref == 0 && min > store->fdcache[i].time 
184                                 && (store->fdcache[i].flags & READY)) {
185                         min = store->fdcache[i].time;
186                         lru = i;
187
188                 }
189                 if (!strncmp(store->fdcache[i].name, name, namesize)) {
190                         if (store->fdcache[i].name[namesize] == 0) {
191                                 ce = &store->fdcache[i];
192                                 /* if any other io thread is currently opening
193                                  * the file, block until it succeeds or fails
194                                  */
195                                 if (!(ce->flags & READY)) {
196                                         pthread_cond_wait(&ce->cond, &store->cache_lock);
197                                         /* when ready, restart lookup */
198                                         goto start_locked;
199                                 }
200                                 /* if successfully opened */
201                                 if (ce->fd > 0) {
202                                         fd = store->fdcache[i].fd;
203                                         io->fdcacheidx = i;
204                                         goto out;
205                                 }
206                                 /* else open failed for the other io thread, so
207                                  * it should fail for everyone waiting on this
208                                  * file.
209                                  */
210                                 else {
211                                         fd = -1;
212                                         io->fdcacheidx = -1;
213                                         goto out_err_unlock;
214                                 }
215                         }
216                 }
217         }
218         if (lru < 0){
219                 /* all cache entries are currently being used */
220                 pthread_mutex_unlock(&store->cache_lock);
221                 goto start;
222         }
223         if (store->fdcache[lru].ref){
224                 fd = -1;
225                 printf("lru(%ld) ref not 0 (%u)\n", lru, store->fdcache[lru].ref);
226                 goto out_err_unlock;
227         }
228         /* make room for new file */
229         ce = &store->fdcache[lru];
230         /* set name here and state to not ready, for any other requests on the
231          * same target that may follow
232          */
233         strncpy(ce->name, name, namesize);
234         ce->name[namesize] = 0;
235         ce->flags &= ~READY;
236         pthread_mutex_unlock(&store->cache_lock);
237
238         if (ce->fd >0){
239                 if (close(ce->fd) < 0){
240                         perror("close");
241                 }
242         }
243         fd = openat(store->dirfd, ce->name, O_RDWR);    
244         if (fd < 0) {
245                 if (errno == ENOENT){
246                         fd = openat(store->dirfd, ce->name, 
247                                         O_RDWR | O_CREAT, 0600);
248                         if (fd >= 0)
249                                 goto new_entry;
250                 }
251                 perror(store->path);
252                 /* insert in cache a negative fd to indicate opening error to
253                  * any other ios waiting for the file to open
254                  */
255         }       
256         /* insert in cache */
257 new_entry:
258         pthread_mutex_lock(&store->cache_lock);
259         ce->fd = fd;
260         ce->ref = 0;
261         ce->flags = READY;
262         pthread_cond_broadcast(&ce->cond);
263         if (fd > 0) {
264                 io->fdcacheidx = lru;
265         }
266         else {
267                 io->fdcacheidx = -1;
268                 goto out_err_unlock;
269         }
270
271 out:
272         store->handled_reqs++;
273         ce->time = store->handled_reqs;
274         __sync_fetch_and_add(&ce->ref, 1);
275         pthread_mutex_unlock(&store->cache_lock);
276 out_err:
277         return fd;
278
279 out_err_unlock:
280         pthread_mutex_unlock(&store->cache_lock);
281         goto out_err;
282 }
283
284 static void handle_read_write(struct store *store, struct io *io)
285 {
286         int r, fd, mode;
287         struct xseg_request *req = io->req;
288
289         if (req->op == X_WRITE)
290                 mode = 1;
291         else
292                 mode = 0;
293         fd = dir_open(store, io, req->name, req->namesize, mode);
294         if (fd < 0){
295                 perror("dir_open");
296                 fail(store, io);
297                 return;
298         }
299
300         if (req != io->req)
301                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
302         if (!req->size) {
303                 if (req->flags & (XF_FLUSH | XF_FUA)) {
304                         /* No FLUSH/FUA support yet (O_SYNC ?).
305                          * note that with FLUSH/size == 0 
306                          * there will probably be a (uint64_t)-1 offset */
307                         complete(store, io);
308                         return;
309                 } else {
310                         complete(store, io);
311                         return;
312                 }
313         }
314
315
316         prepare_io(store, io);
317
318         switch (req->op) {
319         case X_READ:
320                 while (req->serviced < req->datasize) {
321                         r = pread(fd, req->data + req->serviced, 
322                                         req->datasize - req->serviced,
323                                         req->offset + req->serviced);
324                         if (r < 0) {
325                                 req->datasize = req->serviced;
326                                 perror("pread");
327                         }
328                         else if (r == 0) {
329                                 /* reached end of file. zero out the rest data buffer */
330                                 memset(req->data + req->serviced, 0, req->datasize - req->serviced);
331                                 req->serviced = req->datasize;
332                         }
333                         else {
334                                 req->serviced += r;
335                         }
336                 }
337                 break;
338         case X_WRITE:
339                 while (req->serviced < req->datasize) {
340                         r = pwrite(fd, req->data + req->serviced, 
341                                         req->datasize - req->serviced,
342                                         req->offset + req->serviced);
343                         if (r < 0) {
344                                 req->datasize = req->serviced;
345                         }
346                         else if (r == 0) {
347                                 /* reached end of file. zero out the rest data buffer */
348                                 memset(req->data + req->serviced, 0, req->datasize - req->serviced);
349                                 req->serviced = req->datasize;
350                         }
351                         else {
352                                 req->serviced += r;
353                         }
354                 }
355                 r = fsync(fd);
356                 if (r< 0) {
357                         perror("fsync");
358                         /* if fsync fails, then no bytes serviced correctly */
359                         req->serviced = 0;
360                 }
361                 break;
362         default:
363                 snprintf(req->data, req->datasize,
364                          "wtf, corrupt op %u?\n", req->op);
365                 fail(store, io);
366                 return;
367         }
368
369         if (req->serviced > 0 ) {
370                 complete(store, io);
371         }
372         else {
373                 strerror_r(errno, req->data, req->datasize);
374                 fail(store, io);
375         }
376         return;
377 }
378
379 static void handle_info(struct store *store, struct io *io)
380 {
381         struct xseg_request *req = io->req;
382         struct stat stat;
383         int fd, r;
384         off_t size;
385
386         fd = dir_open(store, io, req->name, req->namesize, 0);
387         if (fd < 0) {
388                 fail(store, io);
389                 return;
390         }
391         r = fstat(fd, &stat);
392         if (r < 0) {
393                 perror("fstat");
394                 fail(store, io);
395                 return;
396         }
397         size = stat.st_size;
398         *((off_t *) req->data) = size;
399         req->datasize = sizeof(size);
400
401         complete(store, io);
402 }
403
404 static void dispatch(struct store *store, struct io *io)
405 {
406         printf("io: 0x%p, req: 0x%p, op %u\n",
407                 (void *)io, (void *)io->req, io->req->op);
408         switch (io->req->op) {
409         case X_READ:
410         case X_WRITE:
411                 handle_read_write(store, io); break;
412         case X_INFO:
413                 handle_info(store, io); break;
414         case X_SYNC:
415         default:
416                 handle_unknown(store, io);
417         }
418 }
419
420 static void handle_accepted(struct store *store, struct io *io)
421 {
422         struct xseg_request *req = io->req;
423         req->serviced = 0;
424         req->state = XS_ACCEPTED;
425         io->retval = 0;
426         dispatch(store, io);
427 }
428
429 static struct io* wake_up_next_iothread(struct store *store)
430 {
431         struct io *io = alloc_io(store);
432         if (io){        
433                 pthread_cond_signal(&io->cond);
434         }
435         return io;
436 }
437
438 void *io_loop(void *arg)
439 {
440         struct io *io = (struct io *) arg;
441         struct store *store = io->store;
442         struct xseg *xseg = store->xseg;
443         uint32_t portno = store->portno;
444         struct xseg_request *accepted;
445
446         for (;;) {
447                 accepted = NULL;
448                 accepted = xseg_accept(xseg, portno);
449                 if (accepted) {
450                         io->req = accepted;
451                         wake_up_next_iothread(store);
452                         handle_accepted(store, io);
453                 }
454                 else {
455                         free_io(store, io);
456                         pthread_mutex_lock(&io->lock);
457                         pthread_cond_wait(&io->cond, &io->lock);
458                         pthread_mutex_unlock(&io->lock);
459                 }
460         }
461
462         return NULL;
463 }
464
465 static struct xseg *join(char *spec)
466 {
467         struct xseg_config config;
468         struct xseg *xseg;
469
470         (void)xseg_parse_spec(spec, &config);
471         xseg = xseg_join(config.type, config.name);
472         if (xseg)
473                 return xseg;
474
475         fprintf(stderr, "Failed to join xseg, creating it...\n");
476         (void)xseg_create(&config);
477         return xseg_join(config.type, config.name);
478 }
479
480 static int filed_loop(struct store *store)
481 {
482         struct xseg *xseg = store->xseg;
483         uint32_t portno = store->portno;
484         struct io *io;
485
486         for (;;) {
487                 io = wake_up_next_iothread(store);
488                 xseg_prepare_wait(xseg, portno);
489                 xseg_wait_signal(xseg, portno, 10000);
490         }
491         return 0;
492 }
493
494 static int filed(       char *path, unsigned long size, uint32_t nr_ops,
495                         char *spec, long portno )
496 {
497         struct stat stat;
498         struct sigaction sa;
499         struct store *store;
500         int r, mode, i;
501         void *status;
502
503         store = malloc(sizeof(struct store));
504         if (!store) {
505                 perror("malloc");
506                 return -1;
507         }
508
509
510         /*
511         r = daemon(1, 1);
512         if (r < 0)
513                 return r;
514                 */
515
516         store->sigevent.sigev_notify = SIGEV_SIGNAL;
517         store->sigevent.sigev_signo = SIGIO;
518         sa.sa_sigaction = sigaction_handler;
519         sa.sa_flags = SA_SIGINFO;
520         if (sigemptyset(&sa.sa_mask))
521                 perror("sigemptyset");
522
523         if (sigaction(SIGIO, &sa, NULL)) {
524                 perror("sigaction");
525                 return -1;
526         }
527
528         store->nr_ops = nr_ops;
529         store->maxfds = 2 * nr_ops;
530
531         store->fdcache = calloc(store->maxfds, sizeof(struct fdcache_node));
532         if(!store->fdcache)
533                 goto malloc_fail;
534
535         store->free_bufs = calloc(store->nr_ops, sizeof(xqindex));
536         if(!store->free_bufs)
537                 goto malloc_fail;
538
539         store->iothread = calloc(store->nr_ops, sizeof(pthread_t));
540         if(!store->iothread)
541                 goto malloc_fail;
542
543         store->ios = calloc(nr_ops, sizeof(struct io));
544         if (!store->ios) {
545 malloc_fail:
546                 perror("malloc");
547                 return -1;
548         }
549
550         for (i = 0; i < nr_ops; i++) {
551                 store->ios[i].store = store;
552                 pthread_cond_init(&store->ios[i].cond, NULL);
553                 pthread_mutex_init(&store->ios[i].lock, NULL);
554         }
555
556         xq_init_seq(&store->free_ops, store->nr_ops, store->nr_ops, store->free_bufs);
557
558         store->handled_reqs = 0;
559         strncpy(store->path, path, MAX_PATH_SIZE);
560         store->path[MAX_PATH_SIZE] = 0;
561
562         store->path_len = strlen(store->path);
563         if (store->path[store->path_len -1] != '/'){
564                 store->path[store->path_len] = '/';
565                 store->path[++store->path_len]= 0;
566         }
567         store->dirfd = open(store->path, O_RDWR);
568         if (!(store->dirfd < 0 && errno == EISDIR)){
569                 fprintf(stderr, "%s is not a directory\n", store->path);
570                 return -1;
571         }
572
573         store->dirfd = open(store->path, O_RDONLY);
574         if (store->dirfd < 0){
575                 perror("Directory open");
576                 return -1;
577         }
578 /*
579         mode = 1;
580         int fd = dir_open(store, ".__tmp", 6, 1);
581         if (fd < 0){
582                 perror("Directory check");
583                 return -1;
584         }
585 */
586         if (xseg_initialize("posix")) {
587                 printf("cannot initialize library\n");
588                 return -1;
589         }
590         store->xseg = join(spec);
591         if (!store->xseg)
592                 return -1;
593
594         store->xport = xseg_bind_port(store->xseg, portno);
595         if (!store->xport) {
596                 printf("cannot bind to port %ld\n", portno);
597                 return -1;
598         }
599
600         store->portno = xseg_portno(store->xseg, store->xport);
601         printf("filed on port %u/%u\n",
602                 store->portno, store->xseg->config.nr_ports);
603
604         for (i = 0; i < nr_ops; i++) {
605                 pthread_cond_init(&store->fdcache[i].cond, NULL);
606                 store->fdcache[i].flags = READY;
607         }
608         for (i = 0; i < nr_ops; i++) {
609                 //TODO error check + cond variable to stop io from starting
610                 //unless all threads are created successfully
611                 pthread_create(store->iothread + i, NULL, io_loop, (void *) (store->ios + i));
612         }
613         pthread_mutex_init(&store->cache_lock, NULL);
614         return filed_loop(store);
615 }
616
/*
 * Entry point: filed <path_to_directory> [-g spec] [-p portno] [-n nr_ops]
 *
 * Defaults: empty spec, port -1, 16 parallel operations (also used when
 * -n 0 is given). A malformed or out-of-range number for -p or -n prints
 * the usage text and exits with status 1. Arguments that are not one of
 * the options above are ignored.
 *
 * Returns the result of filed(), or 1 after printing the usage text.
 */
int main(int argc, char **argv)
{
        char *path, *spec = "";
        char *end;
        unsigned long size = 0;
        unsigned long val;
        long portno = -1;
        uint32_t nr_ops = 0;
        int i;

        if (argc < 2)
                return usage();

        path = argv[1];

        for (i = 2; i < argc; i++) {
                if (!strcmp(argv[i], "-g") && i + 1 < argc) {
                        spec = argv[++i];
                        continue;
                }

                if (!strcmp(argv[i], "-p") && i + 1 < argc) {
                        i += 1;
                        errno = 0;
                        portno = strtol(argv[i], &end, 10);
                        /* reject empty input, trailing junk and overflow */
                        if (errno || end == argv[i] || *end)
                                return usage();
                        continue;
                }

                if (!strcmp(argv[i], "-n") && i + 1 < argc) {
                        i += 1;
                        errno = 0;
                        val = strtoul(argv[i], &end, 10);
                        /* also reject values that do not fit nr_ops */
                        if (errno || end == argv[i] || *end ||
                                        val > (uint32_t)-1)
                                return usage();
                        nr_ops = (uint32_t)val;
                        continue;
                }
        }

        if (nr_ops == 0)
                nr_ops = 16;

        return filed(path, size, nr_ops, spec, portno);
}
658