add obj handlers output in xseg reportall
[archipelago] / xseg / peers / user / pfiled.c
1 /*
2  * The Pithos File Blocker Peer (pfiled)
3  */
4
5 #define _GNU_SOURCE
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <unistd.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <errno.h>
14 #include <aio.h>
15 #include <signal.h>
16 #include <limits.h>
17 #include <pthread.h>
18 #include <syscall.h>
19 #include <sys/sendfile.h>
20
21 #include <xseg/xseg.h>
22 #include <xseg/protocol.h>
23
24 #include "common.h"                     /* FIXME: */
25
#define MAX_PATH_SIZE           1024
#define MAX_FILENAME_SIZE       255

/* default concurrency level (number of threads) */
#define DEFAULT_NR_OPS           16

/* Pithos hash for the zero block: hex SHA-256 of the empty string (64 hex
 * digits; the previous literal had lost its final '5').
 * FIXME: Should it be hardcoded?
 */
#define ZERO_BLOCK \
	"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"

/*
 * Globals, holding command-line arguments
 */
long cmdline_portno = -1;
char *cmdline_xseg_spec = NULL;
char *cmdline_path = NULL;
char *cmdline_vpath = NULL;
char *cmdline_pidfile = NULL;
int  cmdline_daemon = 0;
long cmdline_nr_ops = DEFAULT_NR_OPS;
long cmdline_verbose = 0;
/* set to 1 by signal_handler() when a termination signal arrives */
volatile unsigned int terminated = 0;
50
/*
 * Print the command-line help text to stderr.
 * Always returns 1, so callers can simply `return usage(argv[0]);`.
 */
static int usage(char *argv0)
{
	static const char help_fmt[] =
		"Usage: %s <PATH> <VPATH> [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v]\n\n"
		"where:\n"
		"\tPATH: path to pithos data blocks\n"
		"\tVPATH: path to modified volume blocks\n"
		"\tPORT: xseg port to listen for requests on\n"
		"\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
		"request_size:extra_size:page_shift'\n"
		"\tNR_OPS: number of outstanding xseg requests\n"
		"\t-v: verbose mode\n";
	const int exit_status = 1;

	fprintf(stderr, help_fmt, argv0);
	return exit_status;
}
67
/* fdcache_node flags */
/* READY: set when no open() is in progress for the entry; cleared by
 * dir_open() while it (re)opens the file for a new target. */
#define READY (1 << 1)

/* fdcache node info: one slot of the shared fd cache (see dir_open()).
 * Lookups and updates happen under pfiled->cache_lock; ref is also
 * decremented atomically outside the lock by complete()/fail(). */
struct fdcache_node {
	/* open descriptor for target; a negative value records a failed
	 * open so that waiters on the same target fail too */
	volatile int fd;
	/* number of ios currently using fd; only entries with ref == 0 are
	 * eviction candidates */
	volatile unsigned int ref;
	/* value of pfiled->handled_reqs at last use; smallest == LRU */
	volatile unsigned long time;
	/* READY or 0 */
	volatile unsigned int flags;
	/* broadcast when an open of this entry finishes (READY set again) */
	pthread_cond_t cond;
	/* NUL-terminated object name this entry caches */
	char target[MAX_FILENAME_SIZE + 1];
};
80
/* pfiled context */
struct pfiled {
	struct xseg *xseg;		/* joined segment (join_or_create()) */
	struct xseg_port *xport;	/* port bound in pfiled_init() */
	uint32_t portno;		/* number of xport */
	uint64_t size;			/* NOTE(review): not referenced in the code visible here */
	struct io *ios;			/* nr_ops io slots, one per io thread */
	struct xq free_ops;		/* indices of idle slots in ios */
	char *free_bufs;		/* backing storage for free_ops (nr_ops xqindex entries) */
	long nr_ops;			/* number of io slots/threads */
	struct sigevent sigevent;	/* set up for SIGIO in pfiled_init() */
	uint32_t path_len;		/* strlen(path) */
	uint32_t vpath_len;		/* strlen(vpath) */
	uint64_t handled_reqs;		/* bumped under cache_lock in dir_open(); LRU clock */
	long maxfds;			/* number of fdcache entries (2 * nr_ops) */
	struct fdcache_node *fdcache;	/* shared fd cache, maxfds entries */
	pthread_t *iothread;		/* nr_ops io thread handles */
	pthread_mutex_t cache_lock;	/* protects fdcache and handled_reqs */
	char path[MAX_PATH_SIZE + 1];	/* pithos blocks dir, with trailing '/' */
	char vpath[MAX_PATH_SIZE + 1];	/* modified volume blocks dir, with trailing '/' */
};
102
/*
 * pfiled specific structure
 * containing information on a pending I/O operation.
 * Each io slot is served by exactly one io thread (see io_loop()).
 */
struct io {
	struct pfiled *pfiled;		/* owning context */
	struct xseg_request *req;	/* request being served; NULL while idle */
	uint32_t state;			/* set to XS_ACCEPTED in handle_accepted() */
	ssize_t retval;			/* reset to 0 per request; printed by log_io() */
	long fdcacheidx;		/* fdcache entry referenced by this io, or -1 */
	pthread_cond_t cond;		/* idle thread sleeps here until woken */
	pthread_mutex_t lock;		/* protects the sleep/wake hand-off on cond */
};
116
117
118 static inline int isTerminate()
119 {
120         /* ta doesn't need to be taken into account, because the main loops
121          * doesn't check the terminated flag if ta is not 0.
122          * 
123          * #ifdef ST_THREADS
124          * return (!ta & terminated);
125          * #else
126          * return terminated;
127          *  #endif
128          */
129         return terminated;
130 }
131
132 void signal_handler(int signal)
133 {      
134         terminated = 1;
135 }
136
137 static int setup_signals()
138 {      
139         int r;
140         struct sigaction sa;
141         sigemptyset(&sa.sa_mask);
142         sa.sa_flags = 0;
143         sa.sa_handler = signal_handler;
144         r = sigaction(SIGTERM, &sa, NULL);
145         if (r < 0)
146                 return r;
147         r = sigaction(SIGINT, &sa, NULL);
148         if (r < 0)
149                 return r;
150         r = sigaction(SIGQUIT, &sa, NULL);
151         if (r < 0)
152                 return r;
153         return r;
154 }
155
156
/* Number of times sigaction_handler() has run (installed for SIGIO in
 * pfiled_init()). */
static unsigned long sigaction_count;

/* SA_SIGINFO-style handler that only counts its invocations; the signal
 * number, siginfo and context arguments are ignored. */
static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
{
	(void)sig;
	(void)siginfo;
	(void)arg;
	sigaction_count += 1;
}
163
164 static void log_io(char *msg, struct io *io)
165 {
166         char target[65], data[65];
167         /* null terminate name in case of req->target is less than 63 characters,
168          * and next character after name (aka first byte of next buffer) is not
169          * null
170          */
171         unsigned int end = (io->req->targetlen> 64) ? 64 : io->req->targetlen;
172         unsigned int dend = (io->req->datalen > 64) ? 64 : io->req->datalen;
173         char *req_target = xseg_get_target(io->pfiled->xseg, io->req);
174         char *req_data = xseg_get_data(io->pfiled->xseg, io->req);
175         strncpy(target, req_target, end);
176         target[end] = 0;
177         strncpy(data, req_data, 64);
178         data[dend] = 0;
179
180         fprintf(stderr,
181                         "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u, serviced: %u\n"
182                         "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
183                         msg,
184                         (unsigned int)io->fdcacheidx, /* this is cacheidx not fd */
185                         (unsigned int)io->req->op,
186                         (unsigned long long)io->req->offset,
187                         (unsigned long)io->req->size,
188                         (unsigned long)io->retval,
189                         (unsigned int)io->req->state,
190                         (unsigned long)io->req->serviced,
191                         (unsigned int)io->req->targetlen, target,
192                         (unsigned long long)io->req->datalen, data);
193 }
194
195 static struct io *alloc_io(struct pfiled *pfiled)
196 {
197         xqindex idx = xq_pop_head(&pfiled->free_ops, 1);
198         if (idx == Noneidx)
199                 return NULL;
200         return pfiled->ios + idx;
201 }
202
203 static inline void free_io(struct pfiled *pfiled, struct io *io)
204 {
205         xqindex idx = io - pfiled->ios;
206         io->req = NULL;
207         xq_append_head(&pfiled->free_ops, idx, 1);
208 }
209
210 static void complete(struct pfiled *pfiled, struct io *io)
211 {
212         struct xseg_request *req = io->req;
213         req->state |= XS_SERVED;
214         if (cmdline_verbose)
215                 log_io("complete", io);
216         xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
217         xseg_signal(pfiled->xseg, p);
218         __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
219 }
220
221 static void fail(struct pfiled *pfiled, struct io *io)
222 {
223         struct xseg_request *req = io->req;
224         req->state |= XS_FAILED;
225         if (cmdline_verbose)
226                 log_io("fail", io);
227         xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
228         xseg_signal(pfiled->xseg, p);
229         if (io->fdcacheidx >= 0) {
230                 __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
231         }
232 }
233
234 static void handle_unknown(struct pfiled *pfiled, struct io *io)
235 {
236         struct xseg_request *req = io->req;
237         char *data = xseg_get_data(pfiled->xseg, req);
238         snprintf(data, req->datalen, "unknown request op");
239         fail(pfiled, io);
240 }
241
/*
 * Build the on-disk path of an object into buf:
 *
 *     <path><t0t1>/<t2t3>/<t4t5>/<target>
 *
 * where t0..t5 are the first six characters of target (a three-level
 * directory fan-out). path is expected to end with '/'. buf must hold
 * strlen(path) + 9 + targetlen + 1 bytes.
 *
 * If mkdirs == 1, each missing intermediate directory is created with mode
 * 0700. EEXIST from mkdir() is not an error, because another io thread may
 * create the same directory concurrently.
 *
 * Returns 0 on success, or a negative errno value on failure: -EINVAL when
 * target is shorter than the six characters the fan-out needs, otherwise
 * the mkdir() error. Callers test for "< 0"; the previous positive-errno
 * return made every mkdir failure look like success to them.
 */
static int create_path(char *buf, char *path, char *target, uint32_t targetlen, int mkdirs)
{
	struct stat st;
	size_t pathlen = strlen(path);
	size_t pos = pathlen;
	int level;

	/* the fan-out reads target[0..5]; shorter names would read past it */
	if (targetlen < 6)
		return -EINVAL;

	memcpy(buf, path, pathlen);

	for (level = 0; level < 3; level++) {
		buf[pos++] = target[2 * level];
		buf[pos++] = target[2 * level + 1];
		buf[pos++] = '/';
		if (mkdirs == 1) {
			buf[pos] = '\0';
			if (stat(buf, &st) < 0 && mkdir(buf, 0700) < 0 &&
					errno != EEXIST) {
				int err = errno;

				perror(buf);
				return -err;
			}
		}
	}

	memcpy(&buf[pos], target, targetlen);
	buf[pos + targetlen] = '\0';

	return 0;
}
269
270 static int dir_open(struct pfiled *pfiled, struct io *io,
271                 char *target, uint32_t targetlen, int mode)
272 {
273         int fd = -1;
274         struct fdcache_node *ce = NULL;
275         long i, lru;
276         char tmp[pfiled->path_len + targetlen + 10];
277         uint64_t min;
278         io->fdcacheidx = -1;
279         if (targetlen> MAX_FILENAME_SIZE)
280                 goto out_err;
281
282 start:
283         /* check cache */
284         pthread_mutex_lock(&pfiled->cache_lock);
285 start_locked:
286         lru = -1;
287         min = UINT64_MAX;
288         for (i = 0; i < pfiled->maxfds; i++) {
289                 if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time 
290                                 && (pfiled->fdcache[i].flags & READY)) {
291                         min = pfiled->fdcache[i].time;
292                         lru = i;
293
294                 }
295
296                 if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
297                         if (pfiled->fdcache[i].target[targetlen] == 0) {
298                                 ce = &pfiled->fdcache[i];
299                                 /* if any other io thread is currently opening
300                                  * the file, block until it succeeds or fails
301                                  */
302                                 if (!(ce->flags & READY)) {
303                                         pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
304                                         /* when ready, restart lookup */
305                                         goto start_locked;
306                                 }
307                                 /* if successfully opened */
308                                 if (ce->fd > 0) {
309                                         fd = pfiled->fdcache[i].fd;
310                                         io->fdcacheidx = i;
311                                         goto out;
312                                 }
313                                 /* else open failed for the other io thread, so
314                                  * it should fail for everyone waiting on this
315                                  * file.
316                                  */
317                                 else {
318                                         fd = -1;
319                                         io->fdcacheidx = -1;
320                                         goto out_err_unlock;
321                                 }
322                         }
323                 }
324         }
325         if (lru < 0){
326                 /* all cache entries are currently being used */
327                 pthread_mutex_unlock(&pfiled->cache_lock);
328                 goto start;
329         }
330         if (pfiled->fdcache[lru].ref){
331                 fd = -1;
332                 printf("lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
333                 goto out_err_unlock;
334         }
335         /* make room for new file */
336         ce = &pfiled->fdcache[lru];
337         /* set name here and state to not ready, for any other requests on the
338          * same target that may follow
339          */
340         strncpy(ce->target, target, targetlen);
341         ce->target[targetlen] = 0;
342         ce->flags &= ~READY;
343         pthread_mutex_unlock(&pfiled->cache_lock);
344
345         if (ce->fd >0){
346                 if (close(ce->fd) < 0){
347                         perror("close");
348                 }
349         }
350
351         /* try opening it from pithos blocker dir */
352         if (create_path(tmp, pfiled->path, target, targetlen, 0) < 0) {
353                 fd = -1;
354                 goto new_entry;
355         }
356
357         fd = open(tmp, O_RDWR);
358         if (fd < 0) {
359                 /* try opening it from the tmp dir */
360                 if (create_path(tmp, pfiled->vpath, target, targetlen, 0) < 0)
361                         goto new_entry;
362
363                 fd = open(tmp, O_RDWR);
364                 if (fd < 0)  {
365                         if (create_path(tmp, pfiled->vpath, target, targetlen, 1) < 0) {
366                                 fd = -1;
367                                 goto new_entry;
368                         }
369
370                         fd = open(tmp, O_RDWR | O_CREAT, 0600);         
371                         if (fd < 0)
372                                 perror(tmp);
373                 }
374         }
375
376         /* insert in cache a negative fd to indicate opening error to
377          * any other ios waiting for the file to open
378          */
379
380         /* insert in cache */
381 new_entry:
382         pthread_mutex_lock(&pfiled->cache_lock);
383         ce->fd = fd;
384         ce->ref = 0;
385         ce->flags = READY;
386         pthread_cond_broadcast(&ce->cond);
387         if (fd > 0) {
388                 io->fdcacheidx = lru;
389         }
390         else {
391                 io->fdcacheidx = -1;
392                 goto out_err_unlock;
393         }
394
395 out:
396         pfiled->handled_reqs++;
397         ce->time = pfiled->handled_reqs;
398         __sync_fetch_and_add(&ce->ref, 1);
399         pthread_mutex_unlock(&pfiled->cache_lock);
400 out_err:
401         return fd;
402
403 out_err_unlock:
404         pthread_mutex_unlock(&pfiled->cache_lock);
405         goto out_err;
406 }
407
408 static void handle_read_write(struct pfiled *pfiled, struct io *io)
409 {
410         int r, fd;
411         struct xseg_request *req = io->req;
412         char *target = xseg_get_target(pfiled->xseg, req);
413         char *data = xseg_get_data(pfiled->xseg, req);
414
415         fd = dir_open(pfiled, io, target, req->targetlen, 0);
416         if (fd < 0){
417                 perror("dir_open");
418                 fail(pfiled, io);
419                 return;
420         }
421
422         if (req != io->req)
423                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
424         if (!req->size) {
425                 if (req->flags & (XF_FLUSH | XF_FUA)) {
426                         /* No FLUSH/FUA support yet (O_SYNC ?).
427                          * note that with FLUSH/size == 0 
428                          * there will probably be a (uint64_t)-1 offset */
429                         complete(pfiled, io);
430                         return;
431                 } else {
432                         complete(pfiled, io);
433                         return;
434                 }
435         }
436
437         switch (req->op) {
438                 case X_READ:
439                         while (req->serviced < req->datalen) {
440                                 r = pread(fd, data + req->serviced, 
441                                                 req->datalen - req->serviced,
442                                                 req->offset + req->serviced);
443                                 if (r < 0) {
444                                         req->datalen = req->serviced;
445                                         perror("pread");
446                                 }
447                                 else if (r == 0) {
448                                         /* reached end of file. zero out the rest data buffer */
449                                         memset(data + req->serviced, 0, req->datalen - req->serviced);
450                                         req->serviced = req->datalen;
451                                 }
452                                 else {
453                                         req->serviced += r;
454                                 }
455                         }
456                         break;
457                 case X_WRITE:
458                         while (req->serviced < req->datalen) {
459                                 r = pwrite(fd, data + req->serviced, 
460                                                 req->datalen - req->serviced,
461                                                 req->offset + req->serviced);
462                                 if (r < 0) {
463                                         req->datalen = req->serviced;
464                                 }
465                                 else if (r == 0) {
466                                         fprintf(stderr, "write returned 0\n");
467                                         memset(data + req->serviced, 0, req->datalen - req->serviced);
468                                         req->serviced = req->datalen;
469                                 }
470                                 else {
471                                         req->serviced += r;
472                                 }
473                         }
474                         r = fsync(fd);
475                         if (r< 0) {
476                                 perror("fsync");
477                                 /* if fsync fails, then no bytes serviced correctly */
478                                 req->serviced = 0;
479                         }
480                         break;
481                 default:
482                         snprintf(data, req->datalen,
483                                         "wtf, corrupt op %u?\n", req->op);
484                         fail(pfiled, io);
485                         return;
486         }
487
488         if (req->serviced > 0 ) {
489                 complete(pfiled, io);
490         }
491         else {
492                 strerror_r(errno, data, req->datalen);
493                 fail(pfiled, io);
494         }
495         return;
496 }
497
498 static void handle_info(struct pfiled *pfiled, struct io *io)
499 {
500         struct xseg_request *req = io->req;
501         struct stat stat;
502         int fd, r;
503         uint64_t size;
504         char *target = xseg_get_target(pfiled->xseg, req);
505         char *data = xseg_get_data(pfiled->xseg, req);
506         struct xseg_reply_info *xinfo  = (struct xseg_reply_info *)data;
507
508         fd = dir_open(pfiled, io, target, req->targetlen, 0);
509         if (fd < 0) {
510                 fail(pfiled, io);
511                 return;
512         }
513
514         r = fstat(fd, &stat);
515         if (r < 0) {
516                 perror("fstat");
517                 fail(pfiled, io);
518                 return;
519         }
520
521         size = (uint64_t)stat.st_size;
522         xinfo->size = size;
523
524         complete(pfiled, io);
525 }
526
527 static void handle_copy(struct pfiled *pfiled, struct io *io)
528 {
529         struct xseg_request *req = io->req;
530         char *target = xseg_get_target(pfiled->xseg, req);
531         char *data = xseg_get_data(pfiled->xseg, req);
532         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
533         struct stat st;
534         //FIXME is 256 enough?
535         char *buf = malloc(256);
536         int n, src, dst;
537
538         dst = dir_open(pfiled, io, target, req->targetlen, 1);
539         if (dst < 0) {
540                 fprintf(stderr, "fail in dst\n");
541                 fail(pfiled, io);
542                 return;
543         }
544
545         if (create_path(buf, pfiled->path, xcopy->target, xcopy->targetlen, 0) < 0)  {
546                 fail(pfiled, io);
547                 return;
548         }
549
550         src = open(buf, O_RDWR);
551         if (src < 0) {
552                 XSEGLOG("fail in src %s\n", buf);
553                 perror("open src");
554                 fail(pfiled, io);
555                 return;
556         }
557
558         fstat(src, &st);
559         n = sendfile(dst, src, 0, st.st_size);
560         if (n != st.st_size) {
561                 fprintf(stderr, "fail in copy\n");
562                 fail(pfiled, io);
563                 goto out;
564         }
565
566         if (n < 0) {
567                 fprintf(stderr, "fail in cp\n");
568                 fail(pfiled, io);
569                 goto out;
570         }
571
572         complete(pfiled, io);
573
574 out:
575         close(src);
576 }
577
578 static void handle_delete(struct pfiled *pfiled, struct io *io)
579 {
580         struct xseg_request *req = io->req;
581         char *buf = malloc(255);
582         int fd;
583         char *target = xseg_get_target(pfiled->xseg, req);
584
585         fd = dir_open(pfiled, io, target, req->targetlen, 0);
586         if (fd < 0) {
587                 fprintf(stderr, "fail in dir_open\n");
588                 fail(pfiled, io);
589                 return;
590         }
591
592         /* 'invalidate' cache entry */
593         if (io->fdcacheidx >= 0) {
594                 pfiled->fdcache[io->fdcacheidx].fd = -1;
595         }
596
597         close(fd);
598
599         if (create_path(buf, pfiled->vpath, target, req->targetlen, 0) < 0) {
600                 fail(pfiled, io);
601                 return;
602         }
603         unlink(buf);
604
605         complete(pfiled, io);
606
607         return;
608 }
609
610 static void handle_open(struct pfiled *pfiled, struct io *io)
611 {
612         struct xseg_request *req = io->req;
613         char *buf = malloc(MAX_FILENAME_SIZE + strlen("_lock"));
614         char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + strlen("_lock"));
615         int fd;
616         char *target = xseg_get_target(pfiled->xseg, req);
617
618         if (!buf || !pathname) {
619                 fail(pfiled, io);
620                 return;
621         }
622
623         strncpy(buf, target, req->targetlen);
624         strncpy(buf+req->targetlen, "_lock", strlen("_lock"));
625
626         if (create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen("_lock"), 1) < 0) {
627                 goto out_fail;
628         }
629
630         fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
631         if (fd < 0)
632                 goto out_fail;
633
634         close(fd);
635         free(buf);
636         free(pathname);
637         complete(pfiled, io);
638         return;
639
640 out_fail:
641         free(buf);
642         free(pathname);
643         fail(pfiled, io);
644         return;
645 }
646
647 static void handle_close(struct pfiled *pfiled, struct io *io)
648 {
649         struct xseg_request *req = io->req;
650         char *buf = malloc(MAX_FILENAME_SIZE + strlen("_lock"));
651         char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + strlen("_lock"));
652         char *target = xseg_get_target(pfiled->xseg, req);
653
654         if (!buf || !pathname) {
655                 fail(pfiled, io);
656                 return;
657         }
658
659         strncpy(buf, target, req->targetlen);
660         strncpy(buf+req->targetlen, "_lock", strlen("_lock"));
661
662         if (create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen("_lock"), 1) < 0) {
663                 goto out_fail;
664         }
665         unlink(pathname);
666         free(buf);
667         free(pathname);
668         complete(pfiled, io);
669         return;
670
671 out_fail:
672         free(buf);
673         free(pathname);
674         fail(pfiled, io);
675         return;
676 }
677
678 static void dispatch(struct pfiled *pfiled, struct io *io)
679 {
680         if (cmdline_verbose) { 
681                 fprintf(stderr, "io: 0x%p, req: 0x%p, op %u\n",
682                                 (void *)io, (void *)io->req, io->req->op);
683         }
684
685         switch (io->req->op) {
686                 case X_READ:
687                 case X_WRITE:
688                         handle_read_write(pfiled, io); break;
689                 case X_INFO:
690                         handle_info(pfiled, io); break;
691                 case X_COPY:
692                         handle_copy(pfiled, io); break;
693                 case X_DELETE:
694                         handle_delete(pfiled, io); break;
695                 case X_OPEN:
696                         handle_open(pfiled, io); break;
697                 case X_CLOSE:
698                         handle_close(pfiled, io); break;
699                         //      case X_SNAPSHOT:
700                 case X_SYNC:
701                 default:
702                         handle_unknown(pfiled, io);
703         }
704 }
705
706 static void handle_accepted(struct pfiled *pfiled, struct io *io)
707 {
708         struct xseg_request *req = io->req;
709         req->serviced = 0;
710         io->state = XS_ACCEPTED;
711         io->retval = 0;
712         dispatch(pfiled, io);
713 }
714
715 static struct io* wake_up_next_iothread(struct pfiled *pfiled)
716 {
717         struct io *io = alloc_io(pfiled);
718
719         if (io){        
720                 pthread_mutex_lock(&io->lock);
721                 pthread_cond_signal(&io->cond);
722                 pthread_mutex_unlock(&io->lock);
723         }
724         return io;
725 }
726
/*
 * Body of every io thread; arg is the struct io slot this thread serves
 * (pfiled_init() starts one thread per slot).
 *
 * While termination has not been requested, the thread tries to accept a
 * request on the pfiled port. On success it first wakes another idle io
 * thread, so that someone keeps accepting while it is busy, then serves
 * the request. Otherwise it returns its slot to the free list and sleeps
 * on the slot's condition variable until wake_up_next_iothread() pops the
 * slot and signals it.
 *
 * io->lock is held from free_io() until pthread_cond_wait() atomically
 * releases it, and wake_up_next_iothread() takes the same lock before
 * signalling. A wake-up for this slot therefore cannot slip in between the
 * two calls.
 *
 * NOTE(review): pthread_cond_wait() may return spuriously, and (presumably,
 * if xq_init_seq() initially lists every slot as free) a thread may accept
 * before its slot was ever popped. In both cases a request is served while
 * the slot is still on free_ops — confirm this cannot hand one slot to two
 * threads.
 * The loop never exits, so the final return is unreachable.
 */
void *io_loop(void *arg)
{
	struct io *io = (struct io *) arg;
	struct pfiled *pfiled = io->pfiled;
	struct xseg *xseg = pfiled->xseg;
	uint32_t portno = pfiled->portno;
	struct xseg_request *accepted;

	for (;;) {
		accepted = NULL;
		/* stop taking new requests once termination is requested */
		if (!isTerminate())
			accepted = xseg_accept(xseg, portno, 0);
		if (accepted) {
			io->req = accepted;
			/* keep another thread accepting while we work */
			wake_up_next_iothread(pfiled);
			handle_accepted(pfiled, io);
		}
		else {
			/* park: publish the slot as free, then sleep on it */
			pthread_mutex_lock(&io->lock);
			free_io(pfiled, io);
			pthread_cond_wait(&io->cond, &io->lock);
			pthread_mutex_unlock(&io->lock);
		}
	}

	return NULL;
}
754
755 static struct xseg *join_or_create(char *spec)
756 {
757         struct xseg_config config;
758         struct xseg *xseg;
759
760         (void)xseg_parse_spec(spec, &config);
761         xseg = xseg_join(config.type, config.name, "posix", NULL);
762         if (xseg)
763                 return xseg;
764
765         (void)xseg_create(&config);
766         return xseg_join(config.type, config.name, "posix", NULL);
767 }
768
769 static int pfiled_loop(struct pfiled *pfiled)
770 {
771         struct xseg *xseg = pfiled->xseg;
772         uint32_t portno = pfiled->portno;
773         /* GCC + pthreads glitch? */
774         struct io *io;
775
776         for (;!(isTerminate() && xq_count(&pfiled->free_ops) == pfiled->nr_ops);) {
777                 io = wake_up_next_iothread(pfiled);
778                 xseg_prepare_wait(xseg, portno);
779                 xseg_wait_signal(xseg, 1000000UL);
780         }
781
782         return 0;
783 }
784
785 static int pfiled_init(struct pfiled *pfiled)
786 {
787         struct sigaction sa;
788         int ret;
789         int i;
790
791         pfiled->sigevent.sigev_notify = SIGEV_SIGNAL;
792         pfiled->sigevent.sigev_signo = SIGIO;
793         sa.sa_sigaction = sigaction_handler;
794         sa.sa_flags = SA_SIGINFO;
795
796         if ((ret = sigemptyset(&sa.sa_mask))) {
797                 perr(PE, 0, "[sigemptyset]");
798                 goto out;
799         }
800
801         if ((ret = sigaction(SIGIO, &sa, NULL))) {
802                 perr(PE, 0, "[sigaction]");
803                 /* FIXME: Since this is an init routine, if it fails the program will
804                  * exit and clean its own stuff (mem, sigs etc). We only have to cleanup
805                  * anything xseg-related
806                  */
807                 goto out;
808         }
809
810         pfiled->nr_ops = cmdline_nr_ops;
811         pfiled->maxfds = 2 * cmdline_nr_ops;
812
813         pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
814         if(!pfiled->fdcache) {
815                 ret = -ENOMEM;
816                 perr(PE, 0, "could not allocate memory [fdcache]");
817                 goto out;
818         }
819
820
821         pfiled->free_bufs = calloc(pfiled->nr_ops, sizeof(xqindex));
822         if(!pfiled->free_bufs) {
823                 ret = -ENOMEM;
824                 perr(PE, 0, "could not allocate memory [free_bufs]");
825                 goto out;
826         }
827
828         pfiled->iothread = calloc(pfiled->nr_ops, sizeof(pthread_t));
829         if(!pfiled->iothread) {
830                 ret = -ENOMEM;
831                 perr(PE, 0, "could not allocate memory [iothreads]");
832                 goto out;
833         }
834
835         pfiled->ios = calloc(pfiled->nr_ops, sizeof(struct io));
836         if (!pfiled->ios) {
837                 ret = -ENOMEM;
838                 perr(PE, 0, "could not allocate memory [ios]");
839                 goto out;
840         }
841
842         for (i = 0; i < pfiled->nr_ops; i++) {
843                 pfiled->ios[i].pfiled = pfiled;
844                 pthread_cond_init(&pfiled->ios[i].cond, NULL);
845                 pthread_mutex_init(&pfiled->ios[i].lock, NULL);
846         }
847
848         xq_init_seq(&pfiled->free_ops, pfiled->nr_ops, pfiled->nr_ops,
849                         pfiled->free_bufs);
850
851         pfiled->handled_reqs = 0;
852
853         strncpy(pfiled->path, cmdline_path, MAX_PATH_SIZE);
854         pfiled->path[MAX_PATH_SIZE] = 0;
855
856         strncpy(pfiled->vpath, cmdline_vpath, MAX_PATH_SIZE);
857         pfiled->vpath[MAX_PATH_SIZE] = 0;
858
859         pfiled->path_len = strlen(pfiled->path);
860         if (pfiled->path[pfiled->path_len -1] != '/'){
861                 pfiled->path[pfiled->path_len] = '/';
862                 pfiled->path[++pfiled->path_len]= 0;
863         }
864
865         pfiled->vpath_len = strlen(pfiled->vpath);
866         if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
867                 pfiled->vpath[pfiled->vpath_len] = '/';
868                 pfiled->vpath[++pfiled->vpath_len]= 0;
869         }
870
871         if (xseg_initialize()) {
872                 ret = - ENOMEM;
873                 perr(PE, 0, "could not initialize xseg library");
874                 goto out;
875         }
876
877         pfiled->xseg = join_or_create(cmdline_xseg_spec);
878         if (!pfiled->xseg) {
879                 ret = -EIO;
880                 perr(PE, 0, "could not join xseg with spec '%s'\n", 
881                                 cmdline_xseg_spec);
882                 goto out_with_xseginit;
883         }
884
885         pfiled->xport = xseg_bind_port(pfiled->xseg, cmdline_portno, NULL);
886         if (!pfiled->xport) {
887                 ret = -EIO;
888                 perr(PE, 0, "could not bind to xseg port %ld", cmdline_portno);
889                 goto out_with_xsegjoin;
890         }
891
892         pfiled->portno = xseg_portno(pfiled->xseg, pfiled->xport);
893         perr(PI, 0, "filed on port %u/%u\n",
894                         pfiled->portno, pfiled->xseg->config.nr_ports);
895
896         if (xseg_init_local_signal(pfiled->xseg, pfiled->portno) < 0){
897                 perr(PE, 0, "cannot int local signals\n");
898                 return -1;
899         }
900
901         for (i = 0; i < pfiled->nr_ops; i++) {
902                 pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
903                 pfiled->fdcache[i].flags = READY;
904         }
905         for (i = 0; i < pfiled->nr_ops; i++) {
906                 /* 
907                  * TODO: error check + cond variable to stop io from starting
908                  * unless all threads are created successfully
909                  */
910                 pthread_create(pfiled->iothread + i, NULL, io_loop, (void *) (pfiled->ios + i));
911         }
912         pthread_mutex_init(&pfiled->cache_lock, NULL);
913
914         goto out;
915
916 out_with_xsegjoin:
917         xseg_leave(pfiled->xseg);
918 out_with_xseginit:
919         xseg_finalize();
920 out:
921         return ret;
922 }
923
/*
 * Parse @s as a complete base-10 integer.
 *
 * Returns the parsed value, or -1 if @s is empty, has trailing non-digit
 * characters, or does not fit in an int (strtol overflow or long->int
 * truncation). The input "-1" is indistinguishable from an error; the
 * callers in parse_cmdline() only accept non-negative values, so that
 * ambiguity is harmless there.
 */
static int safe_atoi(char *s)
{
        long l;
        char *endp;

        errno = 0;
        l = strtol(s, &endp, 10);
        if (s == endp || *endp != '\0')
                return -1;
        /* Reject values strtol clamped, and values an int cannot hold. */
        if (errno == ERANGE || l > INT_MAX || l < INT_MIN)
                return -1;
        return (int)l;
}
935
936 static void parse_cmdline(int argc, char **argv)
937 {
938         char *argv0 = argv[0];
939
940         for (;;) {
941                 int c;
942
943                 opterr = 0;
944                 c = getopt(argc, argv, "dhp:n:g:vf:");
945                 if (c == -1)
946                         break;
947
948                 switch(c) {
949                         case '?':
950                                 perr(PFE, 0, "Unknown option: -%c", optopt);
951                                 break;
952                         case ':':
953                                 perr(PFE, 0, "Option -%c requires an argument",
954                                                 optopt);
955                                 break;
956                         case 'h':
957                                 usage(argv0);
958                                 exit(0);
959                                 break;
960                         case 'p':
961                                 cmdline_portno = safe_atoi(optarg);
962                                 break;
963                         case 'n':
964                                 cmdline_nr_ops = safe_atoi(optarg);
965                                 break;
966                         case 'g':
967                                 /* FIXME: Max length of spec? strdup, eww */
968                                 cmdline_xseg_spec = strdup(optarg);
969                                 if (!cmdline_xseg_spec)
970                                         perr(PFE, 0, "out of memory");
971                                 break;
972                         case 'v':
973                                 cmdline_verbose = 1;
974                                 break;
975                         case 'd':
976                                 cmdline_daemon = 1;
977                                 break;
978                         case 'f':
979                                 /* FIXME: Max length of spec? strdup, eww */
980                                 cmdline_pidfile = strdup(optarg);
981                                 if (!cmdline_pidfile)
982                                         perr(PFE, 0, "out of memory");
983                                 break;
984                 }
985         }
986
987         argc -= optind;
988         argv += optind;
989
990         /* Sanity check for all arguments */
991         if (cmdline_portno < 0) {
992                 usage(argv0);
993                 perr(PFE, 0, "no or invalid port specified");
994         }
995         if (cmdline_nr_ops < 1) {
996                 usage(argv0);
997                 perr(PFE, 0, "specified outstanding request count is invalid");
998         }
999         if (!cmdline_xseg_spec) {
1000                 usage(argv0);
1001                 perr(PFE, 0, "xseg specification is mandatory");
1002         }
1003
1004         if (argc < 2) {
1005                 usage(argv0);
1006                 perr(PFE, 0, "path and vpath specification is mandatory");
1007         }
1008
1009         cmdline_path = strdup(argv[0]);
1010         if (!cmdline_path)
1011                 perr(PFE, 0, "out of memory");
1012
1013         cmdline_vpath = strdup(argv[1]);
1014         if (!cmdline_vpath)
1015                 perr(PFE, 0, "out of memory");
1016 }
1017
/*
 * Release the pidfile: close the descriptor @fd, then delete @path.
 *
 * The result of close() is deliberately ignored; the return value is
 * unlink()'s (0 on success, -1 with errno set on failure).
 */
int pidfile_remove(char *path, int fd)
{
        int rc;

        (void)close(fd);
        rc = unlink(path);
        return rc;
}
1023
/*
 * Write the current process ID, in decimal and without a trailing
 * newline, at offset 0 of the already-open pidfile @pid_fd.
 *
 * Returns the number of bytes written, or -1 on error.
 */
int pidfile_write(int pid_fd)
{
        char buf[16];
        int len;

        /*
         * Record the process ID, not the calling thread's ID
         * (syscall(SYS_gettid)); the two only coincide on the main thread.
         */
        len = snprintf(buf, sizeof(buf), "%ld", (long)getpid());
        if (len < 0 || (size_t)len >= sizeof(buf))
                return -1;

        if (lseek(pid_fd, 0, SEEK_SET) < 0)
                return -1;
        return (int)write(pid_fd, buf, (size_t)len);
}
1034
/*
 * Read a process ID from the pidfile at @path into *@pid.
 *
 * Only the first 15 bytes are examined, and they must consist solely of a
 * positive decimal number that fits in pid_t (the format pidfile_write()
 * produces). Returns 0 on success. On any failure (cannot open or read,
 * empty file, non-numeric content, trailing garbage, out-of-range value)
 * returns -1 and leaves *@pid set to 0.
 */
int pidfile_read(char *path, pid_t *pid)
{
        char buf[16], *endptr;
        ssize_t ret;
        long val;
        int fd;

        *pid = 0;

        fd = open(path, O_RDONLY);
        if (fd < 0)
                return -1;
        ret = read(fd, buf, sizeof(buf) - 1);
        close(fd);
        if (ret <= 0)
                return -1;
        /*
         * Terminate right after the bytes actually read, so strtol never
         * scans uninitialized stack memory after a short read.
         */
        buf[ret] = '\0';

        errno = 0;
        val = strtol(buf, &endptr, 10);
        /* Require at least one digit and nothing after the number. */
        if (endptr == buf || endptr != &buf[ret] || errno == ERANGE)
                return -1;
        /* A valid pid is positive and must fit in pid_t. */
        if (val <= 0 || (long)(pid_t)val != val)
                return -1;

        *pid = (pid_t)val;
        return 0;
}
1057
/*
 * Atomically create the pidfile at @path, failing if it already exists.
 *
 * Returns a write-only descriptor for the newly created file, or -1 on
 * failure (errno from open() is preserved). *@old_pid is always set:
 * if the file already existed it holds the pid recorded there (0 if that
 * could not be parsed), otherwise 0.
 *
 * Note: O_EXCL creation is only reliable on NFS from NFSv3 onwards.
 */
int pidfile_open(char *path, pid_t *old_pid)
{
        int fd, saved_errno;

        *old_pid = 0;

        /* O_CREAT requires an explicit mode argument. */
        fd = open(path, O_CREAT | O_EXCL | O_WRONLY, 0644);
        if (fd < 0) {
                saved_errno = errno;
                /* errno holds positive codes: compare with EEXIST, not -EEXIST. */
                if (saved_errno == EEXIST)
                        pidfile_read(path, old_pid);
                errno = saved_errno;
        }
        return fd;
}
1068
1069 int main(int argc, char **argv)
1070 {
1071         int pid_fd = -1, r = 0;
1072         pid_t old_pid;
1073         struct pfiled pfiled;
1074
1075         init_perr("pfiled");
1076         parse_cmdline(argc, argv);
1077
1078         perr(PI, 0, "p = %ld, nr_ops = %lu\n", cmdline_portno, cmdline_nr_ops);
1079         
1080         if (cmdline_pidfile){
1081                 pid_fd = pidfile_open(cmdline_pidfile, &old_pid);
1082                 if (pid_fd < 0) {
1083                         if (old_pid) {
1084                                 perr(PFE, 0, "Daemon already running, pid: %d.", old_pid);
1085                         } else {
1086                                 perr(PFE, 0, "Cannot open or create pidfile");
1087                         }
1088                         return -1;
1089                 }
1090         }
1091
1092         if (cmdline_daemon){
1093                 if (daemon(0, 1) < 0){
1094                         perr(PFE, 0, "Cannot daemonize");
1095                         r = -1;
1096                         goto out;
1097                 }
1098         }
1099         setup_signals();
1100         if (pid_fd > 0)
1101                 pidfile_write(pid_fd);
1102
1103
1104         if (pfiled_init(&pfiled) < 0){
1105                 r = -1;
1106                 goto out;
1107         }
1108
1109         r = pfiled_loop(&pfiled);
1110 out:
1111         if (pid_fd > 0)
1112                 pidfile_remove(cmdline_pidfile, pid_fd);
1113         return r;
1114
1115 }