2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
36 * The Pithos File Blocker Peer (pfiled)
42 #include <sys/types.h>
53 #include <sys/sendfile.h>
56 #include <xseg/xseg.h>
57 #include <xseg/protocol.h>
// Suffix appended to a target name to form its lock-file name
// (used by handle_acquire and handle_release).
59 #define LOCK_SUFFIX "_lock"
// Capacity of the pithos/archip base-path buffers in struct pfiled
// (those arrays are declared with one extra byte for the terminator).
60 #define MAX_PATH_SIZE 1024
// Largest target name plus the 5 characters of LOCK_SUFFIX.
// NOTE(review): this leaves no byte for a NUL terminator; buffers allocated
// with exactly this size (handle_acquire, handle_release) are later logged
// with %s. Consider deriving the 5 from sizeof(LOCK_SUFFIX) - 1 as well.
61 #define MAX_FILENAME_SIZE (XSEG_MAX_TARGETLEN + 5) //strlen(LOCK_SUFFIX)
// Size of pfiled->prefix.
// NOTE(review): custom_peer_init writes prefix[MAX_PREFIX_LEN], which is one
// past the end of an array of this size.
62 #define MAX_PREFIX_LEN 10
64 /* default concurrency level (number of threads) */
65 #define DEFAULT_NR_OPS 16
67 /* Pithos hash for the zero block
68 * FIXME: Should it be hardcoded?
71 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85"
74 * Globals, holding command-line arguments
// Print the pfiled-specific command-line options to stderr.
// argv0 is not referenced in the lines visible here.
77 void custom_peer_usage(char *argv0)
79 fprintf(stderr, "Custom peer options:\n"
80 "--pithos PATH --archip VPATH --prefix PREFIX\n\n"
82 "\tPATH: path to pithos data blocks\n"
83 "\tVPATH: path to modified volume blocks\n"
84 "\tPREFIX: Common prefix of Archipelagos objects to be"
// NOTE(review): these two adjacent literals concatenate to `to bestriped`
// (missing space), and `striped` is presumably meant to be `stripped`.
85 "striped during filesystem hierarchy creation\n"
89 /* fdcache_node flags */
// READY: clear while some thread is opening the entry file. dir_open waits
// on the entry cond while it is clear, and handle_delete clears it to mark
// an entry invalid.
90 #define READY (1 << 1)
92 /* fdcache node info */
// Number of in-flight requests using the entry; updated with __sync
// atomics in close_cache_entry and dir_open.
95 volatile unsigned int ref;
// LRU stamp, copied from pfiled->handled_reqs when dir_open hands out the
// entry; dir_open evicts the idle READY entry with the smallest value.
96 volatile unsigned long time;
97 volatile unsigned int flags;
// NOTE(review): volatile does not make these fields thread-safe; flags is
// read without cache_lock in close_cache_entry.
// Target name cached by this entry, NUL-terminated (empty when unused).
99 char target[MAX_FILENAME_SIZE + 1];
// Counter incremented by dir_open; its value stamps fdcache entries for LRU.
107 uint64_t handled_reqs;
// Array of maxfds entries, allocated in custom_peer_init.
109 struct fdcache_node *fdcache;
// Held around fdcache lookups and updates.
110 pthread_mutex_t cache_lock;
// Base directories; custom_peer_init appends a trailing slash.
111 char path[MAX_PATH_SIZE + 1];
112 char vpath[MAX_PATH_SIZE + 1];
// Object-name prefix skipped when deriving vpath subdirectories.
// NOTE(review): no room for a NUL terminator when the prefix is
// MAX_PREFIX_LEN characters long.
113 char prefix[MAX_PREFIX_LEN];
117 * pfiled specific structure
118 * containing information on a pending I/O operation
// Return the pfiled state stored in the peer private pointer.
// NOTE(review): identifiers beginning with a double underscore are reserved
// for the C implementation; consider renaming.
125 struct pfiled * __get_pfiled(struct peerd *peer)
127 return (struct pfiled *) peer->priv;
// Return the per-request fio state stored in the request private pointer
// (one fio is allocated per peer_req in custom_peer_init).
130 struct fio * __get_fio(struct peer_req *pr)
132 return (struct fio*) pr->priv;
// Release the fdcache reference held by request pr, if it holds one
// (fdcacheidx >= 0). When this drops the last reference to an entry whose
// READY flag has been cleared (as handle_delete does), the entry is reset
// under cache_lock: the re-check of ref guards against a concurrent
// dir_open, the slot fd is moved into the local fd and set to -1, the
// target name is cleared and READY is set again, making the slot eligible
// for reuse by dir_open.
// NOTE(review): the declaration of fd and what happens to it after the
// unlock are not visible in this excerpt; presumably the file is closed
// there. Confirm.
// NOTE(review): flags is read here without holding cache_lock.
135 static void close_cache_entry(struct peerd *peer, struct peer_req *pr)
137 struct pfiled *pfiled = __get_pfiled(peer);
138 struct fio *fio = __get_fio(pr);
140 if (fio->fdcacheidx >= 0) {
141 if (!__sync_sub_and_fetch(&pfiled->fdcache[fio->fdcacheidx].ref, 1) && !(pfiled->fdcache[fio->fdcacheidx].flags & READY)) {
142 pthread_mutex_lock(&pfiled->cache_lock);
143 if (!pfiled->fdcache[fio->fdcacheidx].ref){
144 /* invalidate cache entry */
145 fd = pfiled->fdcache[fio->fdcacheidx].fd;
146 pfiled->fdcache[fio->fdcacheidx].fd = -1;
147 pfiled->fdcache[fio->fdcacheidx].target[0] = 0;
148 pfiled->fdcache[fio->fdcacheidx].flags |= READY;
150 pthread_mutex_unlock(&pfiled->cache_lock);
// Finish request pr successfully; it first drops the request fdcache
// reference. The remainder of the body is not visible in this excerpt.
158 static void pfiled_complete(struct peerd *peer, struct peer_req *pr)
160 close_cache_entry(peer, pr);
// Finish request pr as failed; it first drops the request fdcache
// reference. The remainder of the body is not visible in this excerpt.
164 static void pfiled_fail(struct peerd *peer, struct peer_req *pr)
166 close_cache_entry(peer, pr);
// Log a warning for an unsupported request op and fail the request.
170 static void handle_unknown(struct peerd *peer, struct peer_req *pr)
172 XSEGLOG2(&lc, W, "unknown request op");
173 pfiled_fail(peer, pr);
// Build in buf the on-disk path of object target:
//   <path><t0><t1>/<t2><t3>/<t4><t5>/<target>
// where t0..t5 are the six characters of target starting at offset
// prefixlen, so that a common object prefix does not determine the
// directory layout. All targetlen characters of target are then appended
// and buf is NUL-terminated. path is copied verbatim, so callers pass base
// paths to which custom_peer_init has appended a trailing slash.
// When mkdirs is requested, each intermediate directory is stat()ed and
// created with mode 0700 if missing; the exact gating and error handling
// are not visible in this excerpt. Callers treat a negative return as
// failure.
// NOTE(review): buf must hold strlen(path) + 9 + targetlen + 1 bytes, and
// target must be at least prefixlen + 6 characters long; neither is checked
// in the visible lines.
176 static int create_path(char *buf, char *path, char *target, uint32_t targetlen,
177 uint32_t prefixlen, int mkdirs)
181 uint32_t pathlen = strlen(path);
183 strncpy(buf, path, pathlen);
// Each iteration emits one two-character directory level; (i/3) undoes the
// extra slot taken by each previously written slash.
185 for (i = 0; i < 9; i+= 3) {
186 buf[pathlen + i] = target[prefixlen + i - (i/3)];
187 buf[pathlen + i +1] = target[prefixlen + i + 1 - (i/3)];
188 buf[pathlen + i + 2] = '/';
// Terminate after the current level so it can be stat()ed on its own.
190 buf[pathlen + i + 3] = '\0';
192 if (stat(buf, &st) < 0)
193 if (mkdir(buf, 0700) < 0) {
202 strncpy(&buf[pathlen + 9], target, targetlen);
203 buf[pathlen + 9 + targetlen] = '\0';
// Return an open file descriptor for object target, using the shared
// fdcache, and record the cache slot index in io->fdcacheidx (visible on
// the newly-opened path). Targets longer than MAX_FILENAME_SIZE are
// rejected.
//
// Under cache_lock the cache is scanned for (a) the least-recently used
// idle entry (ref == 0, READY set, smallest time) and (b) an entry already
// holding target. If that entry is still being opened by another thread
// (READY clear), the caller waits on the entry cond and restarts the
// lookup; if its open succeeded the cached fd is used, otherwise this
// request fails as well. If no idle entry exists the lookup fails.
// Otherwise the LRU entry is claimed: its target is set and it is marked
// not ready before the lock is dropped, its previous fd is closed, and the
// file is opened from the pithos dir (no prefix skipping), then from the
// archip dir, and finally created (O_CREAT, 0600) under the archip dir with
// directory creation. The resulting fd, negative on failure, is stored
// under the lock and waiters are woken with pthread_cond_broadcast. On the
// success path the entry ref is incremented atomically and its time is
// stamped from handled_reqs.
//
// NOTE(review): tmp is sized from path_len, but create_path also writes
// vpath-based paths into it (both archip attempts); if vpath is longer
// than path this overflows the stack buffer. Size it from the larger of
// path_len and vpath_len.
// NOTE(review): mode is not referenced in the visible lines; confirm that
// the hidden condition guarding the O_CREAT attempt uses it.
// NOTE(review): the type of lru is not visible; check that it matches %ld.
208 static int dir_open(struct pfiled *pfiled, struct fio *io,
209 char *target, uint32_t targetlen, int mode)
212 struct fdcache_node *ce = NULL;
214 char tmp[pfiled->path_len + targetlen + 10];
217 if (targetlen> MAX_FILENAME_SIZE)
222 pthread_mutex_lock(&pfiled->cache_lock);
226 for (i = 0; i < pfiled->maxfds; i++) {
227 if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time
228 && (pfiled->fdcache[i].flags & READY)) {
229 min = pfiled->fdcache[i].time;
// Prefix compare plus terminator check gives an exact name match.
234 if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
235 if (pfiled->fdcache[i].target[targetlen] == 0) {
236 ce = &pfiled->fdcache[i];
237 /* if any other io thread is currently opening
238 * the file, block until it succeeds or fails
240 if (!(ce->flags & READY)) {
241 pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
242 /* when ready, restart lookup */
245 /* if successfully opened */
247 fd = pfiled->fdcache[i].fd;
251 /* else open failed for the other io thread, so
252 * it should fail for everyone waiting on this
264 /* all cache entries are currently being used */
265 pthread_mutex_unlock(&pfiled->cache_lock);
268 if (pfiled->fdcache[lru].ref){
270 XSEGLOG2(&lc, E, "lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
273 /* make room for new file */
274 ce = &pfiled->fdcache[lru];
275 /* set name here and state to not ready, for any other requests on the
276 * same target that may follow
278 strncpy(ce->target, target, targetlen);
279 ce->target[targetlen] = 0;
281 pthread_mutex_unlock(&pfiled->cache_lock);
// The slow work (closing the evicted fd, opening the new file) happens
// outside cache_lock; other requests for this target wait on ce->cond.
284 if (close(ce->fd) < 0){
285 XSEGLOG2(&lc, W, "Cannot close %s", ce->target);
289 /* try opening it from pithos blocker dir */
290 if (create_path(tmp, pfiled->path, target, targetlen, 0, 0) < 0) {
295 fd = open(tmp, O_RDWR);
297 /* try opening it from the tmp dir */
298 if (create_path(tmp, pfiled->vpath, target, targetlen,
299 pfiled->prefix_len, 0) < 0)
302 fd = open(tmp, O_RDWR);
304 if (create_path(tmp, pfiled->vpath, target, targetlen,
305 pfiled->prefix_len, 1) < 0) {
310 fd = open(tmp, O_RDWR | O_CREAT, 0600);
312 XSEGLOG2(&lc, E, "Cannot open %s", tmp);
316 /* insert in cache a negative fd to indicate opening error to
317 * any other ios waiting for the file to open
320 /* insert in cache */
322 pthread_mutex_lock(&pfiled->cache_lock);
326 pthread_cond_broadcast(&ce->cond);
328 io->fdcacheidx = lru;
336 pfiled->handled_reqs++;
337 ce->time = pfiled->handled_reqs;
338 __sync_fetch_and_add(&ce->ref, 1);
339 pthread_mutex_unlock(&pfiled->cache_lock);
344 pthread_mutex_unlock(&pfiled->cache_lock);
// Serve a read or write request against the object file obtained from
// dir_open; the request fails if dir_open fails.
// FLUSH/FUA requests are completed immediately (no FLUSH/FUA support yet,
// per the inline comment). Reads loop on pread until datalen bytes are
// serviced; on reaching end of file the rest of the data buffer is
// zero-filled and counted as serviced. Writes loop on pwrite and are then
// synced; a failed sync counts as nothing serviced. On a read or write
// error, datalen is cut down to the bytes serviced so far. The request
// completes if any bytes were serviced and fails otherwise.
// NOTE(review): several branch conditions between the shown lines (around
// the two early completions and the switch on req->op) are not visible in
// this excerpt.
// NOTE(review): partial transfers are reported as success with a shortened
// datalen; confirm callers expect this.
348 static void handle_read_write(struct peerd *peer, struct peer_req *pr)
350 struct pfiled *pfiled = __get_pfiled(peer);
351 struct fio *fio = __get_fio(pr);
352 struct xseg_request *req = pr->req;
354 char *target = xseg_get_target(peer->xseg, req);
355 char *data = xseg_get_data(peer->xseg, req);
357 fd = dir_open(pfiled, fio, target, req->targetlen, 0);
359 XSEGLOG2(&lc, E, "Dir open failed");
360 pfiled_fail(peer, pr);
365 if (req->flags & (XF_FLUSH | XF_FUA)) {
366 /* No FLUSH/FUA support yet (O_SYNC ?).
367 * note that with FLUSH/size == 0
368 * there will probably be a (uint64_t)-1 offset */
369 pfiled_complete(peer, pr);
372 pfiled_complete(peer, pr);
379 while (req->serviced < req->datalen) {
380 r = pread(fd, data + req->serviced,
381 req->datalen - req->serviced,
382 req->offset + req->serviced);
384 req->datalen = req->serviced;
385 XSEGLOG2(&lc, E, "Cannot read");
388 /* reached end of file. zero out the rest data buffer */
389 memset(data + req->serviced, 0, req->datalen - req->serviced);
390 req->serviced = req->datalen;
398 while (req->serviced < req->datalen) {
399 r = pwrite(fd, data + req->serviced,
400 req->datalen - req->serviced,
401 req->offset + req->serviced);
403 req->datalen = req->serviced;
411 XSEGLOG2(&lc, E, "Fsync failed.");
412 /* if fsync fails, then no bytes serviced correctly */
417 XSEGLOG2(&lc, E, "wtf, corrupt op %u?\n", req->op);
418 pfiled_fail(peer, pr);
422 if (req->serviced > 0 ) {
423 pfiled_complete(peer, pr);
426 pfiled_fail(peer, pr);
// Report the size of the object file: fstat() the fd from dir_open and
// take st_size. The request data buffer is viewed as a struct
// xseg_reply_info; the store of the size into it is not visible in this
// excerpt. Fails the request if dir_open or fstat fails.
// NOTE(review): the local struct stat variable is named stat, shadowing the
// stat() function within this scope; consider renaming it.
431 static void handle_info(struct peerd *peer, struct peer_req *pr)
433 struct pfiled *pfiled = __get_pfiled(peer);
434 struct fio *fio = __get_fio(pr);
435 struct xseg_request *req = pr->req;
439 char *target = xseg_get_target(peer->xseg, req);
440 char *data = xseg_get_data(peer->xseg, req);
441 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)data;
443 fd = dir_open(pfiled, fio, target, req->targetlen, 0);
445 XSEGLOG2(&lc, E, "Dir open failed");
446 pfiled_fail(peer, pr);
450 r = fstat(fd, &stat);
452 XSEGLOG2(&lc, E, "fail in stat");
453 pfiled_fail(peer, pr);
457 size = (uint64_t)stat.st_size;
460 pfiled_complete(peer, pr);
// Copy the pithos block named in the request payload (xcopy->target) into
// the request target. The destination fd comes from dir_open(..., 1). The
// source path is built under the pithos dir without prefix skipping and
// opened O_RDWR. Its size is taken from a stat, and the data is moved with
// a single sendfile() call of st_size bytes.
// NOTE(review): sendfile may transfer fewer bytes than requested without
// an error; a short first transfer is treated as failure here instead of
// looping. n is int while st_size is off_t, so sizes above INT_MAX are
// truncated.
// NOTE(review): buf holds MAX_PATH_SIZE + MAX_FILENAME_SIZE bytes, but
// create_path writes path_len + 9 + targetlen + 1 bytes, and
// xcopy->targetlen is not validated in the visible lines.
463 static void handle_copy(struct peerd *peer, struct peer_req *pr)
465 struct pfiled *pfiled = __get_pfiled(peer);
466 struct fio *fio = __get_fio(pr);
467 struct xseg_request *req = pr->req;
469 char *target = xseg_get_target(peer->xseg, req);
470 char *data = xseg_get_data(peer->xseg, req);
471 struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
473 char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
474 int n, src = -1, dst = -1, r = -1;
477 XSEGLOG2(&lc, E, "Out of memory");
478 pfiled_fail(peer, pr);
482 dst = dir_open(pfiled, fio, target, req->targetlen, 1);
484 XSEGLOG2(&lc, E, "Fail in dst");
489 if (create_path(buf, pfiled->path, xcopy->target,
490 xcopy->targetlen, 0, 0) < 0) {
491 XSEGLOG2(&lc, E, "Create path failed");
496 src = open(buf, O_RDWR);
498 XSEGLOG2(&lc, E, "fail in src %s", buf);
505 XSEGLOG2(&lc, E, "fail in stat for src %s", buf);
// A NULL offset argument makes sendfile read from the current src offset.
509 n = sendfile(dst, src, 0, st.st_size);
510 if (n != st.st_size) {
511 XSEGLOG2(&lc, E, "Copy failed for %s", buf);
522 pfiled_fail(peer, pr);
524 pfiled_complete(peer, pr);
// Delete the request object. It is first looked up through dir_open to find
// its fdcache slot. READY is then cleared on that slot under cache_lock, so
// close_cache_entry resets the slot once the last in-flight request
// releases it. Finally the object path is built under the archip dir. The
// removal itself and the error paths are not visible in this excerpt.
528 static void handle_delete(struct peerd *peer, struct peer_req *pr)
530 struct pfiled *pfiled = __get_pfiled(peer);
531 struct fio *fio = __get_fio(pr);
532 struct xseg_request *req = pr->req;
534 char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
536 char *target = xseg_get_target(peer->xseg, req);
538 XSEGLOG2(&lc, E, "Out of memory");
539 pfiled_fail(peer, pr);
542 fd = dir_open(pfiled, fio, target, req->targetlen, 0);
544 XSEGLOG2(&lc, E, "Dir open failed");
549 /* mark cache entry as invalid
550 * give a chance to pending operations on this file to end.
551 * file will close when all operations are done
553 if (fio->fdcacheidx >= 0) {
554 pthread_mutex_lock(&pfiled->cache_lock);
555 pfiled->fdcache[fio->fdcacheidx].flags &= ~READY;
556 pthread_mutex_unlock(&pfiled->cache_lock);
559 r = create_path(buf, pfiled->vpath, target, req->targetlen,
560 pfiled->prefix_len, 0);
562 XSEGLOG2(&lc, E, "Create path failed");
569 pfiled_fail(peer, pr);
571 pfiled_complete(peer, pr);
// Acquire the lock for the request target by creating the file
// <target>_lock under the archip dir, creating missing directories, with
// O_CREAT | O_EXCL. While open() fails the loop retries it. A failure other
// than EEXIST is logged and presumably gives up, and XF_NOSYNC presumably
// stops the retrying; the loop body is only partly visible in this excerpt.
// NOTE(review): strncpy does not NUL-terminate buf, yet buf is logged with
// %s below; malloc(MAX_FILENAME_SIZE) also leaves no room for a terminator
// at the maximum target length. Allocate one extra byte and terminate
// explicitly.
// NOTE(review): S_IRUSR is already included in S_IRWXU.
// NOTE(review): retrying on EEXIST without a visible delay may busy-spin.
575 static void handle_acquire(struct peerd *peer, struct peer_req *pr)
577 struct pfiled *pfiled = __get_pfiled(peer);
578 // struct fio *fio = __get_fio(pr);
579 struct xseg_request *req = pr->req;
580 char *buf = malloc(MAX_FILENAME_SIZE);
581 char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
583 char *target = xseg_get_target(peer->xseg, req);
585 if (!buf || !pathname) {
586 XSEGLOG2(&lc, E, "Out of memory");
587 pfiled_fail(peer, pr);
591 strncpy(buf, target, req->targetlen);
592 strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
594 XSEGLOG2(&lc, I, "Trying to acquire lock %s", buf);
596 if (create_path(pathname, pfiled->vpath, buf,
597 req->targetlen + strlen(LOCK_SUFFIX),
598 pfiled->prefix_len, 1) < 0) {
599 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
// O_EXCL makes creation atomic: only one requester can create the file.
604 while ((fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR)) < 0){
606 if (errno != EEXIST){
607 XSEGLOG2(&lc, W, "Error opening %s", pathname);
610 if (req->flags & XF_NOSYNC)
619 XSEGLOG2(&lc, I, "Failed to acquire lock %s", buf);
620 pfiled_fail(peer, pr);
623 XSEGLOG2(&lc, I, "Acquired lock %s", buf);
624 pfiled_complete(peer, pr);
// Release the lock for the request target by unlinking <target>_lock under
// the archip dir (path built without creating directories). Completion and
// failure handling are not visible in this excerpt.
// NOTE(review): as in handle_acquire, buf is never NUL-terminated but is
// logged with %s, and MAX_FILENAME_SIZE leaves no room for a terminator.
629 static void handle_release(struct peerd *peer, struct peer_req *pr)
631 struct pfiled *pfiled = __get_pfiled(peer);
632 // struct fio *fio = __get_fio(pr);
633 struct xseg_request *req = pr->req;
634 char *buf = malloc(MAX_FILENAME_SIZE);
635 char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
636 char *target = xseg_get_target(peer->xseg, req);
639 if (!buf || !pathname) {
640 XSEGLOG2(&lc, E, "Out of memory");
645 strncpy(buf, target, req->targetlen);
646 strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
648 r = create_path(pathname, pfiled->vpath, buf,
649 req->targetlen + strlen(LOCK_SUFFIX),
650 pfiled->prefix_len, 0);
652 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
655 r = unlink(pathname);
// Peer entry point for a request. On dispatch_accept the per-request state
// is reset (no fdcache slot, state XS_ACCEPTED). The request is then routed
// to one of the handle_* functions, with handle_unknown presumably as the
// default case; the switch and its case labels are not visible in this
// excerpt.
667 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
668 enum dispatch_reason reason)
670 struct fio *fio = __get_fio(pr);
671 if (reason == dispatch_accept){
672 fio->fdcacheidx = -1;
673 fio->state = XS_ACCEPTED;
679 handle_read_write(peer, pr); break;
681 handle_info(peer, pr); break;
683 handle_copy(peer, pr); break;
685 handle_delete(peer, pr); break;
687 handle_acquire(peer, pr); break;
689 handle_release(peer, pr); break;
693 handle_unknown(peer, pr);
// Allocate and initialise the pfiled state for peer:
//   - an fdcache of maxfds = 2 * nr_ops entries;
//   - one fio per peer_req;
//   - the base paths and prefix taken from --pithos, --archip and --prefix.
// A missing pithos or archip path is logged as an error, and both paths get
// a trailing slash appended if they lack one.
// NOTE(review): the OOM check inside the fio loop tests
// peer->peer_reqs->priv (element 0) instead of peer->peer_reqs[i].priv.
// NOTE(review): prefix[MAX_PREFIX_LEN] is one past the end of prefix.
// NOTE(review): when a path is already MAX_PATH_SIZE characters long and
// lacks a trailing slash, the append writes index MAX_PATH_SIZE + 1, past
// the end of the array.
// NOTE(review): the final loop initialises cond and READY only for the
// first nr_ops entries, although fdcache has maxfds = 2 * nr_ops entries.
// The rest keep flags == 0 from calloc, so dir_open never selects them, and
// their cond is never initialised. Loop to pfiled->maxfds instead.
// NOTE(review): arguments appear to be parsed twice (manual loop and
// READ_ARG_STRING); comment delimiters around one of them may exist
// outside this excerpt. Confirm.
698 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
702 struct pfiled *pfiled = malloc(sizeof(struct pfiled));
704 XSEGLOG2(&lc, E, "Out of memory");
710 pfiled->maxfds = 2 * peer->nr_ops;
711 pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
712 if(!pfiled->fdcache) {
713 XSEGLOG2(&lc, E, "Out of memory");
718 for (i = 0; i < peer->nr_ops; i++) {
719 peer->peer_reqs[i].priv = malloc(sizeof(struct fio));
720 if (!peer->peer_reqs->priv){
721 XSEGLOG2(&lc, E, "Out of memory");
727 pfiled->vpath[0] = 0;
729 pfiled->handled_reqs = 0;
731 for (i = 0; i < argc; i++) {
732 if (!strcmp(argv[i], "--pithos") && (i+1) < argc){
733 strncpy(pfiled->path, argv[i+1], MAX_PATH_SIZE);
734 pfiled->path[MAX_PATH_SIZE] = 0;
738 if (!strcmp(argv[i], "--archip") && (i+1) < argc){
739 strncpy(pfiled->vpath, argv[i+1], MAX_PATH_SIZE);
740 pfiled->vpath[MAX_PATH_SIZE] = 0;
744 if (!strcmp(argv[i], "--prefix") && (i+1) < argc){
745 strncpy(pfiled->prefix, argv[i+1], MAX_PREFIX_LEN);
746 pfiled->prefix[MAX_PREFIX_LEN] = 0;
752 BEGIN_READ_ARGS(argc, argv);
753 READ_ARG_STRING("--pithos", pfiled->path, MAX_PATH_SIZE);
754 READ_ARG_STRING("--archip", pfiled->vpath, MAX_PATH_SIZE);
755 READ_ARG_STRING("--prefix", pfiled->prefix, MAX_PREFIX_LEN);
759 pfiled->prefix_len = strlen(pfiled->prefix);
761 //TODO test path exist
762 pfiled->path_len = strlen(pfiled->path);
763 if (!pfiled->path_len){
764 XSEGLOG2(&lc, E, "Pithos path was not provided");
// create_path appends directory components directly, so ensure a slash.
768 if (pfiled->path[pfiled->path_len -1] != '/'){
769 pfiled->path[pfiled->path_len] = '/';
770 pfiled->path[++pfiled->path_len]= 0;
773 pfiled->vpath_len = strlen(pfiled->vpath);
774 if (!pfiled->vpath_len){
775 XSEGLOG2(&lc, E, "Archipelagos path was not provided");
779 if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
780 pfiled->vpath[pfiled->vpath_len] = '/';
781 pfiled->vpath[++pfiled->vpath_len]= 0;
784 for (i = 0; i < peer->nr_ops; i++) {
785 pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
786 pfiled->fdcache[i].flags = READY;
788 pthread_mutex_init(&pfiled->cache_lock, NULL);
// Peer finalisation hook; its body is not visible in this excerpt.
794 void custom_peer_finalize(struct peerd *peer)
800 static int safe_atoi(char *s)
805 l = strtol(s, &endp, 10);
806 if (s != endp && *endp == '\0')