Revision 34f66035

b/xseg/peers/user/filed.c
38 38

  
39 39
#define _GNU_SOURCE
40 40
#include <stdio.h>
41
#include <stdarg.h>
41 42
#include <stdlib.h>
42 43
#include <sys/types.h>
43 44
#include <sys/stat.h>
......
109 110
	uint32_t prefix_len;
110 111
	uint32_t uniquestr_len;
111 112
	long maxfds;
113
	uint32_t directio;
112 114
	char vpath[MAX_PATH_SIZE + 1];
113 115
	char prefix[MAX_PREFIX_LEN + 1];
114 116
	char uniquestr[MAX_UNIQUESTR_LEN + 1];
......
225 227
	return;
226 228
}
227 229

  
230
/*
 * Concatenate a variable number of (string, length) pairs into dest.
 *
 * dest: output buffer; the caller must make sure it can hold the sum of all
 *       lengths plus one terminating NUL byte (no bound is checked here).
 * n:    number of (char *, int) pairs that follow.
 * ...:  n pairs; each pair is a source pointer followed by the number of
 *       bytes to take from it. strncpy() semantics apply to every piece, so
 *       a source shorter than its stated length is padded with NUL bytes.
 *
 * Returns the sum of all stated lengths, i.e. the index of the final NUL.
 */
static int strnjoin(char *dest, int n, ...)
{
	va_list args;
	int written = 0;
	int remaining;

	va_start(args, n);
	for (remaining = n; remaining > 0; remaining--) {
		char *piece = va_arg(args, char *);
		int piece_len = va_arg(args, int);

		strncpy(dest + written, piece, piece_len);
		written += piece_len;
	}
	va_end(args);

	dest[written] = 0;

	return written;
}
250

  
251
/*
 * Join two length-delimited strings into dest and NUL-terminate the result.
 *
 * dest:        output buffer of at least f_len + s_len + 1 bytes (unchecked).
 * f, f_len:    first piece and the number of bytes to take from it.
 * s, s_len:    second piece and the number of bytes to take from it.
 *
 * Each piece is copied with strncpy(), so a piece shorter than its stated
 * length is padded with NUL bytes up to that length.
 *
 * Returns f_len + s_len.
 */
static int strjoin(char *dest, char *f, int f_len, char *s, int s_len)
{
	char *tail = dest + f_len;

	strncpy(dest, f, f_len);
	strncpy(tail, s, s_len);
	tail[s_len] = 0;

	return f_len + s_len;
}
264

  
228 265
static int create_path(char *buf, struct pfiled *pfiled, char *target,
229 266
			uint32_t targetlen, int mkdirs)
230 267
{
......
261 298
	return 0;
262 299
}
263 300

  
301

  
302
static ssize_t persisting_read(int fd, void *data, size_t size, off_t offset)
303
{
304
	ssize_t r = 0, sum = 0;
305
	char error_str[1024];
306
	XSEGLOG2(&lc, D, "fd: %d, size: %d, offset: %d", fd, size, offset);
307

  
308
	while (sum < size) {
309
		XSEGLOG2(&lc, D, "read: %llu, (aligned)size: %llu", sum, size);
310
		r = pread(fd, (char *)data + sum, size - sum, offset + sum);
311
		if (r < 0) {
312
			XSEGLOG2(&lc, E, "fd: %d, Error: %s", fd, strerror_r(errno, error_str, 1023));
313
			break;
314
		} else if (r == 0) {
315
			break;
316
		} else {
317
			sum += r;
318
		}
319
	}
320
	XSEGLOG2(&lc, D, "read: %llu, (aligned)size: %llu", sum, size);
321

  
322
	if (sum == 0 && r < 0) {
323
		sum = r;
324
	}
325
	XSEGLOG2(&lc, D, "Finished. Read %d, r = %d", sum, r);
326

  
327
	return sum;
328
}
329

  
330
static ssize_t persisting_write(int fd, void *data, size_t size, off_t offset)
331
{
332
	ssize_t r = 0, sum = 0;
333

  
334
	XSEGLOG2(&lc, D, "fd: %d, size: %d, offset: %d", fd, size, offset);
335
	while (sum < size) {
336
		XSEGLOG2(&lc, D, "written: %llu, (aligned)size: %llu", sum, size);
337
		r = pwrite(fd, (char *)data + sum, size - sum, offset + sum);
338
		if (r < 0) {
339
			break;
340
		} else {
341
			sum += r;
342
		}
343
	}
344
	XSEGLOG2(&lc, D, "written: %llu, (aligned)size: %llu", sum, size);
345

  
346
	if (sum == 0 && r < 0) {
347
		sum = r;
348
	}
349
	XSEGLOG2(&lc, D, "Finished. Wrote %d, r = %d", sum, r);
350

  
351
	return sum;
352
}
353

  
354
static ssize_t aligned_read(int fd, void *data, ssize_t size, off_t offset, int alignment)
355
{
356
	char *tmp_data;
357
	ssize_t r;
358
	size_t misaligned_data, misaligned_size, misaligned_offset;
359
	off_t aligned_offset=offset;
360
	size_t aligned_size=size;
361

  
362
	misaligned_data = (unsigned long)data % alignment;
363
	misaligned_size = size % alignment;
364
	misaligned_offset = offset % alignment;
365
	XSEGLOG2(&lc, D, "misaligned_data: %u, misaligned_size: %u, misaligned_offset: %u", misaligned_data, misaligned_size, misaligned_offset);
366
	if (misaligned_data || misaligned_size || misaligned_offset) {
367
		aligned_offset = offset - misaligned_offset;
368
		aligned_size = size + misaligned_offset;
369

  
370
		misaligned_size = aligned_size % alignment;
371
		aligned_size = aligned_size - misaligned_size + alignment;
372
		r = posix_memalign(&tmp_data, alignment, aligned_size);
373
		if (r < 0) {
374
			return -1;
375
		}
376
	} else {
377
		tmp_data = data;
378
		aligned_offset = offset;
379
		aligned_size = size;
380
	}
381

  
382
	XSEGLOG2(&lc, D, "aligned_data: %u, aligned_size: %u, aligned_offset: %u", tmp_data, aligned_size, aligned_offset);
383
	r = persisting_read(fd, tmp_data, aligned_size, aligned_offset);
384

  
385
	//FIXME if r < size ?
386
	if (tmp_data != data) {
387
		memcpy(data, tmp_data + misaligned_offset, size);
388
		free(tmp_data);
389
	}
390
	if (r >= size)
391
		r = size;
392
	return r;
393
}
394

  
395
/*
 * Single mutex backing __fcntl_lock()/__fcntl_unlock(). Every call locks or
 * unlocks this one object regardless of the file or byte range named by the
 * caller.
 */
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

/*
 * Acquire the lock. fd, start and len are accepted but not used: the lock
 * taken is always the mutex above.
 *
 * Returns the result of pthread_mutex_lock().
 */
int __fcntl_lock(int fd, off_t start, off_t len)
{
	int rc;

	(void)fd;
	(void)start;
	(void)len;

	rc = pthread_mutex_lock(&m);
	return rc;
}

/*
 * Release the lock taken by __fcntl_lock(). fd, start and len are accepted
 * but not used.
 *
 * Returns the result of pthread_mutex_unlock().
 */
int __fcntl_unlock(int fd, off_t start, off_t len)
{
	int rc;

	(void)fd;
	(void)start;
	(void)len;

	rc = pthread_mutex_unlock(&m);
	return rc;
}
406

  
407
static ssize_t aligned_write(int fd, void *data, size_t size, off_t offset, int alignment)
408
{
409
	int locked = 0;
410
	char *tmp_data;
411
	ssize_t r;
412
	size_t misaligned_data, misaligned_size, misaligned_offset;
413
	size_t aligned_size = size, aligned_offset = offset, read_size;
414
	misaligned_data = (unsigned long)data % alignment;
415
	misaligned_size = size % alignment;
416
	misaligned_offset = offset % alignment;
417
	if (misaligned_data || misaligned_size || misaligned_offset) {
418
		//if somthing is misaligned then:
419
		//
420
		// First check if the offset was missaligned.
421
		aligned_offset = offset - misaligned_offset;
422

  
423
		// Then adjust the size with the misaligned offset and check if
424
		// it remains misaligned.
425
		aligned_size = size + misaligned_offset;
426
		misaligned_size = aligned_size % alignment;
427

  
428
		// in case there is no misaligned_size
429
		if (misaligned_size)
430
			aligned_size = aligned_size + alignment - misaligned_size;
431

  
432
		// Allocate aligned memory
433
		r = posix_memalign(&tmp_data, alignment, aligned_size);
434
		if (r < 0) {
435
			return -1;
436
		}
437

  
438
		XSEGLOG2(&lc, D, "fd: %d, misaligned_data: %u, misaligned_size: %u, misaligned_offset: %u", fd, misaligned_data, misaligned_size, misaligned_offset);
439
		XSEGLOG2(&lc, D, "fd: %d, aligned_data: %u, aligned_size: %u, aligned_offset: %u", fd, tmp_data, aligned_size, aligned_offset);
440
		XSEGLOG2(&lc, D, "fd: %d, locking from %u to %u", fd, aligned_offset, aligned_offset + aligned_size);
441
		__fcntl_lock(fd, aligned_offset, aligned_size + alignment - misaligned_size);
442
		locked = 1;
443

  
444
		if (misaligned_offset) {
445
			XSEGLOG2(&lc, D, "fd: %d, size: %d, offset: %d", fd, size, offset);
446
			/* read misaligned_offset */
447
			read_size = alignment;
448
			r = persisting_read(fd, tmp_data, alignment, aligned_offset);
449
			if (r < 0) {
450
				free(tmp_data);
451
				return -1;
452
			} else if (r != read_size) {
453
				memset(tmp_data + r, 0, read_size - r);
454
			}
455
		}
456

  
457
		if (misaligned_size) {
458
			read_size = alignment;
459
			r = persisting_read(fd, tmp_data + aligned_size - alignment, alignment,
460
					aligned_offset + aligned_size - alignment);
461
			if (r < 0) {
462
				free(tmp_data);
463
				return -1;
464
			} else if (r != read_size) {
465
				memset(tmp_data + aligned_size - alignment + r, 0, read_size - r);
466
			}
467
		}
468
		memcpy(tmp_data + misaligned_offset, data, size);
469
	} else {
470
		aligned_size = size;
471
		aligned_offset = offset;
472
		tmp_data = data;
473
	}
474

  
475
	r = persisting_write(fd, tmp_data, aligned_size, aligned_offset);
476

  
477
	if (locked) {
478
		XSEGLOG2(&lc, D, "fd: %d, unlocking from %u to %u", fd, aligned_offset, aligned_offset + aligned_size);
479
		__fcntl_unlock(fd, aligned_offset, aligned_size + alignment - misaligned_size);
480
	}
481
	if (tmp_data != data) {
482
		free(tmp_data);
483
	}
484

  
485
	if (r >= size)
486
		r = size;
487
	return r;
488
}
489

  
490
/*
 * Write size bytes from data to fd at offset.
 *
 * direct: non-zero selects the alignment-aware path (aligned_write() with a
 *         fixed alignment of 512); zero selects plain persisting_write().
 *
 * Returns whatever the selected writer returns.
 */
static ssize_t filed_write(int fd, void *data, size_t size, off_t offset, int direct)
{
	if (!direct)
		return persisting_write(fd, data, size, offset);

	return aligned_write(fd, data, size, offset, 512);
}
497

  
498
/*
 * Read size bytes from fd at offset into data.
 *
 * direct: non-zero selects the alignment-aware path (aligned_read() with a
 *         fixed alignment of 512); zero selects plain persisting_read().
 *
 * Returns whatever the selected reader returns.
 */
static ssize_t filed_read(int fd, void *data, size_t size, off_t offset, int direct)
{
	if (!direct)
		return persisting_read(fd, data, size, offset);

	return aligned_read(fd, data, size, offset, 512);
}
505

  
506
static ssize_t pfiled_read(struct pfiled *pfiled, int fd, void *data, size_t size, off_t offset)
507
{
508
	return filed_read(fd, data, size, offset, pfiled->directio);
509
}
510

  
511
static ssize_t pfiled_write(struct pfiled *pfiled, int fd, void *data, size_t size, off_t offset)
512
{
513
	return filed_write(fd, data, size, offset, pfiled->directio);
514
}
515

  
516
static ssize_t generic_io_path(char *path, void *data, size_t size, off_t offset, int write, int flags, mode_t mode)
517
{
518
	int fd;
519
	ssize_t r;
520

  
521
	fd = open(path, flags, mode);
522
	if (fd < 0) {
523
		return -1;
524
	}
525
	XSEGLOG2(&lc, D, "Opened file %s as fd %d", path, fd);
526

  
527
	if (write) {
528
		r = filed_write(fd, data, size, offset, flags & O_DIRECT);
529
	} else {
530
		r = filed_read(fd, data, size, offset, flags & O_DIRECT);
531
	}
532

  
533
	close(fd);
534

  
535
	return r;
536
}
537

  
538
/*
 * Open path read-only and read size bytes at offset into data.
 *
 * direct: non-zero adds O_DIRECT to the open flags.
 *
 * Returns the result of generic_io_path().
 */
static ssize_t read_path(char *path, void *data, size_t size, off_t offset, int direct)
{
	int open_flags = direct ? (O_RDONLY | O_DIRECT) : O_RDONLY;

	return generic_io_path(path, data, size, offset, 0, open_flags, 0);
}
546

  
547
static ssize_t pfiled_read_name(struct pfiled *pfiled, char *name, uint32_t namelen, void *data, size_t size, off_t offset)
548
{
549
	char path[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
550
	int r;
551
	r = create_path(path, pfiled, name, namelen, 1);
552
	if (r < 0) {
553
		XSEGLOG2(&lc, E, "Could not create path");
554
		return -1;
555
	}
556
	return read_path(path, data, size, offset, pfiled->directio);
557
}
558

  
559
/*
 * Open path read-write and write size bytes from data at offset.
 *
 * direct:           non-zero adds O_DIRECT to the open flags.
 * extra_open_flags: additional open(2) flags OR-ed in by the caller.
 * mode:             permission bits handed to open(2).
 *
 * Returns the result of generic_io_path().
 */
static ssize_t write_path(char *path, void *data, size_t size, off_t offset, int direct, int extra_open_flags, mode_t mode)
{
	int open_flags = O_RDWR | extra_open_flags;

	if (direct) {
		open_flags |= O_DIRECT;
	}

	return generic_io_path(path, data, size, offset, 1, open_flags, mode);
}
566

  
567
static ssize_t pfiled_write_name(struct pfiled *pfiled, char *name, uint32_t namelen, void *data, size_t size, off_t offset, int extra_open_flags, mode_t mode)
568
{
569
	char path[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
570
	int r;
571
	r = create_path(path, pfiled, name, namelen, 1);
572
	if (r < 0) {
573
		XSEGLOG2(&lc, E, "Could not create path");
574
		return -1;
575
	}
576
	return write_path(path, data, size, offset, pfiled->directio, extra_open_flags, mode);
577
}
578

  
264 579
static int is_target_valid_len(struct pfiled *pfiled, char *target,
265 580
		uint32_t targetlen, int mode)
266 581
{
......
306 621

  
307 622
static int open_file_write(struct pfiled *pfiled, char *target, uint32_t targetlen)
308 623
{
309
	int r, fd;
624
	int r, fd, flags;
310 625
	char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
311 626
	char error_str[1024];
312 627

  
......
315 630
		XSEGLOG2(&lc, E, "Could not create path");
316 631
		return -1;
317 632
	}
633
	flags = O_RDWR|O_CREAT;
634
	if (pfiled->directio)
635
		flags |= O_DIRECT;
318 636
	XSEGLOG2(&lc, D, "Opening file %s with O_RDWR|O_CREAT", tmp);
319
	fd = open(tmp, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
637
	fd = open(tmp, flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
320 638
	if (fd < 0){
321 639
		XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
322 640
		return -1;
......
326 644

  
327 645
static int open_file_read(struct pfiled *pfiled, char *target, uint32_t targetlen)
328 646
{
329
	int r, fd;
647
	int r, fd, flags;
330 648
	char tmp[XSEG_MAX_TARGETLEN + MAX_PATH_SIZE + 1];
331 649
	char error_str[1024];
332 650

  
......
336 654
		return -1;
337 655
	}
338 656
	XSEGLOG2(&lc, D, "Opening file %s with O_RDWR", tmp);
339
	fd = open(tmp, O_RDWR, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
657
	flags = O_RDWR;
658
	if (pfiled->directio)
659
		flags |= O_DIRECT;
660
	fd = open(tmp, flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
340 661
	if (fd < 0){
341 662
		XSEGLOG2(&lc, E, "Could not open file %s. Error: %s", tmp, strerror_r(errno, error_str, 1023));
342 663
		return -1;
......
481 802

  
482 803
	XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
483 804
			req->size);
484
	while (req->serviced < req->size) {
485
		XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu",
486
				req->serviced, req->size);
487
		r = pread(fd, data + req->serviced,
488
				req->size- req->serviced,
489
				req->offset + req->serviced);
490
		if (r < 0) {
491
			XSEGLOG2(&lc, E, "Cannot read");
492
			break;
493
		}
494
		else if (r == 0) {
495
			/* reached end of file. zero out the rest data buffer */
496
			memset(data + req->serviced, 0, req->size - req->serviced);
497
			req->serviced = req->size;
498
		}
499
		else {
500
			req->serviced += r;
501
		}
805
	r = pfiled_read(pfiled, fd, data, req->size, req->offset);
806
	if (r < 0) {
807
		XSEGLOG2(&lc, E, "Cannot read");
808
		req->serviced = 0;
809
	} else if (r < req->size) {
810
		/* reached end of file. zero out the rest data buffer */
811
		memset(data + r, 0, req->size - r);
812
		req->serviced = req->size;
813
	} else {
814
		req->serviced = r;
502 815
	}
503 816
	XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
504 817
			req->size);
......
522 835
	struct pfiled *pfiled = __get_pfiled(peer);
523 836
	struct fio *fio = __get_fio(pr);
524 837
	struct xseg_request *req = pr->req;
525
	int r, fd;
838
	int fd;
839
	ssize_t r;
526 840
	char *target = xseg_get_target(peer->xseg, req);
527 841
	char *data = xseg_get_data(peer->xseg, req);
528 842

  
......
556 870

  
557 871
	XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
558 872
			req->size);
559
	while (req->serviced < req->size) {
560
		XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu",
561
				req->serviced, req->size);
562
		r = pwrite(fd, data + req->serviced,
563
				req->size- req->serviced,
564
				req->offset + req->serviced);
565
		if (r < 0) {
566
			break;
567
		}
568
		else {
569
			req->serviced += r;
570
		}
873
	r = pfiled_write(pfiled, fd, data, req->size, req->offset);
874
	if (r < 0) {
875
		req->serviced = 0;
876
	} else {
877
		req->serviced = r;
571 878
	}
572 879
	XSEGLOG2(&lc, D, "req->serviced: %llu, req->size: %llu", req->serviced,
573 880
			req->size);
......
672 979
		goto out;
673 980
	}
674 981

  
675
	r = create_path(buf, pfiled, xcopy->target, xcopy->targetlen, 0);
676
	if (r < 0)  {
677
		XSEGLOG2(&lc, E, "Create path failed");
678
		r = -1;
679
		goto out;
680
	}
681

  
682
	src = open(buf, O_RDONLY);
982
	src = open_file(pfiled, xcopy->target, xcopy->targetlen, READ);
683 983
	if (src < 0) {
684
		XSEGLOG2(&lc, E, "fail in src %s", buf);
685
		r = src;
984
		XSEGLOG2(&lc, E, "Failed to open src");
686 985
		goto out;
687 986
	}
688 987

  
......
770 1069
}
771 1070

  
772 1071
static int __get_precalculated_hash(struct peerd *peer, char *target,
773
		uint32_t targetlen, char hash[HEXLIFIED_SHA256_DIGEST_SIZE + 1])
1072
		uint32_t targetlen, char *hash)
774 1073
{
775 1074
	int ret = -1;
776
	int r, fd;
777
	uint32_t len, pos;
778
	char *hash_file = NULL, *hash_path = NULL;
779
	char tmpbuf[HEXLIFIED_SHA256_DIGEST_SIZE];
1075
	int r;
1076
	uint32_t len, hash_file_len;
1077
	char *hash_file = NULL;
780 1078
	struct pfiled *pfiled = __get_pfiled(peer);
781 1079

  
782 1080
	XSEGLOG2(&lc, D, "Started.");
783 1081

  
784 1082
	hash_file = malloc(MAX_FILENAME_SIZE + 1);
785
	hash_path = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
786

  
787
	pos = 0;
788
	strncpy(hash_file+pos, target, targetlen);
789
	pos += targetlen;
790
	strncpy(hash_file+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
791
	pos += HASH_SUFFIX_LEN;
792
	hash_file[pos] = 0;
1083
	hash_file_len = strjoin(hash_file, target, targetlen, HASH_SUFFIX, HASH_SUFFIX_LEN);
793 1084
	hash[0] = 0;
794 1085

  
795
	r = create_path(hash_path, pfiled, hash_file, pos, 1);
796
	if (r < 0)  {
797
		XSEGLOG2(&lc, E, "Create path failed");
798
		goto out;
799
	}
800

  
801
	fd = open(hash_path, O_RDONLY, S_IRWXU | S_IRUSR);
802
	if (fd < 0) {
1086
	r = pfiled_read_name(pfiled, hash_file, hash_file_len, hash, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
1087
	if (r < 0) {
803 1088
		if (errno != ENOENT){
804
			XSEGLOG2(&lc, E, "Error opening %s", hash_path);
1089
			XSEGLOG2(&lc, E, "Error opening %s", hash_file);
805 1090
		} else {
806 1091
			XSEGLOG2(&lc, I, "No precalculated hash for %s", hash_file);
807 1092
			ret = 0;
808 1093
		}
809 1094
		goto out;
810 1095
	}
811

  
812
	r = pread(fd, tmpbuf, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
813
	if (r < 0) {
814
		XSEGLOG2(&lc, E, "Error reading from %s", hash_path);
815
		close(fd);
816
		goto out;
817
	}
818 1096
	len = (uint32_t)r;
819

  
820 1097
	XSEGLOG2(&lc, D, "Read %u bytes", len);
821 1098

  
822
	r = close(fd);
823
	if (r < 0) {
824
		XSEGLOG2(&lc, E, "Could not close hash_file %s", hash_path);
825
		goto out;
826
	}
827

  
828 1099
	if (len == HEXLIFIED_SHA256_DIGEST_SIZE){
829
		strncpy(hash, tmpbuf, HEXLIFIED_SHA256_DIGEST_SIZE);
830 1100
		hash[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
831 1101
		XSEGLOG2(&lc, D, "Found hash for %s : %s", hash_file, hash);
832 1102
		ret = 0;
833 1103
	}
834 1104
out:
835
	free(hash_path);
1105
	free(hash_file);
836 1106
	XSEGLOG2(&lc, D, "Finished.");
837 1107
	return ret;
838 1108
}
839 1109

  
840 1110
static int __set_precalculated_hash(struct peerd *peer, char *target,
841
		uint32_t targetlen, char hash[HEXLIFIED_SHA256_DIGEST_SIZE + 1])
1111
		uint32_t targetlen, char *hash)
842 1112
{
843 1113
	int ret = -1;
844
	int r, fd;
845
	uint32_t len, pos;
846
	char *hash_file = NULL, *hash_path = NULL;
847
	char tmpbuf[HEXLIFIED_SHA256_DIGEST_SIZE];
1114
	int r;
1115
	uint32_t len, hash_file_len;
1116
	char *hash_file = NULL;
848 1117
	struct pfiled *pfiled = __get_pfiled(peer);
849 1118

  
850 1119
	XSEGLOG2(&lc, D, "Started.");
851 1120

  
852 1121
	hash_file = malloc(MAX_FILENAME_SIZE + 1);
853
	hash_path = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
854

  
855
	pos = 0;
856
	strncpy(hash_file+pos, target, targetlen);
857
	pos += targetlen;
858
	strncpy(hash_file+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
859
	pos += HASH_SUFFIX_LEN;
860
	hash_file[pos] = 0;
1122
	hash_file_len = strjoin(hash_file, target, targetlen, HASH_SUFFIX, HASH_SUFFIX_LEN);
861 1123

  
862
	r = create_path(hash_path, pfiled, hash_file, pos, 1);
863
	if (r < 0)  {
864
		XSEGLOG2(&lc, E, "Create path failed");
865
		goto out;
866
	}
867

  
868
	fd = open(hash_path, O_WRONLY | O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
869
	if (fd < 0) {
870
		if (errno != ENOENT){
871
			XSEGLOG2(&lc, E, "Error opening %s", hash_path);
1124
	r = pfiled_write_name(pfiled, hash_file, hash_file_len, hash, HEXLIFIED_SHA256_DIGEST_SIZE, 0,
1125
			O_CREAT|O_EXCL, S_IWUSR|S_IRUSR);
1126
	if (r < 0) {
1127
		if (errno != EEXIST){
1128
			XSEGLOG2(&lc, E, "Error opening %s", hash_file);
872 1129
		} else {
873 1130
			XSEGLOG2(&lc, I, "Hash file already exists %s", hash_file);
874 1131
			ret = 0;
......
876 1133
		goto out;
877 1134
	}
878 1135

  
879
	r = pwrite(fd, hash, HEXLIFIED_SHA256_DIGEST_SIZE, 0);
880
	if (r < 0) {
881
		XSEGLOG2(&lc, E, "Error reading from %s", hash_path);
882
		close(fd);
883
		goto out;
884
	}
885 1136
	len = (uint32_t)r;
886

  
887 1137
	XSEGLOG2(&lc, D, "Wrote %u bytes", len);
888

  
889
	r = close(fd);
890
	if (r < 0) {
891
		XSEGLOG2(&lc, E, "Could not close hash_file %s", hash_path);
892
		goto out;
893
	}
894

  
1138
	ret = 0;
895 1139
out:
896
	free(hash_path);
1140
	free(hash_file);
897 1141
	XSEGLOG2(&lc, D, "Finished.");
898 1142
	return ret;
899 1143
}
......
907 1151
	//write to hash_tmpfile
908 1152
	//link file
909 1153

  
910
	int src = -1, dst = -1, r = -1, pos;
1154
	int len;
1155
	int src = -1, dst = -1, r = -1;
911 1156
	ssize_t c;
912
	uint64_t sum, written, trailing_zeros;
1157
	uint64_t sum, trailing_zeros;
913 1158
	struct pfiled *pfiled = __get_pfiled(peer);
914 1159
	struct fio *fio = __get_fio(pr);
915 1160
	struct xseg_request *req = pr->req;
916 1161
	char *pathname = NULL, *tmpfile_pathname = NULL, *tmpfile = NULL;
917 1162
	char *target;
918
	char hash_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
1163
//	char hash_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
1164
	char *hash_name;
919 1165
	char name[XSEG_MAX_TARGETLEN + 1];
920 1166

  
921 1167
	unsigned char *object_data = NULL;
......
939 1185
		goto out;
940 1186
	}
941 1187

  
1188
	r = posix_memalign(&hash_name, 512, 512 + 1);
1189

  
942 1190
	r = __get_precalculated_hash(peer, target, req->targetlen, hash_name);
943 1191
	if (r < 0) {
944 1192
		XSEGLOG2(&lc, E, "Error getting precalculated hash");
......
956 1204
	name[req->targetlen] = 0;
957 1205

  
958 1206
	pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
959
	object_data = malloc(sizeof(char) * req->size);
1207
	//object_data = malloc(sizeof(char) * req->size);
1208
	r = posix_memalign(&object_data, 512, sizeof(char) * req->size);
960 1209
	if (!pathname || !object_data){
961 1210
		XSEGLOG2(&lc, E, "Out of memory");
962 1211
		goto out;
......
969 1218
		goto out;
970 1219
	}
971 1220

  
972
	sum = 0;
973
	while (sum < req->size) {
974
		c = pread(src, object_data + sum, req->size - sum, sum);
975
		if (c < 0) {
976
			XSEGLOG2(&lc, E, "Error reading from source");
977
			r = -1;
978
			goto out;
979
		}
980
		if (c == 0) {
981
			break;
982
		}
983
		sum += c;
1221
	c = pfiled_read(pfiled, src, object_data, req->size, req->offset);
1222
	if (c < 0) {
1223
		XSEGLOG2(&lc, E, "Error reading from source");
1224
		r = -1;
1225
		goto out;
984 1226
	}
1227
	sum = c;
985 1228

  
986 1229
	//rstrip here in case zeros were written in the end
987 1230
	trailing_zeros = 0;
......
1001 1244

  
1002 1245

  
1003 1246
	r = create_path(pathname, pfiled, hash_name, HEXLIFIED_SHA256_DIGEST_SIZE, 1);
1004
	if (r < 0)  {
1247
	if (r < 0) {
1005 1248
		XSEGLOG2(&lc, E, "Create path failed");
1006 1249
		r = -1;
1007 1250
		goto out;
1008 1251
	}
1009 1252

  
1010 1253

  
1011

  
1012
	dst = open(pathname, O_WRONLY);
1254
	dst = open_file(pfiled, hash_name, HEXLIFIED_SHA256_DIGEST_SIZE, READ);
1013 1255
	if (dst > 0) {
1014 1256
		XSEGLOG2(&lc, I, "%s already exists, no write needed", pathname);
1015 1257
		req->serviced = req->size;
......
1031 1273
		goto out;
1032 1274
	}
1033 1275

  
1034
	pos = 0;
1035
	strncpy(tmpfile + pos, target, req->targetlen);
1036
	pos += req->targetlen;
1037
	strncpy(tmpfile + pos, SNAP_SUFFIX, SNAP_SUFFIX_LEN);
1038
	pos += SNAP_SUFFIX_LEN;
1039
	strncpy(tmpfile + pos, pfiled->uniquestr, pfiled->uniquestr_len);
1040
	pos += pfiled->uniquestr_len;
1041
	strncpy(tmpfile + pos, fio->str_id, FIO_STR_ID_LEN);
1042
	pos += FIO_STR_ID_LEN;
1043
	tmpfile[pos] = 0;
1044

  
1045
	r = create_path(tmpfile_pathname, pfiled, tmpfile, pos, 1);
1046
	if (r < 0)  {
1047
		XSEGLOG2(&lc, E, "Create path failed");
1048
		r = -1;
1049
		goto out;
1050
	}
1276
	len = strnjoin(tmpfile, 4, target, req->targetlen,
1277
				HASH_SUFFIX, HASH_SUFFIX_LEN,
1278
				pfiled->uniquestr, pfiled->uniquestr_len,
1279
				fio->str_id, FIO_STR_ID_LEN);
1051 1280

  
1052
	XSEGLOG2(&lc, D, "Opening %s", tmpfile_pathname);
1053
	dst = open(tmpfile_pathname, O_WRONLY | O_CREAT | O_EXCL,
1054
			S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
1055
	if (dst < 0) {
1281
	r = pfiled_write_name(pfiled, tmpfile, len, object_data, sum, 0, O_CREAT|O_EXCL, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
1282
	if (r < 0) {
1056 1283
		if (errno != EEXIST){
1057 1284
			char error_str[1024];
1058 1285
			XSEGLOG2(&lc, E, "Error opening %s (%s)", tmpfile_pathname, strerror_r(errno, error_str, 1023));
......
1062 1289
		}
1063 1290
		r = -1;
1064 1291
		goto out;
1292
	} else if (r < sum) {
1293
		XSEGLOG2(&lc, E, "Error writting to dst file %s", tmpfile_pathname);
1294
		r = -1;
1295
		goto out_unlink;
1065 1296
	}
1066
	XSEGLOG2(&lc, D, "Opened %s", tmpfile_pathname);
1297
	XSEGLOG2(&lc, D, "Opened %s and wrote", tmpfile);
1067 1298

  
1068
	written = 0;
1069
	while (written < sum) {
1070
		c = write(dst, object_data + written, sum - written);
1071
		if (c < 0) {
1072
			XSEGLOG2(&lc, E, "Error writting to dst file %s", tmpfile_pathname);
1073
			r = -1;
1074
			goto out_unlink;
1075
		}
1076
		written += c;
1299
	r = create_path(tmpfile_pathname, pfiled, tmpfile, len, 1);
1300
	if (r < 0)  {
1301
		XSEGLOG2(&lc, E, "Create path failed");
1302
		r = -1;
1303
		goto out;
1077 1304
	}
1078 1305

  
1079 1306
	r = link(tmpfile_pathname, pathname);
......
1135 1362
	goto out;
1136 1363
}
1137 1364

  
1138
static int __locked_by(char *lockfile, char *expected, uint32_t expected_len)
1365
static int __locked_by(char *lockfile, char *expected, uint32_t expected_len, int direct)
1139 1366
{
1140 1367
	int ret = -1;
1141
	int r, fd;
1368
	int r;
1142 1369
	uint32_t len;
1143 1370
	char tmpbuf[MAX_UNIQUESTR_LEN];
1144 1371

  
1145 1372
	XSEGLOG2(&lc, D, "Started. Lockfile: %s, expected: %s, expected_len: %u", lockfile, expected, expected_len);
1146
	fd = open(lockfile, O_RDONLY, S_IRWXU | S_IRUSR);
1147
	if (fd < 0) {
1373
	r = read_path(lockfile, tmpbuf, MAX_UNIQUESTR_LEN, 0, direct);
1374
	if (r < 0) {
1148 1375
		if (errno != ENOENT){
1149 1376
			XSEGLOG2(&lc, E, "Error opening %s", lockfile);
1150 1377
		} else {
......
1154 1381
		}
1155 1382
		goto out;
1156 1383
	}
1157
	r = pread(fd, tmpbuf, MAX_UNIQUESTR_LEN, 0);
1158
	if (r < 0) {
1159
		XSEGLOG2(&lc, E, "Error reading from %s", lockfile);
1160
		close(fd);
1161
		goto out;
1162
	}
1163 1384
	len = (uint32_t)r;
1164 1385
	XSEGLOG2(&lc, D, "Read %u bytes", len);
1165
	r = close(fd);
1166
	if (r < 0) {
1167
		XSEGLOG2(&lc, E, "Could not close lockfile %s", lockfile);
1168
		goto out;
1169
	}
1170
	if (len == expected_len && !strncmp(tmpbuf, expected, expected_len)){
1386
	if (!strncmp(tmpbuf, expected, expected_len)){
1171 1387
		XSEGLOG2(&lc, D, "Lock file %s locked by us.", lockfile);
1172 1388
		ret = 0;
1173 1389
	}
......
1179 1395
static int __try_lock(struct pfiled *pfiled, char *tmpfile, char *lockfile,
1180 1396
			uint32_t flags, int fd)
1181 1397
{
1182
	int r;
1398
	int r, direct;
1183 1399
	XSEGLOG2(&lc, D, "Started. Lockfile: %s, Tmpfile:%s", lockfile, tmpfile);
1184
	r = pwrite(fd, pfiled->uniquestr, pfiled->uniquestr_len, 0);
1185
	if (r < 0) {
1400

  
1401
	r = pfiled_write(pfiled, fd, pfiled->uniquestr, pfiled->uniquestr_len, 0);
1402
	if (r < 0 || r < pfiled->uniquestr_len) {
1186 1403
		return -1;
1187 1404
	}
1188 1405
	r = fsync(fd);
......
1190 1407
		return -1;
1191 1408
	}
1192 1409

  
1410
	direct = pfiled->directio;
1411
//	direct = 0;
1412

  
1193 1413
	while (link(tmpfile, lockfile) < 0) {
1194 1414
		//actual error
1195 1415
		if (errno != EEXIST){
......
1197 1417
					tmpfile, lockfile);
1198 1418
			return -1;
1199 1419
		}
1200
		r = __locked_by(lockfile, pfiled->uniquestr, pfiled->uniquestr_len);
1420
		r = __locked_by(lockfile, pfiled->uniquestr, pfiled->uniquestr_len, direct);
1201 1421
		if (!r) {
1202 1422
			break;
1203 1423
		}
......
1222 1442
	char *tmpfile = malloc(MAX_FILENAME_SIZE);
1223 1443
	char *lockfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
1224 1444
	char *tmpfile_pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE);
1225
	int fd = -1, pos;
1445
	int fd = -1, flags;
1226 1446
	char *target = xseg_get_target(peer->xseg, req);
1227 1447
	uint32_t buf_len, tmpfile_len;
1228 1448

  
......
1239 1459
	}
1240 1460

  
1241 1461

  
1242
	pos = 0;
1243
	strncpy(buf + pos, target, req->targetlen);
1244
	pos = req->targetlen;
1245
	strncpy(buf + pos, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
1246
	pos += LOCK_SUFFIX_LEN;
1247
	buf[pos] = 0;
1248
	buf_len = pos;
1462
	buf_len = strjoin(buf, target, req->targetlen, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
1249 1463

  
1250 1464
	XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
1251 1465

  
1252 1466

  
1253
	pos = 0;
1254
	strncpy(tmpfile + pos, buf, buf_len);
1255
	pos += buf_len;
1256
	strncpy(tmpfile + pos, pfiled->uniquestr, pfiled->uniquestr_len);
1257
	pos += pfiled->uniquestr_len;
1258
	strncpy(tmpfile + pos, fio->str_id, FIO_STR_ID_LEN);
1259
	pos += FIO_STR_ID_LEN;
1260
	tmpfile[pos] = 0;
1261
	tmpfile_len = pos;
1467
	tmpfile_len = strnjoin(tmpfile, 3, buf, buf_len,
1468
				pfiled->uniquestr, pfiled->uniquestr_len,
1469
				fio->str_id, FIO_STR_ID_LEN);
1262 1470

  
1263 1471
	XSEGLOG2(&lc, I, "Trying to acquire lock %s", buf);
1264 1472

  
......
1284 1492

  
1285 1493
	//nfs v >= 3
1286 1494
	XSEGLOG2(&lc, D, "Tmpfile: %s", tmpfile_pathname);
1287
	fd = open(tmpfile_pathname, O_WRONLY | O_CREAT | O_EXCL, S_IRWXU | S_IRUSR);
1495
	flags = O_RDWR|O_CREAT|O_EXCL;
1496
	if (pfiled->directio)
1497
		flags |= O_DIRECT;
1498
	fd = open(tmpfile_pathname, flags, S_IRWXU | S_IRUSR);
1288 1499
	if (fd < 0) {
1289 1500
		//actual error
1290 1501
		if (errno != EEXIST){
......
1340 1551
	char *pathname = malloc(MAX_PATH_SIZE + MAX_FILENAME_SIZE + 1);
1341 1552
	char *tmpbuf = malloc(MAX_UNIQUESTR_LEN + 1);
1342 1553
	char *target = xseg_get_target(peer->xseg, req);
1343
	int r, pos;
1554
	int r, buf_len, direct;
1344 1555

  
1345 1556
	if (!buf || !pathname) {
1346 1557
		XSEGLOG2(&lc, E, "Out of memory");
......
1354 1565
		goto out;
1355 1566
	}
1356 1567

  
1357
	pos = 0;
1358
	strncpy(buf + pos, target, req->targetlen);
1359
	pos += req->targetlen;
1360
	strncpy(buf + pos, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
1361
	pos += LOCK_SUFFIX_LEN;
1362
	buf[pos] = 0;
1568
	buf_len = strnjoin(buf, 2 , target, req->targetlen, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
1363 1569

  
1364 1570
	XSEGLOG2(&lc, I, "Started. Lockfile: %s", buf);
1365 1571

  
1366
	r = create_path(pathname, pfiled, buf,
1367
			req->targetlen + strlen(LOCK_SUFFIX), 0);
1572
	r = create_path(pathname, pfiled, buf, buf_len, 0);
1368 1573
	if (r < 0) {
1369 1574
		XSEGLOG2(&lc, E, "Create path failed for %s", buf);
1370 1575
		goto out;
1371 1576
	}
1372 1577

  
1578
	direct = pfiled->directio;
1579

  
1373 1580
	if ((req->flags & XF_FORCE) || !__locked_by(pathname, pfiled->uniquestr,
1374
						pfiled->uniquestr_len)) {
1581
						pfiled->uniquestr_len, direct)) {
1375 1582
		r = unlink(pathname);
1376 1583
		if (r < 0) {
1377 1584
			XSEGLOG2(&lc, E, "Could not unlink %s", pathname);
......
1480 1687
	READ_ARG_STRING("--archip", pfiled->vpath, MAX_PATH_SIZE);
1481 1688
	READ_ARG_STRING("--prefix", pfiled->prefix, MAX_PREFIX_LEN);
1482 1689
	READ_ARG_STRING("--uniquestr", pfiled->uniquestr, MAX_UNIQUESTR_LEN);
1690
	READ_ARG_BOOL("--directio", pfiled->directio);
1483 1691
	END_READ_ARGS();
1484 1692

  
1485 1693
	pfiled->uniquestr_len = strlen(pfiled->uniquestr);
b/xseg/tools/archipelago/archipelago/common.py
281 281

  
282 282
class Filed(MTpeer):
283 283
    def __init__(self, archip_dir=None, prefix=None, fdcache=None,
284
                 unique_str=None, nr_threads=1, nr_ops=16, **kwargs):
284
                 unique_str=None, nr_threads=1, nr_ops=16, direct=True, **kwargs):
285 285
        self.executable = FILE_BLOCKER
286 286
        self.archip_dir = archip_dir
287 287
        self.prefix = prefix
288 288
        self.fdcache = fdcache
289 289
        self.unique_str = unique_str
290
        self.direct = direct
290 291
        nr_threads = nr_ops
291 292
        if self.fdcache and fdcache < 2*nr_threads:
292 293
            raise Error("Fdcache should be greater than 2*nr_threads")
......
319 320
        if self.prefix:
320 321
            self.cli_opts.append("--prefix")
321 322
            self.cli_opts.append(self.prefix)
323
        if self.direct:
324
            self.cli_opts.append("--directio")
322 325

  
323 326

  
324 327
class Mapperd(Peer):

Also available in: Unified diff