2 * The Pithos File Blocker Peer (pfiled)
19 #include <sys/sendfile.h>
22 #include <xseg/xseg.h>
23 #include <xseg/protocol.h>
25 #define LOCK_SUFFIX "_lock"
26 #define MAX_PATH_SIZE 1024
27 #define MAX_FILENAME_SIZE (XSEG_MAX_TARGETLEN + 5) //strlen(LOCK_SUFFIX)
29 /* default concurrency level (number of threads) */
30 #define DEFAULT_NR_OPS 16
32 /* Pithos hash for the zero block
33 * FIXME: Should it be hardcoded?
36 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85"
39 * Globals, holding command-line arguments
/* Print command-line usage/help for pfiled, naming argv0 in the banner.
 * NOTE(review): only the format-string fragments are visible in this
 * excerpt; the enclosing fprintf call and its output stream are elided —
 * presumably stderr, confirm in full source. */
42 void usage(char *argv0)
45 "Usage: %s <PATH> <VPATH> [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v]\n\n"
47 "\tPATH: path to pithos data blocks\n"
48 "\tVPATH: path to modified volume blocks\n"
49 "\tPORT: xseg port to listen for requests on\n"
50 "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:"
51 "request_size:extra_size:page_shift'\n"
52 "\tNR_OPS: number of outstanding xseg requests\n"
53 "\t-v: verbose mode\n",
58 /* fdcache_node flags */
/* READY: slot is in a stable state (open finished — successfully or not).
 * dir_open cond-waits while READY is clear; handle_delete clears it to
 * invalidate the entry, and close_cache_entry sets it again on teardown. */
59 #define READY (1 << 1)
61 /* fdcache node info */
/* number of in-flight requests currently using this slot's fd
 * (incremented in dir_open, decremented in close_cache_entry) */
64 volatile unsigned int ref;
/* logical LRU timestamp: set from pfiled->handled_reqs on each use */
65 volatile unsigned long time;
66 volatile unsigned int flags;
/* cached object name; NUL-terminated by dir_open */
68 char target[MAX_FILENAME_SIZE + 1];
/* monotonically increasing request counter; doubles as the LRU clock */
75 uint64_t handled_reqs;
/* fd cache array of maxfds (= 2 * nr_ops) slots, see custom_peer_init */
77 struct fdcache_node *fdcache;
/* protects fdcache slots, their cond vars, and handled_reqs */
78 pthread_mutex_t cache_lock;
/* pithos (read-only) blocks dir; NUL-terminated, '/'-suffixed at init */
79 char path[MAX_PATH_SIZE + 1];
/* modified volume blocks dir; NUL-terminated, '/'-suffixed at init */
80 char vpath[MAX_PATH_SIZE + 1];
84 * pfiled specific structure
85 * containing information on a pending I/O operation
/* Return the pfiled state stashed in the generic peer's private pointer. */
92 struct pfiled * __get_pfiled(struct peerd *peer)
94 return (struct pfiled *) peer->priv;
/* Return the per-request pfiled I/O state stashed in the peer_req's
 * private pointer (allocated in custom_peer_init). */
97 struct fio * __get_fio(struct peer_req *pr)
99 return (struct fio*) pr->priv;
/* Release this request's reference on its fdcache slot.
 * If the slot has been invalidated (READY cleared, e.g. by handle_delete)
 * and we dropped the last reference, tear the entry down under the cache
 * lock so dir_open can reuse it.
 * NOTE(review): lines are elided in this excerpt — the close() of the
 * saved fd that presumably follows the invalidation is not visible. */
102 static void close_cache_entry(struct peerd *peer, struct peer_req *pr)
104 struct pfiled *pfiled = __get_pfiled(peer);
105 struct fio *fio = __get_fio(pr);
/* fdcacheidx < 0 means this request never acquired a cached fd */
107 if (fio->fdcacheidx >= 0) {
/* atomic decrement; only proceed when ref hit 0 AND the slot was
 * marked not-READY (i.e. explicitly invalidated) */
108 if (!__sync_sub_and_fetch(&pfiled->fdcache[fio->fdcacheidx].ref, 1) && !(pfiled->fdcache[fio->fdcacheidx].flags & READY)) {
109 pthread_mutex_lock(&pfiled->cache_lock);
/* re-check under the lock: another thread may have re-referenced it */
110 if (!pfiled->fdcache[fio->fdcacheidx].ref){
111 /* invalidate cache entry */
112 fd = pfiled->fdcache[fio->fdcacheidx].fd;
113 pfiled->fdcache[fio->fdcacheidx].fd = -1;
114 pfiled->fdcache[fio->fdcacheidx].target[0] = 0;
/* mark the (now empty) slot READY so dir_open may claim it again */
115 pfiled->fdcache[fio->fdcacheidx].flags |= READY;
117 pthread_mutex_unlock(&pfiled->cache_lock);
/* Success path: drop the fdcache reference, then (in elided lines,
 * presumably) acknowledge the request to the xseg layer — confirm. */
125 static void pfiled_complete(struct peerd *peer, struct peer_req *pr)
127 close_cache_entry(peer, pr);
/* Failure path: drop the fdcache reference, then (in elided lines,
 * presumably) fail the request toward the xseg layer — confirm. */
131 static void pfiled_fail(struct peerd *peer, struct peer_req *pr)
133 close_cache_entry(peer, pr);
/* Catch-all for request opcodes dispatch() does not recognize:
 * log a warning and fail the request. */
137 static void handle_unknown(struct peerd *peer, struct peer_req *pr)
139 XSEGLOG2(&lc, W, "unknown request op");
140 pfiled_fail(peer, pr);
/* Build "<path>XX/XX/XX/<target>" in buf, sharding on the first six
 * characters of target (three 2-char directory levels; path already ends
 * in '/'). With mkdirs non-zero, missing levels are created mode 0700.
 * Callers treat a negative return as failure.
 * NOTE(review): buf must hold pathlen + 9 + targetlen + 1 bytes; callers
 * allocate MAX_PATH_SIZE + MAX_FILENAME_SIZE. The strncpy calls here do
 * not NUL-terminate by themselves — termination is explicit below. */
143 static int create_path(char *buf, char *path, char *target, uint32_t targetlen, int mkdirs)
147 uint32_t pathlen = strlen(path);
149 strncpy(buf, path, pathlen);
/* i = 0,3,6: level k consumes target[2k] and target[2k+1], then '/' */
151 for (i = 0; i < 9; i+= 3) {
152 buf[pathlen + i] = target[i - (i/3)];
153 buf[pathlen + i +1] = target[i + 1 - (i/3)];
154 buf[pathlen + i + 2] = '/';
/* temporarily NUL-terminate at this level so stat/mkdir see the prefix
 * (presumably inside an elided `if (mkdirs)` guard — confirm) */
156 buf[pathlen + i + 3] = '\0';
158 if (stat(buf, &st) < 0)
159 if (mkdir(buf, 0700) < 0) {
/* append the full target name after the fixed 9-char shard prefix */
168 strncpy(&buf[pathlen + 9], target, targetlen);
169 buf[pathlen + 9 + targetlen] = '\0';
/* Return an open fd for `target`, using the fdcache:
 *  - on a cache hit, wait (cond) if another thread is mid-open, then
 *    reuse the cached fd;
 *  - on a miss, evict the LRU slot with ref == 0, open the file (pithos
 *    dir first, then vpath; created under vpath when `mode` asks for it),
 *    publish the fd and broadcast to waiters.
 * The returned slot's refcount is bumped and io->fdcacheidx recorded so
 * close_cache_entry() can release it. Several lines are elided in this
 * excerpt; control flow around the gaps is inferred — confirm. */
174 static int dir_open(struct pfiled *pfiled, struct fio *io,
175 char *target, uint32_t targetlen, int mode)
178 struct fdcache_node *ce = NULL;
/* VLA scratch path: "<dir>/XX/XX/XX/<target>\0" */
180 char tmp[pfiled->path_len + targetlen + 10];
/* reject names too long for fdcache_node.target */
183 if (targetlen> MAX_FILENAME_SIZE)
188 pthread_mutex_lock(&pfiled->cache_lock);
/* single pass: track the LRU eviction candidate (free + READY with the
 * smallest time) while also looking for an exact target match */
192 for (i = 0; i < pfiled->maxfds; i++) {
193 if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time
194 && (pfiled->fdcache[i].flags & READY)) {
195 min = pfiled->fdcache[i].time;
/* prefix matches; also require exact length (terminator right after) */
200 if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
201 if (pfiled->fdcache[i].target[targetlen] == 0) {
202 ce = &pfiled->fdcache[i];
203 /* if any other io thread is currently opening
204 * the file, block until it succeeds or fails
206 if (!(ce->flags & READY)) {
207 pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
208 /* when ready, restart lookup */
/* NOTE(review): the restart presumably guards against spurious
 * wakeups too (POSIX requires re-checking the predicate) — confirm
 * the elided loop/goto does re-test READY. */
211 /* if successfully opened */
213 fd = pfiled->fdcache[i].fd;
217 /* else open failed for the other io thread, so
218 * it should fail for everyone waiting on this
230 /* all cache entries are currently being used */
231 pthread_mutex_unlock(&pfiled->cache_lock);
/* sanity: the chosen LRU victim must have no users */
234 if (pfiled->fdcache[lru].ref){
236 XSEGLOG2(&lc, E, "lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
239 /* make room for new file */
240 ce = &pfiled->fdcache[lru];
241 /* set name here and state to not ready, for any other requests on the
242 * same target that may follow
244 strncpy(ce->target, target, targetlen);
245 ce->target[targetlen] = 0;
/* open() happens outside the lock; waiters block on ce->cond meanwhile */
247 pthread_mutex_unlock(&pfiled->cache_lock);
/* close the evicted slot's old fd, if any */
250 if (close(ce->fd) < 0){
251 XSEGLOG2(&lc, W, "Cannot close %s", ce->target);
255 /* try opening it from pithos blocker dir */
256 if (create_path(tmp, pfiled->path, target, targetlen, 0) < 0) {
261 fd = open(tmp, O_RDWR);
263 /* try opening it from the tmp dir */
264 if (create_path(tmp, pfiled->vpath, target, targetlen, 0) < 0)
267 fd = open(tmp, O_RDWR);
/* not found anywhere: create under vpath (mkdirs enabled) */
269 if (create_path(tmp, pfiled->vpath, target, targetlen, 1) < 0) {
274 fd = open(tmp, O_RDWR | O_CREAT, 0600);
276 XSEGLOG2(&lc, E, "Cannot open %s", tmp);
280 /* insert in cache a negative fd to indicate opening error to
281 * any other ios waiting for the file to open
284 /* insert in cache */
286 pthread_mutex_lock(&pfiled->cache_lock);
/* wake every thread cond-waiting on this slot (open done, good or bad) */
290 pthread_cond_broadcast(&ce->cond);
292 io->fdcacheidx = lru;
/* refresh LRU clock and take a reference for this request */
300 pfiled->handled_reqs++;
301 ce->time = pfiled->handled_reqs;
302 __sync_fetch_and_add(&ce->ref, 1);
303 pthread_mutex_unlock(&pfiled->cache_lock);
308 pthread_mutex_unlock(&pfiled->cache_lock);
/* Service an xseg read or write request against the target's backing file.
 * Reads/writes loop on pread/pwrite handling short transfers; a read past
 * EOF zero-fills the remainder. On error the request's datalen is clamped
 * to what was serviced so the loop terminates; the request completes if
 * any bytes were serviced, else fails. */
312 static void handle_read_write(struct peerd *peer, struct peer_req *pr)
314 struct pfiled *pfiled = __get_pfiled(peer);
315 struct fio *fio = __get_fio(pr);
316 struct xseg_request *req = pr->req;
318 char *target = xseg_get_target(peer->xseg, req);
319 char *data = xseg_get_data(peer->xseg, req);
321 fd = dir_open(pfiled, fio, target, req->targetlen, 0);
323 XSEGLOG2(&lc, E, "Dir open failed");
324 pfiled_fail(peer, pr);
/* NOTE(review): FLUSH/FUA is acknowledged as complete without any
 * fsync/O_SYNC — a durability gap the FIXME below acknowledges. */
329 if (req->flags & (XF_FLUSH | XF_FUA)) {
330 /* No FLUSH/FUA support yet (O_SYNC ?).
331 * note that with FLUSH/size == 0
332 * there will probably be a (uint64_t)-1 offset */
333 pfiled_complete(peer, pr);
336 pfiled_complete(peer, pr);
/* READ: accumulate into data[] until datalen bytes are serviced */
343 while (req->serviced < req->datalen) {
344 r = pread(fd, data + req->serviced,
345 req->datalen - req->serviced,
346 req->offset + req->serviced);
/* pread error: clamp datalen so the loop exits with partial result */
348 req->datalen = req->serviced;
349 XSEGLOG2(&lc, E, "Cannot read");
352 /* reached end of file. zero out the rest data buffer */
353 memset(data + req->serviced, 0, req->datalen - req->serviced);
354 req->serviced = req->datalen;
/* WRITE: push data[] out until datalen bytes are serviced */
362 while (req->serviced < req->datalen) {
363 r = pwrite(fd, data + req->serviced,
364 req->datalen - req->serviced,
365 req->offset + req->serviced);
/* pwrite error: clamp datalen so the loop exits with partial result */
367 req->datalen = req->serviced;
375 XSEGLOG2(&lc, E, "Fsync failed.");
376 /* if fsync fails, then no bytes serviced correctly */
/* unreachable unless dispatch routed a non-read/write op here */
381 XSEGLOG2(&lc, E, "wtf, corrupt op %u?\n", req->op);
382 pfiled_fail(peer, pr);
/* partial service still counts as completion; zero bytes is failure */
386 if (req->serviced > 0 ) {
387 pfiled_complete(peer, pr);
390 pfiled_fail(peer, pr);
/* Report the target's size: open (cached) fd, fstat it, and place the
 * result into the request's data area as an xseg_reply_info. */
395 static void handle_info(struct peerd *peer, struct peer_req *pr)
397 struct pfiled *pfiled = __get_pfiled(peer);
398 struct fio *fio = __get_fio(pr);
399 struct xseg_request *req = pr->req;
403 char *target = xseg_get_target(peer->xseg, req);
404 char *data = xseg_get_data(peer->xseg, req);
/* reply is written in-place over the request's data buffer */
405 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)data;
407 fd = dir_open(pfiled, fio, target, req->targetlen, 0);
409 XSEGLOG2(&lc, E, "Dir open failed");
410 pfiled_fail(peer, pr);
/* NOTE: local named `stat` shadows the stat() function's struct tag */
414 r = fstat(fd, &stat);
416 XSEGLOG2(&lc, E, "fail in stat");
417 pfiled_fail(peer, pr);
421 size = (uint64_t)stat.st_size;
424 pfiled_complete(peer, pr);
/* Copy a source pithos block (named in the xseg_request_copy payload)
 * into the destination target under vpath, using sendfile(2).
 * NOTE(review): `n` is an int but st.st_size is off_t — the comparison
 * (and sendfile's count argument) can truncate for files > 2 GiB.
 * NOTE(review): sendfile may legally transfer fewer bytes than requested;
 * treating n != st.st_size as a hard failure instead of looping makes
 * partial transfers fail spuriously — confirm intent in full source. */
427 static void handle_copy(struct peerd *peer, struct peer_req *pr)
429 struct pfiled *pfiled = __get_pfiled(peer);
430 struct fio *fio = __get_fio(pr);
431 struct xseg_request *req = pr->req;
433 char *target = xseg_get_target(peer->xseg, req);
434 char *data = xseg_get_data(peer->xseg, req);
/* copy descriptor (source target name/len) travels in the data buffer */
435 struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
437 char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
438 int n, src = -1, dst = -1, r = -1;
441 XSEGLOG2(&lc, E, "Out of memory");
442 pfiled_fail(peer, pr);
/* destination goes through the fd cache, created if missing (mode=1) */
446 dst = dir_open(pfiled, fio, target, req->targetlen, 1);
448 XSEGLOG2(&lc, E, "Fail in dst");
453 if (create_path(buf, pfiled->path, xcopy->target, xcopy->targetlen, 0) < 0) {
454 XSEGLOG2(&lc, E, "Create path failed");
/* source is opened directly, bypassing the fd cache */
459 src = open(buf, O_RDWR);
461 XSEGLOG2(&lc, E, "fail in src %s", buf);
468 XSEGLOG2(&lc, E, "fail in stat for src %s", buf);
/* offset arg 0 (NULL): sendfile advances src's own file offset */
472 n = sendfile(dst, src, 0, st.st_size);
473 if (n != st.st_size) {
474 XSEGLOG2(&lc, E, "Copy failed for %s", buf);
485 pfiled_fail(peer, pr);
487 pfiled_complete(peer, pr);
/* Delete a target's modified block under vpath, and invalidate its
 * fdcache entry so the open fd is closed once all in-flight users of the
 * slot finish (see close_cache_entry). */
491 static void handle_delete(struct peerd *peer, struct peer_req *pr)
493 struct pfiled *pfiled = __get_pfiled(peer);
494 struct fio *fio = __get_fio(pr);
495 struct xseg_request *req = pr->req;
497 char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
499 char *target = xseg_get_target(peer->xseg, req);
501 XSEGLOG2(&lc, E, "Out of memory");
502 pfiled_fail(peer, pr);
/* open first so the target gets an fdcache slot we can invalidate */
505 fd = dir_open(pfiled, fio, target, req->targetlen, 0);
507 XSEGLOG2(&lc, E, "Dir open failed");
512 /* mark cache entry as invalid
513 * give a chance to pending operations on this file to end.
514 * file will close when all operations are done
516 if (fio->fdcacheidx >= 0) {
517 pthread_mutex_lock(&pfiled->cache_lock);
/* clearing READY makes the last close_cache_entry tear the slot down */
518 pfiled->fdcache[fio->fdcacheidx].flags &= ~READY;
519 pthread_mutex_unlock(&pfiled->cache_lock);
522 r = create_path(buf, pfiled->vpath, target, req->targetlen, 0);
524 XSEGLOG2(&lc, E, "Create path failed");
/* NOTE(review): the unlink() itself is in elided lines — confirm. */
531 pfiled_fail(peer, pr);
533 pfiled_complete(peer, pr);
/* "Open" (lock) a volume: atomically create "<target>_lock" under vpath
 * with O_CREAT|O_EXCL; an existing lock file makes open() fail EEXIST.
 * NOTE(review): `errno != -EEXIST` compares positive errno against a
 * negative value, so it is always true — almost certainly meant
 * `errno != EEXIST`; as written, every EEXIST also logs the warning.
 * NOTE(review): buf is filled with two non-terminating strncpy calls and
 * never NUL-terminated; create_path only reads targetlen+5 bytes so path
 * building is safe, but logging buf with "%s" below can over-read.
 * NOTE(review): S_IRWXU already includes S_IRUSR — the OR is redundant. */
537 static void handle_open(struct peerd *peer, struct peer_req *pr)
539 struct pfiled *pfiled = __get_pfiled(peer);
540 // struct fio *fio = __get_fio(pr);
541 struct xseg_request *req = pr->req;
542 char *buf = malloc(MAX_FILENAME_SIZE);
543 char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
545 char *target = xseg_get_target(peer->xseg, req);
547 if (!buf || !pathname) {
548 XSEGLOG2(&lc, E, "Out of memory");
549 pfiled_fail(peer, pr);
/* compose "<target>_lock" (not NUL-terminated — see note above) */
553 strncpy(buf, target, req->targetlen);
554 strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
556 if (create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen(LOCK_SUFFIX), 1) < 0) {
557 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
/* O_EXCL makes creation the lock-acquisition primitive */
562 fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
565 if (errno != -EEXIST)
566 XSEGLOG2(&lc, W, "Error opening %s", pathname);
574 pfiled_fail(peer, pr);
576 pfiled_complete(peer, pr);
/* "Close" (unlock) a volume: unlink the "<target>_lock" file under vpath
 * that handle_open created.
 * NOTE(review): as in handle_open, buf is built from two non-terminating
 * strncpy calls and never NUL-terminated; logging it with "%s" below can
 * over-read past the suffix. */
580 static void handle_close(struct peerd *peer, struct peer_req *pr)
582 struct pfiled *pfiled = __get_pfiled(peer);
583 // struct fio *fio = __get_fio(pr);
584 struct xseg_request *req = pr->req;
585 char *buf = malloc(MAX_FILENAME_SIZE);
586 char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
587 char *target = xseg_get_target(peer->xseg, req);
590 if (!buf || !pathname) {
591 XSEGLOG2(&lc, E, "Out of memory");
/* compose "<target>_lock" (not NUL-terminated — see note above) */
596 strncpy(buf, target, req->targetlen);
597 strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
/* mkdirs=0: the lock's shard dirs must already exist from handle_open */
599 r = create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen(LOCK_SUFFIX), 0);
601 XSEGLOG2(&lc, E, "Create path failed for %s", buf);
604 r = unlink(pathname);
/* Entry point called by the generic peer framework for each request event.
 * On initial accept, reset the per-request state, then route by opcode to
 * the matching handler (the switch/case labels are elided in this excerpt). */
616 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
617 enum dispatch_reason reason)
619 struct fio *fio = __get_fio(pr);
620 if (reason == dispatch_accept){
/* -1 = "no fdcache slot held yet" (checked by close_cache_entry) */
621 fio->fdcacheidx = -1;
622 fio->state = XS_ACCEPTED;
628 handle_read_write(peer, pr); break;
630 handle_info(peer, pr); break;
632 handle_copy(peer, pr); break;
634 handle_delete(peer, pr); break;
636 handle_open(peer, pr); break;
638 handle_close(peer, pr); break;
/* default: unrecognized opcode */
642 handle_unknown(peer, pr);
/* Initialize pfiled state: allocate the pfiled struct, the fd cache
 * (2 * nr_ops slots) and per-request fio structs, parse --path/--vpath,
 * normalize both paths to end in '/', and init the lock/cond vars.
 * NOTE(review): the priv null-check below tests peer->peer_reqs->priv
 * (element 0) on every iteration instead of peer->peer_reqs[i].priv —
 * allocation failures for i > 0 go undetected.
 * NOTE(review): the cond/flags init loop runs to peer->nr_ops, but
 * fdcache has maxfds = 2 * nr_ops entries; the upper half looks like it
 * is left with zeroed cond vars and flags (not READY) — confirm whether
 * this is intentional.
 * NOTE(review): path[path_len - 1] underflows if --path is empty. */
647 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
651 struct pfiled *pfiled = malloc(sizeof(struct pfiled));
653 XSEGLOG2(&lc, E, "Out of memory");
/* two cache slots per outstanding op */
659 pfiled->maxfds = 2 * peer->nr_ops;
660 pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
661 if(!pfiled->fdcache) {
662 XSEGLOG2(&lc, E, "Out of memory");
667 for (i = 0; i < peer->nr_ops; i++) {
668 peer->peer_reqs[i].priv = malloc(sizeof(struct fio));
669 if (!peer->peer_reqs->priv){
670 XSEGLOG2(&lc, E, "Out of memory");
676 pfiled->handled_reqs = 0;
677 for (i = 0; i < argc; i++) {
678 if (!strcmp(argv[i], "--path") && (i+1) < argc){
679 strncpy(pfiled->path, argv[i+1], MAX_PATH_SIZE);
680 pfiled->path[MAX_PATH_SIZE] = 0;
684 if (!strcmp(argv[i], "--vpath") && (i+1) < argc){
685 strncpy(pfiled->vpath, argv[i+1], MAX_PATH_SIZE);
686 pfiled->vpath[MAX_PATH_SIZE] = 0;
/* ensure path ends with '/' so create_path can append directly */
692 pfiled->path_len = strlen(pfiled->path);
693 if (pfiled->path[pfiled->path_len -1] != '/'){
694 pfiled->path[pfiled->path_len] = '/';
695 pfiled->path[++pfiled->path_len]= 0;
/* same normalization for vpath */
698 pfiled->vpath_len = strlen(pfiled->vpath);
699 if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
700 pfiled->vpath[pfiled->vpath_len] = '/';
701 pfiled->vpath[++pfiled->vpath_len]= 0;
704 for (i = 0; i < peer->nr_ops; i++) {
705 pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
706 pfiled->fdcache[i].flags = READY;
708 pthread_mutex_init(&pfiled->cache_lock, NULL);
714 static int safe_atoi(char *s)
719 l = strtol(s, &endp, 10);
720 if (s != endp && *endp == '\0')