Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / cached.c @ cbae83fd

History | View | Annotate | Download (42 kB)

1
/*
2
 * Copyright 2013 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#include <stdio.h>
36
#include <unistd.h>
37
#include <sys/types.h>
38
#include <pthread.h>
39
#include <xseg/xseg.h>
40
#include <peer.h>
41
#include <time.h>
42
#include <xtypes/xlock.h>
43
#include <xtypes/xq.h>
44
#include <xtypes/xhash.h>
45
#include <xtypes/xworkq.h>
46
#include <xtypes/xwaitq.h>
47
#include <xseg/protocol.h>
48
#include <xtypes/xcache.h>
49

    
50
#define MAX_ARG_LEN 12
51

    
52
/* bucket states */
53
#define INVALID 0
54
#define LOADING 1
55
#define VALID   2
56
#define DIRTY   3
57
#define WRITING 4
58

    
59
#define bucket_readable(__bucket_status) \
60
        (__bucket_status == VALID || __bucket_status == DIRTY || __bucket_status == WRITING)
61

    
62
/* write policies */
63
#define WRITETHROUGH 1
64
#define WRITEBACK    2
65

    
66
#define WRITE_POLICY(__wcp)                                        \
67
        __wcp == WRITETHROUGH        ? "writethrough"        :        \
68
        __wcp == WRITEBACK        ? "writeback"                 :        \
69
        "undefined"
70

    
71

    
72
/* cio states */
73
#define CIO_FAILED        1
74
#define CIO_ACCEPTED        2
75
#define CIO_READING        3
76

    
77
/* ce states */
78
#define CE_READY        1
79
#define CE_WRITING        2
80
#define CE_FAILED        3
81
#define CE_INVALIDATED        4
82
#define CE_NOT_READY        5
83
#define CE_EVICTED        6
84

    
85
#define BUCKET_SIZE_QUANTUM 4096
86

    
87
/*
 * Per-request cache context, attached to peer_req->priv.
 */
struct cache_io {
	uint32_t state;		/* CIO_* state of this request */
	xcache_handler h;	/* handle of the target's cache entry */
	uint32_t pending_reqs;	/* outstanding sub-requests to the blocker */
	struct work work;	/* reusable work item for waitq enqueues */
};
93

    
94
/*
 * Global state of the caching peer (peer->priv).
 */
struct cached {
	struct xcache *cache;
	uint64_t cache_size; /*Number of objects*/
	uint64_t max_req_size;
	uint32_t object_size; /*Bytes*/
	uint32_t bucket_size; /*In bytes*/
	uint32_t buckets_per_object;
	xport bportno;		/* port of the blocker we forward I/O to */
	int write_policy;	/* WRITETHROUGH or WRITEBACK */
	struct xworkq workq;	/* peer-wide work queue */
	//scheduler
};
106

    
107
/*
 * A cache entry: one cached object, split into buckets.
 */
struct ce {
	unsigned char *data;		/* object data (object_size bytes) */
	uint32_t *bucket_status;	/* bucket_status of each bucket */
	struct xwaitq *bucket_waitq;	/* waitq of each bucket */
	uint32_t status;		/* cache entry status */
	struct xlock lock;		/* cache entry lock */
	struct xworkq workq;		/* workq of the cache entry */
	struct xwaitq waitq;		/* waitq of the cache entry */
	struct peer_req pr;		/* dedicated pr for internal (eviction) I/O */
	struct ce *evicted_by;		/* entry waiting on our eviction, if any */
};
118

    
119
/* Pairs a received reply with the pr that issued it. */
struct req_completion{
	struct peer_req *pr;
	struct xseg_request *req;
};
123

    
124
/* Argument bundle for eviction_work(); heap-allocated by on_evict(). */
struct eviction_arg {
	struct peerd *peer;
	struct ce *evicted;
	struct ce *new_entry;
};
129

    
130

    
131

    
132
/*
133
 * Helper functions
134
 */
135

    
136
#define MIN(__a__, __b__) ((__a__ < __b__) ? __a__ : __b__)
137

    
138
static inline struct cached * __get_cached(struct peerd *peer)
139
{
140
        return (struct cached *) peer->priv;
141
}
142

    
143
static inline struct cache_io * __get_cache_io(struct peer_req *pr)
144
{
145
        return (struct cache_io *) pr->priv;
146
}
147

    
148
/* Size in bytes of the inclusive bucket range [start, end]. */
static inline uint32_t __calculate_size(struct cached *cached,
		uint32_t start, uint32_t end)
{
	uint32_t nr_buckets = end - start + 1;
	return nr_buckets * cached->bucket_size;
}
153

    
154
/* Byte offset inside the object where bucket 'start' begins. */
static inline uint32_t __calculate_offset(struct cached *cached,
		uint32_t start)
{
	return cached->bucket_size * start;
}
159

    
160
/*
 * Round num down to the nearest multiple of quantum.
 * quantum must be a power of two.
 *
 * The mask must be widened to 64 bits *before* it is complemented:
 * the original complemented the 32-bit (quantum - 1) and then cast,
 * which zero-extended the mask and cleared the upper 32 bits of num
 * for offsets >= 4 GiB.
 */
static inline uint64_t __quantize(uint64_t num, uint32_t quantum)
{
	return num & ~((uint64_t)quantum - 1);
}
165

    
166
static uint32_t __get_bucket(struct cached *cache, uint64_t offset)
167
{
168
        return (offset / cache->bucket_size);
169
}
170

    
171
static int is_not_loading(void *arg)
172
{
173
        uint32_t *bucket_status = (uint32_t *)arg;
174
        return (*bucket_status != LOADING);
175
}
176

    
177
static int cache_entry_ready(void *arg)
178
{
179
        struct ce *ce = (struct ce *)arg;
180
        return (ce->status == CE_READY);
181
}
182

    
183
static void __set_buckets_status(struct ce *ce, uint32_t start, uint32_t end,
184
                uint32_t status)
185
{
186
        uint32_t i;
187
        for (i = start; i <= end; i++) {
188
                ce->bucket_status[i] = status;
189
        }
190
}
191

    
192
static void print_cached(struct cached *cached)
193
{
194
        if (!cached) {
195
                XSEGLOG2(&lc, W, "Struct cached is NULL\n");
196
                return;
197
        }
198

    
199
        XSEGLOG2(&lc, I, "Struct cached fields:\n"
200
                        "                     cache        = %p\n"
201
                        "                     cache_size   = %lu\n"
202
                        "                     max_req_size = %lu\n"
203
                        "                     object_size  = %lu\n"
204
                        "                     bucket_size  = %lu\n"
205
                        "                     bucks_per_obj= %lu\n"
206
                        "                     Bportno      = %d\n"
207
                        "                     write_policy = %s\n",
208
                        cached->cache, cached->cache_size, cached->max_req_size,
209
                        cached->object_size, cached->bucket_size,
210
                        cached->buckets_per_object, cached->bportno,
211
                        WRITE_POLICY(cached->write_policy));
212
}
213

    
214
int read_write_policy(char *write_policy)
215
{
216
        if (strcmp(write_policy, "writethrough") == 0)
217
                return WRITETHROUGH;
218
        if (strcmp(write_policy, "writeback") == 0)
219
                return WRITEBACK;
220
        return -1;
221
}
222

    
223
/*
224
 * Convert string to size in bytes.
225
 * If syntax is invalid, return 0. Values such as zero and non-integer
226
 * multiples of segment's page size should not be accepted.
227
 */
228
/*
 * Convert string to size in bytes.
 * Accepts a non-negative integer, optionally followed by a single unit
 * suffix: K/k, M/m, G/g (powers of 1024).
 * If syntax is invalid, return 0. Values such as zero and non-integer
 * multiples of segment's page size should not be accepted (callers must
 * check, since 0 is also the error value).
 *
 * The original used strtoll(), which parses into a *signed* 64-bit
 * value and silently accepts negative input; strtoull() matches the
 * uint64_t return type.
 */
uint64_t str2num(char *str)
{
	char *unit;
	uint64_t num;

	/* strtoull() happily parses a leading '-'; reject it explicitly */
	if (!str || *str == '-')
		return 0;

	num = strtoull(str, &unit, 10);
	if (strlen(unit) > 1) /* Invalid syntax */
		return 0;
	else if (strlen(unit) < 1) /* Plain number in bytes */
		return num;

	switch (*unit) {
		case 'g':
		case 'G':
			num *= 1024;
			/* fallthrough */
		case 'm':
		case 'M':
			num *= 1024;
			/* fallthrough */
		case 'k':
		case 'K':
			num *= 1024;
			break;
		default: /* Unknown unit */
			num = 0;
	}
	return num;
}
255

    
256
/* Workq job: signal the xwaitq passed in arg (q is unused). */
static void signal_waitq(void *q, void *arg)
{
	xwaitq_signal((struct xwaitq *)arg);
}
261

    
262
/*
263
 * serve_req is called only when all the requested buckets are readable.
264
 */
265
static int serve_req(struct peerd *peer, struct peer_req *pr)
266
{
267
        struct cached *cached = __get_cached(peer);
268
        struct cache_io *cio = __get_cache_io(pr);
269
        struct ce *ce = xcache_get_entry(cached->cache, cio->h);
270
        struct xseg *xseg = peer->xseg;
271
        struct xseg_request *req = pr->req;
272
        char *req_data = xseg_get_data(xseg, req);
273

    
274
        XSEGLOG2(&lc, D, "Started\n");
275
        req->serviced = req->size;
276

    
277
        //assert req->serviced <= req->datalen
278
        memcpy(req_data, ce->data + req->offset, req->serviced);
279
        XSEGLOG2(&lc, D, "Finished\n");
280

    
281
        return 0;
282
}
283

    
284
/*
285
 * rw_range handles the issuing of requests to the blocker. Usually called when
286
 * we need to read(write) data from(to) slower media.
287
 *
288
 * read/write an object range in buckets.
289
 *
290
 * Associate the request with the given pr.
291
 *
292
 */
293
static int rw_range(struct peerd *peer, struct peer_req *pr, int write,
294
                xcache_handler h, uint32_t start, uint32_t end)
295
{
296
        struct cached *cached = __get_cached(peer);
297
        //struct cache_io *cio = __get_cache_io(pr);
298
        struct xseg_request *req;
299
        struct xseg *xseg = peer->xseg;
300
        xport srcport = pr->portno;
301
        xport dstport = cached->bportno;
302
        xport p;
303
        char *req_target, *req_data;
304
        int r;
305
        char *target;
306
        uint32_t targetlen;
307
        struct ce *ce;
308

    
309
        target = xcache_get_name(cached->cache, h);
310
        targetlen = strlen(target);
311
        ce = xcache_get_entry(cached->cache, h);
312

    
313
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
314
        if (!req) {
315
                XSEGLOG2(&lc, W, "Cannot get request\n");
316
                return -1;
317
        }
318

    
319
        req->size = __calculate_size(cached, start, end);
320
        req->offset = __calculate_offset(cached, start);
321
        if (write)
322
                req->op = X_WRITE;
323
        else
324
                req->op = X_READ;
325

    
326
        //Allocate enough space for the data and the target's name
327
        r = xseg_prep_request(xseg, req, targetlen, req->size);
328
        if (r < 0) {
329
                XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
330
                                targetlen, (unsigned long long)req->size);
331
                goto put_xseg_request;
332
        }
333

    
334
        req_target = xseg_get_target(xseg, req);
335
        strncpy(req_target, target, targetlen);
336

    
337
        if (req->op == X_WRITE) {
338
                 req_data = xseg_get_data(xseg, req);
339
                 memcpy(req_data, ce->data+req->offset, req->size);
340
        } else {
341
        }
342

    
343
        r = xseg_set_req_data(xseg, req, pr);
344
        if (r < 0) {
345
                XSEGLOG2(&lc, W, "Cannot set request data\n");
346
                goto put_xseg_request;
347
        }
348

    
349
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
350
        if (p == NoPort) {
351
                XSEGLOG2(&lc, W, "Cannot submit request\n");
352
                goto out_unset_data;
353
        }
354

    
355
        r = xseg_signal(xseg, p);
356

    
357
        return 0;
358

    
359
out_unset_data:
360
        xseg_set_req_data(xseg, req, NULL);
361
put_xseg_request:
362
        if (xseg_put_request(xseg, req, srcport))
363
                XSEGLOG2(&lc, W, "Cannot put request\n");
364
        return -1;
365
}
366

    
367
int on_init(void *c, void *e)
368
{
369
        uint32_t i;
370
        struct peerd *peer = (struct peerd *)c;
371
        struct cached *cached = peer->priv;
372
        struct ce *ce = (struct ce *)e;
373
        struct cache_io *ce_cio = ce->pr.priv;
374
        ce->status = CE_READY;
375
        memset(ce->data, 0, cached->object_size);
376
        for (i = 0; i < cached->buckets_per_object; i++) {
377
                ce->bucket_status[i] = INVALID;
378
        }
379
        ce_cio->h = NoEntry;
380
        ce_cio->pending_reqs = 0;
381
        //FIXME what here?
382
        ce_cio->state = CIO_ACCEPTED;
383
        ce->evicted_by = NULL;
384
        xlock_release(&ce->lock);
385
        return 0;
386
}
387

    
388
/*
389
 * Insertion/eviction process with writeback policy:
390
 *
391
 * Insertion evicts a cache entry.
392
 * On eviction gets called.
393
 *         If cache entry is invalidated then we have no write back to do.
394
 * Else we issue a write dirty pages work serialized on the object lock.
395
 *
396
 * Now all pending operations are serialized on the object work queue.
397
 * Since there are no read hazards with the dirty buckets write, they pose no
398
 * problem. But a pending write does, since it will not be written back to the
399
 * permament storage media when cache entry is put. So, for any pending write
400
 * operations on the evicted cache entry, we must treat them with write through
401
 * policy.
402
 *
403
 * When all pending operations on the evicted cache entry are done, on_put gets
404
 * called and the waiting cache entry can continue.
405
 */
406

    
407
void eviction_work(void *wq, void *arg)
408
{
409
        /*
410
         * In this context we hold a reference to the evicted cache entry and
411
         * the new cache entry.
412
         * Evicted cache entry lock is also held.
413
         */
414
        struct eviction_arg *earg = (struct eviction_arg *)arg;
415
        struct peerd *peer = earg->peer;;
416
        struct cached *cached = peer->priv;
417
        struct ce *ce_evicted = earg->evicted;
418
        struct ce *ce_new = earg->new_entry;
419
        struct cache_io *ce_cio = ce_evicted->pr.priv;
420
        uint32_t start, end, i = 0;
421

    
422
        /* recheck status here, in case there was a race */
423
        if (ce_evicted->status == CE_INVALIDATED){
424
                xcache_put(cached->cache, ce_cio->h);
425
                return;
426
        }
427

    
428
        /* write all dirty buckets */
429
        while(i < cached->buckets_per_object){
430
                if (ce_evicted->bucket_status[i] != DIRTY){
431
                        i++;
432
                        continue;
433
                }
434
                start = i;
435
                while (i < cached->buckets_per_object &&
436
                        (i-start)*cached->bucket_size < cached->max_req_size &&
437
                                ce_evicted->bucket_status[i] == DIRTY){
438
                        i++;
439
                }
440
                end = i-1;
441
                if (rw_range(peer, &ce_evicted->pr, 1, ce_cio->h, start, end) < 0){
442
                        ce_cio->state = CIO_FAILED;
443
                }
444
                __set_buckets_status(ce_evicted, start, end, WRITING);
445
                ce_cio->pending_reqs++;
446
        }
447

    
448
        /*
449
         * If no request was issued, eviction is now completed.
450
         * Notify the waiter, if failed.
451
         */
452
        if (!ce_cio->pending_reqs){
453
                if (ce_cio->state == CIO_FAILED){
454
                        ce_new->status = CE_FAILED;
455
                        ce_evicted->evicted_by = NULL;
456
                        xworkq_enqueue(&cached->workq, signal_waitq, &ce_new->waitq);
457
                } else {
458
                        xcache_put(cached->cache, ce_cio->h);
459
                }
460
        }
461
        free(earg);
462
}
463

    
464
/*
 * xcache on_evict callback: start the eviction of 'evicted' in favor of
 * 'new_entry'.
 *
 * Returns 0 when eviction completes immediately (writethrough policy or
 * an already-invalidated entry), 1 when write-back work was scheduled
 * (completion is signaled later, when the entry is finally put), and
 * -1 on error.
 */
int on_evict(void *c, void *evicted, void *new_entry)
{
	struct peerd *peer = (struct peerd *)c;
	struct cached *cached = peer->priv;
	struct ce *ce_evicted = (struct ce *)evicted;
	struct ce *ce_new = (struct ce *)new_entry;
	struct cache_io *ce_cio = (struct cache_io *)ce_evicted->pr.priv;
	struct eviction_arg *earg;

	/*
	 * Since write policy doesn't change and after a cache entry gets
	 * invalidated it can never be valid again, it is safe to proceed
	 * without the cache entry lock.
	 */
	XSEGLOG2(&lc, D, "Evicting cache entry %p", evicted);
	ce_evicted->evicted_by = ce_new;
	if (cached->write_policy != WRITEBACK ||
			ce_evicted->status == CE_INVALIDATED){
		/* nothing dirty to flush; eviction is already complete */
		return 0;
	}
	ce_evicted->status = CE_EVICTED;
	ce_new->status = CE_NOT_READY;

	earg = malloc(sizeof(struct eviction_arg));
	if (!earg){
		goto out_error;
	}
	earg->peer = (struct peerd *)peer;
	earg->evicted = ce_evicted;
	earg->new_entry = ce_new;

	/*
	 * In all other cases, we should have the cache entry lock before
	 * proceeding, so serialize the write-back on the entry's workq.
	 */
	if (xworkq_enqueue(&ce_evicted->workq, eviction_work, (void *)earg) < 0){
		goto out_error_free;
	}
	xworkq_signal(&ce_evicted->workq);
	return 1;

out_error_free:
	free(earg);
out_error:
	/*
	 * In case of an eviction error, do not put the old cache entry, but
	 * mark as failed the new one.
	 */
	ce_new->status = CE_FAILED;
	ce_evicted->evicted_by = NULL;
	xworkq_enqueue(&cached->workq, signal_waitq, &ce_new->waitq);
	return -1;
}
517

    
518
void on_put(void *c, void *e)
519
{
520
        //since we are the last referrer to the cache entry
521
        //no lock is needed.
522

    
523
        struct peerd *peer = (struct peerd *)c;
524
        struct cached *cached = peer->priv;
525
        struct ce *ce = (struct ce *)e;
526
        struct cache_io *ce_cio = (struct cache_io *)ce->pr.priv;
527

    
528
        XSEGLOG2(&lc, D, "Putting cache entry %p", e);
529
        if (ce->evicted_by){
530
                ce->evicted_by->status = CE_READY;
531
                xworkq_enqueue(&cached->workq, signal_waitq, &ce->evicted_by->waitq);
532
        }
533
}
534

    
535
void * init_node(void *c)
536
{
537
        int i;
538
        struct peerd *peer = (struct peerd *)c;
539
        struct cached *cached = peer->priv;
540

    
541
        struct ce *ce = malloc(sizeof(struct ce));
542
        if (!ce)
543
                goto ce_fail;
544
        xlock_release(&ce->lock);
545

    
546
        ce->data = malloc(sizeof(unsigned char) * cached->object_size);
547
        ce->bucket_status = malloc(sizeof(uint32_t) * cached->buckets_per_object);
548
        ce->bucket_waitq = malloc(sizeof(struct xwaitq) * cached->buckets_per_object);
549
        ce->pr.priv = malloc(sizeof(struct cache_io));
550
        if (!ce->data || !ce->bucket_status || !ce->bucket_waitq || !ce->pr.priv)
551
                goto ce_fields_fail;
552

    
553
        ce->pr.peer = peer;
554
        ce->pr.portno = peer->portno_start;
555
        for (i = 0; i < cached->buckets_per_object; i++) {
556
                xwaitq_init(&ce->bucket_waitq[i], is_not_loading,
557
                                &ce->bucket_status[i], XWAIT_SIGNAL_ONE);
558
        }
559
        xwaitq_init(&ce->waitq, cache_entry_ready, ce, XWAIT_SIGNAL_ONE);
560
        xworkq_init(&ce->workq, &ce->lock, 0);
561
        return ce;
562

    
563
ce_fields_fail:
564
        free(ce->data);
565
        free(ce->bucket_status);
566
        free(ce->bucket_waitq);
567
        free(ce);
568
ce_fail:
569
        perror("malloc");
570
        return NULL;
571
}
572

    
573
/* xcache callbacks implemented by this caching peer. */
struct xcache_ops c_ops = {
	.on_init = on_init,
	.on_evict = on_evict,
	.on_put  = on_put,
	.on_node_init = init_node
};
579

    
580
static uint32_t __get_last_invalid(struct ce *ce, uint32_t start,
581
                                        uint32_t limit)
582
{
583
        uint32_t end = start + 1;
584
        while (end <= limit && ce->bucket_status[end] == INVALID)
585
                end++;
586
        return (end - 1);
587
}
588

    
589
static void cached_fail(struct peerd *peer, struct peer_req *pr)
590
{
591
        struct cached *cached = __get_cached(peer);
592
        struct cache_io *cio = __get_cache_io(pr);
593
        if (cio->h != NoEntry){
594
                xcache_put(cached->cache, cio->h);
595
        }
596
        cio->h = NoEntry;
597
        fail(peer, pr);
598
}
599

    
600
static void cached_complete(struct peerd *peer, struct peer_req *pr)
601
{
602
        struct cached *cached = __get_cached(peer);
603
        struct cache_io *cio = __get_cache_io(pr);
604
        if (cio->h != NoEntry){
605
                xcache_put(cached->cache, cio->h);
606
        }
607
        cio->h = NoEntry;
608
        complete(peer, pr);
609
}
610

    
611
static void handle_read(void *q, void *arg);
612
static void handle_write(void *q, void *arg);
613
static int forward_req(struct peerd *peer, struct peer_req *pr,
614
                        struct xseg_request *req);
615

    
616
/*
 * Waitq callback: once the bucket a pr was waiting on changes state,
 * re-enqueue the appropriate read/write handler on the entry's workq.
 */
static void bucket_status_changed(void *q, void *arg)
{
	/*
	 * In this context we hold a reference to the cache entry.
	 *
	 * This function gets called only after the bucket at which the
	 * current peer_req is waiting, has finished loading or failed.
	 *
	 * Assumptions:
	 *	Each pr waits only at one bucket at any time. That means that
	 *	under no circumstances, this function gets called simultaneously
	 *	for the same pr.
	 */
	struct peer_req *pr = (struct peer_req *)arg;
	struct peerd *peer = pr->peer;
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	struct ce *ce = xcache_get_entry(cached->cache, cio->h);
	void (*fn)(void *q, void *arg);

	/* dispatch on the original request's opcode */
	if (pr->req->op == X_WRITE){
		fn = handle_write;
	} else {
		fn = handle_read;
	}

	if (xworkq_enqueue(&ce->workq, fn, (void *)pr) < 0){
		//FAIL or mark as failed ? are we the last?
		if (cio->pending_reqs){
			// cannot put here, since there are outstanding reqs to
			// be received.
			// Simply mark pr as failed.
			cio->state = CIO_FAILED;
		} else {
			//safe to fail here, since there is no pending action on
			//this pr.
			cached_fail(peer, pr);
		}
		return;
	}
	xworkq_signal(&ce->workq);
}
658

    
659
/*
660
 * handle_read reads all buckets within a given request's range.
661
 * If a bucket is:
662
 * VALID || DIRTY || WRITING: it's good to read.
663
 * INVALID: we have to issue a request (via blocker) to read it from slower
664
 *          media.
665
 * LOADING: We have to wait (on a waitq) for the slower media to answer our
666
 *          previous request.
667
 *
668
 * If unreadable buckets exist, it waits on the last unreadable bucket.
669
 */
670
static void handle_read(void *q, void *arg)
671
{
672
        /*
673
         * In this context we hold a reference to the cache entry and
674
         * the assocciated lock
675
         */
676

    
677
        struct peer_req *pr = (struct peer_req *)arg;
678
        struct peerd *peer = pr->peer;
679
        struct cached *cached = __get_cached(peer);
680
        struct cache_io *cio = __get_cache_io(pr);
681
        struct xseg_request *req = pr->req;
682
        struct ce *ce = xcache_get_entry(cached->cache, cio->h);
683

    
684
        uint32_t start_bucket, end_bucket;
685
        uint32_t i, b, limit;
686

    
687
        uint32_t pending_buckets = 0;
688

    
689
        XSEGLOG2(&lc, D, "Handle read started for %p (ce: %p)", pr, ce );
690
        if (cio->state == CIO_FAILED)
691
                goto out;
692
        b = __get_bucket(cached, req->offset);
693
        limit = __get_bucket(cached, req->offset + req->size - 1);
694
        //assert limit < cached->object_size
695

    
696

    
697
        XSEGLOG2(&lc, D, "Start: %lu, Limit %lu", b, limit );
698
        for (i = b; i <= limit; i++) {
699
                if (bucket_readable(ce->bucket_status[i]))
700
                        continue;
701
                if (ce->bucket_status[i] != LOADING){
702
                        XSEGLOG2(&lc, D, "Found invalid bucket %lu\n", i);
703
                        start_bucket = i;
704
                        end_bucket = __get_last_invalid(ce, start_bucket,
705
                                MIN(limit, cached->max_req_size / cached->bucket_size));
706
                        i = end_bucket + 1;
707
                        if (rw_range(peer, pr, 0, cio->h, start_bucket, end_bucket) < 0){
708
                                cio->state = CIO_FAILED;
709
                                break;
710
                        }
711
                        __set_buckets_status(ce, start_bucket, end_bucket, LOADING);
712
                        cio->pending_reqs++;
713
                        cio->state =  CIO_READING;
714
                }
715
                pending_buckets++;
716
        }
717

    
718
        if (pending_buckets) {
719
                XSEGLOG2(&lc, D, "Pending buckets exists: %u\n", pending_buckets);
720
                /* Do not put cache entry yet */
721
                cio->work.job_fn = bucket_status_changed;
722
                cio->work.job = pr;
723
                /* wait on the last bucket */
724
                xwaitq_enqueue(&ce->bucket_waitq[end_bucket], &cio->work);
725
                return;
726
        }
727

    
728
out:
729
        if (cio->state == CIO_FAILED){
730
                if (!cio->pending_reqs)
731
                        cached_fail(peer, pr);
732
        } else {
733
                if (serve_req(peer, pr)) {
734
                        XSEGLOG2(&lc, E,"Serve of request failed\n");
735
                        cached_fail(peer, pr);
736
                }
737
                cached_complete(peer, pr);
738
        }
739
        return;
740
}
741

    
742
/*
 * handle_write copies request data into the cache entry's buckets and,
 * depending on the write policy, either forwards the request to the
 * blocker (writethrough) or marks the touched buckets DIRTY (writeback).
 *
 * For a misaligned write, partially covered INVALID edge buckets are
 * first read in from the blocker; the pr then waits on the last such
 * bucket and this handler is re-entered once it is loaded.
 */
static void handle_write(void *q, void *arg)
{
	/*
	 * In this context we hold a reference to the cache entry and
	 * the assocciated lock
	 */

	struct peer_req *pr = (struct peer_req *)arg;
	struct peerd *peer = pr->peer;
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	struct ce *ce = xcache_get_entry(cached->cache, cio->h);
	struct xseg_request *req = pr->req;

	char *req_data;
	/* -1 wraps to UINT32_MAX and serves as a "none issued" sentinel */
	uint32_t start_bucket, end_bucket, last_read_bucket = -1;
	uint32_t i;
	uint64_t data_len, data_start;
	uint64_t first_bucket_offset = req->offset % cached->bucket_size;


	//what about FUA?
	//
	//if invalidated, fake complete

	start_bucket = __get_bucket(cached, req->offset);
	end_bucket = __get_bucket(cached, req->offset + req->size - 1);
	/*
	 * In case of a misaligned write, if the start, end buckets of the write
	 * are invalid, we have to read them before continue with the write.
	 */
	if (ce->bucket_status[start_bucket] == INVALID && first_bucket_offset){
		if (rw_range(peer, pr, 0, cio->h, start_bucket, start_bucket) < 0){
			cio->state = CIO_FAILED;
			goto out;
		}
		__set_buckets_status(ce, start_bucket, start_bucket, LOADING);
		cio->pending_reqs++;
		last_read_bucket = start_bucket;
	}
	if (ce->bucket_status[end_bucket] == INVALID &&
			(req->offset + req->size) % cached->bucket_size){
		if (rw_range(peer, pr, 0, cio->h, end_bucket, end_bucket) < 0){
			cio->state = CIO_FAILED;
			goto out;
		}
		__set_buckets_status(ce, end_bucket, end_bucket, LOADING);
		cio->pending_reqs++;
		last_read_bucket = end_bucket;
	}

	if (last_read_bucket != -1){
		cio->work.job_fn = bucket_status_changed;
		cio->work.job = pr;
		/* wait on the last read bucket */
		XSEGLOG2(&lc, I, "Enqueuing cio %p in waitq (fn: handle_write).\n", cio);
		xwaitq_enqueue(&ce->bucket_waitq[last_read_bucket], &cio->work);
		XSEGLOG2(&lc, I, "Handle_write returned after enqueuing cio %p in waitq.\n", cio);
		return;
	}

	req_data = xseg_get_data(peer->xseg, pr->req);
	if (cached->write_policy == WRITETHROUGH){
		// || ce->status == CE_EVICTED
		//forward_req but make sure we don't put it on receive
		if (forward_req(peer, pr, pr->req) < 0){
			cio->state = CIO_FAILED;
			goto out;
		}
		/* the pr completes when the blocker's reply is received */
		return;
	} else if (cached->write_policy == WRITEBACK) {
		/*
		 * for each bucket
		 *	write all buckets
		 *	mark them as dirty
		 */
		/* special care for the first bucket */
		data_start = cached->bucket_size * start_bucket + first_bucket_offset;
		/*
		 * NOTE(review): if req->size < bucket_size - first_bucket_offset
		 * (a write confined to a single bucket), data_len overshoots
		 * and this memcpy reads past the end of req_data — TODO
		 * confirm and clamp to req->size.
		 */
		data_len = cached->bucket_size - first_bucket_offset;
		memcpy(&ce->data[data_start], req_data, data_len);
		ce->bucket_status[start_bucket] = DIRTY;
		/* and the rest */
		for (i = start_bucket + 1; i <= end_bucket; i++) {
			/*
			 * NOTE(review): bucket i should start in req_data at
			 * (i - start_bucket) * bucket_size - first_bucket_offset;
			 * the '+' below looks like a sign slip for misaligned
			 * writes — TODO confirm.
			 */
			data_start = cached->bucket_size * (i-start_bucket) +
				first_bucket_offset;
			if (data_start + cached->bucket_size <= req->size)
				data_len = cached->bucket_size;
			else
				data_len = req->size - data_start;
			memcpy(&ce->data[i * cached->bucket_size],
				&req_data[data_start], data_len);
			ce->bucket_status[i] = DIRTY;
		}
	} else {
		cio->state = CIO_FAILED;
		XSEGLOG2(&lc, E, "Invalid cache write policy");
	}

out:
	if (cio->state == CIO_FAILED){
		if (!cio->pending_reqs)
			cached_fail(peer, pr);
	}
	else{
		cached_complete(peer, pr);
	}
	return;
}
850

    
851
/*
852
 * Assert cache entry is ready.
853
 *
854
 * Depending on the op type, a handler function is enqueued in the workq of the
855
 * target's cache_entry.
856
 */
857
static void handle_readwrite_post(void *q, void *arg)
858
{
859
        /*
860
         * In this context, we hold a reference to the associated cache entry.
861
         */
862
        struct ce *ce;
863
        struct peer_req *pr = (struct peer_req *)arg;
864
        struct peerd *peer = pr->peer;
865
        struct cached *cached = __get_cached(peer);
866
        struct cache_io *ce_cio, *cio = __get_cache_io(pr);
867
        struct xseg_request *req = pr->req;
868
        int r = 0;
869

    
870
        ce = xcache_get_entry(cached->cache, cio->h); 
871
        if (ce->status != CE_READY){
872
                XSEGLOG2(&lc, E, "Cache entry %p has status %u", ce, ce->status);
873
                r = -1;
874
                //FIXME defer request ?
875
                goto out;
876
        }
877
        if (req->op == X_WRITE)
878
                r = xworkq_enqueue(&ce->workq, handle_write, (void *)pr);
879
        else if (req->op == X_READ)
880
                r = xworkq_enqueue(&ce->workq, handle_read, (void *)pr);
881
        else {
882
                r = -1;
883
                XSEGLOG2(&lc, E, "Invalid op %u", req->op);
884
        }
885

    
886
out:
887
        if (r < 0){
888
                XSEGLOG2(&lc, E, "Failing pr %p", pr);
889
                cached_fail(peer, pr);
890
        } else {
891
                xworkq_signal(&ce->workq);
892
        }
893
}
894

    
895
/*
896
 * handle_readwrite is called when we accept a request.
897
 * Its purpose is to find a handler associated with the request's target
898
 * object (partial cache hit), or to allocate a new one (cache_miss) and insert
899
 * it in xcache.
900
 *
901
 * Then, we wait until the returned cache entry is ready.
902
 */
903
static int handle_readwrite(struct peerd *peer, struct peer_req *pr)
904
{
905
        struct ce *ce;
906
        struct cached *cached = __get_cached(peer);
907
        struct cache_io *ce_cio, *cio = __get_cache_io(pr);
908
        struct xseg_request *req = pr->req;
909
        char name[XSEG_MAX_TARGETLEN + 1];
910
        char *target;
911
        int r = -1;
912
        xcache_handler h = NoEntry, nh;
913

    
914
        //TODO: assert req->size != 0 --> complete req
915
        //assert (req->offset % cached->bucket_size) == 0;
916
        //NOTE: What about writes with req->size = 0?
917
        XSEGLOG2(&lc, D, "Started\n");
918

    
919
        target = xseg_get_target(peer->xseg, req);
920
        strncpy(name, target, req->targetlen);
921
        name[req->targetlen] = 0;
922
        XSEGLOG2(&lc, D, "Target is %s\n", name);
923

    
924
        h = xcache_lookup(cached->cache, name);
925
        if (h == NoEntry){
926
                XSEGLOG2(&lc, D, "Cache miss\n");
927
                h = xcache_alloc_init(cached->cache, name);
928
                if (h == NoEntry){
929
                        XSEGLOG2(&lc, E, "Could not allocate cache entry");
930
                        goto out;
931
                }
932
                nh = xcache_insert(cached->cache, h);
933
                if (nh == NoEntry){
934
                        xcache_put(cached->cache, h);
935
                        XSEGLOG2(&lc, E, "Could not insert cache entry");
936
                        goto out;
937
                }
938
                if (nh != h){
939
                        /* if insert returns another cache entry than the one we
940
                         * allocated and requested to be inserted, then
941
                         * someone else beat us to the insertion of a cache
942
                         * entry assocciated with the same name. Use this cache
943
                         * entry instead and put the one we allocated.
944
                         */
945
                        xcache_put(cached->cache, h);
946
                        h = nh;
947
                }
948
        } else {
949
                XSEGLOG2(&lc, D, "Cache hit\n");
950
        }
951

    
952
        ce = (struct ce *)xcache_get_entry(cached->cache, h);
953
        if (!ce){
954
                r = -1;
955
                XSEGLOG2(&lc, E, "Received cache entry handler %lu but no cache entry", h);
956
                goto out;
957
        }
958

    
959
        cio->h = h;
960

    
961
        ce_cio = ce->pr.priv;
962
        ce_cio->h = h;
963

    
964
        /* wait for the cache_entry to be ready */
965
        cio->work.job_fn = handle_readwrite_post;
966
        cio->work.job = pr;
967
        r = xwaitq_enqueue(&ce->waitq, &cio->work);
968

    
969
out:
970
        if (r < 0){
971
                XSEGLOG2(&lc, E, "Failing pr %p", pr);
972
                cached_fail(peer, pr);
973
        }
974
        return r;
975

    
976
}
977

    
978
/*
979
 * complete_read is called when we receive a reply from a request issued by
980
 * rw_range. The process mentioned below applies only to buckets previously
981
 * marked as LOADING:
982
 *
983
 * If all requested buckets are serviced, we mark these buckets as VALID.
984
 * If not, we mark serviced buckets as VALID, non-serviced buckets as INVALID
985
 * and the cio is failed
986
 *
987
 * At any point when a bucket bucket_status changes, we signal the respective waitq.
988
 */
989
static void complete_read(void *q, void *arg)
990
{
991
        /*
992
         * In this context we hold a reference to the cache entry and
993
         * the assocciated lock
994
         */
995
        struct req_completion *rc = (struct req_completion *)arg;
996
        struct peer_req *pr = rc->pr;
997
        struct xseg_request *req = rc->req;
998
        struct peerd *peer = pr->peer;
999
        struct cached *cached = __get_cached(peer);
1000
        struct cache_io *cio = __get_cache_io(pr);
1001
        struct ce *ce = xcache_get_entry(cached->cache, cio->h);
1002
        char *data = xseg_get_data(peer->xseg, req);
1003
        uint32_t start, end_serviced, end_size, i;
1004
        int success;
1005

    
1006
        XSEGLOG2(&lc, D, "Started\n");
1007
        XSEGLOG2(&lc, D, "Target: %s.\n Serviced vs total: %lu/%lu.\n",
1008
                        xseg_get_target(peer->xseg, req), req->serviced, req->size);
1009

    
1010

    
1011

    
1012
        /*
1013
         * Synchronize pending_reqs of the cache_io here, since each cache_io
1014
         * refers to only one object, and therefore we can use the object lock
1015
         * to synchronize between receive contextes.
1016
         */
1017
        cio->pending_reqs--;
1018

    
1019
        /* TODO: Assert req->size != 0 ? */
1020
        if (!req->size){
1021
                XSEGLOG2(&lc, E, "BUG: zero sized read");
1022
                cio->state = CIO_FAILED;
1023
                goto out;
1024
        }
1025

    
1026
        /* Check if request has been correctly served */
1027
        success = ((req->state & XS_SERVED) && req->serviced == req->size);
1028
        if (!success)
1029
                cio->state = CIO_FAILED;
1030
        if (success && req->serviced % cached->bucket_size) {
1031
                XSEGLOG2(&lc, E, "Misaligned successful read. Failing.");
1032
                success = 0;
1033
                cio->state = CIO_FAILED;
1034
        }
1035

    
1036
        start = __get_bucket(cached, req->offset);
1037
        end_size = __get_bucket(cached, req->offset + req->size - 1);
1038

    
1039

    
1040
        if (success)
1041
                end_serviced = __get_bucket(cached, req->offset + req->serviced - 1);
1042
        else
1043
                end_serviced = start;
1044

    
1045
        XSEGLOG2(&lc, D,"Stats: \n"
1046
                        "start        = %lu\n"
1047
                        "end_serviced = %lu\n"
1048
                        "end_size     = %lu\n",
1049
                        start, end_serviced, end_size);
1050

    
1051
        /* Check serviced buckets */
1052
        for (i = start; i <= end_serviced && success; i++) {
1053
                if (ce->bucket_status[i] != LOADING)
1054
                        continue;
1055

    
1056
                XSEGLOG2(&lc, D, "Bucket %lu loading and reception successful\n",i);
1057
                memcpy(ce->data + (i * cached->bucket_size), data + (i - start) * cached->bucket_size, cached->bucket_size);
1058
                ce->bucket_status[i] = VALID;
1059
                xworkq_enqueue(&cached->workq, signal_waitq, &ce->bucket_waitq[i]);
1060
        }
1061

    
1062
        /* Check non-serviced buckets */
1063
        for (; i <= end_size; i++) {
1064
                if (ce->bucket_status[i] != LOADING)
1065
                        continue;
1066

    
1067
                XSEGLOG2(&lc, D, "Bucket %lu loading but reception unsuccessful\n", i);
1068
                ce->bucket_status[i] = INVALID;
1069
                xworkq_enqueue(&cached->workq, signal_waitq, &ce->bucket_waitq[i]);
1070
        }
1071

    
1072
out:
1073
        xseg_put_request(peer->xseg, rc->req, pr->portno);
1074
        free(rc);
1075
        XSEGLOG2(&lc, D, "Finished\n");
1076
}
1077

    
1078
static void complete_write_through(struct peerd *peer, struct peer_req *pr,
1079
                                        struct xseg_request *req)
1080
{
1081
        /*
1082
         * In this context we hold a reference to the cache entry and
1083
         * the assocciated lock
1084
         */
1085
        struct cached *cached = __get_cached(peer);
1086
        struct cache_io *cio = __get_cache_io(pr);
1087
        struct ce *ce;
1088
        uint32_t start, end_serviced, end_requested, i;
1089
        uint64_t data_start, data_len, first_bucket_offset;
1090
        char *req_data;
1091
        int success;
1092

    
1093
        XSEGLOG2(&lc, I, "Started\n");
1094

    
1095
        success = (req->state == XS_SERVED && req->serviced == req->size);
1096
        ce = xcache_get_entry(cached->cache, cio->h);
1097
        if (!success){
1098
                XSEGLOG2(&lc, E, "Write failed");
1099
                cio->state = CIO_FAILED;
1100
                goto out;
1101
        }
1102

    
1103
        /*
1104
         * If the cache entry was evicted or invalidated, then there is no point
1105
         * updating the cache entry buckets.
1106
         * If there is a request on a loading bucket, it should complete by the
1107
         * completion of the read request. Otherwise there is a serious bug.
1108
         */
1109
        if (ce->status == CE_EVICTED || ce->status == CE_INVALIDATED)
1110
                goto out;
1111

    
1112
        start = __get_bucket(cached, req->offset);
1113
        end_serviced = __get_bucket(cached, req->offset + req->serviced - 1);
1114
//        end_requested = __get_bucket(cached, req->offset + req->size - 1);
1115

    
1116
        first_bucket_offset = req->offset % cached->bucket_size;
1117
        req_data = xseg_get_data(peer->xseg, req);
1118

    
1119
        /*
1120
         * for each serviced bucket
1121
         *        copy data to bucket
1122
         *        mark as valid
1123
         *        signal_waitq
1124
         */
1125

    
1126
        data_start = start * cached->bucket_size + first_bucket_offset;
1127
        data_len = cached->bucket_size - first_bucket_offset;
1128
        memcpy(&ce->data[data_start], req_data, data_len);
1129
        ce->bucket_status[start] = VALID;
1130
        xworkq_enqueue(&cached->workq, signal_waitq, &ce->bucket_waitq[start]);
1131

    
1132
        for (i = start+1; i <= end_serviced; i++) {
1133
                data_start = cached->bucket_size * (i - start) +
1134
                        first_bucket_offset;
1135
                if (data_start + cached->bucket_size <= req->size)
1136
                        data_len = cached->bucket_size;
1137
                else
1138
                        data_len = req->size - data_start;
1139
                memcpy(&ce->data[cached->bucket_size * i], &req_data[data_start],
1140
                                data_len);
1141
                ce->bucket_status[i] = VALID;
1142
                xworkq_enqueue(&cached->workq, signal_waitq, &ce->bucket_waitq[i]);
1143
        }
1144
out:
1145
        /*
1146
         * Here we do not put request, because we forwarded the original request.
1147
         */
1148
        if (cio->pending_reqs){
1149
                XSEGLOG2(&lc, I, "Finished\n");
1150
                return;
1151
        }
1152

    
1153
        /*
1154
         * This pr, was triggered in response to an accepted request and
1155
         * therefore must be completed or failed.
1156
         */
1157

    
1158
        if (cio->state == CIO_FAILED)
1159
                cached_fail(peer, pr);
1160
        else
1161
                cached_complete(peer, pr);
1162
        XSEGLOG2(&lc, I, "Finished\n");
1163
        return;
1164
}
1165

    
1166
static void complete_write_back(struct peerd *peer, struct peer_req *pr,
1167
                struct xseg_request *req)
1168
{
1169
        /*
1170
         * In this context we hold a reference to the cache entry and
1171
         * the assocciated lock
1172
         */
1173
        struct cached *cached = __get_cached(peer);
1174
        struct cache_io *cio = __get_cache_io(pr);
1175
        struct ce *ce;
1176
        uint32_t start, end_serviced, end_requested, i;
1177
        uint64_t first_bucket_offset;
1178
        char *req_data;
1179
        int success;
1180

    
1181
        XSEGLOG2(&lc, I, "Started\n");
1182

    
1183
        success = (req->state == XS_SERVED && req->serviced == req->size);
1184
        ce = xcache_get_entry(cached->cache, cio->h);
1185
        if (!success){
1186
                XSEGLOG2(&lc, E, "Write failed");
1187
                cio->state = CIO_FAILED;
1188
                goto out;
1189
        }
1190

    
1191
        /*
1192
         * If the cache entry was evicted or invalidated, then there is no point
1193
         * updating the cache entry buckets.
1194
         * If there is a request on a loading bucket, it should complete by the
1195
         * completion of the read request. Otherwise there is a serious bug.
1196
         */
1197
        if (ce->status == CE_EVICTED || ce->status == CE_INVALIDATED)
1198
                goto out;
1199

    
1200
        start = __get_bucket(cached, req->offset);
1201
        end_serviced = __get_bucket(cached, req->offset + req->serviced - 1);
1202
//        end_requested = __get_bucket(cached, req->offset + req->size - 1);
1203

    
1204
        first_bucket_offset = req->offset % cached->bucket_size;
1205
        req_data = xseg_get_data(peer->xseg, req);
1206

    
1207
        /*
1208
         * for each serviced bucket
1209
         *        if bucket_status writing
1210
         *                mark as valid
1211
         *                signal bucket
1212
         */
1213

    
1214
        /* should we signal with the cache entry lock held ? */
1215

    
1216
        if (ce->bucket_status[start] == WRITING){
1217
                ce->bucket_status[start] = VALID;
1218
                xworkq_enqueue(&cached->workq, signal_waitq, &ce->bucket_waitq[start]);
1219
        }
1220
        ce->bucket_status[start] = VALID;
1221

    
1222
        for (i = start+1; i <= end_serviced; i++) {
1223
                if (ce->bucket_status[i] == WRITING){
1224
                        ce->bucket_status[i] = VALID;
1225
                        xworkq_enqueue(&cached->workq, signal_waitq, &ce->bucket_waitq[i]);
1226
                }
1227
        }
1228
out:
1229
        xseg_put_request(peer->xseg, req, pr->portno);
1230

    
1231
        if (cio->pending_reqs){
1232
                XSEGLOG2(&lc, I, "Finished\n");
1233
                return;
1234
        }
1235

    
1236
        /*
1237
         * This write was a result of a cache eviction or (in future), an
1238
         * explicit cache flush, and therefore, we have nothing to respond. We
1239
         * do however have to put the cache entry and/or notify waiter of the
1240
         * evicted entry.
1241
         */
1242

    
1243
        if (ce->status == CE_EVICTED){
1244
                if (cio->state == CIO_FAILED){
1245
                        ce->evicted_by->status = CE_FAILED;
1246
                        xworkq_enqueue(&cached->workq, signal_waitq, &ce->evicted_by->waitq);
1247
                        ce->evicted_by = NULL;
1248
                }
1249
        } else {
1250
                /* a flush request */
1251
                //TODO notify the rest of the flushing process
1252
        }
1253
        xcache_put(cached->cache, cio->h);
1254
        XSEGLOG2(&lc, I, "Finished\n");
1255
        return;
1256
}
1257

    
1258

    
1259
void complete_write(void *q, void *arg)
1260
{
1261
        /*
1262
         * In this context we hold a reference to the cache entry and
1263
         * the assocciated lock
1264
         */
1265

    
1266
        struct req_completion *rc = (struct req_completion *)arg;
1267
        struct peer_req *pr = rc->pr;
1268
        struct xseg_request *req = rc->req;
1269
        struct peerd *peer = pr->peer;
1270
        struct cached *cached = __get_cached(peer);
1271
        struct cache_io *cio = __get_cache_io(pr);
1272

    
1273
        XSEGLOG2(&lc, I, "Started\n");
1274

    
1275
        free(rc);
1276

    
1277
        /*
1278
         * Synchronize pending_reqs of the cache_io here, since each cache_io
1279
         * refers to only one object, and therefore we can use the object lock
1280
         * to synchronize between receive contextes.
1281
         */
1282
        cio->pending_reqs--;
1283

    
1284
        if (cached->write_policy == WRITETHROUGH){
1285
                complete_write_through(peer, pr, req);
1286
        } else if (cached->write_policy == WRITEBACK) {
1287
                complete_write_back(peer, pr, req);
1288
        }
1289
        return;
1290
}
1291

    
1292
static int handle_receive_read(struct peerd *peer, struct peer_req *pr,
1293
                        struct xseg_request *req)
1294
{
1295
        XSEGLOG2(&lc, D, "Started\n");
1296
        /*
1297
         * Should be rentrant
1298
         */
1299
        struct cached *cached = __get_cached(peer);
1300
        struct cache_io *cio = __get_cache_io(pr);
1301
        /*assert there is a handler for received cio*/
1302
        struct ce *ce = xcache_get_entry(cached->cache, cio->h);
1303

    
1304
        struct req_completion *rc;
1305

    
1306
        rc = malloc(sizeof(struct req_completion));
1307
        if (!rc) {
1308
                perror("malloc");
1309
                return -1;
1310
        }
1311

    
1312
        rc->pr = pr;
1313
        rc->req = req;
1314
        if (xworkq_enqueue(&ce->workq, complete_read, (void *)rc) < 0){
1315
                free(rc);
1316
                return -1;
1317
                //TODO WHAT?
1318
        }
1319
        xworkq_signal(&ce->workq);
1320
        XSEGLOG2(&lc, D, "Finished\n");
1321
        return 0;
1322
}
1323

    
1324
static int handle_receive_write(struct peerd *peer, struct peer_req *pr,
1325
                        struct xseg_request *req)
1326
{
1327
        XSEGLOG2(&lc, I, "Started\n");
1328
        /*
1329
         * Should be rentrant
1330
         */
1331
        struct cached *cached = __get_cached(peer);
1332
        struct cache_io *cio = __get_cache_io(pr);
1333
        /*assert there is a handler for received cio*/
1334
        struct ce *ce = xcache_get_entry(cached->cache, cio->h);
1335

    
1336
        struct req_completion *rc;
1337

    
1338
        rc = malloc(sizeof(struct req_completion));
1339
        if (!rc) {
1340
                perror("malloc");
1341
                return -1;
1342
        }
1343

    
1344
        rc->pr = pr;
1345
        rc->req = req;
1346
        if (xworkq_enqueue(&ce->workq, complete_write, (void *)rc) < 0){
1347
                free(rc);
1348
                return -1;
1349
                //TODO WHAT?
1350
        }
1351
        xworkq_signal(&ce->workq);
1352
        XSEGLOG2(&lc, I, "Finished\n");
1353
        return 0;
1354
}
/*
 * Stub: handle an accepted delete request.
 *
 * Planned behavior:
 *   h = cache_lookup; if found, store it in cio->h,
 *   then forward the delete to the blocker.
 */
static int handle_delete(struct peerd *peer, struct peer_req *pr)
{
        return 0;
}
/*
 * Stub: handle the reply of a forwarded delete request.
 *
 * Planned behavior: on success, if cio->h is set, remove the entry from
 * xcache (this should not write back any dirty data).
 */
static int handle_receive_delete(struct peerd *peer, struct peer_req *pr)
{
        return 0;
}
1374

    
1375
/*
1376
 * Special forward request function, that associates the request with the pr
1377
 * before forwarding.
1378
 */
1379
static int forward_req(struct peerd *peer, struct peer_req *pr,
1380
                        struct xseg_request *req)
1381
{
1382
        struct cached *cached = __get_cached(peer);
1383
        struct cache_io *cio = __get_cache_io(pr);
1384

    
1385
        xport p;
1386
        xseg_set_req_data(peer->xseg, req, (void *)pr);
1387
        p = xseg_forward(peer->xseg, req, cached->bportno, pr->portno, X_ALLOC);
1388
        if (p == NoPort){
1389
                xseg_set_req_data(peer->xseg, req, NULL);
1390
                return -1;
1391
        }
1392
        cio->pending_reqs++;
1393

    
1394
        xseg_signal(peer->xseg, p);
1395

    
1396
        return 0;
1397
}
1398

    
1399
static int handle_receive(struct peerd *peer, struct peer_req *pr,
1400
                        struct xseg_request *req)
1401
{
1402
        //if not read/write/delete
1403
        //        put req;
1404
        //        complete or fail pr;
1405
        int r = 0;
1406
        xport p;
1407

    
1408
        switch (req->op){
1409
                case X_READ: r = handle_receive_read(peer, pr, req); break;
1410
                case X_WRITE: r = handle_receive_write(peer, pr, req); break;
1411
//                case X_DELETE: r = handle_receive_delete(peer, pr, req); break;
1412
                default:
1413
                        p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
1414
                        if (p == NoPort)
1415
                                r = xseg_put_request(peer->xseg, req, pr->portno);
1416
                        break;
1417
        }
1418

    
1419
        return r;
1420
}
1421

    
1422
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1423
                enum dispatch_reason reason)
1424
{
1425
        struct cached *cached = __get_cached(peer);
1426
        struct cache_io *cio = __get_cache_io(pr);
1427

    
1428
        switch (reason) {
1429
                case dispatch_accept:
1430
                        XSEGLOG2(&lc, D, "In dispatch accept");
1431
                        if (req->op == X_READ || req->op == X_WRITE) {
1432
                                /*We cache only read/write requests*/
1433
                                cio->state = CIO_ACCEPTED;
1434
                                handle_readwrite(peer, pr);
1435
                        } else {
1436
                                /*FIXME: Other requests should be forwarded to blocker*/
1437
                                fail(peer, pr);
1438
                        }
1439
                        break;
1440
                case dispatch_receive:
1441
                        XSEGLOG2(&lc, D, "In dispatch receive");
1442
                        handle_receive(peer, pr, req);
1443
                        break;
1444
                case dispatch_internal:
1445
                default:
1446
                        XSEGLOG2(&lc, E, "Invalid dispatch reason");
1447
        }
1448
        /* Before returning, perform pending jobs 
1449
         * This should probably be called before xseg_wait_signal
1450
         */
1451
        xworkq_signal(&cached->workq);
1452
        return 0;
1453
}
1454

    
1455

    
1456
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1457
{
1458
        int i;
1459
        char bucket_size[MAX_ARG_LEN + 1];
1460
        char object_size[MAX_ARG_LEN + 1];
1461
        char max_req_size[MAX_ARG_LEN + 1];
1462
        char write_policy[MAX_ARG_LEN + 1];
1463
        long bportno = -1;
1464
        long cache_size = -1;
1465

    
1466
        bucket_size[0] = 0;
1467
        object_size[0] = 0;
1468
        max_req_size[0] = 0;
1469
        write_policy[0] = 0;
1470

    
1471
        /*Allocate enough space for needed structs*/
1472
        struct cached *cached = malloc(sizeof(struct cached));
1473
        if (!cached) {
1474
                perror("malloc");
1475
                goto fail;
1476
        }
1477
        cached->cache = malloc(sizeof(struct xcache));
1478
        if (!cached->cache) {
1479
                perror("malloc");
1480
                goto cache_fail;
1481
        }
1482
        peer->priv = cached;
1483

    
1484
        for (i = 0; i < peer->nr_ops; i++) {
1485
                struct cache_io *cio = malloc(sizeof(struct cache_io));
1486
                if (!cio) {
1487
                        perror("malloc");
1488
                        goto cio_fail;
1489
                }
1490
                cio->h = NoEntry;
1491
                cio->pending_reqs = 0;
1492
                peer->peer_reqs[i].priv = cio;
1493
        }
1494

    
1495
        /*Read arguments*/
1496
        BEGIN_READ_ARGS(argc, argv);
1497
        READ_ARG_ULONG("-bp", bportno);
1498
        READ_ARG_ULONG("-cs", cache_size);
1499
        READ_ARG_STRING("-mrs", max_req_size, MAX_ARG_LEN);
1500
        READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
1501
        READ_ARG_STRING("-bs", bucket_size, MAX_ARG_LEN);
1502
        READ_ARG_STRING("-wcp", write_policy, MAX_ARG_LEN);
1503
        END_READ_ARGS();
1504

    
1505
        /*Parse arguments for:*/
1506

    
1507
        /*Bucket size*/
1508
        if (!bucket_size[0]) {
1509
                cached->bucket_size = BUCKET_SIZE_QUANTUM; /*Default value*/
1510
        } else {
1511
                cached->bucket_size = str2num(bucket_size);
1512
                if (!cached->bucket_size) {
1513
                        XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", bucket_size);
1514
                        goto arg_fail;
1515
                }
1516
                if (cached->bucket_size % BUCKET_SIZE_QUANTUM) {
1517
                        XSEGLOG2(&lc, E, "Misaligned bucket size: %s\n", bucket_size);
1518
                        goto arg_fail;
1519
                }
1520
        }
1521

    
1522
        /*Object size*/
1523
        if (!object_size[0])
1524
                strcpy(object_size, "4M"); /*Default value*/
1525

    
1526
        cached->object_size = str2num(object_size);
1527
        if (!cached->object_size) {
1528
                XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
1529
                goto arg_fail;
1530
        }
1531
        if (cached->object_size % cached->bucket_size) {
1532
                XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
1533
                goto arg_fail;
1534
        }
1535

    
1536
        /*Max request size*/
1537
        if (!max_req_size[0]) {
1538
                XSEGLOG2(&lc, E, "Maximum request size must be provided\n");
1539
                goto arg_fail;
1540
        }
1541
        cached->max_req_size = str2num(max_req_size);
1542
        if (!cached->max_req_size) {
1543
                XSEGLOG2(&lc, E, "Invalid syntax: -mrs %s\n", max_req_size);
1544
                goto arg_fail;
1545
        }
1546
        if (cached->max_req_size % BUCKET_SIZE_QUANTUM) {
1547
                XSEGLOG2(&lc, E, "Misaligned maximum request size: %s\n",
1548
                                max_req_size);
1549
                goto arg_fail;
1550
        }
1551

    
1552
        /*Cache size*/
1553
        if (cache_size < 0) {
1554
                XSEGLOG2(&lc, E, "Cache size must be provided\n");
1555
                goto arg_fail;
1556
        }
1557
        cached->cache_size = cache_size;
1558

    
1559
        /*Blocker port*/
1560
        if (bportno < 0){
1561
                XSEGLOG2(&lc, E, "Blocker port must be provided");
1562
                goto arg_fail;
1563
        }
1564
        cached->bportno = bportno;
1565

    
1566
        /*Write policy*/
1567
        if (!write_policy[0]) {
1568
                XSEGLOG2(&lc, E, "Write policy must be provided");
1569
                goto arg_fail;
1570
        }
1571
        cached->write_policy = read_write_policy(write_policy);
1572
        if (cached->write_policy < 0) {
1573
                XSEGLOG2(&lc, E, "Invalid syntax: -wcp %s\n", write_policy);
1574
                goto arg_fail;
1575
        }
1576

    
1577
        cached->buckets_per_object = cached->object_size / cached->bucket_size;
1578

    
1579
        print_cached(cached);
1580

    
1581
        /*Initialize xcache*/
1582
        xcache_init(cached->cache, cached->cache_size, &c_ops, XCACHE_LRU_HEAP, peer);
1583
        xworkq_init(&cached->workq, NULL, 0);
1584

    
1585
        return 0;
1586

    
1587
arg_fail:
1588
        custom_peer_usage();
1589
cio_fail:
1590
        for (i = 0; i < peer->nr_ops && peer->peer_reqs[i].priv != NULL; i++)
1591
                free(peer->peer_reqs[i].priv);
1592
        free(cached->cache);
1593
cache_fail:
1594
        free(cached);
1595
fail:
1596
        return -1;
1597
}
/*
 * Peer teardown hook. Currently a no-op.
 *
 * TODO: write back dirty objects, or call cache_close(cached->cache).
 */
void custom_peer_finalize(struct peerd *peer)
{
        return;
}
1605

    
1606
void custom_peer_usage()
1607
{
1608
        fprintf(stderr, "Custom peer options: \n"
1609
                        "  --------------------------------------------\n"
1610
                        "    -cs       | None    | Number of objects to cache\n"
1611
                        "    -mrs      | None    | Max request size\n"
1612
                        "    -os       | 4M      | Object size\n"
1613
                        "    -bs       | 4K      | Bucket size\n"
1614
                        "    -bp       | None    | Blocker port\n"
1615
                        "    -wcp      | None    | Write policy [writethrough|writeback]\n"
1616
                        "\n");
1617
}