2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
36 * The Pithos File Blocker Peer (pfiled)
42 #include <sys/types.h>
52 #include <sys/sendfile.h>
54 #include <xtypes/xcache.h>
55 #include <openssl/sha.h>
56 #include <sys/resource.h>
58 #include <xseg/xseg.h>
59 #include <xseg/protocol.h>
/*
 * Length limits and on-disk name suffixes.
 * Each *_LEN macro is a hand-maintained copy of strlen() of the matching
 * *_SUFFIX string; keep every pair in sync.
 * MAX_FILENAME_SIZE covers the longest name this file builds from a target:
 * target + 5-char suffix + uniquestr + FIO_STR_ID_LEN (the temporary file
 * names in handle_hash and handle_acquire). It refers to MAX_UNIQUESTR_LEN,
 * which is defined below; that is fine for object-like macros because they
 * are expanded at the point of use. It does not include a NUL terminator.
 */
63 #define FIO_STR_ID_LEN 3
64 #define LOCK_SUFFIX "_lock"
65 #define LOCK_SUFFIX_LEN 5
66 #define HASH_SUFFIX "_hash"
67 #define HASH_SUFFIX_LEN 5
68 #define MAX_PATH_SIZE 1024
69 #define MAX_FILENAME_SIZE (XSEG_MAX_TARGETLEN + LOCK_SUFFIX_LEN + MAX_UNIQUESTR_LEN + FIO_STR_ID_LEN)
70 #define MAX_PREFIX_LEN 10
71 #define MAX_UNIQUESTR_LEN 128
72 #define SNAP_SUFFIX "_snap"
73 #define SNAP_SUFFIX_LEN 5
/*
 * NOTE(review): min() parenthesizes neither its arguments nor its expansion,
 * and it evaluates the selected argument twice. Operands must therefore be
 * simple, side-effect-free expressions. ((_a) < (_b) ? (_a) : (_b)) or a
 * static inline helper would be safer.
 */
78 #define min(_a, _b) (_a < _b ? _a : _b)
81 * Globals, holding command-line arguments
/*
 * custom_peer_usage - print pfiled's command-line options (name, default,
 * description) to stderr.
 * @argv0: program name (not referenced in the lines shown).
 * The "2 * nr_ops" default for --fdcache matches the maxfds initialisation
 * in custom_peer_init.
 * NOTE(review): the --prefix text says the prefix "should be stripped", but
 * in this file the prefix is only used by get_dirs() to choose hashed
 * directory names. Confirm the help text.
 */
84 void custom_peer_usage(char *argv0)
86 fprintf(stderr, "General peer options:\n"
87 " Option | Default | \n"
88 " --------------------------------------------\n"
89 " --fdcache | 2 * nr_ops | Fd cache size\n"
90 " --archip | None | Archipelago directory\n"
91 " --prefix | None | Common prefix of objects that should be stripped\n"
92 " --uniquestr | None | Unique string for this instance\n"
97 /* fdcache_node flags */
98 #define READY (1 << 1)
/*
 * Per-file state kept in each fd cache node. cache_init() and cache_put()
 * treat fd == -1 as "no open file".
 */
100 /* fdcache node info */
101 struct fdcache_entry {
/*
 * NOTE(review): volatile provides no atomicity or inter-thread ordering. If
 * flags is shared between threads, use C11 atomics or a lock.
 */
103 volatile unsigned int flags;
/*
 * The fields below are accessed elsewhere in this file as
 * pfiled->uniquestr_len, pfiled->vpath, pfiled->prefix and pfiled->uniquestr.
 * They presumably belong to struct pfiled, the per-peer configuration
 * (confirm). Each char array reserves one extra byte for the NUL terminator.
 */
110 uint32_t uniquestr_len;
112 char vpath[MAX_PATH_SIZE + 1];
113 char prefix[MAX_PREFIX_LEN + 1];
114 char uniquestr[MAX_UNIQUESTR_LEN + 1];
119 * pfiled specific structure
120 * containing information on a pending I/O operation
125 char str_id[FIO_STR_ID_LEN];
/*
 * str_id: a 3-character per-request tag ('_' followed by two letters,
 * assigned in custom_peer_init). Together with uniquestr it is appended to
 * temporary file names (handle_hash, handle_acquire), presumably to keep
 * them unique per in-flight request. It is exactly FIO_STR_ID_LEN bytes and
 * not NUL-terminated, so always copy it with an explicit length.
 */
/*
 * __get_pfiled - return the pfiled private state attached to @peer
 * (peer->priv, set in custom_peer_init).
 * NOTE(review): identifiers beginning with "__" are reserved for the
 * implementation, and this helper has external linkage. Consider a static,
 * non-reserved name.
 */
128 struct pfiled * __get_pfiled(struct peerd *peer)
130 return (struct pfiled *) peer->priv;
/*
 * __get_fio - return the per-request struct fio attached to @pr
 * (pr->priv, allocated for every peer request in custom_peer_init).
 */
133 struct fio * __get_fio(struct peer_req *pr)
135 return (struct fio*) pr->priv;
/*
 * cache_node_init - xcache on_node_init callback (registered in
 * custom_peer_init). Allocates the fdcache_entry attached to a cache node.
 * @p: callback context (presumably the peer passed to xcache_init; unused).
 * @xh: points to the node's xcache_handler, which is used only for logging.
 * Presumably returns the new entry, or NULL if malloc fails (confirm).
 */
140 static void * cache_node_init(void *p, void *xh)
142 //struct peerd *peer = (struct peerd *)p;
143 //struct pfiled *pfiled = __get_pfiled(peer);
144 xcache_handler h = *(xcache_handler *)(xh);
145 struct fdcache_entry *fdentry = malloc(sizeof(struct fdcache_entry));
149 XSEGLOG2(&lc, D, "Initialing node h: %llu with %p",
150 (long long unsigned)h, fdentry);
/*
 * cache_init - xcache on_init callback: sanity-check cache entry @e.
 * A fresh entry is expected to carry fd == -1 (no open file); any other
 * value is logged as an error.
 */
158 static int cache_init(void *p, void *e)
160 struct fdcache_entry *fdentry = (struct fdcache_entry *)e;
162 if (fdentry->fd != -1) {
163 XSEGLOG2(&lc, E, "Found invalid fd %d", fdentry->fd);
/*
 * cache_put - xcache on_put callback: log entry @e and, when it holds an
 * open fd (!= -1), release that fd. The release presumably closes the fd
 * and resets it to -1 (confirm).
 */
170 static void cache_put(void *p, void *e)
172 struct fdcache_entry *fdentry = (struct fdcache_entry *)e;
174 XSEGLOG2(&lc, D, "Putting entry %p with fd %d", fdentry, fdentry->fd);
176 if (fdentry->fd != -1)
/*
 * close_cache_entry - drop the fd cache reference held by @pr, if any.
 * fio->h == NoEntry means the request holds no cache handler.
 */
184 static void close_cache_entry(struct peerd *peer, struct peer_req *pr)
186 struct pfiled *pfiled = __get_pfiled(peer);
187 struct fio *fio = __get_fio(pr);
188 if (fio->h != NoEntry)
189 xcache_put(&pfiled->cache, fio->h);
/*
 * pfiled_complete - release @pr's fd cache reference before the request is
 * completed.
 */
192 static void pfiled_complete(struct peerd *peer, struct peer_req *pr)
194 close_cache_entry(peer, pr);
/*
 * pfiled_fail - release @pr's fd cache reference before the request is
 * failed.
 */
198 static void pfiled_fail(struct peerd *peer, struct peer_req *pr)
200 close_cache_entry(peer, pr);
/*
 * handle_unknown - log a warning for an unsupported request op and fail it.
 */
204 static void handle_unknown(struct peerd *peer, struct peer_req *pr)
206 XSEGLOG2(&lc, W, "unknown request op");
207 pfiled_fail(peer, pr);
/*
 * get_dirs - compute the 6 characters used as three 2-character directory
 * levels for @target.
 * @buf: output of exactly 6 chars. It is NOT NUL-terminated.
 * @target/@targetlen: object name (need not be NUL-terminated).
 * If @target does not start with pfiled->prefix, its first 6 characters are
 * used verbatim. Otherwise the first 3 bytes of SHA-256 over the whole
 * target (prefix included) are hex-encoded, presumably into 6 hex chars.
 * With an empty prefix (prefix_len == 0), strncmp() compares zero characters
 * and returns 0, so the hashed branch is always taken.
 */
210 static void get_dirs(char buf[6], struct pfiled *pfiled, char *target, uint32_t targetlen)
212 unsigned char sha[SHA256_DIGEST_SIZE];
213 char hex[HEXLIFIED_SHA256_DIGEST_SIZE];
214 char *prefix = pfiled->prefix;
215 uint32_t prefixlen = pfiled->prefix_len;
217 if (strncmp(target, prefix, prefixlen)) {
218 strncpy(buf, target, 6);
222 SHA256((unsigned char *)target, targetlen, sha);
223 hexlify(sha, 3, hex);
224 strncpy(buf, hex, 6);
/*
 * create_path - build "<vpath><d0d1>/<d2d3>/<d4d5>/<target>" in @buf and
 * NUL-terminate it. d0..d5 come from get_dirs().
 * @buf: caller-provided. No bounds checking is done, so it must hold at
 *       least vpath_len + 9 + targetlen + 1 bytes.
 * @target/@targetlen: object name (need not be NUL-terminated).
 * @mkdirs: presumably gates the mkdir(0750) of missing directory levels
 *       (confirm).
 * Presumably returns 0 on success and a negative value on failure (confirm).
 */
228 static int create_path(char *buf, struct pfiled *pfiled, char *target,
229 uint32_t targetlen, int mkdirs)
234 char *path = pfiled->vpath;
235 uint32_t pathlen = pfiled->vpath_len;
237 get_dirs(dirs, pfiled, target, targetlen);
239 strncpy(buf, path, pathlen);
/* i = 0, 3, 6 selects the dirs pairs starting at 0, 2, 4; each level is
 * written as two chars plus '/'. */
241 for (i = 0; i < 9; i+= 3) {
242 buf[pathlen + i] = dirs[i - (i/3)];
243 buf[pathlen + i +1] = dirs[i + 1 - (i/3)];
244 buf[pathlen + i + 2] = '/';
/* Temporarily terminate after this level so it can be stat()ed/created. */
246 buf[pathlen + i + 3] = '\0';
248 if (stat(buf, &st) < 0)
249 if (mkdir(buf, 0750) < 0) {
258 strncpy(&buf[pathlen + 9], target, targetlen);
259 buf[pathlen + 9 + targetlen] = '\0';
/*
 * is_target_valid_len - check that @targetlen is acceptable for @mode.
 * Names longer than XSEG_MAX_TARGETLEN are rejected. For READ and WRITE,
 * the minimum length depends on the prefix:
 *  - names that do not start with pfiled->prefix, or any name when no
 *    prefix is configured, must be at least 6 characters long;
 *  - prefixed names must be at least prefix_len + 6 characters long.
 * Any other mode is logged as invalid.
 * Presumably returns 0 when valid and a negative value otherwise (confirm).
 */
264 static int is_target_valid_len(struct pfiled *pfiled, char *target,
265 uint32_t targetlen, int mode)
267 if (targetlen > XSEG_MAX_TARGETLEN) {
268 XSEGLOG2(&lc, E, "Invalid targetlen %u, max: %u",
269 targetlen, XSEG_MAX_TARGETLEN);
272 if (mode == WRITE || mode == READ) {
274 * if name starts with prefix
275 * assert targetlen >= prefix_len + 6
277 * assert targetlen >= 6
279 /* 6 chars are needed for the directory structure */
280 if (!pfiled->prefix_len || strncmp(target, pfiled->prefix, pfiled->prefix_len)) {
282 XSEGLOG2(&lc, E, "Targetlen should be at least 6");
286 if (targetlen < pfiled->prefix_len + 6) {
287 XSEGLOG2(&lc, E, "Targetlen should be at least prefix "
288 "len(%u) + 6", pfiled->prefix_len);
293 XSEGLOG2(&lc, E, "Invalid mode");
/*
 * is_target_valid - is_target_valid_len() for a NUL-terminated @target
 * (its length is taken with strlen()).
 */
301 static int is_target_valid(struct pfiled *pfiled, char *target, int mode)
303 return is_target_valid_len(pfiled, target, strlen(target), mode);
/*
 * open_file_write - open the backing file of @target read/write, creating
 * the file and any missing directory levels (create_path(..., 1)). New files
 * get mode 0660 (S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP).
 * Presumably returns the fd, or a negative value on error (confirm).
 * NOTE(review): tmp holds XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1 bytes, but
 * create_path() needs vpath_len + 9 + targetlen + 1. With a vpath close to
 * MAX_PATH_SIZE and a maximal target, this can overflow by a few bytes.
 * NOTE(review): passing strerror_r()'s return value to "%s" assumes the GNU
 * variant, which returns char *. The XSI variant returns int.
 */
307 static int open_file_write(struct pfiled *pfiled, char *target, uint32_t targetlen)
310 char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
311 char error_str[1024];
313 r = create_path(tmp, pfiled, target, targetlen, 1);
315 XSEGLOG2(&lc, E, "Could not create path");
318 XSEGLOG2(&lc, D, "Opening file %s with O_RDWR|O_CREAT", tmp);
319 fd = open(tmp, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
321 XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
/*
 * open_file_read - open an existing backing file of @target (O_RDWR, no
 * O_CREAT). The path is built without creating directories.
 * Presumably returns the fd, or a negative value on error (confirm).
 * NOTE(review): open() ignores the mode argument when O_CREAT is absent.
 * The tmp sizing and strerror_r() remarks on open_file_write apply here too.
 */
327 static int open_file_read(struct pfiled *pfiled, char *target, uint32_t targetlen)
330 char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
331 char error_str[1024];
333 r = create_path(tmp, pfiled, target, targetlen, 0);
335 XSEGLOG2(&lc, E, "Could not create path");
338 XSEGLOG2(&lc, D, "Opening file %s with O_RDWR", tmp);
339 fd = open(tmp, O_RDWR, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
341 XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
/*
 * open_file - open @target's backing file for @mode.
 * WRITE goes to open_file_write() (may create the file) and READ goes to
 * open_file_read() (existing file only). Any other mode is logged as
 * invalid.
 */
347 static int open_file(struct pfiled *pfiled, char *target, uint32_t targetlen, int mode)
350 return open_file_write(pfiled, target, targetlen);
351 } else if (mode == READ) {
352 return open_file_read(pfiled, target, targetlen);
355 XSEGLOG2(&lc, E, "Invalid mode for target");
/*
 * dir_open - obtain an fd for @target through the fd cache.
 * On a cache miss:
 *  1. the target length is validated;
 *  2. a new cache entry is allocated;
 *  3. the file is opened with open_file() (READ: existing file only;
 *     WRITE: created if needed);
 *  4. the entry is inserted.
 * If xcache_insert() returns a different handler (logged as "Partial cache
 * hit"), the freshly allocated handler is put back and presumably the
 * returned one is used. On failure, a newly allocated handler that was
 * never inserted is released with xcache_free_new().
 * @fio: per-request state. It presumably receives the handler (fio->h) so
 *       that close_cache_entry() can drop the reference later (confirm).
 * Presumably returns the fd, or a negative value on error (confirm).
 */
360 static int dir_open(struct pfiled *pfiled, struct fio *fio,
361 char *target, uint32_t targetlen, int mode)
364 struct fdcache_entry *e;
365 xcache_handler h = NoEntry, nh;
366 char name[XSEG_MAX_TARGETLEN + 1];
368 if (targetlen > XSEG_MAX_TARGETLEN) {
369 XSEGLOG2(&lc, E, "Invalid targetlen %u, max: %u",
370 targetlen, XSEG_MAX_TARGETLEN);
373 strncpy(name, target, targetlen);
/*
 * NOTE(review): strncpy() does not NUL-terminate name after copying
 * targetlen bytes, yet name is used as a C string below (logging and cache
 * key). Confirm that name[targetlen] = '\0' is set.
 */
375 XSEGLOG2(&lc, I, "Dir open started for %s", name);
377 h = xcache_lookup(&pfiled->cache, name);
379 r = is_target_valid_len(pfiled, target, targetlen, mode);
381 XSEGLOG2(&lc, E, "Invalid len for target %s", name);
385 h = xcache_alloc_init(&pfiled->cache, name);
387 /* FIXME add waitq to wait for free */
388 XSEGLOG2(&lc, E, "Could not allocate cache entry for %s",
392 XSEGLOG2(&lc, D, "Allocated new handler %llu for %s",
393 (long long unsigned)h, name);
395 e = xcache_get_entry(&pfiled->cache, h);
397 XSEGLOG2(&lc, E, "Alloced handler but no valid fd cache entry");
401 /* open/create file */
402 fd = open_file(pfiled, target, targetlen, mode);
404 XSEGLOG2(&lc, E, "Could not open file for target %s", name);
407 XSEGLOG2(&lc, D, "Opened file %s. fd %d", name, fd);
411 XSEGLOG2(&lc, D, "Inserting handler %llu for %s to fdcache",
412 (long long unsigned)h, name);
413 nh = xcache_insert(&pfiled->cache, h);
415 XSEGLOG2(&lc, D, "Partial cache hit for %s. New handler %llu",
416 name, (long long unsigned)nh);
417 xcache_put(&pfiled->cache, h);
421 XSEGLOG2(&lc, D, "Cache hit for %s, handler: %llu", name,
422 (long long unsigned)h);
425 e = xcache_get_entry(&pfiled->cache, h);
427 XSEGLOG2(&lc, E, "Found handler but no valid fd cache entry");
428 xcache_put(&pfiled->cache, h);
434 //assert e->fd != -1 ?;
435 XSEGLOG2(&lc, I, "Dir open finished for %s", name);
439 xcache_free_new(&pfiled->cache, h);
441 XSEGLOG2(&lc, E, "Dir open failed for %s", name);
/*
 * handle_read - read req->size bytes at req->offset of the target into the
 * request's data buffer.
 * - A missing backing file (ENOENT) is not an error: the buffer is
 *   zero-filled and the request is reported fully serviced.
 * - A read that reaches end of file zero-fills the rest of the buffer.
 * - Requests whose datalen is smaller than size are failed.
 * NOTE(review): errno is tested after dir_open() returns, but the open()
 * failure path logs several messages first (open_file_read, dir_open). If
 * the logger can modify errno, ENOENT detection becomes unreliable.
 * Consider saving errno right after open().
 */
445 static void handle_read(struct peerd *peer, struct peer_req *pr)
447 struct pfiled *pfiled = __get_pfiled(peer);
448 struct fio *fio = __get_fio(pr);
449 struct xseg_request *req = pr->req;
451 char *target = xseg_get_target(peer->xseg, req);
452 char *data = xseg_get_data(peer->xseg, req);
454 XSEGLOG2(&lc, I, "Handle read started for pr: %p, req: %p", pr, pr->req);
457 pfiled_complete(peer, pr);
461 if (req->datalen < req->size) {
462 XSEGLOG2(&lc, E, "Request datalen is less than request size");
463 pfiled_fail(peer, pr);
468 fd = dir_open(pfiled, fio, target, req->targetlen, READ);
470 if (errno != ENOENT) {
471 XSEGLOG2(&lc, E, "Open failed");
472 pfiled_fail(peer, pr);
475 memset(data, 0, req->size);
476 req->serviced = req->size;
482 XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
484 while (req->serviced < req->size) {
485 XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu",
486 req->serviced, req->size);
487 r = pread(fd, data + req->serviced,
488 req->size- req->serviced,
489 req->offset + req->serviced);
491 XSEGLOG2(&lc, E, "Cannot read");
495 /* reached end of file. zero out the rest data buffer */
496 memset(data + req->serviced, 0, req->size - req->serviced);
497 req->serviced = req->size;
503 XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
507 if (req->serviced > 0 ) {
508 XSEGLOG2(&lc, I, "Handle read completed for pr: %p, req: %p",
510 pfiled_complete(peer, pr);
513 XSEGLOG2(&lc, E, "Handle read failed for pr: %p, req: %p",
515 pfiled_fail(peer, pr);
/*
 * handle_write - write req->size bytes from the request's data buffer to
 * req->offset of the target. The backing file is created if missing.
 * - Requests whose datalen is smaller than size are failed.
 * - Requests reaching the XF_FLUSH/XF_FUA branch below are completed at
 *   once; per the in-line note there is no FLUSH/FUA support yet.
 * - After the write loop the file is synced. A sync failure marks the
 *   request as having serviced nothing.
 */
520 static void handle_write(struct peerd *peer, struct peer_req *pr)
522 struct pfiled *pfiled = __get_pfiled(peer);
523 struct fio *fio = __get_fio(pr);
524 struct xseg_request *req = pr->req;
526 char *target = xseg_get_target(peer->xseg, req);
527 char *data = xseg_get_data(peer->xseg, req);
529 XSEGLOG2(&lc, I, "Handle write started for pr: %p, req: %p", pr, pr->req);
531 if (req->datalen < req->size) {
532 XSEGLOG2(&lc, E, "Request datalen is less than request size");
533 pfiled_fail(peer, pr);
537 fd = dir_open(pfiled, fio, target, req->targetlen, WRITE);
539 XSEGLOG2(&lc, E, "Open failed");
540 pfiled_fail(peer, pr);
545 if (req->flags & (XF_FLUSH | XF_FUA)) {
546 /* No FLUSH/FUA support yet (O_SYNC ?).
547 * note that with FLUSH/size == 0
548 * there will probably be a (uint64_t)-1 offset */
549 pfiled_complete(peer, pr);
552 pfiled_complete(peer, pr);
557 XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
559 while (req->serviced < req->size) {
560 XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu",
561 req->serviced, req->size);
562 r = pwrite(fd, data + req->serviced,
563 req->size- req->serviced,
564 req->offset + req->serviced);
572 XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
576 XSEGLOG2(&lc, E, "Fsync failed.");
577 /* if fsync fails, then no bytes serviced correctly */
581 if (req->serviced > 0 ) {
582 XSEGLOG2(&lc, I, "Handle write completed for pr: %p, req: %p",
584 pfiled_complete(peer, pr);
587 XSEGLOG2(&lc, E, "Handle write failed for pr: %p, req: %p",
589 pfiled_fail(peer, pr);
/*
 * handle_info - report the size of the target's backing file in a
 * struct xseg_reply_info placed in the request's data area.
 * If the data area is too small, the request is resized first. The target
 * name is saved in buf and copied back afterwards, because resizing
 * presumably may move the target (it is re-fetched after the resize).
 * The size comes from fstat() on the cached fd. READ mode is used, so a
 * missing file fails the request.
 * NOTE(review): req->targetlen is not validated before the copy into buf
 * (XSEG_MAX_TARGETLEN + 1 bytes); dir_open() validates it only later.
 * NOTE(review): the local variable named "stat" shadows the stat() function.
 */
594 static void handle_info(struct peerd *peer, struct peer_req *pr)
596 struct pfiled *pfiled = __get_pfiled(peer);
597 struct fio *fio = __get_fio(pr);
598 struct xseg_request *req = pr->req;
602 char *target = xseg_get_target(peer->xseg, req);
603 char *data = xseg_get_data(peer->xseg, req);
604 char buf[XSEG_MAX_TARGETLEN + 1];
605 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)data;
607 if (req->datalen < sizeof(struct xseg_reply_info)) {
608 strncpy(buf, target, req->targetlen);
609 r = xseg_resize_request(peer->xseg, req, req->targetlen, sizeof(struct xseg_reply_info));
611 XSEGLOG2(&lc, E, "Cannot resize request");
612 pfiled_fail(peer, pr);
615 target = xseg_get_target(peer->xseg, req);
616 strncpy(target, buf, req->targetlen);
619 XSEGLOG2(&lc, I, "Handle info started for pr: %p, req: %p", pr, pr->req);
620 fd = dir_open(pfiled, fio, target, req->targetlen, READ);
622 XSEGLOG2(&lc, E, "Dir open failed");
623 pfiled_fail(peer, pr);
627 r = fstat(fd, &stat);
629 XSEGLOG2(&lc, E, "fail in stat");
630 pfiled_fail(peer, pr);
634 size = (uint64_t)stat.st_size;
637 XSEGLOG2(&lc, I, "Handle info completed for pr: %p, req: %p", pr, pr->req);
638 pfiled_complete(peer, pr);
/*
 * handle_copy - copy the object named in the request's xseg_request_copy
 * payload (xcopy->target) onto the request's target.
 * - The destination is opened through the fd cache in WRITE mode.
 * - The source is validated with READ rules and opened directly (O_RDONLY).
 * - Up to min(req->size, source size) bytes are transferred with sendfile().
 * - The request is marked fully serviced only when a non-zero limit was
 *   copied completely; with limit == 0, req->serviced is left unchanged.
 * NOTE(review): sendfile() with a NULL offset reads from src's file offset
 * and writes at dst's current file offset, advancing it. dst is a cached fd
 * that may be reused across requests, so a previous copy to the same target
 * leaves its offset at the old end. Confirm dst is rewound (lseek) first.
 */
641 static void handle_copy(struct peerd *peer, struct peer_req *pr)
643 struct pfiled *pfiled = __get_pfiled(peer);
644 struct fio *fio = __get_fio(pr);
645 struct xseg_request *req = pr->req;
646 char *target = xseg_get_target(peer->xseg, req);
647 char *data = xseg_get_data(peer->xseg, req);
648 struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
650 char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
651 int src = -1, dst = -1, r = -1;
652 ssize_t c = 0, bytes;
655 XSEGLOG2(&lc, I, "Handle copy started for pr: %p, req: %p", pr, pr->req);
657 XSEGLOG2(&lc, E, "Out of memory");
658 pfiled_fail(peer, pr);
662 r = is_target_valid_len(pfiled, xcopy->target, xcopy->targetlen, READ);
664 XSEGLOG2(&lc, E, "Source target not valid");
668 dst = dir_open(pfiled, fio, target, req->targetlen, WRITE);
670 XSEGLOG2(&lc, E, "Fail in dst");
675 r = create_path(buf, pfiled, xcopy->target, xcopy->targetlen, 0);
677 XSEGLOG2(&lc, E, "Create path failed");
682 src = open(buf, O_RDONLY);
684 XSEGLOG2(&lc, E, "fail in src %s", buf);
691 XSEGLOG2(&lc, E, "fail in stat for src %s", buf);
697 limit = min(req->size, st.st_size);
699 bytes = sendfile(dst, src, NULL, limit - c);
701 XSEGLOG2(&lc, E, "Copy failed for %s", buf);
711 if (limit && c == limit)
712 req->serviced = req->size;
718 XSEGLOG2(&lc, E, "Handle copy failed for pr: %p, req: %p", pr, pr->req);
719 pfiled_fail(peer, pr);
721 XSEGLOG2(&lc, I, "Handle copy completed for pr: %p, req: %p", pr, pr->req);
722 pfiled_complete(peer, pr);
/*
 * handle_delete - remove the backing file of the request's target
 * (presumably with unlink()) and invalidate its fd cache entry.
 * The target is validated with READ rules, and its path is built without
 * creating directories.
 */
727 static void handle_delete(struct peerd *peer, struct peer_req *pr)
729 struct pfiled *pfiled = __get_pfiled(peer);
730 //struct fio *fio = __get_fio(pr);
731 struct xseg_request *req = pr->req;
732 char name[XSEG_MAX_TARGETLEN + 1];
733 char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
735 char *target = xseg_get_target(peer->xseg, req);
737 XSEGLOG2(&lc, I, "Handle delete started for pr: %p, req: %p", pr, pr->req);
740 XSEGLOG2(&lc, E, "Out of memory");
741 pfiled_fail(peer, pr);
745 r = is_target_valid_len(pfiled, target, req->targetlen, READ);
747 XSEGLOG2(&lc, E, "Target not valid");
751 r = create_path(buf, pfiled, target, req->targetlen, 0);
753 XSEGLOG2(&lc, E, "Create path failed");
760 XSEGLOG2(&lc, E, "Handle delete failed for pr: %p, req: %p", pr, pr->req);
761 pfiled_fail(peer, pr);
/*
 * NOTE(review): this copies up to XSEG_MAX_TARGETLEN bytes rather than
 * req->targetlen. The xseg target need not be NUL-terminated (dir_open and
 * handle_hash copy exactly req->targetlen bytes), so name can pick up bytes
 * past the target and miss the key that dir_open() cached. Copying
 * req->targetlen bytes and setting name[req->targetlen] = 0 would match
 * dir_open().
 */
763 strncpy(name, target, XSEG_MAX_TARGETLEN);
764 name[XSEG_MAX_TARGETLEN] = 0;
765 xcache_invalidate(&pfiled->cache, name);
766 XSEGLOG2(&lc, I, "Handle delete completed for pr: %p, req: %p", pr, pr->req);
767 pfiled_complete(peer, pr);
/*
 * __get_precalculated_hash - look up a previously stored content hash for
 * @target in the sidecar file "<target>_hash".
 * If the sidecar yields exactly HEXLIFIED_SHA256_DIGEST_SIZE bytes, they are
 * copied into @hash and NUL-terminated. A missing sidecar (ENOENT) is logged
 * as "No precalculated hash" rather than as an error.
 * handle_hash treats hash[0] == 0 as "not found", so hash[0] is presumably
 * cleared on entry (confirm).
 * NOTE(review): the path is built with mkdirs = 1, so a mere lookup may
 * create directory levels. The mode argument to open() is ignored without
 * O_CREAT.
 */
772 static int __get_precalculated_hash(struct peerd *peer, char *target,
773 uint32_t targetlen, char hash[HEXLIFIED_SHA256_DIGEST_SIZE + 1])
778 char *hash_file = NULL, *hash_path = NULL;
779 char tmpbuf[HEXLIFIED_SHA256_DIGEST_SIZE];
780 struct pfiled *pfiled = __get_pfiled(peer);
782 XSEGLOG2(&lc, D, "Started.");
784 hash_file = malloc(MAX_FILENAME_SIZE + 1);
785 hash_path = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
788 strncpy(hash_file+pos, target, targetlen);
790 strncpy(hash_file+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
791 pos += HASH_SUFFIX_LEN;
795 r = create_path(hash_path, pfiled, hash_file, pos, 1);
797 XSEGLOG2(&lc, E, "Create path failed");
801 fd = open(hash_path, O_RDONLY, S_IRWXU | S_IRUSR);
803 if (errno != ENOENT){
804 XSEGLOG2(&lc, E, "Error opening %s", hash_path);
806 XSEGLOG2(&lc, I, "No precalculated hash for %s", hash_file);
812 r = pread(fd, tmpbuf, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
814 XSEGLOG2(&lc, E, "Error reading from %s", hash_path);
820 XSEGLOG2(&lc, D, "Read %u bytes", len);
824 XSEGLOG2(&lc, E, "Could not close hash_file %s", hash_path);
828 if (len == HEXLIFIED_SHA256_DIGEST_SIZE){
829 strncpy(hash, tmpbuf, HEXLIFIED_SHA256_DIGEST_SIZE);
830 hash[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
831 XSEGLOG2(&lc, D, "Found hash for %s : %s", hash_file, hash);
836 XSEGLOG2(&lc, D, "Finished.");
/*
 * __set_precalculated_hash - store @hash (HEXLIFIED_SHA256_DIGEST_SIZE chars)
 * in the sidecar file "<target>_hash". The file is created exclusively.
 * NOTE(review): the file is opened with O_CREAT | O_EXCL, which reports an
 * existing file as EEXIST, but the code compares errno with ENOENT. As a
 * result, an existing sidecar is logged as an error, while the "Hash file
 * already exists" message is printed for ENOENT. The check presumably wants
 * EEXIST.
 * NOTE(review): other small issues:
 *  - the pwrite() failure message says "reading";
 *  - tmpbuf is not used in the lines shown;
 *  - S_IRWXU | S_IRUSR makes the file owner-executable (S_IRUSR is already
 *    part of S_IRWXU).
 */
840 static int __set_precalculated_hash(struct peerd *peer, char *target,
841 uint32_t targetlen, char hash[HEXLIFIED_SHA256_DIGEST_SIZE + 1])
846 char *hash_file = NULL, *hash_path = NULL;
847 char tmpbuf[HEXLIFIED_SHA256_DIGEST_SIZE];
848 struct pfiled *pfiled = __get_pfiled(peer);
850 XSEGLOG2(&lc, D, "Started.");
852 hash_file = malloc(MAX_FILENAME_SIZE + 1);
853 hash_path = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
856 strncpy(hash_file+pos, target, targetlen);
858 strncpy(hash_file+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
859 pos += HASH_SUFFIX_LEN;
862 r = create_path(hash_path, pfiled, hash_file, pos, 1);
864 XSEGLOG2(&lc, E, "Create path failed");
868 fd = open(hash_path, O_WRONLY | O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
870 if (errno != ENOENT){
871 XSEGLOG2(&lc, E, "Error opening %s", hash_path);
873 XSEGLOG2(&lc, I, "Hash file already exists %s", hash_file);
879 r = pwrite(fd, hash, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
881 XSEGLOG2(&lc, E, "Error reading from %s", hash_path);
887 XSEGLOG2(&lc, D, "Wrote %u bytes", len);
891 XSEGLOG2(&lc, E, "Could not close hash_file %s", hash_path);
897 XSEGLOG2(&lc, D, "Finished.");
/*
 * handle_hash - content-address an object. The SHA-256 of the target's data
 * becomes the name of an immutable copy, and that name is returned in a
 * struct xseg_reply_hash.
 * Steps visible below:
 *  1. Validate the target. If a stored "<target>_hash" exists, use it
 *     (presumably skipping straight to the reply).
 *  2. Otherwise read req->size bytes from offset 0 through the fd cache,
 *     strip trailing zero bytes, and hash the remainder.
 *  3. If no file with the hash name exists yet:
 *     a. write the data to a temporary file
 *        "<target>_snap" + uniquestr + str_id, created with O_EXCL; an
 *        existing one is reported as stale data;
 *     b. link() it to the hash name (EEXIST is tolerated);
 *     c. unlink the temporary file.
 *  4. Record the hash in the sidecar. A failure here is only a warning.
 *  5. Resize the request and fill in the reply.
 * Because trailing zeros are stripped, objects that differ only in trailing
 * zero bytes map to the same hash name.
 */
901 static void handle_hash(struct peerd *peer, struct peer_req *pr)
906 //stat (open without create)
907 //write to hash_tmpfile
910 int src = -1, dst = -1, r = -1, pos;
912 uint64_t sum, written, trailing_zeros;
913 struct pfiled *pfiled = __get_pfiled(peer);
914 struct fio *fio = __get_fio(pr);
915 struct xseg_request *req = pr->req;
916 char *pathname = NULL, *tmpfile_pathname = NULL, *tmpfile = NULL;
918 char hash_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
919 char name[XSEG_MAX_TARGETLEN + 1];
921 unsigned char *object_data = NULL;
922 unsigned char sha[SHA256_DIGEST_SIZE];
923 struct xseg_reply_hash *xreply;
925 target = xseg_get_target(peer->xseg, req);
927 XSEGLOG2(&lc, I, "Handle hash started for pr: %p, req: %p",
931 XSEGLOG2(&lc, E, "No request size provided");
936 r = is_target_valid_len(pfiled, target, req->targetlen, READ);
938 XSEGLOG2(&lc, E, "Source target not valid");
942 r = __get_precalculated_hash(peer, target, req->targetlen, hash_name);
944 XSEGLOG2(&lc, E, "Error getting precalculated hash");
948 if (hash_name[0] != 0) {
949 XSEGLOG2(&lc, I, "Precalucated hash found %s", hash_name);
953 XSEGLOG2(&lc, I, "No precalculated hash found");
955 strncpy(name, target, req->targetlen);
956 name[req->targetlen] = 0;
958 pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
959 object_data = malloc(sizeof(char) * req->size);
960 if (!pathname || !object_data){
961 XSEGLOG2(&lc, E, "Out of memory");
965 src = dir_open(pfiled, fio, target, req->targetlen, READ);
967 XSEGLOG2(&lc, E, "Fail in src");
973 while (sum < req->size) {
974 c = pread(src, object_data + sum, req->size - sum, sum);
976 XSEGLOG2(&lc, E, "Error reading from source");
986 //rstrip here in case zeros were written in the end
988 for (;trailing_zeros < sum; trailing_zeros++)
989 if (object_data[sum - trailing_zeros - 1])
992 XSEGLOG2(&lc, D, "Read %llu, Trainling zeros %llu",
993 sum, trailing_zeros);
995 sum -= trailing_zeros;
996 //calculate hash name
997 SHA256(object_data, sum, sha);
999 hexlify(sha, SHA256_DIGEST_SIZE, hash_name);
1000 hash_name[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
1003 r = create_path(pathname, pfiled, hash_name, HEXLIFIED_SHA256_DIGEST_SIZE, 1);
1005 XSEGLOG2(&lc, E, "Create path failed");
1012 dst = open(pathname, O_WRONLY);
1014 XSEGLOG2(&lc, I, "%s already exists, no write needed", pathname);
1015 req->serviced = req->size;
1020 tmpfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
1021 if (!tmpfile_pathname){
1022 XSEGLOG2(&lc, E, "Out of memory");
1027 tmpfile = malloc(MAX_FILENAME_SIZE);
1029 XSEGLOG2(&lc, E, "Out of memory");
1035 strncpy(tmpfile + pos, target, req->targetlen);
1036 pos += req->targetlen;
1037 strncpy(tmpfile + pos, SNAP_SUFFIX, SNAP_SUFFIX_LEN);
1038 pos += SNAP_SUFFIX_LEN;
1039 strncpy(tmpfile + pos, pfiled->uniquestr, pfiled->uniquestr_len);
1040 pos += pfiled->uniquestr_len;
1041 strncpy(tmpfile + pos, fio->str_id, FIO_STR_ID_LEN);
1042 pos += FIO_STR_ID_LEN;
1045 r = create_path(tmpfile_pathname, pfiled, tmpfile, pos, 1);
1047 XSEGLOG2(&lc, E, "Create path failed");
1052 XSEGLOG2(&lc, D, "Opening %s", tmpfile_pathname);
1053 dst = open(tmpfile_pathname, O_WRONLY | O_CREAT | O_EXCL,
1054 S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
1056 if (errno != EEXIST){
1057 char error_str[1024];
1058 XSEGLOG2(&lc, E, "Error opening %s (%s)", tmpfile_pathname, strerror_r(errno, error_str, 1023));
1060 XSEGLOG2(&lc, E, "Error opening %s. Stale data found.",
1066 XSEGLOG2(&lc, D, "Opened %s", tmpfile_pathname);
1069 while (written < sum) {
1070 c = write(dst, object_data + written, sum - written);
1072 XSEGLOG2(&lc, E, "Error writting to dst file %s", tmpfile_pathname);
1079 r = link(tmpfile_pathname, pathname);
1080 if (r < 0 && errno != EEXIST) {
1081 XSEGLOG2(&lc, E, "Error linking tmp file %s. Errno %d",
1087 r = unlink(tmpfile_pathname);
1089 XSEGLOG2(&lc, W, "Error unlinking tmp file %s", tmpfile_pathname);
1093 r = __set_precalculated_hash(peer, target, req->targetlen, hash_name);
1095 XSEGLOG2(&lc, W, "Error setting precalculated hash");
1100 r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
1101 sizeof(struct xseg_reply_hash));
1103 XSEGLOG2(&lc, E, "Resize request failed");
1108 xreply = (struct xseg_reply_hash *)xseg_get_data(peer->xseg, req);
1109 strncpy(xreply->target, hash_name, HEXLIFIED_SHA256_DIGEST_SIZE);
1110 xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE;
1112 req->serviced = req->size;
/*
 * NOTE(review): the comma after "req: %p. " makes "Target %s" the first
 * variadic argument instead of part of the format. As a result, the first
 * %p prints the address of that literal, the second prints pr, and
 * pr->req and name are never logged. The two literals were presumably
 * meant to be concatenated (drop the comma).
 */
1120 XSEGLOG2(&lc, E, "Handle hash failed for pr: %p, req: %p. ",
1121 "Target %s", pr, pr->req, name);
1122 pfiled_fail(peer, pr);
1124 XSEGLOG2(&lc, I, "Handle hash completed for pr: %p, req: %p\n\t"
1125 "hashed %s to %s", pr, pr->req, name, hash_name);
1126 pfiled_complete(peer, pr);
1128 free(tmpfile_pathname);
1134 unlink(tmpfile_pathname);
/*
 * __locked_by - check whether @lockfile currently holds @expected.
 * Reads up to MAX_UNIQUESTR_LEN bytes from the lock file and compares both
 * length and bytes with @expected. A missing lock file is logged as "lock
 * file removed".
 * handle_release treats a zero return as "held by @expected". The exact
 * non-zero/error codes are not shown here (confirm).
 */
1138 static int __locked_by(char *lockfile, char *expected, uint32_t expected_len)
1143 char tmpbuf[MAX_UNIQUESTR_LEN];
1145 XSEGLOG2(&lc, D, "Started. Lockfile: %s, expected: %s, expected_len: %u", lockfile, expected, expected_len);
1146 fd = open(lockfile, O_RDONLY, S_IRWXU | S_IRUSR);
1148 if (errno != ENOENT){
1149 XSEGLOG2(&lc, E, "Error opening %s", lockfile);
1152 XSEGLOG2(&lc, I, "lock file removed");
1157 r = pread(fd, tmpbuf, MAX_UNIQUESTR_LEN, 0);
1159 XSEGLOG2(&lc, E, "Error reading from %s", lockfile);
1164 XSEGLOG2(&lc, D, "Read %u bytes", len);
1167 XSEGLOG2(&lc, E, "Could not close lockfile %s", lockfile);
1170 if (len == expected_len && !strncmp(tmpbuf, expected, expected_len)){
1171 XSEGLOG2(&lc, D, "Lock file %s locked by us.", lockfile);
1175 XSEGLOG2(&lc, D, "Finished. Lockfile: %s", lockfile);
/*
 * __try_lock - try to take @lockfile by hard-linking the per-request
 * temporary file @tmpfile to it.
 * First, this peer's uniquestr is written into @fd, the already-open
 * @tmpfile. Then link(tmpfile, lockfile) is attempted in a loop:
 *  - link() fails with EEXIST when the lock file already exists. The
 *    holder is then checked with __locked_by(), presumably treating "held
 *    by us" as success.
 *  - Any other link() error is logged.
 *  - With XF_NOSYNC in @flags the attempt is abandoned instead of retried.
 */
1179 static int __try_lock(struct pfiled *pfiled, char *tmpfile, char *lockfile,
1180 uint32_t flags, int fd)
1183 XSEGLOG2(&lc, D, "Started. Lockfile: %s, Tmpfile:%s", lockfile, tmpfile);
1184 r = pwrite(fd, pfiled->uniquestr, pfiled->uniquestr_len, 0);
1193 while (link(tmpfile, lockfile) < 0) {
1195 if (errno != EEXIST){
1196 XSEGLOG2(&lc, E, "Error linking %s to %s",
1200 r = __locked_by(lockfile, pfiled->uniquestr, pfiled->uniquestr_len);
1204 if (flags & XF_NOSYNC) {
1205 XSEGLOG2(&lc, D, "Could not get lock file %s, "
1206 "XF_NOSYNC set. Aborting", lockfile);
1211 XSEGLOG2(&lc, D, "Finished. Lockfile: %s", lockfile);
/*
 * handle_acquire - take the lock for the request's target.
 * The lock file is "<target>_lock". The steps are:
 *  1. create a per-request temporary file "<target>_lock" + uniquestr +
 *     str_id exclusively (an existing one is reported as stale data);
 *  2. let __try_lock() fill it and link it to the lock file;
 *  3. close and unlink the temporary file.
 * NOTE(review): the out-of-memory check covers buf, tmpfile_pathname and
 * lockfile_pathname but not tmpfile, which strncpy() writes into below.
 * NOTE(review): "pos = req->targetlen" differs from the "pos +=" used by the
 * other name builders. It is only equivalent when pos starts at 0.
 */
1215 static void handle_acquire(struct peerd *peer, struct peer_req *pr)
1218 struct pfiled *pfiled = __get_pfiled(peer);
1219 struct fio *fio = __get_fio(pr);
1220 struct xseg_request *req = pr->req;
1221 char *buf = malloc(MAX_FILENAME_SIZE);
1222 char *tmpfile = malloc(MAX_FILENAME_SIZE);
1223 char *lockfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
1224 char *tmpfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
1226 char *target = xseg_get_target(peer->xseg, req);
1227 uint32_t buf_len, tmpfile_len;
1229 if (!buf || !tmpfile_pathname || !lockfile_pathname) {
1230 XSEGLOG2(&lc, E, "Out of memory");
1231 pfiled_fail(peer, pr);
1235 r = is_target_valid_len(pfiled, target, req->targetlen, READ);
1237 XSEGLOG2(&lc, E, "Target not valid");
1243 strncpy(buf + pos, target, req->targetlen);
1244 pos = req->targetlen;
1245 strncpy(buf + pos, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
1246 pos += LOCK_SUFFIX_LEN;
1250 XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
1254 strncpy(tmpfile + pos, buf, buf_len);
1256 strncpy(tmpfile + pos, pfiled->uniquestr, pfiled->uniquestr_len);
1257 pos += pfiled->uniquestr_len;
1258 strncpy(tmpfile + pos, fio->str_id, FIO_STR_ID_LEN);
1259 pos += FIO_STR_ID_LEN;
1263 XSEGLOG2(&lc, I, "Trying to acquire lock %s", buf);
1265 if (create_path(tmpfile_pathname, pfiled, tmpfile, tmpfile_len, 1) < 0) {
1266 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
1270 if (create_path(lockfile_pathname, pfiled, buf, buf_len, 1) < 0) {
1271 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
1275 //create exclusive unique lockfile (block_uniqueid+target)
1277 // write blocker uniqueid to the unique lockfile
1278 // try to link it to the lockfile
1280 // unlink unique lockfile;
1283 // spin while not able to link
1286 XSEGLOG2(&lc, D, "Tmpfile: %s", tmpfile_pathname);
1287 fd = open(tmpfile_pathname, O_WRONLY | O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
1290 if (errno != EEXIST){
1291 XSEGLOG2(&lc, E, "Error opening %s", tmpfile_pathname);
1294 XSEGLOG2(&lc, E, "Error opening %s. Stale data found.",
1299 XSEGLOG2(&lc, D, "Tmpfile %s created. Trying to get lock",
1301 r = __try_lock(pfiled, tmpfile_pathname, lockfile_pathname,
1304 XSEGLOG2(&lc, E, "Trying to get lock %s failed", buf);
1307 XSEGLOG2(&lc, D, "Trying to get lock %s succeed", buf);
1312 XSEGLOG2(&lc, W, "Error closing %s", tmpfile_pathname);
1314 r = unlink(tmpfile_pathname);
1316 XSEGLOG2(&lc, E, "Error unlinking %s", tmpfile_pathname);
1321 XSEGLOG2(&lc, I, "Failed to acquire lock %s", buf);
1322 pfiled_fail(peer, pr);
1325 XSEGLOG2(&lc, I, "Acquired lock %s", buf);
1326 pfiled_complete(peer, pr);
1329 free(lockfile_pathname);
1330 free(tmpfile_pathname);
/*
 * handle_release - release the lock of the request's target by unlinking
 * "<target>_lock". This happens only if XF_FORCE is set or the lock file
 * holds this peer's uniquestr (__locked_by() returning 0).
 * NOTE(review): tmpbuf is allocated but not used in the lines shown, and the
 * out-of-memory check does not cover it.
 */
1334 static void handle_release(struct peerd *peer, struct peer_req *pr)
1336 struct pfiled *pfiled = __get_pfiled(peer);
1337 // struct fio *fio = __get_fio(pr);
1338 struct xseg_request *req = pr->req;
1339 char *buf = malloc(MAX_FILENAME_SIZE + 1);
1340 char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
1341 char *tmpbuf = malloc(MAX_UNIQUESTR_LEN + 1);
1342 char *target = xseg_get_target(peer->xseg, req);
1345 if (!buf || !pathname) {
1346 XSEGLOG2(&lc, E, "Out of memory");
1351 r = is_target_valid_len(pfiled, target, req->targetlen, READ);
1353 XSEGLOG2(&lc, E, "Target not valid");
1358 strncpy(buf + pos, target, req->targetlen);
1359 pos += req->targetlen;
1360 strncpy(buf + pos, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
1361 pos += LOCK_SUFFIX_LEN;
1364 XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
1366 r = create_path(pathname, pfiled, buf,
1367 req->targetlen + strlen(LOCK_SUFFIX), 0);
1369 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
1373 if ((req->flags & XF_FORCE) || !__locked_by(pathname, pfiled->uniquestr,
1374 pfiled->uniquestr_len)) {
1375 r = unlink(pathname);
1377 XSEGLOG2(&lc, E, "Could not unlink %s", pathname);
1389 XSEGLOG2(&lc, I, "Released lockfile: %s", buf);
1392 XSEGLOG2(&lc, I, "Finished. Lockfile: %s", buf);
/*
 * dispatch - peer entry point. Routes @pr to the handle_* function for its
 * operation (the case labels are presumably req->op values); anything
 * unrecognised goes to handle_unknown().
 * When @reason is dispatch_accept, the request's fio state is presumably
 * (re)initialised first (confirm).
 */
1399 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1400 enum dispatch_reason reason)
1402 struct fio *fio = __get_fio(pr);
1403 if (reason == dispatch_accept)
1408 handle_read(peer, pr); break;
1410 handle_write(peer, pr); break;
1412 handle_info(peer, pr); break;
1414 handle_copy(peer, pr); break;
1416 handle_delete(peer, pr); break;
1418 handle_acquire(peer, pr); break;
1420 handle_release(peer, pr); break;
1422 handle_hash(peer, pr); break;
1425 handle_unknown(peer, pr);
/*
 * custom_peer_init - set up pfiled for @peer. It:
 *  - allocates the pfiled state;
 *  - attaches a struct fio to every peer request, each with its own str_id;
 *  - parses --fdcache, --archip (mandatory), --prefix and --uniquestr;
 *  - makes sure vpath ends in '/';
 *  - initialises the fd cache (LRU heap, maxfds entries, the cache_*
 *    callbacks above);
 *  - compares the cache size with the process fd limit (RLIMIT_NOFILE).
 */
1430 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1433 get blocks,maps paths
1434 get optional pithos block,maps paths
1436 check if greater than limit (tip: getrlimit)
1437 assert cachesize greater than nr_ops
1438 assert nr_ops greater than nr_threads
1445 struct pfiled *pfiled = malloc(sizeof(struct pfiled));
1447 struct xcache_ops c_ops = {
1448 .on_node_init = cache_node_init,
1449 .on_init = cache_init,
1450 .on_put = cache_put,
1453 XSEGLOG2(&lc, E, "Out of memory");
1457 peer->priv = pfiled;
1459 pfiled->maxfds = 2 * peer->nr_ops;
1461 for (i = 0; i < peer->nr_ops; i++) {
1462 peer->peer_reqs[i].priv = malloc(sizeof(struct fio));
/*
 * NOTE(review): this tests peer->peer_reqs->priv, i.e. element 0, not the
 * element just allocated, so a failed malloc for i > 0 goes unnoticed.
 * Presumably peer->peer_reqs[i].priv was intended.
 */
1463 if (!peer->peer_reqs->priv){
1464 XSEGLOG2(&lc, E, "Out of memory");
1468 fio = __get_fio(&peer->peer_reqs[i]);
/* str_id is "_" plus two letters; both letters stay within 'a'..'z' only
 * while i < 26 * 26. */
1469 fio->str_id[0] = '_';
1470 fio->str_id[1] = 'a' + (i / 26);
1471 fio->str_id[2] = 'a' + (i % 26);
1474 pfiled->vpath[0] = 0;
1475 pfiled->prefix[0] = 0;
1476 pfiled->uniquestr[0] = 0;
1478 BEGIN_READ_ARGS(argc, argv);
1479 READ_ARG_ULONG("--fdcache", pfiled->maxfds);
1480 READ_ARG_STRING("--archip", pfiled->vpath, MAX_PATH_SIZE);
1481 READ_ARG_STRING("--prefix", pfiled->prefix, MAX_PREFIX_LEN);
1482 READ_ARG_STRING("--uniquestr", pfiled->uniquestr, MAX_UNIQUESTR_LEN);
1485 pfiled->uniquestr_len = strlen(pfiled->uniquestr);
1486 pfiled->prefix_len = strlen(pfiled->prefix);
1488 //TODO test path exist/is_dir/have_access
1489 pfiled->vpath_len = strlen(pfiled->vpath);
1490 if (!pfiled->vpath_len){
1491 XSEGLOG2(&lc, E, "Archipelago path was not provided");
/*
 * NOTE(review): appending '/' writes vpath[vpath_len] and
 * vpath[vpath_len + 1], while vpath holds MAX_PATH_SIZE + 1 bytes. If
 * READ_ARG_STRING can store MAX_PATH_SIZE characters, this writes one byte
 * past the end. Confirm READ_ARG_STRING's length semantics.
 */
1495 if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
1496 pfiled->vpath[pfiled->vpath_len] = '/';
1497 pfiled->vpath[++pfiled->vpath_len]= 0;
1500 r = getrlimit(RLIMIT_NOFILE, &rlim);
1502 XSEGLOG2(&lc, E, "Could not get limit for max fds");
1505 //TODO check nr_ops == nr_threads.
1507 r = xcache_init(&pfiled->cache, pfiled->maxfds, &c_ops, XCACHE_LRU_HEAP, peer);
1510 //check max fds. (> fdcache + nr_threads)
1511 //TODO assert fdcache > 2*nr_threads or add waitq
/*
 * NOTE(review): the condition uses nr_threads, but the message reports
 * nr_ops. Also, rlim_cur (an rlim_t) is printed with %d, which does not
 * match its type.
 */
1512 if (rlim.rlim_cur < pfiled->cache.size + peer->nr_threads - 4) {
1513 XSEGLOG2(&lc, E, "FD limit %d is less than cachesize + nr_ops -4(%u)",
1514 rlim.rlim_cur, pfiled->cache.size + peer->nr_ops - 4);
/*
 * custom_peer_finalize - peer teardown hook. Per the note below, cached fds
 * are not closed explicitly; that is left to the operating system.
 */
1522 void custom_peer_finalize(struct peerd *peer)
1525 we could close all fds, but we can let the system do it for us.
1531 static int safe_atoi(char *s)
1536 l = strtol(s, &endp, 10);
1537 if (s != endp && *endp == '\0')