/* xseg/peers/user/cached.c @ 8b803cb1 */

/*
 * Copyright 2013 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 */

#include <stdio.h>
#include <stdlib.h>	/* malloc, free, strtoll */
#include <string.h>	/* memcpy, memset, strcmp, strncpy */
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <xseg/xseg.h>
#include <peer.h>
#include <time.h>
#include <xtypes/xlock.h>
#include <xtypes/xq.h>
#include <xtypes/xhash.h>
#include <xtypes/xworkq.h>
#include <xtypes/xwaitq.h>
#include <xseg/protocol.h>
#include <xtypes/xcache.h>

#define MAX_ARG_LEN 12

/* bucket states */
#define INVALID 0
#define LOADING 1
#define VALID   2
#define DIRTY   3
#define WRITING 4
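
/*
 * Intended bucket lifecycle (as far as the current code goes): a bucket moves
 * from INVALID to LOADING while its data is fetched from the blocker, and to
 * VALID once the data arrives (see rw_range() and complete_read()). Under the
 * writeback policy, writes are meant to mark buckets DIRTY, and WRITING
 * appears to be reserved for a dirty bucket while it is being flushed; the
 * write path itself is still a stub.
 */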

#define bucket_readable(__status) \
	((__status) == VALID || (__status) == DIRTY || (__status) == WRITING)

/* write policies */
#define WRITETHROUGH 1
#define WRITEBACK    2

#define WRITE_POLICY(__wcp)				\
	((__wcp) == WRITETHROUGH ? "writethrough" :	\
	 (__wcp) == WRITEBACK    ? "writeback" :	\
	 "undefined")


/* object states */
#define INVALIDATED (1 << 0)


/* cio states */
#define CIO_FAILED   1
#define CIO_ACCEPTED 2
#define CIO_READING  3

#define BUCKET_SIZE_QUANTUM 4096

struct cache_io {
	uint32_t state;
	xcache_handler h;
	uint32_t pending_reqs;
	struct work work;
};

struct cached {
	struct xcache *cache;
	uint64_t cache_size; /* number of objects */
	uint64_t max_req_size;
	uint32_t object_size; /* in bytes */
	uint32_t bucket_size; /* in bytes */
	uint32_t buckets_per_object;
	xport bportno;
	int write_policy;
	//scheduler
};
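
/*
 * A ce (cache entry) caches a single object:
 *   data   : object_size bytes of cached data, split into fixed-size buckets
 *   status : per-bucket state (INVALID/LOADING/VALID/DIRTY/WRITING)
 *   waitq  : per-bucket wait queue, signaled when a bucket leaves LOADING
 *   workq  : serializes all work on this entry under its lock
 *   pr     : peer request apparently reserved for I/O initiated by the entry
 *            itself (e.g. flushing dirty buckets on eviction)
 */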
struct ce {
	unsigned char *data;
	uint32_t *status;
	struct xwaitq *waitq;
	uint32_t flags;
	struct xlock lock;
	struct xworkq workq;
	struct peer_req pr;
};


/*
 * Helper functions
 */

static inline struct cached * __get_cached(struct peerd *peer)
{
	return (struct cached *) peer->priv;
}

static inline struct cache_io * __get_cache_io(struct peer_req *pr)
{
	return (struct cache_io *) pr->priv;
}

static inline uint32_t __calculate_size(struct cached *cached,
		uint32_t start, uint32_t end)
{
	return (end - start + 1) * cached->bucket_size;
}

static inline uint32_t __calculate_offset(struct cached *cached,
		uint32_t start)
{
	return start * cached->bucket_size;
}

static inline uint64_t __quantize(uint64_t num, uint32_t quantum)
{
	quantum--;
	/* widen the mask before negating it, so the high bits of num survive */
	return num & ~(uint64_t)quantum;
}
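
/*
 * __quantize() rounds num down to a multiple of quantum, assuming quantum is
 * a power of two; e.g. __quantize(10000, 4096) == 8192.
 */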

static uint32_t __get_bucket(struct cached *cache, uint64_t offset)
{
	return (offset / cache->bucket_size);
}

static int is_not_loading(void *arg)
{
	uint32_t *status = (uint32_t *)arg;
	return (*status != LOADING);
}


static void print_cached(struct cached *cached)
{
	if (!cached) {
		XSEGLOG2(&lc, W, "Struct cached is NULL\n");
		return;
	}

	XSEGLOG2(&lc, I, "Struct cached fields:\n"
			"                     cache        = %p\n"
			"                     cache_size   = %lu\n"
			"                     max_req_size = %lu\n"
			"                     object_size  = %lu\n"
			"                     bucket_size  = %lu\n"
			"                     bucks_per_obj= %lu\n"
			"                     Bportno      = %d\n"
			"                     write_policy = %s\n",
			cached->cache, cached->cache_size, cached->max_req_size,
			cached->object_size, cached->bucket_size,
			cached->buckets_per_object, cached->bportno,
			WRITE_POLICY(cached->write_policy));
}

int read_write_policy(char *write_policy)
{
	if (strcmp(write_policy, "writethrough") == 0)
		return WRITETHROUGH;
	if (strcmp(write_policy, "writeback") == 0)
		return WRITEBACK;
	return -1;
}

/*
 * Convert string to size in bytes.
 * If syntax is invalid, return 0. Values such as zero and non-integer
 * multiples of segment's page size should not be accepted.
 */
uint64_t str2num(char *str)
{
	char *unit;
	uint64_t num;

	num = strtoll(str, &unit, 10);
	if (strlen(unit) > 1) //Invalid syntax
		return 0;
	else if (strlen(unit) < 1) //Plain number in bytes
		return num;

	switch (*unit) {
		case 'g':
		case 'G':
			num *= 1024;
		case 'm':
		case 'M':
			num *= 1024;
		case 'k':
		case 'K':
			num *= 1024;
			break;
		default:
			num = 0;
	}
	return num;
}
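
/*
 * Note: the switch above falls through on purpose, so 'G'/'g' multiplies by
 * 1024 three times, 'M'/'m' twice and 'K'/'k' once. For example,
 * str2num("4k") returns 4096 and str2num("32M") returns 33554432, while
 * str2num("4KB") returns 0 because more than one unit character follows the
 * number.
 */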

/*
 * serve_req is called only when all the requested buckets are readable.
 */
static int serve_req(struct peerd *peer, struct peer_req *pr)
{
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	struct ce *ce = get_cache_entry(cached->cache, cio->h);
	struct xseg *xseg = peer->xseg;
	struct xseg_request *req = pr->req;
	char *req_data = xseg_get_data(xseg, req);

	XSEGLOG2(&lc, D, "Started\n");
	req->serviced = req->size;

	//assert req->serviced <= req->datalen
	memcpy(req_data, ce->data + req->offset, req->serviced);
	XSEGLOG2(&lc, D, "Finished\n");

	return 0;
}

/*
 * rw_range handles the issuing of requests to the blocker. Usually called when
 * we need to read(write) data from(to) slower media.
 */
static int rw_range(struct peerd *peer, struct peer_req *pr, int action,
		uint32_t start, uint32_t end)
{
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	struct ce *ce = get_cache_entry(cached->cache, cio->h);
	struct xseg_request *req;
	struct xseg_request *req_old = pr->req;
	struct xseg *xseg = peer->xseg;
	xport srcport = pr->portno;
	xport dstport = cached->bportno;
	xport p;
	char *req_target;
	int r;
	uint32_t i;

	req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
	if (!req) {
		XSEGLOG2(&lc, W, "Cannot get request\n");
		return -1;
	}

	req->size = __calculate_size(cached, start, end);
	req->offset = __calculate_offset(cached, start);
	req->op = req_old->op;

	//Allocate enough space for the data and the target's name
	r = xseg_prep_request(xseg, req, req_old->targetlen, req->size);
	if (r < 0) {
		XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
				req_old->targetlen, (unsigned long long)req->size);
		goto put_xseg_request;
	}

	req_target = xseg_get_target(xseg, req);
	char *req_old_target = xseg_get_target(xseg, req_old);
	strncpy(req_target, req_old_target, req_old->targetlen);

	if (req->op == X_WRITE) {
		/*
		 * //Paste data
		 * req_data = xseg_get_data(xseg, req);
		 * memcpy(req_data, req_old->data, size);
		 */
	} else {
		for (i = start; i <= end; i++) {
			ce->status[i] = LOADING;
		}
	}

	r = xseg_set_req_data(xseg, req, pr);
	if (r < 0) {
		XSEGLOG2(&lc, W, "Cannot set request data\n");
		goto put_xseg_request;
	}

	p = xseg_submit(xseg, req, srcport, X_ALLOC);
	if (p == NoPort) {
		XSEGLOG2(&lc, W, "Cannot submit request\n");
		goto out_unset_data;
	}

	r = xseg_signal(xseg, p);

	return 0;

out_unset_data:
	xseg_set_req_data(xseg, req, NULL);
put_xseg_request:
	if (xseg_put_request(xseg, req, srcport))
		XSEGLOG2(&lc, W, "Cannot put request\n");
	return -1;
}

int on_init(void *c, void *e)
{
	uint32_t i;
	struct peerd *peer = (struct peerd *)c;
	struct cached *cached = peer->priv;
	struct ce *ce = (struct ce *)e;
	ce->flags = 0;
	memset(ce->data, 0, cached->object_size);
	for (i = 0; i < cached->buckets_per_object; i++) {
		ce->status[i] = INVALID;
	}
	xlock_release(&ce->lock);
	return 0;
}

void on_put(void *c, void *e)
{
	struct peerd *peer = (struct peerd *)c;
	struct cached *cached = peer->priv;
	struct ce *ce = (struct ce *)e;
	//since we are the last referrer to the cache entry
	//no lock is needed.

	XSEGLOG2(&lc, D, "Putting cache entry %p", e);

	uint32_t start, end, i = 0;
	if (cached->write_policy == WRITETHROUGH || ce->flags & INVALIDATED)
		return;
	//write all dirty buckets.
	while (i < cached->buckets_per_object) {
		if (ce->status[i] != DIRTY) {
			i++;
			continue;
		}
		start = i;
		while (i < cached->buckets_per_object &&
			(i - start) * cached->bucket_size < cached->max_req_size &&
				ce->status[i] == DIRTY) {
			i++;
		}
		end = i;
		//problem: no associated pr
		//maybe put one in cache entry
		//rw_range(cached, ce, 1, start, end);
	}
}

void * init_node(void *c)
{
	int i;
	struct peerd *peer = (struct peerd *)c;
	struct cached *cached = peer->priv;

	struct ce *ce = malloc(sizeof(struct ce));
	if (!ce)
		goto ce_fail;
	xlock_release(&ce->lock);

	ce->data = malloc(sizeof(unsigned char) * cached->object_size);
	ce->status = malloc(sizeof(uint32_t) * cached->buckets_per_object);
	ce->waitq = malloc(sizeof(struct xwaitq) * cached->buckets_per_object);
	if (!ce->data || !ce->status || !ce->waitq)
		goto ce_fields_fail;

	ce->pr.peer = peer;
	for (i = 0; i < cached->buckets_per_object; i++) {
		xwaitq_init(&ce->waitq[i], is_not_loading, &ce->status[i],
				XWAIT_SIGNAL_ONE);
	}
	xworkq_init(&ce->workq, &ce->lock, 0);
	return ce;

ce_fields_fail:
	free(ce->data);
	free(ce->status);
	free(ce->waitq);
	free(ce);
ce_fail:
	perror("malloc");
	return NULL;
}

struct xcache_ops c_ops = {
	.on_init = on_init,
	.on_put  = on_put,
	.on_node_init = init_node
};

static uint32_t __get_last_invalid(struct ce *ce, uint32_t start,
					uint32_t limit)
{
	uint32_t end = start + 1;
	while (end <= limit && ce->status[end] == INVALID)
		end++;
	return (end - 1);
}

static void cached_fail(struct peerd *peer, struct peer_req *pr)
{
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	if (cio->h != NoEntry){
		xcache_put(cached->cache, cio->h);
	}
	cio->h = NoEntry;
	fail(peer, pr);
}

static void cached_complete(struct peerd *peer, struct peer_req *pr)
{
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	if (cio->h != NoEntry){
		xcache_put(cached->cache, cio->h);
	}
	cio->h = NoEntry;
	complete(peer, pr);
}

static void handle_read(void *arg);
//is this necessary?
static void status_changed(void *arg)
{
	/*
	 * In this context we hold a reference to the cache entry.
	 *
	 * This function gets called only after the bucket at which the
	 * current peer_req is waiting has finished loading or has failed.
	 *
	 * Assumptions:
	 *	Each pr waits only at one bucket at any time. That means that
	 *	under no circumstances does this function get called
	 *	simultaneously for the same pr.
	 */
	struct peer_req *pr = (struct peer_req *)arg;
	struct peerd *peer = pr->peer;
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	struct ce *ce = get_cache_entry(cached->cache, cio->h);

	if (xworkq_enqueue(&ce->workq, handle_read, (void *)pr) < 0){
		//FAIL or mark as failed ? are we the last?
		if (cio->pending_reqs){
			// cannot put here, since there are outstanding reqs to
			// be received.
			// Simply mark pr as failed.
			cio->state = CIO_FAILED;
		} else {
			//safe to fail here, since there is no pending action on
			//this pr.
			cached_fail(peer, pr);
		}
	}
}

/*
 * handle_read reads all buckets within a given request's range.
 * If a bucket is:
 * VALID || DIRTY || WRITING: it's good to read.
 * INVALID: we have to issue a request (via blocker) to read it from slower
 *          media.
 * LOADING: We have to wait (on a waitq) for the slower media to answer our
 *          previous request.
 *
 * If unreadable buckets exist, it waits on the last unreadable bucket.
 */
static void handle_read(void *arg)
{
	/*
	 * In this context we hold a reference to the cache entry and
	 * the associated lock
	 */

	struct peer_req *pr = (struct peer_req *)arg;
	struct peerd *peer = pr->peer;
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	struct xseg_request *req = pr->req;
	struct ce *ce = get_cache_entry(cached->cache, cio->h);

	uint32_t start_bucket, end_bucket;
	uint32_t i, b, limit;

	uint32_t pending_buckets = 0;

	XSEGLOG2(&lc, D, "Handle read started for %p (ce: %p)", pr, ce);
	if (cio->state == CIO_FAILED)
		goto out;
	b = __get_bucket(cached, req->offset);
	limit = __get_bucket(cached, req->offset + req->size - 1);
	//assert limit < cached->object_size

	XSEGLOG2(&lc, D, "Start: %lu, Limit %lu", b, limit);
	for (i = b; i <= limit; i++) {
		if (bucket_readable(ce->status[i]))
			continue;
		if (ce->status[i] != LOADING){
			XSEGLOG2(&lc, D, "Found invalid bucket %lu\n", i);
			start_bucket = i;
			end_bucket = __get_last_invalid(ce, start_bucket, limit);
			/* resume scanning right after the invalid range */
			i = end_bucket;
			if (rw_range(peer, pr, 0, start_bucket, end_bucket) < 0){
				cio->state = CIO_FAILED;
				break;
			}
			cio->pending_reqs++;
			cio->state = CIO_READING;
		} else {
			/* already LOADING; remember it so we can wait on it */
			end_bucket = i;
		}
		pending_buckets++;
	}

	if (pending_buckets) {
		XSEGLOG2(&lc, D, "Pending buckets exist: %u\n", pending_buckets);
		/* Do not put cache entry yet */
		cio->work.job_fn = handle_read;
		cio->work.job = pr;
		/* wait on the last unreadable bucket */
		xwaitq_enqueue(&ce->waitq[end_bucket], &cio->work);
		return;
	}

out:
	if (cio->state == CIO_FAILED){
		if (!cio->pending_reqs)
			cached_fail(peer, pr);
	} else {
		if (serve_req(peer, pr)) {
			XSEGLOG2(&lc, E, "Serve of request failed\n");
			cached_fail(peer, pr);
		} else {
			cached_complete(peer, pr);
		}
	}
	return;
}

static void handle_write(void *arg)
{
	//if writeback
	//	for each bucket
	//		write all buckets
	//		mark them as dirty
	//	cache_put(h)
	//	complete
	//else
	//	send write to blocker

	/*
	 * In this context we hold a reference to the cache entry and
	 * the associated lock
	 */

	int r = 0;
	struct peer_req *pr = (struct peer_req *)arg;
	struct peerd *peer = pr->peer;
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	struct ce *ce = get_cache_entry(cached->cache, cio->h);
	(void)ce;

	if (cached->write_policy == WRITETHROUGH){
		//send write to blocker
		//return
	} else if (cached->write_policy == WRITEBACK) {
		//for each bucket
		//	write all buckets
		//	mark them as dirty
		r = 0;
	} else {
		r = -1;
	}

out:
	if (r < 0)
		cached_fail(peer, pr);
	else
		cached_complete(peer, pr);
	return;
}

/*
 * handle_readwrite is called when we accept a request.
 * Its purpose is to find a handler associated with the request's target
 * object (cache hit, possibly partial), or to allocate a new one (cache miss)
 * and insert it in xcache.
 *
 * Depending on the op type, a handler function is enqueued in the workq of the
 * target's cache_entry.
 */
static int handle_readwrite(struct peerd *peer, struct peer_req *pr)
{
	struct ce *ce;
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	struct xseg_request *req = pr->req;
	char name[XSEG_MAX_TARGETLEN + 1];
	char *target;
	int r = -1;
	xcache_handler h = NoEntry, nh;

	//TODO: assert req->size != 0 --> complete req
	//assert (req->offset % cached->bucket_size) == 0;
	//NOTE: What about writes with req->size = 0?
	XSEGLOG2(&lc, D, "Started\n");

	target = xseg_get_target(peer->xseg, req);
	strncpy(name, target, req->targetlen);
	name[req->targetlen] = 0;
	XSEGLOG2(&lc, D, "Target is %s\n", name);

	h = xcache_lookup(cached->cache, name);
	if (h == NoEntry){
		XSEGLOG2(&lc, D, "Cache miss\n");
		h = xcache_alloc_init(cached->cache, name);
		if (h == NoEntry){
			XSEGLOG2(&lc, E, "Could not allocate cache entry");
			goto out;
		}
		nh = xcache_insert(cached->cache, h);
		if (nh == NoEntry){
			xcache_put(cached->cache, h);
			XSEGLOG2(&lc, E, "Could not insert cache entry");
			goto out;
		}
		if (nh != h){
			/* if insert returns another cache entry than the one we
			 * allocated and requested to be inserted, then
			 * someone else beat us to the insertion of a cache
			 * entry associated with the same name. Use this cache
			 * entry instead and put the one we allocated.
			 */
			xcache_put(cached->cache, h);
			h = nh;
		}
	} else {
		XSEGLOG2(&lc, D, "Cache hit\n");
	}

	ce = (struct ce *)get_cache_entry(cached->cache, h);
	if (!ce){
		r = -1;
		XSEGLOG2(&lc, E, "Received cache entry handler %lu but no cache entry", h);
		goto out;
	}
	cio->h = h;

	if (req->op == X_WRITE)
		r = xworkq_enqueue(&ce->workq, handle_write, (void *)pr);
	else if (req->op == X_READ)
		r = xworkq_enqueue(&ce->workq, handle_read, (void *)pr);
	else {
		r = -1;
		XSEGLOG2(&lc, E, "Invalid op %u", req->op);
		goto out;
	}

out:
	if (r < 0){
		XSEGLOG2(&lc, E, "Failing pr %p", pr);
		cached_fail(peer, pr);
	}
	return r;

}

struct req_completion{
	struct peer_req *pr;
	struct xseg_request *req;
};
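
/*
 * A req_completion pairs the original peer request with the reply received
 * from the blocker, so that the completion can be processed inside the cache
 * entry's workq (see handle_receive_read below).
 */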

/*
 * complete_read is called when we receive a reply from a request issued by
 * rw_range. The process mentioned below applies only to buckets previously
 * marked as LOADING:
 *
 * If all requested buckets are serviced, we mark these buckets as VALID.
 * If not, we mark serviced buckets as VALID, non-serviced buckets as INVALID
 * and the cio is marked as failed.
 *
 * At any point when a bucket status changes, we signal the respective waitq.
 */
static void complete_read(void *arg)
{
	/*
	 * In this context we hold a reference to the cache entry and
	 * the associated lock
	 */
	struct req_completion *rc = (struct req_completion *)arg;
	struct peer_req *pr = rc->pr;
	struct xseg_request *req = rc->req;
	struct peerd *peer = pr->peer;
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	struct ce *ce = get_cache_entry(cached->cache, cio->h);
	char *data = xseg_get_data(peer->xseg, req);
	uint32_t start, end_serviced, end_size, i;

	XSEGLOG2(&lc, D, "Started\n");
	XSEGLOG2(&lc, D, "Target: %s.\n Serviced vs total: %lu/%lu.\n",
			xseg_get_target(peer->xseg, req), req->serviced, req->size);

	/* TODO: Assert req->size != 0 ? */
	/* TODO: Assert req->state != XS_SERVED && req->serviced == req->size ? */

	/*
	 * Synchronize pending_reqs of the cache_io here, since each cache_io
	 * refers to only one object, and therefore we can use the object lock
	 * to synchronize between receive contexts.
	 */
	cio->pending_reqs--;

	start = __get_bucket(cached, req->offset);
	end_size = __get_bucket(cached, req->offset + req->size - 1);

	/* Check if request has been correctly served */
	if (req->serviced % cached->bucket_size) {
		req->serviced = __quantize(req->serviced, cached->bucket_size);
	}

	if (req->serviced)
		end_serviced = __get_bucket(cached, req->offset + req->serviced - 1);
	else
		end_serviced = start;

	XSEGLOG2(&lc, D, "Stats: \n"
			"start        = %lu\n"
			"end_serviced = %lu\n"
			"end_size     = %lu\n",
			start, end_serviced, end_size);

	/* Check serviced buckets */
	for (i = start; i <= end_serviced && req->serviced; i++) {
		if (ce->status[i] != LOADING)
			continue;

		XSEGLOG2(&lc, D, "Bucket %lu loading and reception successful\n", i);
		/* the reply data is laid out relative to the 'start' bucket */
		memcpy(ce->data + (i * cached->bucket_size),
			data + ((i - start) * cached->bucket_size),
			cached->bucket_size);
		ce->status[i] = VALID;
		xwaitq_signal(&ce->waitq[i]);
	}

	/* Check non-serviced buckets */
	for (; i <= end_size; i++) {
		if (ce->status[i] != LOADING)
			continue;

		XSEGLOG2(&lc, D, "Bucket %lu loading but reception unsuccessful\n", i);
		ce->status[i] = INVALID;
		cio->state = CIO_FAILED;
		xwaitq_signal(&ce->waitq[i]);
	}

	xseg_put_request(peer->xseg, rc->req, pr->portno);
	free(rc);
	XSEGLOG2(&lc, D, "Finished\n");
}

void complete_write(void *arg)
{
	//for each bucket
	//	if WRITETHROUGH
	//		copy data to bucket
	//		mark as valid
	//	else if WRITEBACK
	//		if status writing
	//			mark as valid
	//
	/*
	 * In this context we hold a reference to the cache entry and
	 * the associated lock
	 */
	return;
}

static int handle_receive_read(struct peerd *peer, struct peer_req *pr,
			struct xseg_request *req)
{
	XSEGLOG2(&lc, D, "Started\n");
	/*
	 * Should be reentrant
	 */
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);
	/*assert there is a handler for received cio*/
	struct ce *ce = get_cache_entry(cached->cache, cio->h);

	struct req_completion *rc;

	rc = malloc(sizeof(struct req_completion));
	if (!rc) {
		perror("malloc");
		return -1;
	}

	rc->pr = pr;
	rc->req = req;
	if (xworkq_enqueue(&ce->workq, complete_read, (void *)rc) < 0){
		free(rc);
		return -1;
		//TODO WHAT?
	}
	XSEGLOG2(&lc, D, "Finished\n");
	return 0;
}

static int handle_receive_write(struct peerd *peer, struct peer_req *pr)
{
	//enqueue_work
	return 0;
}

static int handle_delete(struct peerd *peer, struct peer_req *pr)
{
	//h = cache_lookup
	//if h
	//	cio->h = h
	//
	//send delete to blocker
	return 0;
}

static int handle_receive_delete(struct peerd *peer, struct peer_req *pr)
{
	//if success
	//	if cio->h
	//		//this should not write any dirty data
	//		xcache_remove(h)
	return 0;
}

static int forward_req(struct peerd *peer, struct peer_req *pr)
{
	/*
	struct cached *cached = __get_cached(peer);
	struct cache_io *cio = __get_cache_io(pr);

	xport p;
	p = xseg_forward(peer->xseg, req, pr->portno, X_ALLOC);

	xseg_signal(peer->xseg, p);

	*/
	return 0;
}

static int handle_receive(struct peerd *peer, struct peer_req *pr,
			struct xseg_request *req)
{
	//if not read/write/delete
	//	put req;
	//	complete or fail pr;
	int r;
	xport p;

	switch (req->op){
		case X_READ: r = handle_receive_read(peer, pr, req); break;
//		case X_WRITE: r = handle_receive_write(peer, pr, req); break;
//		case X_DELETE: r = handle_receive_delete(peer, pr, req); break;
		default:
			p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
			if (p == NoPort)
				xseg_put_request(peer->xseg, req, pr->portno);
			break;
	}

	return 0;
}

int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
		enum dispatch_reason reason)
{
	struct cache_io *cio = __get_cache_io(pr);

	switch (reason) {
		case dispatch_accept:
			XSEGLOG2(&lc, D, "In dispatch accept");
			if (req->op == X_READ || req->op == X_WRITE) {
				/*We cache only read/write requests*/
				cio->state = CIO_ACCEPTED;
				handle_readwrite(peer, pr);
			} else {
				/*FIXME: Other requests should be forwarded to blocker*/
				fail(peer, pr);
			}
			break;
		case dispatch_receive:
			XSEGLOG2(&lc, D, "In dispatch receive");
			handle_receive(peer, pr, req);
			break;
		case dispatch_internal:
		default:
			XSEGLOG2(&lc, E, "Invalid dispatch reason");
	}
	return 0;
}


int custom_peer_init(struct peerd *peer, int argc, char *argv[])
{
	int i;
	char bucket_size[MAX_ARG_LEN + 1];
	char object_size[MAX_ARG_LEN + 1];
	char max_req_size[MAX_ARG_LEN + 1];
	char write_policy[MAX_ARG_LEN + 1];
	long bportno = -1;
	long cache_size = -1;

	bucket_size[0] = 0;
	object_size[0] = 0;
	max_req_size[0] = 0;
	write_policy[0] = 0;

	/*Allocate enough space for needed structs*/
	struct cached *cached = malloc(sizeof(struct cached));
	if (!cached) {
		perror("malloc");
		goto fail;
	}
	cached->cache = malloc(sizeof(struct xcache));
	if (!cached->cache) {
		perror("malloc");
		goto cache_fail;
	}
	peer->priv = cached;

	for (i = 0; i < peer->nr_ops; i++) {
		struct cache_io *cio = malloc(sizeof(struct cache_io));
		if (!cio) {
			perror("malloc");
			goto cio_fail;
		}
		cio->h = NoEntry;
		cio->pending_reqs = 0;
		peer->peer_reqs[i].priv = cio;
	}

	/*Read arguments*/
	BEGIN_READ_ARGS(argc, argv);
	READ_ARG_ULONG("-bp", bportno);
	READ_ARG_ULONG("-cs", cache_size);
	READ_ARG_STRING("-mrs", max_req_size, MAX_ARG_LEN);
	READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
	READ_ARG_STRING("-bs", bucket_size, MAX_ARG_LEN);
	READ_ARG_STRING("-wcp", write_policy, MAX_ARG_LEN);
	END_READ_ARGS();
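
	/*
	 * Illustrative argument set (values are only an example):
	 *   -bp 1 -cs 256 -mrs 512k -os 4M -bs 4k -wcp writeback
	 */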

	/*Parse arguments for:*/

	/*Bucket size*/
	if (!bucket_size[0]) {
		cached->bucket_size = BUCKET_SIZE_QUANTUM; /*Default value*/
	} else {
		cached->bucket_size = str2num(bucket_size);
		if (!cached->bucket_size) {
			XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", bucket_size);
			goto arg_fail;
		}
		if (cached->bucket_size % BUCKET_SIZE_QUANTUM) {
			XSEGLOG2(&lc, E, "Misaligned bucket size: %s\n", bucket_size);
			goto arg_fail;
		}
	}

	/*Object size*/
	if (!object_size[0])
		strcpy(object_size, "4M"); /*Default value*/

	cached->object_size = str2num(object_size);
	if (!cached->object_size) {
		XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
		goto arg_fail;
	}
	if (cached->object_size % cached->bucket_size) {
		XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
		goto arg_fail;
	}

	/*Max request size*/
	if (!max_req_size[0]) {
		XSEGLOG2(&lc, E, "Maximum request size must be provided\n");
		goto arg_fail;
	}
	cached->max_req_size = str2num(max_req_size);
	if (!cached->max_req_size) {
		XSEGLOG2(&lc, E, "Invalid syntax: -mrs %s\n", max_req_size);
		goto arg_fail;
	}
	if (cached->max_req_size % BUCKET_SIZE_QUANTUM) {
		XSEGLOG2(&lc, E, "Misaligned maximum request size: %s\n",
				max_req_size);
		goto arg_fail;
	}

	/*Cache size*/
	if (cache_size < 0) {
		XSEGLOG2(&lc, E, "Cache size must be provided\n");
		goto arg_fail;
	}
	cached->cache_size = cache_size;

	/*Blocker port*/
	if (bportno < 0){
		XSEGLOG2(&lc, E, "Blocker port must be provided");
		goto arg_fail;
	}
	cached->bportno = bportno;

	/*Write policy*/
	if (!write_policy[0]) {
		XSEGLOG2(&lc, E, "Write policy must be provided");
		goto arg_fail;
	}
	cached->write_policy = read_write_policy(write_policy);
	if (cached->write_policy < 0) {
		XSEGLOG2(&lc, E, "Invalid syntax: -wcp %s\n", write_policy);
		goto arg_fail;
	}

	cached->buckets_per_object = cached->object_size / cached->bucket_size;

	print_cached(cached);

	/*Initialize xcache*/
	xcache_init(cached->cache, cached->cache_size, &c_ops, 0, peer);

	return 0;

arg_fail:
	custom_peer_usage();
cio_fail:
	for (i = 0; i < peer->nr_ops && peer->peer_reqs[i].priv != NULL; i++)
		free(peer->peer_reqs[i].priv);
	free(cached->cache);
cache_fail:
	free(cached);
fail:
	return -1;
}

void custom_peer_finalize(struct peerd *peer)
{
	//write dirty objects
	//or cache_close(cached->cache);
	return;
}

void custom_peer_usage()
{
	fprintf(stderr, "Custom peer options:\n"
			"    option    | default | description\n"
			"  --------------------------------------------\n"
			"    -cs       | None    | Number of objects to cache\n"
			"    -mrs      | None    | Max request size\n"
			"    -os       | 4M      | Object size\n"
			"    -bs       | 4K      | Bucket size\n"
			"    -bp       | None    | Blocker port\n"
			"    -wcp      | None    | Write policy [writethrough|writeback]\n"
			"\n");
}