c80599d5a1014db843c4deeff0cc8696d91bbdd0
[archipelago] / xseg / peers / user / pfiled.c
1 /*
2  * The Pithos File Blocker Peer (pfiled)
3  */
4
5 #define _GNU_SOURCE
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <unistd.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <errno.h>
14 #include <aio.h>
15 #include <signal.h>
16 #include <limits.h>
17 #include <pthread.h>
18 #include <sys/sendfile.h>
19
20 #include <xseg/xseg.h>
21 #include <xseg/protocol.h>
22
23 #include "common.h"                     /* FIXME: */
24
25 #define MAX_PATH_SIZE           1024
26 #define MAX_FILENAME_SIZE       255
27
28 /* default concurrency level (number of threads) */
29 #define DEFAULT_NR_OPS           16
30
31 /* Pithos hash for the zero block
32  * FIXME: Should it be hardcoded?
33  */
34 #define ZERO_BLOCK \
35         "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85"
36
37 /*
38  * Globals, holding command-line arguments
39  */
40 long cmdline_portno = -1;
41 char *cmdline_xseg_spec = NULL;
42 char *cmdline_path = NULL;
43 char *cmdline_vpath = NULL;
44 long cmdline_nr_ops = DEFAULT_NR_OPS;
45 long cmdline_verbose = 0;
46
/*
 * Write the command-line synopsis to stderr.
 *
 * argv0: program name substituted into the first line.
 * Returns 1 unconditionally.
 */
static int usage(char *argv0)
{
	fprintf(stderr,
		"Usage: %s <PATH> <VPATH> [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v]\n\n",
		argv0);

	/* the option descriptions contain no conversions, so fputs suffices */
	fputs("where:\n"
	      "\tPATH: path to pithos data blocks\n"
	      "\tVPATH: path to modified volume blocks\n"
	      "\tPORT: xseg port to listen for requests on\n"
	      "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
	      "request_size:extra_size:page_shift'\n"
	      "\tNR_OPS: number of outstanding xseg requests\n"
	      "\t-v: verbose mode\n",
	      stderr);

	return 1;
}
63
64 /* fdcache_node flags */
65 #define READY (1 << 1)
66
67 /* fdcache node info */
68 struct fdcache_node {
69         volatile int fd;
70         volatile unsigned int ref;
71         volatile unsigned long time;
72         volatile unsigned int flags;
73         pthread_cond_t cond;
74         char target[MAX_FILENAME_SIZE + 1];
75 };
76
77 /* pfiled context */
78 struct pfiled {
79         struct xseg *xseg;
80         struct xseg_port *xport;
81         uint32_t portno;
82         uint64_t size;
83         struct io *ios;
84         struct xq free_ops;
85         char *free_bufs;
86         long nr_ops;
87         struct sigevent sigevent;
88         uint32_t path_len;
89         uint32_t vpath_len;
90         uint64_t handled_reqs;
91         long maxfds;
92         struct fdcache_node *fdcache;
93         pthread_t *iothread;
94         pthread_mutex_t cache_lock;
95         char path[MAX_PATH_SIZE + 1];
96         char vpath[MAX_PATH_SIZE + 1];
97 };
98
/*
 * State of one io thread and of the request it is currently serving.
 */
struct io {
	struct pfiled *pfiled;          /* owning context */
	struct xseg_request *req;       /* request being served, NULL while idle */
	uint32_t state;                 /* set to XS_ACCEPTED by handle_accepted() */
	ssize_t retval;                 /* reset to 0 per request; only logged */
	long fdcacheidx;                /* fdcache slot held for req, -1 if none */
	pthread_cond_t cond;            /* signalled to wake the parked thread */
	pthread_mutex_t lock;           /* protects the park/wake handshake on cond */
};
112
113
/* How many times sigaction_handler() has been invoked. */
static unsigned long sigaction_count;

/*
 * Signal handler with the SA_SIGINFO signature; it only counts deliveries.
 * None of its arguments are inspected.
 */
static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
{
	(void)sig;
	(void)siginfo;
	(void)arg;

	sigaction_count += 1;
}
120
121 static void log_io(char *msg, struct io *io)
122 {
123         char target[65], data[65];
124         /* null terminate name in case of req->target is less than 63 characters,
125          * and next character after name (aka first byte of next buffer) is not
126          * null
127          */
128         unsigned int end = (io->req->targetlen> 64) ? 64 : io->req->targetlen;
129         unsigned int dend = (io->req->datalen > 64) ? 64 : io->req->datalen;
130         char *req_target = xseg_get_target(io->pfiled->xseg, io->req);
131         char *req_data = xseg_get_data(io->pfiled->xseg, io->req);
132         strncpy(target, req_target, end);
133         target[end] = 0;
134         strncpy(data, req_data, 64);
135         data[dend] = 0;
136
137         fprintf(stderr,
138                 "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u, serviced: %u\n"
139                 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
140                 msg,
141                 (unsigned int)io->fdcacheidx, /* this is cacheidx not fd */
142                 (unsigned int)io->req->op,
143                 (unsigned long long)io->req->offset,
144                 (unsigned long)io->req->size,
145                 (unsigned long)io->retval,
146                 (unsigned int)io->req->state,
147                 (unsigned long)io->req->serviced,
148                 (unsigned int)io->req->targetlen, target,
149                 (unsigned long long)io->req->datalen, data);
150 }
151
152 static struct io *alloc_io(struct pfiled *pfiled)
153 {
154         xqindex idx = xq_pop_head(&pfiled->free_ops, 1);
155         if (idx == Noneidx)
156                 return NULL;
157         return pfiled->ios + idx;
158 }
159
160 static inline void free_io(struct pfiled *pfiled, struct io *io)
161 {
162         xqindex idx = io - pfiled->ios;
163         io->req = NULL;
164         xq_append_head(&pfiled->free_ops, idx, 1);
165 }
166
167 static void complete(struct pfiled *pfiled, struct io *io)
168 {
169         struct xseg_request *req = io->req;
170         req->state |= XS_SERVED;
171         if (cmdline_verbose)
172                 log_io("complete", io);
173         xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
174         xseg_signal(pfiled->xseg, p);
175         __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
176 }
177
178 static void fail(struct pfiled *pfiled, struct io *io)
179 {
180         struct xseg_request *req = io->req;
181         req->state |= XS_FAILED;
182         if (cmdline_verbose)
183                 log_io("fail", io);
184         xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
185         xseg_signal(pfiled->xseg, p);
186         if (io->fdcacheidx >= 0) {
187                 __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
188         }
189 }
190
191 static void handle_unknown(struct pfiled *pfiled, struct io *io)
192 {
193         struct xseg_request *req = io->req;
194         char *data = xseg_get_data(pfiled->xseg, req);
195         snprintf(data, req->datalen, "unknown request op");
196         fail(pfiled, io);
197 }
198
/*
 * Build "<path><t0t1>/<t2t3>/<t4t5>/<target>" in buf, where tN is the N-th
 * byte of target, and optionally create the three intermediate directories.
 *
 * buf:       output; must hold strlen(path) + 9 + targetlen + 1 bytes.
 * path:      NUL-terminated base directory, expected to end in '/'.
 * target:    object name; not required to be NUL-terminated.
 *            NOTE(review): the first six bytes are read regardless of
 *            targetlen, so callers should pass names of at least six bytes.
 * targetlen: number of bytes of target to append.
 * mkdirs:    when 1, create each missing intermediate directory.
 *
 * Returns 0 on success or a negative errno value when a directory could not
 * be created. Callers test the result with "< 0", so the error must be
 * negative to be noticed.
 */
static int create_path(char *buf, char *path, char *target, uint32_t targetlen, int mkdirs)
{
	int i;
	struct stat st;
	uint32_t pathlen = strlen(path);

	memcpy(buf, path, pathlen);

	/* i walks the output in steps of "xx/"; the matching input offset is
	 * i - i/3 because the input has no separators
	 */
	for (i = 0; i < 9; i += 3) {
		buf[pathlen + i] = target[i - (i / 3)];
		buf[pathlen + i + 1] = target[i + 1 - (i / 3)];
		buf[pathlen + i + 2] = '/';
		if (mkdirs != 1)
			continue;

		buf[pathlen + i + 3] = '\0';
		if (stat(buf, &st) == 0)
			continue;

		/* 0700: a directory needs the search bit for its owner to be
		 * able to create or open anything inside it. EEXIST means
		 * another thread created it between stat() and mkdir().
		 */
		if (mkdir(buf, 0700) < 0 && errno != EEXIST) {
			int err = errno;	/* perror() may overwrite errno */

			perror(buf);
			return -err;
		}
	}

	memcpy(&buf[pathlen + 9], target, targetlen);
	buf[pathlen + 9 + targetlen] = '\0';

	return 0;
}
226
227 static int dir_open(struct pfiled *pfiled, struct io *io,
228                         char *target, uint32_t targetlen, int mode)
229 {
230         int fd = -1;
231         struct fdcache_node *ce = NULL;
232         long i, lru;
233         char tmp[pfiled->path_len + targetlen + 10];
234         uint64_t min;
235         io->fdcacheidx = -1;
236         if (targetlen> MAX_FILENAME_SIZE)
237                 goto out_err;
238
239 start:
240         /* check cache */
241         pthread_mutex_lock(&pfiled->cache_lock);
242 start_locked:
243         lru = -1;
244         min = UINT64_MAX;
245         for (i = 0; i < pfiled->maxfds; i++) {
246                 if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time 
247                                 && (pfiled->fdcache[i].flags & READY)) {
248                         min = pfiled->fdcache[i].time;
249                         lru = i;
250
251                 }
252
253                 if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
254                         if (pfiled->fdcache[i].target[targetlen] == 0) {
255                                 ce = &pfiled->fdcache[i];
256                                 /* if any other io thread is currently opening
257                                  * the file, block until it succeeds or fails
258                                  */
259                                 if (!(ce->flags & READY)) {
260                                         pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
261                                         /* when ready, restart lookup */
262                                         goto start_locked;
263                                 }
264                                 /* if successfully opened */
265                                 if (ce->fd > 0) {
266                                         fd = pfiled->fdcache[i].fd;
267                                         io->fdcacheidx = i;
268                                         goto out;
269                                 }
270                                 /* else open failed for the other io thread, so
271                                  * it should fail for everyone waiting on this
272                                  * file.
273                                  */
274                                 else {
275                                         fd = -1;
276                                         io->fdcacheidx = -1;
277                                         goto out_err_unlock;
278                                 }
279                         }
280                 }
281         }
282         if (lru < 0){
283                 /* all cache entries are currently being used */
284                 pthread_mutex_unlock(&pfiled->cache_lock);
285                 goto start;
286         }
287         if (pfiled->fdcache[lru].ref){
288                 fd = -1;
289                 printf("lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
290                 goto out_err_unlock;
291         }
292         /* make room for new file */
293         ce = &pfiled->fdcache[lru];
294         /* set name here and state to not ready, for any other requests on the
295          * same target that may follow
296          */
297         strncpy(ce->target, target, targetlen);
298         ce->target[targetlen] = 0;
299         ce->flags &= ~READY;
300         pthread_mutex_unlock(&pfiled->cache_lock);
301
302         if (ce->fd >0){
303                 if (close(ce->fd) < 0){
304                         perror("close");
305                 }
306         }
307
308         /* try opening it from pithos blocker dir */
309         if (create_path(tmp, pfiled->path, target, targetlen, 0) < 0) {
310                 fd = -1;
311                 goto new_entry;
312         }
313         
314         fd = open(tmp, O_RDWR);
315         if (fd < 0) {
316                 /* try opening it from the tmp dir */
317                 if (create_path(tmp, pfiled->vpath, target, targetlen, 0) < 0)
318                         goto new_entry;
319
320                 fd = open(tmp, O_RDWR);
321                 if (fd < 0)  {
322                         if (create_path(tmp, pfiled->vpath, target, targetlen, 1) < 0) {
323                                 fd = -1;
324                                 goto new_entry;
325                         }
326         
327                         fd = open(tmp, O_RDWR | O_CREAT, 0600);         
328                         if (fd < 0)
329                                 perror(tmp);
330                 }
331         }
332
333         /* insert in cache a negative fd to indicate opening error to
334          * any other ios waiting for the file to open
335          */
336
337         /* insert in cache */
338 new_entry:
339         pthread_mutex_lock(&pfiled->cache_lock);
340         ce->fd = fd;
341         ce->ref = 0;
342         ce->flags = READY;
343         pthread_cond_broadcast(&ce->cond);
344         if (fd > 0) {
345                 io->fdcacheidx = lru;
346         }
347         else {
348                 io->fdcacheidx = -1;
349                 goto out_err_unlock;
350         }
351
352 out:
353         pfiled->handled_reqs++;
354         ce->time = pfiled->handled_reqs;
355         __sync_fetch_and_add(&ce->ref, 1);
356         pthread_mutex_unlock(&pfiled->cache_lock);
357 out_err:
358         return fd;
359
360 out_err_unlock:
361         pthread_mutex_unlock(&pfiled->cache_lock);
362         goto out_err;
363 }
364
365 static void handle_read_write(struct pfiled *pfiled, struct io *io)
366 {
367         int r, fd;
368         struct xseg_request *req = io->req;
369         char *target = xseg_get_target(pfiled->xseg, req);
370         char *data = xseg_get_data(pfiled->xseg, req);
371
372         fd = dir_open(pfiled, io, target, req->targetlen, 0);
373         if (fd < 0){
374                 perror("dir_open");
375                 fail(pfiled, io);
376                 return;
377         }
378
379         if (req != io->req)
380                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
381         if (!req->size) {
382                 if (req->flags & (XF_FLUSH | XF_FUA)) {
383                         /* No FLUSH/FUA support yet (O_SYNC ?).
384                          * note that with FLUSH/size == 0 
385                          * there will probably be a (uint64_t)-1 offset */
386                         complete(pfiled, io);
387                         return;
388                 } else {
389                         complete(pfiled, io);
390                         return;
391                 }
392         }
393
394         switch (req->op) {
395         case X_READ:
396                 while (req->serviced < req->datalen) {
397                         r = pread(fd, data + req->serviced, 
398                                         req->datalen - req->serviced,
399                                         req->offset + req->serviced);
400                         if (r < 0) {
401                                 req->datalen = req->serviced;
402                                 perror("pread");
403                         }
404                         else if (r == 0) {
405                                 /* reached end of file. zero out the rest data buffer */
406                                 memset(data + req->serviced, 0, req->datalen - req->serviced);
407                                 req->serviced = req->datalen;
408                         }
409                         else {
410                                 req->serviced += r;
411                         }
412                 }
413                 break;
414         case X_WRITE:
415                 while (req->serviced < req->datalen) {
416                         r = pwrite(fd, data + req->serviced, 
417                                         req->datalen - req->serviced,
418                                         req->offset + req->serviced);
419                         if (r < 0) {
420                                 req->datalen = req->serviced;
421                         }
422                         else if (r == 0) {
423                                 fprintf(stderr, "write returned 0\n");
424                                 memset(data + req->serviced, 0, req->datalen - req->serviced);
425                                 req->serviced = req->datalen;
426                         }
427                         else {
428                                 req->serviced += r;
429                         }
430                 }
431                 r = fsync(fd);
432                 if (r< 0) {
433                         perror("fsync");
434                         /* if fsync fails, then no bytes serviced correctly */
435                         req->serviced = 0;
436                 }
437                 break;
438         default:
439                 snprintf(data, req->datalen,
440                          "wtf, corrupt op %u?\n", req->op);
441                 fail(pfiled, io);
442                 return;
443         }
444
445         if (req->serviced > 0 ) {
446                 complete(pfiled, io);
447         }
448         else {
449                 strerror_r(errno, data, req->datalen);
450                 fail(pfiled, io);
451         }
452         return;
453 }
454
455 static void handle_info(struct pfiled *pfiled, struct io *io)
456 {
457         struct xseg_request *req = io->req;
458         struct stat stat;
459         int fd, r;
460         uint64_t size;
461         char *target = xseg_get_target(pfiled->xseg, req);
462         char *data = xseg_get_data(pfiled->xseg, req);
463         struct xseg_reply_info *xinfo  = (struct xseg_reply_info *)data;
464
465         fd = dir_open(pfiled, io, target, req->targetlen, 0);
466         if (fd < 0) {
467                 fail(pfiled, io);
468                 return;
469         }
470
471         r = fstat(fd, &stat);
472         if (r < 0) {
473                 perror("fstat");
474                 fail(pfiled, io);
475                 return;
476         }
477
478         size = (uint64_t)stat.st_size;
479         xinfo->size = size;
480
481         complete(pfiled, io);
482 }
483
484 static void handle_copy(struct pfiled *pfiled, struct io *io)
485 {
486         struct xseg_request *req = io->req;
487         char *target = xseg_get_target(pfiled->xseg, req);
488         char *data = xseg_get_data(pfiled->xseg, req);
489         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
490         struct stat st;
491         char *buf = malloc(256);
492         int n, src, dst;
493
494         dst = dir_open(pfiled, io, target, req->targetlen, 1);
495         if (dst < 0) {
496                 fprintf(stderr, "fail in dst\n");
497                 fail(pfiled, io);
498                 return;
499         }
500
501         if (create_path(buf, pfiled->path, xcopy->target, xcopy->targetlen, 0) < 0)  {
502                 fail(pfiled, io);
503                 return;
504         }
505
506         src = open(buf, O_RDWR);
507         if (src < 0) {
508                 XSEGLOG("fail in src %s\n", buf);
509                 perror("open src");
510                 fail(pfiled, io);
511                 return;
512         }
513
514         fstat(src, &st);
515         n = sendfile(dst, src, 0, st.st_size);
516         if (n != st.st_size) {
517                 fprintf(stderr, "fail in copy\n");
518                 fail(pfiled, io);
519                 goto out;
520         }
521
522         if (n < 0) {
523                 fprintf(stderr, "fail in cp\n");
524                 fail(pfiled, io);
525                 goto out;
526         }
527
528         complete(pfiled, io);
529
530 out:
531         close(src);
532 }
533
534 static void handle_delete(struct pfiled *pfiled, struct io *io)
535 {
536         struct xseg_request *req = io->req;
537         char *buf = malloc(255);
538         int fd;
539         char *target = xseg_get_target(pfiled->xseg, req);
540         
541         fd = dir_open(pfiled, io, target, req->targetlen, 0);
542         if (fd < 0) {
543                 fprintf(stderr, "fail in dir_open\n");
544                 fail(pfiled, io);
545                 return;
546         }
547
548         /* 'invalidate' cache entry */
549         if (io->fdcacheidx >= 0) {
550                 pfiled->fdcache[io->fdcacheidx].fd = -1;
551         }
552
553         close(fd);
554
555         if (create_path(buf, pfiled->vpath, target, req->targetlen, 0) < 0) {
556                 fail(pfiled, io);
557                 return;
558         }
559         unlink(buf);
560
561         complete(pfiled, io);
562
563         return;
564 }
565
566 static void dispatch(struct pfiled *pfiled, struct io *io)
567 {
568         if (cmdline_verbose) { 
569                 fprintf(stderr, "io: 0x%p, req: 0x%p, op %u\n",
570                         (void *)io, (void *)io->req, io->req->op);
571         }
572
573         switch (io->req->op) {
574         case X_READ:
575         case X_WRITE:
576                 handle_read_write(pfiled, io); break;
577         case X_INFO:
578                 handle_info(pfiled, io); break;
579         case X_COPY:
580                 handle_copy(pfiled, io); break;
581         case X_DELETE:
582                 handle_delete(pfiled, io); break;
583 //      case X_SNAPSHOT:
584         case X_SYNC:
585         default:
586                 handle_unknown(pfiled, io);
587         }
588 }
589
590 static void handle_accepted(struct pfiled *pfiled, struct io *io)
591 {
592         struct xseg_request *req = io->req;
593         req->serviced = 0;
594         io->state = XS_ACCEPTED;
595         io->retval = 0;
596         dispatch(pfiled, io);
597 }
598
599 static struct io* wake_up_next_iothread(struct pfiled *pfiled)
600 {
601         struct io *io = alloc_io(pfiled);
602
603         if (io){        
604                 pthread_mutex_lock(&io->lock);
605                 pthread_cond_signal(&io->cond);
606                 pthread_mutex_unlock(&io->lock);
607         }
608         return io;
609 }
610
611 void *io_loop(void *arg)
612 {
613         struct io *io = (struct io *) arg;
614         struct pfiled *pfiled = io->pfiled;
615         struct xseg *xseg = pfiled->xseg;
616         uint32_t portno = pfiled->portno;
617         struct xseg_request *accepted;
618
619         for (;;) {
620                 accepted = NULL;
621                 accepted = xseg_accept(xseg, portno);
622                 if (accepted) {
623                         io->req = accepted;
624                         wake_up_next_iothread(pfiled);
625                         handle_accepted(pfiled, io);
626                 }
627                 else {
628                         pthread_mutex_lock(&io->lock);
629                         free_io(pfiled, io);
630                         pthread_cond_wait(&io->cond, &io->lock);
631                         pthread_mutex_unlock(&io->lock);
632                 }
633         }
634
635         return NULL;
636 }
637
638 static struct xseg *join_or_create(char *spec)
639 {
640         struct xseg_config config;
641         struct xseg *xseg;
642
643         (void)xseg_parse_spec(spec, &config);
644         xseg = xseg_join(config.type, config.name, "posix", NULL);
645         if (xseg)
646                 return xseg;
647
648         (void)xseg_create(&config);
649         return xseg_join(config.type, config.name, "posix", NULL);
650 }
651
652 static int pfiled_loop(struct pfiled *pfiled)
653 {
654         struct xseg *xseg = pfiled->xseg;
655         uint32_t portno = pfiled->portno;
656         /* GCC + pthreads glitch? */
657         struct io *io;
658
659         for (;;) {
660                 io = wake_up_next_iothread(pfiled);
661                 xseg_prepare_wait(xseg, portno);
662                 xseg_wait_signal(xseg, 1000000UL);
663         }
664
665         return 0;
666 }
667
668 static int pfiled_init(struct pfiled *pfiled)
669 {
670         struct sigaction sa;
671         int ret;
672         int i;
673
674         pfiled->sigevent.sigev_notify = SIGEV_SIGNAL;
675         pfiled->sigevent.sigev_signo = SIGIO;
676         sa.sa_sigaction = sigaction_handler;
677         sa.sa_flags = SA_SIGINFO;
678
679         if ((ret = sigemptyset(&sa.sa_mask))) {
680                 perr(PE, 0, "[sigemptyset]");
681                 goto out;
682         }
683
684         if ((ret = sigaction(SIGIO, &sa, NULL))) {
685                 perr(PE, 0, "[sigaction]");
686                 /* FIXME: Since this is an init routine, if it fails the program will
687                  * exit and clean its own stuff (mem, sigs etc). We only have to cleanup
688                  * anything xseg-related
689                  */
690                 goto out;
691         }
692
693         pfiled->nr_ops = cmdline_nr_ops;
694         pfiled->maxfds = 2 * cmdline_nr_ops;
695
696         pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
697         if(!pfiled->fdcache) {
698                 ret = -ENOMEM;
699                 perr(PE, 0, "could not allocate memory [fdcache]");
700                 goto out;
701         }
702                 
703
704         pfiled->free_bufs = calloc(pfiled->nr_ops, sizeof(xqindex));
705         if(!pfiled->free_bufs) {
706                 ret = -ENOMEM;
707                 perr(PE, 0, "could not allocate memory [free_bufs]");
708                 goto out;
709         }
710
711         pfiled->iothread = calloc(pfiled->nr_ops, sizeof(pthread_t));
712         if(!pfiled->iothread) {
713                 ret = -ENOMEM;
714                 perr(PE, 0, "could not allocate memory [iothreads]");
715                 goto out;
716         }
717
718         pfiled->ios = calloc(pfiled->nr_ops, sizeof(struct io));
719         if (!pfiled->ios) {
720                 ret = -ENOMEM;
721                 perr(PE, 0, "could not allocate memory [ios]");
722                 goto out;
723         }
724
725         for (i = 0; i < pfiled->nr_ops; i++) {
726                 pfiled->ios[i].pfiled = pfiled;
727                 pthread_cond_init(&pfiled->ios[i].cond, NULL);
728                 pthread_mutex_init(&pfiled->ios[i].lock, NULL);
729         }
730
731         xq_init_seq(&pfiled->free_ops, pfiled->nr_ops, pfiled->nr_ops,
732                                 pfiled->free_bufs);
733         
734         pfiled->handled_reqs = 0;
735
736         strncpy(pfiled->path, cmdline_path, MAX_PATH_SIZE);
737         pfiled->path[MAX_PATH_SIZE] = 0;
738
739         strncpy(pfiled->vpath, cmdline_vpath, MAX_PATH_SIZE);
740         pfiled->vpath[MAX_PATH_SIZE] = 0;
741
742         pfiled->path_len = strlen(pfiled->path);
743         if (pfiled->path[pfiled->path_len -1] != '/'){
744                 pfiled->path[pfiled->path_len] = '/';
745                 pfiled->path[++pfiled->path_len]= 0;
746         }
747
748         pfiled->vpath_len = strlen(pfiled->vpath);
749         if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
750                 pfiled->vpath[pfiled->vpath_len] = '/';
751                 pfiled->vpath[++pfiled->vpath_len]= 0;
752         }
753
754         if (xseg_initialize()) {
755                 ret = - ENOMEM;
756                 perr(PE, 0, "could not initialize xseg library");
757                 goto out;
758         }
759
760         pfiled->xseg = join_or_create(cmdline_xseg_spec);
761         if (!pfiled->xseg) {
762                 ret = -EIO;
763                 perr(PE, 0, "could not join xseg with spec '%s'\n", 
764                         cmdline_xseg_spec);
765                 goto out_with_xseginit;
766         }
767
768         pfiled->xport = xseg_bind_port(pfiled->xseg, cmdline_portno, NULL);
769         if (!pfiled->xport) {
770                 ret = -EIO;
771                 perr(PE, 0, "could not bind to xseg port %ld", cmdline_portno);
772                 goto out_with_xsegjoin;
773         }
774
775         pfiled->portno = xseg_portno(pfiled->xseg, pfiled->xport);
776         perr(PI, 0, "filed on port %u/%u\n",
777                 pfiled->portno, pfiled->xseg->config.nr_ports);
778
779         if (xseg_init_local_signal(pfiled->xseg, pfiled->portno) < 0){
780                 printf("cannot int local signals\n");
781                 return -1;
782         }
783
784         for (i = 0; i < pfiled->nr_ops; i++) {
785                 pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
786                 pfiled->fdcache[i].flags = READY;
787         }
788         for (i = 0; i < pfiled->nr_ops; i++) {
789                 /* 
790                  * TODO: error check + cond variable to stop io from starting
791                  * unless all threads are created successfully
792                  */
793                 pthread_create(pfiled->iothread + i, NULL, io_loop, (void *) (pfiled->ios + i));
794         }
795         pthread_mutex_init(&pfiled->cache_lock, NULL);
796
797         goto out;
798
799 out_with_xsegjoin:
800         xseg_leave(pfiled->xseg);
801 out_with_xseginit:
802         xseg_finalize();
803 out:
804         return ret;
805 }
806
/*
 * Parse s as a base-10 integer.
 *
 * Returns the value, or -1 when s is empty, has trailing characters, or
 * does not fit in an int. A literal "-1" is therefore indistinguishable from
 * an error, which suits the callers: they reject negative values anyway.
 */
static int safe_atoi(char *s)
{
	long l;
	char *endp;

	errno = 0;
	l = strtol(s, &endp, 10);

	/* no digits consumed, or junk after the number */
	if (s == endp || *endp != '\0')
		return -1;

	/* out of range for long, or would be truncated by the int return */
	if (errno == ERANGE || l < INT_MIN || l > INT_MAX)
		return -1;

	return (int)l;
}
818
819 static void parse_cmdline(int argc, char **argv)
820 {
821         char *argv0 = argv[0];
822
823         for (;;) {
824                 int c;
825
826                 opterr = 0;
827                 c = getopt(argc, argv, "hp:n:g:v");
828                 if (c == -1)
829                         break;
830                 
831                 switch(c) {
832                         case '?':
833                                 perr(PFE, 0, "Unknown option: -%c", optopt);
834                                 break;
835                         case ':':
836                                 perr(PFE, 0, "Option -%c requires an argument",
837                                         optopt);
838                                 break;
839                         case 'h':
840                                 usage(argv0);
841                                 exit(0);
842                                 break;
843                         case 'p':
844                                 cmdline_portno = safe_atoi(optarg);
845                                 break;
846                         case 'n':
847                                 cmdline_nr_ops = safe_atoi(optarg);
848                                 break;
849                         case 'g':
850                                 /* FIXME: Max length of spec? strdup, eww */
851                                 cmdline_xseg_spec = strdup(optarg);
852                                 if (!cmdline_xseg_spec)
853                                         perr(PFE, 0, "out of memory");
854                                 break;
855                         case 'v':
856                                 cmdline_verbose = 1;
857                                 break;
858                 }
859         }
860
861         argc -= optind;
862         argv += optind;
863
864         /* Sanity check for all arguments */
865         if (cmdline_portno < 0) {
866                 usage(argv0);
867                 perr(PFE, 0, "no or invalid port specified");
868         }
869         if (cmdline_nr_ops < 1) {
870                 usage(argv0);
871                 perr(PFE, 0, "specified outstanding request count is invalid");
872         }
873         if (!cmdline_xseg_spec) {
874                 usage(argv0);
875                 perr(PFE, 0, "xseg specification is mandatory");
876         }
877
878         if (argc < 2) {
879                 usage(argv0);
880                 perr(PFE, 0, "path and vpath specification is mandatory");
881         }
882
883         cmdline_path = strdup(argv[0]);
884         if (!cmdline_path)
885                 perr(PFE, 0, "out of memory");
886
887         cmdline_vpath = strdup(argv[1]);
888         if (!cmdline_vpath)
889                 perr(PFE, 0, "out of memory");
890 }
891
892 int main(int argc, char **argv)
893 {
894         struct pfiled pfiled;
895
896         init_perr("pfiled");
897         parse_cmdline(argc, argv);
898
899         perr(PI, 0, "p = %ld, nr_ops = %lu\n", cmdline_portno, cmdline_nr_ops);
900
901         if (pfiled_init(&pfiled) < 0)
902                 perr(PFE, 0, "failed to initialize pfiled");
903
904         return pfiled_loop(&pfiled);
905 }