Revision 0c97ba3e xseg/peers/user/cached.c

--- a/xseg/peers/user/cached.c
+++ b/xseg/peers/user/cached.c
@@ -56 +56 @@
 #define DIRTY 3
 #define WRITING 4
 
-#define bucket_readable(__status) \
-	(__status == VALID || __status == DIRTY || __status == WRITING)
+#define bucket_readable(__bucket_status) \
+	(__bucket_status == VALID || __bucket_status == DIRTY || __bucket_status == WRITING)
 
 /* write policies */
 #define WRITETHROUGH 1
 #define WRITEBACK 2
 
-#define WRITE_POLICY(__wcp) \
-	__wcp == WRITETHROUGH ? "writethrough" : \
-	__wcp == WRITEBACK ? "writeback" : \
+#define WRITE_POLICY(__wcp) \
+	__wcp == WRITETHROUGH ? "writethrough" : \
+	__wcp == WRITEBACK ? "writeback" : \
 		"undefined"
 
 
-
-/* object states */
-#define INVALIDATED (1 << 0)
-
-
 /* cio states */
 #define CIO_FAILED 1
 #define CIO_ACCEPTED 2
 #define CIO_READING 3
 
+/* ce states */
+#define CE_READY 1
+#define CE_WRITING 2
+#define CE_FAILED 3
+#define CE_INVALIDATED 4
+#define CE_NOT_READY 5
+
 #define BUCKET_SIZE_QUANTUM 4096
 
 struct cache_io {
@@ -102 +104 @@
 
 struct ce {
 	unsigned char *data;
-	uint32_t *status;
-	struct xwaitq *waitq;
-	uint32_t flags;
-	struct xlock lock;
-	struct xworkq workq;
+	uint32_t *bucket_status; /* bucket_status of each bucket */
+	struct xwaitq *bucket_waitq; /* waitq of each bucket */
+	uint32_t status; /* cache entry status */
+	struct xlock lock; /* cache entry lock */
+	struct xworkq workq; /* workq of the cache entry */
+	struct xwaitq waitq; /* waitq of the cache entry */
 	struct peer_req pr;
 };
 
+struct req_completion{
+	struct peer_req *pr;
+	struct xseg_request *req;
+};
+
+struct eviction_arg {
+	struct peerd *peer;
+	struct ce *evicted;
+	struct ce *new_entry;
+};
+
+
 
 /*
  * Helper functions
@@ -150 +165 @@
 
 static int is_not_loading(void *arg)
 {
-	uint32_t *status = (uint32_t *)arg;
-	return (*status != LOADING);
+	uint32_t *bucket_status = (uint32_t *)arg;
+	return (*bucket_status != LOADING);
 }
 
+static int cache_entry_ready(void *arg)
+{
+	struct ce *ce = (struct ce *)arg;
+	return (ce->status == CE_READY);
+}
 
 static void print_cached(struct cached *cached)
 {
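Note: cache_entry_ready() above, like is_not_loading(), is a waitq condition function. The standalone sketch below (illustration only, not part of this revision, and not the real xwaitq implementation) shows the pattern these functions serve: a deferred job guarded by a condition that is checked again when the queue is signalled. The LOADING/VALID values are made up for the demo.

/*
 * Toy stand-in for the condition-gated wait queue pattern used in cached.c.
 */
#include <stdio.h>
#include <stdint.h>

struct toy_waitq {
	int (*condition)(void *arg);	/* when true, queued work may run */
	void *arg;			/* argument passed to the condition */
	void (*job_fn)(void *job);	/* at most one deferred job in this toy */
	void *job;
};

static void toy_enqueue(struct toy_waitq *wq, void (*fn)(void *), void *job)
{
	if (wq->condition(wq->arg)) {
		fn(job);		/* condition already holds: run now */
		return;
	}
	wq->job_fn = fn;		/* otherwise defer until toy_signal() */
	wq->job = job;
}

static void toy_signal(struct toy_waitq *wq)
{
	if (wq->job_fn && wq->condition(wq->arg)) {
		wq->job_fn(wq->job);	/* condition became true: run deferred job */
		wq->job_fn = NULL;
	}
}

#define LOADING 2	/* hypothetical values, just for this demo */
#define VALID   1

static int is_not_loading(void *arg)
{
	uint32_t *bucket_status = (uint32_t *)arg;
	return (*bucket_status != LOADING);
}

static void resume_read(void *job)
{
	printf("bucket ready, resuming read of request %s\n", (char *)job);
}

int main(void)
{
	uint32_t bucket_status = LOADING;
	struct toy_waitq wq = { is_not_loading, &bucket_status, NULL, NULL };

	toy_enqueue(&wq, resume_read, "pr0");	/* deferred: bucket still LOADING */
	bucket_status = VALID;			/* e.g. a read reply arrived */
	toy_signal(&wq);			/* now the deferred job runs */
	return 0;
}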
@@ -288 +308 @@
  * //Paste data
  * req_data = xseg_get_data(xseg, req);
  * memcpy(req_data, req_old->data, size);
+ * for (i=start; i<=end; i++){
+ * 	ce->bucket_status[i] = WRITING;
+ * }
  */
 	} else {
 		for (i=start; i<=end; i++){
-			ce->status[i] = LOADING;
+			ce->bucket_status[i] = LOADING;
 		}
 	}
 
@@ -325 +348 @@
 	struct peerd *peer = (struct peerd *)c;
 	struct cached *cached = peer->priv;
 	struct ce *ce = (struct ce *)e;
-	ce->flags = 0;
+	struct cache_io *ce_cio = ce->pr.priv;
+	ce->status = CE_NOT_READY;
 	memset(ce->data, 0, cached->object_size);
 	for (i = 0; i < cached->buckets_per_object; i++) {
-		ce->status[i] = INVALID;
+		ce->bucket_status[i] = INVALID;
 	}
-	ce->pr.cio->h = NoEntry;
-	ce->pr.cio->pending_reqs;
+	ce_cio->h = NoEntry;
+	ce_cio->pending_reqs = 0;
 	xlock_release(&ce->lock);
 	return 0;
 }
 
-void on_put(void *c, void *e)
+void eviction_work(void *arg)
 {
-	struct peerd *peer = (struct peerd *)c;
+	/*
+	 * In this context we hold a reference to the evicted cache entry and
+	 * the new cache entry.
+	 * Evicted cache entry lock is also held.
+	 */
+	struct eviction_arg *earg = (struct eviction_arg *)arg;
+	struct peerd *peer = earg->peer;;
 	struct cached *cached = peer->priv;
-	struct ce *ce = (struct ce *)e;
-	//since we are the last referrer to the cache entry
-	//no lock is needed.
-
-	XSEGLOG2(&lc, D, "Putting cache entry %p", e);
-
+	struct ce *ce_evicted = earg->evicted;
+	struct ce *ce_new = earg->new_entry;
+	struct cache_io *ce_cio = ce_evicted->pr.priv;
 	uint32_t start, end, i = 0;
-	if (cached->write_policy == WRITETHROUGH || ce->flags & INVALIDATED)
+
+	/* recheck status here, in case there was a race */
+	if (ce_evicted->status & CE_INVALIDATED){
+		ce_new->status = CE_READY;
+		xwaitq_signal(&ce_new->waitq);
 		return;
-	//write all dirty buckets.
+	}
+
 	while(i < cached->buckets_per_object){
-		if (ce->status[i] != DIRTY){
+		if (ce_evicted->bucket_status[i] != DIRTY){
 			i++;
 			continue;
 		}
 		start = i;
 		while (i < cached->buckets_per_object &&
 			(i-start)*cached->bucket_size < cached->max_req_size &&
-			ce->status[i] == DIRTY){
+			ce_evicted->bucket_status[i] == DIRTY){
 			i++;
 		}
 		end = i;
-		rw_range(cached, &ce->pr, 1, start, end);
-		ce->pr.cio->pending_reqs++;
+		rw_range(peer, &ce_evicted->pr, 1, start, end);
+		ce_cio->pending_reqs++;
+	}
+	if (!ce_cio->pending_reqs){
+		xcache_put(cached->cache, ce_cio->h);
+		ce_cio->h = NoEntry;
+		ce_new->status = CE_READY;
+		free(earg);
+		xwaitq_signal(&ce_new->waitq);
 	}
+
+}
+
+int on_evict(void *c, void *evicted, void *new_entry)
+{
+	struct peerd *peer = (struct peerd *)c;
+	struct cached *cached = peer->priv;
+	struct ce *ce_evicted = (struct ce *)evicted;
+	struct ce *ce_new = (struct ce *)new_entry;
+	struct cache_io *ce_cio = (struct cache_io *)ce_evicted->pr.priv;
+	struct eviction_arg *earg;
+
+	/*
+	 * Since write policy doesn't change and after a cache entry gets
+	 * invalidated, it cannot never be valid again, it is safe to procceed
+	 * without the cache entry lock.
+	 */
+	if (cached->write_policy != WRITEBACK ||
+		ce_evicted->status & CE_INVALIDATED){
+		xcache_put(cached->cache, ce_cio->h);
+		ce_cio->h = NoEntry;
+		ce_new->status = CE_READY;
+		xwaitq_signal(&ce_new->waitq);
+		return 0;
+	}
+
+	earg = malloc(sizeof(struct eviction_arg));
+	if (!earg)
+		return -1;
+	earg->peer = (struct peerd *)peer;
+	earg->evicted = ce_evicted;
+	earg->new_entry = ce_new;
+
+	/* In all other cases, we should have the cache entry lock */
+	if (xworkq_enqueue(&ce_evicted->workq, eviction_work, (void *)earg) < 0){
+		return -1;
+	}
+	return 1;
+}
+
+void on_put(void *c, void *e)
+{
+	//since we are the last referrer to the cache entry
+	//no lock is needed.
+
+	XSEGLOG2(&lc, D, "Putting cache entry %p", e);
 }
 
 void * init_node(void *c)
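The dirty-bucket flush in eviction_work() batches runs of consecutive DIRTY buckets so that each rw_range() call covers at most max_req_size bytes. The following standalone program illustrates just that grouping logic (illustration only; the bucket_size, max_req_size and dirty map are made up):

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	uint32_t bucket_size = 4096, max_req_size = 16384;
	uint32_t buckets_per_object = 16;
	int dirty[16] = {0,0,0,1,1,1,1,1,1,1,1,1,1,0,0,0};
	uint32_t start, end, i = 0;

	while (i < buckets_per_object) {
		if (!dirty[i]) { i++; continue; }
		start = i;
		/* advance over a dirty run, capped by max_req_size */
		while (i < buckets_per_object &&
			(i - start) * bucket_size < max_req_size &&
			dirty[i])
			i++;
		end = i;
		/* in cached.c this is where rw_range(..., 1, start, end) is issued */
		printf("rw_range over dirty run: start=%u end=%u\n", start, end);
	}
	return 0;
}

With these values the dirty run 3..12 is walked in three groups, so no single flush request exceeds max_req_size.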
@@ -379 +464 @@
 	xlock_release(&ce->lock);
 
 	ce->data = malloc(sizeof(unsigned char) * cached->object_size);
-	ce->status = malloc(sizeof(uint32_t) * cached->buckets_per_object);
-	ce->waitq = malloc(sizeof(struct xwaitq) * cached->buckets_per_object);
-	ce->pr.priv = malloc(sizeof(struct cio));
-	if (!ce->data || !ce->status || !ce->waitq || !ce->pr.priv)
+	ce->bucket_status = malloc(sizeof(uint32_t) * cached->buckets_per_object);
+	ce->bucket_waitq = malloc(sizeof(struct xwaitq) * cached->buckets_per_object);
+	ce->pr.priv = malloc(sizeof(struct cache_io));
+	if (!ce->data || !ce->bucket_status || !ce->bucket_waitq || !ce->pr.priv)
 		goto ce_fields_fail;
 
 	ce->pr.peer = peer;
 	for (i = 0; i < cached->buckets_per_object; i++) {
-		xwaitq_init(&ce->waitq[i], is_not_loading, &ce->status[i],
-				XWAIT_SIGNAL_ONE);
+		xwaitq_init(&ce->bucket_waitq[i], is_not_loading,
+				&ce->bucket_status[i], XWAIT_SIGNAL_ONE);
 	}
+	xwaitq_init(&ce->waitq, cache_entry_ready, ce, XWAIT_SIGNAL_ONE);
 	xworkq_init(&ce->workq, &ce->lock, 0);
 	return ce;
 
 ce_fields_fail:
 	free(ce->data);
-	free(ce->status);
-	free(ce->waitq);
+	free(ce->bucket_status);
+	free(ce->bucket_waitq);
 	free(ce);
 ce_fail:
 	perror("malloc");
@@ -405 +491 @@
 
 struct xcache_ops c_ops = {
 	.on_init = on_init,
+	.on_evict = on_evict,
 	.on_put = on_put,
 	.on_node_init = init_node
 };
@@ -413 +500 @@
 		uint32_t limit)
 {
 	uint32_t end = start + 1;
-	while (end <= limit && ce->status[end] == INVALID)
+	while (end <= limit && ce->bucket_status[end] == INVALID)
 		end++;
 	return (end - 1);
 }
@@ -442 +529 @@
 
 static void handle_read(void *arg);
 //is this necessary?
-static void status_changed(void *arg)
+static void bucket_status_changed(void *arg)
 {
 	/*
 	 * In this context we hold a reference to the cache entry.
@@ -516 +603 @@
 
 	XSEGLOG2(&lc, D, "Start: %lu, Limit %lu", b, limit );
 	for (i = b; i <= limit; i++) {
-		if (bucket_readable(ce->status[i]))
+		if (bucket_readable(ce->bucket_status[i]))
 			continue;
-		if (ce->status[i] != LOADING){
+		if (ce->bucket_status[i] != LOADING){
 			XSEGLOG2(&lc, D, "Found invalid bucket %lu\n", i);
 			start_bucket = i;
 			end_bucket = __get_last_invalid(ce, start_bucket, limit);
@@ -539 +626 @@
 		cio->work.job_fn = handle_read;
 		cio->work.job = pr;
 		/* wait on the last bucket */
-		xwaitq_enqueue(&ce->waitq[end_bucket], &cio->work);
+		xwaitq_enqueue(&ce->bucket_waitq[end_bucket], &cio->work);
 		return;
 	}
 
@@ -570 +657 @@
 	struct cached *cached = __get_cached(peer);
 	struct cache_io *cio = __get_cache_io(pr);
 	struct ce *ce = get_cache_entry(cached->cache, cio->h);
-	(void)ce;
+	struct xseg_request *req = pr->req;
 
 	char *req_data;
 	uint32_t start_bucket, end_bucket, last_read_bucket = -1;
 	uint32_t i;
+	uint64_t data_len, data_start;
 	uint64_t first_bucket_offset = req->offset % cached->bucket_size;
 
 
 	//what about FUA?
 
-	uint32_t pending_buckets = 0;
-	start = __get_bucket(cached, req->offset);
-	end = __get_bucket(cached, req->offset + req->size - 1);
+	start_bucket = __get_bucket(cached, req->offset);
+	end_bucket = __get_bucket(cached, req->offset + req->size - 1);
 	/*
 	 * In case of a misaligned write, if the start, end buckets of the write
 	 * are invalid, we have to read them before continue with the write.
 	 */
-	if (ce->status[start] == INVALID && first_bucket_offset){
-		if (rw_range(peer, pr, 0, start, start) < 0){
+	if (ce->bucket_status[start_bucket] == INVALID && first_bucket_offset){
+		if (rw_range(peer, pr, 0, start_bucket, start_bucket) < 0){
 			cio->state = CIO_FAILED;
 			goto out;
 		}
 		cio->pending_reqs++;
-		last_read_bucket = start;
+		last_read_bucket = start_bucket;
 	}
-	if (start != end && ce->status[end] == INVALID &&
+	if (start_bucket != end_bucket && ce->bucket_status[end_bucket] == INVALID &&
 		(req->offset + req->size -1) % cached->bucket_size){
-		if (rw_range(peer, pr, 0, end, end) < 0){
+		if (rw_range(peer, pr, 0, end_bucket, end_bucket) < 0){
 			cio->state = CIO_FAILED;
 			goto out;
 		}
 		cio->pending_reqs++;
-		last_read_bucket = end;
+		last_read_bucket = end_bucket;
 	}
 
 	if (last_read_bucket != -1){
@@ -610 +697 @@
 		cio->work.job = pr;
 		/* wait on the last read bucket */
 		XSEGLOG2(&lc, I, "Enqueuing cio %p in waitq (fn: handle_write).\n", cio);
-		xwaitq_enqueue(&ce->waitq[last_read_bucket], &cio->work);
+		xwaitq_enqueue(&ce->bucket_waitq[last_read_bucket], &cio->work);
 		XSEGLOG2(&lc, I, "Handle_write returned after enqueuing cio %p in waitq.\n", cio);
 		return;
 	}
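For the misaligned-write checks in handle_write above, the bucket arithmetic works out as in the following standalone example (illustration only; it assumes __get_bucket() is a plain division by bucket_size, and the numbers are made up):

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	uint64_t bucket_size = 4096;
	uint64_t offset = 6000, size = 10000;

	uint64_t start_bucket = offset / bucket_size;			/* 1 */
	uint64_t end_bucket = (offset + size - 1) / bucket_size;	/* 3 */
	uint64_t first_bucket_offset = offset % bucket_size;		/* 1904 */
	uint64_t last_bucket_cut = (offset + size - 1) % bucket_size + 1; /* 3712 */

	printf("write touches buckets %lu..%lu\n",
		(unsigned long)start_bucket, (unsigned long)end_bucket);
	if (first_bucket_offset)
		printf("bucket %lu starts %lu bytes in: read-fill it if INVALID\n",
			(unsigned long)start_bucket,
			(unsigned long)first_bucket_offset);
	if (last_bucket_cut != bucket_size)
		printf("bucket %lu covered only up to byte %lu: read-fill it if INVALID\n",
			(unsigned long)end_bucket,
			(unsigned long)last_bucket_cut);
	return 0;
}

Both edge buckets are only partially covered by the write, which is why handle_write issues a read for each of them before copying the new data in.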
@@ -626 +713 @@
 	 * mark them as dirty
 	 */
 	/* special care for the first bucket */
-		data_start = cached->bucket_size * start + first_bucket_offset;
+		data_start = cached->bucket_size * start_bucket + first_bucket_offset;
 		data_len = cached->bucket_size - first_bucket_offset;
-		memcpy(ce->data[data_start], req_data, datalen);
-		ce->status[start] = DIRTY;
+		memcpy(&ce->data[data_start], req_data, data_len);
+		ce->bucket_status[start_bucket] = DIRTY;
 	/* and the rest */
-		for (i = start+1; i <= end; i++) {
-			data_start = cached->bucket_size * (i-start) +
+		for (i = start_bucket + 1; i <= end_bucket; i++) {
+			data_start = cached->bucket_size * (i-start_bucket) +
 				first_bucket_offset;
 			if (data_start + cached->bucket_size <= req->size)
 				data_len = cached->bucket_size;
 			else
 				data_len = req->size - data_start;
-			memcpy(ce->data[i * cached->bucket_size],
-				req_data[data_start], data_len);
-			ce->status[i] = DIRTY;
+			memcpy(&ce->data[i * cached->bucket_size],
+				&req_data[data_start], data_len);
+			ce->bucket_status[i] = DIRTY;
 		}
 	} else {
 		cio->state = CIO_FAILED;
@@ -659 +746 @@
 }
 
 /*
+ * Assert cache entry is ready.
+ *
+ * Depending on the op type, a handler function is enqueued in the workq of the
+ * target's cache_entry.
+ */
+static void handle_readwrite_post(void *arg)
+{
+	/*
+	 * In this context, we hold a reference to the associated cache entry.
+	 */
+	struct ce *ce;
+	struct peer_req *pr = (struct peer_req *)arg;
+	struct peerd *peer = pr->peer;
+//	struct cached *cached = __get_cached(peer);
+//	struct cache_io *ce_cio, *cio = __get_cache_io(pr);
+	struct xseg_request *req = pr->req;
+	int r = 0;
+
+	if (ce->status != CE_READY){
+		XSEGLOG2(&lc, E, "Cache entry %p has status %u", ce, ce->status);
+		r = -1;
+		//FIXME defer request ?
+		goto out;
+	}
+	if (req->op == X_WRITE)
+		r = xworkq_enqueue(&ce->workq, handle_write, (void *)pr);
+	else if (req->op == X_READ)
+		r = xworkq_enqueue(&ce->workq, handle_read, (void *)pr);
+	else {
+		r = -1;
+		XSEGLOG2(&lc, E, "Invalid op %u", req->op);
+	}
+
+out:
+	if (r < 0){
+		XSEGLOG2(&lc, E, "Failing pr %p", pr);
+		cached_fail(peer, pr);
+	}
+}
+
+/*
  * handle_readwrite is called when we accept a request.
  * Its purpose is to find a handler associated with the request's target
  * object (partial cache hit), or to allocate a new one (cache_miss) and insert
  * it in xcache.
  *
- * Depending on the op type, a handler function is enqueued in the workq of the
- * target's cache_entry.
+ * Then, we wait until the returned cache entry is ready.
  */
 static int handle_readwrite(struct peerd *peer, struct peer_req *pr)
 {
 	struct ce *ce;
 	struct cached *cached = __get_cached(peer);
-	struct cache_io *cio = __get_cache_io(pr);
+	struct cache_io *ce_cio, *cio = __get_cache_io(pr);
 	struct xseg_request *req = pr->req;
 	char name[XSEG_MAX_TARGETLEN + 1];
 	char *target;
@@ -722 +849 @@
 		XSEGLOG2(&lc, E, "Received cache entry handler %lu but no cache entry", h);
 		goto out;
 	}
+
 	cio->h = h;
 
-	if (req->op == X_WRITE)
-		r = xworkq_enqueue(&ce->workq, handle_write, (void *)pr);
-	else if (req->op == X_READ)
-		r = xworkq_enqueue(&ce->workq, handle_read, (void *)pr);
-	else {
-		r = -1;
-		XSEGLOG2(&lc, E, "Invalid op %u", req->op);
-		goto out;
-	}
+	ce_cio = ce->pr.priv;
+	ce_cio->h = h;
+
+	/* wait for the cache_entry to be ready */
+	cio->work.job_fn = handle_readwrite_post;
+	cio->work.job = pr;
+	r = xwaitq_enqueue(&ce->waitq, &cio->work);
 
 out:
 	if (r < 0){
@@ -743 +869 @@
 
 }
 
-struct req_completion{
-	struct peer_req *pr;
-	struct xseg_request *req;
-};
-
 /*
  * complete_read is called when we receive a reply from a request issued by
  * rw_range. The process mentioned below applies only to buckets previously
@@ -757 +878 @@
  * If not, we mark serviced buckets as VALID, non-serviced buckets as INVALID
  * and the cio is failed
  *
- * At any point when a bucket status changes, we signal the respective waitq.
+ * At any point when a bucket bucket_status changes, we signal the respective waitq.
  */
 static void complete_read(void *arg)
 {
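As a concrete illustration of the comment above (illustration only, not part of this revision; it assumes __get_bucket() is a plain division by bucket_size and the numbers are made up), this is how a partially serviced read splits the affected buckets:

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	uint64_t bucket_size = 4096;
	uint64_t offset = 8192, size = 16384, serviced = 8192;

	uint64_t start = offset / bucket_size;				/* 2 */
	uint64_t end_serviced = (offset + serviced - 1) / bucket_size;	/* 3 */
	uint64_t end_size = (offset + size - 1) / bucket_size;		/* 5 */
	uint64_t i;

	/* buckets fully covered by the serviced part become VALID */
	for (i = start; i <= end_serviced && serviced; i++)
		printf("bucket %lu: LOADING -> VALID\n", (unsigned long)i);
	/* the remaining requested buckets fall back to INVALID and fail the cio */
	for (; i <= end_size; i++)
		printf("bucket %lu: LOADING -> INVALID (cio fails)\n",
			(unsigned long)i);
	return 0;
}

Each of these transitions is followed by a signal on the corresponding bucket waitq, so any work queued on those buckets gets a chance to run.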
@@ -810 +931 @@
 
 	/* Check serviced buckets */
 	for (i = start; i <= end_serviced && req->serviced; i++) {
-		if (ce->status[i] != LOADING)
+		if (ce->bucket_status[i] != LOADING)
 			continue;
 
 		XSEGLOG2(&lc, D, "Bucket %lu loading and reception successful\n",i);
 		memcpy(ce->data + (i * cached->bucket_size), data, cached->bucket_size);
-		ce->status[i] = VALID;
-		xwaitq_signal(&ce->waitq[i]);
+		ce->bucket_status[i] = VALID;
+		xwaitq_signal(&ce->bucket_waitq[i]);
 	}
 
 	/* Check non-serviced buckets */
 	for (; i <= end_size; i++) {
-		if (ce->status[i] != LOADING)
+		if (ce->bucket_status[i] != LOADING)
 			continue;
 
 		XSEGLOG2(&lc, D, "Bucket %lu loading but reception unsuccessful\n", i);
-		ce->status[i] = INVALID;
+		ce->bucket_status[i] = INVALID;
 		cio->state = CIO_FAILED;
-		xwaitq_signal(&ce->waitq[i]);
+		xwaitq_signal(&ce->bucket_waitq[i]);
 	}
 
 	xseg_put_request(peer->xseg, rc->req, pr->portno);
@@ -850 +971 @@
 	struct cached *cached = __get_cached(peer);
 	struct cache_io *cio = __get_cache_io(pr);
 	struct ce *ce;
-	uint32_t start, end_serviced, end_requested, i, first_bucket_offset;
-	uint64_t data_start, data_len;
+	uint32_t start, end_serviced, end_requested, i;
+	uint64_t data_start, data_len, first_bucket_offset;
 	char *req_data;
 	int success;
 
@@ -880 +1001 @@
 	end_serviced = __get_bucket(cached, req->offset + req->serviced - 1);
 	end_requested = __get_bucket(cached, req->offset + req->size - 1);
 
-	uint64_t first_bucket_offset = req->offset % cached->bucket_size;
+	first_bucket_offset = req->offset % cached->bucket_size;
 	req_data = xseg_get_data(peer->xseg, req);
 
 	/*
@@ -889 +1010 @@
 	 * 	copy data to bucket
 	 * 	mark as valid
 	 * else if WRITEBACK
-	 * 	if status writing
+	 * 	if bucket_status writing
 	 * 		mark as valid
 	 *
 	 * No need to signal anything!
@@ -897 +1018 @@
 	if (cached->write_policy == WRITETHROUGH){
 		data_start = start * cached->bucket_size + first_bucket_offset;
 		data_len = cached->bucket_size - first_bucket_offset;
-		memcpy(ce->data[data_start], req_data, data_len);
+		memcpy(&ce->data[data_start], req_data, data_len);
 	} else if (cached->write_policy == WRITEBACK) {
 		;
 	}
-	ce->status[start] = VALID;
+	ce->bucket_status[start] = VALID;
@@ -905 +1026 @@
 	for (i = start+1; i <= end_serviced; i++) {
 		if (cached->write_policy == WRITETHROUGH){
 			data_start = cached->bucket_size * (i - start) +
@@ -910 +1031 @@
 				data_len = cached->bucket_size;
 			else
 				data_len = req->size - data_start;
-			memcpy(ce->data[cached->bucket_size * i],
-				req_data[data_start], data_len);
-			ce->status[i] = VALID;
+			memcpy(&ce->data[cached->bucket_size * i],
+				&req_data[data_start], data_len);
+			ce->bucket_status[i] = VALID;
 		} else if (cached->write_policy == WRITEBACK) {
-			if (ce->status[i] == WRITING)
-				ce->status[i] = VALID;
+			if (ce->bucket_status[i] == WRITING)
+				ce->bucket_status[i] = VALID;
 		}
 	}
 out:
@@ -970 +1091 @@
 	return 0;
 }
 
-static int handle_receive_write(struct peerd *peer, struct peer_req *pr)
+static int handle_receive_write(struct peerd *peer, struct peer_req *pr,
+		struct xseg_request *req)
 {
 	XSEGLOG2(&lc, I, "Started\n");
 	/*