2 * The Pithos File Blocker Peer (pfiled)
19 #include <sys/sendfile.h>
22 #include <xseg/xseg.h>
23 #include <xseg/protocol.h>
// Suffix appended to a target name to form the name of its lock file
// (used by handle_open and handle_close).
25 #define LOCK_SUFFIX "_lock"
// Number of characters accepted for the --pithos and --archip directories;
// the path and vpath buffers are declared with one extra byte.
26 #define MAX_PATH_SIZE 1024
// Longest file name handled: a full-length target plus the five characters of
// LOCK_SUFFIX. No byte for a terminating NUL is included in this value.
27 #define MAX_FILENAME_SIZE (XSEG_MAX_TARGETLEN + 5) //strlen(LOCK_SUFFIX)
// Number of characters copied from the --prefix argument.
28 #define MAX_PREFIX_LEN 10
30 /* default concurrency level (number of threads) */
// NOTE(review): DEFAULT_NR_OPS is not referenced in the visible part of this file.
31 #define DEFAULT_NR_OPS 16
33 /* Pithos hash for the zero block
34 * FIXME: Should it be hardcoded?
37 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85"
40 * Globals, holding command-line arguments
// Prints the pfiled-specific command line options (--pithos, --archip and
// --prefix) to stderr. argv0 is not used in the lines visible here.
// The PREFIX description is split over two adjacent string literals that the
// compiler concatenates, so the first one must end with a space.
43 void custom_peer_usage(char *argv0)
45 fprintf(stderr, "Custom peer options:\n"
46 "--pithos PATH --archip VPATH --prefix PREFIX\n\n"
48 "\tPATH: path to pithos data blocks\n"
49 "\tVPATH: path to modified volume blocks\n"
50 "\tPREFIX: Common prefix of Archipelagos objects to be "
51 "stripped during filesystem hierarchy creation\n"
55 /* fdcache_node flags */
// READY marks a cache slot whose open has finished and that may be used or
// evicted. It is set by custom_peer_init and close_cache_entry, cleared by
// handle_delete, and tested by dir_open and close_cache_entry.
// NOTE(review): the value is bit 1 (0x2), not bit 0.
56 #define READY (1 << 1)
58 /* fdcache node info */
// Fields of one fd cache slot (the struct header and some members are not
// visible here).
// ref: number of requests currently holding the slot; changed with the
// __sync atomic builtins in dir_open and close_cache_entry.
61 volatile unsigned int ref;
// time: value of the handled_reqs counter when the slot was last handed out;
// dir_open evicts the idle READY slot with the smallest value.
62 volatile unsigned long time;
// flags: bit set, currently only READY.
63 volatile unsigned int flags;
// target: NUL-terminated name of the cached object; emptied on invalidation.
65 char target[MAX_FILENAME_SIZE + 1];
// Fields of the per-peer pfiled state, reached through peer->priv (see
// __get_pfiled). The struct header and some members are not visible here.
// handled_reqs: logical clock, incremented by dir_open each time a slot is
// handed out and copied into that slot as its time stamp.
73 uint64_t handled_reqs;
// fdcache: array of maxfds cache slots allocated in custom_peer_init.
75 struct fdcache_node *fdcache;
// cache_lock: serialises lookups and updates of the fdcache slots.
76 pthread_mutex_t cache_lock;
// path: directory of the pithos data blocks (--pithos), NUL-terminated.
77 char path[MAX_PATH_SIZE + 1];
// vpath: directory of the modified volume blocks (--archip), NUL-terminated.
78 char vpath[MAX_PATH_SIZE + 1];
// prefix: common object name prefix (--prefix). Like path and vpath it holds
// one byte more than the maximum length, because custom_peer_init stores the
// terminating NUL at index MAX_PREFIX_LEN.
79 char prefix[MAX_PREFIX_LEN + 1];
83 * pfiled specific structure
84 * containing information on a pending I/O operation
// Returns the pfiled state stored in the private pointer of peer.
91 struct pfiled * __get_pfiled(struct peerd *peer)
93 return (struct pfiled *) peer->priv;
// Returns the per-request fio state stored in the private pointer of pr.
96 struct fio * __get_fio(struct peer_req *pr)
98 return (struct fio*) pr->priv;
// Releases the reference that request pr holds on its fd cache slot
// (fio->fdcacheidx); nothing is done when the index is negative.
// The reference count is decremented atomically. Only when it reaches zero
// and the slot is no longer READY (handle_delete clears READY) is cache_lock
// taken; ref is then re-checked under the lock and the slot is invalidated:
// its fd is saved to a local and replaced by -1, the target name is emptied
// and READY is set again so that the slot can be reused.
// What is done with the saved fd afterwards is not visible here.
101 static void close_cache_entry(struct peerd *peer, struct peer_req *pr)
103 struct pfiled *pfiled = __get_pfiled(peer);
104 struct fio *fio = __get_fio(pr);
106 if (fio->fdcacheidx >= 0) {
107 if (!__sync_sub_and_fetch(&pfiled->fdcache[fio->fdcacheidx].ref, 1) && !(pfiled->fdcache[fio->fdcacheidx].flags & READY)) {
108 pthread_mutex_lock(&pfiled->cache_lock);
// Another request may have taken the slot between the atomic decrement and
// the lock, hence the second test of ref.
109 if (!pfiled->fdcache[fio->fdcacheidx].ref){
110 /* invalidate cache entry */
111 fd = pfiled->fdcache[fio->fdcacheidx].fd;
112 pfiled->fdcache[fio->fdcacheidx].fd = -1;
113 pfiled->fdcache[fio->fdcacheidx].target[0] = 0;
114 pfiled->fdcache[fio->fdcacheidx].flags |= READY;
116 pthread_mutex_unlock(&pfiled->cache_lock);
// Success path of a request: releases the fd cache slot held by pr.
// The rest of the body is not visible here.
124 static void pfiled_complete(struct peerd *peer, struct peer_req *pr)
126 close_cache_entry(peer, pr);
// Failure path of a request: releases the fd cache slot held by pr.
// The rest of the body is not visible here.
130 static void pfiled_fail(struct peerd *peer, struct peer_req *pr)
132 close_cache_entry(peer, pr);
// Handler for request ops that dispatch does not recognise: logs a warning
// and fails the request.
136 static void handle_unknown(struct peerd *peer, struct peer_req *pr)
138 XSEGLOG2(&lc, W, "unknown request op");
139 pfiled_fail(peer, pr);
// Builds in buf the on-disk path of an object:
//   path, then three two-character directory levels taken from the six
//   characters of target that start at offset prefixlen, each followed by a
//   slash, then the complete target (targetlen bytes) and a terminating NUL.
// buf must therefore hold at least strlen(path) + 9 + targetlen + 1 bytes.
// While the levels are built the partial path is terminated, checked with
// stat and created with mkdir (mode 0700) when stat fails; this is presumably
// done only when mkdirs is set - the condition is not visible here.
// The return statements are not visible here; callers treat a negative
// result as failure.
// NOTE(review): target[prefixlen] to target[prefixlen + 5] are read with no
// visible check that targetlen is at least prefixlen + 6.
142 static int create_path(char *buf, char *path, char *target, uint32_t targetlen,
143 uint32_t prefixlen, int mkdirs)
147 uint32_t pathlen = strlen(path);
// Copies exactly pathlen characters, without a NUL; the bytes that follow are
// written below.
149 strncpy(buf, path, pathlen);
// i advances by three output bytes (two name characters and a slash) while
// only two target characters are consumed, hence the i/3 correction.
151 for (i = 0; i < 9; i+= 3) {
152 buf[pathlen + i] = target[prefixlen + i - (i/3)];
153 buf[pathlen + i +1] = target[prefixlen + i + 1 - (i/3)];
154 buf[pathlen + i + 2] = '/';
156 buf[pathlen + i + 3] = '\0';
158 if (stat(buf, &st) < 0)
159 if (mkdir(buf, 0700) < 0) {
// The file name is the whole target, prefix included, placed after the three
// directory levels.
168 strncpy(&buf[pathlen + 9], target, targetlen);
169 buf[pathlen + 9 + targetlen] = '\0';
// Obtains a file descriptor for target through the fd cache (the return
// statements themselves are not visible here).
//
// Under cache_lock every slot is scanned: the READY slot with ref == 0 and
// the smallest time stamp is remembered as eviction candidate, and a slot
// whose name equals target (the same targetlen bytes followed by a NUL) is a
// hit. On a hit that is not READY the caller sleeps on the condition variable
// of the slot and the lookup is restarted; otherwise the cached fd is used.
// On a miss the candidate slot is renamed to target, the lock is dropped, the
// previous fd is closed and the file is opened, in this order, from the
// pithos directory, from the volume directory (name taken after the prefix)
// and finally created there with O_CREAT and mode 0600. The lock is then
// taken again, waiters are woken with a broadcast and io->fdcacheidx records
// the slot. The slot time stamp is set from the handled_reqs counter and its
// ref is incremented.
//
// The conditions selecting between these steps, and the use made of the mode
// argument, are in lines not visible here.
174 static int dir_open(struct pfiled *pfiled, struct fio *io,
175 char *target, uint32_t targetlen, int mode)
178 struct fdcache_node *ce = NULL;
// Sized for the worst case instead of from path_len and targetlen:
// create_path writes pathlen + 9 + targetlen + 1 bytes, tmp is filled from
// both path and vpath (each at most MAX_PATH_SIZE characters plus an appended
// slash), and targetlen has not been validated yet at this point.
180 char tmp[MAX_PATH_SIZE + 1 + 9 + MAX_FILENAME_SIZE + 1];
183 if (targetlen> MAX_FILENAME_SIZE)
188 pthread_mutex_lock(&pfiled->cache_lock);
192 for (i = 0; i < pfiled->maxfds; i++) {
193 if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time
194 && (pfiled->fdcache[i].flags & READY)) {
195 min = pfiled->fdcache[i].time;
200 if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
201 if (pfiled->fdcache[i].target[targetlen] == 0) {
202 ce = &pfiled->fdcache[i];
203 /* if any other io thread is currently opening
204 * the file, block until it succeeds or fails
206 if (!(ce->flags & READY)) {
207 pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
208 /* when ready, restart lookup */
211 /* if successfully opened */
213 fd = pfiled->fdcache[i].fd;
217 /* else open failed for the other io thread, so
218 * it should fail for everyone waiting on this
230 /* all cache entries are currently being used */
231 pthread_mutex_unlock(&pfiled->cache_lock);
234 if (pfiled->fdcache[lru].ref){
236 XSEGLOG2(&lc, E, "lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
239 /* make room for new file */
240 ce = &pfiled->fdcache[lru];
241 /* set name here and state to not ready, for any other requests on the
242 * same target that may follow
244 strncpy(ce->target, target, targetlen);
245 ce->target[targetlen] = 0;
247 pthread_mutex_unlock(&pfiled->cache_lock);
250 if (close(ce->fd) < 0){
251 XSEGLOG2(&lc, W, "Cannot close %s", ce->target);
255 /* try opening it from pithos blocker dir */
256 if (create_path(tmp, pfiled->path, target, targetlen, 0, 0) < 0) {
261 fd = open(tmp, O_RDWR);
263 /* try opening it from the tmp dir */
264 if (create_path(tmp, pfiled->vpath, target, targetlen,
265 pfiled->prefix_len, 0) < 0)
268 fd = open(tmp, O_RDWR);
270 if (create_path(tmp, pfiled->vpath, target, targetlen,
271 pfiled->prefix_len, 1) < 0) {
276 fd = open(tmp, O_RDWR | O_CREAT, 0600);
278 XSEGLOG2(&lc, E, "Cannot open %s", tmp);
282 /* insert in cache a negative fd to indicate opening error to
283 * any other ios waiting for the file to open
286 /* insert in cache */
288 pthread_mutex_lock(&pfiled->cache_lock);
292 pthread_cond_broadcast(&ce->cond);
294 io->fdcacheidx = lru;
302 pfiled->handled_reqs++;
303 ce->time = pfiled->handled_reqs;
304 __sync_fetch_and_add(&ce->ref, 1);
305 pthread_mutex_unlock(&pfiled->cache_lock);
310 pthread_mutex_unlock(&pfiled->cache_lock);
// Serves read and write requests on the request target.
// The target is opened through the fd cache; a failed open fails the request.
// Requests flagged XF_FLUSH or XF_FUA are completed without any I/O.
// Reads loop on pread until datalen bytes are serviced: on a read error
// datalen is cut down to what was serviced so far, and at end of file the
// rest of the data buffer is zero-filled and counted as serviced.
// Writes loop on pwrite in the same way, and an fsync failure is logged.
// At the end the request is completed when at least one byte was serviced and
// failed otherwise; an unrecognised op is logged and failed.
// The conditions and the updates of req->serviced are in lines not visible
// here.
314 static void handle_read_write(struct peerd *peer, struct peer_req *pr)
316 struct pfiled *pfiled = __get_pfiled(peer);
317 struct fio *fio = __get_fio(pr);
318 struct xseg_request *req = pr->req;
320 char *target = xseg_get_target(peer->xseg, req);
321 char *data = xseg_get_data(peer->xseg, req);
// NOTE(review): the mode argument is 0 for reads and writes alike.
323 fd = dir_open(pfiled, fio, target, req->targetlen, 0);
325 XSEGLOG2(&lc, E, "Dir open failed");
326 pfiled_fail(peer, pr);
331 if (req->flags & (XF_FLUSH | XF_FUA)) {
332 /* No FLUSH/FUA support yet (O_SYNC ?).
333 * note that with FLUSH/size == 0
334 * there will probably be a (uint64_t)-1 offset */
335 pfiled_complete(peer, pr);
338 pfiled_complete(peer, pr);
345 while (req->serviced < req->datalen) {
346 r = pread(fd, data + req->serviced,
347 req->datalen - req->serviced,
348 req->offset + req->serviced);
// Shrinking datalen to serviced ends the loop after an error.
350 req->datalen = req->serviced;
351 XSEGLOG2(&lc, E, "Cannot read");
354 /* reached end of file. zero out the rest data buffer */
355 memset(data + req->serviced, 0, req->datalen - req->serviced);
356 req->serviced = req->datalen;
364 while (req->serviced < req->datalen) {
365 r = pwrite(fd, data + req->serviced,
366 req->datalen - req->serviced,
367 req->offset + req->serviced);
369 req->datalen = req->serviced;
377 XSEGLOG2(&lc, E, "Fsync failed.");
378 /* if fsync fails, then no bytes serviced correctly */
383 XSEGLOG2(&lc, E, "wtf, corrupt op %u?\n", req->op);
384 pfiled_fail(peer, pr);
388 if (req->serviced > 0 ) {
389 pfiled_complete(peer, pr);
392 pfiled_fail(peer, pr);
// Handles an info request: opens the target through the fd cache and calls
// fstat on it to obtain the file size in bytes. xinfo views the request data
// buffer as a struct xseg_reply_info; the store of the size into it is not
// visible here. The request is failed when the open or the fstat fails and
// completed otherwise.
// NOTE(review): the local result struct is named stat (see the fstat call),
// which hides the stat() function inside this body.
397 static void handle_info(struct peerd *peer, struct peer_req *pr)
399 struct pfiled *pfiled = __get_pfiled(peer);
400 struct fio *fio = __get_fio(pr);
401 struct xseg_request *req = pr->req;
405 char *target = xseg_get_target(peer->xseg, req);
406 char *data = xseg_get_data(peer->xseg, req);
407 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)data;
409 fd = dir_open(pfiled, fio, target, req->targetlen, 0);
411 XSEGLOG2(&lc, E, "Dir open failed");
412 pfiled_fail(peer, pr);
416 r = fstat(fd, &stat);
418 XSEGLOG2(&lc, E, "fail in stat");
419 pfiled_fail(peer, pr);
423 size = (uint64_t)stat.st_size;
426 pfiled_complete(peer, pr);
// Handles a copy request. The request target is the destination, opened
// through the fd cache with mode 1; the source object is named by the struct
// xseg_request_copy held in the request data buffer.
// The source path is built under the pithos directory only, the file is
// opened O_RDWR, and its whole content is transferred with one sendfile call.
// The request is failed on any error and completed otherwise; the cleanup of
// buf and src is in lines not visible here.
// NOTE(review): n is an int while sendfile returns ssize_t and st_size is an
// off_t, and sendfile may transfer fewer bytes than requested; either case
// makes the n != st.st_size test report a failed copy.
// NOTE(review): buf holds MAX_PATH_SIZE + MAX_FILENAME_SIZE bytes whereas
// create_path writes pathlen + 9 + targetlen + 1 bytes, and no bound check on
// xcopy->targetlen is visible here.
429 static void handle_copy(struct peerd *peer, struct peer_req *pr)
431 struct pfiled *pfiled = __get_pfiled(peer);
432 struct fio *fio = __get_fio(pr);
433 struct xseg_request *req = pr->req;
435 char *target = xseg_get_target(peer->xseg, req);
436 char *data = xseg_get_data(peer->xseg, req);
437 struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
439 char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
440 int n, src = -1, dst = -1, r = -1;
443 XSEGLOG2(&lc, E, "Out of memory");
444 pfiled_fail(peer, pr);
448 dst = dir_open(pfiled, fio, target, req->targetlen, 1);
450 XSEGLOG2(&lc, E, "Fail in dst");
455 if (create_path(buf, pfiled->path, xcopy->target,
456 xcopy->targetlen, 0, 0) < 0) {
457 XSEGLOG2(&lc, E, "Create path failed");
462 src = open(buf, O_RDWR);
464 XSEGLOG2(&lc, E, "fail in src %s", buf);
471 XSEGLOG2(&lc, E, "fail in stat for src %s", buf);
// The third argument is a null offset pointer, so the transfer starts at the
// current file offset of src.
475 n = sendfile(dst, src, 0, st.st_size);
476 if (n != st.st_size) {
477 XSEGLOG2(&lc, E, "Copy failed for %s", buf);
488 pfiled_fail(peer, pr);
490 pfiled_complete(peer, pr);
// Handles a delete request. The target is first opened through the fd cache;
// when that yields a cache slot, READY is cleared on it under cache_lock so
// that close_cache_entry invalidates the slot once its last user is done.
// The path of the object under the volume directory is then built without
// creating directories; the removal call itself is not visible here.
// The request is failed on error and completed otherwise.
// NOTE(review): buf holds MAX_PATH_SIZE + MAX_FILENAME_SIZE bytes while
// create_path can write pathlen + 9 + targetlen + 1 bytes.
494 static void handle_delete(struct peerd *peer, struct peer_req *pr)
496 struct pfiled *pfiled = __get_pfiled(peer);
497 struct fio *fio = __get_fio(pr);
498 struct xseg_request *req = pr->req;
500 char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
502 char *target = xseg_get_target(peer->xseg, req);
504 XSEGLOG2(&lc, E, "Out of memory");
505 pfiled_fail(peer, pr);
508 fd = dir_open(pfiled, fio, target, req->targetlen, 0);
510 XSEGLOG2(&lc, E, "Dir open failed");
515 /* mark cache entry as invalid
516 * give a chance to pending operations on this file to end.
517 * file will close when all operations are done
519 if (fio->fdcacheidx >= 0) {
520 pthread_mutex_lock(&pfiled->cache_lock);
521 pfiled->fdcache[fio->fdcacheidx].flags &= ~READY;
522 pthread_mutex_unlock(&pfiled->cache_lock);
525 r = create_path(buf, pfiled->vpath, target, req->targetlen,
526 pfiled->prefix_len, 0);
528 XSEGLOG2(&lc, E, "Create path failed");
535 pfiled_fail(peer, pr);
537 pfiled_complete(peer, pr);
// Handles an open request, which acquires the lock of the request target.
// The lock is a file named target + LOCK_SUFFIX under the volume directory;
// its directories are created as needed. The file is created with
// O_CREAT | O_EXCL in a loop, so creation only succeeds for one holder at a
// time. An open error other than EEXIST is logged as a warning; the handling
// of EEXIST and of the XF_NOSYNC request flag, and the release of buf,
// pathname and fd, are in lines not visible here.
// The request is completed once the lock file was created and failed
// otherwise.
// NOTE(review): pathname holds MAX_PATH_SIZE + MAX_FILENAME_SIZE bytes while
// create_path can write pathlen + 9 + name length + 1 bytes, and no bound
// check on req->targetlen is visible here.
541 static void handle_open(struct peerd *peer, struct peer_req *pr)
543 struct pfiled *pfiled = __get_pfiled(peer);
544 // struct fio *fio = __get_fio(pr);
545 struct xseg_request *req = pr->req;
// One byte more than the longest lock name, for its terminating NUL.
546 char *buf = malloc(MAX_FILENAME_SIZE + 1);
547 char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
549 char *target = xseg_get_target(peer->xseg, req);
551 if (!buf || !pathname) {
552 XSEGLOG2(&lc, E, "Out of memory");
553 pfiled_fail(peer, pr);
557 strncpy(buf, target, req->targetlen);
// The suffix is copied together with its NUL (length + 1): buf is printed
// with %s below and would otherwise be read past the end of the name.
558 strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX) + 1);
560 XSEGLOG2(&lc, I, "Trying to acquire lock %s", buf);
562 if (create_path(pathname, pfiled->vpath, buf,
563 req->targetlen + strlen(LOCK_SUFFIX),
564 pfiled->prefix_len, 1) < 0) {
565 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
// NOTE(review): S_IRUSR is already contained in S_IRWXU.
570 while ((fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR)) < 0){
572 if (errno != EEXIST){
573 XSEGLOG2(&lc, W, "Error opening %s", pathname);
576 if (req->flags & XF_NOSYNC)
585 XSEGLOG2(&lc, I, "Failed to acquire lock %s", buf);
586 pfiled_fail(peer, pr);
589 XSEGLOG2(&lc, I, "Acquired lock %s", buf);
590 pfiled_complete(peer, pr);
// Handles a close request, which releases the lock of the request target by
// unlinking the lock file named target + LOCK_SUFFIX under the volume
// directory (no directories are created on this path).
// The checks on r, the completion or failure of the request and the release
// of buf and pathname are in lines not visible here.
// NOTE(review): pathname holds MAX_PATH_SIZE + MAX_FILENAME_SIZE bytes while
// create_path can write pathlen + 9 + name length + 1 bytes, and no bound
// check on req->targetlen is visible here.
595 static void handle_close(struct peerd *peer, struct peer_req *pr)
597 struct pfiled *pfiled = __get_pfiled(peer);
598 // struct fio *fio = __get_fio(pr);
599 struct xseg_request *req = pr->req;
// One byte more than the longest lock name, for its terminating NUL.
600 char *buf = malloc(MAX_FILENAME_SIZE + 1);
601 char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
602 char *target = xseg_get_target(peer->xseg, req);
605 if (!buf || !pathname) {
606 XSEGLOG2(&lc, E, "Out of memory");
611 strncpy(buf, target, req->targetlen);
// The suffix is copied together with its NUL (length + 1): buf is printed
// with %s below and would otherwise be read past the end of the name.
612 strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX) + 1);
614 r = create_path(pathname, pfiled->vpath, buf,
615 req->targetlen + strlen(LOCK_SUFFIX),
616 pfiled->prefix_len, 0);
618 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
621 r = unlink(pathname);
// Entry point called for each request handed to this peer.
// When the request has just been accepted (reason == dispatch_accept) its fio
// state is reset: no cache slot (index -1) and state XS_ACCEPTED.
// The request is then routed to one of the handlers below, with
// handle_unknown as the fallback; the switch on the request op, its case
// labels and the return value are in lines not visible here.
633 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
634 enum dispatch_reason reason)
636 struct fio *fio = __get_fio(pr);
637 if (reason == dispatch_accept){
638 fio->fdcacheidx = -1;
639 fio->state = XS_ACCEPTED;
645 handle_read_write(peer, pr); break;
647 handle_info(peer, pr); break;
649 handle_copy(peer, pr); break;
651 handle_delete(peer, pr); break;
653 handle_open(peer, pr); break;
655 handle_close(peer, pr); break;
659 handle_unknown(peer, pr);
// Peer initialisation hook. Allocates the pfiled state, the fd cache
// (maxfds = 2 * nr_ops slots, zero-filled by calloc) and one struct fio per
// peer request; reads the --pithos, --archip and --prefix arguments; makes
// sure both directories end with a slash; and initialises every cache slot
// and cache_lock.
// The failure paths, the storing of pfiled into the peer and the return value
// are in lines not visible here.
// NOTE(review): the arguments are parsed twice in the visible lines, once by
// the explicit argv loop and once by the READ_ARG_STRING macros; presumably
// one of the two is disabled by preprocessor lines not visible here.
664 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
668 struct pfiled *pfiled = malloc(sizeof(struct pfiled));
670 XSEGLOG2(&lc, E, "Out of memory");
676 pfiled->maxfds = 2 * peer->nr_ops;
677 pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
678 if(!pfiled->fdcache) {
679 XSEGLOG2(&lc, E, "Out of memory");
684 for (i = 0; i < peer->nr_ops; i++) {
685 peer->peer_reqs[i].priv = malloc(sizeof(struct fio));
// Test the slot that was just allocated; peer_reqs->priv would only ever
// look at slot 0.
686 if (!peer->peer_reqs[i].priv){
687 XSEGLOG2(&lc, E, "Out of memory");
693 pfiled->vpath[0] = 0;
695 pfiled->handled_reqs = 0;
697 for (i = 0; i < argc; i++) {
698 if (!strcmp(argv[i], "--pithos") && (i+1) < argc){
699 strncpy(pfiled->path, argv[i+1], MAX_PATH_SIZE);
700 pfiled->path[MAX_PATH_SIZE] = 0;
704 if (!strcmp(argv[i], "--archip") && (i+1) < argc){
705 strncpy(pfiled->vpath, argv[i+1], MAX_PATH_SIZE);
706 pfiled->vpath[MAX_PATH_SIZE] = 0;
710 if (!strcmp(argv[i], "--prefix") && (i+1) < argc){
711 strncpy(pfiled->prefix, argv[i+1], MAX_PREFIX_LEN);
// NOTE(review): index MAX_PREFIX_LEN is only in bounds when the prefix array
// holds MAX_PREFIX_LEN + 1 bytes.
712 pfiled->prefix[MAX_PREFIX_LEN] = 0;
718 BEGIN_READ_ARGS(argc, argv);
719 READ_ARG_STRING("--pithos", pfiled->path, MAX_PATH_SIZE);
720 READ_ARG_STRING("--archip", pfiled->vpath, MAX_PATH_SIZE);
721 READ_ARG_STRING("--prefix", pfiled->prefix, MAX_PREFIX_LEN);
725 pfiled->prefix_len = strlen(pfiled->prefix);
727 //TODO test path exist
728 pfiled->path_len = strlen(pfiled->path);
729 if (!pfiled->path_len){
730 XSEGLOG2(&lc, E, "Pithos path was not provided");
// NOTE(review): when the path is already MAX_PATH_SIZE characters long, the
// NUL written after the appended slash lands one byte past the buffer. The
// same holds for vpath below.
733 if (pfiled->path[pfiled->path_len -1] != '/'){
734 pfiled->path[pfiled->path_len] = '/';
735 pfiled->path[++pfiled->path_len]= 0;
738 pfiled->vpath_len = strlen(pfiled->vpath);
739 if (!pfiled->vpath_len){
740 XSEGLOG2(&lc, E, "Archipelagos path was not provided");
743 if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
744 pfiled->vpath[pfiled->vpath_len] = '/';
745 pfiled->vpath[++pfiled->vpath_len]= 0;
// Initialise all maxfds slots, not only the first nr_ops: dir_open only
// evicts slots that are READY, so a slot left uninitialised here could never
// be used and half of the cache would stay idle.
748 for (i = 0; i < pfiled->maxfds; i++) {
749 pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
750 pfiled->fdcache[i].flags = READY;
752 pthread_mutex_init(&pfiled->cache_lock, NULL);
// Peer finalisation hook; its body is not visible here.
758 void custom_peer_finalize(struct peerd *peer)
763 static int safe_atoi(char *s)
768 l = strtol(s, &endp, 10);
769 if (s != endp && *endp == '\0')