change open/close to acquire/release in mt-pfiled
[archipelago] / xseg / peers / user / pfiled.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 /*
36  * The Pithos File Blocker Peer (pfiled)
37  */
38
39 #define _GNU_SOURCE
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <sys/types.h>
43 #include <sys/stat.h>
44 #include <unistd.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <errno.h>
48 #include <aio.h>
49 #include <signal.h>
50 #include <limits.h>
51 #include <pthread.h>
52 #include <syscall.h>
53 #include <sys/sendfile.h>
54
55 #include <xseg/xseg.h>
56 #include <xseg/protocol.h>
57
58 #include "common.h"                     /* FIXME: */
59
/* Upper bounds: length of a base directory path, length of a block name. */
#define MAX_PATH_SIZE           1024
#define MAX_FILENAME_SIZE       255

/* Default concurrency level, i.e. how many I/O threads are used. */
#define DEFAULT_NR_OPS           16

/*
 * Pithos hash for the zero block.
 * FIXME: Should it be hardcoded?
 * NOTE(review): the literal below is 63 hex digits long, one short of a
 * 64-digit SHA-256 digest -- confirm against the users of this macro
 * before relying on it.
 */
#define ZERO_BLOCK "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85"
71
72 /*
73  * Globals, holding command-line arguments
74  */
75 long cmdline_portno = -1;
76 char *cmdline_xseg_spec = NULL;
77 char *cmdline_path = NULL;
78 char *cmdline_vpath = NULL;
79 char *cmdline_pidfile = NULL;
80 int  cmdline_daemon = 0;
81 long cmdline_nr_ops = DEFAULT_NR_OPS;
82 long cmdline_verbose = 0;
83 volatile unsigned int terminated = 0;
84
/*
 * Print the command-line synopsis to stderr.
 *
 * argv0: program name substituted into the "Usage:" line.
 * Returns 1 unconditionally.
 */
static int usage(char *argv0)
{
        static const char synopsis[] =
                "Usage: %s <PATH> <VPATH> [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v]\n\n"
                "where:\n"
                "\tPATH: path to pithos data blocks\n"
                "\tVPATH: path to modified volume blocks\n"
                "\tPORT: xseg port to listen for requests on\n"
                "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
                "request_size:extra_size:page_shift'\n"
                "\tNR_OPS: number of outstanding xseg requests\n"
                "\t-v: verbose mode\n";

        fprintf(stderr, synopsis, argv0);
        return 1;
}
101
102 /* fdcache_node flags */
103 #define READY (1 << 1)
104
105 /* fdcache node info */
106 struct fdcache_node {
107         volatile int fd;
108         volatile unsigned int ref;
109         volatile unsigned long time;
110         volatile unsigned int flags;
111         pthread_cond_t cond;
112         char target[MAX_FILENAME_SIZE + 1];
113 };
114
115 /* pfiled context */
116 struct pfiled {
117         struct xseg *xseg;
118         struct xseg_port *xport;
119         uint32_t portno;
120         uint64_t size;
121         struct io *ios;
122         struct xq free_ops;
123         char *free_bufs;
124         long nr_ops;
125         struct sigevent sigevent;
126         uint32_t path_len;
127         uint32_t vpath_len;
128         uint64_t handled_reqs;
129         long maxfds;
130         struct fdcache_node *fdcache;
131         pthread_t *iothread;
132         pthread_mutex_t cache_lock;
133         char path[MAX_PATH_SIZE + 1];
134         char vpath[MAX_PATH_SIZE + 1];
135 };
136
/*
 * Per-thread I/O context: everything pfiled tracks about the request an
 * I/O thread is currently serving.
 */
struct io {
        struct pfiled *pfiled;          /* owning pfiled context */
        struct xseg_request *req;       /* request being served; NULL while idle */
        uint32_t state;                 /* set to XS_ACCEPTED when a request is taken */
        ssize_t retval;                 /* reset to 0 per request; printed by log_io() */
        long fdcacheidx;                /* fdcache slot held for the request, or -1 */
        pthread_cond_t cond;            /* the idle thread sleeps on this ... */
        pthread_mutex_t lock;           /* ... with this mutex held */
};
150
151
152 static inline int isTerminate()
153 {
154         /* ta doesn't need to be taken into account, because the main loops
155          * doesn't check the terminated flag if ta is not 0.
156          * 
157          * #ifdef ST_THREADS
158          * return (!ta & terminated);
159          * #else
160          * return terminated;
161          *  #endif
162          */
163         return terminated;
164 }
165
166 void signal_handler(int signal)
167 {      
168         terminated = 1;
169 }
170
171 static int setup_signals()
172 {      
173         int r;
174         struct sigaction sa;
175         sigemptyset(&sa.sa_mask);
176         sa.sa_flags = 0;
177         sa.sa_handler = signal_handler;
178         r = sigaction(SIGTERM, &sa, NULL);
179         if (r < 0)
180                 return r;
181         r = sigaction(SIGINT, &sa, NULL);
182         if (r < 0)
183                 return r;
184         r = sigaction(SIGQUIT, &sa, NULL);
185         if (r < 0)
186                 return r;
187         return r;
188 }
189
190
/* Number of times sigaction_handler() has been invoked. */
static unsigned long sigaction_count;

/*
 * SA_SIGINFO-style handler (installed for SIGIO by pfiled_init()).
 * It only counts deliveries; none of its arguments are inspected.
 */
static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
{
        (void)sig;
        (void)siginfo;
        (void)arg;
        sigaction_count += 1;
}
197
198 static void log_io(char *msg, struct io *io)
199 {
200         char target[65], data[65];
201         /* null terminate name in case of req->target is less than 63 characters,
202          * and next character after name (aka first byte of next buffer) is not
203          * null
204          */
205         unsigned int end = (io->req->targetlen> 64) ? 64 : io->req->targetlen;
206         unsigned int dend = (io->req->datalen > 64) ? 64 : io->req->datalen;
207         char *req_target = xseg_get_target(io->pfiled->xseg, io->req);
208         char *req_data = xseg_get_data(io->pfiled->xseg, io->req);
209         strncpy(target, req_target, end);
210         target[end] = 0;
211         strncpy(data, req_data, 64);
212         data[dend] = 0;
213
214         fprintf(stderr,
215                         "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u, serviced: %lu\n"
216                         "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
217                         msg,
218                         (unsigned int)io->fdcacheidx, /* this is cacheidx not fd */
219                         (unsigned int)io->req->op,
220                         (unsigned long long)io->req->offset,
221                         (unsigned long)io->req->size,
222                         (unsigned long)io->retval,
223                         (unsigned int)io->req->state,
224                         (unsigned long)io->req->serviced,
225                         (unsigned int)io->req->targetlen, target,
226                         (unsigned long long)io->req->datalen, data);
227 }
228
229 static struct io *alloc_io(struct pfiled *pfiled)
230 {
231         xqindex idx = xq_pop_head(&pfiled->free_ops, 1);
232         if (idx == Noneidx)
233                 return NULL;
234         return pfiled->ios + idx;
235 }
236
237 static inline void free_io(struct pfiled *pfiled, struct io *io)
238 {
239         xqindex idx = io - pfiled->ios;
240         io->req = NULL;
241         xq_append_head(&pfiled->free_ops, idx, 1);
242 }
243
244 static void complete(struct pfiled *pfiled, struct io *io)
245 {
246         struct xseg_request *req = io->req;
247         req->state |= XS_SERVED;
248         if (cmdline_verbose)
249                 log_io("complete", io);
250         xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
251         xseg_signal(pfiled->xseg, p);
252         __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
253 }
254
255 static void fail(struct pfiled *pfiled, struct io *io)
256 {
257         struct xseg_request *req = io->req;
258         req->state |= XS_FAILED;
259         if (cmdline_verbose)
260                 log_io("fail", io);
261         xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
262         xseg_signal(pfiled->xseg, p);
263         if (io->fdcacheidx >= 0) {
264                 __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
265         }
266 }
267
268 static void handle_unknown(struct pfiled *pfiled, struct io *io)
269 {
270         struct xseg_request *req = io->req;
271         char *data = xseg_get_data(pfiled->xseg, req);
272         snprintf(data, req->datalen, "unknown request op");
273         fail(pfiled, io);
274 }
275
/*
 * Build the on-disk path of a block.
 *
 * The result is "<path><t0t1>/<t2t3>/<t4t5>/<target>", where t0..t5 are the
 * first six bytes of target, i.e. three directory levels of two characters
 * each in front of the full target name.
 *
 * buf:       output; must hold strlen(path) + 9 + targetlen + 1 bytes.
 * path:      base directory, expected to end in '/'.
 * target:    block name; its first six bytes are always read.
 * targetlen: number of bytes of target to append (no NUL required).
 * mkdirs:    when 1, every missing intermediate directory is created
 *            with mode 0700.
 *
 * Returns 0 on success, or a negative errno value when a directory could
 * not be created. Callers test the result with "< 0", so the error has to be
 * negative; errno is captured before perror() gets a chance to change it.
 */
static int create_path(char *buf, char *path, char *target, uint32_t targetlen, int mkdirs)
{
        int i;
        struct stat st;
        uint32_t pathlen = strlen(path);

        memcpy(buf, path, pathlen);

        for (i = 0; i < 9; i += 3) {
                /* i = 0, 3, 6 selects target bytes (0,1), (2,3), (4,5) */
                buf[pathlen + i] = target[i - (i/3)];
                buf[pathlen + i + 1] = target[i + 1 - (i/3)];
                buf[pathlen + i + 2] = '/';
                if (mkdirs != 1)
                        continue;

                buf[pathlen + i + 3] = '\0';
                if (stat(buf, &st) == 0)
                        continue;
                /* EEXIST: another thread created it between stat and mkdir */
                if (mkdir(buf, 0700) < 0 && errno != EEXIST) {
                        int err = errno;

                        perror(buf);
                        return -err;
                }
        }

        memcpy(&buf[pathlen + 9], target, targetlen);
        buf[pathlen + 9 + targetlen] = '\0';

        return 0;
}
303
304 static int dir_open(struct pfiled *pfiled, struct io *io,
305                 char *target, uint32_t targetlen, int mode)
306 {
307         int fd = -1;
308         struct fdcache_node *ce = NULL;
309         long i, lru;
310         char tmp[pfiled->path_len + targetlen + 10];
311         uint64_t min;
312         io->fdcacheidx = -1;
313         if (targetlen> MAX_FILENAME_SIZE)
314                 goto out_err;
315
316 start:
317         /* check cache */
318         pthread_mutex_lock(&pfiled->cache_lock);
319 start_locked:
320         lru = -1;
321         min = UINT64_MAX;
322         for (i = 0; i < pfiled->maxfds; i++) {
323                 if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time 
324                                 && (pfiled->fdcache[i].flags & READY)) {
325                         min = pfiled->fdcache[i].time;
326                         lru = i;
327
328                 }
329
330                 if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
331                         if (pfiled->fdcache[i].target[targetlen] == 0) {
332                                 ce = &pfiled->fdcache[i];
333                                 /* if any other io thread is currently opening
334                                  * the file, block until it succeeds or fails
335                                  */
336                                 if (!(ce->flags & READY)) {
337                                         pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
338                                         /* when ready, restart lookup */
339                                         goto start_locked;
340                                 }
341                                 /* if successfully opened */
342                                 if (ce->fd > 0) {
343                                         fd = pfiled->fdcache[i].fd;
344                                         io->fdcacheidx = i;
345                                         goto out;
346                                 }
347                                 /* else open failed for the other io thread, so
348                                  * it should fail for everyone waiting on this
349                                  * file.
350                                  */
351                                 else {
352                                         fd = -1;
353                                         io->fdcacheidx = -1;
354                                         goto out_err_unlock;
355                                 }
356                         }
357                 }
358         }
359         if (lru < 0){
360                 /* all cache entries are currently being used */
361                 pthread_mutex_unlock(&pfiled->cache_lock);
362                 goto start;
363         }
364         if (pfiled->fdcache[lru].ref){
365                 fd = -1;
366                 printf("lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
367                 goto out_err_unlock;
368         }
369         /* make room for new file */
370         ce = &pfiled->fdcache[lru];
371         /* set name here and state to not ready, for any other requests on the
372          * same target that may follow
373          */
374         strncpy(ce->target, target, targetlen);
375         ce->target[targetlen] = 0;
376         ce->flags &= ~READY;
377         pthread_mutex_unlock(&pfiled->cache_lock);
378
379         if (ce->fd >0){
380                 if (close(ce->fd) < 0){
381                         perror("close");
382                 }
383         }
384
385         /* try opening it from pithos blocker dir */
386         if (create_path(tmp, pfiled->path, target, targetlen, 0) < 0) {
387                 fd = -1;
388                 goto new_entry;
389         }
390
391         fd = open(tmp, O_RDWR);
392         if (fd < 0) {
393                 /* try opening it from the tmp dir */
394                 if (create_path(tmp, pfiled->vpath, target, targetlen, 0) < 0)
395                         goto new_entry;
396
397                 fd = open(tmp, O_RDWR);
398                 if (fd < 0)  {
399                         if (create_path(tmp, pfiled->vpath, target, targetlen, 1) < 0) {
400                                 fd = -1;
401                                 goto new_entry;
402                         }
403
404                         fd = open(tmp, O_RDWR | O_CREAT, 0600);         
405                         if (fd < 0)
406                                 perror(tmp);
407                 }
408         }
409
410         /* insert in cache a negative fd to indicate opening error to
411          * any other ios waiting for the file to open
412          */
413
414         /* insert in cache */
415 new_entry:
416         pthread_mutex_lock(&pfiled->cache_lock);
417         ce->fd = fd;
418         ce->ref = 0;
419         ce->flags = READY;
420         pthread_cond_broadcast(&ce->cond);
421         if (fd > 0) {
422                 io->fdcacheidx = lru;
423         }
424         else {
425                 io->fdcacheidx = -1;
426                 goto out_err_unlock;
427         }
428
429 out:
430         pfiled->handled_reqs++;
431         ce->time = pfiled->handled_reqs;
432         __sync_fetch_and_add(&ce->ref, 1);
433         pthread_mutex_unlock(&pfiled->cache_lock);
434 out_err:
435         return fd;
436
437 out_err_unlock:
438         pthread_mutex_unlock(&pfiled->cache_lock);
439         goto out_err;
440 }
441
442 static void handle_read_write(struct pfiled *pfiled, struct io *io)
443 {
444         int r, fd;
445         struct xseg_request *req = io->req;
446         char *target = xseg_get_target(pfiled->xseg, req);
447         char *data = xseg_get_data(pfiled->xseg, req);
448
449         fd = dir_open(pfiled, io, target, req->targetlen, 0);
450         if (fd < 0){
451                 perror("dir_open");
452                 fail(pfiled, io);
453                 return;
454         }
455
456         if (req != io->req)
457                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
458         if (!req->size) {
459                 if (req->flags & (XF_FLUSH | XF_FUA)) {
460                         /* No FLUSH/FUA support yet (O_SYNC ?).
461                          * note that with FLUSH/size == 0 
462                          * there will probably be a (uint64_t)-1 offset */
463                         complete(pfiled, io);
464                         return;
465                 } else {
466                         complete(pfiled, io);
467                         return;
468                 }
469         }
470
471         switch (req->op) {
472                 case X_READ:
473                         while (req->serviced < req->datalen) {
474                                 r = pread(fd, data + req->serviced, 
475                                                 req->datalen - req->serviced,
476                                                 req->offset + req->serviced);
477                                 if (r < 0) {
478                                         req->datalen = req->serviced;
479                                         perror("pread");
480                                 }
481                                 else if (r == 0) {
482                                         /* reached end of file. zero out the rest data buffer */
483                                         memset(data + req->serviced, 0, req->datalen - req->serviced);
484                                         req->serviced = req->datalen;
485                                 }
486                                 else {
487                                         req->serviced += r;
488                                 }
489                         }
490                         break;
491                 case X_WRITE:
492                         while (req->serviced < req->datalen) {
493                                 r = pwrite(fd, data + req->serviced, 
494                                                 req->datalen - req->serviced,
495                                                 req->offset + req->serviced);
496                                 if (r < 0) {
497                                         req->datalen = req->serviced;
498                                 }
499                                 else if (r == 0) {
500                                         fprintf(stderr, "write returned 0\n");
501                                         memset(data + req->serviced, 0, req->datalen - req->serviced);
502                                         req->serviced = req->datalen;
503                                 }
504                                 else {
505                                         req->serviced += r;
506                                 }
507                         }
508                         r = fsync(fd);
509                         if (r< 0) {
510                                 perror("fsync");
511                                 /* if fsync fails, then no bytes serviced correctly */
512                                 req->serviced = 0;
513                         }
514                         break;
515                 default:
516                         snprintf(data, req->datalen,
517                                         "wtf, corrupt op %u?\n", req->op);
518                         fail(pfiled, io);
519                         return;
520         }
521
522         if (req->serviced > 0 ) {
523                 complete(pfiled, io);
524         }
525         else {
526                 strerror_r(errno, data, req->datalen);
527                 fail(pfiled, io);
528         }
529         return;
530 }
531
532 static void handle_info(struct pfiled *pfiled, struct io *io)
533 {
534         struct xseg_request *req = io->req;
535         struct stat stat;
536         int fd, r;
537         uint64_t size;
538         char *target = xseg_get_target(pfiled->xseg, req);
539         char *data = xseg_get_data(pfiled->xseg, req);
540         struct xseg_reply_info *xinfo  = (struct xseg_reply_info *)data;
541
542         fd = dir_open(pfiled, io, target, req->targetlen, 0);
543         if (fd < 0) {
544                 fail(pfiled, io);
545                 return;
546         }
547
548         r = fstat(fd, &stat);
549         if (r < 0) {
550                 perror("fstat");
551                 fail(pfiled, io);
552                 return;
553         }
554
555         size = (uint64_t)stat.st_size;
556         xinfo->size = size;
557
558         complete(pfiled, io);
559 }
560
561 static void handle_copy(struct pfiled *pfiled, struct io *io)
562 {
563         struct xseg_request *req = io->req;
564         char *target = xseg_get_target(pfiled->xseg, req);
565         char *data = xseg_get_data(pfiled->xseg, req);
566         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
567         struct stat st;
568         //FIXME is 256 enough?
569         char *buf = malloc(256);
570         int n, src, dst;
571
572         dst = dir_open(pfiled, io, target, req->targetlen, 1);
573         if (dst < 0) {
574                 fprintf(stderr, "fail in dst\n");
575                 fail(pfiled, io);
576                 return;
577         }
578
579         if (create_path(buf, pfiled->path, xcopy->target, xcopy->targetlen, 0) < 0)  {
580                 fail(pfiled, io);
581                 return;
582         }
583
584         src = open(buf, O_RDWR);
585         if (src < 0) {
586                 XSEGLOG("fail in src %s\n", buf);
587                 perror("open src");
588                 fail(pfiled, io);
589                 return;
590         }
591
592         fstat(src, &st);
593         n = sendfile(dst, src, 0, st.st_size);
594         if (n != st.st_size) {
595                 fprintf(stderr, "fail in copy\n");
596                 fail(pfiled, io);
597                 goto out;
598         }
599
600         if (n < 0) {
601                 fprintf(stderr, "fail in cp\n");
602                 fail(pfiled, io);
603                 goto out;
604         }
605
606         complete(pfiled, io);
607
608 out:
609         close(src);
610 }
611
612 static void handle_delete(struct pfiled *pfiled, struct io *io)
613 {
614         struct xseg_request *req = io->req;
615         char *buf = malloc(255);
616         int fd;
617         char *target = xseg_get_target(pfiled->xseg, req);
618
619         fd = dir_open(pfiled, io, target, req->targetlen, 0);
620         if (fd < 0) {
621                 fprintf(stderr, "fail in dir_open\n");
622                 fail(pfiled, io);
623                 return;
624         }
625
626         /* 'invalidate' cache entry */
627         if (io->fdcacheidx >= 0) {
628                 pfiled->fdcache[io->fdcacheidx].fd = -1;
629         }
630
631         close(fd);
632
633         if (create_path(buf, pfiled->vpath, target, req->targetlen, 0) < 0) {
634                 fail(pfiled, io);
635                 return;
636         }
637         unlink(buf);
638
639         complete(pfiled, io);
640
641         return;
642 }
643
644 static void handle_open(struct pfiled *pfiled, struct io *io)
645 {
646         struct xseg_request *req = io->req;
647         char *buf = malloc(MAX_FILENAME_SIZE + strlen("_lock"));
648         char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + strlen("_lock"));
649         int fd;
650         char *target = xseg_get_target(pfiled->xseg, req);
651
652         if (!buf || !pathname) {
653                 fail(pfiled, io);
654                 return;
655         }
656
657         strncpy(buf, target, req->targetlen);
658         strncpy(buf+req->targetlen, "_lock", strlen("_lock"));
659
660         if (create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen("_lock"), 1) < 0) {
661                 goto out_fail;
662         }
663
664         fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
665         if (fd < 0)
666                 goto out_fail;
667
668         close(fd);
669         free(buf);
670         free(pathname);
671         complete(pfiled, io);
672         return;
673
674 out_fail:
675         free(buf);
676         free(pathname);
677         fail(pfiled, io);
678         return;
679 }
680
681 static void handle_close(struct pfiled *pfiled, struct io *io)
682 {
683         struct xseg_request *req = io->req;
684         char *buf = malloc(MAX_FILENAME_SIZE + strlen("_lock"));
685         char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + strlen("_lock"));
686         char *target = xseg_get_target(pfiled->xseg, req);
687
688         if (!buf || !pathname) {
689                 fail(pfiled, io);
690                 return;
691         }
692
693         strncpy(buf, target, req->targetlen);
694         strncpy(buf+req->targetlen, "_lock", strlen("_lock"));
695
696         if (create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen("_lock"), 1) < 0) {
697                 goto out_fail;
698         }
699         unlink(pathname);
700         free(buf);
701         free(pathname);
702         complete(pfiled, io);
703         return;
704
705 out_fail:
706         free(buf);
707         free(pathname);
708         fail(pfiled, io);
709         return;
710 }
711
712 static void dispatch(struct pfiled *pfiled, struct io *io)
713 {
714         if (cmdline_verbose) { 
715                 fprintf(stderr, "io: 0x%p, req: 0x%p, op %u\n",
716                                 (void *)io, (void *)io->req, io->req->op);
717         }
718
719         switch (io->req->op) {
720                 case X_READ:
721                 case X_WRITE:
722                         handle_read_write(pfiled, io); break;
723                 case X_INFO:
724                         handle_info(pfiled, io); break;
725                 case X_COPY:
726                         handle_copy(pfiled, io); break;
727                 case X_DELETE:
728                         handle_delete(pfiled, io); break;
729                 case X_OPEN:
730                         handle_open(pfiled, io); break;
731                 case X_CLOSE:
732                         handle_close(pfiled, io); break;
733                         //      case X_SNAPSHOT:
734                 case X_SYNC:
735                 default:
736                         handle_unknown(pfiled, io);
737         }
738 }
739
740 static void handle_accepted(struct pfiled *pfiled, struct io *io)
741 {
742         struct xseg_request *req = io->req;
743         req->serviced = 0;
744         io->state = XS_ACCEPTED;
745         io->retval = 0;
746         dispatch(pfiled, io);
747 }
748
749 static struct io* wake_up_next_iothread(struct pfiled *pfiled)
750 {
751         struct io *io = alloc_io(pfiled);
752
753         if (io){        
754                 pthread_mutex_lock(&io->lock);
755                 pthread_cond_signal(&io->cond);
756                 pthread_mutex_unlock(&io->lock);
757         }
758         return io;
759 }
760
761 void *io_loop(void *arg)
762 {
763         struct io *io = (struct io *) arg;
764         struct pfiled *pfiled = io->pfiled;
765         struct xseg *xseg = pfiled->xseg;
766         uint32_t portno = pfiled->portno;
767         struct xseg_request *accepted;
768
769         for (;;) {
770                 accepted = NULL;
771                 if (!isTerminate())
772                         accepted = xseg_accept(xseg, portno, 0);
773                 if (accepted) {
774                         io->req = accepted;
775                         wake_up_next_iothread(pfiled);
776                         handle_accepted(pfiled, io);
777                 }
778                 else {
779                         pthread_mutex_lock(&io->lock);
780                         free_io(pfiled, io);
781                         pthread_cond_wait(&io->cond, &io->lock);
782                         pthread_mutex_unlock(&io->lock);
783                 }
784         }
785
786         return NULL;
787 }
788
789 static struct xseg *join_or_create(char *spec)
790 {
791         struct xseg_config config;
792         struct xseg *xseg;
793
794         (void)xseg_parse_spec(spec, &config);
795         xseg = xseg_join(config.type, config.name, "posix", NULL);
796         if (xseg)
797                 return xseg;
798
799         (void)xseg_create(&config);
800         return xseg_join(config.type, config.name, "posix", NULL);
801 }
802
803 static int pfiled_loop(struct pfiled *pfiled)
804 {
805         struct xseg *xseg = pfiled->xseg;
806         uint32_t portno = pfiled->portno;
807         /* GCC + pthreads glitch? */
808         struct io *io;
809         (void)io;
810
811         for (;!(isTerminate() && xq_count(&pfiled->free_ops) == pfiled->nr_ops);) {
812                 io = wake_up_next_iothread(pfiled);
813                 xseg_prepare_wait(xseg, portno);
814                 xseg_wait_signal(xseg, 1000000UL);
815         }
816
817         return 0;
818 }
819
820 static int pfiled_init(struct pfiled *pfiled)
821 {
822         struct sigaction sa;
823         int ret;
824         int i;
825
826         pfiled->sigevent.sigev_notify = SIGEV_SIGNAL;
827         pfiled->sigevent.sigev_signo = SIGIO;
828         sa.sa_sigaction = sigaction_handler;
829         sa.sa_flags = SA_SIGINFO;
830
831         if ((ret = sigemptyset(&sa.sa_mask))) {
832                 perr(PE, 0, "[sigemptyset]");
833                 goto out;
834         }
835
836         if ((ret = sigaction(SIGIO, &sa, NULL))) {
837                 perr(PE, 0, "[sigaction]");
838                 /* FIXME: Since this is an init routine, if it fails the program will
839                  * exit and clean its own stuff (mem, sigs etc). We only have to cleanup
840                  * anything xseg-related
841                  */
842                 goto out;
843         }
844
845         pfiled->nr_ops = cmdline_nr_ops;
846         pfiled->maxfds = 2 * cmdline_nr_ops;
847
848         pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
849         if(!pfiled->fdcache) {
850                 ret = -ENOMEM;
851                 perr(PE, 0, "could not allocate memory [fdcache]");
852                 goto out;
853         }
854
855
856         pfiled->free_bufs = calloc(pfiled->nr_ops, sizeof(xqindex));
857         if(!pfiled->free_bufs) {
858                 ret = -ENOMEM;
859                 perr(PE, 0, "could not allocate memory [free_bufs]");
860                 goto out;
861         }
862
863         pfiled->iothread = calloc(pfiled->nr_ops, sizeof(pthread_t));
864         if(!pfiled->iothread) {
865                 ret = -ENOMEM;
866                 perr(PE, 0, "could not allocate memory [iothreads]");
867                 goto out;
868         }
869
870         pfiled->ios = calloc(pfiled->nr_ops, sizeof(struct io));
871         if (!pfiled->ios) {
872                 ret = -ENOMEM;
873                 perr(PE, 0, "could not allocate memory [ios]");
874                 goto out;
875         }
876
877         for (i = 0; i < pfiled->nr_ops; i++) {
878                 pfiled->ios[i].pfiled = pfiled;
879                 pthread_cond_init(&pfiled->ios[i].cond, NULL);
880                 pthread_mutex_init(&pfiled->ios[i].lock, NULL);
881         }
882
883         xq_init_seq(&pfiled->free_ops, pfiled->nr_ops, pfiled->nr_ops,
884                         pfiled->free_bufs);
885
886         pfiled->handled_reqs = 0;
887
888         strncpy(pfiled->path, cmdline_path, MAX_PATH_SIZE);
889         pfiled->path[MAX_PATH_SIZE] = 0;
890
891         strncpy(pfiled->vpath, cmdline_vpath, MAX_PATH_SIZE);
892         pfiled->vpath[MAX_PATH_SIZE] = 0;
893
894         pfiled->path_len = strlen(pfiled->path);
895         if (pfiled->path[pfiled->path_len -1] != '/'){
896                 pfiled->path[pfiled->path_len] = '/';
897                 pfiled->path[++pfiled->path_len]= 0;
898         }
899
900         pfiled->vpath_len = strlen(pfiled->vpath);
901         if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
902                 pfiled->vpath[pfiled->vpath_len] = '/';
903                 pfiled->vpath[++pfiled->vpath_len]= 0;
904         }
905
906         if (xseg_initialize()) {
907                 ret = - ENOMEM;
908                 perr(PE, 0, "could not initialize xseg library");
909                 goto out;
910         }
911
912         pfiled->xseg = join_or_create(cmdline_xseg_spec);
913         if (!pfiled->xseg) {
914                 ret = -EIO;
915                 perr(PE, 0, "could not join xseg with spec '%s'\n", 
916                                 cmdline_xseg_spec);
917                 goto out_with_xseginit;
918         }
919
920         pfiled->xport = xseg_bind_port(pfiled->xseg, cmdline_portno, NULL);
921         if (!pfiled->xport) {
922                 ret = -EIO;
923                 perr(PE, 0, "could not bind to xseg port %ld", cmdline_portno);
924                 goto out_with_xsegjoin;
925         }
926
927         pfiled->portno = xseg_portno(pfiled->xseg, pfiled->xport);
928         perr(PI, 0, "filed on port %u/%u\n",
929                         pfiled->portno, pfiled->xseg->config.nr_ports);
930
931         if (xseg_init_local_signal(pfiled->xseg, pfiled->portno) < 0){
932                 perr(PE, 0, "cannot int local signals\n");
933                 return -1;
934         }
935
936         for (i = 0; i < pfiled->nr_ops; i++) {
937                 pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
938                 pfiled->fdcache[i].flags = READY;
939         }
940         for (i = 0; i < pfiled->nr_ops; i++) {
941                 /* 
942                  * TODO: error check + cond variable to stop io from starting
943                  * unless all threads are created successfully
944                  */
945                 pthread_create(pfiled->iothread + i, NULL, io_loop, (void *) (pfiled->ios + i));
946         }
947         pthread_mutex_init(&pfiled->cache_lock, NULL);
948
949         goto out;
950
951 out_with_xsegjoin:
952         xseg_leave(pfiled->xseg);
953 out_with_xseginit:
954         xseg_finalize();
955 out:
956         return ret;
957 }
958
/*
 * Parse a base-10 integer from the NUL-terminated string s.
 *
 * Returns the parsed value when the whole string is a number that fits in
 * an int. Returns -1 when the string is empty, carries trailing characters,
 * or the value is out of range. A literal "-1" is therefore
 * indistinguishable from an error; the callers in this file treat negative
 * values as invalid anyway.
 */
static int safe_atoi(char *s)
{
        /* INT_MAX computed locally so no extra header is required */
        const long int_max = (long)(~0u >> 1);
        long l;
        char *endp;

        errno = 0;
        l = strtol(s, &endp, 10);

        /* nothing parsed, or garbage after the digits */
        if (s == endp || *endp != '\0')
                return -1;

        /* strtol clamps to LONG_MIN/LONG_MAX and sets ERANGE on overflow */
        if (errno == ERANGE)
                return -1;

        /* refuse values the int return type would silently truncate */
        if (l > int_max || l < -int_max - 1)
                return -1;

        return (int)l;
}
970
971 static void parse_cmdline(int argc, char **argv)
972 {
973         char *argv0 = argv[0];
974
975         for (;;) {
976                 int c;
977
978                 opterr = 0;
979                 c = getopt(argc, argv, "dhp:n:g:vf:");
980                 if (c == -1)
981                         break;
982
983                 switch(c) {
984                         case '?':
985                                 perr(PFE, 0, "Unknown option: -%c", optopt);
986                                 break;
987                         case ':':
988                                 perr(PFE, 0, "Option -%c requires an argument",
989                                                 optopt);
990                                 break;
991                         case 'h':
992                                 usage(argv0);
993                                 exit(0);
994                                 break;
995                         case 'p':
996                                 cmdline_portno = safe_atoi(optarg);
997                                 break;
998                         case 'n':
999                                 cmdline_nr_ops = safe_atoi(optarg);
1000                                 break;
1001                         case 'g':
1002                                 /* FIXME: Max length of spec? strdup, eww */
1003                                 cmdline_xseg_spec = strdup(optarg);
1004                                 if (!cmdline_xseg_spec)
1005                                         perr(PFE, 0, "out of memory");
1006                                 break;
1007                         case 'v':
1008                                 cmdline_verbose = 1;
1009                                 break;
1010                         case 'd':
1011                                 cmdline_daemon = 1;
1012                                 break;
1013                         case 'f':
1014                                 /* FIXME: Max length of spec? strdup, eww */
1015                                 cmdline_pidfile = strdup(optarg);
1016                                 if (!cmdline_pidfile)
1017                                         perr(PFE, 0, "out of memory");
1018                                 break;
1019                 }
1020         }
1021
1022         argc -= optind;
1023         argv += optind;
1024
1025         /* Sanity check for all arguments */
1026         if (cmdline_portno < 0) {
1027                 usage(argv0);
1028                 perr(PFE, 0, "no or invalid port specified");
1029         }
1030         if (cmdline_nr_ops < 1) {
1031                 usage(argv0);
1032                 perr(PFE, 0, "specified outstanding request count is invalid");
1033         }
1034         if (!cmdline_xseg_spec) {
1035                 usage(argv0);
1036                 perr(PFE, 0, "xseg specification is mandatory");
1037         }
1038
1039         if (argc < 2) {
1040                 usage(argv0);
1041                 perr(PFE, 0, "path and vpath specification is mandatory");
1042         }
1043
1044         cmdline_path = strdup(argv[0]);
1045         if (!cmdline_path)
1046                 perr(PFE, 0, "out of memory");
1047
1048         cmdline_vpath = strdup(argv[1]);
1049         if (!cmdline_vpath)
1050                 perr(PFE, 0, "out of memory");
1051 }
1052
/*
 * Close the pidfile descriptor fd and delete the pidfile at path.
 *
 * The result of close() is not inspected; the return value is whatever
 * unlink() returns.
 */
int pidfile_remove(char *path, int fd)
{
        int rc;

        close(fd);
        rc = unlink(path);

        return rc;
}
1058
/*
 * Write the id returned by syscall(SYS_gettid), formatted as a decimal
 * string without a trailing newline, at offset 0 of pid_fd.
 *
 * Returns the result of write(): the number of bytes written, or -1.
 * The result of lseek() is not inspected.
 */
int pidfile_write(int pid_fd)
{
        char text[16];

        snprintf(text, sizeof(text), "%ld", syscall(SYS_gettid));
        text[sizeof(text) - 1] = 0;

        /* always overwrite from the beginning of the file */
        lseek(pid_fd, 0, SEEK_SET);

        return write(pid_fd, text, strlen(text));
}
1069
/*
 * Read a decimal pid from the file at path.
 *
 * On success stores the value in *pid and returns 0. On any failure
 * (file cannot be opened or read, file is empty, content is not exactly
 * one decimal number, value out of range) *pid is set to 0 and -1 is
 * returned. At most 15 bytes of the file are considered.
 */
int pidfile_read(char *path, pid_t *pid)
{
        char buf[16], *endptr;
        ssize_t nread;
        long val;
        int fd;

        *pid = 0;

        fd = open(path, O_RDONLY);
        if (fd < 0)
                return -1;
        nread = read(fd, buf, sizeof(buf) - 1);
        close(fd);

        /* read error, or an empty file that holds no pid at all */
        if (nread <= 0)
                return -1;

        /*
         * Terminate right after the bytes actually read, so strtol never
         * scans the uninitialised remainder of buf.
         */
        buf[nread] = 0;

        errno = 0;
        val = strtol(buf, &endptr, 10);
        /* the number must span everything that was read */
        if (endptr == buf || endptr != &buf[nread] || errno == ERANGE)
                return -1;

        *pid = val;
        return 0;
}
1092
/*
 * Exclusively create the pidfile at path for writing.
 *
 * Returns the new descriptor on success, or the negative result of open()
 * on failure. *old_pid is always written: it is 0 unless the file already
 * existed and pidfile_read() could parse a pid from it.
 */
int pidfile_open(char *path, pid_t *old_pid)
{
        int fd;

        /* give the caller a defined value on every path */
        *old_pid = 0;

        /*
         * nfs version > 3
         * NOTE(review): presumably a reminder that O_EXCL is only reliable
         * on NFS with newer protocol versions -- confirm.
         *
         * O_CREAT requires the mode argument; 0644 is further restricted
         * by the process umask.
         */
        fd = open(path, O_CREAT|O_EXCL|O_WRONLY, 0644);

        /* errno holds positive error numbers, so compare against EEXIST */
        if (fd < 0 && errno == EEXIST)
                pidfile_read(path, old_pid);

        return fd;
}
1103
1104 int main(int argc, char **argv)
1105 {
1106         int pid_fd = -1, r = 0;
1107         pid_t old_pid;
1108         struct pfiled pfiled;
1109
1110         init_perr("pfiled");
1111         parse_cmdline(argc, argv);
1112
1113         perr(PI, 0, "p = %ld, nr_ops = %lu\n", cmdline_portno, cmdline_nr_ops);
1114         
1115         if (cmdline_pidfile){
1116                 pid_fd = pidfile_open(cmdline_pidfile, &old_pid);
1117                 if (pid_fd < 0) {
1118                         if (old_pid) {
1119                                 perr(PFE, 0, "Daemon already running, pid: %d.", old_pid);
1120                         } else {
1121                                 perr(PFE, 0, "Cannot open or create pidfile");
1122                         }
1123                         return -1;
1124                 }
1125         }
1126
1127         if (cmdline_daemon){
1128                 if (daemon(0, 1) < 0){
1129                         perr(PFE, 0, "Cannot daemonize");
1130                         r = -1;
1131                         goto out;
1132                 }
1133         }
1134         setup_signals();
1135         if (pid_fd > 0)
1136                 pidfile_write(pid_fd);
1137
1138
1139         if (pfiled_init(&pfiled) < 0){
1140                 r = -1;
1141                 goto out;
1142         }
1143
1144         r = pfiled_loop(&pfiled);
1145 out:
1146         if (pid_fd > 0)
1147                 pidfile_remove(cmdline_pidfile, pid_fd);
1148         return r;
1149
1150 }