From: Giannakos Filippos Date: Fri, 2 Nov 2012 16:41:37 +0000 (+0200) Subject: add pfiled based on peer skeleton X-Git-Tag: debian/0.3.1~70^2~10 X-Git-Url: https://code.grnet.gr/git/archipelago/commitdiff_plain/2f0a0b3ce351db0bdc85f5d2ed3d0616467ae631 add pfiled based on peer skeleton
---

diff --git a/xseg/peers/user/mt-pfiled.c b/xseg/peers/user/mt-pfiled.c
new file mode 100644
index 0000000..f2cbebb
--- /dev/null
+++ b/xseg/peers/user/mt-pfiled.c
@@ -0,0 +1,724 @@
+/*
+ * The Pithos File Blocker Peer (pfiled)
+ */
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <limits.h>
+#include <stdint.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/sendfile.h>
+#include <peer.h>
+
+#include <xseg/xseg.h>
+#include <xseg/protocol.h>
+
+#define LOCK_SUFFIX "_lock"
+#define MAX_PATH_SIZE 1024
+#define MAX_FILENAME_SIZE (XSEG_MAX_TARGETLEN + 5) //strlen(LOCK_SUFFIX)
+
+/* default concurrency level (number of threads) */
+#define DEFAULT_NR_OPS 16
+
+/* Pithos hash for the zero block
+ * FIXME: Should it be hardcoded?
+ */ +#define ZERO_BLOCK \ + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85" + +/* + * Globals, holding command-line arguments + */ + +void usage(char *argv0) +{ + fprintf(stderr, + "Usage: %s [-p PORT] [-g XSEG_SPEC] [-n NR_OPS] [-v]\n\n" + "where:\n" + "\tPATH: path to pithos data blocks\n" + "\tVPATH: path to modified volume blocks\n" + "\tPORT: xseg port to listen for requests on\n" + "\tXSEG_SPEC: xseg spec as 'type:name:nr_ports:nr_requests:" + "request_size:extra_size:page_shift'\n" + "\tNR_OPS: number of outstanding xseg requests\n" + "\t-v: verbose mode\n", + argv0); + +} + +/* fdcache_node flags */ +#define READY (1 << 1) + +/* fdcache node info */ +struct fdcache_node { + volatile int fd; + volatile unsigned int ref; + volatile unsigned long time; + volatile unsigned int flags; + pthread_cond_t cond; + char target[MAX_FILENAME_SIZE + 1]; +}; + +/* pfiled context */ +struct pfiled { + uint32_t path_len; + uint32_t vpath_len; + uint64_t handled_reqs; + long maxfds; + struct fdcache_node *fdcache; + pthread_mutex_t cache_lock; + char path[MAX_PATH_SIZE + 1]; + char vpath[MAX_PATH_SIZE + 1]; +}; + +/* + * pfiled specific structure + * containing information on a pending I/O operation + */ +struct fio { + uint32_t state; + long fdcacheidx; +}; + +struct pfiled * __get_pfiled(struct peerd *peer) +{ + return (struct pfiled *) peer->priv; +} + +struct fio * __get_fio(struct peer_req *pr) +{ + return (struct fio*) pr->priv; +} + +static void close_cache_entry(struct peerd *peer, struct peer_req *pr) +{ + struct pfiled *pfiled = __get_pfiled(peer); + struct fio *fio = __get_fio(pr); + int fd = -1; + if (fio->fdcacheidx >= 0) { + if (!__sync_sub_and_fetch(&pfiled->fdcache[fio->fdcacheidx].ref, 1) && !(pfiled->fdcache[fio->fdcacheidx].flags & READY)) { + pthread_mutex_lock(&pfiled->cache_lock); + if (!pfiled->fdcache[fio->fdcacheidx].ref){ + /* invalidate cache entry */ + fd = pfiled->fdcache[fio->fdcacheidx].fd; + 
pfiled->fdcache[fio->fdcacheidx].fd = -1; + pfiled->fdcache[fio->fdcacheidx].target[0] = 0; + pfiled->fdcache[fio->fdcacheidx].flags |= READY; + } + pthread_mutex_unlock(&pfiled->cache_lock); + if (fd > 0) + close(fd); + + } + } +} + +static void pfiled_complete(struct peerd *peer, struct peer_req *pr) +{ + close_cache_entry(peer, pr); + complete(peer, pr); +} + +static void pfiled_fail(struct peerd *peer, struct peer_req *pr) +{ + close_cache_entry(peer, pr); + fail(peer, pr); +} + +static void handle_unknown(struct peerd *peer, struct peer_req *pr) +{ + XSEGLOG2(&lc, W, "unknown request op"); + pfiled_fail(peer, pr); +} + +static int create_path(char *buf, char *path, char *target, uint32_t targetlen, int mkdirs) +{ + int i; + struct stat st; + uint32_t pathlen = strlen(path); + + strncpy(buf, path, pathlen); + + for (i = 0; i < 9; i+= 3) { + buf[pathlen + i] = target[i - (i/3)]; + buf[pathlen + i +1] = target[i + 1 - (i/3)]; + buf[pathlen + i + 2] = '/'; + if (mkdirs == 1) { + buf[pathlen + i + 3] = '\0'; +retry: + if (stat(buf, &st) < 0) + if (mkdir(buf, 0700) < 0) { + if (errno == EEXIST) + goto retry; + perror(buf); + return errno; + } + } + } + + strncpy(&buf[pathlen + 9], target, targetlen); + buf[pathlen + 9 + targetlen] = '\0'; + + return 0; +} + +static int dir_open(struct pfiled *pfiled, struct fio *io, + char *target, uint32_t targetlen, int mode) +{ + int fd = -1; + struct fdcache_node *ce = NULL; + long i, lru; + char tmp[pfiled->path_len + targetlen + 10]; + uint64_t min; + io->fdcacheidx = -1; + if (targetlen> MAX_FILENAME_SIZE) + goto out_err; + +start: + /* check cache */ + pthread_mutex_lock(&pfiled->cache_lock); +start_locked: + lru = -1; + min = UINT64_MAX; + for (i = 0; i < pfiled->maxfds; i++) { + if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time + && (pfiled->fdcache[i].flags & READY)) { + min = pfiled->fdcache[i].time; + lru = i; + + } + + if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) { + if 
(pfiled->fdcache[i].target[targetlen] == 0) { + ce = &pfiled->fdcache[i]; + /* if any other io thread is currently opening + * the file, block until it succeeds or fails + */ + if (!(ce->flags & READY)) { + pthread_cond_wait(&ce->cond, &pfiled->cache_lock); + /* when ready, restart lookup */ + goto start_locked; + } + /* if successfully opened */ + if (ce->fd > 0) { + fd = pfiled->fdcache[i].fd; + io->fdcacheidx = i; + goto out; + } + /* else open failed for the other io thread, so + * it should fail for everyone waiting on this + * file. + */ + else { + fd = -1; + io->fdcacheidx = -1; + goto out_err_unlock; + } + } + } + } + if (lru < 0){ + /* all cache entries are currently being used */ + pthread_mutex_unlock(&pfiled->cache_lock); + goto start; + } + if (pfiled->fdcache[lru].ref){ + fd = -1; + XSEGLOG2(&lc, E, "lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref); + goto out_err_unlock; + } + /* make room for new file */ + ce = &pfiled->fdcache[lru]; + /* set name here and state to not ready, for any other requests on the + * same target that may follow + */ + strncpy(ce->target, target, targetlen); + ce->target[targetlen] = 0; + ce->flags &= ~READY; + pthread_mutex_unlock(&pfiled->cache_lock); + + if (ce->fd >0){ + if (close(ce->fd) < 0){ + XSEGLOG2(&lc, W, "Cannot close %s", ce->target); + } + } + + /* try opening it from pithos blocker dir */ + if (create_path(tmp, pfiled->path, target, targetlen, 0) < 0) { + fd = -1; + goto new_entry; + } + + fd = open(tmp, O_RDWR); + if (fd < 0) { + /* try opening it from the tmp dir */ + if (create_path(tmp, pfiled->vpath, target, targetlen, 0) < 0) + goto new_entry; + + fd = open(tmp, O_RDWR); + if (fd < 0) { + if (create_path(tmp, pfiled->vpath, target, targetlen, 1) < 0) { + fd = -1; + goto new_entry; + } + + fd = open(tmp, O_RDWR | O_CREAT, 0600); + if (fd < 0) + XSEGLOG2(&lc, E, "Cannot open %s", tmp); + } + } + + /* insert in cache a negative fd to indicate opening error to + * any other ios waiting for the 
file to open + */ + + /* insert in cache */ +new_entry: + pthread_mutex_lock(&pfiled->cache_lock); + ce->fd = fd; + ce->ref = 0; + ce->flags = READY; + pthread_cond_broadcast(&ce->cond); + if (fd > 0) { + io->fdcacheidx = lru; + } + else { + io->fdcacheidx = -1; + goto out_err_unlock; + } + +out: + pfiled->handled_reqs++; + ce->time = pfiled->handled_reqs; + __sync_fetch_and_add(&ce->ref, 1); + pthread_mutex_unlock(&pfiled->cache_lock); +out_err: + return fd; + +out_err_unlock: + pthread_mutex_unlock(&pfiled->cache_lock); + goto out_err; +} + +static void handle_read_write(struct peerd *peer, struct peer_req *pr) +{ + struct pfiled *pfiled = __get_pfiled(peer); + struct fio *fio = __get_fio(pr); + struct xseg_request *req = pr->req; + int r, fd; + char *target = xseg_get_target(peer->xseg, req); + char *data = xseg_get_data(peer->xseg, req); + + fd = dir_open(pfiled, fio, target, req->targetlen, 0); + if (fd < 0){ + XSEGLOG2(&lc, E, "Dir open failed"); + pfiled_fail(peer, pr); + return; + } + + if (!req->size) { + if (req->flags & (XF_FLUSH | XF_FUA)) { + /* No FLUSH/FUA support yet (O_SYNC ?). + * note that with FLUSH/size == 0 + * there will probably be a (uint64_t)-1 offset */ + pfiled_complete(peer, pr); + return; + } else { + pfiled_complete(peer, pr); + return; + } + } + + switch (req->op) { + case X_READ: + while (req->serviced < req->datalen) { + r = pread(fd, data + req->serviced, + req->datalen - req->serviced, + req->offset + req->serviced); + if (r < 0) { + req->datalen = req->serviced; + XSEGLOG2(&lc, E, "Cannot read"); + } + else if (r == 0) { + /* reached end of file. 
zero out the rest data buffer */ + memset(data + req->serviced, 0, req->datalen - req->serviced); + req->serviced = req->datalen; + } + else { + req->serviced += r; + } + } + break; + case X_WRITE: + while (req->serviced < req->datalen) { + r = pwrite(fd, data + req->serviced, + req->datalen - req->serviced, + req->offset + req->serviced); + if (r < 0) { + req->datalen = req->serviced; + } + else { + req->serviced += r; + } + } + r = fsync(fd); + if (r< 0) { + XSEGLOG2(&lc, E, "Fsync failed."); + /* if fsync fails, then no bytes serviced correctly */ + req->serviced = 0; + } + break; + default: + XSEGLOG2(&lc, E, "wtf, corrupt op %u?\n", req->op); + pfiled_fail(peer, pr); + return; + } + + if (req->serviced > 0 ) { + pfiled_complete(peer, pr); + } + else { + pfiled_fail(peer, pr); + } + return; +} + +static void handle_info(struct peerd *peer, struct peer_req *pr) +{ + struct pfiled *pfiled = __get_pfiled(peer); + struct fio *fio = __get_fio(pr); + struct xseg_request *req = pr->req; + struct stat stat; + int fd, r; + uint64_t size; + char *target = xseg_get_target(peer->xseg, req); + char *data = xseg_get_data(peer->xseg, req); + struct xseg_reply_info *xinfo = (struct xseg_reply_info *)data; + + fd = dir_open(pfiled, fio, target, req->targetlen, 0); + if (fd < 0) { + XSEGLOG2(&lc, E, "Dir open failed"); + pfiled_fail(peer, pr); + return; + } + + r = fstat(fd, &stat); + if (r < 0) { + XSEGLOG2(&lc, E, "fail in stat"); + pfiled_fail(peer, pr); + return; + } + + size = (uint64_t)stat.st_size; + xinfo->size = size; + + pfiled_complete(peer, pr); +} + +static void handle_copy(struct peerd *peer, struct peer_req *pr) +{ + struct pfiled *pfiled = __get_pfiled(peer); + struct fio *fio = __get_fio(pr); + struct xseg_request *req = pr->req; + + char *target = xseg_get_target(peer->xseg, req); + char *data = xseg_get_data(peer->xseg, req); + struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data; + struct stat st; + char *buf = malloc(MAX_PATH_SIZE + 
MAX_FILENAME_SIZE); + int n, src = -1, dst = -1, r = -1; + + if (!buf){ + XSEGLOG2(&lc, E, "Out of memory"); + pfiled_fail(peer, pr); + return; + } + + dst = dir_open(pfiled, fio, target, req->targetlen, 1); + if (dst < 0) { + XSEGLOG2(&lc, E, "Fail in dst"); + r = dst; + goto out; + } + + if (create_path(buf, pfiled->path, xcopy->target, xcopy->targetlen, 0) < 0) { + XSEGLOG2(&lc, E, "Create path failed"); + r = -1; + goto out; + } + + src = open(buf, O_RDWR); + if (src < 0) { + XSEGLOG2(&lc, E, "fail in src %s", buf); + r = src; + goto out; + } + + r = fstat(src, &st); + if (r < 0){ + XSEGLOG2(&lc, E, "fail in stat for src %s", buf); + goto out; + } + + n = sendfile(dst, src, 0, st.st_size); + if (n != st.st_size) { + XSEGLOG2(&lc, E, "Copy failed for %s", buf); + r = -1; + goto out; + } + r = 0; + +out: + if (src > 0) + close(src); + free(buf); + if (r < 0) + pfiled_fail(peer, pr); + else + pfiled_complete(peer, pr); + return; +} + +static void handle_delete(struct peerd *peer, struct peer_req *pr) +{ + struct pfiled *pfiled = __get_pfiled(peer); + struct fio *fio = __get_fio(pr); + struct xseg_request *req = pr->req; + + char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE); + int fd, r; + char *target = xseg_get_target(peer->xseg, req); + if (!buf){ + XSEGLOG2(&lc, E, "Out of memory"); + pfiled_fail(peer, pr); + return; + } + fd = dir_open(pfiled, fio, target, req->targetlen, 0); + if (fd < 0) { + XSEGLOG2(&lc, E, "Dir open failed"); + r = fd; + goto out; + } + + /* mark cache entry as invalid + * give a chance to pending operations on this file to end. 
+ * file will close when all operations are done + */ + if (fio->fdcacheidx >= 0) { + pthread_mutex_lock(&pfiled->cache_lock); + pfiled->fdcache[fio->fdcacheidx].flags &= ~READY; + pthread_mutex_unlock(&pfiled->cache_lock); + } + + r = create_path(buf, pfiled->vpath, target, req->targetlen, 0); + if (r< 0) { + XSEGLOG2(&lc, E, "Create path failed"); + goto out; + } + r = unlink(buf); +out: + free(buf); + if (r < 0) + pfiled_fail(peer, pr); + else + pfiled_complete(peer, pr); + return; +} + +static void handle_open(struct peerd *peer, struct peer_req *pr) +{ + struct pfiled *pfiled = __get_pfiled(peer); +// struct fio *fio = __get_fio(pr); + struct xseg_request *req = pr->req; + char *buf = malloc(MAX_FILENAME_SIZE); + char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE); + int fd = -1; + char *target = xseg_get_target(peer->xseg, req); + + if (!buf || !pathname) { + XSEGLOG2(&lc, E, "Out of memory"); + pfiled_fail(peer, pr); + return; + } + + strncpy(buf, target, req->targetlen); + strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX)); + + if (create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen(LOCK_SUFFIX), 1) < 0) { + XSEGLOG2(&lc, E, "Create path failed for %s", buf); + goto out; + } + + //nfs v >= 3 + fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR); + if (fd < 0){ + //actual error + if (errno != -EEXIST) + XSEGLOG2(&lc, W, "Error opening %s", pathname); + goto out; + } + close(fd); +out: + free(buf); + free(pathname); + if (fd < 0) + pfiled_fail(peer, pr); + else + pfiled_complete(peer, pr); + return; +} + +static void handle_close(struct peerd *peer, struct peer_req *pr) +{ + struct pfiled *pfiled = __get_pfiled(peer); +// struct fio *fio = __get_fio(pr); + struct xseg_request *req = pr->req; + char *buf = malloc(MAX_FILENAME_SIZE); + char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE); + char *target = xseg_get_target(peer->xseg, req); + int r; + + if (!buf || !pathname) { + XSEGLOG2(&lc, E, "Out of memory"); + 
fail(peer, pr); + return; + } + + strncpy(buf, target, req->targetlen); + strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX)); + + r = create_path(pathname, pfiled->vpath, buf, req->targetlen + strlen(LOCK_SUFFIX), 0); + if (r < 0) { + XSEGLOG2(&lc, E, "Create path failed for %s", buf); + goto out; + } + r = unlink(pathname); + +out: + free(buf); + free(pathname); + if (r < 0) + fail(peer, pr); + else + complete(peer, pr); + return; +} + +int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req, + enum dispatch_reason reason) +{ + struct fio *fio = __get_fio(pr); + if (reason == dispatch_accept){ + fio->fdcacheidx = -1; + fio->state = XS_ACCEPTED; + } + + switch (req->op) { + case X_READ: + case X_WRITE: + handle_read_write(peer, pr); break; + case X_INFO: + handle_info(peer, pr); break; + case X_COPY: + handle_copy(peer, pr); break; + case X_DELETE: + handle_delete(peer, pr); break; + case X_OPEN: + handle_open(peer, pr); break; + case X_CLOSE: + handle_close(peer, pr); break; + // case X_SNAPSHOT: + case X_SYNC: + default: + handle_unknown(peer, pr); + } + return 0; +} + +int custom_peer_init(struct peerd *peer, int argc, char *argv[]) +{ + int ret = 0; + int i; + struct pfiled *pfiled = malloc(sizeof(struct pfiled)); + if (!pfiled){ + XSEGLOG2(&lc, E, "Out of memory"); + ret = -ENOMEM; + goto out; + } + peer->priv = pfiled; + + pfiled->maxfds = 2 * peer->nr_ops; + pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node)); + if(!pfiled->fdcache) { + XSEGLOG2(&lc, E, "Out of memory"); + ret = -ENOMEM; + goto out; + } + + for (i = 0; i < peer->nr_ops; i++) { + peer->peer_reqs[i].priv = malloc(sizeof(struct fio)); + if (!peer->peer_reqs->priv){ + XSEGLOG2(&lc, E, "Out of memory"); + ret = -ENOMEM; + goto out; + } + } + + pfiled->handled_reqs = 0; + for (i = 0; i < argc; i++) { + if (!strcmp(argv[i], "--path") && (i+1) < argc){ + strncpy(pfiled->path, argv[i+1], MAX_PATH_SIZE); + pfiled->path[MAX_PATH_SIZE] = 0; + i += 1; + 
continue; + } + if (!strcmp(argv[i], "--vpath") && (i+1) < argc){ + strncpy(pfiled->vpath, argv[i+1], MAX_PATH_SIZE); + pfiled->vpath[MAX_PATH_SIZE] = 0; + i += 1; + continue; + } + } + + pfiled->path_len = strlen(pfiled->path); + if (pfiled->path[pfiled->path_len -1] != '/'){ + pfiled->path[pfiled->path_len] = '/'; + pfiled->path[++pfiled->path_len]= 0; + } + + pfiled->vpath_len = strlen(pfiled->vpath); + if (pfiled->vpath[pfiled->vpath_len -1] != '/'){ + pfiled->vpath[pfiled->vpath_len] = '/'; + pfiled->vpath[++pfiled->vpath_len]= 0; + } + + for (i = 0; i < peer->nr_ops; i++) { + pthread_cond_init(&pfiled->fdcache[i].cond, NULL); + pfiled->fdcache[i].flags = READY; + } + pthread_mutex_init(&pfiled->cache_lock, NULL); + +out: + return ret; +} + +static int safe_atoi(char *s) +{ + long l; + char *endp; + + l = strtol(s, &endp, 10); + if (s != endp && *endp == '\0') + return l; + else + return -1; +} diff --git a/xseg/peers/user/peer.c b/xseg/peers/user/peer.c index 85e78ec..779beaf 100644 --- a/xseg/peers/user/peer.c +++ b/xseg/peers/user/peer.c @@ -398,6 +398,7 @@ static void* thread_loop(void *arg) xseg_cancel_wait(xseg, peer->portno_start); XSEGLOG2(&lc, I, "Thread %u woke up\n", (unsigned int) (t- peer->thread)); } + wake_up_next_thread(peer); return NULL; }