filed: Initial dummy approach to directio
authorFilippos Giannakos <philipgian@grnet.gr>
Wed, 18 Sep 2013 09:09:39 +0000 (12:09 +0300)
committerFilippos Giannakos <philipgian@grnet.gr>
Tue, 28 Jan 2014 15:01:43 +0000 (17:01 +0200)
xseg/peers/user/filed.c
xseg/tools/archipelago/archipelago/common.py

index 5e35839..afbe284 100644 (file)
@@ -38,6 +38,7 @@
 
 #define _GNU_SOURCE
 #include <stdio.h>
+#include <stdarg.h>
 #include <stdlib.h>
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -109,6 +110,7 @@ struct pfiled {
        uint32_t prefix_len;
        uint32_t uniquestr_len;
        long maxfds;
+       uint32_t directio;
        char vpath[MAX_PATH_SIZE + 1];
        char prefix[MAX_PREFIX_LEN + 1];
        char uniquestr[MAX_UNIQUESTR_LEN + 1];
@@ -225,6 +227,41 @@ static void get_dirs(char buf[6], struct pfiled *pfiled, char *target, uint32_t
        return;
 }
 
+static int strnjoin(char *dest, int n, ...)
+{
+       int pos, i;
+       va_list ap;
+       char *s;
+       int l;
+
+       pos = 0;
+       va_start(ap, n);
+       for (i = 0; i < n; i++) {
+               s = va_arg(ap, char *);
+               l = va_arg(ap, int);
+               strncpy(dest + pos, s, l);
+               pos += l;
+       }
+       dest[pos] = 0;
+       va_end(ap);
+
+       return pos;
+}
+
+static int strjoin(char *dest, char *f, int f_len, char *s, int s_len)
+{
+       int pos;
+
+       pos = 0;
+       strncpy(dest + pos, f, f_len);
+       pos += f_len;
+       strncpy(dest + pos, s, s_len);
+       pos += s_len;
+       dest[pos] = 0;
+
+       return f_len + s_len;
+}
+
 static int create_path(char *buf, struct pfiled *pfiled, char *target,
                        uint32_t targetlen, int mkdirs)
 {
@@ -261,6 +298,284 @@ retry:
        return 0;
 }
 
+
+static ssize_t persisting_read(int fd, void *data, size_t size, off_t offset)
+{
+       ssize_t r = 0, sum = 0;
+       char error_str[1024];
+       XSEGLOG2(&lc, D, "fd: %d, size: %d, offset: %d", fd, size, offset);
+
+       while (sum < size) {
+               XSEGLOG2(&lc, D, "read: %llu, (aligned)size: %llu", sum, size);
+               r = pread(fd, (char *)data + sum, size - sum, offset + sum);
+               if (r < 0) {
+                       XSEGLOG2(&lc, E, "fd: %d, Error: %s", fd, strerror_r(errno, error_str, 1023));
+                       break;
+               } else if (r == 0) {
+                       break;
+               } else {
+                       sum += r;
+               }
+       }
+       XSEGLOG2(&lc, D, "read: %llu, (aligned)size: %llu", sum, size);
+
+       if (sum == 0 && r < 0) {
+               sum = r;
+       }
+       XSEGLOG2(&lc, D, "Finished. Read %d, r = %d", sum, r);
+
+       return sum;
+}
+
+static ssize_t persisting_write(int fd, void *data, size_t size, off_t offset)
+{
+       ssize_t r = 0, sum = 0;
+
+       XSEGLOG2(&lc, D, "fd: %d, size: %d, offset: %d", fd, size, offset);
+       while (sum < size) {
+               XSEGLOG2(&lc, D, "written: %llu, (aligned)size: %llu", sum, size);
+               r = pwrite(fd, (char *)data + sum, size - sum, offset + sum);
+               if (r < 0) {
+                       break;
+               } else {
+                       sum += r;
+               }
+       }
+       XSEGLOG2(&lc, D, "written: %llu, (aligned)size: %llu", sum, size);
+
+       if (sum == 0 && r < 0) {
+               sum = r;
+       }
+       XSEGLOG2(&lc, D, "Finished. Wrote %d, r = %d", sum, r);
+
+       return sum;
+}
+
+static ssize_t aligned_read(int fd, void *data, ssize_t size, off_t offset, int alignment)
+{
+       char *tmp_data;
+       ssize_t r;
+       size_t misaligned_data, misaligned_size, misaligned_offset;
+       off_t aligned_offset=offset;
+       size_t aligned_size=size;
+
+       misaligned_data = (unsigned long)data % alignment;
+       misaligned_size = size % alignment;
+       misaligned_offset = offset % alignment;
+       XSEGLOG2(&lc, D, "misaligned_data: %u, misaligned_size: %u, misaligned_offset: %u", misaligned_data, misaligned_size, misaligned_offset);
+       if (misaligned_data || misaligned_size || misaligned_offset) {
+               aligned_offset = offset - misaligned_offset;
+               aligned_size = size + misaligned_offset;
+
+               misaligned_size = aligned_size % alignment;
+               aligned_size = aligned_size - misaligned_size + alignment;
+               r = posix_memalign(&tmp_data, alignment, aligned_size);
+               if (r < 0) {
+                       return -1;
+               }
+       } else {
+               tmp_data = data;
+               aligned_offset = offset;
+               aligned_size = size;
+       }
+
+       XSEGLOG2(&lc, D, "aligned_data: %u, aligned_size: %u, aligned_offset: %u", tmp_data, aligned_size, aligned_offset);
+       r = persisting_read(fd, tmp_data, aligned_size, aligned_offset);
+
+       //FIXME if r < size ?
+       if (tmp_data != data) {
+               memcpy(data, tmp_data + misaligned_offset, size);
+               free(tmp_data);
+       }
+       if (r >= size)
+               r = size;
+       return r;
+}
+
+pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
+
+int __fcntl_lock(int fd, off_t start, off_t len)
+{
+       return pthread_mutex_lock(&m);
+}
+
+int __fcntl_unlock(int fd, off_t start, off_t len)
+{
+       return pthread_mutex_unlock(&m);
+}
+
+static ssize_t aligned_write(int fd, void *data, size_t size, off_t offset, int alignment)
+{
+       int locked = 0;
+       char *tmp_data;
+       ssize_t r;
+       size_t misaligned_data, misaligned_size, misaligned_offset;
+       size_t aligned_size = size, aligned_offset = offset, read_size;
+       misaligned_data = (unsigned long)data % alignment;
+       misaligned_size = size % alignment;
+       misaligned_offset = offset % alignment;
+       if (misaligned_data || misaligned_size || misaligned_offset) {
+               //if somthing is misaligned then:
+               //
+               // First check if the offset was missaligned.
+               aligned_offset = offset - misaligned_offset;
+
+               // Then adjust the size with the misaligned offset and check if
+               // it remains misaligned.
+               aligned_size = size + misaligned_offset;
+               misaligned_size = aligned_size % alignment;
+
+               // in case there is no misaligned_size
+               if (misaligned_size)
+                       aligned_size = aligned_size + alignment - misaligned_size;
+
+               // Allocate aligned memory
+               r = posix_memalign(&tmp_data, alignment, aligned_size);
+               if (r < 0) {
+                       return -1;
+               }
+
+               XSEGLOG2(&lc, D, "fd: %d, misaligned_data: %u, misaligned_size: %u, misaligned_offset: %u", fd, misaligned_data, misaligned_size, misaligned_offset);
+               XSEGLOG2(&lc, D, "fd: %d, aligned_data: %u, aligned_size: %u, aligned_offset: %u", fd, tmp_data, aligned_size, aligned_offset);
+               XSEGLOG2(&lc, D, "fd: %d, locking from %u to %u", fd, aligned_offset, aligned_offset + aligned_size);
+               __fcntl_lock(fd, aligned_offset, aligned_size + alignment - misaligned_size);
+               locked = 1;
+
+               if (misaligned_offset) {
+                       XSEGLOG2(&lc, D, "fd: %d, size: %d, offset: %d", fd, size, offset);
+                       /* read misaligned_offset */
+                       read_size = alignment;
+                       r = persisting_read(fd, tmp_data, alignment, aligned_offset);
+                       if (r < 0) {
+                               free(tmp_data);
+                               return -1;
+                       } else if (r != read_size) {
+                               memset(tmp_data + r, 0, read_size - r);
+                       }
+               }
+
+               if (misaligned_size) {
+                       read_size = alignment;
+                       r = persisting_read(fd, tmp_data + aligned_size - alignment, alignment,
+                                       aligned_offset + aligned_size - alignment);
+                       if (r < 0) {
+                               free(tmp_data);
+                               return -1;
+                       } else if (r != read_size) {
+                               memset(tmp_data + aligned_size - alignment + r, 0, read_size - r);
+                       }
+               }
+               memcpy(tmp_data + misaligned_offset, data, size);
+       } else {
+               aligned_size = size;
+               aligned_offset = offset;
+               tmp_data = data;
+       }
+
+       r = persisting_write(fd, tmp_data, aligned_size, aligned_offset);
+
+       if (locked) {
+               XSEGLOG2(&lc, D, "fd: %d, unlocking from %u to %u", fd, aligned_offset, aligned_offset + aligned_size);
+               __fcntl_unlock(fd, aligned_offset, aligned_size + alignment - misaligned_size);
+       }
+       if (tmp_data != data) {
+               free(tmp_data);
+       }
+
+       if (r >= size)
+               r = size;
+       return r;
+}
+
+static ssize_t filed_write(int fd, void *data, size_t size, off_t offset, int direct)
+{
+       if (direct)
+               return aligned_write(fd, data, size, offset, 512);
+       else
+               return persisting_write(fd, data, size, offset);
+}
+
+static ssize_t filed_read(int fd, void *data, size_t size, off_t offset, int direct)
+{
+       if (direct)
+               return aligned_read(fd, data, size, offset, 512);
+       else
+               return persisting_read(fd, data, size, offset);
+}
+
+static ssize_t pfiled_read(struct pfiled *pfiled, int fd, void *data, size_t size, off_t offset)
+{
+       return filed_read(fd, data, size, offset, pfiled->directio);
+}
+
+static ssize_t pfiled_write(struct pfiled *pfiled, int fd, void *data, size_t size, off_t offset)
+{
+       return filed_write(fd, data, size, offset, pfiled->directio);
+}
+
+static ssize_t generic_io_path(char *path, void *data, size_t size, off_t offset, int write, int flags, mode_t mode)
+{
+       int fd;
+       ssize_t r;
+
+       fd = open(path, flags, mode);
+       if (fd < 0) {
+               return -1;
+       }
+       XSEGLOG2(&lc, D, "Opened file %s as fd %d", path, fd);
+
+       if (write) {
+               r = filed_write(fd, data, size, offset, flags & O_DIRECT);
+       } else {
+               r = filed_read(fd, data, size, offset, flags & O_DIRECT);
+       }
+
+       close(fd);
+
+       return r;
+}
+
+static ssize_t read_path(char *path, void *data, size_t size, off_t offset, int direct)
+{
+       int flags = O_RDONLY;
+       if (direct)
+               flags |= O_DIRECT;
+
+       return generic_io_path(path, data, size, offset, 0, flags, 0);
+}
+
+static ssize_t pfiled_read_name(struct pfiled *pfiled, char *name, uint32_t namelen, void *data, size_t size, off_t offset)
+{
+       char path[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
+       int r;
+       r = create_path(path, pfiled, name, namelen, 1);
+       if (r < 0) {
+               XSEGLOG2(&lc, E, "Could not create path");
+               return -1;
+       }
+       return read_path(path, data, size, offset, pfiled->directio);
+}
+
+static ssize_t write_path(char *path, void *data, size_t size, off_t offset, int direct, int extra_open_flags, mode_t mode)
+{
+       int flags = O_RDWR | extra_open_flags;
+       if (direct)
+               flags |= O_DIRECT;
+       return generic_io_path(path, data, size, offset, 1, flags, mode);
+}
+
+static ssize_t pfiled_write_name(struct pfiled *pfiled, char *name, uint32_t namelen, void *data, size_t size, off_t offset, int extra_open_flags, mode_t mode)
+{
+       char path[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
+       int r;
+       r = create_path(path, pfiled, name, namelen, 1);
+       if (r < 0) {
+               XSEGLOG2(&lc, E, "Could not create path");
+               return -1;
+       }
+       return write_path(path, data, size, offset, pfiled->directio, extra_open_flags, mode);
+}
+
 static int is_target_valid_len(struct pfiled *pfiled, char *target,
                uint32_t targetlen, int mode)
 {
@@ -306,7 +621,7 @@ static int is_target_valid(struct pfiled *pfiled, char *target, int mode)
 
 static int open_file_write(struct pfiled *pfiled, char *target, uint32_t targetlen)
 {
-       int r, fd;
+       int r, fd, flags;
        char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
        char error_str[1024];
 
@@ -315,8 +630,11 @@ static int open_file_write(struct pfiled *pfiled, char *target, uint32_t targetl
                XSEGLOG2(&lc, E, "Could not create path");
                return -1;
        }
+       flags = O_RDWR|O_CREAT;
+       if (pfiled->directio)
+               flags |= O_DIRECT;
        XSEGLOG2(&lc, D, "Opening file %s with O_RDWR|O_CREAT", tmp);
-       fd = open(tmp, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
+       fd = open(tmp, flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
        if (fd < 0){
                XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
                return -1;
@@ -326,7 +644,7 @@ static int open_file_write(struct pfiled *pfiled, char *target, uint32_t targetl
 
 static int open_file_read(struct pfiled *pfiled, char *target, uint32_t targetlen)
 {
-       int r, fd;
+       int r, fd, flags;
        char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
        char error_str[1024];
 
@@ -336,7 +654,10 @@ static int open_file_read(struct pfiled *pfiled, char *target, uint32_t targetle
                return -1;
        }
        XSEGLOG2(&lc, D, "Opening file %s with O_RDWR", tmp);
-       fd = open(tmp, O_RDWR, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
+       flags = O_RDWR;
+       if (pfiled->directio)
+               flags |= O_DIRECT;
+       fd = open(tmp, flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
        if (fd < 0){
                XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
                return -1;
@@ -481,24 +802,16 @@ static void handle_read(struct peerd *peer, struct peer_req *pr)
 
        XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
                        req->size);
-       while (req->serviced < req->size) {
-               XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu",
-                               req->serviced, req->size);
-               r = pread(fd, data + req->serviced,
-                               req->size- req->serviced,
-                               req->offset + req->serviced);
-               if (r < 0) {
-                       XSEGLOG2(&lc, E, "Cannot read");
-                       break;
-               }
-               else if (r == 0) {
-                       /* reached end of file. zero out the rest data buffer */
-                       memset(data + req->serviced, 0, req->size - req->serviced);
-                       req->serviced = req->size;
-               }
-               else {
-                       req->serviced += r;
-               }
+       r = pfiled_read(pfiled, fd, data, req->size, req->offset);
+       if (r < 0) {
+               XSEGLOG2(&lc, E, "Cannot read");
+               req->serviced = 0;
+       } else if (r < req->size) {
+               /* reached end of file. zero out the rest data buffer */
+               memset(data + r, 0, req->size - r);
+               req->serviced = req->size;
+       } else {
+               req->serviced = r;
        }
        XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
                        req->size);
@@ -522,7 +835,8 @@ static void handle_write(struct peerd *peer, struct peer_req *pr)
        struct pfiled *pfiled = __get_pfiled(peer);
        struct fio *fio = __get_fio(pr);
        struct xseg_request *req = pr->req;
-       int r, fd;
+       int fd;
+       ssize_t r;
        char *target = xseg_get_target(peer->xseg, req);
        char *data = xseg_get_data(peer->xseg, req);
 
@@ -556,18 +870,11 @@ static void handle_write(struct peerd *peer, struct peer_req *pr)
 
        XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
                        req->size);
-       while (req->serviced < req->size) {
-               XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu",
-                               req->serviced, req->size);
-               r = pwrite(fd, data + req->serviced,
-                               req->size- req->serviced,
-                               req->offset + req->serviced);
-               if (r < 0) {
-                       break;
-               }
-               else {
-                       req->serviced += r;
-               }
+       r = pfiled_write(pfiled, fd, data, req->size, req->offset);
+       if (r < 0) {
+               req->serviced = 0;
+       } else {
+               req->serviced = r;
        }
        XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
                        req->size);
@@ -672,17 +979,9 @@ static void handle_copy(struct peerd *peer, struct peer_req *pr)
                goto out;
        }
 
-       r = create_path(buf, pfiled, xcopy->target, xcopy->targetlen, 0);
-       if (r < 0)  {
-               XSEGLOG2(&lc, E, "Create path failed");
-               r = -1;
-               goto out;
-       }
-
-       src = open(buf, O_RDONLY);
+       src = open_file(pfiled, xcopy->target, xcopy->targetlen, READ);
        if (src < 0) {
-               XSEGLOG2(&lc, E, "fail in src %s", buf);
-               r = src;
+               XSEGLOG2(&lc, E, "Failed to open src");
                goto out;
        }
 
@@ -770,105 +1069,63 @@ out:
 }
 
 static int __get_precalculated_hash(struct peerd *peer, char *target,
-               uint32_t targetlen, char hash[HEXLIFIED_SHA256_DIGEST_SIZE + 1])
+               uint32_t targetlen, char *hash)
 {
        int ret = -1;
-       int r, fd;
-       uint32_t len, pos;
-       char *hash_file = NULL, *hash_path = NULL;
-       char tmpbuf[HEXLIFIED_SHA256_DIGEST_SIZE];
+       int r;
+       uint32_t len, hash_file_len;
+       char *hash_file = NULL;
        struct pfiled *pfiled = __get_pfiled(peer);
 
        XSEGLOG2(&lc, D, "Started.");
 
        hash_file = malloc(MAX_FILENAME_SIZE + 1);
-       hash_path = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
-
-       pos = 0;
-       strncpy(hash_file+pos, target, targetlen);
-       pos += targetlen;
-       strncpy(hash_file+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
-       pos += HASH_SUFFIX_LEN;
-       hash_file[pos] = 0;
+       hash_file_len = strjoin(hash_file, target, targetlen, HASH_SUFFIX, HASH_SUFFIX_LEN);
        hash[0] = 0;
 
-       r = create_path(hash_path, pfiled, hash_file, pos, 1);
-       if (r < 0)  {
-               XSEGLOG2(&lc, E, "Create path failed");
-               goto out;
-       }
-
-       fd = open(hash_path, O_RDONLY, S_IRWXU | S_IRUSR);
-       if (fd < 0) {
+       r = pfiled_read_name(pfiled, hash_file, hash_file_len, hash, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
+       if (r < 0) {
                if (errno != ENOENT){
-                       XSEGLOG2(&lc, E, "Error opening %s", hash_path);
+                       XSEGLOG2(&lc, E, "Error opening %s", hash_file);
                } else {
                        XSEGLOG2(&lc, I, "No precalculated hash for %s", hash_file);
                        ret = 0;
                }
                goto out;
        }
-
-       r = pread(fd, tmpbuf, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
-       if (r < 0) {
-               XSEGLOG2(&lc, E, "Error reading from %s", hash_path);
-               close(fd);
-               goto out;
-       }
        len = (uint32_t)r;
-
        XSEGLOG2(&lc, D, "Read %u bytes", len);
 
-       r = close(fd);
-       if (r < 0) {
-               XSEGLOG2(&lc, E, "Could not close hash_file %s", hash_path);
-               goto out;
-       }
-
        if (len == HEXLIFIED_SHA256_DIGEST_SIZE){
-               strncpy(hash, tmpbuf, HEXLIFIED_SHA256_DIGEST_SIZE);
                hash[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
                XSEGLOG2(&lc, D, "Found hash for %s : %s", hash_file, hash);
                ret = 0;
        }
 out:
-       free(hash_path);
+       free(hash_file);
        XSEGLOG2(&lc, D, "Finished.");
        return ret;
 }
 
 static int __set_precalculated_hash(struct peerd *peer, char *target,
-               uint32_t targetlen, char hash[HEXLIFIED_SHA256_DIGEST_SIZE + 1])
+               uint32_t targetlen, char *hash)
 {
        int ret = -1;
-       int r, fd;
-       uint32_t len, pos;
-       char *hash_file = NULL, *hash_path = NULL;
-       char tmpbuf[HEXLIFIED_SHA256_DIGEST_SIZE];
+       int r;
+       uint32_t len, hash_file_len;
+       char *hash_file = NULL;
        struct pfiled *pfiled = __get_pfiled(peer);
 
        XSEGLOG2(&lc, D, "Started.");
 
        hash_file = malloc(MAX_FILENAME_SIZE + 1);
-       hash_path = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
-
-       pos = 0;
-       strncpy(hash_file+pos, target, targetlen);
-       pos += targetlen;
-       strncpy(hash_file+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
-       pos += HASH_SUFFIX_LEN;
-       hash_file[pos] = 0;
+       hash_file_len = strjoin(hash_file, target, targetlen, HASH_SUFFIX, HASH_SUFFIX_LEN);
 
-       r = create_path(hash_path, pfiled, hash_file, pos, 1);
-       if (r < 0)  {
-               XSEGLOG2(&lc, E, "Create path failed");
-               goto out;
-       }
-
-       fd = open(hash_path, O_WRONLY | O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
-       if (fd < 0) {
-               if (errno != ENOENT){
-                       XSEGLOG2(&lc, E, "Error opening %s", hash_path);
+       r = pfiled_write_name(pfiled, hash_file, hash_file_len, hash, HEXLIFIED_SHA256_DIGEST_SIZE, 0,
+                       O_CREAT|O_EXCL, S_IWUSR|S_IRUSR);
+       if (r < 0) {
+               if (errno != EEXIST){
+                       XSEGLOG2(&lc, E, "Error opening %s", hash_file);
                } else {
                        XSEGLOG2(&lc, I, "Hash file already exists %s", hash_file);
                        ret = 0;
@@ -876,24 +1133,11 @@ static int __set_precalculated_hash(struct peerd *peer, char *target,
                goto out;
        }
 
-       r = pwrite(fd, hash, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
-       if (r < 0) {
-               XSEGLOG2(&lc, E, "Error reading from %s", hash_path);
-               close(fd);
-               goto out;
-       }
        len = (uint32_t)r;
-
        XSEGLOG2(&lc, D, "Wrote %u bytes", len);
-
-       r = close(fd);
-       if (r < 0) {
-               XSEGLOG2(&lc, E, "Could not close hash_file %s", hash_path);
-               goto out;
-       }
-
+       ret = 0;
 out:
-       free(hash_path);
+       free(hash_file);
        XSEGLOG2(&lc, D, "Finished.");
        return ret;
 }
@@ -907,15 +1151,17 @@ static void handle_hash(struct peerd *peer, struct peer_req *pr)
        //write to hash_tmpfile
        //link file
 
-       int src = -1, dst = -1, r = -1, pos;
+       int len;
+       int src = -1, dst = -1, r = -1;
        ssize_t c;
-       uint64_t sum, written, trailing_zeros;
+       uint64_t sum, trailing_zeros;
        struct pfiled *pfiled = __get_pfiled(peer);
        struct fio *fio = __get_fio(pr);
        struct xseg_request *req = pr->req;
        char *pathname = NULL, *tmpfile_pathname = NULL, *tmpfile = NULL;
        char *target;
-       char hash_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
+//     char hash_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
+       char *hash_name;
        char name[XSEG_MAX_TARGETLEN + 1];
 
        unsigned char *object_data = NULL;
@@ -939,6 +1185,8 @@ static void handle_hash(struct peerd *peer, struct peer_req *pr)
                goto out;
        }
 
+       r = posix_memalign(&hash_name, 512, 512 + 1);
+
        r = __get_precalculated_hash(peer, target, req->targetlen, hash_name);
        if (r < 0) {
                XSEGLOG2(&lc, E, "Error getting precalculated hash");
@@ -956,7 +1204,8 @@ static void handle_hash(struct peerd *peer, struct peer_req *pr)
        name[req->targetlen] = 0;
 
        pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
-       object_data = malloc(sizeof(char) * req->size);
+       //object_data = malloc(sizeof(char) * req->size);
+       r = posix_memalign(&object_data, 512, sizeof(char) * req->size);
        if (!pathname || !object_data){
                XSEGLOG2(&lc, E, "Out of memory");
                goto out;
@@ -969,19 +1218,13 @@ static void handle_hash(struct peerd *peer, struct peer_req *pr)
                goto out;
        }
 
-       sum = 0;
-       while (sum < req->size) {
-               c = pread(src, object_data + sum, req->size - sum, sum);
-               if (c < 0) {
-                       XSEGLOG2(&lc, E, "Error reading from source");
-                       r = -1;
-                       goto out;
-               }
-               if (c == 0) {
-                       break;
-               }
-               sum += c;
+       c = pfiled_read(pfiled, src, object_data, req->size, req->offset);
+       if (c < 0) {
+               XSEGLOG2(&lc, E, "Error reading from source");
+               r = -1;
+               goto out;
        }
+       sum = c;
 
        //rstrip here in case zeros were written in the end
        trailing_zeros = 0;
@@ -1001,15 +1244,14 @@ static void handle_hash(struct peerd *peer, struct peer_req *pr)
 
 
        r = create_path(pathname, pfiled, hash_name, HEXLIFIED_SHA256_DIGEST_SIZE, 1);
-       if (r < 0)  {
+       if (r < 0) {
                XSEGLOG2(&lc, E, "Create path failed");
                r = -1;
                goto out;
        }
 
 
-
-       dst = open(pathname, O_WRONLY);
+       dst = open_file(pfiled, hash_name, HEXLIFIED_SHA256_DIGEST_SIZE, READ);
        if (dst > 0) {
                XSEGLOG2(&lc, I, "%s already exists, no write needed", pathname);
                req->serviced = req->size;
@@ -1031,28 +1273,13 @@ static void handle_hash(struct peerd *peer, struct peer_req *pr)
                goto out;
        }
 
-       pos = 0;
-       strncpy(tmpfile + pos, target, req->targetlen);
-       pos += req->targetlen;
-       strncpy(tmpfile + pos, SNAP_SUFFIX, SNAP_SUFFIX_LEN);
-       pos += SNAP_SUFFIX_LEN;
-       strncpy(tmpfile + pos, pfiled->uniquestr, pfiled->uniquestr_len);
-       pos += pfiled->uniquestr_len;
-       strncpy(tmpfile + pos, fio->str_id, FIO_STR_ID_LEN);
-       pos += FIO_STR_ID_LEN;
-       tmpfile[pos] = 0;
-
-       r = create_path(tmpfile_pathname, pfiled, tmpfile, pos, 1);
-       if (r < 0)  {
-               XSEGLOG2(&lc, E, "Create path failed");
-               r = -1;
-               goto out;
-       }
+       len = strnjoin(tmpfile, 4, target, req->targetlen,
+                               HASH_SUFFIX, HASH_SUFFIX_LEN,
+                               pfiled->uniquestr, pfiled->uniquestr_len,
+                               fio->str_id, FIO_STR_ID_LEN);
 
-       XSEGLOG2(&lc, D, "Opening %s", tmpfile_pathname);
-       dst = open(tmpfile_pathname, O_WRONLY | O_CREAT | O_EXCL,
-                       S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
-       if (dst < 0) {
+       r = pfiled_write_name(pfiled, tmpfile, len, object_data, sum, 0, O_CREAT|O_EXCL, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+       if (r < 0) {
                if (errno != EEXIST){
                        char error_str[1024];
                        XSEGLOG2(&lc, E, "Error opening %s (%s)", tmpfile_pathname, strerror_r(errno, error_str, 1023));
@@ -1062,18 +1289,18 @@ static void handle_hash(struct peerd *peer, struct peer_req *pr)
                }
                r = -1;
                goto out;
+       } else if (r < sum) {
+               XSEGLOG2(&lc, E, "Error writting to dst file %s", tmpfile_pathname);
+               r = -1;
+               goto out_unlink;
        }
-       XSEGLOG2(&lc, D, "Opened %s", tmpfile_pathname);
+       XSEGLOG2(&lc, D, "Opened %s and wrote", tmpfile);
 
-       written = 0;
-       while (written < sum) {
-               c = write(dst, object_data + written, sum - written);
-               if (c < 0) {
-                       XSEGLOG2(&lc, E, "Error writting to dst file %s", tmpfile_pathname);
-                       r = -1;
-                       goto out_unlink;
-               }
-               written += c;
+       r = create_path(tmpfile_pathname, pfiled, tmpfile, len, 1);
+       if (r < 0)  {
+               XSEGLOG2(&lc, E, "Create path failed");
+               r = -1;
+               goto out;
        }
 
        r = link(tmpfile_pathname, pathname);
@@ -1135,16 +1362,16 @@ out_unlink:
        goto out;
 }
 
-static int __locked_by(char *lockfile, char *expected, uint32_t expected_len)
+static int __locked_by(char *lockfile, char *expected, uint32_t expected_len, int direct)
 {
        int ret = -1;
-       int r, fd;
+       int r;
        uint32_t len;
        char tmpbuf[MAX_UNIQUESTR_LEN];
 
        XSEGLOG2(&lc, D, "Started. Lockfile: %s, expected: %s, expected_len: %u", lockfile, expected, expected_len);
-       fd = open(lockfile, O_RDONLY, S_IRWXU | S_IRUSR);
-       if (fd < 0) {
+       r = read_path(lockfile, tmpbuf, MAX_UNIQUESTR_LEN, 0, direct);
+       if (r < 0) {
                if (errno != ENOENT){
                        XSEGLOG2(&lc, E, "Error opening %s", lockfile);
                } else {
@@ -1154,20 +1381,9 @@ static int __locked_by(char *lockfile, char *expected, uint32_t expected_len)
                }
                goto out;
        }
-       r = pread(fd, tmpbuf, MAX_UNIQUESTR_LEN, 0);
-       if (r < 0) {
-               XSEGLOG2(&lc, E, "Error reading from %s", lockfile);
-               close(fd);
-               goto out;
-       }
        len = (uint32_t)r;
        XSEGLOG2(&lc, D, "Read %u bytes", len);
-       r = close(fd);
-       if (r < 0) {
-               XSEGLOG2(&lc, E, "Could not close lockfile %s", lockfile);
-               goto out;
-       }
-       if (len == expected_len && !strncmp(tmpbuf, expected, expected_len)){
+       if (!strncmp(tmpbuf, expected, expected_len)){
                XSEGLOG2(&lc, D, "Lock file %s locked by us.", lockfile);
                ret = 0;
        }
@@ -1179,10 +1395,11 @@ out:
 static int __try_lock(struct pfiled *pfiled, char *tmpfile, char *lockfile,
                        uint32_t flags, int fd)
 {
-       int r;
+       int r, direct;
        XSEGLOG2(&lc, D, "Started. Lockfile: %s, Tmpfile:%s", lockfile, tmpfile);
-       r = pwrite(fd, pfiled->uniquestr, pfiled->uniquestr_len, 0);
-       if (r < 0) {
+
+       r = pfiled_write(pfiled, fd, pfiled->uniquestr, pfiled->uniquestr_len, 0);
+       if (r < 0 || r < pfiled->uniquestr_len) {
                return -1;
        }
        r = fsync(fd);
@@ -1190,6 +1407,9 @@ static int __try_lock(struct pfiled *pfiled, char *tmpfile, char *lockfile,
                return -1;
        }
 
+       direct = pfiled->directio;
+//     direct = 0;
+
        while (link(tmpfile, lockfile) < 0) {
                //actual error
                if (errno != EEXIST){
@@ -1197,7 +1417,7 @@ static int __try_lock(struct pfiled *pfiled, char *tmpfile, char *lockfile,
                                        tmpfile, lockfile);
                        return -1;
                }
-               r = __locked_by(lockfile, pfiled->uniquestr, pfiled->uniquestr_len);
+               r = __locked_by(lockfile, pfiled->uniquestr, pfiled->uniquestr_len, direct);
                if (!r) {
                        break;
                }
@@ -1222,7 +1442,7 @@ static void handle_acquire(struct peerd *peer, struct peer_req *pr)
        char *tmpfile = malloc(MAX_FILENAME_SIZE);
        char *lockfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
        char *tmpfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
-       int fd = -1, pos;
+       int fd = -1, flags;
        char *target = xseg_get_target(peer->xseg, req);
        uint32_t buf_len, tmpfile_len;
 
@@ -1239,26 +1459,14 @@ static void handle_acquire(struct peerd *peer, struct peer_req *pr)
        }
 
 
-       pos = 0;
-       strncpy(buf + pos, target, req->targetlen);
-       pos = req->targetlen;
-       strncpy(buf + pos, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
-       pos += LOCK_SUFFIX_LEN;
-       buf[pos] = 0;
-       buf_len = pos;
+       buf_len = strjoin(buf, target, req->targetlen, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
 
        XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
 
 
-       pos = 0;
-       strncpy(tmpfile + pos, buf, buf_len);
-       pos += buf_len;
-       strncpy(tmpfile + pos, pfiled->uniquestr, pfiled->uniquestr_len);
-       pos += pfiled->uniquestr_len;
-       strncpy(tmpfile + pos, fio->str_id, FIO_STR_ID_LEN);
-       pos += FIO_STR_ID_LEN;
-       tmpfile[pos] = 0;
-       tmpfile_len = pos;
+       tmpfile_len = strnjoin(tmpfile, 3, buf, buf_len,
+                               pfiled->uniquestr, pfiled->uniquestr_len,
+                               fio->str_id, FIO_STR_ID_LEN);
 
        XSEGLOG2(&lc, I, "Trying to acquire lock %s", buf);
 
@@ -1284,7 +1492,10 @@ static void handle_acquire(struct peerd *peer, struct peer_req *pr)
 
        //nfs v >= 3
        XSEGLOG2(&lc, D, "Tmpfile: %s", tmpfile_pathname);
-       fd = open(tmpfile_pathname, O_WRONLY | O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
+       flags = O_RDWR|O_CREAT|O_EXCL;
+       if (pfiled->directio)
+               flags |= O_DIRECT;
+       fd = open(tmpfile_pathname, flags, S_IRWXU | S_IRUSR);
        if (fd < 0) {
                //actual error
                if (errno != EEXIST){
@@ -1340,7 +1551,7 @@ static void handle_release(struct peerd *peer, struct peer_req *pr)
        char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
        char *tmpbuf = malloc(MAX_UNIQUESTR_LEN + 1);
        char *target = xseg_get_target(peer->xseg, req);
-       int r, pos;
+       int r, buf_len, direct;
 
        if (!buf || !pathname) {
                XSEGLOG2(&lc, E, "Out of memory");
@@ -1354,24 +1565,20 @@ static void handle_release(struct peerd *peer, struct peer_req *pr)
                goto out;
        }
 
-       pos = 0;
-       strncpy(buf + pos, target, req->targetlen);
-       pos += req->targetlen;
-       strncpy(buf + pos, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
-       pos += LOCK_SUFFIX_LEN;
-       buf[pos] = 0;
+       buf_len = strnjoin(buf, 2 , target, req->targetlen, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
 
        XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
 
-       r = create_path(pathname, pfiled, buf,
-                       req->targetlen + strlen(LOCK_SUFFIX), 0);
+       r = create_path(pathname, pfiled, buf, buf_len, 0);
        if (r < 0) {
                XSEGLOG2(&lc, E, "Create path failed for %s", buf);
                goto out;
        }
 
+       direct = pfiled->directio;
+
        if ((req->flags & XF_FORCE) || !__locked_by(pathname, pfiled->uniquestr,
-                                               pfiled->uniquestr_len)) {
+                                               pfiled->uniquestr_len, direct)) {
                r = unlink(pathname);
                if (r < 0) {
                        XSEGLOG2(&lc, E, "Could not unlink %s", pathname);
@@ -1480,6 +1687,7 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
        READ_ARG_STRING("--archip", pfiled->vpath, MAX_PATH_SIZE);
        READ_ARG_STRING("--prefix", pfiled->prefix, MAX_PREFIX_LEN);
        READ_ARG_STRING("--uniquestr", pfiled->uniquestr, MAX_UNIQUESTR_LEN);
+       READ_ARG_BOOL("--directio", pfiled->directio);
        END_READ_ARGS();
 
        pfiled->uniquestr_len = strlen(pfiled->uniquestr);
index 6969f90..4f2497e 100755 (executable)
@@ -281,12 +281,13 @@ class Sosd(MTpeer):
 
 class Filed(MTpeer):
     def __init__(self, archip_dir=None, prefix=None, fdcache=None,
-                 unique_str=None, nr_threads=1, nr_ops=16, **kwargs):
+                 unique_str=None, nr_threads=1, nr_ops=16, direct=True, **kwargs):
         self.executable = FILE_BLOCKER
         self.archip_dir = archip_dir
         self.prefix = prefix
         self.fdcache = fdcache
         self.unique_str = unique_str
+        self.direct = direct
         nr_threads = nr_ops
         if self.fdcache and fdcache < 2*nr_threads:
             raise Error("Fdcache should be greater than 2*nr_threads")
@@ -319,6 +320,8 @@ class Filed(MTpeer):
         if self.prefix:
             self.cli_opts.append("--prefix")
             self.cli_opts.append(self.prefix)
+        if self.direct:
+            self.cli_opts.append("--directio")
 
 
 class Mapperd(Peer):