mapperd: Always sleep before checking request state.
[archipelago] / xseg / peers / user / mt-pfiled.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 /*
36  * The Pithos File Blocker Peer (pfiled)
37  */
38
39 #define _GNU_SOURCE
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <sys/types.h>
43 #include <sys/stat.h>
44 #include <unistd.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <errno.h>
48 #include <aio.h>
49 #include <signal.h>
50 #include <limits.h>
51 #include <pthread.h>
52 #include <syscall.h>
53 #include <sys/sendfile.h>
54 #include <peer.h>
55
56 #include <xseg/xseg.h>
57 #include <xseg/protocol.h>
58
59 #define LOCK_SUFFIX             "_lock"
60 #define MAX_PATH_SIZE           1024
61 #define MAX_FILENAME_SIZE       (XSEG_MAX_TARGETLEN + 5) //strlen(LOCK_SUFFIX)
62 #define MAX_PREFIX_LEN          10
63
64 /* default concurrency level (number of threads) */
65 #define DEFAULT_NR_OPS           16
66
67 /* Pithos hash for the zero block
68  * FIXME: Should it be hardcoded?
69  */
70 #define ZERO_BLOCK \
71         "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85"
72
73 /*
74  * Globals, holding command-line arguments
75  */
76
77 void custom_peer_usage(char *argv0)
78 {
79         fprintf(stderr, "Custom peer options:\n"
80                 "--pithos PATH --archip VPATH --prefix PREFIX\n\n"
81                 "where:\n"
82                 "\tPATH: path to pithos data blocks\n"
83                 "\tVPATH: path to modified volume blocks\n"
84                 "\tPREFIX: Common prefix of Archipelagos objects to be"
85                 "striped during filesystem hierarchy creation\n"
86                );
87 }
88
89 /* fdcache_node flags */
90 #define READY (1 << 1)
91
92 /* fdcache node info */
93 struct fdcache_node {
94         volatile int fd;
95         volatile unsigned int ref;
96         volatile unsigned long time;
97         volatile unsigned int flags;
98         pthread_cond_t cond;
99         char target[MAX_FILENAME_SIZE + 1];
100 };
101
102 /* pfiled context */
103 struct pfiled {
104         uint32_t path_len;
105         uint32_t vpath_len;
106         uint32_t prefix_len;
107         uint64_t handled_reqs;
108         long maxfds;
109         struct fdcache_node *fdcache;
110         pthread_mutex_t cache_lock;
111         char path[MAX_PATH_SIZE + 1];
112         char vpath[MAX_PATH_SIZE + 1];
113         char prefix[MAX_PREFIX_LEN];
114 };
115
116 /*
117  * pfiled specific structure 
118  * containing information on a pending I/O operation
119  */
120 struct fio {
121         uint32_t state;
122         long fdcacheidx;
123 };
124
125 struct pfiled * __get_pfiled(struct peerd *peer)
126 {
127         return (struct pfiled *) peer->priv;
128 }
129
130 struct fio * __get_fio(struct peer_req *pr)
131 {
132         return (struct fio*) pr->priv;
133 }
134
135 static void close_cache_entry(struct peerd *peer, struct peer_req *pr)
136 {
137         struct pfiled *pfiled = __get_pfiled(peer);
138         struct fio *fio = __get_fio(pr);
139         int fd = -1;
140         if (fio->fdcacheidx >= 0) {
141                 if (!__sync_sub_and_fetch(&pfiled->fdcache[fio->fdcacheidx].ref, 1) && !(pfiled->fdcache[fio->fdcacheidx].flags & READY)) {
142                         pthread_mutex_lock(&pfiled->cache_lock);
143                         if (!pfiled->fdcache[fio->fdcacheidx].ref){
144                                 /* invalidate cache entry */
145                                 fd = pfiled->fdcache[fio->fdcacheidx].fd;
146                                 pfiled->fdcache[fio->fdcacheidx].fd = -1;
147                                 pfiled->fdcache[fio->fdcacheidx].target[0] = 0;
148                                 pfiled->fdcache[fio->fdcacheidx].flags |= READY;
149                         }
150                         pthread_mutex_unlock(&pfiled->cache_lock);
151                         if (fd > 0)
152                                 close(fd);
153
154                 }
155         }
156 }
157
158 static void pfiled_complete(struct peerd *peer, struct peer_req *pr)
159 {
160         close_cache_entry(peer, pr);
161         complete(peer, pr);
162 }
163
164 static void pfiled_fail(struct peerd *peer, struct peer_req *pr)
165 {
166         close_cache_entry(peer, pr);
167         fail(peer, pr);
168 }
169
170 static void handle_unknown(struct peerd *peer, struct peer_req *pr)
171 {
172         XSEGLOG2(&lc, W, "unknown request op");
173         pfiled_fail(peer, pr);
174 }
175
176 static int create_path(char *buf, char *path, char *target, uint32_t targetlen,
177                 uint32_t prefixlen, int mkdirs)
178 {
179         int i;
180         struct stat st;
181         uint32_t pathlen = strlen(path);
182
183         strncpy(buf, path, pathlen);
184
185         for (i = 0; i < 9; i+= 3) {
186                 buf[pathlen + i] = target[prefixlen + i - (i/3)];
187                 buf[pathlen + i +1] = target[prefixlen + i + 1 - (i/3)];
188                 buf[pathlen + i + 2] = '/';
189                 if (mkdirs == 1) {
190                         buf[pathlen + i + 3] = '\0';
191 retry:
192                         if (stat(buf, &st) < 0) 
193                                 if (mkdir(buf, 0700) < 0) {
194                                         if (errno == EEXIST)
195                                                 goto retry;
196                                         perror(buf);
197                                         return errno;
198                                 }
199                 }
200         }
201
202         strncpy(&buf[pathlen + 9], target, targetlen);
203         buf[pathlen + 9 + targetlen] = '\0';
204
205         return 0;
206 }
207
208 static int dir_open(struct pfiled *pfiled, struct fio *io,
209                 char *target, uint32_t targetlen, int mode)
210 {
211         int fd = -1;
212         struct fdcache_node *ce = NULL;
213         long i, lru;
214         char tmp[pfiled->path_len + targetlen + 10];
215         uint64_t min;
216         io->fdcacheidx = -1;
217         if (targetlen> MAX_FILENAME_SIZE)
218                 goto out_err;
219
220 start:
221         /* check cache */
222         pthread_mutex_lock(&pfiled->cache_lock);
223 start_locked:
224         lru = -1;
225         min = UINT64_MAX;
226         for (i = 0; i < pfiled->maxfds; i++) {
227                 if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time 
228                                 && (pfiled->fdcache[i].flags & READY)) {
229                         min = pfiled->fdcache[i].time;
230                         lru = i;
231
232                 }
233
234                 if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
235                         if (pfiled->fdcache[i].target[targetlen] == 0) {
236                                 ce = &pfiled->fdcache[i];
237                                 /* if any other io thread is currently opening
238                                  * the file, block until it succeeds or fails
239                                  */
240                                 if (!(ce->flags & READY)) {
241                                         pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
242                                         /* when ready, restart lookup */
243                                         goto start_locked;
244                                 }
245                                 /* if successfully opened */
246                                 if (ce->fd > 0) {
247                                         fd = pfiled->fdcache[i].fd;
248                                         io->fdcacheidx = i;
249                                         goto out;
250                                 }
251                                 /* else open failed for the other io thread, so
252                                  * it should fail for everyone waiting on this
253                                  * file.
254                                  */
255                                 else {
256                                         fd = -1;
257                                         io->fdcacheidx = -1;
258                                         goto out_err_unlock;
259                                 }
260                         }
261                 }
262         }
263         if (lru < 0){
264                 /* all cache entries are currently being used */
265                 pthread_mutex_unlock(&pfiled->cache_lock);
266                 goto start;
267         }
268         if (pfiled->fdcache[lru].ref){
269                 fd = -1;
270                 XSEGLOG2(&lc, E, "lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
271                 goto out_err_unlock;
272         }
273         /* make room for new file */
274         ce = &pfiled->fdcache[lru];
275         /* set name here and state to not ready, for any other requests on the
276          * same target that may follow
277          */
278         strncpy(ce->target, target, targetlen);
279         ce->target[targetlen] = 0;
280         ce->flags &= ~READY;
281         pthread_mutex_unlock(&pfiled->cache_lock);
282
283         if (ce->fd >0){
284                 if (close(ce->fd) < 0){
285                         XSEGLOG2(&lc, W, "Cannot close %s", ce->target);
286                 }
287         }
288
289         /* try opening it from pithos blocker dir */
290         if (create_path(tmp, pfiled->path, target, targetlen, 0, 0) < 0) {
291                 fd = -1;
292                 goto new_entry;
293         }
294
295         fd = open(tmp, O_RDWR);
296         if (fd < 0) {
297                 /* try opening it from the tmp dir */
298                 if (create_path(tmp, pfiled->vpath, target, targetlen,
299                                                 pfiled->prefix_len,  0) < 0)
300                         goto new_entry;
301
302                 fd = open(tmp, O_RDWR);
303                 if (fd < 0)  {
304                         if (create_path(tmp, pfiled->vpath, target, targetlen,
305                                                 pfiled->prefix_len, 1) < 0) {
306                                 fd = -1;
307                                 goto new_entry;
308                         }
309
310                         fd = open(tmp, O_RDWR | O_CREAT, 0600);         
311                         if (fd < 0)
312                                 XSEGLOG2(&lc, E, "Cannot open %s", tmp);
313                 }
314         }
315
316         /* insert in cache a negative fd to indicate opening error to
317          * any other ios waiting for the file to open
318          */
319
320         /* insert in cache */
321 new_entry:
322         pthread_mutex_lock(&pfiled->cache_lock);
323         ce->fd = fd;
324         ce->ref = 0;
325         ce->flags = READY;
326         pthread_cond_broadcast(&ce->cond);
327         if (fd > 0) {
328                 io->fdcacheidx = lru;
329         }
330         else {
331                 io->fdcacheidx = -1;
332                 goto out_err_unlock;
333         }
334
335 out:
336         pfiled->handled_reqs++;
337         ce->time = pfiled->handled_reqs;
338         __sync_fetch_and_add(&ce->ref, 1);
339         pthread_mutex_unlock(&pfiled->cache_lock);
340 out_err:
341         return fd;
342
343 out_err_unlock:
344         pthread_mutex_unlock(&pfiled->cache_lock);
345         goto out_err;
346 }
347
348 static void handle_read_write(struct peerd *peer, struct peer_req *pr)
349 {
350         struct pfiled *pfiled = __get_pfiled(peer);
351         struct fio *fio = __get_fio(pr);
352         struct xseg_request *req = pr->req;
353         int r, fd;
354         char *target = xseg_get_target(peer->xseg, req);
355         char *data = xseg_get_data(peer->xseg, req);
356
357         fd = dir_open(pfiled, fio, target, req->targetlen, 0);
358         if (fd < 0){
359                 XSEGLOG2(&lc, E, "Dir open failed");
360                 pfiled_fail(peer, pr);
361                 return;
362         }
363
364         if (!req->size) {
365                 if (req->flags & (XF_FLUSH | XF_FUA)) {
366                         /* No FLUSH/FUA support yet (O_SYNC ?).
367                          * note that with FLUSH/size == 0 
368                          * there will probably be a (uint64_t)-1 offset */
369                         pfiled_complete(peer, pr);
370                         return;
371                 } else {
372                         pfiled_complete(peer, pr);
373                         return;
374                 }
375         }
376
377         switch (req->op) {
378                 case X_READ:
379                         while (req->serviced < req->datalen) {
380                                 r = pread(fd, data + req->serviced, 
381                                                 req->datalen - req->serviced,
382                                                 req->offset + req->serviced);
383                                 if (r < 0) {
384                                         req->datalen = req->serviced;
385                                         XSEGLOG2(&lc, E, "Cannot read");
386                                 }
387                                 else if (r == 0) {
388                                         /* reached end of file. zero out the rest data buffer */
389                                         memset(data + req->serviced, 0, req->datalen - req->serviced);
390                                         req->serviced = req->datalen;
391                                 }
392                                 else {
393                                         req->serviced += r;
394                                 }
395                         }
396                         break;
397                 case X_WRITE:
398                         while (req->serviced < req->datalen) {
399                                 r = pwrite(fd, data + req->serviced, 
400                                                 req->datalen - req->serviced,
401                                                 req->offset + req->serviced);
402                                 if (r < 0) {
403                                         req->datalen = req->serviced;
404                                 }
405                                 else {
406                                         req->serviced += r;
407                                 }
408                         }
409                         r = fsync(fd);
410                         if (r< 0) {
411                                 XSEGLOG2(&lc, E, "Fsync failed.");
412                                 /* if fsync fails, then no bytes serviced correctly */
413                                 req->serviced = 0;
414                         }
415                         break;
416                 default:
417                         XSEGLOG2(&lc, E, "wtf, corrupt op %u?\n", req->op);
418                         pfiled_fail(peer, pr);
419                         return;
420         }
421
422         if (req->serviced > 0 ) {
423                 pfiled_complete(peer, pr);
424         }
425         else {
426                 pfiled_fail(peer, pr);
427         }
428         return;
429 }
430
431 static void handle_info(struct peerd *peer, struct peer_req *pr)
432 {
433         struct pfiled *pfiled = __get_pfiled(peer);
434         struct fio *fio = __get_fio(pr);
435         struct xseg_request *req = pr->req;
436         struct stat stat;
437         int fd, r;
438         uint64_t size;
439         char *target = xseg_get_target(peer->xseg, req);
440         char *data = xseg_get_data(peer->xseg, req);
441         struct xseg_reply_info *xinfo  = (struct xseg_reply_info *)data;
442
443         fd = dir_open(pfiled, fio, target, req->targetlen, 0);
444         if (fd < 0) {
445                 XSEGLOG2(&lc, E, "Dir open failed");
446                 pfiled_fail(peer, pr);
447                 return;
448         }
449
450         r = fstat(fd, &stat);
451         if (r < 0) {
452                 XSEGLOG2(&lc, E, "fail in stat");
453                 pfiled_fail(peer, pr);
454                 return;
455         }
456
457         size = (uint64_t)stat.st_size;
458         xinfo->size = size;
459
460         pfiled_complete(peer, pr);
461 }
462
463 static void handle_copy(struct peerd *peer, struct peer_req *pr)
464 {
465         struct pfiled *pfiled = __get_pfiled(peer);
466         struct fio *fio = __get_fio(pr);
467         struct xseg_request *req = pr->req;
468         
469         char *target = xseg_get_target(peer->xseg, req);
470         char *data = xseg_get_data(peer->xseg, req);
471         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
472         struct stat st;
473         char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
474         int n, src = -1, dst = -1, r = -1;
475
476         if (!buf){
477                 XSEGLOG2(&lc, E, "Out of memory");
478                 pfiled_fail(peer, pr);
479                 return;
480         }
481
482         dst = dir_open(pfiled, fio, target, req->targetlen, 1);
483         if (dst < 0) {
484                 XSEGLOG2(&lc, E, "Fail in dst");
485                 r = dst;
486                 goto out;
487         }
488
489         if (create_path(buf, pfiled->path, xcopy->target,
490                                         xcopy->targetlen, 0, 0) < 0)  {
491                 XSEGLOG2(&lc, E, "Create path failed");
492                 r = -1;
493                 goto out;
494         }
495
496         src = open(buf, O_RDWR);
497         if (src < 0) {
498                 XSEGLOG2(&lc, E, "fail in src %s", buf);
499                 r = src;
500                 goto out;
501         }
502
503         r = fstat(src, &st);
504         if (r < 0){
505                 XSEGLOG2(&lc, E, "fail in stat for src %s", buf);
506                 goto out;
507         }
508
509         n = sendfile(dst, src, 0, st.st_size);
510         if (n != st.st_size) {
511                 XSEGLOG2(&lc, E, "Copy failed for %s", buf);
512                 r = -1;
513                 goto out;
514         }
515         r = 0;
516
517 out:
518         if (src > 0)
519                 close(src);
520         free(buf);
521         if (r < 0)
522                 pfiled_fail(peer, pr);
523         else
524                 pfiled_complete(peer, pr);
525         return;
526 }
527
528 static void handle_delete(struct peerd *peer, struct peer_req *pr)
529 {
530         struct pfiled *pfiled = __get_pfiled(peer);
531         struct fio *fio = __get_fio(pr);
532         struct xseg_request *req = pr->req;
533         
534         char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
535         int fd, r;
536         char *target = xseg_get_target(peer->xseg, req);
537         if (!buf){
538                 XSEGLOG2(&lc, E, "Out of memory");
539                 pfiled_fail(peer, pr);
540                 return;
541         }
542         fd = dir_open(pfiled, fio, target, req->targetlen, 0);
543         if (fd < 0) {
544                 XSEGLOG2(&lc, E, "Dir open failed");
545                 r = fd;
546                 goto out;
547         }
548
549         /* mark cache entry as invalid 
550          * give a chance to pending operations on this file to end.
551          * file will close when all operations are done 
552          */
553         if (fio->fdcacheidx >= 0) {
554                 pthread_mutex_lock(&pfiled->cache_lock);
555                 pfiled->fdcache[fio->fdcacheidx].flags &= ~READY;
556                 pthread_mutex_unlock(&pfiled->cache_lock);
557         }
558
559         r = create_path(buf, pfiled->vpath, target, req->targetlen,
560                                 pfiled->prefix_len, 0);
561         if (r< 0) {
562                 XSEGLOG2(&lc, E, "Create path failed");
563                 goto out;
564         }
565         r = unlink(buf);
566 out:
567         free(buf);
568         if (r < 0)
569                 pfiled_fail(peer, pr);
570         else
571                 pfiled_complete(peer, pr);
572         return;
573 }
574
575 static void handle_acquire(struct peerd *peer, struct peer_req *pr)
576 {
577         struct pfiled *pfiled = __get_pfiled(peer);
578 //      struct fio *fio = __get_fio(pr);
579         struct xseg_request *req = pr->req;
580         char *buf = malloc(MAX_FILENAME_SIZE);
581         char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
582         int fd = -1;
583         char *target = xseg_get_target(peer->xseg, req);
584
585         if (!buf || !pathname) {
586                 XSEGLOG2(&lc, E, "Out of memory");
587                 pfiled_fail(peer, pr);
588                 return;
589         }
590
591         strncpy(buf, target, req->targetlen);
592         strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
593
594         XSEGLOG2(&lc, I, "Trying to acquire lock %s", buf);
595
596         if (create_path(pathname, pfiled->vpath, buf, 
597                         req->targetlen + strlen(LOCK_SUFFIX),
598                         pfiled->prefix_len, 1) < 0) {
599                 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
600                 goto out;
601         }
602
603         //nfs v >= 3
604         while ((fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR)) < 0){
605                 //actual error
606                 if (errno != EEXIST){
607                         XSEGLOG2(&lc, W, "Error opening %s", pathname);
608                         goto out;
609                 }
610                 if (req->flags & XF_NOSYNC)
611                         goto out;
612                 sleep(1);
613         }
614         close(fd);
615 out:
616         free(buf);
617         free(pathname);
618         if (fd < 0){
619                 XSEGLOG2(&lc, I, "Failed to acquire lock %s", buf);
620                 pfiled_fail(peer, pr);
621         }
622         else{
623                 XSEGLOG2(&lc, I, "Acquired lock %s", buf);
624                 pfiled_complete(peer, pr);
625         }
626         return;
627 }
628
629 static void handle_release(struct peerd *peer, struct peer_req *pr)
630 {
631         struct pfiled *pfiled = __get_pfiled(peer);
632 //      struct fio *fio = __get_fio(pr);
633         struct xseg_request *req = pr->req;
634         char *buf = malloc(MAX_FILENAME_SIZE);
635         char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
636         char *target = xseg_get_target(peer->xseg, req);
637         int r;
638
639         if (!buf || !pathname) {
640                 XSEGLOG2(&lc, E, "Out of memory");
641                 fail(peer, pr);
642                 return;
643         }
644
645         strncpy(buf, target, req->targetlen);
646         strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
647
648         r = create_path(pathname, pfiled->vpath, buf,
649                         req->targetlen + strlen(LOCK_SUFFIX),
650                         pfiled->prefix_len, 0);
651         if (r < 0) {
652                 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
653                 goto out;
654         }
655         r = unlink(pathname);
656
657 out:
658         free(buf);
659         free(pathname);
660         if (r < 0)
661                 fail(peer, pr);
662         else
663                 complete(peer, pr);
664         return;
665 }
666
667 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
668                                 enum dispatch_reason reason)
669 {
670         struct fio *fio = __get_fio(pr);
671         if (reason == dispatch_accept){
672                 fio->fdcacheidx = -1;
673                 fio->state = XS_ACCEPTED;
674         }
675         
676         switch (req->op) {
677                 case X_READ:
678                 case X_WRITE:
679                         handle_read_write(peer, pr); break;
680                 case X_INFO:
681                         handle_info(peer, pr); break;
682                 case X_COPY:
683                         handle_copy(peer, pr); break;
684                 case X_DELETE:
685                         handle_delete(peer, pr); break;
686                 case X_ACQUIRE:
687                         handle_acquire(peer, pr); break;
688                 case X_CLOSE:
689                         handle_release(peer, pr); break;
690                         //      case X_SNAPSHOT:
691                 case X_SYNC:
692                 default:
693                         handle_unknown(peer, pr);
694         }
695         return 0;
696 }
697
698 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
699 {
700         int ret = 0;
701         int i;
702         struct pfiled *pfiled = malloc(sizeof(struct pfiled));
703         if (!pfiled){
704                 XSEGLOG2(&lc, E, "Out of memory");
705                 ret = -ENOMEM;
706                 goto out;
707         }
708         peer->priv = pfiled;
709
710         pfiled->maxfds = 2 * peer->nr_ops;
711         pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
712         if(!pfiled->fdcache) {
713                 XSEGLOG2(&lc, E, "Out of memory");
714                 ret = -ENOMEM;
715                 goto out;
716         }
717
718         for (i = 0; i < peer->nr_ops; i++) {
719                 peer->peer_reqs[i].priv = malloc(sizeof(struct fio));
720                 if (!peer->peer_reqs->priv){
721                         XSEGLOG2(&lc, E, "Out of memory");
722                         ret = -ENOMEM;
723                         goto out;
724                 }
725         }
726
727         pfiled->vpath[0] = 0;
728         pfiled->path[0] = 0;
729         pfiled->handled_reqs = 0;
730         /*
731         for (i = 0; i < argc; i++) {
732                 if (!strcmp(argv[i], "--pithos") && (i+1) < argc){
733                         strncpy(pfiled->path, argv[i+1], MAX_PATH_SIZE);
734                         pfiled->path[MAX_PATH_SIZE] = 0;
735                         i += 1;
736                         continue;
737                 }
738                 if (!strcmp(argv[i], "--archip") && (i+1) < argc){
739                         strncpy(pfiled->vpath, argv[i+1], MAX_PATH_SIZE);
740                         pfiled->vpath[MAX_PATH_SIZE] = 0;
741                         i += 1;
742                         continue;
743                 }
744                 if (!strcmp(argv[i], "--prefix") && (i+1) < argc){
745                         strncpy(pfiled->prefix, argv[i+1], MAX_PREFIX_LEN);
746                         pfiled->prefix[MAX_PREFIX_LEN] = 0;
747                         i += 1;
748                         continue;
749                 }
750         }
751         */
752         BEGIN_READ_ARGS(argc, argv);
753         READ_ARG_STRING("--pithos", pfiled->path, MAX_PATH_SIZE);
754         READ_ARG_STRING("--archip", pfiled->vpath, MAX_PATH_SIZE);
755         READ_ARG_STRING("--prefix", pfiled->prefix, MAX_PREFIX_LEN);
756         END_READ_ARGS();
757
758
759         pfiled->prefix_len = strlen(pfiled->prefix);
760
761         //TODO test path exist
762         pfiled->path_len = strlen(pfiled->path);
763         if (!pfiled->path_len){
764                 XSEGLOG2(&lc, E, "Pithos path was not provided");
765                 usage(argv[0]);
766                 return -1;
767         }
768         if (pfiled->path[pfiled->path_len -1] != '/'){
769                 pfiled->path[pfiled->path_len] = '/';
770                 pfiled->path[++pfiled->path_len]= 0;
771         }
772
773         pfiled->vpath_len = strlen(pfiled->vpath);
774         if (!pfiled->vpath_len){
775                 XSEGLOG2(&lc, E, "Archipelagos path was not provided");
776                 usage(argv[0]);
777                 return -1;
778         }
779         if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
780                 pfiled->vpath[pfiled->vpath_len] = '/';
781                 pfiled->vpath[++pfiled->vpath_len]= 0;
782         }
783
784         for (i = 0; i < peer->nr_ops; i++) {
785                 pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
786                 pfiled->fdcache[i].flags = READY;
787         }
788         pthread_mutex_init(&pfiled->cache_lock, NULL);
789
790 out:
791         return ret;
792 }
793
794 void custom_peer_finalize(struct peerd *peer)
795 {
796         return;
797 }
798
799 /*
800 static int safe_atoi(char *s)
801 {
802         long l;
803         char *endp;
804
805         l = strtol(s, &endp, 10);
806         if (s != endp && *endp == '\0')
807                 return l;
808         else
809                 return -1;
810 }
811 */