Revision 0c97ba3e xseg/peers/user/cached.c

--- a/xseg/peers/user/cached.c
+++ b/xseg/peers/user/cached.c
 #define DIRTY   3
 #define WRITING 4
 
-#define bucket_readable(__status) \
-	(__status == VALID || __status == DIRTY || __status == WRITING)
+#define bucket_readable(__bucket_status) \
+	(__bucket_status == VALID || __bucket_status == DIRTY || __bucket_status == WRITING)
 
 /* write policies */
 #define WRITETHROUGH 1
 #define WRITEBACK    2
 
-#define WRITE_POLICY(__wcp)							\
-	__wcp == WRITETHROUGH ? "writethrough" :		\
-	__wcp == WRITEBACK	? "writeback" :				\
+#define WRITE_POLICY(__wcp)					\
+	__wcp == WRITETHROUGH	? "writethrough"	:	\
+	__wcp == WRITEBACK	? "writeback" 		:	\
 	"undefined"
 
 
-
-/* object states */
-#define INVALIDATED (1 << 0)
-
-
 /* cio states */
 #define CIO_FAILED	1
 #define CIO_ACCEPTED	2
 #define CIO_READING	3
 
+/* ce states */
+#define CE_READY	1
+#define CE_WRITING	2
+#define CE_FAILED	3
+#define CE_INVALIDATED	4
+#define CE_NOT_READY	5
+
 #define BUCKET_SIZE_QUANTUM 4096
 
 struct cache_io {
......
 
 struct ce {
 	unsigned char *data;
-	uint32_t *status;
-	struct xwaitq *waitq;
-	uint32_t flags;
-	struct xlock lock;
-	struct xworkq workq;
+	uint32_t *bucket_status;	/* bucket_status of each bucket */
+	struct xwaitq *bucket_waitq;	/* waitq of each bucket */
+	uint32_t status;		/* cache entry status */
+	struct xlock lock;		/* cache entry lock */
+	struct xworkq workq;		/* workq of the cache entry */
+	struct xwaitq waitq;		/* waitq of the cache entry */
 	struct peer_req pr;
 };
 
+struct req_completion{
+	struct peer_req *pr;
+	struct xseg_request *req;
+};
+
+struct eviction_arg {
+	struct peerd *peer;
+	struct ce *evicted;
+	struct ce *new_entry;
+};
+
+
 
 /*
  * Helper functions
......
 
 static int is_not_loading(void *arg)
 {
-	uint32_t *status = (uint32_t *)arg;
-	return (*status != LOADING);
+	uint32_t *bucket_status = (uint32_t *)arg;
+	return (*bucket_status != LOADING);
 }
 
+static int cache_entry_ready(void *arg)
+{
+	struct ce *ce = (struct ce *)arg;
+	return (ce->status == CE_READY);
+}
 
 static void print_cached(struct cached *cached)
 {
......
 		 * //Paste data
 		 * req_data = xseg_get_data(xseg, req);
 		 * memcpy(req_data, req_old->data, size);
+		 * for (i=start; i<=end; i++){
+		 * 	ce->bucket_status[i] = WRITING;
+		 * }
 		 */
 	} else {
 		for (i=start; i<=end; i++){
-			ce->status[i] = LOADING;
+			ce->bucket_status[i] = LOADING;
 		}
 	}
 
......
 	struct peerd *peer = (struct peerd *)c;
 	struct cached *cached = peer->priv;
 	struct ce *ce = (struct ce *)e;
-	ce->flags = 0;
+	struct cache_io *ce_cio = ce->pr.priv;
+	ce->status = CE_NOT_READY;
 	memset(ce->data, 0, cached->object_size);
 	for (i = 0; i < cached->buckets_per_object; i++) {
-		ce->status[i] = INVALID;
+		ce->bucket_status[i] = INVALID;
 	}
-	ce->pr.cio->h = NoEntry;
-	ce->pr.cio->pending_reqs;
+	ce_cio->h = NoEntry;
+	ce_cio->pending_reqs = 0;
 	xlock_release(&ce->lock);
 	return 0;
 }
 
-void on_put(void *c, void *e)
+void eviction_work(void *arg)
 {
-	struct peerd *peer = (struct peerd *)c;
+	/*
+	 * In this context we hold a reference to the evicted cache entry and
+	 * the new cache entry.
+	 * Evicted cache entry lock is also held.
+	 */
+	struct eviction_arg *earg = (struct eviction_arg *)arg;
+	struct peerd *peer = earg->peer;
 	struct cached *cached = peer->priv;
-	struct ce *ce = (struct ce *)e;
-	//since we are the last referrer to the cache entry
-	//no lock is needed.
-
-	XSEGLOG2(&lc, D, "Putting cache entry %p", e);
-
+	struct ce *ce_evicted = earg->evicted;
+	struct ce *ce_new = earg->new_entry;
+	struct cache_io *ce_cio = ce_evicted->pr.priv;
 	uint32_t start, end, i = 0;
-	if (cached->write_policy == WRITETHROUGH || ce->flags & INVALIDATED)
+
+	/* recheck status here, in case there was a race */
+	if (ce_evicted->status & CE_INVALIDATED){
+		ce_new->status = CE_READY;
+		xwaitq_signal(&ce_new->waitq);
 		return;
-	//write all dirty buckets.
+	}
+
 	while(i < cached->buckets_per_object){
-		if (ce->status[i] != DIRTY){
+		if (ce_evicted->bucket_status[i] != DIRTY){
 			i++;
 			continue;
 		}
 		start = i;
 		while (i < cached->buckets_per_object &&
 			(i-start)*cached->bucket_size < cached->max_req_size &&
-				ce->status[i] == DIRTY){
+				ce_evicted->bucket_status[i] == DIRTY){
 			i++;
 		}
 		end = i;
-		rw_range(cached, &ce->pr, 1, start, end);
-		ce->pr.cio->pending_reqs++;
+		rw_range(peer, &ce_evicted->pr, 1, start, end);
+		ce_cio->pending_reqs++;
+	}
+	if (!ce_cio->pending_reqs){
+		xcache_put(cached->cache, ce_cio->h);
+		ce_cio->h = NoEntry;
+		ce_new->status = CE_READY;
+		free(earg);
+		xwaitq_signal(&ce_new->waitq);
 	}
+
+}
+
+int on_evict(void *c, void *evicted, void *new_entry)
+{
+	struct peerd *peer = (struct peerd *)c;
+	struct cached *cached = peer->priv;
+	struct ce *ce_evicted = (struct ce *)evicted;
+	struct ce *ce_new = (struct ce *)new_entry;
+	struct cache_io *ce_cio = (struct cache_io *)ce_evicted->pr.priv;
+	struct eviction_arg *earg;
+
+	/*
+	 * Since the write policy doesn't change and, after a cache entry gets
+	 * invalidated, it can never be valid again, it is safe to proceed
+	 * without the cache entry lock.
+	 */
+	if (cached->write_policy != WRITEBACK ||
+			ce_evicted->status & CE_INVALIDATED){
+		xcache_put(cached->cache, ce_cio->h);
+		ce_cio->h = NoEntry;
+		ce_new->status = CE_READY;
+		xwaitq_signal(&ce_new->waitq);
+		return 0;
+	}
+
+	earg = malloc(sizeof(struct eviction_arg));
+	if (!earg)
+		return -1;
+	earg->peer = (struct peerd *)peer;
+	earg->evicted = ce_evicted;
+	earg->new_entry = ce_new;
+
+	/* In all other cases, we should have the cache entry lock */
+	if (xworkq_enqueue(&ce_evicted->workq, eviction_work, (void *)earg) < 0){
+		return -1;
+	}
+	return 1;
+}
+
+void on_put(void *c, void *e)
+{
+	//since we are the last referrer to the cache entry
+	//no lock is needed.
+
+	XSEGLOG2(&lc, D, "Putting cache entry %p", e);
 }
 
 void * init_node(void *c)
......
 	xlock_release(&ce->lock);
 
 	ce->data = malloc(sizeof(unsigned char) * cached->object_size);
-	ce->status = malloc(sizeof(uint32_t) * cached->buckets_per_object);
-	ce->waitq = malloc(sizeof(struct xwaitq) * cached->buckets_per_object);
-	ce->pr.priv = malloc(sizeof(struct cio));
-	if (!ce->data || !ce->status || !ce->waitq || !ce->pr.priv)
+	ce->bucket_status = malloc(sizeof(uint32_t) * cached->buckets_per_object);
+	ce->bucket_waitq = malloc(sizeof(struct xwaitq) * cached->buckets_per_object);
+	ce->pr.priv = malloc(sizeof(struct cache_io));
+	if (!ce->data || !ce->bucket_status || !ce->bucket_waitq || !ce->pr.priv)
 		goto ce_fields_fail;
 
 	ce->pr.peer = peer;
 	for (i = 0; i < cached->buckets_per_object; i++) {
-		xwaitq_init(&ce->waitq[i], is_not_loading, &ce->status[i],
-				XWAIT_SIGNAL_ONE);
+		xwaitq_init(&ce->bucket_waitq[i], is_not_loading,
+				&ce->bucket_status[i], XWAIT_SIGNAL_ONE);
 	}
+	xwaitq_init(&ce->waitq, cache_entry_ready, ce, XWAIT_SIGNAL_ONE);
 	xworkq_init(&ce->workq, &ce->lock, 0);
 	return ce;
 
 ce_fields_fail:
 	free(ce->data);
-	free(ce->status);
-	free(ce->waitq);
+	free(ce->bucket_status);
+	free(ce->bucket_waitq);
 	free(ce);
 ce_fail:
 	perror("malloc");
......
 
 struct xcache_ops c_ops = {
 	.on_init = on_init,
+	.on_evict = on_evict,
 	.on_put  = on_put,
 	.on_node_init = init_node
 };
......
 					uint32_t limit)
 {
 	uint32_t end = start + 1;
-	while (end <= limit && ce->status[end] == INVALID)
+	while (end <= limit && ce->bucket_status[end] == INVALID)
 		end++;
 	return (end - 1);
 }
......
 
 static void handle_read(void *arg);
 //is this necessary?
-static void status_changed(void *arg)
+static void bucket_status_changed(void *arg)
 {
 	/*
 	 * In this context we hold a reference to the cache entry.
......
 
 	XSEGLOG2(&lc, D, "Start: %lu, Limit %lu", b, limit );
 	for (i = b; i <= limit; i++) {
-		if (bucket_readable(ce->status[i]))
+		if (bucket_readable(ce->bucket_status[i]))
 			continue;
-		if (ce->status[i] != LOADING){
+		if (ce->bucket_status[i] != LOADING){
 			XSEGLOG2(&lc, D, "Found invalid bucket %lu\n", i);
 			start_bucket = i;
 			end_bucket = __get_last_invalid(ce, start_bucket, limit);
......
 		cio->work.job_fn = handle_read;
 		cio->work.job = pr;
 		/* wait on the last bucket */
-		xwaitq_enqueue(&ce->waitq[end_bucket], &cio->work);
+		xwaitq_enqueue(&ce->bucket_waitq[end_bucket], &cio->work);
 		return;
 	}
 
......
 	struct cached *cached = __get_cached(peer);
 	struct cache_io *cio = __get_cache_io(pr);
 	struct ce *ce = get_cache_entry(cached->cache, cio->h);
-	(void)ce;
+	struct xseg_request *req = pr->req;
 
 	char *req_data;
 	uint32_t start_bucket, end_bucket, last_read_bucket = -1;
 	uint32_t i;
+	uint64_t data_len, data_start;
 	uint64_t first_bucket_offset = req->offset % cached->bucket_size;
 
 
 	//what about FUA?
 
-	uint32_t pending_buckets = 0;
-	start = __get_bucket(cached, req->offset);
-	end = __get_bucket(cached, req->offset + req->size - 1);
+	start_bucket = __get_bucket(cached, req->offset);
+	end_bucket = __get_bucket(cached, req->offset + req->size - 1);
 	/*
 	 * In case of a misaligned write, if the start, end buckets of the write
 	 * are invalid, we have to read them before continue with the write.
 	 */
-	if (ce->status[start] == INVALID && first_bucket_offset){
-		if (rw_range(peer, pr, 0, start, start) < 0){
+	if (ce->bucket_status[start_bucket] == INVALID && first_bucket_offset){
+		if (rw_range(peer, pr, 0, start_bucket, start_bucket) < 0){
 			cio->state = CIO_FAILED;
 			goto out;
 		}
 		cio->pending_reqs++;
-		last_read_bucket = start;
+		last_read_bucket = start_bucket;
 	}
-	if (start != end && ce->status[end] == INVALID &&
+	if (start_bucket != end_bucket && ce->bucket_status[end_bucket] == INVALID &&
 			(req->offset + req->size -1) % cached->bucket_size){
-		if (rw_range(peer, pr, 0, end, end) < 0){
+		if (rw_range(peer, pr, 0, end_bucket, end_bucket) < 0){
 			cio->state = CIO_FAILED;
 			goto out;
 		}
 		cio->pending_reqs++;
-		last_read_bucket = end;
+		last_read_bucket = end_bucket;
 	}
 
 	if (last_read_bucket != -1){
......
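The read-modify-write checks above reduce to bucket arithmetic on req->offset and req->size. Below is a minimal, self-contained sketch of that arithmetic, assuming a bucket size of 4096; BUCKET_SIZE and get_bucket() are illustrative stand-ins for cached->bucket_size and __get_bucket(), not the real definitions from cached.c.

#include <stdint.h>
#include <stdio.h>

#define BUCKET_SIZE 4096UL	/* assumed value; the real one comes from cached->bucket_size */

/* Hypothetical stand-in for __get_bucket(cached, offset). */
static uint32_t get_bucket(uint64_t offset)
{
	return (uint32_t)(offset / BUCKET_SIZE);
}

int main(void)
{
	/* Example request: a misaligned write of 10240 bytes at offset 5120. */
	uint64_t offset = 5120, size = 10240;

	uint32_t start_bucket = get_bucket(offset);		/* 5120 / 4096  = 1 */
	uint32_t end_bucket = get_bucket(offset + size - 1);	/* 15359 / 4096 = 3 */
	uint64_t first_bucket_offset = offset % BUCKET_SIZE;	/* 1024: the head bucket is partially written */
	uint64_t last_byte_offset = (offset + size - 1) % BUCKET_SIZE;	/* 3071: the tail bucket is partially written */

	/* Bucket 1 is written only from byte 1024 on, and bucket 3 only up to
	 * byte 3071, so if either is INVALID it must be read in first (the
	 * rw_range() calls above); bucket 2 is fully overwritten and needs no
	 * prior read. */
	printf("buckets %u..%u, head offset %llu, tail offset %llu\n",
	       start_bucket, end_bucket,
	       (unsigned long long)first_bucket_offset,
	       (unsigned long long)last_byte_offset);
	return 0;
}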
 		cio->work.job = pr;
 		/* wait on the last read bucket */
 		XSEGLOG2(&lc, I, "Enqueuing cio %p in waitq (fn: handle_write).\n", cio);
-		xwaitq_enqueue(&ce->waitq[last_read_bucket], &cio->work);
+		xwaitq_enqueue(&ce->bucket_waitq[last_read_bucket], &cio->work);
 		XSEGLOG2(&lc, I, "Handle_write returned after enqueuing cio %p in waitq.\n", cio);
 		return;
 	}
......
 		 * 	mark them as dirty
 		 */
 		/* special care for the first bucket */
-		data_start = cached->bucket_size * start + first_bucket_offset;
+		data_start = cached->bucket_size * start_bucket + first_bucket_offset;
 		data_len = cached->bucket_size - first_bucket_offset;
-		memcpy(ce->data[data_start], req_data, datalen);
-		ce->status[start] = DIRTY;
+		memcpy(&ce->data[data_start], req_data, data_len);
+		ce->bucket_status[start_bucket] = DIRTY;
 		/* and the rest */
-		for (i = start+1; i <= end; i++) {
-			data_start = cached->bucket_size * (i-start) +
+		for (i = start_bucket + 1; i <= end_bucket; i++) {
+			data_start = cached->bucket_size * (i-start_bucket) +
 				first_bucket_offset;
 			if (data_start + cached->bucket_size <= req->size)
 				data_len = cached->bucket_size;
 			else
 				data_len = req->size - data_start;
-			memcpy(ce->data[i * cached->bucket_size],
-				req_data[data_start], data_len);
-			ce->status[i] = DIRTY;
+			memcpy(&ce->data[i * cached->bucket_size],
+				&req_data[data_start], data_len);
+			ce->bucket_status[i] = DIRTY;
 		}
 	} else {
 		cio->state = CIO_FAILED;
......
 }
 
 /*
+ * Assert cache entry is ready.
+ *
+ * Depending on the op type, a handler function is enqueued in the workq of the
+ * target's cache_entry.
+ */
+static void handle_readwrite_post(void *arg)
+{
+	/*
+	 * In this context, we hold a reference to the associated cache entry.
+	 */
+	struct peer_req *pr = (struct peer_req *)arg;
+	struct peerd *peer = pr->peer;
+	struct cached *cached = __get_cached(peer);
+	struct cache_io *cio = __get_cache_io(pr);
+	struct ce *ce = get_cache_entry(cached->cache, cio->h);
+	struct xseg_request *req = pr->req;
+	int r = 0;
+
+	if (ce->status != CE_READY){
+		XSEGLOG2(&lc, E, "Cache entry %p has status %u", ce, ce->status);
+		r = -1;
+		//FIXME defer request ?
+		goto out;
+	}
+	if (req->op == X_WRITE)
+		r = xworkq_enqueue(&ce->workq, handle_write, (void *)pr);
+	else if (req->op == X_READ)
+		r = xworkq_enqueue(&ce->workq, handle_read, (void *)pr);
+	else {
+		r = -1;
+		XSEGLOG2(&lc, E, "Invalid op %u", req->op);
+	}
+
+out:
+	if (r < 0){
+		XSEGLOG2(&lc, E, "Failing pr %p", pr);
+		cached_fail(peer, pr);
+	}
+}
+
+/*
  * handle_readwrite is called when we accept a request.
  * Its purpose is to find a handler associated with the request's target
  * object (partial cache hit), or to allocate a new one (cache_miss) and insert
  * it in xcache.
  *
- * Depending on the op type, a handler function is enqueued in the workq of the
- * target's cache_entry.
+ * Then, we wait until the returned cache entry is ready.
  */
 static int handle_readwrite(struct peerd *peer, struct peer_req *pr)
 {
 	struct ce *ce;
 	struct cached *cached = __get_cached(peer);
-	struct cache_io *cio = __get_cache_io(pr);
+	struct cache_io *ce_cio, *cio = __get_cache_io(pr);
 	struct xseg_request *req = pr->req;
 	char name[XSEG_MAX_TARGETLEN + 1];
 	char *target;
......
 		XSEGLOG2(&lc, E, "Received cache entry handler %lu but no cache entry", h);
 		goto out;
 	}
+
 	cio->h = h;
 
-	if (req->op == X_WRITE)
-		r = xworkq_enqueue(&ce->workq, handle_write, (void *)pr);
-	else if (req->op == X_READ)
-		r = xworkq_enqueue(&ce->workq, handle_read, (void *)pr);
-	else {
-		r = -1;
-		XSEGLOG2(&lc, E, "Invalid op %u", req->op);
-		goto out;
-	}
+	ce_cio = ce->pr.priv;
+	ce_cio->h = h;
+
+	/* wait for the cache_entry to be ready */
+	cio->work.job_fn = handle_readwrite_post;
+	cio->work.job = pr;
+	r = xwaitq_enqueue(&ce->waitq, &cio->work);
 
 out:
 	if (r < 0){
......
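Taken together, the new code paths above form a two-stage dispatch: handle_readwrite() only looks up or inserts the cache entry and parks the request on the entry's waitq; the entry is marked CE_READY (and its waitq signalled) either immediately in on_evict() or once eviction_work() has flushed the dirty buckets, at which point the cache_entry_ready() predicate lets handle_readwrite_post() run and enqueue handle_read/handle_write on the entry's workq. The condensed sketch below only restates that flow: it relies on the struct ce/struct cache_io definitions and the xwaitq/xworkq calls from this patch, the helper names park_request() and mark_entry_ready() are illustrative rather than part of cached.c, and it assumes xwaitq_signal() runs parked work items once the waitq's predicate returns true, which is how the patch appears to use it.

/* Predicate passed to xwaitq_init(&ce->waitq, ...), reproduced from the patch:
 * a parked job may run only once the entry has left CE_NOT_READY. */
static int cache_entry_ready(void *arg)
{
	struct ce *ce = (struct ce *)arg;
	return (ce->status == CE_READY);
}

/* Illustrative helper: what handle_readwrite() does after cio->h is set. */
static int park_request(struct ce *ce, struct cache_io *cio, struct peer_req *pr)
{
	cio->work.job_fn = handle_readwrite_post;	/* stage 2: dispatch to handle_read/handle_write */
	cio->work.job = pr;
	return xwaitq_enqueue(&ce->waitq, &cio->work);	/* stage 1 ends here */
}

/* Illustrative helper: what on_evict()/eviction_work() do for the new entry
 * once the evicted entry no longer blocks it. */
static void mark_entry_ready(struct ce *ce_new)
{
	ce_new->status = CE_READY;	/* make the predicate true ... */
	xwaitq_signal(&ce_new->waitq);	/* ... then drain the parked requests */
}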
 
 }
 
-struct req_completion{
-	struct peer_req *pr;
-	struct xseg_request *req;
-};
-
 /*
  * complete_read is called when we receive a reply from a request issued by
  * rw_range. The process mentioned below applies only to buckets previously
......
  * If not, we mark serviced buckets as VALID, non-serviced buckets as INVALID
  * and the cio is failed
  *
- * At any point when a bucket status changes, we signal the respective waitq.
+ * At any point when a bucket's bucket_status changes, we signal the respective waitq.
  */
 static void complete_read(void *arg)
 {
......
 
 	/* Check serviced buckets */
 	for (i = start; i <= end_serviced && req->serviced; i++) {
-		if (ce->status[i] != LOADING)
+		if (ce->bucket_status[i] != LOADING)
 			continue;
 
 		XSEGLOG2(&lc, D, "Bucket %lu loading and reception successful\n",i);
 		memcpy(ce->data + (i * cached->bucket_size), data, cached->bucket_size);
-		ce->status[i] = VALID;
-		xwaitq_signal(&ce->waitq[i]);
+		ce->bucket_status[i] = VALID;
+		xwaitq_signal(&ce->bucket_waitq[i]);
 	}
 
 	/* Check non-serviced buckets */
 	for (; i <= end_size; i++) {
-		if (ce->status[i] != LOADING)
+		if (ce->bucket_status[i] != LOADING)
 			continue;
 
 		XSEGLOG2(&lc, D, "Bucket %lu loading but reception unsuccessful\n", i);
-		ce->status[i] = INVALID;
+		ce->bucket_status[i] = INVALID;
 		cio->state = CIO_FAILED;
-		xwaitq_signal(&ce->waitq[i]);
+		xwaitq_signal(&ce->bucket_waitq[i]);
 	}
 
 	xseg_put_request(peer->xseg, rc->req, pr->portno);
......
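For complete_read() above, the split between serviced and non-serviced buckets is again plain bucket arithmetic on req->offset, req->serviced and req->size. A self-contained numeric sketch follows, again with an assumed bucket size of 4096 and a hypothetical get_bucket() standing in for __get_bucket(); the two loops only mirror the loops above to show which buckets end up VALID or INVALID, they are not the real complete_read().

#include <stdint.h>
#include <stdio.h>

#define BUCKET_SIZE 4096UL	/* assumed; the real value is cached->bucket_size */

static uint32_t get_bucket(uint64_t offset)	/* hypothetical stand-in for __get_bucket() */
{
	return (uint32_t)(offset / BUCKET_SIZE);
}

int main(void)
{
	/* Example reply: rw_range() read buckets 2..5 (offset 8192, size 16384),
	 * but only 8192 bytes came back serviced. */
	uint64_t offset = 8192, size = 16384, serviced = 8192;

	uint32_t start = get_bucket(offset);				/* 2 */
	uint32_t end_serviced = get_bucket(offset + serviced - 1);	/* 3 */
	uint32_t end_size = get_bucket(offset + size - 1);		/* 5 */
	uint32_t i;

	/* Buckets up to end_serviced leave LOADING for VALID; the rest fall back
	 * to INVALID and the cio is failed. Each transition signals the bucket's
	 * waitq so parked readers/writers re-check the bucket state. */
	for (i = start; i <= end_serviced && serviced; i++)
		printf("bucket %u -> VALID\n", i);
	for (; i <= end_size; i++)
		printf("bucket %u -> INVALID (cio fails)\n", i);
	return 0;
}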
 	struct cached *cached = __get_cached(peer);
 	struct cache_io *cio = __get_cache_io(pr);
 	struct ce *ce;
-	uint32_t start, end_serviced, end_requested, i, first_bucket_offset;
-	uint64_t data_start, data_len;
+	uint32_t start, end_serviced, end_requested, i;
+	uint64_t data_start, data_len, first_bucket_offset;
 	char *req_data;
 	int success;
 
......
 	end_serviced = __get_bucket(cached, req->offset + req->serviced - 1);
 	end_requested = __get_bucket(cached, req->offset + req->size - 1);
 
-	uint64_t first_bucket_offset = req->offset % cached->bucket_size;
+	first_bucket_offset = req->offset % cached->bucket_size;
 	req_data = xseg_get_data(peer->xseg, req);
 
 	/*
......
 	 *		copy data to bucket
 	 *		mark as valid
 	 *	else if WRITEBACK
-	 *		if status writing
+	 *		if bucket_status writing
 	 *			mark as valid
 	 *
 	 * No need to signal anything!
......
 	if (cached->write_policy == WRITETHROUGH){
 		data_start = start * cached->bucket_size + first_bucket_offset;
 		data_len = cached->bucket_size - first_bucket_offset;
-		memcpy(ce->data[data_start], req_data, data_len);
+		memcpy(&ce->data[data_start], req_data, data_len);
 	} else if (cached->write_policy == WRITEBACK) {
 		;
 	}
-	ce->status[start] = VALID;
+	ce->bucket_status[start] = VALID;
 	for (i = start+1; i <= end_serviced; i++) {
 		if (cached->write_policy == WRITETHROUGH){
 			data_start = cached->bucket_size * (i - start) +
......
 				data_len = cached->bucket_size;
 			else
 				data_len = req->size - data_start;
-			memcpy(ce->data[cached->bucket_size * i],
-					req_data[data_start], data_len);
-			ce->status[i] = VALID;
+			memcpy(&ce->data[cached->bucket_size * i],
+					&req_data[data_start], data_len);
+			ce->bucket_status[i] = VALID;
 		} else if (cached->write_policy == WRITEBACK) {
-			if (ce->status[i] == WRITING)
-				ce->status[i] = VALID;
+			if (ce->bucket_status[i] == WRITING)
+				ce->bucket_status[i] = VALID;
 		}
 	}
 out:
......
 	return 0;
 }
 
-static int handle_receive_write(struct peerd *peer, struct peer_req *pr)
+static int handle_receive_write(struct peerd *peer, struct peer_req *pr,
+			struct xseg_request *req)
 {
 	XSEGLOG2(&lc, I, "Started\n");
 	/*

Also available in: Unified diff