xseg/peers/user/cached.c @ 7cf0125c
/*
 * Copyright 2013 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 */

#include <stdio.h>
#include <stdlib.h> /* malloc, free, strtoll */
#include <string.h> /* memcpy, memset, strncpy, strcmp, strlen */
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <xseg/xseg.h>
#include <peer.h>
#include <time.h>
#include <xtypes/xlock.h>
#include <xtypes/xq.h>
#include <xtypes/xhash.h>
#include <xtypes/xworkq.h>
#include <xtypes/xwaitq.h>
#include <xseg/protocol.h>
#include <xtypes/xcache.h>

#define MAX_ARG_LEN 12

/* bucket states */
#define INVALID 0
#define LOADING 1
#define VALID   2
#define DIRTY   3
#define WRITING 4

#define bucket_readable(__status) \
        ((__status) == VALID || (__status) == DIRTY || (__status) == WRITING)

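/*
 * Bucket state transitions visible in this revision (see rw_range() and
 * complete_read() below): a bucket starts INVALID, becomes LOADING when a
 * read is issued to the blocker, then VALID on a successful reply or
 * INVALID again on failure. The DIRTY and WRITING states belong to the
 * write path, which is only sketched further down.
 */
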
/* write policies */
/* FIXME: Shouldn't these be flags? */
#define WRITETHROUGH 1
#define WRITEBACK    2

#define WRITE_POLICY(__wcp)                                     \
        ((__wcp) == WRITETHROUGH ? "writethrough" :             \
         (__wcp) == WRITEBACK    ? "writeback" :                \
         "undefined")

/* object states */
#define INVALIDATED (1 << 0)

/* cio states */
#define CIO_FAILED   1
#define CIO_ACCEPTED 2
#define CIO_READING  3

#define BUCKET_SIZE_QUANTUM 4096

struct cache_io {
        uint32_t state;
        xcache_handler h;
        uint32_t pending_reqs;
        struct work work;
};

struct cached {
        struct xcache *cache;
        uint64_t cache_size; /*Number of objects*/
        uint64_t max_req_size;
        uint32_t object_size; /*Bytes*/ /*TODO: change this to buckets_per_object*/
        uint32_t bucket_size; /*In bytes*/
        uint32_t buckets_per_object;
        xport bportno;
        int write_policy;
        //scheduler
};

struct ce {
        unsigned char *data;
        uint32_t *status;
        struct xwaitq *waitq;
        uint32_t flags;
        struct xlock lock;
        struct xworkq workq;
        struct peer_req pr;
};

/*
 * Helper functions
 */

static inline struct cached * __get_cached(struct peerd *peer)
{
        return (struct cached *) peer->priv;
}

static inline struct cache_io * __get_cache_io(struct peer_req *pr)
{
        return (struct cache_io *) pr->priv;
}

static inline uint32_t __calculate_size(struct cached *cached,
                uint32_t start, uint32_t end)
{
        return (end - start + 1) * cached->bucket_size;
}

static inline uint32_t __calculate_offset(struct cached *cached,
                uint32_t start)
{
        return start * cached->bucket_size;
}

static uint32_t __get_bucket(struct cached *cache, uint64_t offset)
{
        return (offset / cache->bucket_size);
}

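/*
 * For example, with the default 4096-byte buckets: object offset 10240 falls
 * in bucket __get_bucket() = 10240 / 4096 = 2, while the bucket range [2, 4]
 * maps back to a blocker request with __calculate_offset() = 2 * 4096 = 8192
 * and __calculate_size() = (4 - 2 + 1) * 4096 = 12288 bytes.
 */
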
static int is_not_loading(void *arg)
{
        uint32_t *status = (uint32_t *)arg;
        return (*status != LOADING);
}

static void print_cached(struct cached *cached)
{
        if (!cached) {
                XSEGLOG2(&lc, W, "Struct cached is NULL\n");
                return;
        }

        XSEGLOG2(&lc, I, "Struct cached fields:\n"
                        "                     cache        = %p\n"
                        "                     cache_size   = %lu\n"
                        "                     max_req_size = %lu\n"
                        "                     object_size  = %u\n"
                        "                     bucket_size  = %u\n"
                        "                     bucks_per_obj= %u\n"
                        "                     Bportno      = %d\n"
                        "                     write_policy = %s\n",
                        cached->cache, cached->cache_size, cached->max_req_size,
                        cached->object_size, cached->bucket_size,
                        cached->buckets_per_object, cached->bportno,
                        WRITE_POLICY(cached->write_policy));
}

int read_write_policy(char *write_policy)
{
        if (strcmp(write_policy, "writethrough") == 0)
                return WRITETHROUGH;
        if (strcmp(write_policy, "writeback") == 0)
                return WRITEBACK;
        return -1;
}

/*
 * Convert a string to a size in bytes.
 * If the syntax is invalid, return 0. Values such as zero and non-integer
 * multiples of the segment's page size should not be accepted.
 */
uint64_t str2num(char *str)
{
        char *unit;
        uint64_t num;

        num = strtoll(str, &unit, 10);
        if (strlen(unit) > 1) //Invalid syntax
                return 0;
        else if (strlen(unit) < 1) //Plain number in bytes
                return num;

        switch (*unit) {
                case 'g':
                case 'G':
                        num *= 1024;
                        /* fall through */
                case 'm':
                case 'M':
                        num *= 1024;
                        /* fall through */
                case 'k':
                case 'K':
                        num *= 1024;
                        break;
                default:
                        num = 0;
        }
        return num;
}

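/*
 * For example, with the parsing above: "4096" and "4k" both yield 4096,
 * "4M" yields 4194304 and "1G" yields 1073741824, while inputs with a
 * multi-character unit such as "4KB", or non-numeric input, yield 0.
 */
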
/*
 * rw_range handles the issuing of requests to the blocker. Usually called when
 * we need to read (write) data from (to) the slower media.
 */
static int rw_range(struct peerd *peer, struct peer_req *pr, int action,
                uint32_t start, uint32_t end)
{
        struct cached *cached = __get_cached(peer);
        struct cache_io *cio = __get_cache_io(pr);
        struct ce *ce = get_cache_entry(cached->cache, cio->h);
        struct xseg_request *req;
        struct xseg_request *req_old = pr->req;
        struct xseg *xseg = peer->xseg;
        xport srcport = pr->portno;
        xport dstport = cached->bportno;
        xport p;
        char *req_target;
        int r;
        uint32_t i;

        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
        if (!req) {
                XSEGLOG2(&lc, W, "Cannot get request\n");
                return -1;
        }

        req->size = __calculate_size(cached, start, end);
        req->offset = __calculate_offset(cached, start);
        req->op = req_old->op;

        //Allocate enough space for the data and the target's name
        r = xseg_prep_request(xseg, req, req_old->targetlen, req->size);
        if (r < 0) {
                XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
                                req_old->targetlen, (unsigned long long)req->size);
                goto put_xseg_request;
        }

        req_target = xseg_get_target(xseg, req);
        char *req_old_target = xseg_get_target(xseg, req_old);
        strncpy(req_target, req_old_target, req_old->targetlen);

        if (req->op == X_WRITE) {
                /*
                 * //Paste data
                 * req_data = xseg_get_data(xseg, req);
                 * memcpy(req_data, req_old->data, size);
                 */
        } else {
                for (i = start; i <= end; i++) {
                        ce->status[i] = LOADING;
                }
        }

        r = xseg_set_req_data(xseg, req, pr);
        if (r < 0) {
                XSEGLOG2(&lc, W, "Cannot set request data\n");
                goto put_xseg_request;
        }

        p = xseg_submit(xseg, req, srcport, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG2(&lc, W, "Cannot submit request\n");
                goto out_unset_data;
        }

        r = xseg_signal(xseg, p);

        return 0;

out_unset_data:
        xseg_set_req_data(xseg, req, NULL);
put_xseg_request:
        if (xseg_put_request(xseg, req, srcport))
                XSEGLOG2(&lc, W, "Cannot put request\n");
        return -1;
}

int on_init(void *c, void *e)
{
        uint32_t i;
        struct peerd *peer = (struct peerd *)c;
        struct cached *cached = peer->priv;
        struct ce *ce = (struct ce *)e;
        ce->flags = 0;
        memset(ce->data, 0, cached->object_size);
        for (i = 0; i < cached->buckets_per_object; i++) {
                ce->status[i] = INVALID;
        }
        xlock_release(&ce->lock);
        return 0;
}

void on_put(void *c, void *e)
{
        struct peerd *peer = (struct peerd *)c;
        struct cached *cached = peer->priv;
        struct ce *ce = (struct ce *)e;
        //since we are the last referrer to the cache entry,
        //no lock is needed.

        XSEGLOG2(&lc, D, "Putting cache entry %p", e);

        uint32_t start, end, i = 0;
        if (cached->write_policy == WRITETHROUGH || ce->flags & INVALIDATED)
                return;
        //write all dirty buckets.
        while (i < cached->buckets_per_object) {
                if (ce->status[i] != DIRTY) {
                        i++;
                        continue;
                }
                start = i;
                while (i < cached->buckets_per_object &&
                        (i - start) * cached->bucket_size < cached->max_req_size &&
                                ce->status[i] == DIRTY) {
                        i++;
                }
                end = i;
                //problem: no associated pr
                //maybe put one in the cache entry
                //rw_range(cached, ce, 1, start, end);
        }
}

void * init_node(void *c)
{
        int i;
        struct peerd *peer = (struct peerd *)c;
        struct cached *cached = peer->priv;

        struct ce *ce = malloc(sizeof(struct ce));
        if (!ce)
                goto ce_fail;
        xlock_release(&ce->lock);

        ce->data = malloc(sizeof(unsigned char) * cached->object_size);
        ce->status = malloc(sizeof(uint32_t) * cached->buckets_per_object);
        ce->waitq = malloc(sizeof(struct xwaitq) * cached->buckets_per_object);
        if (!ce->data || !ce->status || !ce->waitq)
                goto ce_fields_fail;

        ce->pr.peer = peer;
        for (i = 0; i < cached->buckets_per_object; i++) {
                xwaitq_init(&ce->waitq[i], is_not_loading, &ce->status[i],
                                XWAIT_SIGNAL_ONE);
        }
        xworkq_init(&ce->workq, &ce->lock, 0);
        return ce;

ce_fields_fail:
        free(ce->data);
        free(ce->status);
        free(ce->waitq);
        free(ce);
ce_fail:
        perror("malloc");
        return NULL;
}

struct xcache_ops c_ops = {
        .on_init = on_init,
        .on_put  = on_put,
        .on_node_init = init_node
};

static uint32_t __get_next_invalid(struct ce *ce, uint32_t start,
                                        uint32_t limit)
{
        uint32_t end = start + 1;
        while (end <= limit && ce->status[end] == INVALID)
                end++;
        return end;
}

static void cached_fail(struct peerd *peer, struct peer_req *pr)
{
        struct cached *cached = __get_cached(peer);
        struct cache_io *cio = __get_cache_io(pr);
        if (cio->h != NoEntry){
                xcache_put(cached->cache, cio->h);
        }
        cio->h = NoEntry;
        fail(peer, pr);
}

static void cached_complete(struct peerd *peer, struct peer_req *pr)
{
        struct cached *cached = __get_cached(peer);
        struct cache_io *cio = __get_cache_io(pr);
        if (cio->h != NoEntry){
                xcache_put(cached->cache, cio->h);
        }
        cio->h = NoEntry;
        complete(peer, pr);
}

static void handle_read(void *arg);
//is this necessary?
static void status_changed(void *arg)
{
        /*
         * In this context we hold a reference to the cache entry.
         *
         * This function gets called only after the bucket at which the
         * current peer_req is waiting has finished loading or has failed.
         *
         * Assumptions:
         *        Each pr waits on only one bucket at any time. That means that
         *        under no circumstances does this function get called
         *        simultaneously for the same pr.
         */
        struct peer_req *pr = (struct peer_req *)arg;
        struct peerd *peer = pr->peer;
        struct cached *cached = __get_cached(peer);
        struct cache_io *cio = __get_cache_io(pr);
        struct ce *ce = get_cache_entry(cached->cache, cio->h);

        if (xworkq_enqueue(&ce->workq, handle_read, (void *)pr) < 0){
                //FAIL or mark as failed? are we the last?
                if (cio->pending_reqs){
                        // cannot put here, since there are outstanding reqs to
                        // be received.
                        // Simply mark pr as failed.
                        cio->state = CIO_FAILED;
                } else {
                        //safe to fail here, since there is no pending action on
                        //this pr.
                        cached_fail(peer, pr);
                }
        }
}

/*
 * handle_read reads all buckets within a given request's range.
 * If a bucket is:
 * VALID || DIRTY || WRITING: it's good to read.
 * INVALID: we have to issue a request (via the blocker) to read it from the
 *          slower media.
 * LOADING: we have to wait (on a waitq) for the slower media to answer our
 *          previous request.
 */
static void handle_read(void *arg)
{
        /*
         * In this context we hold a reference to the cache entry and
         * the associated lock
         */

        struct peer_req *pr = (struct peer_req *)arg;
        struct peerd *peer = pr->peer;
        struct cached *cached = __get_cached(peer);
        struct cache_io *cio = __get_cache_io(pr);
        struct xseg_request *req = pr->req;
        struct ce *ce = get_cache_entry(cached->cache, cio->h);

        uint32_t start_bucket, end_bucket, next;
        uint32_t i, b, limit;

        uint32_t pending_buckets = 0;

        XSEGLOG2(&lc, E, "Handle read started for %p (ce: %p)", pr, ce);
        if (cio->state == CIO_FAILED)
                goto out;

        b = __get_bucket(cached, req->offset);
        limit = __get_bucket(cached, req->offset + req->size);
        //assert limit < cached->object_size

        for (i = b; i <= limit; i++) {
                if (bucket_readable(ce->status[i]))
                        continue;
                if (ce->status[i] != LOADING){
                        XSEGLOG2(&lc, I, "Found invalid bucket %u\n", i);
                        start_bucket = i;
                        end_bucket = __get_next_invalid(ce, start_bucket, limit) - 1;
                        i = end_bucket;
                        if (rw_range(peer, pr, 0, start_bucket, end_bucket) < 0){
                                cio->state = CIO_FAILED;
                                break;
                        }
                        cio->pending_reqs++;
                        cio->state = CIO_READING;
                }
                /* remember the last non-readable bucket, so we can wait on it
                 * below even when every such bucket was already LOADING */
                end_bucket = i;
                pending_buckets++;
        }

        if (pending_buckets) {
                XSEGLOG2(&lc, D, "Pending buckets exist: %u\n", pending_buckets);
                /* Do not put the cache entry yet */
                cio->work.job_fn = handle_read;
                cio->work.job = pr;
                /* wait on the last bucket */
                XSEGLOG2(&lc, I, "Enqueuing cio %p in waitq (fn: handle_read).\n", cio);
                xwaitq_enqueue(&ce->waitq[end_bucket], &cio->work);
                XSEGLOG2(&lc, I, "Handle_read returned after enqueuing cio %p in waitq.\n", cio);
                return;
        }

out:
        if (cio->state == CIO_FAILED){
                if (!cio->pending_reqs)
                        cached_fail(peer, pr);
        } else {
                //serve req;
                cached_complete(peer, pr);
        }
        return;
}

static void handle_write(void *arg)
{
        //if writeback
        //        for each bucket
        //                write all buckets
        //                mark them as dirty
        //        cache_put(h)
        //        complete
        //else
        //        send write to blocker

        /*
         * In this context we hold a reference to the cache entry and
         * the associated lock
         */

        int r;
        struct peer_req *pr = (struct peer_req *)arg;
        struct peerd *peer = pr->peer;
        struct cached *cached = __get_cached(peer);
        struct cache_io *cio = __get_cache_io(pr);
        struct ce *ce = get_cache_entry(cached->cache, cio->h);
        (void)ce;

        if (cached->write_policy == WRITETHROUGH){
                //send write to blocker
                //return
                r = -1; /* writethrough path not implemented yet */
        } else if (cached->write_policy == WRITEBACK) {
                //for each bucket
                //        write all buckets
                //        mark them as dirty
                r = 0;
        } else {
                r = -1;
        }

out:
        if (r < 0)
                cached_fail(peer, pr);
        else
                cached_complete(peer, pr);
        return;
}

/*
 * handle_readwrite is called when we accept a request.
 * Its purpose is to find a handler associated with the request's target
 * object (partial cache hit), or to allocate a new one (cache miss) and
 * insert it in xcache.
 *
 * Depending on the op type, a handler function is enqueued in the workq of the
 * target's cache_entry.
 */
static int handle_readwrite(struct peerd *peer, struct peer_req *pr)
{
        struct ce *ce;
        struct cached *cached = __get_cached(peer);
        struct cache_io *cio = __get_cache_io(pr);
        struct xseg_request *req = pr->req;
        char name[XSEG_MAX_TARGETLEN + 1];
        char *target;
        int r = -1;
        xcache_handler h = NoEntry, nh;

        XSEGLOG2(&lc, I, "Started\n");

        target = xseg_get_target(peer->xseg, req);
        strncpy(name, target, req->targetlen);
        name[req->targetlen] = 0;
        XSEGLOG2(&lc, D, "Target is %s\n", name);

        h = xcache_lookup(cached->cache, name);
        if (h == NoEntry){
                h = xcache_alloc_init(cached->cache, name);
                if (h == NoEntry){
                        XSEGLOG2(&lc, E, "Could not allocate cache entry");
                        goto out;
                }
                nh = xcache_insert(cached->cache, h);
                if (nh == NoEntry){
                        xcache_put(cached->cache, h);
                        XSEGLOG2(&lc, E, "Could not insert cache entry");
                        goto out;
                }
                if (nh != h){
                        /* If insert returns a different cache entry from the
                         * one we allocated and requested to be inserted, then
                         * someone else beat us to the insertion of a cache
                         * entry associated with the same name. Use that cache
                         * entry instead and put the one we allocated.
                         */
                        xcache_put(cached->cache, h);
                        h = nh;
                }
        }

        ce = (struct ce *)get_cache_entry(cached->cache, h);
        if (!ce){
                r = -1;
                XSEGLOG2(&lc, E, "Received cache entry handler %lu but no cache entry", h);
                goto out;
        }
        cio->h = h;

        if (req->op == X_WRITE)
                r = xworkq_enqueue(&ce->workq, handle_write, (void *)pr);
        else if (req->op == X_READ)
                r = xworkq_enqueue(&ce->workq, handle_read, (void *)pr);
        else {
                r = -1;
                XSEGLOG2(&lc, E, "Invalid op %u", req->op);
                goto out;
        }

out:
        if (r < 0){
                XSEGLOG2(&lc, E, "Failing pr %p", pr);
                cached_fail(peer, pr);
        }
        return r;
}

struct req_completion{
        struct peer_req *pr;
        struct xseg_request *req;
};

static void complete_read(void *arg)
{
        /*
         * In this context we hold a reference to the cache entry and
         * the associated lock
         */
        XSEGLOG2(&lc, I, "Started\n");

        struct req_completion *rc = (struct req_completion *)arg;
        struct peer_req *pr = rc->pr;
        struct xseg_request *req = rc->req;
        struct peerd *peer = pr->peer;
        struct cached *cached = __get_cached(peer);
        struct cache_io *cio = __get_cache_io(pr);
        struct ce *ce = get_cache_entry(cached->cache, cio->h);
        uint32_t start, end, i;
        int success;
        char *data = xseg_get_data(peer->xseg, req);

        /*
         * Synchronize pending_reqs of the cache_io here, since each cache_io
         * refers to only one object, and therefore we can use the object lock
         * to synchronize between receive contexts.
         */
        cio->pending_reqs--;
        success = (req->state == XS_SERVED && req->serviced == req->size);
        if (!success)
                cio->state = CIO_FAILED;
        //assert (req->offset % cached->bucket_size) == 0;
        //assert ((req->offset+req->serviced) % cached->bucket_size) == 0;
        start = __get_bucket(cached, req->offset);
        end = __get_bucket(cached, req->offset + req->serviced);
        for (i = start; i <= end; i++) {
                if (ce->status[i] == LOADING){
                        if (success){
                                XSEGLOG2(&lc, I,
                                                "Bucket %u loading and reception successful\n", i);
                                /* the request data holds buckets start..end,
                                 * so bucket i sits (i - start) buckets into it */
                                memcpy(ce->data + (i * cached->bucket_size),
                                                data + (i - start) * cached->bucket_size,
                                                cached->bucket_size);
                                ce->status[i] = VALID;
                        }
                        else {
                                XSEGLOG2(&lc, I,
                                                "Bucket %u loading but reception unsuccessful\n", i);
                                //reset status
                                XSEGLOG2(&lc, E, "Before ce %p, i %u, ce->status[i] %u", ce, i, ce->status[i]);
                                ce->status[i] = INVALID;
                                XSEGLOG2(&lc, E, "After ce %p, i %u, ce->status[i] %u", ce, i, ce->status[i]);
                        }
                        xwaitq_signal(&ce->waitq[i]);
                }
        }
        xseg_put_request(peer->xseg, rc->req, pr->portno);
        free(rc);
        XSEGLOG2(&lc, I, "Finished\n");
}

void complete_write(void *arg)
{
        //for each bucket
        //        if WRITETHROUGH
        //                copy data to bucket
        //                mark as valid
        //        else if WRITEBACK
        //                if status writing
        //                        mark as valid
        //
        /*
         * In this context we hold a reference to the cache entry and
         * the associated lock
         */
        return;
}

static int handle_receive_read(struct peerd *peer, struct peer_req *pr,
                        struct xseg_request *req)
{
        XSEGLOG2(&lc, I, "Started\n");
        /*
         * Should be reentrant
         */
        struct cached *cached = __get_cached(peer);
        struct cache_io *cio = __get_cache_io(pr);
        /* assert there is a handler for the received cio */
        struct ce *ce = get_cache_entry(cached->cache, cio->h);

        struct req_completion *rc;

        rc = malloc(sizeof(struct req_completion));
        if (!rc) {
                perror("malloc");
                return -1;
        }

        rc->pr = pr;
        rc->req = req;
        if (xworkq_enqueue(&ce->workq, complete_read, (void *)rc) < 0){
                free(rc);
                return -1;
                //TODO WHAT?
        }
        XSEGLOG2(&lc, I, "Finished\n");
        return 0;
}

static int handle_receive_write(struct peerd *peer, struct peer_req *pr)
{
        //enqueue_work
        return 0;
}

static int handle_delete(struct peerd *peer, struct peer_req *pr)
{
        //h = cache_lookup
        //if h
        //        cio->h = h
        //
        //send delete to blocker
        return 0;
}

static int handle_receive_delete(struct peerd *peer, struct peer_req *pr)
{
        //if success
        //        if cio->h
        //                //this should not write any dirty data
        //                xcache_remove(h)
        return 0;
}

static int forward_req(struct peerd *peer, struct peer_req *pr)
{
        /*
        struct cached *cached = __get_cached(peer);
        struct cache_io *cio = __get_cache_io(pr);

        xport p;
        p = xseg_forward(peer->xseg, req, pr->portno, X_ALLOC);

        xseg_signal(peer->xseg, p);

        */
        return 0;
}

static int handle_receive(struct peerd *peer, struct peer_req *pr,
                        struct xseg_request *req)
{
        //if not read/write/delete
        //        put req;
        //        complete or fail pr;
        int r;
        xport p;

        switch (req->op){
                case X_READ: r = handle_receive_read(peer, pr, req); break;
//                case X_WRITE: r = handle_receive_write(peer, pr, req); break;
//                case X_DELETE: r = handle_receive_delete(peer, pr, req); break;
                default:
                        p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
                        if (p == NoPort)
                                xseg_put_request(peer->xseg, req, pr->portno);
                        break;
        }

        return 0;
}

int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
                enum dispatch_reason reason)
{
        struct cached *cached = __get_cached(peer);
        struct cache_io *cio = __get_cache_io(pr);

        switch (reason) {
                case dispatch_accept:
                        XSEGLOG2(&lc, D, "In dispatch accept");
                        if (req->op == X_READ || req->op == X_WRITE) {
                                /*We cache only read/write requests*/
                                cio->state = CIO_ACCEPTED;
                                handle_readwrite(peer, pr);
                        } else {
                                /*FIXME: Other requests should be forwarded to blocker*/
                                fail(peer, pr);
                        }
                        break;
                case dispatch_receive:
                        XSEGLOG2(&lc, D, "In dispatch receive");
                        handle_receive(peer, pr, req);
                        break;
                case dispatch_internal:
                default:
                        XSEGLOG2(&lc, E, "Invalid dispatch reason");
        }
        return 0;
}

int custom_peer_init(struct peerd *peer, int argc, char *argv[])
{
        int i;
        char bucket_size[MAX_ARG_LEN + 1];
        char object_size[MAX_ARG_LEN + 1];
        char max_req_size[MAX_ARG_LEN + 1];
        char write_policy[MAX_ARG_LEN + 1];
        long bportno = -1;
        long cache_size = -1;

        bucket_size[0] = 0;
        object_size[0] = 0;
        max_req_size[0] = 0;
        write_policy[0] = 0;

        /*Allocate enough space for needed structs*/
        struct cached *cached = malloc(sizeof(struct cached));
        if (!cached) {
                perror("malloc");
                goto fail;
        }
        cached->cache = malloc(sizeof(struct xcache));
        if (!cached->cache) {
                perror("malloc");
                goto cache_fail;
        }
        peer->priv = cached;

        for (i = 0; i < peer->nr_ops; i++) {
                struct cache_io *cio = malloc(sizeof(struct cache_io));
                if (!cio) {
                        perror("malloc");
                        goto cio_fail;
                }
                cio->h = NoEntry;
                cio->pending_reqs = 0;
                peer->peer_reqs[i].priv = cio;
        }

        /*Read arguments*/
        BEGIN_READ_ARGS(argc, argv);
        READ_ARG_ULONG("-bp", bportno);
        READ_ARG_ULONG("-cs", cache_size);
        READ_ARG_STRING("-mrs", max_req_size, MAX_ARG_LEN);
        READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
        READ_ARG_STRING("-bs", bucket_size, MAX_ARG_LEN);
        READ_ARG_STRING("-wcp", write_policy, MAX_ARG_LEN);
        END_READ_ARGS();

        /*Parse arguments for:*/

        /*Bucket size*/
        if (!bucket_size[0]) {
                cached->bucket_size = BUCKET_SIZE_QUANTUM; /*Default value*/
        } else {
                cached->bucket_size = str2num(bucket_size);
                if (!cached->bucket_size) {
                        XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", bucket_size);
                        goto arg_fail;
                }
                if (cached->bucket_size % BUCKET_SIZE_QUANTUM) {
                        XSEGLOG2(&lc, E, "Misaligned bucket size: %s\n", bucket_size);
                        goto arg_fail;
                }
        }

        /*Object size*/
        if (!object_size[0])
                strcpy(object_size, "4M"); /*Default value*/

        cached->object_size = str2num(object_size);
        if (!cached->object_size) {
                XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
                goto arg_fail;
        }
        if (cached->object_size % cached->bucket_size) {
                XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
                goto arg_fail;
        }

        /*Max request size*/
        if (!max_req_size[0]) {
                XSEGLOG2(&lc, E, "Maximum request size must be provided\n");
                goto arg_fail;
        }
        cached->max_req_size = str2num(max_req_size);
        if (!cached->max_req_size) {
                XSEGLOG2(&lc, E, "Invalid syntax: -mrs %s\n", max_req_size);
                goto arg_fail;
        }
        if (cached->max_req_size % BUCKET_SIZE_QUANTUM) {
                XSEGLOG2(&lc, E, "Misaligned maximum request size: %s\n",
                                max_req_size);
                goto arg_fail;
        }

        /*Cache size*/
        if (cache_size < 0) {
                XSEGLOG2(&lc, E, "Cache size must be provided\n");
                goto arg_fail;
        }
        cached->cache_size = cache_size;

        /*Blocker port*/
        if (bportno < 0){
                XSEGLOG2(&lc, E, "Blocker port must be provided");
                goto arg_fail;
        }
        cached->bportno = bportno;

        /*Write policy*/
        if (!write_policy[0]) {
                XSEGLOG2(&lc, E, "Write policy must be provided");
                goto arg_fail;
        }
        cached->write_policy = read_write_policy(write_policy);
        if (cached->write_policy < 0) {
                XSEGLOG2(&lc, E, "Invalid syntax: -wcp %s\n", write_policy);
                goto arg_fail;
        }

        cached->buckets_per_object = cached->object_size / cached->bucket_size;

        print_cached(cached);

        /*Initialize xcache*/
        xcache_init(cached->cache, cached->cache_size, &c_ops, XCACHE_LRU_HEAP,
                        peer);

        return 0;

arg_fail:
        custom_peer_usage();
cio_fail:
        for (i = 0; i < peer->nr_ops && peer->peer_reqs[i].priv != NULL; i++)
                free(peer->peer_reqs[i].priv);
        free(cached->cache);
cache_fail:
        free(cached);
fail:
        return -1;
}

void custom_peer_finalize(struct peerd *peer)
{
        //write dirty objects
        //or cache_close(cached->cache);
        return;
}

void custom_peer_usage()
{
        fprintf(stderr, "Custom peer options:\n"
                        "  --------------------------------------------\n"
                        "    -cs       | None    | Number of objects to cache\n"
                        "    -mrs      | None    | Max request size\n"
                        "    -os       | 4M      | Object size\n"
                        "    -bs       | 4K      | Bucket size\n"
                        "    -bp       | None    | Blocker port\n"
                        "    -wcp      | None    | Write policy [writethrough|writeback]\n"
                        "\n");
}
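
/*
 * Illustrative invocation, assuming the peer is built as a binary named
 * "cached" and using only the options parsed above (any generic peer options
 * such as the segment specification are handled by the common peer code and
 * omitted here):
 *
 *     cached -bp 1 -cs 128 -mrs 512k -os 4M -bs 4k -wcp writeback
 *
 * This would cache 128 objects of 4 MiB split into 4 KiB buckets, issue
 * requests of at most 512 KiB to the blocker on port 1, and use the
 * writeback policy.
 */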