fix bugs in vlmc tool
[archipelago] / xseg / peers / user / pfiled.c
1 /*
2  * The Pithos File Blocker Peer (pfiled)
3  */
4
5 #define _GNU_SOURCE
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <unistd.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <errno.h>
14 #include <aio.h>
15 #include <signal.h>
16 #include <limits.h>
17 #include <pthread.h>
18 #include <sys/sendfile.h>
19
20 #include <xseg/xseg.h>
21 #include <xseg/protocol.h>
22
23 #include "common.h"                     /* FIXME: */
24
25 #define MAX_PATH_SIZE           1024
26 #define MAX_FILENAME_SIZE       255
27
28 /* default concurrency level (number of threads) */
29 #define DEFAULT_NR_OPS           16
30
31 /* Pithos hash for the zero block
32  * FIXME: Should it be hardcoded?
33  */
34 #define ZERO_BLOCK \
35         "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85"
36
37 /*
38  * Globals, holding command-line arguments
39  */
40 long cmdline_portno = -1;
41 char *cmdline_xseg_spec = NULL;
42 char *cmdline_path = NULL;
43 char *cmdline_vpath = NULL;
44 long cmdline_nr_ops = DEFAULT_NR_OPS;
45 long cmdline_verbose = 0;
46
47 static int usage(char *argv0)
48 {
49         fprintf(stderr,
50                 "Usage: %s <PATH> <VPATH> [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v]\n\n"
51                 "where:\n"
52                 "\tPATH: path to pithos data blocks\n"
53                 "\tVPATH: path to modified volume blocks\n"
54                 "\tPORT: xseg port to listen for requests on\n"
55                 "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
56                         "request_size:extra_size:page_shift'\n"
57                 "\tNR_OPS: number of outstanding xseg requests\n"
58                 "\t-v: verbose mode\n",
59                 argv0);
60
61         return 1;
62 }
63
64 /* fdcache_node flags */
65 #define READY (1 << 1)
66
67 /* fdcache node info */
68 struct fdcache_node {
69         volatile int fd;
70         volatile unsigned int ref;
71         volatile unsigned long time;
72         volatile unsigned int flags;
73         pthread_cond_t cond;
74         char target[MAX_FILENAME_SIZE + 1];
75 };
76
77 /* pfiled context */
78 struct pfiled {
79         struct xseg *xseg;
80         struct xseg_port *xport;
81         uint32_t portno;
82         uint64_t size;
83         struct io *ios;
84         struct xq free_ops;
85         char *free_bufs;
86         long nr_ops;
87         struct sigevent sigevent;
88         uint32_t path_len;
89         uint32_t vpath_len;
90         uint64_t handled_reqs;
91         long maxfds;
92         struct fdcache_node *fdcache;
93         pthread_t *iothread;
94         pthread_mutex_t cache_lock;
95         char path[MAX_PATH_SIZE + 1];
96         char vpath[MAX_PATH_SIZE + 1];
97 };
98
99 /*
100  * pfiled specific structure 
101  * containing information on a pending I/O operation
102  */
103 struct io {
104         struct pfiled *pfiled;
105         struct xseg_request *req;
106         uint32_t state;
107         ssize_t retval;
108         long fdcacheidx;
109         pthread_cond_t cond;
110         pthread_mutex_t lock;
111 };
112
113
114 static unsigned long sigaction_count;
115
116 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
117 {
118         sigaction_count++;
119 }
120
121 static void log_io(char *msg, struct io *io)
122 {
123         char target[65], data[65];
124         /* null terminate name in case of req->target is less than 63 characters,
125          * and next character after name (aka first byte of next buffer) is not
126          * null
127          */
128         unsigned int end = (io->req->targetlen> 64) ? 64 : io->req->targetlen;
129         unsigned int dend = (io->req->datalen > 64) ? 64 : io->req->datalen;
130         char *req_target = xseg_get_target(io->pfiled->xseg, io->req);
131         char *req_data = xseg_get_data(io->pfiled->xseg, io->req);
132         strncpy(target, req_target, end);
133         target[end] = 0;
134         strncpy(data, req_data, 64);
135         data[dend] = 0;
136
137         fprintf(stderr,
138                 "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u\n"
139                 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
140                 msg,
141                 (unsigned int)io->fdcacheidx, /* this is cacheidx not fd */
142                 (unsigned int)io->req->op,
143                 (unsigned long long)io->req->offset,
144                 (unsigned long)io->req->size,
145                 (unsigned long)io->retval,
146                 (unsigned int)io->req->state,
147                 (unsigned int)io->req->targetlen, target,
148                 (unsigned long long)io->req->datalen, data);
149 }
150
151 static struct io *alloc_io(struct pfiled *pfiled)
152 {
153         xqindex idx = xq_pop_head(&pfiled->free_ops, 1);
154         if (idx == Noneidx)
155                 return NULL;
156         return pfiled->ios + idx;
157 }
158
159 static inline void free_io(struct pfiled *pfiled, struct io *io)
160 {
161         xqindex idx = io - pfiled->ios;
162         io->req = NULL;
163         xq_append_head(&pfiled->free_ops, idx, 1);
164 }
165
166 static void complete(struct pfiled *pfiled, struct io *io)
167 {
168         struct xseg_request *req = io->req;
169         req->state |= XS_SERVED;
170         if (cmdline_verbose)
171                 log_io("complete", io);
172         xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
173         xseg_signal(pfiled->xseg, p);
174         __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
175 }
176
177 static void fail(struct pfiled *pfiled, struct io *io)
178 {
179         struct xseg_request *req = io->req;
180         req->state |= XS_FAILED;
181         if (cmdline_verbose)
182                 log_io("fail", io);
183         xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
184         xseg_signal(pfiled->xseg, p);
185         if (io->fdcacheidx >= 0) {
186                 __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
187         }
188 }
189
190 static void handle_unknown(struct pfiled *pfiled, struct io *io)
191 {
192         struct xseg_request *req = io->req;
193         char *data = xseg_get_data(pfiled->xseg, req);
194         snprintf(data, req->datalen, "unknown request op");
195         fail(pfiled, io);
196 }
197
198 static int create_path(char *buf, char *path, char *target, uint32_t targetlen, int mkdirs)
199 {
200         int i;
201         struct stat st;
202         uint32_t pathlen = strlen(path);
203         char *start;
204
205         strncpy(buf, path, pathlen);
206
207         start = strchr(target, ':');
208         if (start == NULL)
209                 start = target;
210         else 
211                 start++;
212
213         for (i = 0; i < 9; i+= 3) {
214                 buf[pathlen + i] = start[i - (i/3)];
215                 buf[pathlen + i +1] = start[i + 1 - (i/3)];
216                 buf[pathlen + i + 2] = '/';
217                 if (mkdirs == 1) {
218                         buf[pathlen + i + 3] = '\0';
219                         if (stat(buf, &st) < 0) 
220                                 if (mkdir(buf, 0600) < 0) {
221                                         perror(buf);
222                                         return errno;
223                                 }
224                 }
225         }
226
227         strncpy(&buf[pathlen + 9], target, targetlen);
228         buf[pathlen + 9 + targetlen] = '\0';
229
230         return 0;
231 }
232
233 static int dir_open(struct pfiled *pfiled, struct io *io,
234                         char *target, uint32_t targetlen, int mode)
235 {
236         int fd = -1;
237         struct fdcache_node *ce = NULL;
238         long i, lru;
239         char tmp[pfiled->path_len + targetlen + 10];
240         uint64_t min;
241         io->fdcacheidx = -1;
242         if (targetlen> MAX_FILENAME_SIZE)
243                 goto out_err;
244
245 start:
246         /* check cache */
247         pthread_mutex_lock(&pfiled->cache_lock);
248 start_locked:
249         lru = -1;
250         min = UINT64_MAX;
251         for (i = 0; i < pfiled->maxfds; i++) {
252                 if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time 
253                                 && (pfiled->fdcache[i].flags & READY)) {
254                         min = pfiled->fdcache[i].time;
255                         lru = i;
256
257                 }
258
259                 if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
260                         if (pfiled->fdcache[i].target[targetlen] == 0) {
261                                 ce = &pfiled->fdcache[i];
262                                 /* if any other io thread is currently opening
263                                  * the file, block until it succeeds or fails
264                                  */
265                                 if (!(ce->flags & READY)) {
266                                         pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
267                                         /* when ready, restart lookup */
268                                         goto start_locked;
269                                 }
270                                 /* if successfully opened */
271                                 if (ce->fd > 0) {
272                                         fd = pfiled->fdcache[i].fd;
273                                         io->fdcacheidx = i;
274                                         goto out;
275                                 }
276                                 /* else open failed for the other io thread, so
277                                  * it should fail for everyone waiting on this
278                                  * file.
279                                  */
280                                 else {
281                                         fd = -1;
282                                         io->fdcacheidx = -1;
283                                         goto out_err_unlock;
284                                 }
285                         }
286                 }
287         }
288         if (lru < 0){
289                 /* all cache entries are currently being used */
290                 pthread_mutex_unlock(&pfiled->cache_lock);
291                 goto start;
292         }
293         if (pfiled->fdcache[lru].ref){
294                 fd = -1;
295                 printf("lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
296                 goto out_err_unlock;
297         }
298         /* make room for new file */
299         ce = &pfiled->fdcache[lru];
300         /* set name here and state to not ready, for any other requests on the
301          * same target that may follow
302          */
303         strncpy(ce->target, target, targetlen);
304         ce->target[targetlen] = 0;
305         ce->flags &= ~READY;
306         pthread_mutex_unlock(&pfiled->cache_lock);
307
308         if (ce->fd >0){
309                 if (close(ce->fd) < 0){
310                         perror("close");
311                 }
312         }
313
314         /* try opening it from pithos blocker dir */
315         if (create_path(tmp, pfiled->path, target, targetlen, 0) < 0) {
316                 fd = -1;
317                 goto new_entry;
318         }
319         
320         fd = open(tmp, O_RDWR);
321         if (fd < 0) {
322                 /* try opening it from the tmp dir */
323                 if (create_path(tmp, pfiled->vpath, target, targetlen, 0) < 0)
324                         goto new_entry;
325
326                 fd = open(tmp, O_RDWR);
327                 if (fd < 0)  {
328                         if (create_path(tmp, pfiled->vpath, target, targetlen, 1) < 0) {
329                                 fd = -1;
330                                 goto new_entry;
331                         }
332         
333                         fd = open(tmp, O_RDWR | O_CREAT, 0600);         
334                         if (fd < 0)
335                                 perror(tmp);
336                 }
337         }
338
339         /* insert in cache a negative fd to indicate opening error to
340          * any other ios waiting for the file to open
341          */
342
343         /* insert in cache */
344 new_entry:
345         pthread_mutex_lock(&pfiled->cache_lock);
346         ce->fd = fd;
347         ce->ref = 0;
348         ce->flags = READY;
349         pthread_cond_broadcast(&ce->cond);
350         if (fd > 0) {
351                 io->fdcacheidx = lru;
352         }
353         else {
354                 io->fdcacheidx = -1;
355                 goto out_err_unlock;
356         }
357
358 out:
359         pfiled->handled_reqs++;
360         ce->time = pfiled->handled_reqs;
361         __sync_fetch_and_add(&ce->ref, 1);
362         pthread_mutex_unlock(&pfiled->cache_lock);
363 out_err:
364         return fd;
365
366 out_err_unlock:
367         pthread_mutex_unlock(&pfiled->cache_lock);
368         goto out_err;
369 }
370
371 static void handle_read_write(struct pfiled *pfiled, struct io *io)
372 {
373         int r, fd;
374         struct xseg_request *req = io->req;
375         char *target = xseg_get_target(pfiled->xseg, req);
376         char *data = xseg_get_data(pfiled->xseg, req);
377
378         fd = dir_open(pfiled, io, target, req->targetlen, 0);
379         if (fd < 0){
380                 perror("dir_open");
381                 fail(pfiled, io);
382                 return;
383         }
384
385         if (req != io->req)
386                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
387         if (!req->size) {
388                 if (req->flags & (XF_FLUSH | XF_FUA)) {
389                         /* No FLUSH/FUA support yet (O_SYNC ?).
390                          * note that with FLUSH/size == 0 
391                          * there will probably be a (uint64_t)-1 offset */
392                         complete(pfiled, io);
393                         return;
394                 } else {
395                         complete(pfiled, io);
396                         return;
397                 }
398         }
399
400         switch (req->op) {
401         case X_READ:
402                 while (req->serviced < req->datalen) {
403                         r = pread(fd, data + req->serviced, 
404                                         req->datalen - req->serviced,
405                                         req->offset + req->serviced);
406                         if (r < 0) {
407                                 req->datalen = req->serviced;
408                                 perror("pread");
409                         }
410                         else if (r == 0) {
411                                 /* reached end of file. zero out the rest data buffer */
412                                 memset(data + req->serviced, 0, req->datalen - req->serviced);
413                                 req->serviced = req->datalen;
414                         }
415                         else {
416                                 req->serviced += r;
417                         }
418                 }
419                 break;
420         case X_WRITE:
421                 while (req->serviced < req->datalen) {
422                         r = pwrite(fd, data + req->serviced, 
423                                         req->datalen - req->serviced,
424                                         req->offset + req->serviced);
425                         if (r < 0) {
426                                 req->datalen = req->serviced;
427                         }
428                         else if (r == 0) {
429                                 /* reached end of file. zero out the rest data buffer */
430                                 memset(data + req->serviced, 0, req->datalen - req->serviced);
431                                 req->serviced = req->datalen;
432                         }
433                         else {
434                                 req->serviced += r;
435                         }
436                 }
437                 r = fsync(fd);
438                 if (r< 0) {
439                         perror("fsync");
440                         /* if fsync fails, then no bytes serviced correctly */
441                         req->serviced = 0;
442                 }
443                 break;
444         default:
445                 snprintf(data, req->datalen,
446                          "wtf, corrupt op %u?\n", req->op);
447                 fail(pfiled, io);
448                 return;
449         }
450
451         if (req->serviced > 0 ) {
452                 complete(pfiled, io);
453         }
454         else {
455                 strerror_r(errno, data, req->datalen);
456                 fail(pfiled, io);
457         }
458         return;
459 }
460
461 static void handle_info(struct pfiled *pfiled, struct io *io)
462 {
463         struct xseg_request *req = io->req;
464         struct stat stat;
465         int fd, r;
466         uint64_t size;
467         char *target = xseg_get_target(pfiled->xseg, req);
468         char *data = xseg_get_data(pfiled->xseg, req);
469         struct xseg_reply_info *xinfo  = (struct xseg_reply_info *)data;
470
471         fd = dir_open(pfiled, io, target, req->targetlen, 0);
472         if (fd < 0) {
473                 fail(pfiled, io);
474                 return;
475         }
476
477         r = fstat(fd, &stat);
478         if (r < 0) {
479                 perror("fstat");
480                 fail(pfiled, io);
481                 return;
482         }
483
484         size = (uint64_t)stat.st_size;
485         xinfo->size = size;
486
487         complete(pfiled, io);
488 }
489
490 static void handle_copy(struct pfiled *pfiled, struct io *io)
491 {
492         struct xseg_request *req = io->req;
493         char *target = xseg_get_target(pfiled->xseg, req);
494         char *data = xseg_get_data(pfiled->xseg, req);
495         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
496         struct stat st;
497         char *buf = malloc(256);
498         int n, src, dst;
499
500         dst = dir_open(pfiled, io, target, req->targetlen, 1);
501         if (dst < 0) {
502                 fprintf(stderr, "fail in dst\n");
503                 fail(pfiled, io);
504                 return;
505         }
506
507         if (create_path(buf, pfiled->path, xcopy->target, xcopy->targetlen, 0) < 0)  {
508                 fail(pfiled, io);
509                 return;
510         }
511
512         src = open(buf, O_RDWR);
513         if (src < 0) {
514                 fprintf(stderr, "fail in src\n");
515                 fail(pfiled, io);
516                 return;
517         }
518
519         fstat(src, &st);
520         n = sendfile(dst, src, 0, st.st_size);
521         if (n != st.st_size) {
522                 fprintf(stderr, "fail in copy\n");
523                 fail(pfiled, io);
524                 goto out;
525         }
526
527         if (n < 0) {
528                 fprintf(stderr, "fail in cp\n");
529                 fail(pfiled, io);
530                 goto out;
531         }
532
533         complete(pfiled, io);
534
535 out:
536         close(src);
537 }
538
539 static void handle_delete(struct pfiled *pfiled, struct io *io)
540 {
541         struct xseg_request *req = io->req;
542         char *buf = malloc(255);
543         int fd;
544         char *target = xseg_get_target(pfiled->xseg, req);
545         
546         fd = dir_open(pfiled, io, target, req->targetlen, 0);
547         if (fd < 0) {
548                 fprintf(stderr, "fail in dir_open\n");
549                 fail(pfiled, io);
550                 return;
551         }
552
553         /* 'invalidate' cache entry */
554         if (io->fdcacheidx >= 0) {
555                 pfiled->fdcache[io->fdcacheidx].fd = -1;
556         }
557
558         close(fd);
559
560         if (create_path(buf, pfiled->vpath, target, req->targetlen, 0) < 0) {
561                 fail(pfiled, io);
562                 return;
563         }
564         unlink(buf);
565
566         complete(pfiled, io);
567
568         return;
569 }
570
571 static void dispatch(struct pfiled *pfiled, struct io *io)
572 {
573         if (cmdline_verbose) { 
574                 fprintf(stderr, "io: 0x%p, req: 0x%p, op %u\n",
575                         (void *)io, (void *)io->req, io->req->op);
576         }
577
578         switch (io->req->op) {
579         case X_READ:
580         case X_WRITE:
581                 handle_read_write(pfiled, io); break;
582         case X_INFO:
583                 handle_info(pfiled, io); break;
584         case X_COPY:
585                 handle_copy(pfiled, io); break;
586         case X_DELETE:
587                 handle_delete(pfiled, io); break;
588 //      case X_SNAPSHOT:
589         case X_SYNC:
590         default:
591                 handle_unknown(pfiled, io);
592         }
593 }
594
595 static void handle_accepted(struct pfiled *pfiled, struct io *io)
596 {
597         struct xseg_request *req = io->req;
598         req->serviced = 0;
599         io->state = XS_ACCEPTED;
600         io->retval = 0;
601         dispatch(pfiled, io);
602 }
603
604 static struct io* wake_up_next_iothread(struct pfiled *pfiled)
605 {
606         struct io *io = alloc_io(pfiled);
607
608         if (io){        
609                 pthread_mutex_lock(&io->lock);
610                 pthread_cond_signal(&io->cond);
611                 pthread_mutex_unlock(&io->lock);
612         }
613         return io;
614 }
615
616 void *io_loop(void *arg)
617 {
618         struct io *io = (struct io *) arg;
619         struct pfiled *pfiled = io->pfiled;
620         struct xseg *xseg = pfiled->xseg;
621         uint32_t portno = pfiled->portno;
622         struct xseg_request *accepted;
623
624         for (;;) {
625                 accepted = NULL;
626                 accepted = xseg_accept(xseg, portno);
627                 if (accepted) {
628                         io->req = accepted;
629                         wake_up_next_iothread(pfiled);
630                         handle_accepted(pfiled, io);
631                 }
632                 else {
633                         pthread_mutex_lock(&io->lock);
634                         free_io(pfiled, io);
635                         pthread_cond_wait(&io->cond, &io->lock);
636                         pthread_mutex_unlock(&io->lock);
637                 }
638         }
639
640         return NULL;
641 }
642
643 static struct xseg *join_or_create(char *spec)
644 {
645         struct xseg_config config;
646         struct xseg *xseg;
647
648         (void)xseg_parse_spec(spec, &config);
649         xseg = xseg_join(config.type, config.name, "posix", NULL);
650         if (xseg)
651                 return xseg;
652
653         (void)xseg_create(&config);
654         return xseg_join(config.type, config.name, "posix", NULL);
655 }
656
657 static int pfiled_loop(struct pfiled *pfiled)
658 {
659         struct xseg *xseg = pfiled->xseg;
660         uint32_t portno = pfiled->portno;
661         /* GCC + pthreads glitch? */
662         struct io *io;
663
664         for (;;) {
665                 io = wake_up_next_iothread(pfiled);
666                 xseg_prepare_wait(xseg, portno);
667                 xseg_wait_signal(xseg, 1000000UL);
668         }
669
670         return 0;
671 }
672
673 static int pfiled_init(struct pfiled *pfiled)
674 {
675         struct sigaction sa;
676         int ret;
677         int i;
678
679         pfiled->sigevent.sigev_notify = SIGEV_SIGNAL;
680         pfiled->sigevent.sigev_signo = SIGIO;
681         sa.sa_sigaction = sigaction_handler;
682         sa.sa_flags = SA_SIGINFO;
683
684         if ((ret = sigemptyset(&sa.sa_mask))) {
685                 perr(PE, 0, "[sigemptyset]");
686                 goto out;
687         }
688
689         if ((ret = sigaction(SIGIO, &sa, NULL))) {
690                 perr(PE, 0, "[sigaction]");
691                 /* FIXME: Since this is an init routine, if it fails the program will
692                  * exit and clean its own stuff (mem, sigs etc). We only have to cleanup
693                  * anything xseg-related
694                  */
695                 goto out;
696         }
697
698         pfiled->nr_ops = cmdline_nr_ops;
699         pfiled->maxfds = 2 * cmdline_nr_ops;
700
701         pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
702         if(!pfiled->fdcache) {
703                 ret = -ENOMEM;
704                 perr(PE, 0, "could not allocate memory [fdcache]");
705                 goto out;
706         }
707                 
708
709         pfiled->free_bufs = calloc(pfiled->nr_ops, sizeof(xqindex));
710         if(!pfiled->free_bufs) {
711                 ret = -ENOMEM;
712                 perr(PE, 0, "could not allocate memory [free_bufs]");
713                 goto out;
714         }
715
716         pfiled->iothread = calloc(pfiled->nr_ops, sizeof(pthread_t));
717         if(!pfiled->iothread) {
718                 ret = -ENOMEM;
719                 perr(PE, 0, "could not allocate memory [iothreads]");
720                 goto out;
721         }
722
723         pfiled->ios = calloc(pfiled->nr_ops, sizeof(struct io));
724         if (!pfiled->ios) {
725                 ret = -ENOMEM;
726                 perr(PE, 0, "could not allocate memory [ios]");
727                 goto out;
728         }
729
730         for (i = 0; i < pfiled->nr_ops; i++) {
731                 pfiled->ios[i].pfiled = pfiled;
732                 pthread_cond_init(&pfiled->ios[i].cond, NULL);
733                 pthread_mutex_init(&pfiled->ios[i].lock, NULL);
734         }
735
736         xq_init_seq(&pfiled->free_ops, pfiled->nr_ops, pfiled->nr_ops,
737                                 pfiled->free_bufs);
738         
739         pfiled->handled_reqs = 0;
740
741         strncpy(pfiled->path, cmdline_path, MAX_PATH_SIZE);
742         pfiled->path[MAX_PATH_SIZE] = 0;
743
744         strncpy(pfiled->vpath, cmdline_vpath, MAX_PATH_SIZE);
745         pfiled->vpath[MAX_PATH_SIZE] = 0;
746
747         pfiled->path_len = strlen(pfiled->path);
748         if (pfiled->path[pfiled->path_len -1] != '/'){
749                 pfiled->path[pfiled->path_len] = '/';
750                 pfiled->path[++pfiled->path_len]= 0;
751         }
752
753         pfiled->vpath_len = strlen(pfiled->vpath);
754         if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
755                 pfiled->vpath[pfiled->vpath_len] = '/';
756                 pfiled->vpath[++pfiled->vpath_len]= 0;
757         }
758
759         if (xseg_initialize()) {
760                 ret = - ENOMEM;
761                 perr(PE, 0, "could not initialize xseg library");
762                 goto out;
763         }
764
765         pfiled->xseg = join_or_create(cmdline_xseg_spec);
766         if (!pfiled->xseg) {
767                 ret = -EIO;
768                 perr(PE, 0, "could not join xseg with spec '%s'\n", 
769                         cmdline_xseg_spec);
770                 goto out_with_xseginit;
771         }
772
773         pfiled->xport = xseg_bind_port(pfiled->xseg, cmdline_portno);
774         if (!pfiled->xport) {
775                 ret = -EIO;
776                 perr(PE, 0, "could not bind to xseg port %ld", cmdline_portno);
777                 goto out_with_xsegjoin;
778         }
779
780         pfiled->portno = xseg_portno(pfiled->xseg, pfiled->xport);
781         perr(PI, 0, "filed on port %u/%u\n",
782                 pfiled->portno, pfiled->xseg->config.nr_ports);
783
784         if (xseg_init_local_signal(pfiled->xseg, pfiled->portno) < 0){
785                 printf("cannot int local signals\n");
786                 return -1;
787         }
788
789         for (i = 0; i < pfiled->nr_ops; i++) {
790                 pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
791                 pfiled->fdcache[i].flags = READY;
792         }
793         for (i = 0; i < pfiled->nr_ops; i++) {
794                 /* 
795                  * TODO: error check + cond variable to stop io from starting
796                  * unless all threads are created successfully
797                  */
798                 pthread_create(pfiled->iothread + i, NULL, io_loop, (void *) (pfiled->ios + i));
799         }
800         pthread_mutex_init(&pfiled->cache_lock, NULL);
801
802         goto out;
803
804 out_with_xsegjoin:
805         xseg_leave(pfiled->xseg);
806 out_with_xseginit:
807         xseg_finalize();
808 out:
809         return ret;
810 }
811
812 static int safe_atoi(char *s)
813 {
814         long l;
815         char *endp;
816
817         l = strtol(s, &endp, 10);
818         if (s != endp && *endp == '\0')
819                 return l;
820         else
821                 return -1;
822 }
823
824 static void parse_cmdline(int argc, char **argv)
825 {
826         char *argv0 = argv[0];
827
828         for (;;) {
829                 int c;
830
831                 opterr = 0;
832                 c = getopt(argc, argv, "hp:n:g:v");
833                 if (c == -1)
834                         break;
835                 
836                 switch(c) {
837                         case '?':
838                                 perr(PFE, 0, "Unknown option: -%c", optopt);
839                                 break;
840                         case ':':
841                                 perr(PFE, 0, "Option -%c requires an argument",
842                                         optopt);
843                                 break;
844                         case 'h':
845                                 usage(argv0);
846                                 exit(0);
847                                 break;
848                         case 'p':
849                                 cmdline_portno = safe_atoi(optarg);
850                                 break;
851                         case 'n':
852                                 cmdline_nr_ops = safe_atoi(optarg);
853                                 break;
854                         case 'g':
855                                 /* FIXME: Max length of spec? strdup, eww */
856                                 cmdline_xseg_spec = strdup(optarg);
857                                 if (!cmdline_xseg_spec)
858                                         perr(PFE, 0, "out of memory");
859                                 break;
860                         case 'v':
861                                 cmdline_verbose = 1;
862                                 break;
863                 }
864         }
865
866         argc -= optind;
867         argv += optind;
868
869         /* Sanity check for all arguments */
870         if (cmdline_portno < 0) {
871                 usage(argv0);
872                 perr(PFE, 0, "no or invalid port specified");
873         }
874         if (cmdline_nr_ops < 1) {
875                 usage(argv0);
876                 perr(PFE, 0, "specified outstanding request count is invalid");
877         }
878         if (!cmdline_xseg_spec) {
879                 usage(argv0);
880                 perr(PFE, 0, "xseg specification is mandatory");
881         }
882
883         if (argc < 2) {
884                 usage(argv0);
885                 perr(PFE, 0, "path and vpath specification is mandatory");
886         }
887
888         cmdline_path = strdup(argv[0]);
889         if (!cmdline_path)
890                 perr(PFE, 0, "out of memory");
891
892         cmdline_vpath = strdup(argv[1]);
893         if (!cmdline_vpath)
894                 perr(PFE, 0, "out of memory");
895 }
896
897 int main(int argc, char **argv)
898 {
899         struct pfiled pfiled;
900
901         init_perr("pfiled");
902         parse_cmdline(argc, argv);
903
904         perr(PI, 0, "p = %ld, nr_ops = %lu\n", cmdline_portno, cmdline_nr_ops);
905
906         if (pfiled_init(&pfiled) < 0)
907                 perr(PFE, 0, "failed to initialize pfiled");
908
909         return pfiled_loop(&pfiled);
910 }