Merge branch 'xseg-refactor' into debian
[archipelago] / xseg / peers / user / mt-pfiled.c
1 /*
2  * The Pithos File Blocker Peer (pfiled)
3  */
4
5 #define _GNU_SOURCE
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <unistd.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <errno.h>
14 #include <aio.h>
15 #include <signal.h>
16 #include <limits.h>
17 #include <pthread.h>
18 #include <syscall.h>
19 #include <sys/sendfile.h>
20 #include <peer.h>
21
22 #include <xseg/xseg.h>
23 #include <xseg/protocol.h>
24
25 #define LOCK_SUFFIX             "_lock"
26 #define MAX_PATH_SIZE           1024
27 #define MAX_FILENAME_SIZE       (XSEG_MAX_TARGETLEN + 5) //strlen(LOCK_SUFFIX)
28 #define MAX_PREFIX_LEN          10
29
30 /* default concurrency level (number of threads) */
31 #define DEFAULT_NR_OPS           16
32
33 /* Pithos hash for the zero block
34  * FIXME: Should it be hardcoded?
35  */
36 #define ZERO_BLOCK \
37         "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85"
38
39 /*
40  * Globals, holding command-line arguments
41  */
42
43 void custom_peer_usage(char *argv0)
44 {
45         fprintf(stderr, "Custom peer options:\n"
46                 "--pithos PATH --archip VPATH --prefix PREFIX\n\n"
47                 "where:\n"
48                 "\tPATH: path to pithos data blocks\n"
49                 "\tVPATH: path to modified volume blocks\n"
50                 "\tPREFIX: Common prefix of Archipelagos objects to be"
51                 "striped during filesystem hierarchy creation\n"
52                );
53 }
54
55 /* fdcache_node flags */
56 #define READY (1 << 1)
57
58 /* fdcache node info */
59 struct fdcache_node {
60         volatile int fd;
61         volatile unsigned int ref;
62         volatile unsigned long time;
63         volatile unsigned int flags;
64         pthread_cond_t cond;
65         char target[MAX_FILENAME_SIZE + 1];
66 };
67
68 /* pfiled context */
69 struct pfiled {
70         uint32_t path_len;
71         uint32_t vpath_len;
72         uint32_t prefix_len;
73         uint64_t handled_reqs;
74         long maxfds;
75         struct fdcache_node *fdcache;
76         pthread_mutex_t cache_lock;
77         char path[MAX_PATH_SIZE + 1];
78         char vpath[MAX_PATH_SIZE + 1];
79         char prefix[MAX_PREFIX_LEN];
80 };
81
82 /*
83  * pfiled specific structure 
84  * containing information on a pending I/O operation
85  */
86 struct fio {
87         uint32_t state;
88         long fdcacheidx;
89 };
90
91 struct pfiled * __get_pfiled(struct peerd *peer)
92 {
93         return (struct pfiled *) peer->priv;
94 }
95
96 struct fio * __get_fio(struct peer_req *pr)
97 {
98         return (struct fio*) pr->priv;
99 }
100
101 static void close_cache_entry(struct peerd *peer, struct peer_req *pr)
102 {
103         struct pfiled *pfiled = __get_pfiled(peer);
104         struct fio *fio = __get_fio(pr);
105         int fd = -1;
106         if (fio->fdcacheidx >= 0) {
107                 if (!__sync_sub_and_fetch(&pfiled->fdcache[fio->fdcacheidx].ref, 1) && !(pfiled->fdcache[fio->fdcacheidx].flags & READY)) {
108                         pthread_mutex_lock(&pfiled->cache_lock);
109                         if (!pfiled->fdcache[fio->fdcacheidx].ref){
110                                 /* invalidate cache entry */
111                                 fd = pfiled->fdcache[fio->fdcacheidx].fd;
112                                 pfiled->fdcache[fio->fdcacheidx].fd = -1;
113                                 pfiled->fdcache[fio->fdcacheidx].target[0] = 0;
114                                 pfiled->fdcache[fio->fdcacheidx].flags |= READY;
115                         }
116                         pthread_mutex_unlock(&pfiled->cache_lock);
117                         if (fd > 0)
118                                 close(fd);
119
120                 }
121         }
122 }
123
124 static void pfiled_complete(struct peerd *peer, struct peer_req *pr)
125 {
126         close_cache_entry(peer, pr);
127         complete(peer, pr);
128 }
129
130 static void pfiled_fail(struct peerd *peer, struct peer_req *pr)
131 {
132         close_cache_entry(peer, pr);
133         fail(peer, pr);
134 }
135
136 static void handle_unknown(struct peerd *peer, struct peer_req *pr)
137 {
138         XSEGLOG2(&lc, W, "unknown request op");
139         pfiled_fail(peer, pr);
140 }
141
142 static int create_path(char *buf, char *path, char *target, uint32_t targetlen,
143                 uint32_t prefixlen, int mkdirs)
144 {
145         int i;
146         struct stat st;
147         uint32_t pathlen = strlen(path);
148
149         strncpy(buf, path, pathlen);
150
151         for (i = 0; i < 9; i+= 3) {
152                 buf[pathlen + i] = target[prefixlen + i - (i/3)];
153                 buf[pathlen + i +1] = target[prefixlen + i + 1 - (i/3)];
154                 buf[pathlen + i + 2] = '/';
155                 if (mkdirs == 1) {
156                         buf[pathlen + i + 3] = '\0';
157 retry:
158                         if (stat(buf, &st) < 0) 
159                                 if (mkdir(buf, 0700) < 0) {
160                                         if (errno == EEXIST)
161                                                 goto retry;
162                                         perror(buf);
163                                         return errno;
164                                 }
165                 }
166         }
167
168         strncpy(&buf[pathlen + 9], target, targetlen);
169         buf[pathlen + 9 + targetlen] = '\0';
170
171         return 0;
172 }
173
174 static int dir_open(struct pfiled *pfiled, struct fio *io,
175                 char *target, uint32_t targetlen, int mode)
176 {
177         int fd = -1;
178         struct fdcache_node *ce = NULL;
179         long i, lru;
180         char tmp[pfiled->path_len + targetlen + 10];
181         uint64_t min;
182         io->fdcacheidx = -1;
183         if (targetlen> MAX_FILENAME_SIZE)
184                 goto out_err;
185
186 start:
187         /* check cache */
188         pthread_mutex_lock(&pfiled->cache_lock);
189 start_locked:
190         lru = -1;
191         min = UINT64_MAX;
192         for (i = 0; i < pfiled->maxfds; i++) {
193                 if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time 
194                                 && (pfiled->fdcache[i].flags & READY)) {
195                         min = pfiled->fdcache[i].time;
196                         lru = i;
197
198                 }
199
200                 if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
201                         if (pfiled->fdcache[i].target[targetlen] == 0) {
202                                 ce = &pfiled->fdcache[i];
203                                 /* if any other io thread is currently opening
204                                  * the file, block until it succeeds or fails
205                                  */
206                                 if (!(ce->flags & READY)) {
207                                         pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
208                                         /* when ready, restart lookup */
209                                         goto start_locked;
210                                 }
211                                 /* if successfully opened */
212                                 if (ce->fd > 0) {
213                                         fd = pfiled->fdcache[i].fd;
214                                         io->fdcacheidx = i;
215                                         goto out;
216                                 }
217                                 /* else open failed for the other io thread, so
218                                  * it should fail for everyone waiting on this
219                                  * file.
220                                  */
221                                 else {
222                                         fd = -1;
223                                         io->fdcacheidx = -1;
224                                         goto out_err_unlock;
225                                 }
226                         }
227                 }
228         }
229         if (lru < 0){
230                 /* all cache entries are currently being used */
231                 pthread_mutex_unlock(&pfiled->cache_lock);
232                 goto start;
233         }
234         if (pfiled->fdcache[lru].ref){
235                 fd = -1;
236                 XSEGLOG2(&lc, E, "lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
237                 goto out_err_unlock;
238         }
239         /* make room for new file */
240         ce = &pfiled->fdcache[lru];
241         /* set name here and state to not ready, for any other requests on the
242          * same target that may follow
243          */
244         strncpy(ce->target, target, targetlen);
245         ce->target[targetlen] = 0;
246         ce->flags &= ~READY;
247         pthread_mutex_unlock(&pfiled->cache_lock);
248
249         if (ce->fd >0){
250                 if (close(ce->fd) < 0){
251                         XSEGLOG2(&lc, W, "Cannot close %s", ce->target);
252                 }
253         }
254
255         /* try opening it from pithos blocker dir */
256         if (create_path(tmp, pfiled->path, target, targetlen, 0, 0) < 0) {
257                 fd = -1;
258                 goto new_entry;
259         }
260
261         fd = open(tmp, O_RDWR);
262         if (fd < 0) {
263                 /* try opening it from the tmp dir */
264                 if (create_path(tmp, pfiled->vpath, target, targetlen,
265                                                 pfiled->prefix_len,  0) < 0)
266                         goto new_entry;
267
268                 fd = open(tmp, O_RDWR);
269                 if (fd < 0)  {
270                         if (create_path(tmp, pfiled->vpath, target, targetlen,
271                                                 pfiled->prefix_len, 1) < 0) {
272                                 fd = -1;
273                                 goto new_entry;
274                         }
275
276                         fd = open(tmp, O_RDWR | O_CREAT, 0600);         
277                         if (fd < 0)
278                                 XSEGLOG2(&lc, E, "Cannot open %s", tmp);
279                 }
280         }
281
282         /* insert in cache a negative fd to indicate opening error to
283          * any other ios waiting for the file to open
284          */
285
286         /* insert in cache */
287 new_entry:
288         pthread_mutex_lock(&pfiled->cache_lock);
289         ce->fd = fd;
290         ce->ref = 0;
291         ce->flags = READY;
292         pthread_cond_broadcast(&ce->cond);
293         if (fd > 0) {
294                 io->fdcacheidx = lru;
295         }
296         else {
297                 io->fdcacheidx = -1;
298                 goto out_err_unlock;
299         }
300
301 out:
302         pfiled->handled_reqs++;
303         ce->time = pfiled->handled_reqs;
304         __sync_fetch_and_add(&ce->ref, 1);
305         pthread_mutex_unlock(&pfiled->cache_lock);
306 out_err:
307         return fd;
308
309 out_err_unlock:
310         pthread_mutex_unlock(&pfiled->cache_lock);
311         goto out_err;
312 }
313
314 static void handle_read_write(struct peerd *peer, struct peer_req *pr)
315 {
316         struct pfiled *pfiled = __get_pfiled(peer);
317         struct fio *fio = __get_fio(pr);
318         struct xseg_request *req = pr->req;
319         int r, fd;
320         char *target = xseg_get_target(peer->xseg, req);
321         char *data = xseg_get_data(peer->xseg, req);
322
323         fd = dir_open(pfiled, fio, target, req->targetlen, 0);
324         if (fd < 0){
325                 XSEGLOG2(&lc, E, "Dir open failed");
326                 pfiled_fail(peer, pr);
327                 return;
328         }
329
330         if (!req->size) {
331                 if (req->flags & (XF_FLUSH | XF_FUA)) {
332                         /* No FLUSH/FUA support yet (O_SYNC ?).
333                          * note that with FLUSH/size == 0 
334                          * there will probably be a (uint64_t)-1 offset */
335                         pfiled_complete(peer, pr);
336                         return;
337                 } else {
338                         pfiled_complete(peer, pr);
339                         return;
340                 }
341         }
342
343         switch (req->op) {
344                 case X_READ:
345                         while (req->serviced < req->datalen) {
346                                 r = pread(fd, data + req->serviced, 
347                                                 req->datalen - req->serviced,
348                                                 req->offset + req->serviced);
349                                 if (r < 0) {
350                                         req->datalen = req->serviced;
351                                         XSEGLOG2(&lc, E, "Cannot read");
352                                 }
353                                 else if (r == 0) {
354                                         /* reached end of file. zero out the rest data buffer */
355                                         memset(data + req->serviced, 0, req->datalen - req->serviced);
356                                         req->serviced = req->datalen;
357                                 }
358                                 else {
359                                         req->serviced += r;
360                                 }
361                         }
362                         break;
363                 case X_WRITE:
364                         while (req->serviced < req->datalen) {
365                                 r = pwrite(fd, data + req->serviced, 
366                                                 req->datalen - req->serviced,
367                                                 req->offset + req->serviced);
368                                 if (r < 0) {
369                                         req->datalen = req->serviced;
370                                 }
371                                 else {
372                                         req->serviced += r;
373                                 }
374                         }
375                         r = fsync(fd);
376                         if (r< 0) {
377                                 XSEGLOG2(&lc, E, "Fsync failed.");
378                                 /* if fsync fails, then no bytes serviced correctly */
379                                 req->serviced = 0;
380                         }
381                         break;
382                 default:
383                         XSEGLOG2(&lc, E, "wtf, corrupt op %u?\n", req->op);
384                         pfiled_fail(peer, pr);
385                         return;
386         }
387
388         if (req->serviced > 0 ) {
389                 pfiled_complete(peer, pr);
390         }
391         else {
392                 pfiled_fail(peer, pr);
393         }
394         return;
395 }
396
397 static void handle_info(struct peerd *peer, struct peer_req *pr)
398 {
399         struct pfiled *pfiled = __get_pfiled(peer);
400         struct fio *fio = __get_fio(pr);
401         struct xseg_request *req = pr->req;
402         struct stat stat;
403         int fd, r;
404         uint64_t size;
405         char *target = xseg_get_target(peer->xseg, req);
406         char *data = xseg_get_data(peer->xseg, req);
407         struct xseg_reply_info *xinfo  = (struct xseg_reply_info *)data;
408
409         fd = dir_open(pfiled, fio, target, req->targetlen, 0);
410         if (fd < 0) {
411                 XSEGLOG2(&lc, E, "Dir open failed");
412                 pfiled_fail(peer, pr);
413                 return;
414         }
415
416         r = fstat(fd, &stat);
417         if (r < 0) {
418                 XSEGLOG2(&lc, E, "fail in stat");
419                 pfiled_fail(peer, pr);
420                 return;
421         }
422
423         size = (uint64_t)stat.st_size;
424         xinfo->size = size;
425
426         pfiled_complete(peer, pr);
427 }
428
429 static void handle_copy(struct peerd *peer, struct peer_req *pr)
430 {
431         struct pfiled *pfiled = __get_pfiled(peer);
432         struct fio *fio = __get_fio(pr);
433         struct xseg_request *req = pr->req;
434         
435         char *target = xseg_get_target(peer->xseg, req);
436         char *data = xseg_get_data(peer->xseg, req);
437         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
438         struct stat st;
439         char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
440         int n, src = -1, dst = -1, r = -1;
441
442         if (!buf){
443                 XSEGLOG2(&lc, E, "Out of memory");
444                 pfiled_fail(peer, pr);
445                 return;
446         }
447
448         dst = dir_open(pfiled, fio, target, req->targetlen, 1);
449         if (dst < 0) {
450                 XSEGLOG2(&lc, E, "Fail in dst");
451                 r = dst;
452                 goto out;
453         }
454
455         if (create_path(buf, pfiled->path, xcopy->target,
456                                         xcopy->targetlen, 0, 0) < 0)  {
457                 XSEGLOG2(&lc, E, "Create path failed");
458                 r = -1;
459                 goto out;
460         }
461
462         src = open(buf, O_RDWR);
463         if (src < 0) {
464                 XSEGLOG2(&lc, E, "fail in src %s", buf);
465                 r = src;
466                 goto out;
467         }
468
469         r = fstat(src, &st);
470         if (r < 0){
471                 XSEGLOG2(&lc, E, "fail in stat for src %s", buf);
472                 goto out;
473         }
474
475         n = sendfile(dst, src, 0, st.st_size);
476         if (n != st.st_size) {
477                 XSEGLOG2(&lc, E, "Copy failed for %s", buf);
478                 r = -1;
479                 goto out;
480         }
481         r = 0;
482
483 out:
484         if (src > 0)
485                 close(src);
486         free(buf);
487         if (r < 0)
488                 pfiled_fail(peer, pr);
489         else
490                 pfiled_complete(peer, pr);
491         return;
492 }
493
494 static void handle_delete(struct peerd *peer, struct peer_req *pr)
495 {
496         struct pfiled *pfiled = __get_pfiled(peer);
497         struct fio *fio = __get_fio(pr);
498         struct xseg_request *req = pr->req;
499         
500         char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
501         int fd, r;
502         char *target = xseg_get_target(peer->xseg, req);
503         if (!buf){
504                 XSEGLOG2(&lc, E, "Out of memory");
505                 pfiled_fail(peer, pr);
506                 return;
507         }
508         fd = dir_open(pfiled, fio, target, req->targetlen, 0);
509         if (fd < 0) {
510                 XSEGLOG2(&lc, E, "Dir open failed");
511                 r = fd;
512                 goto out;
513         }
514
515         /* mark cache entry as invalid 
516          * give a chance to pending operations on this file to end.
517          * file will close when all operations are done 
518          */
519         if (fio->fdcacheidx >= 0) {
520                 pthread_mutex_lock(&pfiled->cache_lock);
521                 pfiled->fdcache[fio->fdcacheidx].flags &= ~READY;
522                 pthread_mutex_unlock(&pfiled->cache_lock);
523         }
524
525         r = create_path(buf, pfiled->vpath, target, req->targetlen,
526                                 pfiled->prefix_len, 0);
527         if (r< 0) {
528                 XSEGLOG2(&lc, E, "Create path failed");
529                 goto out;
530         }
531         r = unlink(buf);
532 out:
533         free(buf);
534         if (r < 0)
535                 pfiled_fail(peer, pr);
536         else
537                 pfiled_complete(peer, pr);
538         return;
539 }
540
541 static void handle_open(struct peerd *peer, struct peer_req *pr)
542 {
543         struct pfiled *pfiled = __get_pfiled(peer);
544 //      struct fio *fio = __get_fio(pr);
545         struct xseg_request *req = pr->req;
546         char *buf = malloc(MAX_FILENAME_SIZE);
547         char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
548         int fd = -1;
549         char *target = xseg_get_target(peer->xseg, req);
550
551         if (!buf || !pathname) {
552                 XSEGLOG2(&lc, E, "Out of memory");
553                 pfiled_fail(peer, pr);
554                 return;
555         }
556
557         strncpy(buf, target, req->targetlen);
558         strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
559
560         XSEGLOG2(&lc, I, "Trying to acquire lock %s", buf);
561
562         if (create_path(pathname, pfiled->vpath, buf, 
563                         req->targetlen + strlen(LOCK_SUFFIX),
564                         pfiled->prefix_len, 1) < 0) {
565                 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
566                 goto out;
567         }
568
569         //nfs v >= 3
570         while ((fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR)) < 0){
571                 //actual error
572                 if (errno != EEXIST){
573                         XSEGLOG2(&lc, W, "Error opening %s", pathname);
574                         goto out;
575                 }
576                 if (req->flags & XF_NOSYNC)
577                         goto out;
578                 sleep(1);
579         }
580         close(fd);
581 out:
582         free(buf);
583         free(pathname);
584         if (fd < 0){
585                 XSEGLOG2(&lc, I, "Failed to acquire lock %s", buf);
586                 pfiled_fail(peer, pr);
587         }
588         else{
589                 XSEGLOG2(&lc, I, "Acquired lock %s", buf);
590                 pfiled_complete(peer, pr);
591         }
592         return;
593 }
594
595 static void handle_close(struct peerd *peer, struct peer_req *pr)
596 {
597         struct pfiled *pfiled = __get_pfiled(peer);
598 //      struct fio *fio = __get_fio(pr);
599         struct xseg_request *req = pr->req;
600         char *buf = malloc(MAX_FILENAME_SIZE);
601         char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
602         char *target = xseg_get_target(peer->xseg, req);
603         int r;
604
605         if (!buf || !pathname) {
606                 XSEGLOG2(&lc, E, "Out of memory");
607                 fail(peer, pr);
608                 return;
609         }
610
611         strncpy(buf, target, req->targetlen);
612         strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
613
614         r = create_path(pathname, pfiled->vpath, buf,
615                         req->targetlen + strlen(LOCK_SUFFIX),
616                         pfiled->prefix_len, 0);
617         if (r < 0) {
618                 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
619                 goto out;
620         }
621         r = unlink(pathname);
622
623 out:
624         free(buf);
625         free(pathname);
626         if (r < 0)
627                 fail(peer, pr);
628         else
629                 complete(peer, pr);
630         return;
631 }
632
633 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
634                                 enum dispatch_reason reason)
635 {
636         struct fio *fio = __get_fio(pr);
637         if (reason == dispatch_accept){
638                 fio->fdcacheidx = -1;
639                 fio->state = XS_ACCEPTED;
640         }
641         
642         switch (req->op) {
643                 case X_READ:
644                 case X_WRITE:
645                         handle_read_write(peer, pr); break;
646                 case X_INFO:
647                         handle_info(peer, pr); break;
648                 case X_COPY:
649                         handle_copy(peer, pr); break;
650                 case X_DELETE:
651                         handle_delete(peer, pr); break;
652                 case X_OPEN:
653                         handle_open(peer, pr); break;
654                 case X_CLOSE:
655                         handle_close(peer, pr); break;
656                         //      case X_SNAPSHOT:
657                 case X_SYNC:
658                 default:
659                         handle_unknown(peer, pr);
660         }
661         return 0;
662 }
663
664 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
665 {
666         int ret = 0;
667         int i;
668         struct pfiled *pfiled = malloc(sizeof(struct pfiled));
669         if (!pfiled){
670                 XSEGLOG2(&lc, E, "Out of memory");
671                 ret = -ENOMEM;
672                 goto out;
673         }
674         peer->priv = pfiled;
675
676         pfiled->maxfds = 2 * peer->nr_ops;
677         pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
678         if(!pfiled->fdcache) {
679                 XSEGLOG2(&lc, E, "Out of memory");
680                 ret = -ENOMEM;
681                 goto out;
682         }
683
684         for (i = 0; i < peer->nr_ops; i++) {
685                 peer->peer_reqs[i].priv = malloc(sizeof(struct fio));
686                 if (!peer->peer_reqs->priv){
687                         XSEGLOG2(&lc, E, "Out of memory");
688                         ret = -ENOMEM;
689                         goto out;
690                 }
691         }
692
693         pfiled->vpath[0] = 0;
694         pfiled->path[0] = 0;
695         pfiled->handled_reqs = 0;
696         /*
697         for (i = 0; i < argc; i++) {
698                 if (!strcmp(argv[i], "--pithos") && (i+1) < argc){
699                         strncpy(pfiled->path, argv[i+1], MAX_PATH_SIZE);
700                         pfiled->path[MAX_PATH_SIZE] = 0;
701                         i += 1;
702                         continue;
703                 }
704                 if (!strcmp(argv[i], "--archip") && (i+1) < argc){
705                         strncpy(pfiled->vpath, argv[i+1], MAX_PATH_SIZE);
706                         pfiled->vpath[MAX_PATH_SIZE] = 0;
707                         i += 1;
708                         continue;
709                 }
710                 if (!strcmp(argv[i], "--prefix") && (i+1) < argc){
711                         strncpy(pfiled->prefix, argv[i+1], MAX_PREFIX_LEN);
712                         pfiled->prefix[MAX_PREFIX_LEN] = 0;
713                         i += 1;
714                         continue;
715                 }
716         }
717         */
718         BEGIN_READ_ARGS(argc, argv);
719         READ_ARG_STRING("--pithos", pfiled->path, MAX_PATH_SIZE);
720         READ_ARG_STRING("--archip", pfiled->vpath, MAX_PATH_SIZE);
721         READ_ARG_STRING("--prefix", pfiled->prefix, MAX_PREFIX_LEN);
722         END_READ_ARGS();
723
724
725         pfiled->prefix_len = strlen(pfiled->prefix);
726
727         //TODO test path exist
728         pfiled->path_len = strlen(pfiled->path);
729         if (!pfiled->path_len){
730                 XSEGLOG2(&lc, E, "Pithos path was not provided");
731                 return -1;
732         }
733         if (pfiled->path[pfiled->path_len -1] != '/'){
734                 pfiled->path[pfiled->path_len] = '/';
735                 pfiled->path[++pfiled->path_len]= 0;
736         }
737
738         pfiled->vpath_len = strlen(pfiled->vpath);
739         if (!pfiled->vpath_len){
740                 XSEGLOG2(&lc, E, "Archipelagos path was not provided");
741                 return -1;
742         }
743         if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
744                 pfiled->vpath[pfiled->vpath_len] = '/';
745                 pfiled->vpath[++pfiled->vpath_len]= 0;
746         }
747
748         for (i = 0; i < peer->nr_ops; i++) {
749                 pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
750                 pfiled->fdcache[i].flags = READY;
751         }
752         pthread_mutex_init(&pfiled->cache_lock, NULL);
753
754 out:
755         return ret;
756 }
757
758 void custom_peer_finalize(struct peerd *peer)
759 {
760         return;
761 }
762
763 static int safe_atoi(char *s)
764 {
765         long l;
766         char *endp;
767
768         l = strtol(s, &endp, 10);
769         if (s != endp && *endp == '\0')
770                 return l;
771         else
772                 return -1;
773 }