rewrite mt-mapperd based on st threads
[archipelago] / xseg / peers / user / pfiled.c
1 /*
2  * The Pithos File Blocker Peer (pfiled)
3  */
4
5 #define _GNU_SOURCE
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <unistd.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <errno.h>
14 #include <aio.h>
15 #include <signal.h>
16 #include <limits.h>
17 #include <pthread.h>
18 #include <sys/sendfile.h>
19
20 #include <xseg/xseg.h>
21 #include <xseg/protocol.h>
22
23 #include "common.h"                     /* FIXME: */
24
#define MAX_PATH_SIZE           1024
#define MAX_FILENAME_SIZE       255

/* default concurrency level (number of threads) */
#define DEFAULT_NR_OPS           16

/*
 * Pithos hash for the zero block.
 * This is the SHA-256 digest of the empty input: 64 hex characters. The
 * previous value was one digit short (it lacked the final '5').
 * NOTE(review): assumes Pithos hashes a zero block as the empty input — confirm.
 * FIXME: Should it be hardcoded?
 */
#define ZERO_BLOCK \
        "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"

/*
 * Globals, holding command-line arguments (filled in by parse_cmdline())
 */
long cmdline_portno = -1;
char *cmdline_xseg_spec = NULL;
char *cmdline_path = NULL;
char *cmdline_vpath = NULL;
long cmdline_nr_ops = DEFAULT_NR_OPS;
long cmdline_verbose = 0;
46
/*
 * Print the command-line synopsis followed by a description of every
 * argument to stderr. Always returns 1 so it can double as an exit status.
 */
static int usage(char *argv0)
{
        fprintf(stderr,
                "Usage: %s <PATH> <VPATH> [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v]\n\n",
                argv0);
        fputs("where:\n"
              "\tPATH: path to pithos data blocks\n"
              "\tVPATH: path to modified volume blocks\n"
              "\tPORT: xseg port to listen for requests on\n"
              "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
              "request_size:extra_size:page_shift'\n"
              "\tNR_OPS: number of outstanding xseg requests\n"
              "\t-v: verbose mode\n",
              stderr);
        return 1;
}
63
64 /* fdcache_node flags */
65 #define READY (1 << 1)
66
67 /* fdcache node info */
68 struct fdcache_node {
69         volatile int fd;
70         volatile unsigned int ref;
71         volatile unsigned long time;
72         volatile unsigned int flags;
73         pthread_cond_t cond;
74         char target[MAX_FILENAME_SIZE + 1];
75 };
76
77 /* pfiled context */
78 struct pfiled {
79         struct xseg *xseg;
80         struct xseg_port *xport;
81         uint32_t portno;
82         uint64_t size;
83         struct io *ios;
84         struct xq free_ops;
85         char *free_bufs;
86         long nr_ops;
87         struct sigevent sigevent;
88         uint32_t path_len;
89         uint32_t vpath_len;
90         uint64_t handled_reqs;
91         long maxfds;
92         struct fdcache_node *fdcache;
93         pthread_t *iothread;
94         pthread_mutex_t cache_lock;
95         char path[MAX_PATH_SIZE + 1];
96         char vpath[MAX_PATH_SIZE + 1];
97 };
98
/*
 * An io slot: the request an io thread is serving, plus the mutex/condition
 * pair used to park that thread while the slot is idle.
 */
struct io {
        struct pfiled *pfiled;          /* owning context */
        struct xseg_request *req;       /* request in progress; NULL once freed */
        uint32_t state;                 /* set to XS_ACCEPTED when a request is taken */
        ssize_t retval;                 /* reset to 0 when a request is taken */
        long fdcacheidx;                /* fd cache slot this io references, or -1 */
        pthread_cond_t cond;            /* signalled to wake the parked io thread */
        pthread_mutex_t lock;           /* mutex paired with cond */
};
112
113
114 static unsigned long sigaction_count;
115
116 static void sigaction_handler(int sig, siginfo_t *siginfo, void *arg)
117 {
118         sigaction_count++;
119 }
120
121 static void log_io(char *msg, struct io *io)
122 {
123         char target[65], data[65];
124         /* null terminate name in case of req->target is less than 63 characters,
125          * and next character after name (aka first byte of next buffer) is not
126          * null
127          */
128         unsigned int end = (io->req->targetlen> 64) ? 64 : io->req->targetlen;
129         unsigned int dend = (io->req->datalen > 64) ? 64 : io->req->datalen;
130         char *req_target = xseg_get_target(io->pfiled->xseg, io->req);
131         char *req_data = xseg_get_data(io->pfiled->xseg, io->req);
132         strncpy(target, req_target, end);
133         target[end] = 0;
134         strncpy(data, req_data, 64);
135         data[dend] = 0;
136
137         fprintf(stderr,
138                 "%s: fd:%u, op:%u offset: %llu size: %lu retval: %lu, reqstate: %u, serviced: %u\n"
139                 "target[%u]: '%s', data[%llu]:\n%s------------------\n\n",
140                 msg,
141                 (unsigned int)io->fdcacheidx, /* this is cacheidx not fd */
142                 (unsigned int)io->req->op,
143                 (unsigned long long)io->req->offset,
144                 (unsigned long)io->req->size,
145                 (unsigned long)io->retval,
146                 (unsigned int)io->req->state,
147                 (unsigned long)io->req->serviced,
148                 (unsigned int)io->req->targetlen, target,
149                 (unsigned long long)io->req->datalen, data);
150 }
151
152 static struct io *alloc_io(struct pfiled *pfiled)
153 {
154         xqindex idx = xq_pop_head(&pfiled->free_ops, 1);
155         if (idx == Noneidx)
156                 return NULL;
157         return pfiled->ios + idx;
158 }
159
160 static inline void free_io(struct pfiled *pfiled, struct io *io)
161 {
162         xqindex idx = io - pfiled->ios;
163         io->req = NULL;
164         xq_append_head(&pfiled->free_ops, idx, 1);
165 }
166
167 static void complete(struct pfiled *pfiled, struct io *io)
168 {
169         struct xseg_request *req = io->req;
170         req->state |= XS_SERVED;
171         if (cmdline_verbose)
172                 log_io("complete", io);
173         xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
174         xseg_signal(pfiled->xseg, p);
175         __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
176 }
177
178 static void fail(struct pfiled *pfiled, struct io *io)
179 {
180         struct xseg_request *req = io->req;
181         req->state |= XS_FAILED;
182         if (cmdline_verbose)
183                 log_io("fail", io);
184         xport p = xseg_respond(pfiled->xseg, req, pfiled->portno, X_ALLOC);
185         xseg_signal(pfiled->xseg, p);
186         if (io->fdcacheidx >= 0) {
187                 __sync_fetch_and_sub(&pfiled->fdcache[io->fdcacheidx].ref, 1);
188         }
189 }
190
191 static void handle_unknown(struct pfiled *pfiled, struct io *io)
192 {
193         struct xseg_request *req = io->req;
194         char *data = xseg_get_data(pfiled->xseg, req);
195         snprintf(data, req->datalen, "unknown request op");
196         fail(pfiled, io);
197 }
198
/*
 * Build in buf the on-disk path of target under directory path.
 *
 * Files are spread over three directory levels made from the first six
 * characters of the name. For example, target "abcdefXYZ" under "/d/"
 * maps to "/d/ab/cd/ef/abcdefXYZ". path is expected to end in '/'. buf
 * must hold at least strlen(path) + targetlen + 10 bytes.
 *
 * If mkdirs == 1, missing intermediate directories are created with mode
 * 0700.
 *
 * Returns 0 on success, or a negative errno value on failure. Callers test
 * for "< 0"; a positive errno used to be returned, so failures went
 * unnoticed.
 */
static int create_path(char *buf, char *path, char *target, uint32_t targetlen, int mkdirs)
{
        int i;
        struct stat st;
        uint32_t pathlen = strlen(path);

        /* the directory fan-out reads target[0..5] */
        if (targetlen < 6)
                return -EINVAL;

        memcpy(buf, path, pathlen);

        for (i = 0; i < 9; i += 3) {
                buf[pathlen + i] = target[i - (i / 3)];
                buf[pathlen + i + 1] = target[i + 1 - (i / 3)];
                buf[pathlen + i + 2] = '/';
                if (mkdirs == 1) {
                        buf[pathlen + i + 3] = '\0';
                        /* EEXIST: another io thread may have created the
                         * directory between our stat() and mkdir() */
                        if (stat(buf, &st) < 0 && mkdir(buf, 0700) < 0 &&
                            errno != EEXIST) {
                                int err = errno;

                                perror(buf);
                                return -err;
                        }
                }
        }

        memcpy(&buf[pathlen + 9], target, targetlen);
        buf[pathlen + 9 + targetlen] = '\0';

        return 0;
}
226
227 static int dir_open(struct pfiled *pfiled, struct io *io,
228                         char *target, uint32_t targetlen, int mode)
229 {
230         int fd = -1;
231         struct fdcache_node *ce = NULL;
232         long i, lru;
233         char tmp[pfiled->path_len + targetlen + 10];
234         uint64_t min;
235         io->fdcacheidx = -1;
236         if (targetlen> MAX_FILENAME_SIZE)
237                 goto out_err;
238
239 start:
240         /* check cache */
241         pthread_mutex_lock(&pfiled->cache_lock);
242 start_locked:
243         lru = -1;
244         min = UINT64_MAX;
245         for (i = 0; i < pfiled->maxfds; i++) {
246                 if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time 
247                                 && (pfiled->fdcache[i].flags & READY)) {
248                         min = pfiled->fdcache[i].time;
249                         lru = i;
250
251                 }
252
253                 if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
254                         if (pfiled->fdcache[i].target[targetlen] == 0) {
255                                 ce = &pfiled->fdcache[i];
256                                 /* if any other io thread is currently opening
257                                  * the file, block until it succeeds or fails
258                                  */
259                                 if (!(ce->flags & READY)) {
260                                         pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
261                                         /* when ready, restart lookup */
262                                         goto start_locked;
263                                 }
264                                 /* if successfully opened */
265                                 if (ce->fd > 0) {
266                                         fd = pfiled->fdcache[i].fd;
267                                         io->fdcacheidx = i;
268                                         goto out;
269                                 }
270                                 /* else open failed for the other io thread, so
271                                  * it should fail for everyone waiting on this
272                                  * file.
273                                  */
274                                 else {
275                                         fd = -1;
276                                         io->fdcacheidx = -1;
277                                         goto out_err_unlock;
278                                 }
279                         }
280                 }
281         }
282         if (lru < 0){
283                 /* all cache entries are currently being used */
284                 pthread_mutex_unlock(&pfiled->cache_lock);
285                 goto start;
286         }
287         if (pfiled->fdcache[lru].ref){
288                 fd = -1;
289                 printf("lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
290                 goto out_err_unlock;
291         }
292         /* make room for new file */
293         ce = &pfiled->fdcache[lru];
294         /* set name here and state to not ready, for any other requests on the
295          * same target that may follow
296          */
297         strncpy(ce->target, target, targetlen);
298         ce->target[targetlen] = 0;
299         ce->flags &= ~READY;
300         pthread_mutex_unlock(&pfiled->cache_lock);
301
302         if (ce->fd >0){
303                 if (close(ce->fd) < 0){
304                         perror("close");
305                 }
306         }
307
308         /* try opening it from pithos blocker dir */
309         if (create_path(tmp, pfiled->path, target, targetlen, 0) < 0) {
310                 fd = -1;
311                 goto new_entry;
312         }
313         
314         fd = open(tmp, O_RDWR);
315         if (fd < 0) {
316                 /* try opening it from the tmp dir */
317                 if (create_path(tmp, pfiled->vpath, target, targetlen, 0) < 0)
318                         goto new_entry;
319
320                 fd = open(tmp, O_RDWR);
321                 if (fd < 0)  {
322                         if (create_path(tmp, pfiled->vpath, target, targetlen, 1) < 0) {
323                                 fd = -1;
324                                 goto new_entry;
325                         }
326         
327                         fd = open(tmp, O_RDWR | O_CREAT, 0600);         
328                         if (fd < 0)
329                                 perror(tmp);
330                 }
331         }
332
333         /* insert in cache a negative fd to indicate opening error to
334          * any other ios waiting for the file to open
335          */
336
337         /* insert in cache */
338 new_entry:
339         pthread_mutex_lock(&pfiled->cache_lock);
340         ce->fd = fd;
341         ce->ref = 0;
342         ce->flags = READY;
343         pthread_cond_broadcast(&ce->cond);
344         if (fd > 0) {
345                 io->fdcacheidx = lru;
346         }
347         else {
348                 io->fdcacheidx = -1;
349                 goto out_err_unlock;
350         }
351
352 out:
353         pfiled->handled_reqs++;
354         ce->time = pfiled->handled_reqs;
355         __sync_fetch_and_add(&ce->ref, 1);
356         pthread_mutex_unlock(&pfiled->cache_lock);
357 out_err:
358         return fd;
359
360 out_err_unlock:
361         pthread_mutex_unlock(&pfiled->cache_lock);
362         goto out_err;
363 }
364
365 static void handle_read_write(struct pfiled *pfiled, struct io *io)
366 {
367         int r, fd;
368         struct xseg_request *req = io->req;
369         char *target = xseg_get_target(pfiled->xseg, req);
370         char *data = xseg_get_data(pfiled->xseg, req);
371
372         fd = dir_open(pfiled, io, target, req->targetlen, 0);
373         if (fd < 0){
374                 perror("dir_open");
375                 fail(pfiled, io);
376                 return;
377         }
378
379         if (req != io->req)
380                 printf("0.%p vs %p!\n", (void *)req, (void *)io->req);
381         if (!req->size) {
382                 if (req->flags & (XF_FLUSH | XF_FUA)) {
383                         /* No FLUSH/FUA support yet (O_SYNC ?).
384                          * note that with FLUSH/size == 0 
385                          * there will probably be a (uint64_t)-1 offset */
386                         complete(pfiled, io);
387                         return;
388                 } else {
389                         complete(pfiled, io);
390                         return;
391                 }
392         }
393
394         switch (req->op) {
395         case X_READ:
396                 while (req->serviced < req->datalen) {
397                         r = pread(fd, data + req->serviced, 
398                                         req->datalen - req->serviced,
399                                         req->offset + req->serviced);
400                         if (r < 0) {
401                                 req->datalen = req->serviced;
402                                 perror("pread");
403                         }
404                         else if (r == 0) {
405                                 /* reached end of file. zero out the rest data buffer */
406                                 memset(data + req->serviced, 0, req->datalen - req->serviced);
407                                 req->serviced = req->datalen;
408                         }
409                         else {
410                                 req->serviced += r;
411                         }
412                 }
413                 break;
414         case X_WRITE:
415                 while (req->serviced < req->datalen) {
416                         r = pwrite(fd, data + req->serviced, 
417                                         req->datalen - req->serviced,
418                                         req->offset + req->serviced);
419                         if (r < 0) {
420                                 req->datalen = req->serviced;
421                         }
422                         else if (r == 0) {
423                                 fprintf(stderr, "write returned 0\n");
424                                 memset(data + req->serviced, 0, req->datalen - req->serviced);
425                                 req->serviced = req->datalen;
426                         }
427                         else {
428                                 req->serviced += r;
429                         }
430                 }
431                 r = fsync(fd);
432                 if (r< 0) {
433                         perror("fsync");
434                         /* if fsync fails, then no bytes serviced correctly */
435                         req->serviced = 0;
436                 }
437                 break;
438         default:
439                 snprintf(data, req->datalen,
440                          "wtf, corrupt op %u?\n", req->op);
441                 fail(pfiled, io);
442                 return;
443         }
444
445         if (req->serviced > 0 ) {
446                 complete(pfiled, io);
447         }
448         else {
449                 strerror_r(errno, data, req->datalen);
450                 fail(pfiled, io);
451         }
452         return;
453 }
454
455 static void handle_info(struct pfiled *pfiled, struct io *io)
456 {
457         struct xseg_request *req = io->req;
458         struct stat stat;
459         int fd, r;
460         uint64_t size;
461         char *target = xseg_get_target(pfiled->xseg, req);
462         char *data = xseg_get_data(pfiled->xseg, req);
463         struct xseg_reply_info *xinfo  = (struct xseg_reply_info *)data;
464
465         fd = dir_open(pfiled, io, target, req->targetlen, 0);
466         if (fd < 0) {
467                 fail(pfiled, io);
468                 return;
469         }
470
471         r = fstat(fd, &stat);
472         if (r < 0) {
473                 perror("fstat");
474                 fail(pfiled, io);
475                 return;
476         }
477
478         size = (uint64_t)stat.st_size;
479         xinfo->size = size;
480
481         complete(pfiled, io);
482 }
483
484 static void handle_copy(struct pfiled *pfiled, struct io *io)
485 {
486         struct xseg_request *req = io->req;
487         char *target = xseg_get_target(pfiled->xseg, req);
488         char *data = xseg_get_data(pfiled->xseg, req);
489         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
490         struct stat st;
491         //FIXME is 256 enough?
492         char *buf = malloc(256);
493         int n, src, dst;
494
495         dst = dir_open(pfiled, io, target, req->targetlen, 1);
496         if (dst < 0) {
497                 fprintf(stderr, "fail in dst\n");
498                 fail(pfiled, io);
499                 return;
500         }
501
502         if (create_path(buf, pfiled->path, xcopy->target, xcopy->targetlen, 0) < 0)  {
503                 fail(pfiled, io);
504                 return;
505         }
506
507         src = open(buf, O_RDWR);
508         if (src < 0) {
509                 XSEGLOG("fail in src %s\n", buf);
510                 perror("open src");
511                 fail(pfiled, io);
512                 return;
513         }
514
515         fstat(src, &st);
516         n = sendfile(dst, src, 0, st.st_size);
517         if (n != st.st_size) {
518                 fprintf(stderr, "fail in copy\n");
519                 fail(pfiled, io);
520                 goto out;
521         }
522
523         if (n < 0) {
524                 fprintf(stderr, "fail in cp\n");
525                 fail(pfiled, io);
526                 goto out;
527         }
528
529         complete(pfiled, io);
530
531 out:
532         close(src);
533 }
534
535 static void handle_delete(struct pfiled *pfiled, struct io *io)
536 {
537         struct xseg_request *req = io->req;
538         char *buf = malloc(255);
539         int fd;
540         char *target = xseg_get_target(pfiled->xseg, req);
541         
542         fd = dir_open(pfiled, io, target, req->targetlen, 0);
543         if (fd < 0) {
544                 fprintf(stderr, "fail in dir_open\n");
545                 fail(pfiled, io);
546                 return;
547         }
548
549         /* 'invalidate' cache entry */
550         if (io->fdcacheidx >= 0) {
551                 pfiled->fdcache[io->fdcacheidx].fd = -1;
552         }
553
554         close(fd);
555
556         if (create_path(buf, pfiled->vpath, target, req->targetlen, 0) < 0) {
557                 fail(pfiled, io);
558                 return;
559         }
560         unlink(buf);
561
562         complete(pfiled, io);
563
564         return;
565 }
566
567 static void handle_open(struct pfiled *pfiled, struct io *io)
568 {
569         struct xseg_request *req = io->req;
570         char *buf = malloc(MAX_FILENAME_SIZE + strlen("_lock"));
571         char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + strlen("_lock"));
572         int fd;
573         char *target = xseg_get_target(pfiled->xseg, req);
574
575         if (!buf || !pathname) {
576                 fail(pfiled, io);
577                 return;
578         }
579
580         strncpy(buf, target, req->targetlen);
581         strncpy(buf+req->targetlen, "_lock", strlen("_lock"));
582
583         if (create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen("_lock"), 1) < 0) {
584                 goto out_fail;
585         }
586
587         fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
588         if (fd < 0)
589                 goto out_fail;
590         
591         close(fd);
592         free(buf);
593         free(pathname);
594         complete(pfiled, io);
595         return;
596
597 out_fail:
598         free(buf);
599         free(pathname);
600         fail(pfiled, io);
601         return;
602 }
603
604 static void handle_close(struct pfiled *pfiled, struct io *io)
605 {
606         struct xseg_request *req = io->req;
607         char *buf = malloc(MAX_FILENAME_SIZE + strlen("_lock"));
608         char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + strlen("_lock"));
609         char *target = xseg_get_target(pfiled->xseg, req);
610
611         if (!buf || !pathname) {
612                 fail(pfiled, io);
613                 return;
614         }
615
616         strncpy(buf, target, req->targetlen);
617         strncpy(buf+req->targetlen, "_lock", strlen("_lock"));
618
619         if (create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen("_lock"), 1) < 0) {
620                 goto out_fail;
621         }
622         unlink(pathname);
623         free(buf);
624         free(pathname);
625         complete(pfiled, io);
626         return;
627
628 out_fail:
629         free(buf);
630         free(pathname);
631         fail(pfiled, io);
632         return;
633 }
634
635 static void dispatch(struct pfiled *pfiled, struct io *io)
636 {
637         if (cmdline_verbose) { 
638                 fprintf(stderr, "io: 0x%p, req: 0x%p, op %u\n",
639                         (void *)io, (void *)io->req, io->req->op);
640         }
641
642         switch (io->req->op) {
643         case X_READ:
644         case X_WRITE:
645                 handle_read_write(pfiled, io); break;
646         case X_INFO:
647                 handle_info(pfiled, io); break;
648         case X_COPY:
649                 handle_copy(pfiled, io); break;
650         case X_DELETE:
651                 handle_delete(pfiled, io); break;
652         case X_OPEN:
653                 handle_open(pfiled, io); break;
654         case X_CLOSE:
655                 handle_close(pfiled, io); break;
656 //      case X_SNAPSHOT:
657         case X_SYNC:
658         default:
659                 handle_unknown(pfiled, io);
660         }
661 }
662
663 static void handle_accepted(struct pfiled *pfiled, struct io *io)
664 {
665         struct xseg_request *req = io->req;
666         req->serviced = 0;
667         io->state = XS_ACCEPTED;
668         io->retval = 0;
669         dispatch(pfiled, io);
670 }
671
672 static struct io* wake_up_next_iothread(struct pfiled *pfiled)
673 {
674         struct io *io = alloc_io(pfiled);
675
676         if (io){        
677                 pthread_mutex_lock(&io->lock);
678                 pthread_cond_signal(&io->cond);
679                 pthread_mutex_unlock(&io->lock);
680         }
681         return io;
682 }
683
684 void *io_loop(void *arg)
685 {
686         struct io *io = (struct io *) arg;
687         struct pfiled *pfiled = io->pfiled;
688         struct xseg *xseg = pfiled->xseg;
689         uint32_t portno = pfiled->portno;
690         struct xseg_request *accepted;
691
692         for (;;) {
693                 accepted = NULL;
694                 accepted = xseg_accept(xseg, portno, 0);
695                 if (accepted) {
696                         io->req = accepted;
697                         wake_up_next_iothread(pfiled);
698                         handle_accepted(pfiled, io);
699                 }
700                 else {
701                         pthread_mutex_lock(&io->lock);
702                         free_io(pfiled, io);
703                         pthread_cond_wait(&io->cond, &io->lock);
704                         pthread_mutex_unlock(&io->lock);
705                 }
706         }
707
708         return NULL;
709 }
710
711 static struct xseg *join_or_create(char *spec)
712 {
713         struct xseg_config config;
714         struct xseg *xseg;
715
716         (void)xseg_parse_spec(spec, &config);
717         xseg = xseg_join(config.type, config.name, "posix", NULL);
718         if (xseg)
719                 return xseg;
720
721         (void)xseg_create(&config);
722         return xseg_join(config.type, config.name, "posix", NULL);
723 }
724
725 static int pfiled_loop(struct pfiled *pfiled)
726 {
727         struct xseg *xseg = pfiled->xseg;
728         uint32_t portno = pfiled->portno;
729         /* GCC + pthreads glitch? */
730         struct io *io;
731
732         for (;;) {
733                 io = wake_up_next_iothread(pfiled);
734                 xseg_prepare_wait(xseg, portno);
735                 xseg_wait_signal(xseg, 1000000UL);
736         }
737
738         return 0;
739 }
740
741 static int pfiled_init(struct pfiled *pfiled)
742 {
743         struct sigaction sa;
744         int ret;
745         int i;
746
747         pfiled->sigevent.sigev_notify = SIGEV_SIGNAL;
748         pfiled->sigevent.sigev_signo = SIGIO;
749         sa.sa_sigaction = sigaction_handler;
750         sa.sa_flags = SA_SIGINFO;
751
752         if ((ret = sigemptyset(&sa.sa_mask))) {
753                 perr(PE, 0, "[sigemptyset]");
754                 goto out;
755         }
756
757         if ((ret = sigaction(SIGIO, &sa, NULL))) {
758                 perr(PE, 0, "[sigaction]");
759                 /* FIXME: Since this is an init routine, if it fails the program will
760                  * exit and clean its own stuff (mem, sigs etc). We only have to cleanup
761                  * anything xseg-related
762                  */
763                 goto out;
764         }
765
766         pfiled->nr_ops = cmdline_nr_ops;
767         pfiled->maxfds = 2 * cmdline_nr_ops;
768
769         pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
770         if(!pfiled->fdcache) {
771                 ret = -ENOMEM;
772                 perr(PE, 0, "could not allocate memory [fdcache]");
773                 goto out;
774         }
775                 
776
777         pfiled->free_bufs = calloc(pfiled->nr_ops, sizeof(xqindex));
778         if(!pfiled->free_bufs) {
779                 ret = -ENOMEM;
780                 perr(PE, 0, "could not allocate memory [free_bufs]");
781                 goto out;
782         }
783
784         pfiled->iothread = calloc(pfiled->nr_ops, sizeof(pthread_t));
785         if(!pfiled->iothread) {
786                 ret = -ENOMEM;
787                 perr(PE, 0, "could not allocate memory [iothreads]");
788                 goto out;
789         }
790
791         pfiled->ios = calloc(pfiled->nr_ops, sizeof(struct io));
792         if (!pfiled->ios) {
793                 ret = -ENOMEM;
794                 perr(PE, 0, "could not allocate memory [ios]");
795                 goto out;
796         }
797
798         for (i = 0; i < pfiled->nr_ops; i++) {
799                 pfiled->ios[i].pfiled = pfiled;
800                 pthread_cond_init(&pfiled->ios[i].cond, NULL);
801                 pthread_mutex_init(&pfiled->ios[i].lock, NULL);
802         }
803
804         xq_init_seq(&pfiled->free_ops, pfiled->nr_ops, pfiled->nr_ops,
805                                 pfiled->free_bufs);
806         
807         pfiled->handled_reqs = 0;
808
809         strncpy(pfiled->path, cmdline_path, MAX_PATH_SIZE);
810         pfiled->path[MAX_PATH_SIZE] = 0;
811
812         strncpy(pfiled->vpath, cmdline_vpath, MAX_PATH_SIZE);
813         pfiled->vpath[MAX_PATH_SIZE] = 0;
814
815         pfiled->path_len = strlen(pfiled->path);
816         if (pfiled->path[pfiled->path_len -1] != '/'){
817                 pfiled->path[pfiled->path_len] = '/';
818                 pfiled->path[++pfiled->path_len]= 0;
819         }
820
821         pfiled->vpath_len = strlen(pfiled->vpath);
822         if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
823                 pfiled->vpath[pfiled->vpath_len] = '/';
824                 pfiled->vpath[++pfiled->vpath_len]= 0;
825         }
826
827         if (xseg_initialize()) {
828                 ret = - ENOMEM;
829                 perr(PE, 0, "could not initialize xseg library");
830                 goto out;
831         }
832
833         pfiled->xseg = join_or_create(cmdline_xseg_spec);
834         if (!pfiled->xseg) {
835                 ret = -EIO;
836                 perr(PE, 0, "could not join xseg with spec '%s'\n", 
837                         cmdline_xseg_spec);
838                 goto out_with_xseginit;
839         }
840
841         pfiled->xport = xseg_bind_port(pfiled->xseg, cmdline_portno, NULL);
842         if (!pfiled->xport) {
843                 ret = -EIO;
844                 perr(PE, 0, "could not bind to xseg port %ld", cmdline_portno);
845                 goto out_with_xsegjoin;
846         }
847
848         pfiled->portno = xseg_portno(pfiled->xseg, pfiled->xport);
849         perr(PI, 0, "filed on port %u/%u\n",
850                 pfiled->portno, pfiled->xseg->config.nr_ports);
851
852         if (xseg_init_local_signal(pfiled->xseg, pfiled->portno) < 0){
853                 printf("cannot int local signals\n");
854                 return -1;
855         }
856
857         for (i = 0; i < pfiled->nr_ops; i++) {
858                 pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
859                 pfiled->fdcache[i].flags = READY;
860         }
861         for (i = 0; i < pfiled->nr_ops; i++) {
862                 /* 
863                  * TODO: error check + cond variable to stop io from starting
864                  * unless all threads are created successfully
865                  */
866                 pthread_create(pfiled->iothread + i, NULL, io_loop, (void *) (pfiled->ios + i));
867         }
868         pthread_mutex_init(&pfiled->cache_lock, NULL);
869
870         goto out;
871
872 out_with_xsegjoin:
873         xseg_leave(pfiled->xseg);
874 out_with_xseginit:
875         xseg_finalize();
876 out:
877         return ret;
878 }
879
/*
 * Parse s as a complete base-10 integer.
 * Returns the value, or -1 if s is empty, has trailing characters, or does
 * not fit in an int. Callers treat negative results as invalid input, so
 * -1 doubles as the error marker.
 */
static int safe_atoi(char *s)
{
        long l;
        char *endp;

        errno = 0;
        l = strtol(s, &endp, 10);
        if (s == endp || *endp != '\0')
                return -1;
        /* strtol() saturates with ERANGE, and long may be wider than int:
         * never let an out-of-range value silently truncate */
        if (errno == ERANGE || l < INT_MIN || l > INT_MAX)
                return -1;
        return (int)l;
}
891
892 static void parse_cmdline(int argc, char **argv)
893 {
894         char *argv0 = argv[0];
895
896         for (;;) {
897                 int c;
898
899                 opterr = 0;
900                 c = getopt(argc, argv, "hp:n:g:v");
901                 if (c == -1)
902                         break;
903                 
904                 switch(c) {
905                         case '?':
906                                 perr(PFE, 0, "Unknown option: -%c", optopt);
907                                 break;
908                         case ':':
909                                 perr(PFE, 0, "Option -%c requires an argument",
910                                         optopt);
911                                 break;
912                         case 'h':
913                                 usage(argv0);
914                                 exit(0);
915                                 break;
916                         case 'p':
917                                 cmdline_portno = safe_atoi(optarg);
918                                 break;
919                         case 'n':
920                                 cmdline_nr_ops = safe_atoi(optarg);
921                                 break;
922                         case 'g':
923                                 /* FIXME: Max length of spec? strdup, eww */
924                                 cmdline_xseg_spec = strdup(optarg);
925                                 if (!cmdline_xseg_spec)
926                                         perr(PFE, 0, "out of memory");
927                                 break;
928                         case 'v':
929                                 cmdline_verbose = 1;
930                                 break;
931                 }
932         }
933
934         argc -= optind;
935         argv += optind;
936
937         /* Sanity check for all arguments */
938         if (cmdline_portno < 0) {
939                 usage(argv0);
940                 perr(PFE, 0, "no or invalid port specified");
941         }
942         if (cmdline_nr_ops < 1) {
943                 usage(argv0);
944                 perr(PFE, 0, "specified outstanding request count is invalid");
945         }
946         if (!cmdline_xseg_spec) {
947                 usage(argv0);
948                 perr(PFE, 0, "xseg specification is mandatory");
949         }
950
951         if (argc < 2) {
952                 usage(argv0);
953                 perr(PFE, 0, "path and vpath specification is mandatory");
954         }
955
956         cmdline_path = strdup(argv[0]);
957         if (!cmdline_path)
958                 perr(PFE, 0, "out of memory");
959
960         cmdline_vpath = strdup(argv[1]);
961         if (!cmdline_vpath)
962                 perr(PFE, 0, "out of memory");
963 }
964
965 int main(int argc, char **argv)
966 {
967         struct pfiled pfiled;
968
969         init_perr("pfiled");
970         parse_cmdline(argc, argv);
971
972         perr(PI, 0, "p = %ld, nr_ops = %lu\n", cmdline_portno, cmdline_nr_ops);
973
974         if (pfiled_init(&pfiled) < 0)
975                 perr(PFE, 0, "failed to initialize pfiled");
976
977         return pfiled_loop(&pfiled);
978 }