filed: Initial dummy approach to directio
[archipelago] / xseg / peers / user / filed.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 /*
36  * The Pithos File Blocker Peer (pfiled)
37  */
38
39 #define _GNU_SOURCE
40 #include <stdio.h>
41 #include <stdarg.h>
42 #include <stdlib.h>
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #include <unistd.h>
46 #include <string.h>
47 #include <fcntl.h>
48 #include <errno.h>
49 #include <signal.h>
50 #include <limits.h>
51 #include <pthread.h>
52 #include <syscall.h>
53 #include <sys/sendfile.h>
54 #include <peer.h>
55 #include <xtypes/xcache.h>
56 #include <openssl/sha.h>
57 #include <sys/resource.h>
58
59 #include <xseg/xseg.h>
60 #include <xseg/protocol.h>
61
62 #include <hash.h>
63
64 #define FIO_STR_ID_LEN          3
65 #define LOCK_SUFFIX             "_lock"
66 #define LOCK_SUFFIX_LEN         5
67 #define HASH_SUFFIX             "_hash"
68 #define HASH_SUFFIX_LEN         5
69 #define MAX_PATH_SIZE           1024
70 #define MAX_FILENAME_SIZE       (XSEG_MAX_TARGETLEN + LOCK_SUFFIX_LEN + MAX_UNIQUESTR_LEN + FIO_STR_ID_LEN)
71 #define MAX_PREFIX_LEN          10
72 #define MAX_UNIQUESTR_LEN       128
73 #define SNAP_SUFFIX             "_snap"
74 #define SNAP_SUFFIX_LEN         5
75
76 #define WRITE 1
77 #define READ 2
78
79 #define min(_a, _b) (_a < _b ? _a : _b)
80
81 /*
82  * Globals, holding command-line arguments
83  */
84
85 void custom_peer_usage(char *argv0)
86 {
87          fprintf(stderr, "General peer options:\n"
88                 "  Option        | Default    | \n"
89                 "  --------------------------------------------\n"
90                 "    --fdcache   | 2 * nr_ops | Fd cache size\n"
91                 "    --archip    | None       | Archipelago directory\n"
92                 "    --prefix    | None       | Common prefix of objects that should be stripped\n"
93                 "    --uniquestr | None       | Unique string for this instance\n"
94                 "\n"
95                );
96 }
97
98 /* fdcache_node flags */
99 #define READY (1 << 1)
100
101 /* fdcache node info */
102 struct fdcache_entry {
103         volatile int fd;
104         volatile unsigned int flags;
105 };
106
107 /* pfiled context */
108 struct pfiled {
109         uint32_t vpath_len;
110         uint32_t prefix_len;
111         uint32_t uniquestr_len;
112         long maxfds;
113         uint32_t directio;
114         char vpath[MAX_PATH_SIZE + 1];
115         char prefix[MAX_PREFIX_LEN + 1];
116         char uniquestr[MAX_UNIQUESTR_LEN + 1];
117         struct xcache cache;
118 };
119
120 /*
121  * pfiled specific structure
122  * containing information on a pending I/O operation
123  */
124 struct fio {
125         uint32_t state;
126         xcache_handler h;
127         char str_id[FIO_STR_ID_LEN];
128 };
129
130 struct pfiled * __get_pfiled(struct peerd *peer)
131 {
132         return (struct pfiled *) peer->priv;
133 }
134
135 struct fio * __get_fio(struct peer_req *pr)
136 {
137         return (struct fio*) pr->priv;
138 }
139
140
141 /* cache ops */
142 static void * cache_node_init(void *p, void *xh)
143 {
144         //struct peerd *peer = (struct peerd *)p;
145         //struct pfiled *pfiled = __get_pfiled(peer);
146         xcache_handler h = *(xcache_handler *)(xh);
147         struct fdcache_entry *fdentry = malloc(sizeof(struct fdcache_entry));
148         if (!fdentry)
149                 return NULL;
150
151         XSEGLOG2(&lc, D, "Initialing node h: %llu with %p",
152                         (long long unsigned)h, fdentry);
153
154         fdentry->fd = -1;
155         fdentry->flags = 0;
156
157         return fdentry;
158 }
159
160 static int cache_init(void *p, void *e)
161 {
162         struct fdcache_entry *fdentry = (struct fdcache_entry *)e;
163
164         if (fdentry->fd != -1) {
165                 XSEGLOG2(&lc, E, "Found invalid fd %d", fdentry->fd);
166                 return -1;
167         }
168
169         return 0;
170 }
171
172 static void cache_put(void *p, void *e)
173 {
174         struct fdcache_entry *fdentry = (struct fdcache_entry *)e;
175
176         XSEGLOG2(&lc, D, "Putting entry %p with fd %d", fdentry, fdentry->fd);
177
178         if (fdentry->fd != -1)
179                 close(fdentry->fd);
180
181         fdentry->fd = -1;
182         fdentry->flags = 0;
183         return;
184 }
185
186 static void close_cache_entry(struct peerd *peer, struct peer_req *pr)
187 {
188         struct pfiled *pfiled = __get_pfiled(peer);
189         struct fio *fio = __get_fio(pr);
190         if (fio->h != NoEntry)
191                 xcache_put(&pfiled->cache, fio->h);
192 }
193
194 static void pfiled_complete(struct peerd *peer, struct peer_req *pr)
195 {
196         close_cache_entry(peer, pr);
197         complete(peer, pr);
198 }
199
200 static void pfiled_fail(struct peerd *peer, struct peer_req *pr)
201 {
202         close_cache_entry(peer, pr);
203         fail(peer, pr);
204 }
205
206 static void handle_unknown(struct peerd *peer, struct peer_req *pr)
207 {
208         XSEGLOG2(&lc, W, "unknown request op");
209         pfiled_fail(peer, pr);
210 }
211
212 static void get_dirs(char buf[6], struct pfiled *pfiled, char *target, uint32_t targetlen)
213 {
214         unsigned char sha[SHA256_DIGEST_SIZE];
215         char hex[HEXLIFIED_SHA256_DIGEST_SIZE];
216         char *prefix = pfiled->prefix;
217         uint32_t prefixlen = pfiled->prefix_len;
218
219         if (strncmp(target, prefix, prefixlen)) {
220                 strncpy(buf, target, 6);
221                 return;
222         }
223
224         SHA256((unsigned char *)target, targetlen, sha);
225         hexlify(sha, 3, hex);
226         strncpy(buf, hex, 6);
227         return;
228 }
229
230 static int strnjoin(char *dest, int n, ...)
231 {
232         int pos, i;
233         va_list ap;
234         char *s;
235         int l;
236
237         pos = 0;
238         va_start(ap, n);
239         for (i = 0; i < n; i++) {
240                 s = va_arg(ap, char *);
241                 l = va_arg(ap, int);
242                 strncpy(dest + pos, s, l);
243                 pos += l;
244         }
245         dest[pos] = 0;
246         va_end(ap);
247
248         return pos;
249 }
250
251 static int strjoin(char *dest, char *f, int f_len, char *s, int s_len)
252 {
253         int pos;
254
255         pos = 0;
256         strncpy(dest + pos, f, f_len);
257         pos += f_len;
258         strncpy(dest + pos, s, s_len);
259         pos += s_len;
260         dest[pos] = 0;
261
262         return f_len + s_len;
263 }
264
265 static int create_path(char *buf, struct pfiled *pfiled, char *target,
266                         uint32_t targetlen, int mkdirs)
267 {
268         int i;
269         struct stat st;
270         char dirs[6];
271         char *path = pfiled->vpath;
272         uint32_t pathlen = pfiled->vpath_len;
273
274         get_dirs(dirs, pfiled, target, targetlen);
275
276         strncpy(buf, path, pathlen);
277
278         for (i = 0; i < 9; i+= 3) {
279                 buf[pathlen + i] = dirs[i - (i/3)];
280                 buf[pathlen + i +1] = dirs[i + 1 - (i/3)];
281                 buf[pathlen + i + 2] = '/';
282                 if (mkdirs == 1) {
283                         buf[pathlen + i + 3] = '\0';
284 retry:
285                         if (stat(buf, &st) < 0) 
286                                 if (mkdir(buf, 0750) < 0) {
287                                         if (errno == EEXIST)
288                                                 goto retry;
289                                         //perror(buf);
290                                         return -1;
291                                 }
292                 }
293         }
294
295         strncpy(&buf[pathlen + 9], target, targetlen);
296         buf[pathlen + 9 + targetlen] = '\0';
297
298         return 0;
299 }
300
301
302 static ssize_t persisting_read(int fd, void *data, size_t size, off_t offset)
303 {
304         ssize_t r = 0, sum = 0;
305         char error_str[1024];
306         XSEGLOG2(&lc, D, "fd: %d, size: %d, offset: %d", fd, size, offset);
307
308         while (sum < size) {
309                 XSEGLOG2(&lc, D, "read: %llu, (aligned)size: %llu", sum, size);
310                 r = pread(fd, (char *)data + sum, size - sum, offset + sum);
311                 if (r < 0) {
312                         XSEGLOG2(&lc, E, "fd: %d, Error: %s", fd, strerror_r(errno, error_str, 1023));
313                         break;
314                 } else if (r == 0) {
315                         break;
316                 } else {
317                         sum += r;
318                 }
319         }
320         XSEGLOG2(&lc, D, "read: %llu, (aligned)size: %llu", sum, size);
321
322         if (sum == 0 && r < 0) {
323                 sum = r;
324         }
325         XSEGLOG2(&lc, D, "Finished. Read %d, r = %d", sum, r);
326
327         return sum;
328 }
329
330 static ssize_t persisting_write(int fd, void *data, size_t size, off_t offset)
331 {
332         ssize_t r = 0, sum = 0;
333
334         XSEGLOG2(&lc, D, "fd: %d, size: %d, offset: %d", fd, size, offset);
335         while (sum < size) {
336                 XSEGLOG2(&lc, D, "written: %llu, (aligned)size: %llu", sum, size);
337                 r = pwrite(fd, (char *)data + sum, size - sum, offset + sum);
338                 if (r < 0) {
339                         break;
340                 } else {
341                         sum += r;
342                 }
343         }
344         XSEGLOG2(&lc, D, "written: %llu, (aligned)size: %llu", sum, size);
345
346         if (sum == 0 && r < 0) {
347                 sum = r;
348         }
349         XSEGLOG2(&lc, D, "Finished. Wrote %d, r = %d", sum, r);
350
351         return sum;
352 }
353
354 static ssize_t aligned_read(int fd, void *data, ssize_t size, off_t offset, int alignment)
355 {
356         char *tmp_data;
357         ssize_t r;
358         size_t misaligned_data, misaligned_size, misaligned_offset;
359         off_t aligned_offset=offset;
360         size_t aligned_size=size;
361
362         misaligned_data = (unsigned long)data % alignment;
363         misaligned_size = size % alignment;
364         misaligned_offset = offset % alignment;
365         XSEGLOG2(&lc, D, "misaligned_data: %u, misaligned_size: %u, misaligned_offset: %u", misaligned_data, misaligned_size, misaligned_offset);
366         if (misaligned_data || misaligned_size || misaligned_offset) {
367                 aligned_offset = offset - misaligned_offset;
368                 aligned_size = size + misaligned_offset;
369
370                 misaligned_size = aligned_size % alignment;
371                 aligned_size = aligned_size - misaligned_size + alignment;
372                 r = posix_memalign(&tmp_data, alignment, aligned_size);
373                 if (r < 0) {
374                         return -1;
375                 }
376         } else {
377                 tmp_data = data;
378                 aligned_offset = offset;
379                 aligned_size = size;
380         }
381
382         XSEGLOG2(&lc, D, "aligned_data: %u, aligned_size: %u, aligned_offset: %u", tmp_data, aligned_size, aligned_offset);
383         r = persisting_read(fd, tmp_data, aligned_size, aligned_offset);
384
385         //FIXME if r < size ?
386         if (tmp_data != data) {
387                 memcpy(data, tmp_data + misaligned_offset, size);
388                 free(tmp_data);
389         }
390         if (r >= size)
391                 r = size;
392         return r;
393 }
394
395 pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
396
397 int __fcntl_lock(int fd, off_t start, off_t len)
398 {
399         return pthread_mutex_lock(&m);
400 }
401
402 int __fcntl_unlock(int fd, off_t start, off_t len)
403 {
404         return pthread_mutex_unlock(&m);
405 }
406
407 static ssize_t aligned_write(int fd, void *data, size_t size, off_t offset, int alignment)
408 {
409         int locked = 0;
410         char *tmp_data;
411         ssize_t r;
412         size_t misaligned_data, misaligned_size, misaligned_offset;
413         size_t aligned_size = size, aligned_offset = offset, read_size;
414         misaligned_data = (unsigned long)data % alignment;
415         misaligned_size = size % alignment;
416         misaligned_offset = offset % alignment;
417         if (misaligned_data || misaligned_size || misaligned_offset) {
418                 //if somthing is misaligned then:
419                 //
420                 // First check if the offset was missaligned.
421                 aligned_offset = offset - misaligned_offset;
422
423                 // Then adjust the size with the misaligned offset and check if
424                 // it remains misaligned.
425                 aligned_size = size + misaligned_offset;
426                 misaligned_size = aligned_size % alignment;
427
428                 // in case there is no misaligned_size
429                 if (misaligned_size)
430                         aligned_size = aligned_size + alignment - misaligned_size;
431
432                 // Allocate aligned memory
433                 r = posix_memalign(&tmp_data, alignment, aligned_size);
434                 if (r < 0) {
435                         return -1;
436                 }
437
438                 XSEGLOG2(&lc, D, "fd: %d, misaligned_data: %u, misaligned_size: %u, misaligned_offset: %u", fd, misaligned_data, misaligned_size, misaligned_offset);
439                 XSEGLOG2(&lc, D, "fd: %d, aligned_data: %u, aligned_size: %u, aligned_offset: %u", fd, tmp_data, aligned_size, aligned_offset);
440                 XSEGLOG2(&lc, D, "fd: %d, locking from %u to %u", fd, aligned_offset, aligned_offset + aligned_size);
441                 __fcntl_lock(fd, aligned_offset, aligned_size + alignment - misaligned_size);
442                 locked = 1;
443
444                 if (misaligned_offset) {
445                         XSEGLOG2(&lc, D, "fd: %d, size: %d, offset: %d", fd, size, offset);
446                         /* read misaligned_offset */
447                         read_size = alignment;
448                         r = persisting_read(fd, tmp_data, alignment, aligned_offset);
449                         if (r < 0) {
450                                 free(tmp_data);
451                                 return -1;
452                         } else if (r != read_size) {
453                                 memset(tmp_data + r, 0, read_size - r);
454                         }
455                 }
456
457                 if (misaligned_size) {
458                         read_size = alignment;
459                         r = persisting_read(fd, tmp_data + aligned_size - alignment, alignment,
460                                         aligned_offset + aligned_size - alignment);
461                         if (r < 0) {
462                                 free(tmp_data);
463                                 return -1;
464                         } else if (r != read_size) {
465                                 memset(tmp_data + aligned_size - alignment + r, 0, read_size - r);
466                         }
467                 }
468                 memcpy(tmp_data + misaligned_offset, data, size);
469         } else {
470                 aligned_size = size;
471                 aligned_offset = offset;
472                 tmp_data = data;
473         }
474
475         r = persisting_write(fd, tmp_data, aligned_size, aligned_offset);
476
477         if (locked) {
478                 XSEGLOG2(&lc, D, "fd: %d, unlocking from %u to %u", fd, aligned_offset, aligned_offset + aligned_size);
479                 __fcntl_unlock(fd, aligned_offset, aligned_size + alignment - misaligned_size);
480         }
481         if (tmp_data != data) {
482                 free(tmp_data);
483         }
484
485         if (r >= size)
486                 r = size;
487         return r;
488 }
489
490 static ssize_t filed_write(int fd, void *data, size_t size, off_t offset, int direct)
491 {
492         if (direct)
493                 return aligned_write(fd, data, size, offset, 512);
494         else
495                 return persisting_write(fd, data, size, offset);
496 }
497
498 static ssize_t filed_read(int fd, void *data, size_t size, off_t offset, int direct)
499 {
500         if (direct)
501                 return aligned_read(fd, data, size, offset, 512);
502         else
503                 return persisting_read(fd, data, size, offset);
504 }
505
506 static ssize_t pfiled_read(struct pfiled *pfiled, int fd, void *data, size_t size, off_t offset)
507 {
508         return filed_read(fd, data, size, offset, pfiled->directio);
509 }
510
511 static ssize_t pfiled_write(struct pfiled *pfiled, int fd, void *data, size_t size, off_t offset)
512 {
513         return filed_write(fd, data, size, offset, pfiled->directio);
514 }
515
516 static ssize_t generic_io_path(char *path, void *data, size_t size, off_t offset, int write, int flags, mode_t mode)
517 {
518         int fd;
519         ssize_t r;
520
521         fd = open(path, flags, mode);
522         if (fd < 0) {
523                 return -1;
524         }
525         XSEGLOG2(&lc, D, "Opened file %s as fd %d", path, fd);
526
527         if (write) {
528                 r = filed_write(fd, data, size, offset, flags & O_DIRECT);
529         } else {
530                 r = filed_read(fd, data, size, offset, flags & O_DIRECT);
531         }
532
533         close(fd);
534
535         return r;
536 }
537
538 static ssize_t read_path(char *path, void *data, size_t size, off_t offset, int direct)
539 {
540         int flags = O_RDONLY;
541         if (direct)
542                 flags |= O_DIRECT;
543
544         return generic_io_path(path, data, size, offset, 0, flags, 0);
545 }
546
547 static ssize_t pfiled_read_name(struct pfiled *pfiled, char *name, uint32_t namelen, void *data, size_t size, off_t offset)
548 {
549         char path[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
550         int r;
551         r = create_path(path, pfiled, name, namelen, 1);
552         if (r < 0) {
553                 XSEGLOG2(&lc, E, "Could not create path");
554                 return -1;
555         }
556         return read_path(path, data, size, offset, pfiled->directio);
557 }
558
559 static ssize_t write_path(char *path, void *data, size_t size, off_t offset, int direct, int extra_open_flags, mode_t mode)
560 {
561         int flags = O_RDWR | extra_open_flags;
562         if (direct)
563                 flags |= O_DIRECT;
564         return generic_io_path(path, data, size, offset, 1, flags, mode);
565 }
566
567 static ssize_t pfiled_write_name(struct pfiled *pfiled, char *name, uint32_t namelen, void *data, size_t size, off_t offset, int extra_open_flags, mode_t mode)
568 {
569         char path[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
570         int r;
571         r = create_path(path, pfiled, name, namelen, 1);
572         if (r < 0) {
573                 XSEGLOG2(&lc, E, "Could not create path");
574                 return -1;
575         }
576         return write_path(path, data, size, offset, pfiled->directio, extra_open_flags, mode);
577 }
578
579 static int is_target_valid_len(struct pfiled *pfiled, char *target,
580                 uint32_t targetlen, int mode)
581 {
582         if (targetlen > XSEG_MAX_TARGETLEN) {
583                 XSEGLOG2(&lc, E, "Invalid targetlen %u, max: %u",
584                                 targetlen, XSEG_MAX_TARGETLEN);
585                 return -1;
586         }
587         if (mode == WRITE || mode == READ) {
588                 /*
589                  * if name starts with prefix
590                  *      assert targetlen >= prefix_len + 6
591                  * else
592                  *      assert targetlen >= 6
593                  */
594                 /* 6 chars are needed for the directory structrure */
595                 if (!pfiled->prefix_len || strncmp(target, pfiled->prefix, pfiled->prefix_len)) {
596                         if (targetlen < 6) {
597                                 XSEGLOG2(&lc, E, "Targetlen should be at least 6");
598                                 return -1;
599                         }
600                 } else {
601                         if (targetlen < pfiled->prefix_len + 6) {
602                                 XSEGLOG2(&lc, E, "Targetlen should be at least prefix "
603                                                 "len(%u) + 6", pfiled->prefix_len);
604                                 return -1;
605                         }
606                 }
607         } else {
608                 XSEGLOG2(&lc, E, "Invalid mode");
609                 return -1;
610         }
611
612         return 0;
613 }
614
615 /*
616 static int is_target_valid(struct pfiled *pfiled, char *target, int mode)
617 {
618         return is_target_valid_len(pfiled, target, strlen(target), mode);
619 }
620 */
621
622 static int open_file_write(struct pfiled *pfiled, char *target, uint32_t targetlen)
623 {
624         int r, fd, flags;
625         char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
626         char error_str[1024];
627
628         r = create_path(tmp, pfiled, target, targetlen, 1);
629         if (r < 0) {
630                 XSEGLOG2(&lc, E, "Could not create path");
631                 return -1;
632         }
633         flags = O_RDWR|O_CREAT;
634         if (pfiled->directio)
635                 flags |= O_DIRECT;
636         XSEGLOG2(&lc, D, "Opening file %s with O_RDWR|O_CREAT", tmp);
637         fd = open(tmp, flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
638         if (fd < 0){
639                 XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
640                 return -1;
641         }
642         return fd;
643 }
644
645 static int open_file_read(struct pfiled *pfiled, char *target, uint32_t targetlen)
646 {
647         int r, fd, flags;
648         char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
649         char error_str[1024];
650
651         r = create_path(tmp, pfiled, target, targetlen, 0);
652         if (r < 0) {
653                 XSEGLOG2(&lc, E, "Could not create path");
654                 return -1;
655         }
656         XSEGLOG2(&lc, D, "Opening file %s with O_RDWR", tmp);
657         flags = O_RDWR;
658         if (pfiled->directio)
659                 flags |= O_DIRECT;
660         fd = open(tmp, flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
661         if (fd < 0){
662                 XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
663                 return -1;
664         }
665         return fd;
666 }
667
668 static int open_file(struct pfiled *pfiled, char *target, uint32_t targetlen, int mode)
669 {
670         if (mode == WRITE) {
671                 return open_file_write(pfiled, target, targetlen);
672         } else if (mode == READ) {
673                 return open_file_read(pfiled, target, targetlen);
674
675         } else {
676                 XSEGLOG2(&lc, E, "Invalid mode for target");
677         }
678         return -1;
679 }
680
681 static int dir_open(struct pfiled *pfiled, struct fio *fio,
682                 char *target, uint32_t targetlen, int mode)
683 {
684         int r, fd;
685         struct fdcache_entry *e;
686         xcache_handler h = NoEntry, nh;
687         char name[XSEG_MAX_TARGETLEN + 1];
688
689         if (targetlen > XSEG_MAX_TARGETLEN) {
690                 XSEGLOG2(&lc, E, "Invalid targetlen %u, max: %u",
691                                 targetlen, XSEG_MAX_TARGETLEN);
692                 return -1;
693         }
694         strncpy(name, target, targetlen);
695         name[targetlen] = 0;
696         XSEGLOG2(&lc, I, "Dir open started for %s", name);
697
698         h = xcache_lookup(&pfiled->cache, name);
699         if (h == NoEntry) {
700                 r = is_target_valid_len(pfiled, target, targetlen, mode);
701                 if (r < 0) {
702                         XSEGLOG2(&lc, E, "Invalid len for target %s", name);
703                         goto out_err;
704                 }
705
706                 h = xcache_alloc_init(&pfiled->cache, name);
707                 if (h == NoEntry) {
708                         /* FIXME add waitq to wait for free */
709                         XSEGLOG2(&lc, E, "Could not allocate cache entry for %s",
710                                         name);
711                         goto out_err;
712                 }
713                 XSEGLOG2(&lc, D, "Allocated new handler %llu for %s",
714                                 (long long unsigned)h, name);
715
716                 e = xcache_get_entry(&pfiled->cache, h);
717                 if (!e) {
718                         XSEGLOG2(&lc, E, "Alloced handler but no valid fd cache entry");
719                         goto out_free;
720                 }
721
722                 /* open/create file */
723                 fd = open_file(pfiled, target, targetlen, mode);
724                 if (fd < 0) {
725                         XSEGLOG2(&lc, E, "Could not open file for target %s", name);
726                         goto out_free;
727                 }
728                 XSEGLOG2(&lc, D, "Opened file %s. fd %d", name, fd);
729
730                 e->fd = fd;
731
732                 XSEGLOG2(&lc, D, "Inserting handler %llu for %s to fdcache",
733                                 (long long unsigned)h, name);
734                 nh = xcache_insert(&pfiled->cache, h);
735                 if (nh != h) {
736                         XSEGLOG2(&lc, D, "Partial cache hit for %s. New handler %llu",
737                                         name, (long long unsigned)nh);
738                         xcache_put(&pfiled->cache, h);
739                         h = nh;
740                 }
741         } else {
742                 XSEGLOG2(&lc, D, "Cache hit for %s, handler: %llu", name,
743                                 (long long unsigned)h);
744         }
745
746         e = xcache_get_entry(&pfiled->cache, h);
747         if (!e) {
748                 XSEGLOG2(&lc, E, "Found handler but no valid fd cache entry");
749                 xcache_put(&pfiled->cache, h);
750                 fio->h = NoEntry;
751                 goto out_err;
752         }
753         fio->h = h;
754
755         //assert e->fd != -1 ?;
756         XSEGLOG2(&lc, I, "Dir open finished for %s", name);
757         return e->fd;
758
759 out_free:
760         xcache_free_new(&pfiled->cache, h);
761 out_err:
762         XSEGLOG2(&lc, E, "Dir open failed for %s", name);
763         return -1;
764 }
765
766 static void handle_read(struct peerd *peer, struct peer_req *pr)
767 {
768         struct pfiled *pfiled = __get_pfiled(peer);
769         struct fio *fio = __get_fio(pr);
770         struct xseg_request *req = pr->req;
771         int r, fd;
772         char *target = xseg_get_target(peer->xseg, req);
773         char *data = xseg_get_data(peer->xseg, req);
774
775         XSEGLOG2(&lc, I, "Handle read started for pr: %p, req: %p", pr, pr->req);
776
777         if (!req->size) {
778                 pfiled_complete(peer, pr);
779                 return;
780         }
781
782         if (req->datalen < req->size) {
783                 XSEGLOG2(&lc, E, "Request datalen is less than request size");
784                 pfiled_fail(peer, pr);
785                 return;
786         }
787
788
789         fd = dir_open(pfiled, fio, target, req->targetlen, READ);
790         if (fd < 0){
791                 if (errno != ENOENT) {
792                         XSEGLOG2(&lc, E, "Open failed");
793                         pfiled_fail(peer, pr);
794                         return;
795                 } else {
796                         memset(data, 0, req->size);
797                         req->serviced = req->size;
798                         goto out;
799                 }
800         }
801
802
803         XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
804                         req->size);
805         r = pfiled_read(pfiled, fd, data, req->size, req->offset);
806         if (r < 0) {
807                 XSEGLOG2(&lc, E, "Cannot read");
808                 req->serviced = 0;
809         } else if (r < req->size) {
810                 /* reached end of file. zero out the rest data buffer */
811                 memset(data + r, 0, req->size - r);
812                 req->serviced = req->size;
813         } else {
814                 req->serviced = r;
815         }
816         XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
817                         req->size);
818
819 out:
820         if (req->serviced > 0 ) {
821                 XSEGLOG2(&lc, I, "Handle read completed for pr: %p, req: %p",
822                                 pr, pr->req);
823                 pfiled_complete(peer, pr);
824         }
825         else {
826                 XSEGLOG2(&lc, E, "Handle read failed for pr: %p, req: %p",
827                                 pr, pr->req);
828                 pfiled_fail(peer, pr);
829         }
830         return;
831 }
832
833 static void handle_write(struct peerd *peer, struct peer_req *pr)
834 {
835         struct pfiled *pfiled = __get_pfiled(peer);
836         struct fio *fio = __get_fio(pr);
837         struct xseg_request *req = pr->req;
838         int fd;
839         ssize_t r;
840         char *target = xseg_get_target(peer->xseg, req);
841         char *data = xseg_get_data(peer->xseg, req);
842
843         XSEGLOG2(&lc, I, "Handle write started for pr: %p, req: %p", pr, pr->req);
844
845         if (req->datalen < req->size) {
846                 XSEGLOG2(&lc, E, "Request datalen is less than request size");
847                 pfiled_fail(peer, pr);
848                 return;
849         }
850
851         fd = dir_open(pfiled, fio, target, req->targetlen, WRITE);
852         if (fd < 0){
853                 XSEGLOG2(&lc, E, "Open failed");
854                 pfiled_fail(peer, pr);
855                 return;
856         }
857
858         if (!req->size) {
859                 if (req->flags & (XF_FLUSH | XF_FUA)) {
860                         /* No FLUSH/FUA support yet (O_SYNC ?).
861                          * note that with FLUSH/size == 0 
862                          * there will probably be a (uint64_t)-1 offset */
863                         pfiled_complete(peer, pr);
864                         return;
865                 } else {
866                         pfiled_complete(peer, pr);
867                         return;
868                 }
869         }
870
871         XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
872                         req->size);
873         r = pfiled_write(pfiled, fd, data, req->size, req->offset);
874         if (r < 0) {
875                 req->serviced = 0;
876         } else {
877                 req->serviced = r;
878         }
879         XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
880                         req->size);
881         r = fsync(fd);
882         if (r< 0) {
883                 XSEGLOG2(&lc, E, "Fsync failed.");
884                 /* if fsync fails, then no bytes serviced correctly */
885                 req->serviced = 0;
886         }
887
888         if (req->serviced > 0 ) {
889                 XSEGLOG2(&lc, I, "Handle write completed for pr: %p, req: %p",
890                                 pr, pr->req);
891                 pfiled_complete(peer, pr);
892         }
893         else {
894                 XSEGLOG2(&lc, E, "Handle write failed for pr: %p, req: %p",
895                                 pr, pr->req);
896                 pfiled_fail(peer, pr);
897         }
898         return;
899 }
900
901 static void handle_info(struct peerd *peer, struct peer_req *pr)
902 {
903         struct pfiled *pfiled = __get_pfiled(peer);
904         struct fio *fio = __get_fio(pr);
905         struct xseg_request *req = pr->req;
906         struct stat stat;
907         int fd, r;
908         uint64_t size;
909         char *target = xseg_get_target(peer->xseg, req);
910         char *data = xseg_get_data(peer->xseg, req);
911         char buf[XSEG_MAX_TARGETLEN + 1];
912         struct xseg_reply_info *xinfo  = (struct xseg_reply_info *)data;
913
914         if (req->datalen < sizeof(struct xseg_reply_info)) {
915                 strncpy(buf, target, req->targetlen);
916                 r = xseg_resize_request(peer->xseg, req, req->targetlen, sizeof(struct xseg_reply_info));
917                 if (r < 0) {
918                         XSEGLOG2(&lc, E, "Cannot resize request");
919                         pfiled_fail(peer, pr);
920                         return;
921                 }
922                 target = xseg_get_target(peer->xseg, req);
923                 strncpy(target, buf, req->targetlen);
924         }
925
926         XSEGLOG2(&lc, I, "Handle info started for pr: %p, req: %p", pr, pr->req);
927         fd = dir_open(pfiled, fio, target, req->targetlen, READ);
928         if (fd < 0) {
929                 XSEGLOG2(&lc, E, "Dir open failed");
930                 pfiled_fail(peer, pr);
931                 return;
932         }
933
934         r = fstat(fd, &stat);
935         if (r < 0) {
936                 XSEGLOG2(&lc, E, "fail in stat");
937                 pfiled_fail(peer, pr);
938                 return;
939         }
940
941         size = (uint64_t)stat.st_size;
942         xinfo->size = size;
943
944         XSEGLOG2(&lc, I, "Handle info completed for pr: %p, req: %p", pr, pr->req);
945         pfiled_complete(peer, pr);
946 }
947
948 static void handle_copy(struct peerd *peer, struct peer_req *pr)
949 {
950         struct pfiled *pfiled = __get_pfiled(peer);
951         struct fio *fio = __get_fio(pr);
952         struct xseg_request *req = pr->req;
953         char *target = xseg_get_target(peer->xseg, req);
954         char *data = xseg_get_data(peer->xseg, req);
955         struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
956         struct stat st;
957         char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
958         int src = -1, dst = -1, r = -1;
959         ssize_t c = 0, bytes;
960         ssize_t limit = 0;
961
962         XSEGLOG2(&lc, I, "Handle copy started for pr: %p, req: %p", pr, pr->req);
963         if (!buf){
964                 XSEGLOG2(&lc, E, "Out of memory");
965                 pfiled_fail(peer, pr);
966                 return;
967         }
968
969         r = is_target_valid_len(pfiled, xcopy->target, xcopy->targetlen, READ);
970         if (r < 0) {
971                 XSEGLOG2(&lc, E, "Source target not valid");
972                 goto out;
973         }
974
975         dst = dir_open(pfiled, fio, target, req->targetlen, WRITE);
976         if (dst < 0) {
977                 XSEGLOG2(&lc, E, "Fail in dst");
978                 r = dst;
979                 goto out;
980         }
981
982         src = open_file(pfiled, xcopy->target, xcopy->targetlen, READ);
983         if (src < 0) {
984                 XSEGLOG2(&lc, E, "Failed to open src");
985                 goto out;
986         }
987
988         r = fstat(src, &st);
989         if (r < 0){
990                 XSEGLOG2(&lc, E, "fail in stat for src %s", buf);
991                 goto out;
992         }
993
994         c = 0;
995
996         limit = min(req->size, st.st_size);
997         while (c < limit) {
998                 bytes = sendfile(dst, src, NULL, limit - c);
999                 if (bytes < 0) {
1000                         XSEGLOG2(&lc, E, "Copy failed for %s", buf);
1001                         r = -1;
1002                         goto out;
1003                 }
1004                 c += bytes;
1005         }
1006         r = 0;
1007
1008 out:
1009         req->serviced = c;
1010         if (limit && c == limit)
1011                 req->serviced = req->size;
1012
1013         if (src > 0)
1014                 close(src);
1015         free(buf);
1016         if (r < 0) {
1017                 XSEGLOG2(&lc, E, "Handle copy failed for pr: %p, req: %p", pr, pr->req);
1018                 pfiled_fail(peer, pr);
1019         } else {
1020                 XSEGLOG2(&lc, I, "Handle copy completed for pr: %p, req: %p", pr, pr->req);
1021                 pfiled_complete(peer, pr);
1022         }
1023         return;
1024 }
1025
1026 static void handle_delete(struct peerd *peer, struct peer_req *pr)
1027 {
1028         struct pfiled *pfiled = __get_pfiled(peer);
1029         //struct fio *fio = __get_fio(pr);
1030         struct xseg_request *req = pr->req;
1031         char name[XSEG_MAX_TARGETLEN + 1];
1032         char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
1033         int r;
1034         char *target = xseg_get_target(peer->xseg, req);
1035
1036         XSEGLOG2(&lc, I, "Handle delete started for pr: %p, req: %p", pr, pr->req);
1037
1038         if (!buf){
1039                 XSEGLOG2(&lc, E, "Out of memory");
1040                 pfiled_fail(peer, pr);
1041                 return;
1042         }
1043
1044         r = is_target_valid_len(pfiled, target, req->targetlen, READ);
1045         if (r < 0) {
1046                 XSEGLOG2(&lc, E, "Target not valid");
1047                 goto out;
1048         }
1049
1050         r = create_path(buf, pfiled, target, req->targetlen, 0);
1051         if (r< 0) {
1052                 XSEGLOG2(&lc, E, "Create path failed");
1053                 goto out;
1054         }
1055         r = unlink(buf);
1056 out:
1057         free(buf);
1058         if (r < 0) {
1059                 XSEGLOG2(&lc, E, "Handle delete failed for pr: %p, req: %p", pr, pr->req);
1060                 pfiled_fail(peer, pr);
1061         } else {
1062                 strncpy(name, target, XSEG_MAX_TARGETLEN);
1063                 name[XSEG_MAX_TARGETLEN] = 0;
1064                 xcache_invalidate(&pfiled->cache, name);
1065                 XSEGLOG2(&lc, I, "Handle delete completed for pr: %p, req: %p", pr, pr->req);
1066                 pfiled_complete(peer, pr);
1067         }
1068         return;
1069 }
1070
1071 static int __get_precalculated_hash(struct peerd *peer, char *target,
1072                 uint32_t targetlen, char *hash)
1073 {
1074         int ret = -1;
1075         int r;
1076         uint32_t len, hash_file_len;
1077         char *hash_file = NULL;
1078         struct pfiled *pfiled = __get_pfiled(peer);
1079
1080         XSEGLOG2(&lc, D, "Started.");
1081
1082         hash_file = malloc(MAX_FILENAME_SIZE + 1);
1083         hash_file_len = strjoin(hash_file, target, targetlen, HASH_SUFFIX, HASH_SUFFIX_LEN);
1084         hash[0] = 0;
1085
1086         r = pfiled_read_name(pfiled, hash_file, hash_file_len, hash, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
1087         if (r < 0) {
1088                 if (errno != ENOENT){
1089                         XSEGLOG2(&lc, E, "Error opening %s", hash_file);
1090                 } else {
1091                         XSEGLOG2(&lc, I, "No precalculated hash for %s", hash_file);
1092                         ret = 0;
1093                 }
1094                 goto out;
1095         }
1096         len = (uint32_t)r;
1097         XSEGLOG2(&lc, D, "Read %u bytes", len);
1098
1099         if (len == HEXLIFIED_SHA256_DIGEST_SIZE){
1100                 hash[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
1101                 XSEGLOG2(&lc, D, "Found hash for %s : %s", hash_file, hash);
1102                 ret = 0;
1103         }
1104 out:
1105         free(hash_file);
1106         XSEGLOG2(&lc, D, "Finished.");
1107         return ret;
1108 }
1109
1110 static int __set_precalculated_hash(struct peerd *peer, char *target,
1111                 uint32_t targetlen, char *hash)
1112 {
1113         int ret = -1;
1114         int r;
1115         uint32_t len, hash_file_len;
1116         char *hash_file = NULL;
1117         struct pfiled *pfiled = __get_pfiled(peer);
1118
1119         XSEGLOG2(&lc, D, "Started.");
1120
1121         hash_file = malloc(MAX_FILENAME_SIZE + 1);
1122         hash_file_len = strjoin(hash_file, target, targetlen, HASH_SUFFIX, HASH_SUFFIX_LEN);
1123
1124         r = pfiled_write_name(pfiled, hash_file, hash_file_len, hash, HEXLIFIED_SHA256_DIGEST_SIZE, 0,
1125                         O_CREAT|O_EXCL, S_IWUSR|S_IRUSR);
1126         if (r < 0) {
1127                 if (errno != EEXIST){
1128                         XSEGLOG2(&lc, E, "Error opening %s", hash_file);
1129                 } else {
1130                         XSEGLOG2(&lc, I, "Hash file already exists %s", hash_file);
1131                         ret = 0;
1132                 }
1133                 goto out;
1134         }
1135
1136         len = (uint32_t)r;
1137         XSEGLOG2(&lc, D, "Wrote %u bytes", len);
1138         ret = 0;
1139 out:
1140         free(hash_file);
1141         XSEGLOG2(&lc, D, "Finished.");
1142         return ret;
1143 }
1144
1145 static void handle_hash(struct peerd *peer, struct peer_req *pr)
1146 {
1147         //open src
1148         //read all file
1149         //sha256 hash
1150         //stat (open without create)
1151         //write to hash_tmpfile
1152         //link file
1153
1154         int len;
1155         int src = -1, dst = -1, r = -1;
1156         ssize_t c;
1157         uint64_t sum, trailing_zeros;
1158         struct pfiled *pfiled = __get_pfiled(peer);
1159         struct fio *fio = __get_fio(pr);
1160         struct xseg_request *req = pr->req;
1161         char *pathname = NULL, *tmpfile_pathname = NULL, *tmpfile = NULL;
1162         char *target;
1163 //      char hash_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
1164         char *hash_name;
1165         char name[XSEG_MAX_TARGETLEN + 1];
1166
1167         unsigned char *object_data = NULL;
1168         unsigned char sha[SHA256_DIGEST_SIZE];
1169         struct xseg_reply_hash *xreply;
1170
1171         target = xseg_get_target(peer->xseg, req);
1172
1173         XSEGLOG2(&lc, I, "Handle hash started for pr: %p, req: %p",
1174                         pr, pr->req);
1175
1176         if (!req->size) {
1177                 XSEGLOG2(&lc, E, "No request size provided");
1178                 r = -1;
1179                 goto out;
1180         }
1181
1182         r = is_target_valid_len(pfiled, target, req->targetlen, READ);
1183         if (r < 0) {
1184                 XSEGLOG2(&lc, E, "Source target not valid");
1185                 goto out;
1186         }
1187
1188         r = posix_memalign(&hash_name, 512, 512 + 1);
1189
1190         r = __get_precalculated_hash(peer, target, req->targetlen, hash_name);
1191         if (r < 0) {
1192                 XSEGLOG2(&lc, E, "Error getting precalculated hash");
1193                 goto out;
1194         }
1195
1196         if (hash_name[0] != 0) {
1197                 XSEGLOG2(&lc, I, "Precalucated hash found %s", hash_name);
1198                 goto found;
1199         }
1200
1201         XSEGLOG2(&lc, I, "No precalculated hash found");
1202
1203         strncpy(name, target, req->targetlen);
1204         name[req->targetlen] = 0;
1205
1206         pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
1207         //object_data = malloc(sizeof(char) * req->size);
1208         r = posix_memalign(&object_data, 512, sizeof(char) * req->size);
1209         if (!pathname || !object_data){
1210                 XSEGLOG2(&lc, E, "Out of memory");
1211                 goto out;
1212         }
1213
1214         src = dir_open(pfiled, fio, target, req->targetlen, READ);
1215         if (src < 0) {
1216                 XSEGLOG2(&lc, E, "Fail in src");
1217                 r = dst;
1218                 goto out;
1219         }
1220
1221         c = pfiled_read(pfiled, src, object_data, req->size, req->offset);
1222         if (c < 0) {
1223                 XSEGLOG2(&lc, E, "Error reading from source");
1224                 r = -1;
1225                 goto out;
1226         }
1227         sum = c;
1228
1229         //rstrip here in case zeros were written in the end
1230         trailing_zeros = 0;
1231         for (;trailing_zeros < sum; trailing_zeros++)
1232                 if (object_data[sum - trailing_zeros - 1])
1233                         break;
1234
1235         XSEGLOG2(&lc, D, "Read %llu, Trainling zeros %llu",
1236                         sum, trailing_zeros);
1237
1238         sum -= trailing_zeros;
1239         //calculate hash name
1240         SHA256(object_data, sum, sha);
1241
1242         hexlify(sha, SHA256_DIGEST_SIZE, hash_name);
1243         hash_name[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
1244
1245
1246         r = create_path(pathname, pfiled, hash_name, HEXLIFIED_SHA256_DIGEST_SIZE, 1);
1247         if (r < 0) {
1248                 XSEGLOG2(&lc, E, "Create path failed");
1249                 r = -1;
1250                 goto out;
1251         }
1252
1253
1254         dst = open_file(pfiled, hash_name, HEXLIFIED_SHA256_DIGEST_SIZE, READ);
1255         if (dst > 0) {
1256                 XSEGLOG2(&lc, I, "%s already exists, no write needed", pathname);
1257                 req->serviced = req->size;
1258                 r = 0;
1259                 goto out;
1260         }
1261
1262         tmpfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
1263         if (!tmpfile_pathname){
1264                 XSEGLOG2(&lc, E, "Out of memory");
1265                 r = -1;
1266                 goto out;
1267         }
1268
1269         tmpfile = malloc(MAX_FILENAME_SIZE);
1270         if (!tmpfile){
1271                 XSEGLOG2(&lc, E, "Out of memory");
1272                 r = -1;
1273                 goto out;
1274         }
1275
1276         len = strnjoin(tmpfile, 4, target, req->targetlen,
1277                                 HASH_SUFFIX, HASH_SUFFIX_LEN,
1278                                 pfiled->uniquestr, pfiled->uniquestr_len,
1279                                 fio->str_id, FIO_STR_ID_LEN);
1280
1281         r = pfiled_write_name(pfiled, tmpfile, len, object_data, sum, 0, O_CREAT|O_EXCL, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
1282         if (r < 0) {
1283                 if (errno != EEXIST){
1284                         char error_str[1024];
1285                         XSEGLOG2(&lc, E, "Error opening %s (%s)", tmpfile_pathname, strerror_r(errno, error_str, 1023));
1286                 } else {
1287                         XSEGLOG2(&lc, E, "Error opening %s. Stale data found.",
1288                                         tmpfile_pathname);
1289                 }
1290                 r = -1;
1291                 goto out;
1292         } else if (r < sum) {
1293                 XSEGLOG2(&lc, E, "Error writting to dst file %s", tmpfile_pathname);
1294                 r = -1;
1295                 goto out_unlink;
1296         }
1297         XSEGLOG2(&lc, D, "Opened %s and wrote", tmpfile);
1298
1299         r = create_path(tmpfile_pathname, pfiled, tmpfile, len, 1);
1300         if (r < 0)  {
1301                 XSEGLOG2(&lc, E, "Create path failed");
1302                 r = -1;
1303                 goto out;
1304         }
1305
1306         r = link(tmpfile_pathname, pathname);
1307         if (r < 0 && errno != EEXIST) {
1308                 XSEGLOG2(&lc, E, "Error linking tmp file %s. Errno %d",
1309                                 pathname, errno);
1310                 r = -1;
1311                 goto out_unlink;
1312         }
1313
1314         r = unlink(tmpfile_pathname);
1315         if (r < 0) {
1316                 XSEGLOG2(&lc, W, "Error unlinking tmp file %s", tmpfile_pathname);
1317                 r = 0;
1318         }
1319
1320         r = __set_precalculated_hash(peer, target, req->targetlen, hash_name);
1321         if (r < 0) {
1322                 XSEGLOG2(&lc, W, "Error setting precalculated hash");
1323                 r = 0;
1324         }
1325
1326 found:
1327         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
1328                         sizeof(struct xseg_reply_hash));
1329         if (r < 0)  {
1330                 XSEGLOG2(&lc, E, "Resize request failed");
1331                 r = -1;
1332                 goto out;
1333         }
1334
1335         xreply = (struct xseg_reply_hash *)xseg_get_data(peer->xseg, req);
1336         strncpy(xreply->target, hash_name, HEXLIFIED_SHA256_DIGEST_SIZE);
1337         xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE;
1338
1339         req->serviced = req->size;
1340         r = 0;
1341
1342 out:
1343         if (dst > 0) {
1344                 close(dst);
1345         }
1346         if (r < 0) {
1347                 XSEGLOG2(&lc, E, "Handle hash failed for pr: %p, req: %p. ",
1348                                 "Target %s", pr, pr->req, name);
1349                 pfiled_fail(peer, pr);
1350         } else {
1351                 XSEGLOG2(&lc, I, "Handle hash completed for pr: %p, req: %p\n\t"
1352                                 "hashed %s to %s", pr, pr->req, name, hash_name);
1353                 pfiled_complete(peer, pr);
1354         }
1355         free(tmpfile_pathname);
1356         free(pathname);
1357         free(object_data);
1358         return;
1359
1360 out_unlink:
1361         unlink(tmpfile_pathname);
1362         goto out;
1363 }
1364
1365 static int __locked_by(char *lockfile, char *expected, uint32_t expected_len, int direct)
1366 {
1367         int ret = -1;
1368         int r;
1369         uint32_t len;
1370         char tmpbuf[MAX_UNIQUESTR_LEN];
1371
1372         XSEGLOG2(&lc, D, "Started. Lockfile: %s, expected: %s, expected_len: %u", lockfile, expected, expected_len);
1373         r = read_path(lockfile, tmpbuf, MAX_UNIQUESTR_LEN, 0, direct);
1374         if (r < 0) {
1375                 if (errno != ENOENT){
1376                         XSEGLOG2(&lc, E, "Error opening %s", lockfile);
1377                 } else {
1378                         //-2 == retry
1379                         XSEGLOG2(&lc, I, "lock file removed");
1380                         ret = -2;
1381                 }
1382                 goto out;
1383         }
1384         len = (uint32_t)r;
1385         XSEGLOG2(&lc, D, "Read %u bytes", len);
1386         if (!strncmp(tmpbuf, expected, expected_len)){
1387                 XSEGLOG2(&lc, D, "Lock file %s locked by us.", lockfile);
1388                 ret = 0;
1389         }
1390 out:
1391         XSEGLOG2(&lc, D, "Finished. Lockfile: %s", lockfile);
1392         return ret;
1393 }
1394
1395 static int __try_lock(struct pfiled *pfiled, char *tmpfile, char *lockfile,
1396                         uint32_t flags, int fd)
1397 {
1398         int r, direct;
1399         XSEGLOG2(&lc, D, "Started. Lockfile: %s, Tmpfile:%s", lockfile, tmpfile);
1400
1401         r = pfiled_write(pfiled, fd, pfiled->uniquestr, pfiled->uniquestr_len, 0);
1402         if (r < 0 || r < pfiled->uniquestr_len) {
1403                 return -1;
1404         }
1405         r = fsync(fd);
1406         if (r < 0) {
1407                 return -1;
1408         }
1409
1410         direct = pfiled->directio;
1411 //      direct = 0;
1412
1413         while (link(tmpfile, lockfile) < 0) {
1414                 //actual error
1415                 if (errno != EEXIST){
1416                         XSEGLOG2(&lc, E, "Error linking %s to %s",
1417                                         tmpfile, lockfile);
1418                         return -1;
1419                 }
1420                 r = __locked_by(lockfile, pfiled->uniquestr, pfiled->uniquestr_len, direct);
1421                 if (!r) {
1422                         break;
1423                 }
1424                 if (flags & XF_NOSYNC) {
1425                         XSEGLOG2(&lc, D, "Could not get lock file %s, "
1426                                         "XF_NOSYNC set. Aborting", lockfile);
1427                         return -1;
1428                 }
1429                 sleep(1);
1430         }
1431         XSEGLOG2(&lc, D, "Finished. Lockfile: %s", lockfile);
1432         return 0;
1433 }
1434
1435 static void handle_acquire(struct peerd *peer, struct peer_req *pr)
1436 {
1437         int r, ret = -1;
1438         struct pfiled *pfiled = __get_pfiled(peer);
1439         struct fio *fio = __get_fio(pr);
1440         struct xseg_request *req = pr->req;
1441         char *buf = malloc(MAX_FILENAME_SIZE);
1442         char *tmpfile = malloc(MAX_FILENAME_SIZE);
1443         char *lockfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
1444         char *tmpfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
1445         int fd = -1, flags;
1446         char *target = xseg_get_target(peer->xseg, req);
1447         uint32_t buf_len, tmpfile_len;
1448
1449         if (!buf || !tmpfile_pathname || !lockfile_pathname) {
1450                 XSEGLOG2(&lc, E, "Out of memory");
1451                 pfiled_fail(peer, pr);
1452                 return;
1453         }
1454
1455         r = is_target_valid_len(pfiled, target, req->targetlen, READ);
1456         if (r < 0) {
1457                 XSEGLOG2(&lc, E, "Target not valid");
1458                 goto out;
1459         }
1460
1461
1462         buf_len = strjoin(buf, target, req->targetlen, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
1463
1464         XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
1465
1466
1467         tmpfile_len = strnjoin(tmpfile, 3, buf, buf_len,
1468                                 pfiled->uniquestr, pfiled->uniquestr_len,
1469                                 fio->str_id, FIO_STR_ID_LEN);
1470
1471         XSEGLOG2(&lc, I, "Trying to acquire lock %s", buf);
1472
1473         if (create_path(tmpfile_pathname, pfiled, tmpfile, tmpfile_len, 1) < 0) {
1474                 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
1475                 goto out;
1476         }
1477
1478         if (create_path(lockfile_pathname, pfiled, buf, buf_len, 1) < 0) {
1479                 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
1480                 goto out;
1481         }
1482
1483         //create exclusive unique lockfile (block_uniqueid+target)
1484         //if (OK)
1485         //      write blocker uniqueid to the unique lockfile
1486         //      try to link it to the lockfile
1487         //      if (OK)
1488         //              unlink unique lockfile;
1489         //              complete
1490         //      else
1491         //              spin while not able to link
1492
1493         //nfs v >= 3
1494         XSEGLOG2(&lc, D, "Tmpfile: %s", tmpfile_pathname);
1495         flags = O_RDWR|O_CREAT|O_EXCL;
1496         if (pfiled->directio)
1497                 flags |= O_DIRECT;
1498         fd = open(tmpfile_pathname, flags, S_IRWXU | S_IRUSR);
1499         if (fd < 0) {
1500                 //actual error
1501                 if (errno != EEXIST){
1502                         XSEGLOG2(&lc, E, "Error opening %s", tmpfile_pathname);
1503                         goto out;
1504                 } else {
1505                         XSEGLOG2(&lc, E, "Error opening %s. Stale data found.",
1506                                         tmpfile_pathname);
1507                 }
1508                 ret = -1;
1509         } else {
1510                 XSEGLOG2(&lc, D, "Tmpfile %s created. Trying to get lock",
1511                                 tmpfile_pathname);
1512                 r = __try_lock(pfiled, tmpfile_pathname, lockfile_pathname,
1513                                 req->flags, fd);
1514                 if (r < 0){
1515                         XSEGLOG2(&lc, E, "Trying to get lock %s failed", buf);
1516                         ret = -1;
1517                 } else {
1518                         XSEGLOG2(&lc, D, "Trying to get lock %s succeed", buf);
1519                         ret = 0;
1520                 }
1521                 r = close(fd);
1522                 if (r < 0) {
1523                         XSEGLOG2(&lc, W, "Error closing %s", tmpfile_pathname);
1524                 }
1525                 r = unlink(tmpfile_pathname);
1526                 if (r < 0) {
1527                         XSEGLOG2(&lc, E, "Error unlinking %s", tmpfile_pathname);
1528                 }
1529         }
1530 out:
1531         if (ret < 0){
1532                 XSEGLOG2(&lc, I, "Failed to acquire lock %s", buf);
1533                 pfiled_fail(peer, pr);
1534         }
1535         else{
1536                 XSEGLOG2(&lc, I, "Acquired lock %s", buf);
1537                 pfiled_complete(peer, pr);
1538         }
1539         free(buf);
1540         free(lockfile_pathname);
1541         free(tmpfile_pathname);
1542         return;
1543 }
1544
1545 static void handle_release(struct peerd *peer, struct peer_req *pr)
1546 {
1547         struct pfiled *pfiled = __get_pfiled(peer);
1548 //      struct fio *fio = __get_fio(pr);
1549         struct xseg_request *req = pr->req;
1550         char *buf = malloc(MAX_FILENAME_SIZE + 1);
1551         char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
1552         char *tmpbuf = malloc(MAX_UNIQUESTR_LEN + 1);
1553         char *target = xseg_get_target(peer->xseg, req);
1554         int r, buf_len, direct;
1555
1556         if (!buf || !pathname) {
1557                 XSEGLOG2(&lc, E, "Out of memory");
1558                 fail(peer, pr);
1559                 return;
1560         }
1561
1562         r = is_target_valid_len(pfiled, target, req->targetlen, READ);
1563         if (r < 0) {
1564                 XSEGLOG2(&lc, E, "Target not valid");
1565                 goto out;
1566         }
1567
1568         buf_len = strnjoin(buf, 2 , target, req->targetlen, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
1569
1570         XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
1571
1572         r = create_path(pathname, pfiled, buf, buf_len, 0);
1573         if (r < 0) {
1574                 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
1575                 goto out;
1576         }
1577
1578         direct = pfiled->directio;
1579
1580         if ((req->flags & XF_FORCE) || !__locked_by(pathname, pfiled->uniquestr,
1581                                                 pfiled->uniquestr_len, direct)) {
1582                 r = unlink(pathname);
1583                 if (r < 0) {
1584                         XSEGLOG2(&lc, E, "Could not unlink %s", pathname);
1585                         goto out;
1586                 }
1587         } else {
1588                 r = -1;
1589         }
1590
1591 out:
1592         if (r < 0) {
1593                 fail(peer, pr);
1594         }
1595         else {
1596                 XSEGLOG2(&lc, I, "Released lockfile: %s", buf);
1597                 complete(peer, pr);
1598         }
1599         XSEGLOG2(&lc, I, "Finished. Lockfile: %s", buf);
1600         free(buf);
1601         free(tmpbuf);
1602         free(pathname);
1603         return;
1604 }
1605
1606 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1607                                 enum dispatch_reason reason)
1608 {
1609         struct fio *fio = __get_fio(pr);
1610         if (reason == dispatch_accept)
1611                 fio->h = NoEntry;
1612
1613         switch (req->op) {
1614                 case X_READ:
1615                         handle_read(peer, pr); break;
1616                 case X_WRITE:
1617                         handle_write(peer, pr); break;
1618                 case X_INFO:
1619                         handle_info(peer, pr); break;
1620                 case X_COPY:
1621                         handle_copy(peer, pr); break;
1622                 case X_DELETE:
1623                         handle_delete(peer, pr); break;
1624                 case X_ACQUIRE:
1625                         handle_acquire(peer, pr); break;
1626                 case X_RELEASE:
1627                         handle_release(peer, pr); break;
1628                 case X_HASH:
1629                         handle_hash(peer, pr); break;
1630                 case X_SYNC:
1631                 default:
1632                         handle_unknown(peer, pr);
1633         }
1634         return 0;
1635 }
1636
1637 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1638 {
1639         /*
1640         get blocks,maps paths
1641         get optional pithos block,maps paths
1642         get fdcache size
1643         check if greater than limit (tip: getrlimit)
1644         assert cachesize greater than nr_ops
1645         assert nr_ops greater than nr_threads
1646         get prefix
1647         */
1648
1649         int ret = 0;
1650         int i, r;
1651         struct fio *fio;
1652         struct pfiled *pfiled = malloc(sizeof(struct pfiled));
1653         struct rlimit rlim;
1654         struct xcache_ops c_ops = {
1655                 .on_node_init = cache_node_init,
1656                 .on_init = cache_init,
1657                 .on_put = cache_put,
1658         };
1659         if (!pfiled){
1660                 XSEGLOG2(&lc, E, "Out of memory");
1661                 ret = -ENOMEM;
1662                 goto out;
1663         }
1664         peer->priv = pfiled;
1665
1666         pfiled->maxfds = 2 * peer->nr_ops;
1667
1668         for (i = 0; i < peer->nr_ops; i++) {
1669                 peer->peer_reqs[i].priv = malloc(sizeof(struct fio));
1670                 if (!peer->peer_reqs->priv){
1671                         XSEGLOG2(&lc, E, "Out of memory");
1672                         ret = -ENOMEM;
1673                         goto out;
1674                 }
1675                 fio = __get_fio(&peer->peer_reqs[i]);
1676                 fio->str_id[0] = '_';
1677                 fio->str_id[1] = 'a' + (i / 26);
1678                 fio->str_id[2] = 'a' + (i % 26);
1679         }
1680
1681         pfiled->vpath[0] = 0;
1682         pfiled->prefix[0] = 0;
1683         pfiled->uniquestr[0] = 0;
1684
1685         BEGIN_READ_ARGS(argc, argv);
1686         READ_ARG_ULONG("--fdcache", pfiled->maxfds);
1687         READ_ARG_STRING("--archip", pfiled->vpath, MAX_PATH_SIZE);
1688         READ_ARG_STRING("--prefix", pfiled->prefix, MAX_PREFIX_LEN);
1689         READ_ARG_STRING("--uniquestr", pfiled->uniquestr, MAX_UNIQUESTR_LEN);
1690         READ_ARG_BOOL("--directio", pfiled->directio);
1691         END_READ_ARGS();
1692
1693         pfiled->uniquestr_len = strlen(pfiled->uniquestr);
1694         pfiled->prefix_len = strlen(pfiled->prefix);
1695
1696         //TODO test path exist/is_dir/have_access
1697         pfiled->vpath_len = strlen(pfiled->vpath);
1698         if (!pfiled->vpath_len){
1699                 XSEGLOG2(&lc, E, "Archipelago path was not provided");
1700                 usage(argv[0]);
1701                 return -1;
1702         }
1703         if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
1704                 pfiled->vpath[pfiled->vpath_len] = '/';
1705                 pfiled->vpath[++pfiled->vpath_len]= 0;
1706         }
1707
1708         r = getrlimit(RLIMIT_NOFILE, &rlim);
1709         if (r < 0) {
1710                 XSEGLOG2(&lc, E, "Could not get limit for max fds");
1711                 return -1;
1712         }
1713         //TODO check nr_ops == nr_threads.
1714         //
1715         r = xcache_init(&pfiled->cache, pfiled->maxfds, &c_ops, XCACHE_LRU_HEAP, peer);
1716         if (r < 0)
1717                 return -1;
1718         //check max fds. (> fdcache + nr_threads)
1719         //TODO assert fdcache > 2*nr_threads or add waitq
1720         if (rlim.rlim_cur < pfiled->cache.size + peer->nr_threads - 4) {
1721                 XSEGLOG2(&lc, E, "FD limit %d is less than cachesize + nr_ops -4(%u)",
1722                                 rlim.rlim_cur, pfiled->cache.size + peer->nr_ops - 4);
1723                 return -1;
1724         }
1725
1726 out:
1727         return ret;
1728 }
1729
1730 void custom_peer_finalize(struct peerd *peer)
1731 {
1732         /*
1733         we could close all fds, but we can let the system do it for us.
1734         */
1735         return;
1736 }
1737
1738 /*
1739 static int safe_atoi(char *s)
1740 {
1741         long l;
1742         char *endp;
1743
1744         l = strtol(s, &endp, 10);
1745         if (s != endp && *endp == '\0')
1746                 return l;
1747         else
1748                 return -1;
1749 }
1750 */