filed: Respect req->size for copy requests
[archipelago] / xseg / peers / user / filed.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 /*
36  * The Pithos File Blocker Peer (pfiled)
37  */
38
39 #define _GNU_SOURCE
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <sys/types.h>
43 #include <sys/stat.h>
44 #include <unistd.h>
45 #include <string.h>
46 #include <fcntl.h>
47 #include <errno.h>
48 #include <signal.h>
49 #include <limits.h>
50 #include <pthread.h>
51 #include <syscall.h>
52 #include <sys/sendfile.h>
53 #include <peer.h>
54 #include <xtypes/xcache.h>
55 #include <openssl/sha.h>
56 #include <sys/resource.h>
57
58 #include <xseg/xseg.h>
59 #include <xseg/protocol.h>
60
61 #include <hash.h>
62
63 #define FIO_STR_ID_LEN          3
64 #define LOCK_SUFFIX             "_lock"
65 #define LOCK_SUFFIX_LEN         5
66 #define HASH_SUFFIX             "_hash"
67 #define HASH_SUFFIX_LEN         5
68 #define MAX_PATH_SIZE           1024
69 #define MAX_FILENAME_SIZE       (XSEG_MAX_TARGETLEN + LOCK_SUFFIX_LEN + MAX_UNIQUESTR_LEN + FIO_STR_ID_LEN)
70 #define MAX_PREFIX_LEN          10
71 #define MAX_UNIQUESTR_LEN       128
72 #define SNAP_SUFFIX             "_snap"
73 #define SNAP_SUFFIX_LEN         5
74
75 #define WRITE 1
76 #define READ 2
77
78 #define min(_a, _b) (_a < _b ? _a : _b)
79
80 /*
81  * Globals, holding command-line arguments
82  */
83
84 void custom_peer_usage(char *argv0)
85 {
86          fprintf(stderr, "General peer options:\n"
87                 "  Option        | Default    | \n"
88                 "  --------------------------------------------\n"
89                 "    --fdcache   | 2 * nr_ops | Fd cache size\n"
90                 "    --archip    | None       | Archipelago directory\n"
91                 "    --prefix    | None       | Common prefix of objects that should be stripped\n"
92                 "    --uniquestr | None       | Unique string for this instance\n"
93                 "\n"
94                );
95 }
96
97 /* fdcache_node flags */
98 #define READY (1 << 1)
99
100 /* fdcache node info */
101 struct fdcache_entry {
102         volatile int fd;
103         volatile unsigned int flags;
104 };
105
106 /* pfiled context */
107 struct pfiled {
108         uint32_t vpath_len;
109         uint32_t prefix_len;
110         uint32_t uniquestr_len;
111         long maxfds;
112         char vpath[MAX_PATH_SIZE + 1];
113         char prefix[MAX_PREFIX_LEN + 1];
114         char uniquestr[MAX_UNIQUESTR_LEN + 1];
115         struct xcache cache;
116 };
117
118 /*
119  * pfiled specific structure
120  * containing information on a pending I/O operation
121  */
122 struct fio {
123         uint32_t state;
124         xcache_handler h;
125         char str_id[FIO_STR_ID_LEN];
126 };
127
128 struct pfiled * __get_pfiled(struct peerd *peer)
129 {
130         return (struct pfiled *) peer->priv;
131 }
132
133 struct fio * __get_fio(struct peer_req *pr)
134 {
135         return (struct fio*) pr->priv;
136 }
137
138
139 /* cache ops */
140 static void * cache_node_init(void *p, void *xh)
141 {
142         //struct peerd *peer = (struct peerd *)p;
143         //struct pfiled *pfiled = __get_pfiled(peer);
144         xcache_handler h = *(xcache_handler *)(xh);
145         struct fdcache_entry *fdentry = malloc(sizeof(struct fdcache_entry));
146         if (!fdentry)
147                 return NULL;
148
149         XSEGLOG2(&lc, D, "Initialing node h: %llu with %p",
150                         (long long unsigned)h, fdentry);
151
152         fdentry->fd = -1;
153         fdentry->flags = 0;
154
155         return fdentry;
156 }
157
158 static int cache_init(void *p, void *e)
159 {
160         struct fdcache_entry *fdentry = (struct fdcache_entry *)e;
161
162         if (fdentry->fd != -1) {
163                 XSEGLOG2(&lc, E, "Found invalid fd %d", fdentry->fd);
164                 return -1;
165         }
166
167         return 0;
168 }
169
170 static void cache_put(void *p, void *e)
171 {
172         struct fdcache_entry *fdentry = (struct fdcache_entry *)e;
173
174         XSEGLOG2(&lc, D, "Putting entry %p with fd %d", fdentry, fdentry->fd);
175
176         if (fdentry->fd != -1)
177                 close(fdentry->fd);
178
179         fdentry->fd = -1;
180         fdentry->flags = 0;
181         return;
182 }
183
184 static void close_cache_entry(struct peerd *peer, struct peer_req *pr)
185 {
186         struct pfiled *pfiled = __get_pfiled(peer);
187         struct fio *fio = __get_fio(pr);
188         if (fio->h != NoEntry)
189                 xcache_put(&pfiled->cache, fio->h);
190 }
191
192 static void pfiled_complete(struct peerd *peer, struct peer_req *pr)
193 {
194         close_cache_entry(peer, pr);
195         complete(peer, pr);
196 }
197
198 static void pfiled_fail(struct peerd *peer, struct peer_req *pr)
199 {
200         close_cache_entry(peer, pr);
201         fail(peer, pr);
202 }
203
204 static void handle_unknown(struct peerd *peer, struct peer_req *pr)
205 {
206         XSEGLOG2(&lc, W, "unknown request op");
207         pfiled_fail(peer, pr);
208 }
209
210 static void get_dirs(char buf[6], struct pfiled *pfiled, char *target, uint32_t targetlen)
211 {
212         unsigned char sha[SHA256_DIGEST_SIZE];
213         char hex[HEXLIFIED_SHA256_DIGEST_SIZE];
214         char *prefix = pfiled->prefix;
215         uint32_t prefixlen = pfiled->prefix_len;
216
217         if (strncmp(target, prefix, prefixlen)) {
218                 strncpy(buf, target, 6);
219                 return;
220         }
221
222         SHA256((unsigned char *)target, targetlen, sha);
223         hexlify(sha, 3, hex);
224         strncpy(buf, hex, 6);
225         return;
226 }
227
228 static int create_path(char *buf, struct pfiled *pfiled, char *target,
229                         uint32_t targetlen, int mkdirs)
230 {
231         int i;
232         struct stat st;
233         char dirs[6];
234         char *path = pfiled->vpath;
235         uint32_t pathlen = pfiled->vpath_len;
236
237         get_dirs(dirs, pfiled, target, targetlen);
238
239         strncpy(buf, path, pathlen);
240
241         for (i = 0; i < 9; i+= 3) {
242                 buf[pathlen + i] = dirs[i - (i/3)];
243                 buf[pathlen + i +1] = dirs[i + 1 - (i/3)];
244                 buf[pathlen + i + 2] = '/';
245                 if (mkdirs == 1) {
246                         buf[pathlen + i + 3] = '\0';
247 retry:
248                         if (stat(buf, &st) < 0) 
249                                 if (mkdir(buf, 0750) < 0) {
250                                         if (errno == EEXIST)
251                                                 goto retry;
252                                         //perror(buf);
253                                         return -1;
254                                 }
255                 }
256         }
257
258         strncpy(&buf[pathlen + 9], target, targetlen);
259         buf[pathlen + 9 + targetlen] = '\0';
260
261         return 0;
262 }
263
264 static int is_target_valid_len(struct pfiled *pfiled, char *target,
265                 uint32_t targetlen, int mode)
266 {
267         if (targetlen > XSEG_MAX_TARGETLEN) {
268                 XSEGLOG2(&lc, E, "Invalid targetlen %u, max: %u",
269                                 targetlen, XSEG_MAX_TARGETLEN);
270                 return -1;
271         }
272         if (mode == WRITE || mode == READ) {
273                 /*
274                  * if name starts with prefix
275                  *      assert targetlen >= prefix_len + 6
276                  * else
277                  *      assert targetlen >= 6
278                  */
279                 /* 6 chars are needed for the directory structrure */
280                 if (!pfiled->prefix_len || strncmp(target, pfiled->prefix, pfiled->prefix_len)) {
281                         if (targetlen < 6) {
282                                 XSEGLOG2(&lc, E, "Targetlen should be at least 6");
283                                 return -1;
284                         }
285                 } else {
286                         if (targetlen < pfiled->prefix_len + 6) {
287                                 XSEGLOG2(&lc, E, "Targetlen should be at least prefix "
288                                                 "len(%u) + 6", pfiled->prefix_len);
289                                 return -1;
290                         }
291                 }
292         } else {
293                 XSEGLOG2(&lc, E, "Invalid mode");
294                 return -1;
295         }
296
297         return 0;
298 }
299
300 /*
301 static int is_target_valid(struct pfiled *pfiled, char *target, int mode)
302 {
303         return is_target_valid_len(pfiled, target, strlen(target), mode);
304 }
305 */
306
307 static int open_file_write(struct pfiled *pfiled, char *target, uint32_t targetlen)
308 {
309         int r, fd;
310         char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
311         char error_str[1024];
312
313         r = create_path(tmp, pfiled, target, targetlen, 1);
314         if (r < 0) {
315                 XSEGLOG2(&lc, E, "Could not create path");
316                 return -1;
317         }
318         XSEGLOG2(&lc, D, "Opening file %s with O_RDWR|O_CREAT", tmp);
319         fd = open(tmp, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
320         if (fd < 0){
321                 XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
322                 return -1;
323         }
324         return fd;
325 }
326
327 static int open_file_read(struct pfiled *pfiled, char *target, uint32_t targetlen)
328 {
329         int r, fd;
330         char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
331         char error_str[1024];
332
333         r = create_path(tmp, pfiled, target, targetlen, 0);
334         if (r < 0) {
335                 XSEGLOG2(&lc, E, "Could not create path");
336                 return -1;
337         }
338         XSEGLOG2(&lc, D, "Opening file %s with O_RDWR", tmp);
339         fd = open(tmp, O_RDWR, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
340         if (fd < 0){
341                 XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
342                 return -1;
343         }
344         return fd;
345 }
346
347 static int open_file(struct pfiled *pfiled, char *target, uint32_t targetlen, int mode)
348 {
349         if (mode == WRITE) {
350                 return open_file_write(pfiled, target, targetlen);
351         } else if (mode == READ) {
352                 return open_file_read(pfiled, target, targetlen);
353
354         } else {
355                 XSEGLOG2(&lc, E, "Invalid mode for target");
356         }
357         return -1;
358 }
359
360 static int dir_open(struct pfiled *pfiled, struct fio *fio,
361                 char *target, uint32_t targetlen, int mode)
362 {
363         int r, fd;
364         struct fdcache_entry *e;
365         xcache_handler h = NoEntry, nh;
366         char name[XSEG_MAX_TARGETLEN + 1];
367
368         if (targetlen > XSEG_MAX_TARGETLEN) {
369                 XSEGLOG2(&lc, E, "Invalid targetlen %u, max: %u",
370                                 targetlen, XSEG_MAX_TARGETLEN);
371                 return -1;
372         }
373         strncpy(name, target, targetlen);
374         name[targetlen] = 0;
375         XSEGLOG2(&lc, I, "Dir open started for %s", name);
376
377         h = xcache_lookup(&pfiled->cache, name);
378         if (h == NoEntry) {
379                 r = is_target_valid_len(pfiled, target, targetlen, mode);
380                 if (r < 0) {
381                         XSEGLOG2(&lc, E, "Invalid len for target %s", name);
382                         goto out_err;
383                 }
384
385                 h = xcache_alloc_init(&pfiled->cache, name);
386                 if (h == NoEntry) {
387                         /* FIXME add waitq to wait for free */
388                         XSEGLOG2(&lc, E, "Could not allocate cache entry for %s",
389                                         name);
390                         goto out_err;
391                 }
392                 XSEGLOG2(&lc, D, "Allocated new handler %llu for %s",
393                                 (long long unsigned)h, name);
394
395                 e = xcache_get_entry(&pfiled->cache, h);
396                 if (!e) {
397                         XSEGLOG2(&lc, E, "Alloced handler but no valid fd cache entry");
398                         goto out_free;
399                 }
400
401                 /* open/create file */
402                 fd = open_file(pfiled, target, targetlen, mode);
403                 if (fd < 0) {
404                         XSEGLOG2(&lc, E, "Could not open file for target %s", name);
405                         goto out_free;
406                 }
407                 XSEGLOG2(&lc, D, "Opened file %s. fd %d", name, fd);
408
409                 e->fd = fd;
410
411                 XSEGLOG2(&lc, D, "Inserting handler %llu for %s to fdcache",
412                                 (long long unsigned)h, name);
413                 nh = xcache_insert(&pfiled->cache, h);
414                 if (nh != h) {
415                         XSEGLOG2(&lc, D, "Partial cache hit for %s. New handler %llu",
416                                         name, (long long unsigned)nh);
417                         xcache_put(&pfiled->cache, h);
418                         h = nh;
419                 }
420         } else {
421                 XSEGLOG2(&lc, D, "Cache hit for %s, handler: %llu", name,
422                                 (long long unsigned)h);
423         }
424
425         e = xcache_get_entry(&pfiled->cache, h);
426         if (!e) {
427                 XSEGLOG2(&lc, E, "Found handler but no valid fd cache entry");
428                 xcache_put(&pfiled->cache, h);
429                 fio->h = NoEntry;
430                 goto out_err;
431         }
432         fio->h = h;
433
434         //assert e->fd != -1 ?;
435         XSEGLOG2(&lc, I, "Dir open finished for %s", name);
436         return e->fd;
437
438 out_free:
439         xcache_free_new(&pfiled->cache, h);
440 out_err:
441         XSEGLOG2(&lc, E, "Dir open failed for %s", name);
442         return -1;
443 }
444
445 static void handle_read(struct peerd *peer, struct peer_req *pr)
446 {
447         struct pfiled *pfiled = __get_pfiled(peer);
448         struct fio *fio = __get_fio(pr);
449         struct xseg_request *req = pr->req;
450         int r, fd;
451         char *target = xseg_get_target(peer->xseg, req);
452         char *data = xseg_get_data(peer->xseg, req);
453
454         XSEGLOG2(&lc, I, "Handle read started for pr: %p, req: %p", pr, pr->req);
455
456         if (!req->size) {
457                 pfiled_complete(peer, pr);
458                 return;
459         }
460
461         if (req->datalen < req->size) {
462                 XSEGLOG2(&lc, E, "Request datalen is less than request size");
463                 pfiled_fail(peer, pr);
464                 return;
465         }
466
467
468         fd = dir_open(pfiled, fio, target, req->targetlen, READ);
469         if (fd < 0){
470                 if (errno != ENOENT) {
471                         XSEGLOG2(&lc, E, "Open failed");
472                         pfiled_fail(peer, pr);
473                         return;
474                 } else {
475                         memset(data, 0, req->size);
476                         req->serviced = req->size;
477                         goto out;
478                 }
479         }
480
481
482         XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
483                         req->size);
484         while (req->serviced < req->size) {
485                 XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu",
486                                 req->serviced, req->size);
487                 r = pread(fd, data + req->serviced,
488                                 req->size- req->serviced,
489                                 req->offset + req->serviced);
490                 if (r < 0) {
491                         XSEGLOG2(&lc, E, "Cannot read");
492                         break;
493                 }
494                 else if (r == 0) {
495                         /* reached end of file. zero out the rest data buffer */
496                         memset(data + req->serviced, 0, req->size - req->serviced);
497                         req->serviced = req->size;
498                 }
499                 else {
500                         req->serviced += r;
501                 }
502         }
503         XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
504                         req->size);
505
506 out:
507         if (req->serviced > 0 ) {
508                 XSEGLOG2(&lc, I, "Handle read completed for pr: %p, req: %p",
509                                 pr, pr->req);
510                 pfiled_complete(peer, pr);
511         }
512         else {
513                 XSEGLOG2(&lc, E, "Handle read failed for pr: %p, req: %p",
514                                 pr, pr->req);
515                 pfiled_fail(peer, pr);
516         }
517         return;
518 }
519
520 static void handle_write(struct peerd *peer, struct peer_req *pr)
521 {
522         struct pfiled *pfiled = __get_pfiled(peer);
523         struct fio *fio = __get_fio(pr);
524         struct xseg_request *req = pr->req;
525         int r, fd;
526         char *target = xseg_get_target(peer->xseg, req);
527         char *data = xseg_get_data(peer->xseg, req);
528
529         XSEGLOG2(&lc, I, "Handle write started for pr: %p, req: %p", pr, pr->req);
530
531         if (req->datalen < req->size) {
532                 XSEGLOG2(&lc, E, "Request datalen is less than request size");
533                 pfiled_fail(peer, pr);
534                 return;
535         }
536
537         fd = dir_open(pfiled, fio, target, req->targetlen, WRITE);
538         if (fd < 0){
539                 XSEGLOG2(&lc, E, "Open failed");
540                 pfiled_fail(peer, pr);
541                 return;
542         }
543
544         if (!req->size) {
545                 if (req->flags & (XF_FLUSH | XF_FUA)) {
546                         /* No FLUSH/FUA support yet (O_SYNC ?).
547                          * note that with FLUSH/size == 0 
548                          * there will probably be a (uint64_t)-1 offset */
549                         pfiled_complete(peer, pr);
550                         return;
551                 } else {
552                         pfiled_complete(peer, pr);
553                         return;
554                 }
555         }
556
557         XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
558                         req->size);
559         while (req->serviced < req->size) {
560                 XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu",
561                                 req->serviced, req->size);
562                 r = pwrite(fd, data + req->serviced,
563                                 req->size- req->serviced,
564                                 req->offset + req->serviced);
565                 if (r < 0) {
566                         break;
567                 }
568                 else {
569                         req->serviced += r;
570                 }
571         }
572         XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
573                         req->size);
574         r = fsync(fd);
575         if (r< 0) {
576                 XSEGLOG2(&lc, E, "Fsync failed.");
577                 /* if fsync fails, then no bytes serviced correctly */
578                 req->serviced = 0;
579         }
580
581         if (req->serviced > 0 ) {
582                 XSEGLOG2(&lc, I, "Handle write completed for pr: %p, req: %p",
583                                 pr, pr->req);
584                 pfiled_complete(peer, pr);
585         }
586         else {
587                 XSEGLOG2(&lc, E, "Handle write failed for pr: %p, req: %p",
588                                 pr, pr->req);
589                 pfiled_fail(peer, pr);
590         }
591         return;
592 }
593
594 static void handle_info(struct peerd *peer, struct peer_req *pr)
595 {
596         struct pfiled *pfiled = __get_pfiled(peer);
597         struct fio *fio = __get_fio(pr);
598         struct xseg_request *req = pr->req;
599         struct stat stat;
600         int fd, r;
601         uint64_t size;
602         char *target = xseg_get_target(peer->xseg, req);
603         char *data = xseg_get_data(peer->xseg, req);
604         char buf[XSEG_MAX_TARGETLEN + 1];
605         struct xseg_reply_info *xinfo  = (struct xseg_reply_info *)data;
606
607         if (req->datalen < sizeof(struct xseg_reply_info)) {
608                 strncpy(buf, target, req->targetlen);
609                 r = xseg_resize_request(peer->xseg, req, req->targetlen, sizeof(struct xseg_reply_info));
610                 if (r < 0) {
611                         XSEGLOG2(&lc, E, "Cannot resize request");
612                         pfiled_fail(peer, pr);
613                         return;
614                 }
615                 target = xseg_get_target(peer->xseg, req);
616                 strncpy(target, buf, req->targetlen);
617         }
618
619         XSEGLOG2(&lc, I, "Handle info started for pr: %p, req: %p", pr, pr->req);
620         fd = dir_open(pfiled, fio, target, req->targetlen, READ);
621         if (fd < 0) {
622                 XSEGLOG2(&lc, E, "Dir open failed");
623                 pfiled_fail(peer, pr);
624                 return;
625         }
626
627         r = fstat(fd, &stat);
628         if (r < 0) {
629                 XSEGLOG2(&lc, E, "fail in stat");
630                 pfiled_fail(peer, pr);
631                 return;
632         }
633
634         size = (uint64_t)stat.st_size;
635         xinfo->size = size;
636
637         XSEGLOG2(&lc, I, "Handle info completed for pr: %p, req: %p", pr, pr->req);
638         pfiled_complete(peer, pr);
639 }
640
641 static void handle_copy(struct peerd *peer, struct peer_req *pr)
642 {
643         struct pfiled *pfiled = __get_pfiled(peer);
644         struct fio *fio = __get_fio(pr);
645         struct xseg_request *req = pr->req;
646         char *target = xseg_get_target(peer->xseg, req);
647         char *data = xseg_get_data(peer->xseg, req);
648         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
649         struct stat st;
650         char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
651         int src = -1, dst = -1, r = -1;
652         ssize_t c = 0, bytes;
653         ssize_t limit = 0;
654
655         XSEGLOG2(&lc, I, "Handle copy started for pr: %p, req: %p", pr, pr->req);
656         if (!buf){
657                 XSEGLOG2(&lc, E, "Out of memory");
658                 pfiled_fail(peer, pr);
659                 return;
660         }
661
662         r = is_target_valid_len(pfiled, xcopy->target, xcopy->targetlen, READ);
663         if (r < 0) {
664                 XSEGLOG2(&lc, E, "Source target not valid");
665                 goto out;
666         }
667
668         dst = dir_open(pfiled, fio, target, req->targetlen, WRITE);
669         if (dst < 0) {
670                 XSEGLOG2(&lc, E, "Fail in dst");
671                 r = dst;
672                 goto out;
673         }
674
675         r = create_path(buf, pfiled, xcopy->target, xcopy->targetlen, 0);
676         if (r < 0)  {
677                 XSEGLOG2(&lc, E, "Create path failed");
678                 r = -1;
679                 goto out;
680         }
681
682         src = open(buf, O_RDONLY);
683         if (src < 0) {
684                 XSEGLOG2(&lc, E, "fail in src %s", buf);
685                 r = src;
686                 goto out;
687         }
688
689         r = fstat(src, &st);
690         if (r < 0){
691                 XSEGLOG2(&lc, E, "fail in stat for src %s", buf);
692                 goto out;
693         }
694
695         c = 0;
696
697         limit = min(req->size, st.st_size);
698         while (c < limit) {
699                 bytes = sendfile(dst, src, NULL, limit - c);
700                 if (bytes < 0) {
701                         XSEGLOG2(&lc, E, "Copy failed for %s", buf);
702                         r = -1;
703                         goto out;
704                 }
705                 c += bytes;
706         }
707         r = 0;
708
709 out:
710         req->serviced = c;
711         if (limit && c == limit)
712                 req->serviced = req->size;
713
714         if (src > 0)
715                 close(src);
716         free(buf);
717         if (r < 0) {
718                 XSEGLOG2(&lc, E, "Handle copy failed for pr: %p, req: %p", pr, pr->req);
719                 pfiled_fail(peer, pr);
720         } else {
721                 XSEGLOG2(&lc, I, "Handle copy completed for pr: %p, req: %p", pr, pr->req);
722                 pfiled_complete(peer, pr);
723         }
724         return;
725 }
726
727 static void handle_delete(struct peerd *peer, struct peer_req *pr)
728 {
729         struct pfiled *pfiled = __get_pfiled(peer);
730         //struct fio *fio = __get_fio(pr);
731         struct xseg_request *req = pr->req;
732         char name[XSEG_MAX_TARGETLEN + 1];
733         char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
734         int r;
735         char *target = xseg_get_target(peer->xseg, req);
736
737         XSEGLOG2(&lc, I, "Handle delete started for pr: %p, req: %p", pr, pr->req);
738
739         if (!buf){
740                 XSEGLOG2(&lc, E, "Out of memory");
741                 pfiled_fail(peer, pr);
742                 return;
743         }
744
745         r = is_target_valid_len(pfiled, target, req->targetlen, READ);
746         if (r < 0) {
747                 XSEGLOG2(&lc, E, "Target not valid");
748                 goto out;
749         }
750
751         r = create_path(buf, pfiled, target, req->targetlen, 0);
752         if (r< 0) {
753                 XSEGLOG2(&lc, E, "Create path failed");
754                 goto out;
755         }
756         r = unlink(buf);
757 out:
758         free(buf);
759         if (r < 0) {
760                 XSEGLOG2(&lc, E, "Handle delete failed for pr: %p, req: %p", pr, pr->req);
761                 pfiled_fail(peer, pr);
762         } else {
763                 strncpy(name, target, XSEG_MAX_TARGETLEN);
764                 name[XSEG_MAX_TARGETLEN] = 0;
765                 xcache_invalidate(&pfiled->cache, name);
766                 XSEGLOG2(&lc, I, "Handle delete completed for pr: %p, req: %p", pr, pr->req);
767                 pfiled_complete(peer, pr);
768         }
769         return;
770 }
771
772 static int __get_precalculated_hash(struct peerd *peer, char *target,
773                 uint32_t targetlen, char hash[HEXLIFIED_SHA256_DIGEST_SIZE + 1])
774 {
775         int ret = -1;
776         int r, fd;
777         uint32_t len, pos;
778         char *hash_file = NULL, *hash_path = NULL;
779         char tmpbuf[HEXLIFIED_SHA256_DIGEST_SIZE];
780         struct pfiled *pfiled = __get_pfiled(peer);
781
782         XSEGLOG2(&lc, D, "Started.");
783
784         hash_file = malloc(MAX_FILENAME_SIZE + 1);
785         hash_path = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
786
787         pos = 0;
788         strncpy(hash_file+pos, target, targetlen);
789         pos += targetlen;
790         strncpy(hash_file+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
791         pos += HASH_SUFFIX_LEN;
792         hash_file[pos] = 0;
793         hash[0] = 0;
794
795         r = create_path(hash_path, pfiled, hash_file, pos, 1);
796         if (r < 0)  {
797                 XSEGLOG2(&lc, E, "Create path failed");
798                 goto out;
799         }
800
801         fd = open(hash_path, O_RDONLY, S_IRWXU | S_IRUSR);
802         if (fd < 0) {
803                 if (errno != ENOENT){
804                         XSEGLOG2(&lc, E, "Error opening %s", hash_path);
805                 } else {
806                         XSEGLOG2(&lc, I, "No precalculated hash for %s", hash_file);
807                         ret = 0;
808                 }
809                 goto out;
810         }
811
812         r = pread(fd, tmpbuf, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
813         if (r < 0) {
814                 XSEGLOG2(&lc, E, "Error reading from %s", hash_path);
815                 close(fd);
816                 goto out;
817         }
818         len = (uint32_t)r;
819
820         XSEGLOG2(&lc, D, "Read %u bytes", len);
821
822         r = close(fd);
823         if (r < 0) {
824                 XSEGLOG2(&lc, E, "Could not close hash_file %s", hash_path);
825                 goto out;
826         }
827
828         if (len == HEXLIFIED_SHA256_DIGEST_SIZE){
829                 strncpy(hash, tmpbuf, HEXLIFIED_SHA256_DIGEST_SIZE);
830                 hash[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
831                 XSEGLOG2(&lc, D, "Found hash for %s : %s", hash_file, hash);
832                 ret = 0;
833         }
834 out:
835         free(hash_path);
836         XSEGLOG2(&lc, D, "Finished.");
837         return ret;
838 }
839
840 static int __set_precalculated_hash(struct peerd *peer, char *target,
841                 uint32_t targetlen, char hash[HEXLIFIED_SHA256_DIGEST_SIZE + 1])
842 {
843         int ret = -1;
844         int r, fd;
845         uint32_t len, pos;
846         char *hash_file = NULL, *hash_path = NULL;
847         char tmpbuf[HEXLIFIED_SHA256_DIGEST_SIZE];
848         struct pfiled *pfiled = __get_pfiled(peer);
849
850         XSEGLOG2(&lc, D, "Started.");
851
852         hash_file = malloc(MAX_FILENAME_SIZE + 1);
853         hash_path = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
854
855         pos = 0;
856         strncpy(hash_file+pos, target, targetlen);
857         pos += targetlen;
858         strncpy(hash_file+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
859         pos += HASH_SUFFIX_LEN;
860         hash_file[pos] = 0;
861
862         r = create_path(hash_path, pfiled, hash_file, pos, 1);
863         if (r < 0)  {
864                 XSEGLOG2(&lc, E, "Create path failed");
865                 goto out;
866         }
867
868         fd = open(hash_path, O_WRONLY | O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
869         if (fd < 0) {
870                 if (errno != ENOENT){
871                         XSEGLOG2(&lc, E, "Error opening %s", hash_path);
872                 } else {
873                         XSEGLOG2(&lc, I, "Hash file already exists %s", hash_file);
874                         ret = 0;
875                 }
876                 goto out;
877         }
878
879         r = pwrite(fd, hash, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
880         if (r < 0) {
881                 XSEGLOG2(&lc, E, "Error reading from %s", hash_path);
882                 close(fd);
883                 goto out;
884         }
885         len = (uint32_t)r;
886
887         XSEGLOG2(&lc, D, "Wrote %u bytes", len);
888
889         r = close(fd);
890         if (r < 0) {
891                 XSEGLOG2(&lc, E, "Could not close hash_file %s", hash_path);
892                 goto out;
893         }
894
895 out:
896         free(hash_path);
897         XSEGLOG2(&lc, D, "Finished.");
898         return ret;
899 }
900
901 static void handle_hash(struct peerd *peer, struct peer_req *pr)
902 {
903         //open src
904         //read all file
905         //sha256 hash
906         //stat (open without create)
907         //write to hash_tmpfile
908         //link file
909
910         int src = -1, dst = -1, r = -1, pos;
911         ssize_t c;
912         uint64_t sum, written, trailing_zeros;
913         struct pfiled *pfiled = __get_pfiled(peer);
914         struct fio *fio = __get_fio(pr);
915         struct xseg_request *req = pr->req;
916         char *pathname = NULL, *tmpfile_pathname = NULL, *tmpfile = NULL;
917         char *target;
918         char hash_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
919         char name[XSEG_MAX_TARGETLEN + 1];
920
921         unsigned char *object_data = NULL;
922         unsigned char sha[SHA256_DIGEST_SIZE];
923         struct xseg_reply_hash *xreply;
924
925         target = xseg_get_target(peer->xseg, req);
926
927         XSEGLOG2(&lc, I, "Handle hash started for pr: %p, req: %p",
928                         pr, pr->req);
929
930         if (!req->size) {
931                 XSEGLOG2(&lc, E, "No request size provided");
932                 r = -1;
933                 goto out;
934         }
935
936         r = is_target_valid_len(pfiled, target, req->targetlen, READ);
937         if (r < 0) {
938                 XSEGLOG2(&lc, E, "Source target not valid");
939                 goto out;
940         }
941
942         r = __get_precalculated_hash(peer, target, req->targetlen, hash_name);
943         if (r < 0) {
944                 XSEGLOG2(&lc, E, "Error getting precalculated hash");
945                 goto out;
946         }
947
948         if (hash_name[0] != 0) {
949                 XSEGLOG2(&lc, I, "Precalucated hash found %s", hash_name);
950                 goto found;
951         }
952
953         XSEGLOG2(&lc, I, "No precalculated hash found");
954
955         strncpy(name, target, req->targetlen);
956         name[req->targetlen] = 0;
957
958         pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
959         object_data = malloc(sizeof(char) * req->size);
960         if (!pathname || !object_data){
961                 XSEGLOG2(&lc, E, "Out of memory");
962                 goto out;
963         }
964
965         src = dir_open(pfiled, fio, target, req->targetlen, READ);
966         if (src < 0) {
967                 XSEGLOG2(&lc, E, "Fail in src");
968                 r = dst;
969                 goto out;
970         }
971
972         sum = 0;
973         while (sum < req->size) {
974                 c = pread(src, object_data + sum, req->size - sum, sum);
975                 if (c < 0) {
976                         XSEGLOG2(&lc, E, "Error reading from source");
977                         r = -1;
978                         goto out;
979                 }
980                 if (c == 0) {
981                         break;
982                 }
983                 sum += c;
984         }
985
986         //rstrip here in case zeros were written in the end
987         trailing_zeros = 0;
988         for (;trailing_zeros < sum; trailing_zeros++)
989                 if (object_data[sum - trailing_zeros - 1])
990                         break;
991
992         XSEGLOG2(&lc, D, "Read %llu, Trainling zeros %llu",
993                         sum, trailing_zeros);
994
995         sum -= trailing_zeros;
996         //calculate hash name
997         SHA256(object_data, sum, sha);
998
999         hexlify(sha, SHA256_DIGEST_SIZE, hash_name);
1000         hash_name[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
1001
1002
1003         r = create_path(pathname, pfiled, hash_name, HEXLIFIED_SHA256_DIGEST_SIZE, 1);
1004         if (r < 0)  {
1005                 XSEGLOG2(&lc, E, "Create path failed");
1006                 r = -1;
1007                 goto out;
1008         }
1009
1010
1011
1012         dst = open(pathname, O_WRONLY);
1013         if (dst > 0) {
1014                 XSEGLOG2(&lc, I, "%s already exists, no write needed", pathname);
1015                 req->serviced = req->size;
1016                 r = 0;
1017                 goto out;
1018         }
1019
1020         tmpfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
1021         if (!tmpfile_pathname){
1022                 XSEGLOG2(&lc, E, "Out of memory");
1023                 r = -1;
1024                 goto out;
1025         }
1026
1027         tmpfile = malloc(MAX_FILENAME_SIZE);
1028         if (!tmpfile){
1029                 XSEGLOG2(&lc, E, "Out of memory");
1030                 r = -1;
1031                 goto out;
1032         }
1033
1034         pos = 0;
1035         strncpy(tmpfile + pos, target, req->targetlen);
1036         pos += req->targetlen;
1037         strncpy(tmpfile + pos, SNAP_SUFFIX, SNAP_SUFFIX_LEN);
1038         pos += SNAP_SUFFIX_LEN;
1039         strncpy(tmpfile + pos, pfiled->uniquestr, pfiled->uniquestr_len);
1040         pos += pfiled->uniquestr_len;
1041         strncpy(tmpfile + pos, fio->str_id, FIO_STR_ID_LEN);
1042         pos += FIO_STR_ID_LEN;
1043         tmpfile[pos] = 0;
1044
1045         r = create_path(tmpfile_pathname, pfiled, tmpfile, pos, 1);
1046         if (r < 0)  {
1047                 XSEGLOG2(&lc, E, "Create path failed");
1048                 r = -1;
1049                 goto out;
1050         }
1051
1052         XSEGLOG2(&lc, D, "Opening %s", tmpfile_pathname);
1053         dst = open(tmpfile_pathname, O_WRONLY | O_CREAT | O_EXCL,
1054                         S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
1055         if (dst < 0) {
1056                 if (errno != EEXIST){
1057                         char error_str[1024];
1058                         XSEGLOG2(&lc, E, "Error opening %s (%s)", tmpfile_pathname, strerror_r(errno, error_str, 1023));
1059                 } else {
1060                         XSEGLOG2(&lc, E, "Error opening %s. Stale data found.",
1061                                         tmpfile_pathname);
1062                 }
1063                 r = -1;
1064                 goto out;
1065         }
1066         XSEGLOG2(&lc, D, "Opened %s", tmpfile_pathname);
1067
1068         written = 0;
1069         while (written < sum) {
1070                 c = write(dst, object_data + written, sum - written);
1071                 if (c < 0) {
1072                         XSEGLOG2(&lc, E, "Error writting to dst file %s", tmpfile_pathname);
1073                         r = -1;
1074                         goto out_unlink;
1075                 }
1076                 written += c;
1077         }
1078
1079         r = link(tmpfile_pathname, pathname);
1080         if (r < 0 && errno != EEXIST) {
1081                 XSEGLOG2(&lc, E, "Error linking tmp file %s. Errno %d",
1082                                 pathname, errno);
1083                 r = -1;
1084                 goto out_unlink;
1085         }
1086
1087         r = unlink(tmpfile_pathname);
1088         if (r < 0) {
1089                 XSEGLOG2(&lc, W, "Error unlinking tmp file %s", tmpfile_pathname);
1090                 r = 0;
1091         }
1092
1093         r = __set_precalculated_hash(peer, target, req->targetlen, hash_name);
1094         if (r < 0) {
1095                 XSEGLOG2(&lc, W, "Error setting precalculated hash");
1096                 r = 0;
1097         }
1098
1099 found:
1100         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
1101                         sizeof(struct xseg_reply_hash));
1102         if (r < 0)  {
1103                 XSEGLOG2(&lc, E, "Resize request failed");
1104                 r = -1;
1105                 goto out;
1106         }
1107
1108         xreply = (struct xseg_reply_hash *)xseg_get_data(peer->xseg, req);
1109         strncpy(xreply->target, hash_name, HEXLIFIED_SHA256_DIGEST_SIZE);
1110         xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE;
1111
1112         req->serviced = req->size;
1113         r = 0;
1114
1115 out:
1116         if (dst > 0) {
1117                 close(dst);
1118         }
1119         if (r < 0) {
1120                 XSEGLOG2(&lc, E, "Handle hash failed for pr: %p, req: %p. ",
1121                                 "Target %s", pr, pr->req, name);
1122                 pfiled_fail(peer, pr);
1123         } else {
1124                 XSEGLOG2(&lc, I, "Handle hash completed for pr: %p, req: %p\n\t"
1125                                 "hashed %s to %s", pr, pr->req, name, hash_name);
1126                 pfiled_complete(peer, pr);
1127         }
1128         free(tmpfile_pathname);
1129         free(pathname);
1130         free(object_data);
1131         return;
1132
1133 out_unlink:
1134         unlink(tmpfile_pathname);
1135         goto out;
1136 }
1137
1138 static int __locked_by(char *lockfile, char *expected, uint32_t expected_len)
1139 {
1140         int ret = -1;
1141         int r, fd;
1142         uint32_t len;
1143         char tmpbuf[MAX_UNIQUESTR_LEN];
1144
1145         XSEGLOG2(&lc, D, "Started. Lockfile: %s, expected: %s, expected_len: %u", lockfile, expected, expected_len);
1146         fd = open(lockfile, O_RDONLY, S_IRWXU | S_IRUSR);
1147         if (fd < 0) {
1148                 if (errno != ENOENT){
1149                         XSEGLOG2(&lc, E, "Error opening %s", lockfile);
1150                 } else {
1151                         //-2 == retry
1152                         XSEGLOG2(&lc, I, "lock file removed");
1153                         ret = -2;
1154                 }
1155                 goto out;
1156         }
1157         r = pread(fd, tmpbuf, MAX_UNIQUESTR_LEN, 0);
1158         if (r < 0) {
1159                 XSEGLOG2(&lc, E, "Error reading from %s", lockfile);
1160                 close(fd);
1161                 goto out;
1162         }
1163         len = (uint32_t)r;
1164         XSEGLOG2(&lc, D, "Read %u bytes", len);
1165         r = close(fd);
1166         if (r < 0) {
1167                 XSEGLOG2(&lc, E, "Could not close lockfile %s", lockfile);
1168                 goto out;
1169         }
1170         if (len == expected_len && !strncmp(tmpbuf, expected, expected_len)){
1171                 XSEGLOG2(&lc, D, "Lock file %s locked by us.", lockfile);
1172                 ret = 0;
1173         }
1174 out:
1175         XSEGLOG2(&lc, D, "Finished. Lockfile: %s", lockfile);
1176         return ret;
1177 }
1178
1179 static int __try_lock(struct pfiled *pfiled, char *tmpfile, char *lockfile,
1180                         uint32_t flags, int fd)
1181 {
1182         int r;
1183         XSEGLOG2(&lc, D, "Started. Lockfile: %s, Tmpfile:%s", lockfile, tmpfile);
1184         r = pwrite(fd, pfiled->uniquestr, pfiled->uniquestr_len, 0);
1185         if (r < 0) {
1186                 return -1;
1187         }
1188         r = fsync(fd);
1189         if (r < 0) {
1190                 return -1;
1191         }
1192
1193         while (link(tmpfile, lockfile) < 0) {
1194                 //actual error
1195                 if (errno != EEXIST){
1196                         XSEGLOG2(&lc, E, "Error linking %s to %s",
1197                                         tmpfile, lockfile);
1198                         return -1;
1199                 }
1200                 r = __locked_by(lockfile, pfiled->uniquestr, pfiled->uniquestr_len);
1201                 if (!r) {
1202                         break;
1203                 }
1204                 if (flags & XF_NOSYNC) {
1205                         XSEGLOG2(&lc, D, "Could not get lock file %s, "
1206                                         "XF_NOSYNC set. Aborting", lockfile);
1207                         return -1;
1208                 }
1209                 sleep(1);
1210         }
1211         XSEGLOG2(&lc, D, "Finished. Lockfile: %s", lockfile);
1212         return 0;
1213 }
1214
1215 static void handle_acquire(struct peerd *peer, struct peer_req *pr)
1216 {
1217         int r, ret = -1;
1218         struct pfiled *pfiled = __get_pfiled(peer);
1219         struct fio *fio = __get_fio(pr);
1220         struct xseg_request *req = pr->req;
1221         char *buf = malloc(MAX_FILENAME_SIZE);
1222         char *tmpfile = malloc(MAX_FILENAME_SIZE);
1223         char *lockfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
1224         char *tmpfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
1225         int fd = -1, pos;
1226         char *target = xseg_get_target(peer->xseg, req);
1227         uint32_t buf_len, tmpfile_len;
1228
1229         if (!buf || !tmpfile_pathname || !lockfile_pathname) {
1230                 XSEGLOG2(&lc, E, "Out of memory");
1231                 pfiled_fail(peer, pr);
1232                 return;
1233         }
1234
1235         r = is_target_valid_len(pfiled, target, req->targetlen, READ);
1236         if (r < 0) {
1237                 XSEGLOG2(&lc, E, "Target not valid");
1238                 goto out;
1239         }
1240
1241
1242         pos = 0;
1243         strncpy(buf + pos, target, req->targetlen);
1244         pos = req->targetlen;
1245         strncpy(buf + pos, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
1246         pos += LOCK_SUFFIX_LEN;
1247         buf[pos] = 0;
1248         buf_len = pos;
1249
1250         XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
1251
1252
1253         pos = 0;
1254         strncpy(tmpfile + pos, buf, buf_len);
1255         pos += buf_len;
1256         strncpy(tmpfile + pos, pfiled->uniquestr, pfiled->uniquestr_len);
1257         pos += pfiled->uniquestr_len;
1258         strncpy(tmpfile + pos, fio->str_id, FIO_STR_ID_LEN);
1259         pos += FIO_STR_ID_LEN;
1260         tmpfile[pos] = 0;
1261         tmpfile_len = pos;
1262
1263         XSEGLOG2(&lc, I, "Trying to acquire lock %s", buf);
1264
1265         if (create_path(tmpfile_pathname, pfiled, tmpfile, tmpfile_len, 1) < 0) {
1266                 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
1267                 goto out;
1268         }
1269
1270         if (create_path(lockfile_pathname, pfiled, buf, buf_len, 1) < 0) {
1271                 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
1272                 goto out;
1273         }
1274
1275         //create exclusive unique lockfile (block_uniqueid+target)
1276         //if (OK)
1277         //      write blocker uniqueid to the unique lockfile
1278         //      try to link it to the lockfile
1279         //      if (OK)
1280         //              unlink unique lockfile;
1281         //              complete
1282         //      else
1283         //              spin while not able to link
1284
1285         //nfs v >= 3
1286         XSEGLOG2(&lc, D, "Tmpfile: %s", tmpfile_pathname);
1287         fd = open(tmpfile_pathname, O_WRONLY | O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
1288         if (fd < 0) {
1289                 //actual error
1290                 if (errno != EEXIST){
1291                         XSEGLOG2(&lc, E, "Error opening %s", tmpfile_pathname);
1292                         goto out;
1293                 } else {
1294                         XSEGLOG2(&lc, E, "Error opening %s. Stale data found.",
1295                                         tmpfile_pathname);
1296                 }
1297                 ret = -1;
1298         } else {
1299                 XSEGLOG2(&lc, D, "Tmpfile %s created. Trying to get lock",
1300                                 tmpfile_pathname);
1301                 r = __try_lock(pfiled, tmpfile_pathname, lockfile_pathname,
1302                                 req->flags, fd);
1303                 if (r < 0){
1304                         XSEGLOG2(&lc, E, "Trying to get lock %s failed", buf);
1305                         ret = -1;
1306                 } else {
1307                         XSEGLOG2(&lc, D, "Trying to get lock %s succeed", buf);
1308                         ret = 0;
1309                 }
1310                 r = close(fd);
1311                 if (r < 0) {
1312                         XSEGLOG2(&lc, W, "Error closing %s", tmpfile_pathname);
1313                 }
1314                 r = unlink(tmpfile_pathname);
1315                 if (r < 0) {
1316                         XSEGLOG2(&lc, E, "Error unlinking %s", tmpfile_pathname);
1317                 }
1318         }
1319 out:
1320         if (ret < 0){
1321                 XSEGLOG2(&lc, I, "Failed to acquire lock %s", buf);
1322                 pfiled_fail(peer, pr);
1323         }
1324         else{
1325                 XSEGLOG2(&lc, I, "Acquired lock %s", buf);
1326                 pfiled_complete(peer, pr);
1327         }
1328         free(buf);
1329         free(lockfile_pathname);
1330         free(tmpfile_pathname);
1331         return;
1332 }
1333
1334 static void handle_release(struct peerd *peer, struct peer_req *pr)
1335 {
1336         struct pfiled *pfiled = __get_pfiled(peer);
1337 //      struct fio *fio = __get_fio(pr);
1338         struct xseg_request *req = pr->req;
1339         char *buf = malloc(MAX_FILENAME_SIZE + 1);
1340         char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
1341         char *tmpbuf = malloc(MAX_UNIQUESTR_LEN + 1);
1342         char *target = xseg_get_target(peer->xseg, req);
1343         int r, pos;
1344
1345         if (!buf || !pathname) {
1346                 XSEGLOG2(&lc, E, "Out of memory");
1347                 fail(peer, pr);
1348                 return;
1349         }
1350
1351         r = is_target_valid_len(pfiled, target, req->targetlen, READ);
1352         if (r < 0) {
1353                 XSEGLOG2(&lc, E, "Target not valid");
1354                 goto out;
1355         }
1356
1357         pos = 0;
1358         strncpy(buf + pos, target, req->targetlen);
1359         pos += req->targetlen;
1360         strncpy(buf + pos, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
1361         pos += LOCK_SUFFIX_LEN;
1362         buf[pos] = 0;
1363
1364         XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
1365
1366         r = create_path(pathname, pfiled, buf,
1367                         req->targetlen + strlen(LOCK_SUFFIX), 0);
1368         if (r < 0) {
1369                 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
1370                 goto out;
1371         }
1372
1373         if ((req->flags & XF_FORCE) || !__locked_by(pathname, pfiled->uniquestr,
1374                                                 pfiled->uniquestr_len)) {
1375                 r = unlink(pathname);
1376                 if (r < 0) {
1377                         XSEGLOG2(&lc, E, "Could not unlink %s", pathname);
1378                         goto out;
1379                 }
1380         } else {
1381                 r = -1;
1382         }
1383
1384 out:
1385         if (r < 0) {
1386                 fail(peer, pr);
1387         }
1388         else {
1389                 XSEGLOG2(&lc, I, "Released lockfile: %s", buf);
1390                 complete(peer, pr);
1391         }
1392         XSEGLOG2(&lc, I, "Finished. Lockfile: %s", buf);
1393         free(buf);
1394         free(tmpbuf);
1395         free(pathname);
1396         return;
1397 }
1398
1399 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1400                                 enum dispatch_reason reason)
1401 {
1402         struct fio *fio = __get_fio(pr);
1403         if (reason == dispatch_accept)
1404                 fio->h = NoEntry;
1405
1406         switch (req->op) {
1407                 case X_READ:
1408                         handle_read(peer, pr); break;
1409                 case X_WRITE:
1410                         handle_write(peer, pr); break;
1411                 case X_INFO:
1412                         handle_info(peer, pr); break;
1413                 case X_COPY:
1414                         handle_copy(peer, pr); break;
1415                 case X_DELETE:
1416                         handle_delete(peer, pr); break;
1417                 case X_ACQUIRE:
1418                         handle_acquire(peer, pr); break;
1419                 case X_RELEASE:
1420                         handle_release(peer, pr); break;
1421                 case X_HASH:
1422                         handle_hash(peer, pr); break;
1423                 case X_SYNC:
1424                 default:
1425                         handle_unknown(peer, pr);
1426         }
1427         return 0;
1428 }
1429
1430 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1431 {
1432         /*
1433         get blocks,maps paths
1434         get optional pithos block,maps paths
1435         get fdcache size
1436         check if greater than limit (tip: getrlimit)
1437         assert cachesize greater than nr_ops
1438         assert nr_ops greater than nr_threads
1439         get prefix
1440         */
1441
1442         int ret = 0;
1443         int i, r;
1444         struct fio *fio;
1445         struct pfiled *pfiled = malloc(sizeof(struct pfiled));
1446         struct rlimit rlim;
1447         struct xcache_ops c_ops = {
1448                 .on_node_init = cache_node_init,
1449                 .on_init = cache_init,
1450                 .on_put = cache_put,
1451         };
1452         if (!pfiled){
1453                 XSEGLOG2(&lc, E, "Out of memory");
1454                 ret = -ENOMEM;
1455                 goto out;
1456         }
1457         peer->priv = pfiled;
1458
1459         pfiled->maxfds = 2 * peer->nr_ops;
1460
1461         for (i = 0; i < peer->nr_ops; i++) {
1462                 peer->peer_reqs[i].priv = malloc(sizeof(struct fio));
1463                 if (!peer->peer_reqs->priv){
1464                         XSEGLOG2(&lc, E, "Out of memory");
1465                         ret = -ENOMEM;
1466                         goto out;
1467                 }
1468                 fio = __get_fio(&peer->peer_reqs[i]);
1469                 fio->str_id[0] = '_';
1470                 fio->str_id[1] = 'a' + (i / 26);
1471                 fio->str_id[2] = 'a' + (i % 26);
1472         }
1473
1474         pfiled->vpath[0] = 0;
1475         pfiled->prefix[0] = 0;
1476         pfiled->uniquestr[0] = 0;
1477
1478         BEGIN_READ_ARGS(argc, argv);
1479         READ_ARG_ULONG("--fdcache", pfiled->maxfds);
1480         READ_ARG_STRING("--archip", pfiled->vpath, MAX_PATH_SIZE);
1481         READ_ARG_STRING("--prefix", pfiled->prefix, MAX_PREFIX_LEN);
1482         READ_ARG_STRING("--uniquestr", pfiled->uniquestr, MAX_UNIQUESTR_LEN);
1483         END_READ_ARGS();
1484
1485         pfiled->uniquestr_len = strlen(pfiled->uniquestr);
1486         pfiled->prefix_len = strlen(pfiled->prefix);
1487
1488         //TODO test path exist/is_dir/have_access
1489         pfiled->vpath_len = strlen(pfiled->vpath);
1490         if (!pfiled->vpath_len){
1491                 XSEGLOG2(&lc, E, "Archipelago path was not provided");
1492                 usage(argv[0]);
1493                 return -1;
1494         }
1495         if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
1496                 pfiled->vpath[pfiled->vpath_len] = '/';
1497                 pfiled->vpath[++pfiled->vpath_len]= 0;
1498         }
1499
1500         r = getrlimit(RLIMIT_NOFILE, &rlim);
1501         if (r < 0) {
1502                 XSEGLOG2(&lc, E, "Could not get limit for max fds");
1503                 return -1;
1504         }
1505         //TODO check nr_ops == nr_threads.
1506         //
1507         r = xcache_init(&pfiled->cache, pfiled->maxfds, &c_ops, XCACHE_LRU_HEAP, peer);
1508         if (r < 0)
1509                 return -1;
1510         //check max fds. (> fdcache + nr_threads)
1511         //TODO assert fdcache > 2*nr_threads or add waitq
1512         if (rlim.rlim_cur < pfiled->cache.size + peer->nr_threads - 4) {
1513                 XSEGLOG2(&lc, E, "FD limit %d is less than cachesize + nr_ops -4(%u)",
1514                                 rlim.rlim_cur, pfiled->cache.size + peer->nr_ops - 4);
1515                 return -1;
1516         }
1517
1518 out:
1519         return ret;
1520 }
1521
1522 void custom_peer_finalize(struct peerd *peer)
1523 {
1524         /*
1525         we could close all fds, but we can let the system do it for us.
1526         */
1527         return;
1528 }
1529
1530 /*
1531 static int safe_atoi(char *s)
1532 {
1533         long l;
1534         char *endp;
1535
1536         l = strtol(s, &endp, 10);
1537         if (s != endp && *endp == '\0')
1538                 return l;
1539         else
1540                 return -1;
1541 }
1542 */