#define _GNU_SOURCE
#include <stdio.h>
+#include <stdarg.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
uint32_t prefix_len;
uint32_t uniquestr_len;
long maxfds;
+ uint32_t directio;
char vpath[MAX_PATH_SIZE + 1];
char prefix[MAX_PREFIX_LEN + 1];
char uniquestr[MAX_UNIQUESTR_LEN + 1];
return;
}
+/*
+ * Concatenate n (string, length) argument pairs into dest.
+ *
+ * The variadic arguments must come in pairs: a char * followed by an int
+ * length. Each piece is copied with strncpy(), so at most `length` bytes
+ * are taken from it and the copy is zero-padded if the piece is shorter.
+ * dest is NUL-terminated after the last piece.
+ *
+ * No destination size is passed in, so the caller must provide a buffer
+ * of at least (sum of all lengths + 1) bytes.
+ *
+ * Returns the sum of all lengths, i.e. the offset of the terminating NUL.
+ */
+static int strnjoin(char *dest, int n, ...)
+{
+	va_list args;
+	int written = 0;
+	int remaining = n;
+
+	va_start(args, n);
+	while (remaining-- > 0) {
+		char *piece = va_arg(args, char *);
+		int piece_len = va_arg(args, int);
+
+		strncpy(dest + written, piece, piece_len);
+		written += piece_len;
+	}
+	dest[written] = 0;
+	va_end(args);
+
+	return written;
+}
+
+/*
+ * Concatenate the first f_len bytes of f and the first s_len bytes of s
+ * into dest and NUL-terminate the result.
+ *
+ * Both pieces are copied with strncpy(). The caller must provide a
+ * buffer of at least f_len + s_len + 1 bytes; no size is checked here.
+ *
+ * Returns f_len + s_len.
+ */
+static int strjoin(char *dest, char *f, int f_len, char *s, int s_len)
+{
+	char *cursor = dest;
+
+	strncpy(cursor, f, f_len);
+	cursor += f_len;
+
+	strncpy(cursor, s, s_len);
+	cursor += s_len;
+
+	*cursor = 0;
+
+	return f_len + s_len;
+}
+
static int create_path(char *buf, struct pfiled *pfiled, char *target,
uint32_t targetlen, int mkdirs)
{
return 0;
}
+
+/*
+ * Read up to size bytes from fd at offset into data, calling pread()
+ * again after every short read until the request is satisfied, pread()
+ * reports end of file (returns 0), or pread() fails.
+ *
+ * Returns the number of bytes read, which may be less than size. If
+ * pread() fails before any byte was read, its negative result is
+ * returned and errno holds the pread() error.
+ */
+static ssize_t persisting_read(int fd, void *data, size_t size, off_t offset)
+{
+	ssize_t r = 0, sum = 0;
+	char error_str[1024];
+	int saved_errno;
+
+	/* Arguments are cast so that they match the format specifiers. */
+	XSEGLOG2(&lc, D, "fd: %d, size: %llu, offset: %lld", fd,
+			(unsigned long long)size, (long long)offset);
+
+	/* sum never goes negative inside the loop, so the cast is safe. */
+	while ((size_t)sum < size) {
+		XSEGLOG2(&lc, D, "read: %llu, (aligned)size: %llu",
+				(unsigned long long)sum, (unsigned long long)size);
+		r = pread(fd, (char *)data + sum, size - sum, offset + sum);
+		if (r < 0) {
+			/*
+			 * Callers inspect errno after a failure; keep the
+			 * pread() value in case logging modifies it.
+			 */
+			saved_errno = errno;
+			XSEGLOG2(&lc, E, "fd: %d, Error: %s", fd, strerror_r(saved_errno, error_str, 1023));
+			errno = saved_errno;
+			break;
+		} else if (r == 0) {
+			/* end of file */
+			break;
+		}
+		sum += r;
+	}
+	XSEGLOG2(&lc, D, "read: %llu, (aligned)size: %llu",
+			(unsigned long long)sum, (unsigned long long)size);
+
+	/* Report the error only when no data at all could be read. */
+	if (sum == 0 && r < 0) {
+		sum = r;
+	}
+	XSEGLOG2(&lc, D, "Finished. Read %lld, r = %lld", (long long)sum, (long long)r);
+
+	return sum;
+}
+
+/*
+ * Write size bytes from data to fd at offset, calling pwrite() again
+ * after every short write until everything is written or pwrite() stops
+ * making progress.
+ *
+ * Returns the number of bytes written, which may be less than size. If
+ * pwrite() fails before any byte was written, its negative result is
+ * returned.
+ */
+static ssize_t persisting_write(int fd, void *data, size_t size, off_t offset)
+{
+	ssize_t r = 0, sum = 0;
+
+	/* Arguments are cast so that they match the format specifiers. */
+	XSEGLOG2(&lc, D, "fd: %d, size: %llu, offset: %lld", fd,
+			(unsigned long long)size, (long long)offset);
+
+	/* sum never goes negative inside the loop, so the cast is safe. */
+	while ((size_t)sum < size) {
+		XSEGLOG2(&lc, D, "written: %llu, (aligned)size: %llu",
+				(unsigned long long)sum, (unsigned long long)size);
+		r = pwrite(fd, (char *)data + sum, size - sum, offset + sum);
+		if (r <= 0) {
+			/*
+			 * r < 0 is an error. r == 0 means no progress was
+			 * made; stop instead of spinning forever.
+			 */
+			break;
+		}
+		sum += r;
+	}
+	XSEGLOG2(&lc, D, "written: %llu, (aligned)size: %llu",
+			(unsigned long long)sum, (unsigned long long)size);
+
+	/* Report the error only when nothing at all could be written. */
+	if (sum == 0 && r < 0) {
+		sum = r;
+	}
+	XSEGLOG2(&lc, D, "Finished. Wrote %lld, r = %lld", (long long)sum, (long long)r);
+
+	return sum;
+}
+
+/*
+ * Read size bytes at offset from fd into data while only issuing I/O
+ * whose buffer address, length and file offset are multiples of
+ * alignment.
+ *
+ * If the buffer, size and offset are all aligned, the read goes straight
+ * into data. Otherwise an aligned bounce buffer is allocated, an aligned
+ * region covering [offset, offset + size) is read into it, and the
+ * requested bytes are copied out.
+ *
+ * Returns the number of requested bytes actually available in data
+ * (0..size), or a negative value on failure. Bytes of data beyond the
+ * returned count are left untouched.
+ */
+static ssize_t aligned_read(int fd, void *data, ssize_t size, off_t offset, int alignment)
+{
+	void *bounce = NULL;	/* non-NULL only when a bounce buffer is in use */
+	char *tmp_data;
+	ssize_t r;
+	int err;
+	size_t misaligned_data, misaligned_size, misaligned_offset;
+	off_t aligned_offset = offset;
+	size_t aligned_size = size;
+
+	misaligned_data = (unsigned long)data % alignment;
+	misaligned_size = size % alignment;
+	misaligned_offset = offset % alignment;
+	XSEGLOG2(&lc, D, "misaligned_data: %llu, misaligned_size: %llu, misaligned_offset: %llu",
+			(unsigned long long)misaligned_data,
+			(unsigned long long)misaligned_size,
+			(unsigned long long)misaligned_offset);
+	if (misaligned_data || misaligned_size || misaligned_offset) {
+		/* Widen the request to an aligned region that covers it. */
+		aligned_offset = offset - misaligned_offset;
+		aligned_size = size + misaligned_offset;
+
+		misaligned_size = aligned_size % alignment;
+		aligned_size = aligned_size - misaligned_size + alignment;
+
+		/*
+		 * posix_memalign() takes a void ** and returns a positive
+		 * error number on failure; it never returns a negative value
+		 * and does not set errno.
+		 */
+		err = posix_memalign(&bounce, alignment, aligned_size);
+		if (err != 0) {
+			errno = err;
+			return -1;
+		}
+		tmp_data = bounce;
+	} else {
+		tmp_data = data;
+	}
+
+	XSEGLOG2(&lc, D, "aligned_data: %llu, aligned_size: %llu, aligned_offset: %lld",
+			(unsigned long long)(unsigned long)tmp_data,
+			(unsigned long long)aligned_size,
+			(long long)aligned_offset);
+	r = persisting_read(fd, tmp_data, aligned_size, aligned_offset);
+
+	if (r >= 0) {
+		/*
+		 * r counts from aligned_offset. Drop the leading pad so that
+		 * only bytes belonging to the request are reported, then
+		 * clamp to the requested size. misaligned_offset is 0 when
+		 * no bounce buffer is used.
+		 */
+		r -= (ssize_t)misaligned_offset;
+		if (r < 0)
+			r = 0;
+		if (r > size)
+			r = size;
+		/* Copy only bytes that were really read. */
+		if (bounce && r > 0)
+			memcpy(data, tmp_data + misaligned_offset, r);
+	}
+
+	free(bounce);
+	return r;
+}
+
+/* Process-wide mutex used by __fcntl_lock() and __fcntl_unlock(). */
+pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
+
+/*
+ * Acquire the global mutex m.
+ *
+ * fd, start and len are accepted but not used: the lock is not tied to
+ * a file or byte range. Returns the result of pthread_mutex_lock().
+ */
+int __fcntl_lock(int fd, off_t start, off_t len)
+{
+	int rc;
+
+	rc = pthread_mutex_lock(&m);
+	return rc;
+}
+
+/*
+ * Release the global mutex m.
+ *
+ * fd, start and len are accepted but not used. Returns the result of
+ * pthread_mutex_unlock().
+ */
+int __fcntl_unlock(int fd, off_t start, off_t len)
+{
+	int rc;
+
+	rc = pthread_mutex_unlock(&m);
+	return rc;
+}
+
+/*
+ * Write size bytes from data to fd at offset while only issuing I/O
+ * whose buffer address, length and file offset are multiples of
+ * alignment.
+ *
+ * If the buffer, size and offset are all aligned, data is written
+ * directly. Otherwise the write is widened to an aligned region: the
+ * partially covered first and/or last block is read from the file into
+ * an aligned bounce buffer (zero-filled where the file has no data), the
+ * caller's bytes are placed on top, and the whole region is written
+ * back. The read-modify-write sequence runs under __fcntl_lock().
+ *
+ * Returns the number of caller bytes written (0..size), or a negative
+ * value on failure.
+ */
+static ssize_t aligned_write(int fd, void *data, size_t size, off_t offset, int alignment)
+{
+	int locked = 0;
+	int err, saved_errno;
+	void *bounce = NULL;	/* non-NULL only when a bounce buffer is in use */
+	char *tmp_data = data;
+	char *tail;
+	ssize_t r;
+	size_t misaligned_data, misaligned_size, misaligned_offset;
+	size_t aligned_size = size;
+	off_t aligned_offset = offset;
+
+	misaligned_data = (unsigned long)data % alignment;
+	misaligned_size = size % alignment;
+	misaligned_offset = offset % alignment;
+	if (misaligned_data || misaligned_size || misaligned_offset) {
+		/* Move the start back to the enclosing aligned offset. */
+		aligned_offset = offset - misaligned_offset;
+
+		/*
+		 * Grow the size by the leading pad and round it up to the
+		 * next multiple of alignment if it is still misaligned.
+		 */
+		aligned_size = size + misaligned_offset;
+		misaligned_size = aligned_size % alignment;
+		if (misaligned_size)
+			aligned_size = aligned_size + alignment - misaligned_size;
+
+		/*
+		 * posix_memalign() takes a void ** and returns a positive
+		 * error number on failure; it never returns a negative value
+		 * and does not set errno.
+		 */
+		err = posix_memalign(&bounce, alignment, aligned_size);
+		if (err != 0) {
+			errno = err;
+			return -1;
+		}
+		tmp_data = bounce;
+
+		XSEGLOG2(&lc, D, "fd: %d, misaligned_data: %llu, misaligned_size: %llu, misaligned_offset: %llu", fd,
+				(unsigned long long)misaligned_data,
+				(unsigned long long)misaligned_size,
+				(unsigned long long)misaligned_offset);
+		XSEGLOG2(&lc, D, "fd: %d, aligned_data: %llu, aligned_size: %llu, aligned_offset: %lld", fd,
+				(unsigned long long)(unsigned long)tmp_data,
+				(unsigned long long)aligned_size,
+				(long long)aligned_offset);
+		XSEGLOG2(&lc, D, "fd: %d, locking from %lld to %lld", fd,
+				(long long)aligned_offset,
+				(long long)(aligned_offset + aligned_size));
+		__fcntl_lock(fd, aligned_offset, aligned_size);
+		locked = 1;
+
+		if (misaligned_offset) {
+			XSEGLOG2(&lc, D, "fd: %d, size: %llu, offset: %lld", fd,
+					(unsigned long long)size, (long long)offset);
+			/* Preserve the file bytes that precede the payload. */
+			r = persisting_read(fd, tmp_data, alignment, aligned_offset);
+			if (r < 0) {
+				r = -1;
+				goto out;
+			}
+			if (r < alignment)
+				memset(tmp_data + r, 0, alignment - r);
+		}
+
+		if (misaligned_size) {
+			/* Preserve the file bytes that follow the payload. */
+			tail = tmp_data + aligned_size - alignment;
+			r = persisting_read(fd, tail, alignment,
+					aligned_offset + aligned_size - alignment);
+			if (r < 0) {
+				r = -1;
+				goto out;
+			}
+			if (r < alignment)
+				memset(tail + r, 0, alignment - r);
+		}
+		if (size > 0)
+			memcpy(tmp_data + misaligned_offset, data, size);
+	}
+
+	r = persisting_write(fd, tmp_data, aligned_size, aligned_offset);
+	if (r >= 0) {
+		/*
+		 * r counts from aligned_offset. Drop the leading pad and
+		 * clamp to the requested size. The comparison is done only
+		 * for r >= 0: comparing a negative ssize_t with a size_t
+		 * would convert it to a huge unsigned value and turn a
+		 * failed write into an apparent success.
+		 */
+		r -= (ssize_t)misaligned_offset;
+		if (r < 0)
+			r = 0;
+		if ((size_t)r > size)
+			r = size;
+	}
+
+out:
+	/* Every path that took the lock releases it here. */
+	saved_errno = errno;
+	if (locked) {
+		XSEGLOG2(&lc, D, "fd: %d, unlocking from %lld to %lld", fd,
+				(long long)aligned_offset,
+				(long long)(aligned_offset + aligned_size));
+		__fcntl_unlock(fd, aligned_offset, aligned_size);
+	}
+	free(bounce);
+	if (r < 0)
+		errno = saved_errno;
+
+	return r;
+}
+
+/*
+ * Write size bytes from data to fd at offset.
+ *
+ * A non-zero direct selects aligned_write() with an alignment of 512;
+ * otherwise the data is handed to persisting_write() unchanged.
+ */
+static ssize_t filed_write(int fd, void *data, size_t size, off_t offset, int direct)
+{
+	if (!direct) {
+		return persisting_write(fd, data, size, offset);
+	}
+
+	return aligned_write(fd, data, size, offset, 512);
+}
+
+/*
+ * Read size bytes from fd at offset into data.
+ *
+ * A non-zero direct selects aligned_read() with an alignment of 512;
+ * otherwise the request is handed to persisting_read() unchanged.
+ */
+static ssize_t filed_read(int fd, void *data, size_t size, off_t offset, int direct)
+{
+	if (!direct) {
+		return persisting_read(fd, data, size, offset);
+	}
+
+	return aligned_read(fd, data, size, offset, 512);
+}
+
+/*
+ * Read from an already open fd via filed_read(), taking the direct I/O
+ * setting from pfiled->directio.
+ */
+static ssize_t pfiled_read(struct pfiled *pfiled, int fd, void *data, size_t size, off_t offset)
+{
+	int direct = pfiled->directio;
+
+	return filed_read(fd, data, size, offset, direct);
+}
+
+/*
+ * Write to an already open fd via filed_write(), taking the direct I/O
+ * setting from pfiled->directio.
+ */
+static ssize_t pfiled_write(struct pfiled *pfiled, int fd, void *data, size_t size, off_t offset)
+{
+	int direct = pfiled->directio;
+
+	return filed_write(fd, data, size, offset, direct);
+}
+
+/*
+ * Open path with the given flags and mode, perform a single read
+ * (write == 0) or write (write != 0) of size bytes at offset, and close
+ * the file again.
+ *
+ * Direct (aligned) I/O is used when flags contains O_DIRECT.
+ *
+ * Returns the result of filed_read()/filed_write(), or -1 if the file
+ * could not be opened or closed. On failure errno is the one set by the
+ * failing operation; callers rely on it (e.g. to tell ENOENT or EEXIST
+ * apart from real errors).
+ */
+static ssize_t generic_io_path(char *path, void *data, size_t size, off_t offset, int write, int flags, mode_t mode)
+{
+	int fd, saved_errno;
+	ssize_t r;
+
+	fd = open(path, flags, mode);
+	if (fd < 0) {
+		return -1;
+	}
+	XSEGLOG2(&lc, D, "Opened file %s as fd %d", path, fd);
+
+	if (write) {
+		r = filed_write(fd, data, size, offset, flags & O_DIRECT);
+	} else {
+		r = filed_read(fd, data, size, offset, flags & O_DIRECT);
+	}
+
+	/* close() may overwrite errno; keep the value from the I/O call. */
+	saved_errno = errno;
+	if (close(fd) < 0 && r >= 0) {
+		/* The I/O looked fine but close() reported an error. */
+		saved_errno = errno;
+		XSEGLOG2(&lc, E, "Could not close %s", path);
+		r = -1;
+	}
+	if (r < 0) {
+		errno = saved_errno;
+	}
+
+	return r;
+}
+
+/*
+ * Open path read-only, adding O_DIRECT when direct is non-zero, and
+ * read size bytes at offset into data through generic_io_path().
+ */
+static ssize_t read_path(char *path, void *data, size_t size, off_t offset, int direct)
+{
+	int open_flags;
+
+	open_flags = direct ? (O_RDONLY | O_DIRECT) : O_RDONLY;
+
+	return generic_io_path(path, data, size, offset, 0, open_flags, 0);
+}
+
+/*
+ * Resolve name (namelen bytes) to a filesystem path with create_path()
+ * and read size bytes at offset from that file into data.
+ *
+ * Direct I/O is chosen from pfiled->directio. Returns the result of
+ * read_path(), or -1 if the path could not be built.
+ */
+static ssize_t pfiled_read_name(struct pfiled *pfiled, char *name, uint32_t namelen, void *data, size_t size, off_t offset)
+{
+	char path[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
+
+	if (create_path(path, pfiled, name, namelen, 1) < 0) {
+		XSEGLOG2(&lc, E, "Could not create path");
+		return -1;
+	}
+
+	return read_path(path, data, size, offset, pfiled->directio);
+}
+
+/*
+ * Open path with O_RDWR plus extra_open_flags, adding O_DIRECT when
+ * direct is non-zero, and write size bytes from data at offset through
+ * generic_io_path(). mode is passed on to open().
+ */
+static ssize_t write_path(char *path, void *data, size_t size, off_t offset, int direct, int extra_open_flags, mode_t mode)
+{
+	int open_flags = O_RDWR | extra_open_flags;
+
+	open_flags = direct ? (open_flags | O_DIRECT) : open_flags;
+
+	return generic_io_path(path, data, size, offset, 1, open_flags, mode);
+}
+
+/*
+ * Resolve name (namelen bytes) to a filesystem path with create_path()
+ * and write size bytes from data at offset into that file.
+ *
+ * extra_open_flags and mode are forwarded to write_path(); direct I/O is
+ * chosen from pfiled->directio. Returns the result of write_path(), or
+ * -1 if the path could not be built.
+ */
+static ssize_t pfiled_write_name(struct pfiled *pfiled, char *name, uint32_t namelen, void *data, size_t size, off_t offset, int extra_open_flags, mode_t mode)
+{
+	char path[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
+
+	if (create_path(path, pfiled, name, namelen, 1) < 0) {
+		XSEGLOG2(&lc, E, "Could not create path");
+		return -1;
+	}
+
+	return write_path(path, data, size, offset, pfiled->directio, extra_open_flags, mode);
+}
+
static int is_target_valid_len(struct pfiled *pfiled, char *target,
uint32_t targetlen, int mode)
{
static int open_file_write(struct pfiled *pfiled, char *target, uint32_t targetlen)
{
- int r, fd;
+ int r, fd, flags;
char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
char error_str[1024];
XSEGLOG2(&lc, E, "Could not create path");
return -1;
}
+ flags = O_RDWR|O_CREAT;
+ if (pfiled->directio)
+ flags |= O_DIRECT;
XSEGLOG2(&lc, D, "Opening file %s with O_RDWR|O_CREAT", tmp);
- fd = open(tmp, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
+ fd = open(tmp, flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if (fd < 0){
XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
return -1;
static int open_file_read(struct pfiled *pfiled, char *target, uint32_t targetlen)
{
- int r, fd;
+ int r, fd, flags;
char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
char error_str[1024];
return -1;
}
XSEGLOG2(&lc, D, "Opening file %s with O_RDWR", tmp);
- fd = open(tmp, O_RDWR, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
+ flags = O_RDWR;
+ if (pfiled->directio)
+ flags |= O_DIRECT;
+ fd = open(tmp, flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if (fd < 0){
XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
return -1;
XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
req->size);
- while (req->serviced < req->size) {
- XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu",
- req->serviced, req->size);
- r = pread(fd, data + req->serviced,
- req->size- req->serviced,
- req->offset + req->serviced);
- if (r < 0) {
- XSEGLOG2(&lc, E, "Cannot read");
- break;
- }
- else if (r == 0) {
- /* reached end of file. zero out the rest data buffer */
- memset(data + req->serviced, 0, req->size - req->serviced);
- req->serviced = req->size;
- }
- else {
- req->serviced += r;
- }
+ r = pfiled_read(pfiled, fd, data, req->size, req->offset);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Cannot read");
+ req->serviced = 0;
+ } else if (r < req->size) {
+ /* reached end of file. zero out the rest data buffer */
+ memset(data + r, 0, req->size - r);
+ req->serviced = req->size;
+ } else {
+ req->serviced = r;
}
XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
req->size);
struct pfiled *pfiled = __get_pfiled(peer);
struct fio *fio = __get_fio(pr);
struct xseg_request *req = pr->req;
- int r, fd;
+ int fd;
+ ssize_t r;
char *target = xseg_get_target(peer->xseg, req);
char *data = xseg_get_data(peer->xseg, req);
XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
req->size);
- while (req->serviced < req->size) {
- XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu",
- req->serviced, req->size);
- r = pwrite(fd, data + req->serviced,
- req->size- req->serviced,
- req->offset + req->serviced);
- if (r < 0) {
- break;
- }
- else {
- req->serviced += r;
- }
+ r = pfiled_write(pfiled, fd, data, req->size, req->offset);
+ if (r < 0) {
+ req->serviced = 0;
+ } else {
+ req->serviced = r;
}
XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
req->size);
goto out;
}
- r = create_path(buf, pfiled, xcopy->target, xcopy->targetlen, 0);
- if (r < 0) {
- XSEGLOG2(&lc, E, "Create path failed");
- r = -1;
- goto out;
- }
-
- src = open(buf, O_RDONLY);
+ src = open_file(pfiled, xcopy->target, xcopy->targetlen, READ);
if (src < 0) {
- XSEGLOG2(&lc, E, "fail in src %s", buf);
- r = src;
+ XSEGLOG2(&lc, E, "Failed to open src");
goto out;
}
}
static int __get_precalculated_hash(struct peerd *peer, char *target,
- uint32_t targetlen, char hash[HEXLIFIED_SHA256_DIGEST_SIZE + 1])
+ uint32_t targetlen, char *hash)
{
int ret = -1;
- int r, fd;
- uint32_t len, pos;
- char *hash_file = NULL, *hash_path = NULL;
- char tmpbuf[HEXLIFIED_SHA256_DIGEST_SIZE];
+ int r;
+ uint32_t len, hash_file_len;
+ char *hash_file = NULL;
struct pfiled *pfiled = __get_pfiled(peer);
XSEGLOG2(&lc, D, "Started.");
hash_file = malloc(MAX_FILENAME_SIZE + 1);
- hash_path = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
-
- pos = 0;
- strncpy(hash_file+pos, target, targetlen);
- pos += targetlen;
- strncpy(hash_file+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
- pos += HASH_SUFFIX_LEN;
- hash_file[pos] = 0;
+ hash_file_len = strjoin(hash_file, target, targetlen, HASH_SUFFIX, HASH_SUFFIX_LEN);
hash[0] = 0;
- r = create_path(hash_path, pfiled, hash_file, pos, 1);
- if (r < 0) {
- XSEGLOG2(&lc, E, "Create path failed");
- goto out;
- }
-
- fd = open(hash_path, O_RDONLY, S_IRWXU | S_IRUSR);
- if (fd < 0) {
+ r = pfiled_read_name(pfiled, hash_file, hash_file_len, hash, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
+ if (r < 0) {
if (errno != ENOENT){
- XSEGLOG2(&lc, E, "Error opening %s", hash_path);
+ XSEGLOG2(&lc, E, "Error opening %s", hash_file);
} else {
XSEGLOG2(&lc, I, "No precalculated hash for %s", hash_file);
ret = 0;
}
goto out;
}
-
- r = pread(fd, tmpbuf, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
- if (r < 0) {
- XSEGLOG2(&lc, E, "Error reading from %s", hash_path);
- close(fd);
- goto out;
- }
len = (uint32_t)r;
-
XSEGLOG2(&lc, D, "Read %u bytes", len);
- r = close(fd);
- if (r < 0) {
- XSEGLOG2(&lc, E, "Could not close hash_file %s", hash_path);
- goto out;
- }
-
if (len == HEXLIFIED_SHA256_DIGEST_SIZE){
- strncpy(hash, tmpbuf, HEXLIFIED_SHA256_DIGEST_SIZE);
hash[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
XSEGLOG2(&lc, D, "Found hash for %s : %s", hash_file, hash);
ret = 0;
}
out:
- free(hash_path);
+ free(hash_file);
XSEGLOG2(&lc, D, "Finished.");
return ret;
}
static int __set_precalculated_hash(struct peerd *peer, char *target,
- uint32_t targetlen, char hash[HEXLIFIED_SHA256_DIGEST_SIZE + 1])
+ uint32_t targetlen, char *hash)
{
int ret = -1;
- int r, fd;
- uint32_t len, pos;
- char *hash_file = NULL, *hash_path = NULL;
- char tmpbuf[HEXLIFIED_SHA256_DIGEST_SIZE];
+ int r;
+ uint32_t len, hash_file_len;
+ char *hash_file = NULL;
struct pfiled *pfiled = __get_pfiled(peer);
XSEGLOG2(&lc, D, "Started.");
hash_file = malloc(MAX_FILENAME_SIZE + 1);
- hash_path = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
-
- pos = 0;
- strncpy(hash_file+pos, target, targetlen);
- pos += targetlen;
- strncpy(hash_file+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
- pos += HASH_SUFFIX_LEN;
- hash_file[pos] = 0;
+ hash_file_len = strjoin(hash_file, target, targetlen, HASH_SUFFIX, HASH_SUFFIX_LEN);
- r = create_path(hash_path, pfiled, hash_file, pos, 1);
- if (r < 0) {
- XSEGLOG2(&lc, E, "Create path failed");
- goto out;
- }
-
- fd = open(hash_path, O_WRONLY | O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
- if (fd < 0) {
- if (errno != ENOENT){
- XSEGLOG2(&lc, E, "Error opening %s", hash_path);
+ r = pfiled_write_name(pfiled, hash_file, hash_file_len, hash, HEXLIFIED_SHA256_DIGEST_SIZE, 0,
+ O_CREAT|O_EXCL, S_IWUSR|S_IRUSR);
+ if (r < 0) {
+ if (errno != EEXIST){
+ XSEGLOG2(&lc, E, "Error opening %s", hash_file);
} else {
XSEGLOG2(&lc, I, "Hash file already exists %s", hash_file);
ret = 0;
goto out;
}
- r = pwrite(fd, hash, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
- if (r < 0) {
- XSEGLOG2(&lc, E, "Error reading from %s", hash_path);
- close(fd);
- goto out;
- }
len = (uint32_t)r;
-
XSEGLOG2(&lc, D, "Wrote %u bytes", len);
-
- r = close(fd);
- if (r < 0) {
- XSEGLOG2(&lc, E, "Could not close hash_file %s", hash_path);
- goto out;
- }
-
+ ret = 0;
out:
- free(hash_path);
+ free(hash_file);
XSEGLOG2(&lc, D, "Finished.");
return ret;
}
//write to hash_tmpfile
//link file
- int src = -1, dst = -1, r = -1, pos;
+ int len;
+ int src = -1, dst = -1, r = -1;
ssize_t c;
- uint64_t sum, written, trailing_zeros;
+ uint64_t sum, trailing_zeros;
struct pfiled *pfiled = __get_pfiled(peer);
struct fio *fio = __get_fio(pr);
struct xseg_request *req = pr->req;
char *pathname = NULL, *tmpfile_pathname = NULL, *tmpfile = NULL;
char *target;
- char hash_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
+// char hash_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
+ char *hash_name;
char name[XSEG_MAX_TARGETLEN + 1];
unsigned char *object_data = NULL;
goto out;
}
+ r = posix_memalign(&hash_name, 512, 512 + 1);
+
r = __get_precalculated_hash(peer, target, req->targetlen, hash_name);
if (r < 0) {
XSEGLOG2(&lc, E, "Error getting precalculated hash");
name[req->targetlen] = 0;
pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
- object_data = malloc(sizeof(char) * req->size);
+ //object_data = malloc(sizeof(char) * req->size);
+ r = posix_memalign(&object_data, 512, sizeof(char) * req->size);
if (!pathname || !object_data){
XSEGLOG2(&lc, E, "Out of memory");
goto out;
goto out;
}
- sum = 0;
- while (sum < req->size) {
- c = pread(src, object_data + sum, req->size - sum, sum);
- if (c < 0) {
- XSEGLOG2(&lc, E, "Error reading from source");
- r = -1;
- goto out;
- }
- if (c == 0) {
- break;
- }
- sum += c;
+ c = pfiled_read(pfiled, src, object_data, req->size, req->offset);
+ if (c < 0) {
+ XSEGLOG2(&lc, E, "Error reading from source");
+ r = -1;
+ goto out;
}
+ sum = c;
//rstrip here in case zeros were written in the end
trailing_zeros = 0;
r = create_path(pathname, pfiled, hash_name, HEXLIFIED_SHA256_DIGEST_SIZE, 1);
- if (r < 0) {
+ if (r < 0) {
XSEGLOG2(&lc, E, "Create path failed");
r = -1;
goto out;
}
-
- dst = open(pathname, O_WRONLY);
+ dst = open_file(pfiled, hash_name, HEXLIFIED_SHA256_DIGEST_SIZE, READ);
if (dst > 0) {
XSEGLOG2(&lc, I, "%s already exists, no write needed", pathname);
req->serviced = req->size;
goto out;
}
- pos = 0;
- strncpy(tmpfile + pos, target, req->targetlen);
- pos += req->targetlen;
- strncpy(tmpfile + pos, SNAP_SUFFIX, SNAP_SUFFIX_LEN);
- pos += SNAP_SUFFIX_LEN;
- strncpy(tmpfile + pos, pfiled->uniquestr, pfiled->uniquestr_len);
- pos += pfiled->uniquestr_len;
- strncpy(tmpfile + pos, fio->str_id, FIO_STR_ID_LEN);
- pos += FIO_STR_ID_LEN;
- tmpfile[pos] = 0;
-
- r = create_path(tmpfile_pathname, pfiled, tmpfile, pos, 1);
- if (r < 0) {
- XSEGLOG2(&lc, E, "Create path failed");
- r = -1;
- goto out;
- }
+ len = strnjoin(tmpfile, 4, target, req->targetlen,
+ HASH_SUFFIX, HASH_SUFFIX_LEN,
+ pfiled->uniquestr, pfiled->uniquestr_len,
+ fio->str_id, FIO_STR_ID_LEN);
- XSEGLOG2(&lc, D, "Opening %s", tmpfile_pathname);
- dst = open(tmpfile_pathname, O_WRONLY | O_CREAT | O_EXCL,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
- if (dst < 0) {
+ r = pfiled_write_name(pfiled, tmpfile, len, object_data, sum, 0, O_CREAT|O_EXCL, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
+ if (r < 0) {
if (errno != EEXIST){
char error_str[1024];
XSEGLOG2(&lc, E, "Error opening %s (%s)", tmpfile_pathname, strerror_r(errno, error_str, 1023));
}
r = -1;
goto out;
+ } else if (r < sum) {
+ XSEGLOG2(&lc, E, "Error writting to dst file %s", tmpfile_pathname);
+ r = -1;
+ goto out_unlink;
}
- XSEGLOG2(&lc, D, "Opened %s", tmpfile_pathname);
+ XSEGLOG2(&lc, D, "Opened %s and wrote", tmpfile);
- written = 0;
- while (written < sum) {
- c = write(dst, object_data + written, sum - written);
- if (c < 0) {
- XSEGLOG2(&lc, E, "Error writting to dst file %s", tmpfile_pathname);
- r = -1;
- goto out_unlink;
- }
- written += c;
+ r = create_path(tmpfile_pathname, pfiled, tmpfile, len, 1);
+ if (r < 0) {
+ XSEGLOG2(&lc, E, "Create path failed");
+ r = -1;
+ goto out;
}
r = link(tmpfile_pathname, pathname);
goto out;
}
-static int __locked_by(char *lockfile, char *expected, uint32_t expected_len)
+static int __locked_by(char *lockfile, char *expected, uint32_t expected_len, int direct)
{
int ret = -1;
- int r, fd;
+ int r;
uint32_t len;
char tmpbuf[MAX_UNIQUESTR_LEN];
XSEGLOG2(&lc, D, "Started. Lockfile: %s, expected: %s, expected_len: %u", lockfile, expected, expected_len);
- fd = open(lockfile, O_RDONLY, S_IRWXU | S_IRUSR);
- if (fd < 0) {
+ r = read_path(lockfile, tmpbuf, MAX_UNIQUESTR_LEN, 0, direct);
+ if (r < 0) {
if (errno != ENOENT){
XSEGLOG2(&lc, E, "Error opening %s", lockfile);
} else {
}
goto out;
}
- r = pread(fd, tmpbuf, MAX_UNIQUESTR_LEN, 0);
- if (r < 0) {
- XSEGLOG2(&lc, E, "Error reading from %s", lockfile);
- close(fd);
- goto out;
- }
len = (uint32_t)r;
XSEGLOG2(&lc, D, "Read %u bytes", len);
- r = close(fd);
- if (r < 0) {
- XSEGLOG2(&lc, E, "Could not close lockfile %s", lockfile);
- goto out;
- }
- if (len == expected_len && !strncmp(tmpbuf, expected, expected_len)){
+ if (!strncmp(tmpbuf, expected, expected_len)){
XSEGLOG2(&lc, D, "Lock file %s locked by us.", lockfile);
ret = 0;
}
static int __try_lock(struct pfiled *pfiled, char *tmpfile, char *lockfile,
uint32_t flags, int fd)
{
- int r;
+ int r, direct;
XSEGLOG2(&lc, D, "Started. Lockfile: %s, Tmpfile:%s", lockfile, tmpfile);
- r = pwrite(fd, pfiled->uniquestr, pfiled->uniquestr_len, 0);
- if (r < 0) {
+
+ r = pfiled_write(pfiled, fd, pfiled->uniquestr, pfiled->uniquestr_len, 0);
+ if (r < 0 || r < pfiled->uniquestr_len) {
return -1;
}
r = fsync(fd);
return -1;
}
+ direct = pfiled->directio;
+// direct = 0;
+
while (link(tmpfile, lockfile) < 0) {
//actual error
if (errno != EEXIST){
tmpfile, lockfile);
return -1;
}
- r = __locked_by(lockfile, pfiled->uniquestr, pfiled->uniquestr_len);
+ r = __locked_by(lockfile, pfiled->uniquestr, pfiled->uniquestr_len, direct);
if (!r) {
break;
}
char *tmpfile = malloc(MAX_FILENAME_SIZE);
char *lockfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
char *tmpfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
- int fd = -1, pos;
+ int fd = -1, flags;
char *target = xseg_get_target(peer->xseg, req);
uint32_t buf_len, tmpfile_len;
}
- pos = 0;
- strncpy(buf + pos, target, req->targetlen);
- pos = req->targetlen;
- strncpy(buf + pos, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
- pos += LOCK_SUFFIX_LEN;
- buf[pos] = 0;
- buf_len = pos;
+ buf_len = strjoin(buf, target, req->targetlen, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
- pos = 0;
- strncpy(tmpfile + pos, buf, buf_len);
- pos += buf_len;
- strncpy(tmpfile + pos, pfiled->uniquestr, pfiled->uniquestr_len);
- pos += pfiled->uniquestr_len;
- strncpy(tmpfile + pos, fio->str_id, FIO_STR_ID_LEN);
- pos += FIO_STR_ID_LEN;
- tmpfile[pos] = 0;
- tmpfile_len = pos;
+ tmpfile_len = strnjoin(tmpfile, 3, buf, buf_len,
+ pfiled->uniquestr, pfiled->uniquestr_len,
+ fio->str_id, FIO_STR_ID_LEN);
XSEGLOG2(&lc, I, "Trying to acquire lock %s", buf);
//nfs v >= 3
XSEGLOG2(&lc, D, "Tmpfile: %s", tmpfile_pathname);
- fd = open(tmpfile_pathname, O_WRONLY | O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
+ flags = O_RDWR|O_CREAT|O_EXCL;
+ if (pfiled->directio)
+ flags |= O_DIRECT;
+ fd = open(tmpfile_pathname, flags, S_IRWXU | S_IRUSR);
if (fd < 0) {
//actual error
if (errno != EEXIST){
char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
char *tmpbuf = malloc(MAX_UNIQUESTR_LEN + 1);
char *target = xseg_get_target(peer->xseg, req);
- int r, pos;
+ int r, buf_len, direct;
if (!buf || !pathname) {
XSEGLOG2(&lc, E, "Out of memory");
goto out;
}
- pos = 0;
- strncpy(buf + pos, target, req->targetlen);
- pos += req->targetlen;
- strncpy(buf + pos, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
- pos += LOCK_SUFFIX_LEN;
- buf[pos] = 0;
+ buf_len = strnjoin(buf, 2 , target, req->targetlen, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
- r = create_path(pathname, pfiled, buf,
- req->targetlen + strlen(LOCK_SUFFIX), 0);
+ r = create_path(pathname, pfiled, buf, buf_len, 0);
if (r < 0) {
XSEGLOG2(&lc, E, "Create path failed for %s", buf);
goto out;
}
+ direct = pfiled->directio;
+
if ((req->flags & XF_FORCE) || !__locked_by(pathname, pfiled->uniquestr,
- pfiled->uniquestr_len)) {
+ pfiled->uniquestr_len, direct)) {
r = unlink(pathname);
if (r < 0) {
XSEGLOG2(&lc, E, "Could not unlink %s", pathname);
READ_ARG_STRING("--archip", pfiled->vpath, MAX_PATH_SIZE);
READ_ARG_STRING("--prefix", pfiled->prefix, MAX_PREFIX_LEN);
READ_ARG_STRING("--uniquestr", pfiled->uniquestr, MAX_UNIQUESTR_LEN);
+ READ_ARG_BOOL("--directio", pfiled->directio);
END_READ_ARGS();
pfiled->uniquestr_len = strlen(pfiled->uniquestr);