/*
+ * Copyright 2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+/*
* The Pithos File Blocker Peer (pfiled)
*/
#include <string.h>
#include <fcntl.h>
#include <errno.h>
-#include <aio.h>
#include <signal.h>
#include <limits.h>
#include <pthread.h>
#include <syscall.h>
#include <sys/sendfile.h>
#include <peer.h>
+#include <xtypes/xcache.h>
+#include <openssl/sha.h>
+#include <sys/resource.h>
#include <xseg/xseg.h>
#include <xseg/protocol.h>
+#ifndef SHA256_DIGEST_SIZE
+#define SHA256_DIGEST_SIZE 32
+#endif
+/* hex representation of sha256 value takes up double the sha256 size */
+#define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
+
+
+#define FIO_STR_ID_LEN 3
#define LOCK_SUFFIX "_lock"
+#define LOCK_SUFFIX_LEN 5
#define MAX_PATH_SIZE 1024
-#define MAX_FILENAME_SIZE (XSEG_MAX_TARGETLEN + 5) //strlen(LOCK_SUFFIX)
+#define MAX_FILENAME_SIZE (XSEG_MAX_TARGETLEN + LOCK_SUFFIX_LEN + MAX_UNIQUESTR_LEN + FIO_STR_ID_LEN)
#define MAX_PREFIX_LEN 10
+#define MAX_UNIQUESTR_LEN 128
+#define SNAP_SUFFIX "_snap"
+#define SNAP_SUFFIX_LEN 5
-/* default concurrency level (number of threads) */
-#define DEFAULT_NR_OPS 16
+#define WRITE 1
+#define READ 2
/* Pithos hash for the zero block
* FIXME: Should it be hardcoded?
- */
#define ZERO_BLOCK \
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85"
+ */
/*
* Globals, holding command-line arguments
#define READY (1 << 1)
/* fdcache node info */
-struct fdcache_node {
+struct fdcache_entry { /* fd: open descriptor, or -1 when the entry holds none; flags: reset to 0 together with fd */
volatile int fd;
- volatile unsigned int ref;
- volatile unsigned long time;
volatile unsigned int flags;
- pthread_cond_t cond;
- char target[MAX_FILENAME_SIZE + 1];
};
/* pfiled context */
struct pfiled {
- uint32_t path_len;
uint32_t vpath_len;
uint32_t prefix_len;
- uint64_t handled_reqs;
+ uint32_t uniquestr_len; /* number of bytes of uniquestr that are written/compared */
long maxfds;
- struct fdcache_node *fdcache;
- pthread_mutex_t cache_lock;
char path[MAX_PATH_SIZE + 1];
char vpath[MAX_PATH_SIZE + 1];
- char prefix[MAX_PREFIX_LEN];
+ char prefix[MAX_PREFIX_LEN + 1]; /* target-name prefix; sized one byte beyond MAX_PREFIX_LEN */
+ char uniquestr[MAX_UNIQUESTR_LEN + 1]; /* id written into lock files and appended to temporary file names */
+ struct xcache cache; /* descriptor cache, looked up by target name */
};
/*
- * pfiled specific structure
+ * pfiled specific structure
* containing information on a pending I/O operation
*/
struct fio {
uint32_t state;
- long fdcacheidx;
+ xcache_handler h; /* cache handle stored by dir_open; NoEntry when none is held */
+ char str_id[FIO_STR_ID_LEN]; /* fixed-width id appended to temporary file names */
};
struct pfiled * __get_pfiled(struct peerd *peer)
return (struct fio*) pr->priv;
}
+
+/* hexlify function.
+ * Writes the lowercase hex form of the first SHA256_DIGEST_LENGTH bytes of
+ * data into hex, two characters per byte.
+ * Unsafe: the input length is fixed and neither buffer size is checked.
+ * sprintf also stores a terminating NUL after every pair, so hex must have
+ * room for 2 x SHA256_DIGEST_LENGTH characters plus one NUL byte.
+ */
+static void hexlify(unsigned char *data, char *hex)
+{
+ int i;
+ for (i=0; i<SHA256_DIGEST_LENGTH; i++)
+ sprintf(hex+2*i, "%02x", data[i]);
+}
+
+
+/* cache ops
+ *
+ * cache_node_init: allocates one struct fdcache_entry with malloc and marks
+ * it as holding no descriptor (fd = -1, flags = 0).
+ * p is unused here; xh is read as a pointer to an xcache_handler, which is
+ * only used for the debug log line.
+ * Returns the new entry, or NULL when malloc fails.
+ */
+static void * cache_node_init(void *p, void *xh)
+{
+ //struct peerd *peer = (struct peerd *)p;
+ //struct pfiled *pfiled = __get_pfiled(peer);
+ xcache_handler h = *(xcache_handler *)(xh);
+ struct fdcache_entry *fdentry = malloc(sizeof(struct fdcache_entry));
+ if (!fdentry)
+ return NULL;
+
+ XSEGLOG2(&lc, D, "Initialing node h: %llu with %p",
+ (long long unsigned)h, fdentry);
+
+ fdentry->fd = -1;
+ fdentry->flags = 0;
+
+ return fdentry;
+}
+
+static int cache_init(void *p, void *e)
+{ /*
+ * Checks that the struct fdcache_entry passed as e holds no descriptor
+ * (fd == -1). p is unused.
+ * Returns 0 when the entry is clean, -1 (after logging) otherwise.
+ * NOTE(review): presumably registered as an xcache callback; the
+ * registration is not visible here.
+ */
+ struct fdcache_entry *fdentry = (struct fdcache_entry *)e;
+
+ if (fdentry->fd != -1) {
+ XSEGLOG2(&lc, E, "Found invalid fd %d", fdentry->fd);
+ return -1;
+ }
+
+ return 0;
+}
+
+static void cache_put(void *p, void *e)
+{ /*
+ * Releases the descriptor held by the struct fdcache_entry passed as e:
+ * closes fd when one is set, then resets the entry to fd = -1, flags = 0.
+ * p is unused. The return value of close is not checked.
+ */
+ struct fdcache_entry *fdentry = (struct fdcache_entry *)e;
+
+ XSEGLOG2(&lc, D, "Putting entry %p with fd %d", fdentry, fdentry->fd);
+
+ if (fdentry->fd != -1)
+ close(fdentry->fd);
+
+ fdentry->fd = -1;
+ fdentry->flags = 0;
+ return;
+}
+
static void close_cache_entry(struct peerd *peer, struct peer_req *pr)
{
struct pfiled *pfiled = __get_pfiled(peer);
struct fio *fio = __get_fio(pr);
- int fd = -1;
- if (fio->fdcacheidx >= 0) {
- if (!__sync_sub_and_fetch(&pfiled->fdcache[fio->fdcacheidx].ref, 1) && !(pfiled->fdcache[fio->fdcacheidx].flags & READY)) {
- pthread_mutex_lock(&pfiled->cache_lock);
- if (!pfiled->fdcache[fio->fdcacheidx].ref){
- /* invalidate cache entry */
- fd = pfiled->fdcache[fio->fdcacheidx].fd;
- pfiled->fdcache[fio->fdcacheidx].fd = -1;
- pfiled->fdcache[fio->fdcacheidx].target[0] = 0;
- pfiled->fdcache[fio->fdcacheidx].flags |= READY;
- }
- pthread_mutex_unlock(&pfiled->cache_lock);
- if (fd > 0)
- close(fd);
-
- }
- }
+ if (fio->h != NoEntry) /* hand the handle held by this request back to the cache; fio->h itself is left unchanged */
+ xcache_put(&pfiled->cache, fio->h);
}
static void pfiled_complete(struct peerd *peer, struct peer_req *pr)
buf[pathlen + i + 3] = '\0';
retry:
if (stat(buf, &st) < 0)
- if (mkdir(buf, 0700) < 0) {
+ if (mkdir(buf, 0750) < 0) {
if (errno == EEXIST)
goto retry;
- perror(buf);
- return errno;
+ //perror(buf);
+ return -1;
}
}
}
return 0;
}
-static int dir_open(struct pfiled *pfiled, struct fio *io,
+static int is_target_valid_len(struct pfiled *pfiled, char *target,
+ uint32_t targetlen, int mode)
+{ /*
+ * Validates the length of a target name.
+ * pfiled: supplies the configured prefix and prefix_len
+ * target: name bytes (only the first prefix_len bytes are inspected)
+ * targetlen: length of the name
+ * mode: must be WRITE or READ; both are checked the same way
+ * Returns 0 when the length is acceptable, -1 (after logging) when
+ * targetlen exceeds XSEG_MAX_TARGETLEN, is too short, or mode is unknown.
+ */
+ if (targetlen > XSEG_MAX_TARGETLEN) {
+ XSEGLOG2(&lc, E, "Invalid targetlen %u, max: %u",
+ targetlen, XSEG_MAX_TARGETLEN);
+ return -1;
+ }
+ if (mode == WRITE || mode == READ) {
+ /*
+ * if name starts with prefix
+ * assert targetlen >= prefix_len + 6
+ * else
+ * assert targetlen >= 6
+ */
+ /* 6 chars are needed for the directory structure */
+ if (!pfiled->prefix_len || strncmp(target, pfiled->prefix, pfiled->prefix_len)) {
+ if (targetlen < 6) {
+ XSEGLOG2(&lc, E, "Targetlen should be at least 6");
+ return -1;
+ }
+ } else {
+ if (targetlen < pfiled->prefix_len + 6) {
+ XSEGLOG2(&lc, E, "Targetlen should be at least prefix "
+ "len(%u) + 6", pfiled->prefix_len);
+ return -1;
+ }
+ }
+ } else {
+ XSEGLOG2(&lc, E, "Invalid mode");
+ return -1;
+ }
+
+ return 0;
+}
+
+static int is_target_valid(struct pfiled *pfiled, char *target, int mode)
+{ /* Same check as is_target_valid_len, with the length taken from strlen(target); target must therefore be NUL-terminated. */
+ return is_target_valid_len(pfiled, target, strlen(target), mode);
+}
+
+static int open_file_write(struct pfiled *pfiled, char *target, uint32_t targetlen)
+{ /*
+ * Builds the file path for target under pfiled->vpath with create_path
+ * and opens it with O_RDWR|O_CREAT, permissions rw for user and group.
+ * Returns the descriptor, or -1 (after logging) when the path cannot be
+ * built or open fails.
+ * NOTE(review): the last create_path argument is 1 here and 0 in
+ * open_file_read; presumably it asks for missing directories to be
+ * created, confirm against create_path.
+ */
+ int r, fd;
+ char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
+ char error_str[1024];
+ char *path;
+
+ path = pfiled->vpath;
+ r = create_path(tmp, path, target, targetlen,
+ pfiled->prefix_len, 1);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Could not create path");
+ return -1;
+ }
+ XSEGLOG2(&lc, D, "Opening file %s with O_RDWR|O_CREAT", tmp);
+ fd = open(tmp, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
+ if (fd < 0){
+ XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
+ return -1;
+ }
+ return fd;
+}
+
+static int open_file_read(struct pfiled *pfiled, char *target, uint32_t targetlen)
+{ /*
+ * Builds the file path for target under pfiled->vpath with create_path
+ * (last argument 0) and opens the existing file with O_RDWR; no O_CREAT
+ * is passed, so the permission argument has no effect.
+ * Returns the descriptor, or -1 (after logging) on failure.
+ * The local variable path is assigned but not used afterwards.
+ */
+ int r, fd;
+ char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
+ char error_str[1024];
+ char *path;
+
+ path = pfiled->vpath;
+
+ r = create_path(tmp, pfiled->vpath, target, targetlen,
+ pfiled->prefix_len, 0);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Could not create path");
+ return -1;
+ }
+ XSEGLOG2(&lc, D, "Opening file %s with O_RDWR", tmp);
+ fd = open(tmp, O_RDWR, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
+ if (fd < 0){
+ XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
+ return -1;
+ }
+ return fd;
+}
+
+static int open_file(struct pfiled *pfiled, char *target, uint32_t targetlen, int mode)
+{ /*
+ * Dispatches on mode: WRITE goes to open_file_write, READ goes to
+ * open_file_read. Any other mode is logged and yields -1.
+ * Returns the descriptor from the chosen helper, or -1 on failure.
+ */
+ if (mode == WRITE) {
+ return open_file_write(pfiled, target, targetlen);
+ } else if (mode == READ) {
+ return open_file_read(pfiled, target, targetlen);
+
+ } else {
+ XSEGLOG2(&lc, E, "Invalid mode for target");
+ }
+ return -1;
+}
+
+static int dir_open(struct pfiled *pfiled, struct fio *fio,
char *target, uint32_t targetlen, int mode)
{
- int fd = -1;
- struct fdcache_node *ce = NULL;
- long i, lru;
- char tmp[pfiled->path_len + targetlen + 10];
- uint64_t min;
- io->fdcacheidx = -1;
- if (targetlen> MAX_FILENAME_SIZE)
- goto out_err;
+ int r, fd;
+ struct fdcache_entry *e;
+ xcache_handler h = NoEntry, nh;
+ char name[XSEG_MAX_TARGETLEN + 1];
-start:
- /* check cache */
- pthread_mutex_lock(&pfiled->cache_lock);
-start_locked:
- lru = -1;
- min = UINT64_MAX;
- for (i = 0; i < pfiled->maxfds; i++) {
- if (pfiled->fdcache[i].ref == 0 && min > pfiled->fdcache[i].time
- && (pfiled->fdcache[i].flags & READY)) {
- min = pfiled->fdcache[i].time;
- lru = i;
+ if (targetlen > XSEG_MAX_TARGETLEN) {
+ XSEGLOG2(&lc, E, "Invalid targetlen %u, max: %u",
+ targetlen, XSEG_MAX_TARGETLEN);
+ return -1;
+ }
+ strncpy(name, target, targetlen);
+ name[targetlen] = 0;
+ XSEGLOG2(&lc, I, "Dir open started for %s", name);
+
+ h = xcache_lookup(&pfiled->cache, name);
+ if (h == NoEntry) {
+ r = is_target_valid_len(pfiled, target, targetlen, mode);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Invalid len for target %s", name);
+ goto out_err;
+ }
+ h = xcache_alloc_init(&pfiled->cache, name);
+ if (h == NoEntry) {
+ /* FIXME add waitq to wait for free */
+ XSEGLOG2(&lc, E, "Could not allocate cache entry for %s",
+ name);
+ goto out_err;
}
+ XSEGLOG2(&lc, D, "Allocated new handler %llu for %s",
+ (long long unsigned)h, name);
- if (!strncmp(pfiled->fdcache[i].target, target, targetlen)) {
- if (pfiled->fdcache[i].target[targetlen] == 0) {
- ce = &pfiled->fdcache[i];
- /* if any other io thread is currently opening
- * the file, block until it succeeds or fails
- */
- if (!(ce->flags & READY)) {
- pthread_cond_wait(&ce->cond, &pfiled->cache_lock);
- /* when ready, restart lookup */
- goto start_locked;
- }
- /* if successfully opened */
- if (ce->fd > 0) {
- fd = pfiled->fdcache[i].fd;
- io->fdcacheidx = i;
- goto out;
- }
- /* else open failed for the other io thread, so
- * it should fail for everyone waiting on this
- * file.
- */
- else {
- fd = -1;
- io->fdcacheidx = -1;
- goto out_err_unlock;
- }
- }
+ e = xcache_get_entry(&pfiled->cache, h);
+ if (!e) {
+ XSEGLOG2(&lc, E, "Alloced handler but no valid fd cache entry");
+ goto out_free;
}
- }
- if (lru < 0){
- /* all cache entries are currently being used */
- pthread_mutex_unlock(&pfiled->cache_lock);
- goto start;
- }
- if (pfiled->fdcache[lru].ref){
- fd = -1;
- XSEGLOG2(&lc, E, "lru(%ld) ref not 0 (%u)\n", lru, pfiled->fdcache[lru].ref);
- goto out_err_unlock;
- }
- /* make room for new file */
- ce = &pfiled->fdcache[lru];
- /* set name here and state to not ready, for any other requests on the
- * same target that may follow
- */
- strncpy(ce->target, target, targetlen);
- ce->target[targetlen] = 0;
- ce->flags &= ~READY;
- pthread_mutex_unlock(&pfiled->cache_lock);
-
- if (ce->fd >0){
- if (close(ce->fd) < 0){
- XSEGLOG2(&lc, W, "Cannot close %s", ce->target);
+
+ /* open/create file */
+ fd = open_file(pfiled, target, targetlen, mode);
+ if (fd < 0) {
+ XSEGLOG2(&lc, E, "Could not open file for target %s", name);
+ goto out_free;
+ }
+ XSEGLOG2(&lc, D, "Opened file %s. fd %d", name, fd);
+
+ e->fd = fd;
+
+ XSEGLOG2(&lc, D, "Inserting handler %llu for %s to fdcache",
+ (long long unsigned)h, name);
+ nh = xcache_insert(&pfiled->cache, h);
+ if (nh != h) {
+ XSEGLOG2(&lc, D, "Partial cache hit for %s. New handler %llu",
+ name, (long long unsigned)nh);
+ xcache_put(&pfiled->cache, h);
+ h = nh;
}
+ } else {
+ XSEGLOG2(&lc, D, "Cache hit for %s, handler: %llu", name,
+ (long long unsigned)h);
}
- /* try opening it from pithos blocker dir */
- if (create_path(tmp, pfiled->path, target, targetlen, 0, 0) < 0) {
- fd = -1;
- goto new_entry;
+ e = xcache_get_entry(&pfiled->cache, h);
+ if (!e) {
+ XSEGLOG2(&lc, E, "Found handler but no valid fd cache entry");
+ xcache_put(&pfiled->cache, h);
+ fio->h = NoEntry;
+ goto out_err;
}
+ fio->h = h;
- fd = open(tmp, O_RDWR);
- if (fd < 0) {
- /* try opening it from the tmp dir */
- if (create_path(tmp, pfiled->vpath, target, targetlen,
- pfiled->prefix_len, 0) < 0)
- goto new_entry;
-
- fd = open(tmp, O_RDWR);
- if (fd < 0) {
- if (create_path(tmp, pfiled->vpath, target, targetlen,
- pfiled->prefix_len, 1) < 0) {
- fd = -1;
- goto new_entry;
- }
+ //assert e->fd != -1 ?;
+ XSEGLOG2(&lc, I, "Dir open finished for %s", name);
+ return e->fd;
- fd = open(tmp, O_RDWR | O_CREAT, 0600);
- if (fd < 0)
- XSEGLOG2(&lc, E, "Cannot open %s", tmp);
+out_free:
+ xcache_free_new(&pfiled->cache, h);
+out_err:
+ XSEGLOG2(&lc, E, "Dir open failed for %s", name);
+ return -1;
+}
+
+static void handle_read(struct peerd *peer, struct peer_req *pr)
+{
+ struct pfiled *pfiled = __get_pfiled(peer);
+ struct fio *fio = __get_fio(pr);
+ struct xseg_request *req = pr->req;
+ int r, fd;
+ char *target = xseg_get_target(peer->xseg, req);
+ char *data = xseg_get_data(peer->xseg, req);
+
+ XSEGLOG2(&lc, I, "Handle read started for pr: %p, req: %p", pr, pr->req);
+
+ if (!req->size) {
+ pfiled_complete(peer, pr);
+ return;
+ }
+
+ fd = dir_open(pfiled, fio, target, req->targetlen, READ);
+ if (fd < 0){
+ if (errno != ENOENT) {
+ XSEGLOG2(&lc, E, "Open failed");
+ pfiled_fail(peer, pr);
+ return;
+ } else {
+ memset(data, 0, req->datalen);
+ req->serviced = req->datalen;
+ goto out;
}
}
- /* insert in cache a negative fd to indicate opening error to
- * any other ios waiting for the file to open
- */
- /* insert in cache */
-new_entry:
- pthread_mutex_lock(&pfiled->cache_lock);
- ce->fd = fd;
- ce->ref = 0;
- ce->flags = READY;
- pthread_cond_broadcast(&ce->cond);
- if (fd > 0) {
- io->fdcacheidx = lru;
- }
- else {
- io->fdcacheidx = -1;
- goto out_err_unlock;
+ while (req->serviced < req->datalen) {
+ r = pread(fd, data + req->serviced,
+ req->datalen - req->serviced,
+ req->offset + req->serviced);
+ if (r < 0) {
+ req->datalen = req->serviced;
+ XSEGLOG2(&lc, E, "Cannot read");
+ }
+ else if (r == 0) {
+ /* reached end of file. zero out the rest data buffer */
+ memset(data + req->serviced, 0, req->datalen - req->serviced);
+ req->serviced = req->datalen;
+ }
+ else {
+ req->serviced += r;
+ }
}
out:
- pfiled->handled_reqs++;
- ce->time = pfiled->handled_reqs;
- __sync_fetch_and_add(&ce->ref, 1);
- pthread_mutex_unlock(&pfiled->cache_lock);
-out_err:
- return fd;
-
-out_err_unlock:
- pthread_mutex_unlock(&pfiled->cache_lock);
- goto out_err;
+ if (req->serviced > 0 ) {
+ XSEGLOG2(&lc, I, "Handle read completed for pr: %p, req: %p",
+ pr, pr->req);
+ pfiled_complete(peer, pr);
+ }
+ else {
+ XSEGLOG2(&lc, E, "Handle read failed for pr: %p, req: %p",
+ pr, pr->req);
+ pfiled_fail(peer, pr);
+ }
+ return;
}
-static void handle_read_write(struct peerd *peer, struct peer_req *pr)
+static void handle_write(struct peerd *peer, struct peer_req *pr)
{
struct pfiled *pfiled = __get_pfiled(peer);
struct fio *fio = __get_fio(pr);
char *target = xseg_get_target(peer->xseg, req);
char *data = xseg_get_data(peer->xseg, req);
- fd = dir_open(pfiled, fio, target, req->targetlen, 0);
+ XSEGLOG2(&lc, I, "Handle write started for pr: %p, req: %p", pr, pr->req);
+
+ fd = dir_open(pfiled, fio, target, req->targetlen, WRITE);
if (fd < 0){
- XSEGLOG2(&lc, E, "Dir open failed");
+ XSEGLOG2(&lc, E, "Open failed");
pfiled_fail(peer, pr);
return;
}
}
}
- switch (req->op) {
- case X_READ:
- while (req->serviced < req->datalen) {
- r = pread(fd, data + req->serviced,
- req->datalen - req->serviced,
- req->offset + req->serviced);
- if (r < 0) {
- req->datalen = req->serviced;
- XSEGLOG2(&lc, E, "Cannot read");
- }
- else if (r == 0) {
- /* reached end of file. zero out the rest data buffer */
- memset(data + req->serviced, 0, req->datalen - req->serviced);
- req->serviced = req->datalen;
- }
- else {
- req->serviced += r;
- }
- }
- break;
- case X_WRITE:
- while (req->serviced < req->datalen) {
- r = pwrite(fd, data + req->serviced,
- req->datalen - req->serviced,
- req->offset + req->serviced);
- if (r < 0) {
- req->datalen = req->serviced;
- }
- else {
- req->serviced += r;
- }
- }
- r = fsync(fd);
- if (r< 0) {
- XSEGLOG2(&lc, E, "Fsync failed.");
- /* if fsync fails, then no bytes serviced correctly */
- req->serviced = 0;
- }
- break;
- default:
- XSEGLOG2(&lc, E, "wtf, corrupt op %u?\n", req->op);
- pfiled_fail(peer, pr);
- return;
+ while (req->serviced < req->datalen) {
+ r = pwrite(fd, data + req->serviced,
+ req->datalen - req->serviced,
+ req->offset + req->serviced);
+ if (r < 0) {
+ req->datalen = req->serviced;
+ }
+ else {
+ req->serviced += r;
+ }
+ }
+ r = fsync(fd);
+ if (r< 0) {
+ XSEGLOG2(&lc, E, "Fsync failed.");
+ /* if fsync fails, then no bytes serviced correctly */
+ req->serviced = 0;
}
if (req->serviced > 0 ) {
+ XSEGLOG2(&lc, I, "Handle write completed for pr: %p, req: %p",
+ pr, pr->req);
pfiled_complete(peer, pr);
}
else {
+ XSEGLOG2(&lc, E, "Handle write failed for pr: %p, req: %p",
+ pr, pr->req);
pfiled_fail(peer, pr);
}
return;
char *data = xseg_get_data(peer->xseg, req);
struct xseg_reply_info *xinfo = (struct xseg_reply_info *)data;
- fd = dir_open(pfiled, fio, target, req->targetlen, 0);
+ XSEGLOG2(&lc, I, "Handle info started for pr: %p, req: %p", pr, pr->req);
+ fd = dir_open(pfiled, fio, target, req->targetlen, READ);
if (fd < 0) {
XSEGLOG2(&lc, E, "Dir open failed");
pfiled_fail(peer, pr);
size = (uint64_t)stat.st_size;
xinfo->size = size;
+ XSEGLOG2(&lc, I, "Handle info completed for pr: %p, req: %p", pr, pr->req);
pfiled_complete(peer, pr);
}
struct pfiled *pfiled = __get_pfiled(peer);
struct fio *fio = __get_fio(pr);
struct xseg_request *req = pr->req;
-
char *target = xseg_get_target(peer->xseg, req);
char *data = xseg_get_data(peer->xseg, req);
struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
struct stat st;
char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
- int n, src = -1, dst = -1, r = -1;
+ int src = -1, dst = -1, r = -1;
+ ssize_t c = 0, bytes;
+ XSEGLOG2(&lc, I, "Handle copy started for pr: %p, req: %p", pr, pr->req);
if (!buf){
XSEGLOG2(&lc, E, "Out of memory");
pfiled_fail(peer, pr);
return;
}
- dst = dir_open(pfiled, fio, target, req->targetlen, 1);
+ r = is_target_valid_len(pfiled, xcopy->target, xcopy->targetlen, READ);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Source target not valid");
+ goto out;
+ }
+
+ dst = dir_open(pfiled, fio, target, req->targetlen, WRITE);
if (dst < 0) {
XSEGLOG2(&lc, E, "Fail in dst");
r = dst;
goto out;
}
- if (create_path(buf, pfiled->path, xcopy->target,
- xcopy->targetlen, 0, 0) < 0) {
+ r = create_path(buf, pfiled->path, xcopy->target, xcopy->targetlen, 0, 0);
+ if (r < 0) {
XSEGLOG2(&lc, E, "Create path failed");
r = -1;
goto out;
}
- src = open(buf, O_RDWR);
+ src = open(buf, O_RDONLY);
if (src < 0) {
XSEGLOG2(&lc, E, "fail in src %s", buf);
r = src;
goto out;
}
- n = sendfile(dst, src, 0, st.st_size);
- if (n != st.st_size) {
- XSEGLOG2(&lc, E, "Copy failed for %s", buf);
- r = -1;
- goto out;
+ c = 0;
+ while (c < st.st_size) {
+ bytes = sendfile(dst, src, NULL, st.st_size - c);
+ if (bytes < 0) {
+ XSEGLOG2(&lc, E, "Copy failed for %s", buf);
+ r = -1;
+ goto out;
+ }
+ c += bytes;
}
r = 0;
out:
+ req->serviced = c;
if (src > 0)
close(src);
free(buf);
- if (r < 0)
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Handle copy failed for pr: %p, req: %p", pr, pr->req);
pfiled_fail(peer, pr);
- else
+ } else {
+ XSEGLOG2(&lc, I, "Handle copy completed for pr: %p, req: %p", pr, pr->req);
pfiled_complete(peer, pr);
+ }
return;
}
static void handle_delete(struct peerd *peer, struct peer_req *pr)
{
struct pfiled *pfiled = __get_pfiled(peer);
- struct fio *fio = __get_fio(pr);
+ //struct fio *fio = __get_fio(pr);
struct xseg_request *req = pr->req;
-
+ char name[XSEG_MAX_TARGETLEN + 1];
char *buf = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
- int fd, r;
+ int r;
char *target = xseg_get_target(peer->xseg, req);
+
+ XSEGLOG2(&lc, I, "Handle delete started for pr: %p, req: %p", pr, pr->req);
+
if (!buf){
XSEGLOG2(&lc, E, "Out of memory");
pfiled_fail(peer, pr);
return;
}
- fd = dir_open(pfiled, fio, target, req->targetlen, 0);
- if (fd < 0) {
- XSEGLOG2(&lc, E, "Dir open failed");
- r = fd;
- goto out;
- }
- /* mark cache entry as invalid
- * give a chance to pending operations on this file to end.
- * file will close when all operations are done
- */
- if (fio->fdcacheidx >= 0) {
- pthread_mutex_lock(&pfiled->cache_lock);
- pfiled->fdcache[fio->fdcacheidx].flags &= ~READY;
- pthread_mutex_unlock(&pfiled->cache_lock);
+ r = is_target_valid_len(pfiled, target, req->targetlen, READ);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Target not valid");
+ goto out;
}
r = create_path(buf, pfiled->vpath, target, req->targetlen,
r = unlink(buf);
out:
free(buf);
- if (r < 0)
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Handle delete failed for pr: %p, req: %p", pr, pr->req);
pfiled_fail(peer, pr);
- else
+ } else {
+ strncpy(name, target, XSEG_MAX_TARGETLEN);
+ name[XSEG_MAX_TARGETLEN] = 0;
+ xcache_invalidate(&pfiled->cache, name);
+ XSEGLOG2(&lc, I, "Handle delete completed for pr: %p, req: %p", pr, pr->req);
pfiled_complete(peer, pr);
+ }
return;
}
-static void handle_open(struct peerd *peer, struct peer_req *pr)
+/*
+ * Handle a snapshot request.
+ *
+ * Reads up to req->size bytes of the target, strips trailing zero bytes,
+ * hashes the rest with SHA256 and stores the data in a file named after
+ * the hex digest. The digest name is returned in the reply. When a file
+ * with that name can already be opened nothing is written. Otherwise the
+ * data is written to a uniquely named temporary file which is then
+ * hard-linked to the final name and removed.
+ *
+ * Every failure ends in pfiled_fail; all heap buffers are released on
+ * every exit path.
+ */
+static void handle_snapshot(struct peerd *peer, struct peer_req *pr)
{
+ //open src
+ //read all file
+ //sha256 hash
+ //stat (open without create)
+ //write to hash_tmpfile
+ //link file
+ int src = -1, dst = -1, r = -1, pos;
+ ssize_t c;
+ uint64_t sum, written, trailing_zeros;
struct pfiled *pfiled = __get_pfiled(peer);
-// struct fio *fio = __get_fio(pr);
+ struct fio *fio = __get_fio(pr);
+ struct xseg_request *req = pr->req;
+ char *pathname = NULL, *tmpfile_pathname = NULL, *path = NULL, *tmpfile = NULL;
+ char *target, *data;
+ /* both names are printed at "out", so they must be valid strings even
+ * when the request fails before they are filled in */
+ char snapshot_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1] = "";
+ char name[XSEG_MAX_TARGETLEN + 1] = "";
+
+ unsigned char *object_data = NULL;
+ unsigned char sha[SHA256_DIGEST_SIZE];
+ struct xseg_request_snapshot *xsnapshot;
+ struct xseg_reply_snapshot *xreply;
+
+ target = xseg_get_target(peer->xseg, req);
+ data = xseg_get_data(peer->xseg, req);
+ xsnapshot = (struct xseg_request_snapshot *)data;
+ (void)xsnapshot; //ignore it
+
+ XSEGLOG2(&lc, I, "Handle snapshot started for pr: %p, req: %p", pr, pr->req);
+
+ if (!req->size) {
+ XSEGLOG2(&lc, E, "No request size provided");
+ r = -1;
+ goto out;
+ }
+
+ r = is_target_valid_len(pfiled, target, req->targetlen, READ);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Source target not valid");
+ goto out;
+ }
+
+ strncpy(name, target, req->targetlen);
+ name[req->targetlen] = 0;
+
+ pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
+ object_data = malloc(sizeof(char) * req->size);
+ if (!pathname || !object_data){
+ XSEGLOG2(&lc, E, "Out of memory");
+ /* r is 0 here from the validation above; without this the
+ * request would be reported as completed */
+ r = -1;
+ goto out;
+ }
+
+ src = dir_open(pfiled, fio, target, req->targetlen, READ);
+ if (src < 0) {
+ XSEGLOG2(&lc, E, "Fail in src");
+ r = -1;
+ goto out;
+ }
+
+ sum = 0;
+ while (sum < req->size) {
+ c = pread(src, object_data + sum, req->size - sum, sum);
+ if (c < 0) {
+ XSEGLOG2(&lc, E, "Error reading from source");
+ r = -1;
+ goto out;
+ }
+ if (c == 0) {
+ break;
+ }
+ sum += c;
+ }
+
+ //rstrip here in case zeros were written in the end
+ trailing_zeros = 0;
+ for (;trailing_zeros < sum; trailing_zeros++)
+ if (object_data[sum - trailing_zeros - 1])
+ break;
+
+ XSEGLOG2(&lc, D, "Read %llu, Trainling zeros %llu",
+ (unsigned long long)sum, (unsigned long long)trailing_zeros);
+
+ sum -= trailing_zeros;
+ //calculate snapshot name
+ SHA256(object_data, sum, sha);
+
+ r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
+ sizeof(struct xseg_reply_snapshot));
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Resize request failed");
+ r = -1;
+ goto out;
+ }
+
+ xreply = (struct xseg_reply_snapshot *)xseg_get_data(peer->xseg, req);
+ hexlify(sha, xreply->target);
+ xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE;
+ hexlify(sha, snapshot_name);
+ snapshot_name[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
+
+
+ path = pfiled->vpath;
+ r = create_path(pathname, path, xreply->target, xreply->targetlen, 0, 1);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Create path failed");
+ r = -1;
+ goto out;
+ }
+
+
+
+ dst = open(pathname, O_WRONLY);
+ /* open returns a non-negative descriptor on success; 0 is valid */
+ if (dst >= 0) {
+ XSEGLOG2(&lc, I, "%s already exists, no write needed", pathname);
+ r = 0;
+ goto out;
+ }
+
+ tmpfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
+ if (!tmpfile_pathname){
+ XSEGLOG2(&lc, E, "Out of memory");
+ r = -1;
+ goto out;
+ }
+
+ /* one extra byte for the terminating NUL written below */
+ tmpfile = malloc(MAX_FILENAME_SIZE + 1);
+ if (!tmpfile){
+ XSEGLOG2(&lc, E, "Out of memory");
+ r = -1;
+ goto out;
+ }
+
+ pos = 0;
+ strncpy(tmpfile + pos, target, req->targetlen);
+ pos += req->targetlen;
+ strncpy(tmpfile + pos, SNAP_SUFFIX, SNAP_SUFFIX_LEN);
+ pos += SNAP_SUFFIX_LEN;
+ strncpy(tmpfile + pos, pfiled->uniquestr, pfiled->uniquestr_len);
+ pos += pfiled->uniquestr_len;
+ strncpy(tmpfile + pos, fio->str_id, FIO_STR_ID_LEN);
+ pos += FIO_STR_ID_LEN;
+ tmpfile[pos] = 0;
+
+ r = create_path(tmpfile_pathname, path, tmpfile, pos, 0, 0);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Create path failed");
+ r = -1;
+ goto out;
+ }
+
+ XSEGLOG2(&lc, D, "Opening %s", tmpfile_pathname);
+ dst = open(tmpfile_pathname, O_WRONLY | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+ if (dst < 0) {
+ if (errno != EEXIST){
+ XSEGLOG2(&lc, E, "Error opening %s", tmpfile_pathname);
+ } else {
+ XSEGLOG2(&lc, E, "Error opening %s. Stale data found.",
+ tmpfile_pathname);
+ }
+ r = -1;
+ goto out;
+ }
+ XSEGLOG2(&lc, D, "Opened %s", tmpfile_pathname);
+
+ written = 0;
+ while (written < sum) {
+ c = write(dst, object_data + written, sum - written);
+ if (c < 0) {
+ XSEGLOG2(&lc, E, "Error writting to dst file %s", tmpfile_pathname);
+ r = -1;
+ goto out_unlink;
+ }
+ written += c;
+ }
+
+ /* an already existing final name is accepted: the name is the hash of
+ * the content that was just written */
+ r = link(tmpfile_pathname, pathname);
+ if (r < 0 && errno != EEXIST) {
+ XSEGLOG2(&lc, E, "Error linking snapshot file %s. Errno %d",
+ pathname, errno);
+ r = -1;
+ goto out_unlink;
+ }
+
+ r = unlink(tmpfile_pathname);
+ if (r < 0) {
+ XSEGLOG2(&lc, W, "Error unlinking snapshot file %s", tmpfile_pathname);
+ }
+ req->serviced = req->size;
+ r = 0;
+
+out:
+ if (dst >= 0) {
+ close(dst);
+ }
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Handle snapshot failed for pr: %p, req: %p. Target %s", pr, pr->req, name);
+ pfiled_fail(peer, pr);
+ } else {
+ XSEGLOG2(&lc, I, "Handle snapshot completed for pr: %p, req: %p\n\t"
+ "Snapshotted %s to %s", pr, pr->req, name, snapshot_name);
+ pfiled_complete(peer, pr);
+ }
+ free(tmpfile);
+ free(tmpfile_pathname);
+ free(pathname);
+ free(object_data);
+ return;
+
+out_unlink:
+ unlink(tmpfile_pathname);
+ goto out;
+}
+
+static int __locked_by(char *lockfile, char *expected, uint32_t expected_len)
+{ /*
+ * Checks whether lockfile currently contains the given owner id.
+ * Reads at most MAX_UNIQUESTR_LEN bytes from offset 0 and compares them
+ * with expected; the lock counts as ours only when the number of bytes
+ * read equals expected_len and the bytes match.
+ * Returns 0 when the content matches,
+ * -2 when the lock file does not exist (open failed with ENOENT),
+ * -1 on any other open/read/close error or when the content differs.
+ */
+ int ret = -1;
+ int r, fd;
+ uint32_t len;
+ char tmpbuf[MAX_UNIQUESTR_LEN];
+
+ XSEGLOG2(&lc, D, "Started. Lockfile: %s, expected: %s, expected_len: %u", lockfile, expected, expected_len);
+ fd = open(lockfile, O_RDONLY, S_IRWXU | S_IRUSR);
+ if (fd < 0) {
+ if (errno != ENOENT){
+ XSEGLOG2(&lc, E, "Error opening %s", lockfile);
+ } else {
+ //-2 == retry
+ XSEGLOG2(&lc, I, "lock file removed");
+ ret = -2;
+ }
+ goto out;
+ }
+ r = pread(fd, tmpbuf, MAX_UNIQUESTR_LEN, 0);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Error reading from %s", lockfile);
+ close(fd);
+ goto out;
+ }
+ len = (uint32_t)r;
+ XSEGLOG2(&lc, D, "Read %u bytes", len);
+ r = close(fd);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Could not close lockfile %s", lockfile);
+ goto out;
+ }
+ if (len == expected_len && !strncmp(tmpbuf, expected, expected_len)){
+ XSEGLOG2(&lc, D, "Lock file %s locked by us.", lockfile);
+ ret = 0;
+ }
+out:
+ XSEGLOG2(&lc, D, "Finished. Lockfile: %s", lockfile);
+ return ret;
+}
+
+static int __try_lock(struct pfiled *pfiled, char *tmpfile, char *lockfile,
+ uint32_t flags, int fd)
+{ /*
+ * Tries to take the lock by hard-linking tmpfile to lockfile.
+ * fd is the open descriptor of tmpfile; pfiled->uniquestr is written to
+ * it at offset 0 and synced before the first link attempt.
+ * While link fails with EEXIST, the existing lock file is inspected with
+ * __locked_by: a match counts as success; otherwise the call gives up
+ * when XF_NOSYNC is set in flags, or sleeps one second and retries.
+ * Every non-zero result of __locked_by is treated alike here.
+ * Returns 0 when the lock is held, -1 on write/sync/link errors or when
+ * XF_NOSYNC prevented waiting.
+ */
+ int r;
+ XSEGLOG2(&lc, D, "Started. Lockfile: %s, Tmpfile:%s", lockfile, tmpfile);
+ r = pwrite(fd, pfiled->uniquestr, pfiled->uniquestr_len, 0);
+ if (r < 0) {
+ return -1;
+ }
+ r = fsync(fd);
+ if (r < 0) {
+ return -1;
+ }
+
+ while (link(tmpfile, lockfile) < 0) {
+ //actual error
+ if (errno != EEXIST){
+ XSEGLOG2(&lc, E, "Error linking %s to %s",
+ tmpfile, lockfile);
+ return -1;
+ }
+ r = __locked_by(lockfile, pfiled->uniquestr, pfiled->uniquestr_len);
+ if (!r) {
+ break;
+ }
+ if (flags & XF_NOSYNC) {
+ XSEGLOG2(&lc, D, "Could not get lock file %s, "
+ "XF_NOSYNC set. Aborting", lockfile);
+ return -1;
+ }
+ sleep(1);
+ }
+ XSEGLOG2(&lc, D, "Finished. Lockfile: %s", lockfile);
+ return 0;
+}
+
+/*
+ * Handle a lock acquire request.
+ *
+ * Builds two names from the target: the lock file (target + LOCK_SUFFIX)
+ * and a temporary file (lock name + pfiled->uniquestr + fio->str_id).
+ * The temporary file is created exclusively, handed to __try_lock which
+ * links it to the lock file, and is closed and unlinked afterwards.
+ * The request completes when __try_lock returns 0 and fails otherwise.
+ * All four heap buffers are released on every exit path.
+ */
+static void handle_acquire(struct peerd *peer, struct peer_req *pr)
+{
+ int r, ret = -1;
+ struct pfiled *pfiled = __get_pfiled(peer);
+ struct fio *fio = __get_fio(pr);
struct xseg_request *req = pr->req;
char *buf = malloc(MAX_FILENAME_SIZE);
- char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
- int fd = -1;
+ /* one extra byte for the terminating NUL written below */
+ char *tmpfile = malloc(MAX_FILENAME_SIZE + 1);
+ char *lockfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
+ char *tmpfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
+ int fd = -1, pos;
char *target = xseg_get_target(peer->xseg, req);
+ uint32_t buf_len, tmpfile_len;
- if (!buf || !pathname) {
+ if (!buf || !tmpfile || !tmpfile_pathname || !lockfile_pathname) {
XSEGLOG2(&lc, E, "Out of memory");
pfiled_fail(peer, pr);
+ /* free(NULL) is a no-op, so whatever was allocated is released */
+ free(buf);
+ free(tmpfile);
+ free(lockfile_pathname);
+ free(tmpfile_pathname);
return;
}
- strncpy(buf, target, req->targetlen);
- strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
+ r = is_target_valid_len(pfiled, target, req->targetlen, READ);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Target not valid");
+ goto out;
+ }
+
+
+ pos = 0;
+ strncpy(buf + pos, target, req->targetlen);
+ pos = req->targetlen;
+ strncpy(buf + pos, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
+ pos += LOCK_SUFFIX_LEN;
+ buf[pos] = 0;
+ buf_len = pos;
+
+ XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
+
+
+ pos = 0;
+ strncpy(tmpfile + pos, buf, buf_len);
+ pos += buf_len;
+ strncpy(tmpfile + pos, pfiled->uniquestr, pfiled->uniquestr_len);
+ pos += pfiled->uniquestr_len;
+ strncpy(tmpfile + pos, fio->str_id, FIO_STR_ID_LEN);
+ pos += FIO_STR_ID_LEN;
+ tmpfile[pos] = 0;
+ tmpfile_len = pos;
XSEGLOG2(&lc, I, "Trying to acquire lock %s", buf);
- if (create_path(pathname, pfiled->vpath, buf,
- req->targetlen + strlen(LOCK_SUFFIX),
- pfiled->prefix_len, 1) < 0) {
+ if (create_path(tmpfile_pathname, pfiled->vpath, tmpfile,
+ tmpfile_len, pfiled->prefix_len, 1) < 0) {
XSEGLOG2(&lc, E, "Create path failed for %s", buf);
goto out;
}
+ if (create_path(lockfile_pathname, pfiled->vpath, buf,
+ buf_len, pfiled->prefix_len, 1) < 0) {
+ XSEGLOG2(&lc, E, "Create path failed for %s", buf);
+ goto out;
+ }
+
+ //create exclusive unique lockfile (block_uniqueid+target)
+ //if (OK)
+ // write blocker uniqueid to the unique lockfile
+ // try to link it to the lockfile
+ // if (OK)
+ // unlink unique lockfile;
+ // complete
+ // else
+ // spin while not able to link
+
//nfs v >= 3
- while ((fd = open(pathname, O_CREAT | O_EXCL, S_IRWXU | S_IRUSR)) < 0){
+ XSEGLOG2(&lc, D, "Tmpfile: %s", tmpfile_pathname);
+ fd = open(tmpfile_pathname, O_WRONLY | O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
+ if (fd < 0) {
//actual error
if (errno != EEXIST){
- XSEGLOG2(&lc, W, "Error opening %s", pathname);
+ XSEGLOG2(&lc, E, "Error opening %s", tmpfile_pathname);
goto out;
+ } else {
+ XSEGLOG2(&lc, E, "Error opening %s. Stale data found.",
+ tmpfile_pathname);
+ }
+ ret = -1;
+ } else {
+ XSEGLOG2(&lc, D, "Tmpfile %s created. Trying to get lock",
+ tmpfile_pathname);
+ r = __try_lock(pfiled, tmpfile_pathname, lockfile_pathname,
+ req->flags, fd);
+ if (r < 0){
+ XSEGLOG2(&lc, E, "Trying to get lock %s failed", buf);
+ ret = -1;
+ } else {
+ XSEGLOG2(&lc, D, "Trying to get lock %s succeed", buf);
+ ret = 0;
+ }
+ r = close(fd);
+ if (r < 0) {
+ XSEGLOG2(&lc, W, "Error closing %s", tmpfile_pathname);
+ }
+ r = unlink(tmpfile_pathname);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Error unlinking %s", tmpfile_pathname);
}
- if (req->flags & XF_NOSYNC)
- goto out;
- sleep(1);
}
- close(fd);
out:
- free(buf);
- free(pathname);
- if (fd < 0){
+ if (ret < 0){
XSEGLOG2(&lc, I, "Failed to acquire lock %s", buf);
pfiled_fail(peer, pr);
}
XSEGLOG2(&lc, I, "Acquired lock %s", buf);
pfiled_complete(peer, pr);
}
+ free(buf);
+ free(tmpfile);
+ free(lockfile_pathname);
+ free(tmpfile_pathname);
return;
}
-static void handle_close(struct peerd *peer, struct peer_req *pr)
+static void handle_release(struct peerd *peer, struct peer_req *pr) /*
+ * Handle a release request for the target named in pr->req: build the
+ * lock file name (target + LOCK_SUFFIX) in buf, build its path with
+ * create_path(), unlink it, then fail or complete pr depending on r.
+ * NOTE(review): the out-of-memory branch below returns without failing
+ * or completing pr and without freeing what was already allocated.
+ */
{
struct pfiled *pfiled = __get_pfiled(peer);
// struct fio *fio = __get_fio(pr);
struct xseg_request *req = pr->req;
- char *buf = malloc(MAX_FILENAME_SIZE);
- char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
+ char *buf = malloc(MAX_FILENAME_SIZE + 1); /* +1 leaves room for the terminator written below */
+ char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
+ char *tmpbuf = malloc(MAX_UNIQUESTR_LEN + 1); /* NOTE(review): only freed at the end; not covered by the NULL check and not otherwise used in the visible lines */
char *target = xseg_get_target(peer->xseg, req);
- int r;
+ int r, pos;
if (!buf || !pathname) {
XSEGLOG2(&lc, E, "Out of memory");
return;
}
- strncpy(buf, target, req->targetlen);
- strncpy(buf+req->targetlen, LOCK_SUFFIX, strlen(LOCK_SUFFIX));
+ r = is_target_valid_len(pfiled, target, req->targetlen, READ); /* a negative result fails the request via out */
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Target not valid");
+ goto out;
+ }
+
+ pos = 0; /* pos tracks the write offset while assembling target + LOCK_SUFFIX in buf */
+ strncpy(buf + pos, target, req->targetlen);
+ pos += req->targetlen;
+ strncpy(buf + pos, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
+ pos += LOCK_SUFFIX_LEN;
+ buf[pos] = 0; /* the strncpy calls copy exact lengths, so terminate explicitly */
+
+ XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
r = create_path(pathname, pfiled->vpath, buf,
req->targetlen + strlen(LOCK_SUFFIX),
XSEGLOG2(&lc, E, "Create path failed for %s", buf);
goto out;
}
- r = unlink(pathname);
+
+ if ((req->flags & XF_FORCE) || !__locked_by(pathname, pfiled->uniquestr,
+ pfiled->uniquestr_len)) { /* unlink when forced, or when __locked_by() returns 0 (presumably: the lock is held by this blocker; confirm) */
+ r = unlink(pathname);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Could not unlink %s", pathname);
+ goto out;
+ }
+ } else {
+ r = -1; /* not forced and __locked_by() returned non-zero: fail without unlinking */
+ }
out:
- free(buf);
- free(pathname);
- if (r < 0)
+ if (r < 0) {
fail(peer, pr);
- else
+ }
+ else {
+ XSEGLOG2(&lc, I, "Released lockfile: %s", buf);
complete(peer, pr);
+ }
+ XSEGLOG2(&lc, I, "Finished. Lockfile: %s", buf);
+ free(buf); /* buffers are freed only after the last log line that prints buf */
+ free(tmpbuf);
+ free(pathname);
return;
}
enum dispatch_reason reason)
{
struct fio *fio = __get_fio(pr);
- if (reason == dispatch_accept){
- fio->fdcacheidx = -1;
- fio->state = XS_ACCEPTED;
- }
-
+ if (reason == dispatch_accept)
+ fio->h = NoEntry;
+
switch (req->op) {
case X_READ:
+ handle_read(peer, pr); break;
case X_WRITE:
- handle_read_write(peer, pr); break;
+ handle_write(peer, pr); break;
case X_INFO:
handle_info(peer, pr); break;
case X_COPY:
handle_copy(peer, pr); break;
case X_DELETE:
handle_delete(peer, pr); break;
- case X_OPEN:
- handle_open(peer, pr); break;
- case X_CLOSE:
- handle_close(peer, pr); break;
- // case X_SNAPSHOT:
+ case X_ACQUIRE:
+ handle_acquire(peer, pr); break;
+ case X_RELEASE:
+ handle_release(peer, pr); break;
+ case X_SNAPSHOT:
+ handle_snapshot(peer, pr); break;
case X_SYNC:
default:
handle_unknown(peer, pr);
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
{
+ /*
+ * Initialize the pfiled peer: allocate the private pfiled state and one
+ * struct fio per peer request, read the command-line arguments, check
+ * the open-file limit and call xcache_init() on pfiled->cache.
+ *
+ * Returns ret (0, or -ENOMEM where it is set before jumping to out).
+ * Several error paths below return -1 directly instead of going
+ * through out.
+ *
+ * Original plan for this function (not every step is implemented in
+ * the code visible here):
+ *   get blocks,maps paths
+ *   get optional pithos block,maps paths
+ *   get fdcache size
+ *   check if greater than limit (tip: getrlimit)
+ *   assert cachesize greater than nr_ops
+ *   assert nr_ops greater than nr_threads
+ *   get prefix
+ */
+
int ret = 0;
- int i;
+ int i, r;
+ struct fio *fio;
struct pfiled *pfiled = malloc(sizeof(struct pfiled));
+ struct rlimit rlim;
+ struct xcache_ops c_ops = { /* callbacks handed to xcache_init() below */
+ .on_node_init = cache_node_init,
+ .on_init = cache_init,
+ .on_put = cache_put,
+ };
if (!pfiled){
XSEGLOG2(&lc, E, "Out of memory");
ret = -ENOMEM;
peer->priv = pfiled;
pfiled->maxfds = 2 * peer->nr_ops;
- pfiled->fdcache = calloc(pfiled->maxfds, sizeof(struct fdcache_node));
- if(!pfiled->fdcache) {
- XSEGLOG2(&lc, E, "Out of memory");
- ret = -ENOMEM;
- goto out;
- }
for (i = 0; i < peer->nr_ops; i++) {
peer->peer_reqs[i].priv = malloc(sizeof(struct fio));
ret = -ENOMEM;
goto out;
}
+ fio = __get_fio(&peer->peer_reqs[i]); /* give each request slot a 3-character id derived from its index */
+ fio->str_id[0] = '_';
+ fio->str_id[1] = 'a' + (i / 26); /* NOTE(review): goes past z once i >= 676 */
+ fio->str_id[2] = 'a' + (i % 26);
}
pfiled->vpath[0] = 0;
pfiled->path[0] = 0;
- pfiled->handled_reqs = 0;
- /*
- for (i = 0; i < argc; i++) {
- if (!strcmp(argv[i], "--pithos") && (i+1) < argc){
- strncpy(pfiled->path, argv[i+1], MAX_PATH_SIZE);
- pfiled->path[MAX_PATH_SIZE] = 0;
- i += 1;
- continue;
- }
- if (!strcmp(argv[i], "--archip") && (i+1) < argc){
- strncpy(pfiled->vpath, argv[i+1], MAX_PATH_SIZE);
- pfiled->vpath[MAX_PATH_SIZE] = 0;
- i += 1;
- continue;
- }
- if (!strcmp(argv[i], "--prefix") && (i+1) < argc){
- strncpy(pfiled->prefix, argv[i+1], MAX_PREFIX_LEN);
- pfiled->prefix[MAX_PREFIX_LEN] = 0;
- i += 1;
- continue;
- }
- }
- */
+ pfiled->prefix[0] = 0; /* start from empty strings so the strlen() calls below are defined when an option is absent */
+ pfiled->uniquestr[0] = 0;
+
BEGIN_READ_ARGS(argc, argv);
- READ_ARG_STRING("--pithos", pfiled->path, MAX_PATH_SIZE);
+ READ_ARG_ULONG("--fdcache", pfiled->maxfds); /* presumably overrides the 2 * nr_ops default set above when given; confirm macro semantics */
READ_ARG_STRING("--archip", pfiled->vpath, MAX_PATH_SIZE);
READ_ARG_STRING("--prefix", pfiled->prefix, MAX_PREFIX_LEN);
+ READ_ARG_STRING("--uniquestr", pfiled->uniquestr, MAX_UNIQUESTR_LEN);
END_READ_ARGS();
-
+ pfiled->uniquestr_len = strlen(pfiled->uniquestr); /* cache the length next to the string */
pfiled->prefix_len = strlen(pfiled->prefix);
- //TODO test path exist
- pfiled->path_len = strlen(pfiled->path);
- if (!pfiled->path_len){
- XSEGLOG2(&lc, E, "Pithos path was not provided");
- return -1;
- }
- if (pfiled->path[pfiled->path_len -1] != '/'){
- pfiled->path[pfiled->path_len] = '/';
- pfiled->path[++pfiled->path_len]= 0;
- }
-
+ //TODO: check that the path exists, is a directory and is accessible
pfiled->vpath_len = strlen(pfiled->vpath);
if (!pfiled->vpath_len){
- XSEGLOG2(&lc, E, "Archipelagos path was not provided");
+ XSEGLOG2(&lc, E, "Archipelago path was not provided");
+ usage(argv[0]);
return -1;
}
if (pfiled->vpath[pfiled->vpath_len -1] != '/'){
pfiled->vpath[++pfiled->vpath_len]= 0;
}
- for (i = 0; i < peer->nr_ops; i++) {
- pthread_cond_init(&pfiled->fdcache[i].cond, NULL);
- pfiled->fdcache[i].flags = READY;
+ r = getrlimit(RLIMIT_NOFILE, &rlim);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Could not get limit for max fds");
+ return -1; /* NOTE(review): bypasses out; nothing allocated above is released on this path */
+ }
+ //Require the soft fd limit to be at least maxfds + nr_ops - 4.
+ //TODO: assert fdcache > 2*nr_ops or add a waitq
+ if (rlim.rlim_cur < pfiled->maxfds + peer->nr_ops - 4) {
+ XSEGLOG2(&lc, E, "FD limit %d is less than fdcache + nr_ops -4(%u)",
+ rlim.rlim_cur, pfiled->maxfds + peer->nr_ops - 4); /* NOTE(review): rlim_cur has type rlim_t; confirm that %d and %u match the argument types */
+ return -1;
}
- pthread_mutex_init(&pfiled->cache_lock, NULL);
+ r = xcache_init(&pfiled->cache, pfiled->maxfds, &c_ops, XCACHE_LRU_HEAP, peer); /* peer is passed as the last argument; presumably handed back to the callbacks (confirm) */
+ if (r < 0)
+ return -1;
out:
return ret;
void custom_peer_finalize(struct peerd *peer)
{
+ /*
+ * Finalization hook for this peer; intentionally does nothing.
+ * We could close all fds here, but we let the system do it for us.
+ */
return;
}
+/*
static int safe_atoi(char *s)
{
long l;
else
return -1;
}
+*/