Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / cached.c @ 21c0e7f3

History | View | Annotate | Download (22.7 kB)

1
/*
2
 * Copyright 2013 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#include <stdio.h>
36
#include <unistd.h>
37
#include <sys/types.h>
38
#include <pthread.h>
39
#include <xseg/xseg.h>
40
#include <peer.h>
41
#include <time.h>
42
#include <xtypes/xlock.h>
43
#include <xtypes/xq.h>
44
#include <xtypes/xhash.h>
45
#include <xtypes/xworkq.h>
46
#include <xtypes/xwaitq.h>
47
#include <xseg/protocol.h>
48
#include <xtypes/xcache.h>
49

    
50
#define MAX_ARG_LEN 12
51

    
52
/* bucket states */
53
#define INVALID 0
54
#define LOADING 1
55
#define VALID   2
56
#define DIRTY   3
57
#define WRITING 4
58

    
59
#define bucket_readable(__status) \
60
        (__status == VALID || __status == DIRTY || __status == WRITING)
61

    
62
/* write policies */
63
/*FIXME: Shouldn't they be on a flag?*/
64
#define WRITETHROUGH 1
65
#define WRITEBACK    2
66

    
67

    
68
/* object states */
69
#define INVALIDATED (1 << 0)
70

    
71

    
72
/* cio states */
73
#define CIO_FAILED        1
74
#define CIO_ACCEPTED        2
75
#define CIO_READING        3
76

    
77
#define BUCKET_SIZE_QUANTUM 4096
78

    
79
struct cache_io {
80
        uint32_t state;
81
        xcache_handler h;
82
        uint32_t pending_reqs;
83
        struct work work;
84
};
85

    
86
struct cached {
87
        struct xcache *cache;
88
        uint64_t cache_size; /*Number of objects*/
89
        uint64_t max_req_size;
90
        uint32_t object_size; /*Bytes*/ /*TODO: change this to buckets_per_object*/
91
        uint32_t bucket_size; /*In bytes*/
92
        uint32_t buckets_per_object;
93
        xport bportno;
94
        int write_policy;
95
        //scheduler
96
};
97

    
98
struct ce {
99
        unsigned char *data;
100
        uint32_t *status;
101
        struct xwaitq *waitq;
102
        uint32_t flags;
103
        struct xlock lock;
104
        struct xworkq workq;
105
        struct peer_req pr;
106
};
107

    
108

    
109
/*
110
 * Helper functions
111
 */
112

    
113
static inline struct cached * __get_cached(struct peerd *peer)
114
{
115
        return (struct cached *) peer->priv;
116
}
117

    
118
static inline struct cache_io * __get_cache_io(struct peer_req *pr)
119
{
120
        return (struct cache_io *) pr->priv;
121
}
122

    
123
static inline uint32_t __calculate_size(uint32_t start, uint32_t end,
124
                struct cached *cached)
125
{
126
        return (end - start + 1) * cached->bucket_size;
127
}
128

    
129
static inline uint32_t __calculate_offset(uint32_t start,
130
                struct cached *cached)
131
{
132
        return start * cached->bucket_size;
133
}
134

    
135
static uint32_t __get_bucket(struct cached *cache, uint64_t offset)
136
{
137
        return (offset / cache->bucket_size);
138
}
139

    
140
static int is_loading(void *arg)
141
{
142
        uint32_t *status = (uint32_t *)arg;
143
        return (*status == LOADING);
144
}
145

    
146
static void print_cached(struct cached *cached)
147
{
148
        if (!cached) {
149
                XSEGLOG2(&lc, W, "Struct cached is NULL\n");
150
                return;
151
        }
152

    
153
        XSEGLOG2(&lc, I, "Struct cached fields:\n"
154
                        "                     cache        = %p\n"
155
                        "                     cache_size   = %lu\n"
156
                        "                     max_req_size = %lu\n"
157
                        "                     object_size  = %lu\n"
158
                        "                     bucket_size  = %lu\n"
159
                        "                     buckets      = %lu\n"
160
                        "                     Bportno      = %d\n",
161
                        cached->cache, cached->cache_size, cached->max_req_size,
162
                        cached->object_size, cached->bucket_size,
163
                        cached->buckets_per_object, cached->bportno);
164
}
165

    
166

    
167
/*
168
 * Convert string to size in bytes.
169
 * If syntax is invalid, return 0. Values such as zero and non-integer
170
 * multiples of segment's page size should not be accepted.
171
 */
172
uint64_t str2num(char *str)
173
{
174
        char *unit;
175
        uint64_t num;
176

    
177
        num = strtoll(str, &unit, 10);
178
        if (strlen(unit) > 1) //Invalid syntax
179
                return 0;
180
        else if (strlen(unit) < 1) //Plain number in bytes
181
                return num;
182

    
183
        switch (*unit) {
184
                case 'g':
185
                case 'G':
186
                        num *= 1024;
187
                case 'm':
188
                case 'M':
189
                        num *= 1024;
190
                case 'k':
191
                case 'K':
192
                        num *= 1024;
193
                        break;
194
                default:
195
                        num = 0;
196
        }
197
        return num;
198
}
199

    
200
static int rw_range(struct peerd *peer, struct peer_req *pr, int action,
201
                uint32_t start, uint32_t end)
202
{
203
        struct cached *cached = __get_cached(peer);
204
        struct cache_io *cio = __get_cache_io(pr);
205
        struct ce *ce = get_cache_entry(cached->cache, cio->h);
206
        /*
207
         * get_req
208
         * prep_req
209
         * read or write data
210
         * associate req with pr
211
         * submit
212
         * signal
213
         */
214
        struct xseg_request *req;
215
        struct xseg_request *req_old = pr->req;
216
        struct xseg *xseg = peer->xseg;
217
        //FIXME: Which of those for srcport?
218
        //xport srcport = peer->portno_start;
219
        //FIXME: pr->portnumber
220
        xport srcport = pr->portno;
221
        xport dstport = cached->bportno;
222
        xport p;
223
        char *req_target;
224
        int r;
225
        uint32_t i;
226

    
227
        req = xseg_get_request(xseg, srcport, dstport, X_ALLOC);
228
        if (!req) {
229
                XSEGLOG2(&lc, W, "Cannot get request\n");
230
                return -1;
231
        }
232

    
233
        //FIXME: swap args
234
        req->size = __calculate_size(start, end, cached);
235
        req->offset = __calculate_offset(start, cached);
236
        req->op = req_old->op;
237

    
238
        //Allocate enough space for the data and the target's name
239
        r = xseg_prep_request(xseg, req, XSEG_MAX_TARGETLEN, req->size);
240
        if (r < 0) {
241
                XSEGLOG2(&lc, W, "Cannot prepare request! (%lu, %llu)\n",
242
                                XSEG_MAX_TARGETLEN, (unsigned long long)req->size);
243
                goto put_xseg_request;
244
        }
245

    
246
        //Paste target name
247
        req_target = xseg_get_target(xseg, req);
248
        char *req_old_target = xseg_get_target(xseg, req_old);
249
        strncpy(req_target, req_old_target, XSEG_MAX_TARGETLEN);
250

    
251
        if (req->op == X_WRITE) {
252
                /*
253
                 * //Paste data
254
                 * req_data = xseg_get_data(xseg, req);
255
                 * memcpy(req_data, req_old->data, size);
256
                 */
257
        } else {
258
                for (i=start; i<=end; i++){
259
                        ce->status[i] = LOADING;
260
                }
261
        }
262

    
263
        /*
264
         * We can avoid allocating a new peer_request, if we take extra care
265
         * when we receive ours back.
266
         */
267
#if 0
268
        pr = alloc_peer_req(peer);
269
        if (!pr) {
270
                XSEGLOG2(&lc, W, "Cannot allocate peer request (%ld remaining)\n",
271
                                peer->nr_ops - xq_count(&peer->free_reqs));
272
                goto put_xseg_request;
273
        }
274
        pr->peer = peer;
275
        pr->portno = srcport;
276
        pr->req = req;
277
#endif
278

    
279
        r = xseg_set_req_data(xseg, req, pr);
280
        if (r < 0) {
281
                XSEGLOG2(&lc, W, "Cannot set request data\n");
282
                goto put_peer_request;
283
        }
284

    
285
        p = xseg_submit(xseg, req, srcport, X_ALLOC);
286
        if (p == NoPort) {
287
                XSEGLOG2(&lc, W, "Cannot submit request\n");
288
                goto put_peer_request;
289
        }
290

    
291
        r = xseg_signal(xseg, p);
292

    
293
        return 0;
294

    
295
put_peer_request:
296
        free(pr->priv);
297
        free_peer_req(peer, pr);
298
put_xseg_request:
299
        if (xseg_put_request(xseg, req, srcport))
300
                XSEGLOG2(&lc, W, "Cannot put request\n");
301
        return -1;
302

    
303
        return 0;
304
}
305

    
306

    
307
int on_init(void *c, void *e)
308
{
309
        uint32_t i;
310
        struct peerd *peer = (struct peerd *)c;
311
        struct cached *cached = peer->priv;
312
        struct ce *ce = (struct ce *)e;
313
        ce->flags = 0;
314
        memset(ce->data, 0, cached->object_size);
315
        for (i = 0; i < cached->buckets_per_object; i++) {
316
                ce->status[i] = INVALID;
317
        }
318
        xlock_release(&ce->lock);
319
        return 0;
320
}
321

    
322
void on_put(void *c, void *e)
323
{
324
        struct peerd *peer = (struct peerd *)c;
325
        struct cached *cached = peer->priv;
326
        struct ce *ce = (struct ce *)e;
327
        //since we are the last referrer to the cache entry
328
        //no lock is needed.
329

    
330
        uint32_t start, end, i = 0;
331
        if (cached->write_policy == WRITETHROUGH || ce->flags & INVALIDATED)
332
                return;
333
        //write all dirty buckets.
334
        while(i < cached->buckets_per_object){
335
                if (ce->status[i] != DIRTY){
336
                        i++;
337
                        continue;
338
                }
339
                start = i;
340
                while (i < cached->buckets_per_object &&
341
                        (i-start)*cached->bucket_size < cached->max_req_size &&
342
                                ce->status[i] == DIRTY){
343
                        i++;
344
                }
345
                end = i;
346
                //problem: no assocciated pr
347
                //maybe put one in cache entry
348
                //rw_range(cached, ce, 1, start, end);
349
        }
350
}
351

    
352
void * init_node(void *c)
353
{
354
        int i;
355
        struct peerd *peer = (struct peerd *)c;
356
        struct cached *cached = peer->priv;
357

    
358
        struct ce *ce = malloc(sizeof(struct ce));
359
        if (!ce)
360
                goto ce_fail;
361
        xlock_release(&ce->lock);
362

    
363
        ce->data = malloc(sizeof(unsigned char) * cached->object_size);
364
        ce->status = malloc(sizeof(uint32_t) * cached->buckets_per_object);
365
        ce->waitq = malloc(sizeof(struct xwaitq) * cached->buckets_per_object);
366
        if (!ce->data || !ce->status || !ce->waitq)
367
                goto ce_fields_fail;
368

    
369
        ce->pr.peer = peer;
370
        for (i = 0; i < cached->buckets_per_object; i++) {
371
                xwaitq_init(&ce->waitq[i], is_loading, &ce->status[i],
372
                                XWAIT_SIGNAL_ONE);
373
        }
374
        xworkq_init(&ce->workq, &ce->lock, 0);
375
        return ce;
376

    
377
ce_fields_fail:
378
        free(ce->data);
379
        free(ce->status);
380
        free(ce->waitq);
381
        free(ce);
382
ce_fail:
383
        perror("malloc");
384
        return NULL;
385
}
386

    
387
struct xcache_ops c_ops = {
388
        .on_init = on_init,
389
        .on_put  = on_put,
390
        .on_node_init = init_node
391
};
392

    
393
static uint32_t __get_next_invalid(struct ce *ce, uint32_t start,
394
                                        uint32_t limit)
395
{
396
        uint32_t end = start + 1;
397
        while (end <= limit && ce->status[end] == INVALID)
398
                end++;
399
        return end - 1;
400
}
401

    
402
static void cached_fail(struct peerd *peer, struct peer_req *pr)
403
{
404
        struct cached *cached = __get_cached(peer);
405
        struct cache_io *cio = __get_cache_io(pr);
406
        if (cio->h != NoEntry){
407
                xcache_put(cached->cache, cio->h);
408
        }
409
        cio->h = NoEntry;
410
        fail(peer, pr);
411
}
412

    
413
static void cached_complete(struct peerd *peer, struct peer_req *pr)
414
{
415
        struct cached *cached = __get_cached(peer);
416
        struct cache_io *cio = __get_cache_io(pr);
417
        if (cio->h != NoEntry){
418
                xcache_put(cached->cache, cio->h);
419
        }
420
        cio->h = NoEntry;
421
        complete(peer, pr);
422
}
423

    
424
static void handle_read(void *arg);
425
//is this necessary?
426
static void status_changed(void *arg)
427
{
428
        /*
429
         * In this context we hold a reference to the cache entry.
430
         *
431
         * This function gets called only after the bucket at which the
432
         * current peer_req is waiting, has finished loading of failed.
433
         *
434
         * Assumptions:
435
         *         Each pr waits only at one bucket at any time. That means that
436
         *         under no circumstances, this function get called simutaneously
437
         *         for the same pr.
438
         */
439
        struct peer_req *pr = (struct peer_req *)arg;
440
        struct peerd *peer = pr->peer;
441
        struct cached *cached = __get_cached(peer);
442
        struct cache_io *cio = __get_cache_io(pr);
443
        struct ce *ce = get_cache_entry(cached->cache, cio->h);
444

    
445
        if (xworkq_enqueue(&ce->workq, handle_read, (void *)pr) < 0){
446
                //FAIL or mark as failed ? are we the last?
447
                if (cio->pending_reqs){
448
                        // cannot put here, since there are outstanding reqs to
449
                        // be received.
450
                        // Simply mark pr as failed.
451
                        cio->state = CIO_FAILED;
452
                } else {
453
                        //safe to fail here, since there is no pending action on
454
                        //this pr.
455
                        cached_fail(peer, pr);
456
                }
457
        }
458
}
459

    
460
static void handle_read(void *arg)
461
{
462
        /*
463
         * In this context we hold a reference to the cache entry and
464
         * the assocciated lock
465
         */
466

    
467
        struct peer_req *pr = (struct peer_req *)arg;
468
        struct peerd *peer = pr->peer;
469
        struct cached *cached = __get_cached(peer);
470
        struct cache_io *cio = __get_cache_io(pr);
471
        struct xseg_request *req = pr->req;
472
        struct ce *ce = get_cache_entry(cached->cache, cio->h);
473

    
474
        uint32_t start_bucket, end_bucket, next;
475
        uint32_t i, b, limit;
476

    
477
        uint32_t pending_buckets = 0;
478

    
479
        XSEGLOG2(&lc, E, "Handle read started for %p (ce: %p)", pr, ce );
480
        if (cio->state == CIO_FAILED)
481
                goto out;
482

    
483
        b = __get_bucket(cached, req->offset);
484
        limit = __get_bucket(cached, req->offset + req->size);
485
        //assert limit < cached->object_size
486

    
487
        for (i = b; i <= limit; i++) {
488
                if (bucket_readable(ce->status[i]))
489
                        continue;
490
                if (ce->status[i] != LOADING){
491
                        start_bucket = i;
492
                        end_bucket = __get_next_invalid(ce, start_bucket, limit);
493
                        i = end_bucket;
494
                        if (rw_range(peer, pr, 0, start_bucket, end_bucket) < 0){
495
                                cio->state = CIO_FAILED;
496
                                break;
497
                        }
498
                        cio->pending_reqs++;
499
                        cio->state =  CIO_READING;
500
                }
501
                pending_buckets++;;
502
        }
503

    
504
        if (pending_buckets) {
505
                /* Do not put cache entry yet */
506
                cio->work.job_fn = handle_read;
507
                cio->work.job = pr;
508
                /* wait on the last bucket */
509
                xwaitq_enqueue(&ce->waitq[end_bucket], &cio->work);
510
                return;
511
        }
512

    
513
out:
514
        if (cio->state == CIO_FAILED){
515
                if (!cio->pending_reqs)
516
                        cached_fail(peer, pr);
517
        }
518
        else{
519
                //serve req;
520
                cached_complete(peer, pr);
521
        }
522
        return;
523
}
524

    
525
static void handle_write(void *arg)
526
{
527
        //if writeback
528
        //          for each bucket
529
        //                write all buckets
530
        //                mark them as dirty
531
        //        cache_put(h)
532
        //        complete
533
        //else
534
        //        send write to blocker
535

    
536
        /*
537
         * In this context we hold a reference to the cache entry and
538
         * the assocciated lock
539
         */
540

    
541
        int r;
542
        struct peer_req *pr = (struct peer_req *)arg;
543
        struct peerd *peer = pr->peer;
544
        struct cached *cached = __get_cached(peer);
545
        struct cache_io *cio = __get_cache_io(pr);
546
        struct ce *ce = get_cache_entry(cached->cache, cio->h);
547
        (void)ce;
548

    
549
        if (cached->write_policy == WRITETHROUGH){
550
                //send write to blocker
551
                //return
552
        } else if (cached->write_policy == WRITEBACK) {
553
                //for each bucket
554
                //        write all buckets
555
                //        mark them as dirty
556
                r = 0;
557
        } else {
558
                r = -1;
559
        }
560

    
561
out:
562
        if (r < 0)
563
                cached_fail(peer, pr);
564
        else
565
                cached_complete(peer, pr);
566
        return;
567
}
568

    
569
static int handle_readwrite(struct peerd *peer, struct peer_req *pr)
570
{
571
        int r = -1;
572
        struct ce *ce;
573
        struct cached *cached = __get_cached(peer);
574
        struct cache_io *cio = __get_cache_io(pr);
575
        char name[XSEG_MAX_TARGETLEN + 1];
576
        struct xseg_request *req = pr->req;
577
        char *target = xseg_get_target(peer->xseg, req);
578

    
579
        xcache_handler h = NoEntry;
580
        XSEGLOG2(&lc, D, "In handle readwrite");
581

    
582
        strncpy(name, target, req->targetlen);
583
        name[req->targetlen] = 0;
584
        XSEGLOG2(&lc, D, "All good till here1. Name: %s\n", name);
585

    
586
        h = xcache_lookup(cached->cache, name);
587
        if (h == NoEntry){
588
                h = xcache_alloc_init(cached->cache, name);
589
                if (h == NoEntry){
590
                        goto out;
591
                }
592
                r = xcache_insert(cached->cache, h);
593
                if (r < 0){
594
                        goto out;
595
                }
596
        }
597

    
598
        ce = (struct ce *)get_cache_entry(cached->cache, h);
599
        if (!ce){
600
                r = -1;
601
                goto out;
602
        }
603
        cio->h = h;
604

    
605
        if (req->op == X_WRITE)
606
                r = xworkq_enqueue(&ce->workq, handle_write, (void *)pr);
607
        else if (req->op == X_READ)
608
                r = xworkq_enqueue(&ce->workq, handle_read, (void *)pr);
609
        else {
610
                r = -1;
611
                goto out;
612
        }
613

    
614
out:
615
        if (r < 0){
616
                cached_fail(peer, pr);
617
        }
618
        return r;
619

    
620
}
621

    
622
struct req_completion{
623
        struct peer_req *pr;
624
        struct xseg_request *req;
625
};
626

    
627
static void complete_read(void *arg)
628
{
629
        /*
630
         * In this context we hold a reference to the cache entry and
631
         * the assocciated lock
632
         */
633

    
634
        struct req_completion *rc = (struct req_completion *)arg;
635
        struct peer_req *pr = rc->pr;
636
        struct xseg_request *req = rc->req;
637
        struct peerd *peer = pr->peer;
638
        struct cached *cached = __get_cached(peer);
639
        struct cache_io *cio = __get_cache_io(pr);
640
        struct ce *ce = get_cache_entry(cached->cache, cio->h);
641
        uint32_t start, end, i;
642
        int success;
643
        char *data = xseg_get_data(peer->xseg, req);
644

    
645
        /*
646
         * Synchronize pending_reqs of the cache_io here, since each cache_io
647
         * refers to only one object, and therefore we can use the object lock
648
         * to synchronize between receive contextes.
649
         */
650
        cio->pending_reqs--;
651
        success = (req->state == XS_SERVED && req->serviced == req->size);
652
        if (!success)
653
                cio->state = CIO_FAILED;
654
        //assert (req->offset % cached->bucket_size) == 0;
655
        //assert ((req->offset+req->serviced) % cached->bucket_size) == 0;
656
        start = __get_bucket(cached, req->offset);
657
        end = __get_bucket(cached, req->offset + req->serviced);
658
        for (i = start; i <= end; i++) {
659
                if (ce->status[i] == LOADING){
660
                        if (success){
661
                                memcpy(ce->data+(i*cached->bucket_size), data,
662
                                                cached->bucket_size);
663
                                ce->status[i] = VALID;
664
                        }
665
                        else {
666
                                //reset status
667
                                XSEGLOG2(&lc, E, "Before ce %p, i %lu, ce->status[i] %u", ce, i, ce->status[i]);
668
                                ce->status[i] = INVALID;
669
                                XSEGLOG2(&lc, E, "After ce %p, i %lu, ce->status[i] %u", ce, i, ce->status[i]);
670
                        }
671
                        xwaitq_signal(&ce->waitq[i]);
672
                }
673
        }
674
        free(rc);
675
}
676

    
677
void complete_write(void *arg)
678
{
679
        //for each bucket
680
        //        if WRITETHROUGH
681
        //                copy data to bucket
682
        //                mark as valid
683
        //        else if WRITEBACK
684
        //                if status writing
685
        //                        mark as valid
686
        //
687
        /*
688
         * In this context we hold a reference to the cache entry and
689
         * the assocciated lock
690
         */
691
        return;
692
}
693

    
694
static int handle_receive_read(struct peerd *peer, struct peer_req *pr,
695
                        struct xseg_request *req)
696
{
697
        /*
698
         * Should be rentrant
699
         */
700
        struct cached *cached = __get_cached(peer);
701
        struct cache_io *cio = __get_cache_io(pr);
702
        struct ce *ce = get_cache_entry(cached->cache, cio->h);
703

    
704
        struct req_completion *rc;
705

    
706
        rc = malloc(sizeof(struct req_completion));
707

    
708
        rc->pr = pr;
709
        rc->req = req;
710
        if (xworkq_enqueue(&ce->workq, complete_read, (void *)rc) < 0){
711
                free(rc);
712
                //TODO WHAT?
713
        }
714
        return 0;
715
}
716

    
717
static int handle_receive_write(struct peerd *peer, struct peer_req *pr)
718
{
719
        //enqueue_work
720
        return 0;
721
}
722

    
723
static int handle_delete(struct peerd *peer, struct peer_req *pr)
724
{
725
        //h = cache_lookup
726
        //if h
727
        //        cio->h = h
728
        //
729
        //send delete to blocker
730
        return 0;
731
}
732

    
733
static int handle_receive_delete(struct peerd *peer, struct peer_req *pr)
734
{
735
        //if success
736
        //        if cio->h
737
        //                //this should not write any dirty data
738
        //                xcache_remove(h)
739
        return 0;
740
}
741

    
742
static int forward_req(struct peerd *peer, struct peer_req *pr)
743
{
744
        /*
745
        struct cached *cached = __get_cached(peer);
746
        struct cache_io *cio = __get_cache_io(pr);
747

748
        xport p;
749
        p = xseg_forward(peer->xseg, req, pr->portno, X_ALLOC);
750

751
        xseg_signal(peer->xseg, p);
752

753
        */
754
        return 0;
755
}
756

    
757
static int handle_receive(struct peerd *peer, struct peer_req *pr,
758
                        struct xseg_request *req)
759
{
760
        //if not read/write/delete
761
        //        put req;
762
        //        complete or fail pr;
763
        int r;
764
        xport p;
765

    
766
        switch (req->op){
767
                case X_READ: r = handle_receive_read(peer, pr, req); break;
768
//                case X_WRITE: r = handle_receive_write(peer, pr, req); break;
769
//                case X_DELETE: r = handle_receive_delete(peer, pr, req); break;
770
                default:
771
                        p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
772
                        if (p == NoPort)
773
                                xseg_put_request(peer->xseg, req, pr->portno);
774
                        break;
775
        }
776

    
777
        return 0;
778
}
779

    
780
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
781
                enum dispatch_reason reason)
782
{
783
        struct cached *cached = __get_cached(peer);
784
        (void) cached;
785
        struct cache_io *cio = __get_cache_io(pr);
786
        (void) cio;
787

    
788
        switch (reason) {
789
                case dispatch_accept:
790
                        XSEGLOG2(&lc, D, "In dispatch accept");
791
                        if (req->op == X_READ || req->op == X_WRITE) {
792
                                cio->state = CIO_ACCEPTED;
793
                                handle_readwrite(peer, pr);
794
                        } else {
795
                                fail(peer, pr);
796
                        }
797
                        break;
798
                case dispatch_receive:
799
                        XSEGLOG2(&lc, D, "In dispatch receive");
800
                        handle_receive(peer, pr, req);
801
                        break;
802
                case dispatch_internal:
803
                default:
804
                        XSEGLOG2(&lc, E, "Invalid dispatch reason");
805
        }
806
        return 0;
807
}
808

    
809
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
810
{
811
        int i;
812
        char cache_size[MAX_ARG_LEN + 1];
813
        char bucket_size[MAX_ARG_LEN + 1];
814
        char object_size[MAX_ARG_LEN + 1];
815
        char max_req_size[MAX_ARG_LEN + 1];
816
        char write_policy[MAX_ARG_LEN + 1];
817
        long bportno = -1;
818

    
819
        cache_size[0] = 0;
820
        bucket_size[0] = 0;
821
        object_size[0] = 0;
822
        max_req_size[0] = 0;
823
        write_policy[0] = 0;
824

    
825
        /*Allocate enough space for needed structs*/
826
        struct cached *cached = malloc(sizeof(struct cached));
827
        if (!cached) {
828
                perror("malloc");
829
                goto fail;
830
        }
831
        cached->cache = malloc(sizeof(struct xcache));
832
        if (!cached->cache) {
833
                perror("malloc");
834
                goto cache_fail;
835
        }
836
        peer->priv = cached;
837

    
838
        for (i = 0; i < peer->nr_ops; i++) {
839
                struct cache_io *cio = malloc(sizeof(struct cache_io));
840
                if (!cio) {
841
                        perror("malloc");
842
                        goto cio_fail;
843
                }
844
                cio->h = NoEntry;
845
                cio->pending_reqs = 0;
846
                peer->peer_reqs[i].priv = cio;
847
        }
848

    
849
        /*Read arguments*/
850
        /*FIXME: change actual size to relative size for cache_size*/
851
        BEGIN_READ_ARGS(argc, argv);
852
        READ_ARG_ULONG("-bp", bportno);
853
        READ_ARG_STRING("-cs", cache_size, MAX_ARG_LEN);
854
        READ_ARG_STRING("-mrs", max_req_size, MAX_ARG_LEN);
855
        READ_ARG_STRING("-os", object_size, MAX_ARG_LEN);
856
        READ_ARG_STRING("-bs", bucket_size, MAX_ARG_LEN);
857
        READ_ARG_STRING("-wcp", write_policy, MAX_ARG_LEN);
858
        END_READ_ARGS();
859

    
860
        /*Parse arguments for:*/
861

    
862
        /*Bucket size*/
863
        if (!bucket_size[0]) {
864
                cached->bucket_size = BUCKET_SIZE_QUANTUM; /*Default value*/
865
        } else {
866
                cached->bucket_size = str2num(bucket_size);
867
                if (!cached->bucket_size) {
868
                        XSEGLOG2(&lc, E, "Invalid syntax: -bs %s\n", bucket_size);
869
                        goto arg_fail;
870
                }
871
                if (cached->bucket_size % BUCKET_SIZE_QUANTUM) {
872
                        XSEGLOG2(&lc, E, "Misaligned bucket size: %s\n", bucket_size);
873
                        goto arg_fail;
874
                }
875
        }
876

    
877
        /*Object size*/
878
        if (!object_size[0])
879
                strcpy(object_size, "4M"); /*Default value*/
880

    
881
        cached->object_size = str2num(object_size);
882
        if (!cached->object_size) {
883
                XSEGLOG2(&lc, E, "Invalid syntax: -os %s\n", object_size);
884
                goto arg_fail;
885
        }
886
        if (cached->object_size % cached->bucket_size) {
887
                XSEGLOG2(&lc, E, "Misaligned object size: %s\n", object_size);
888
                goto arg_fail;
889
        }
890

    
891
        /*Max request size*/
892
        if (!max_req_size[0]) {
893
                XSEGLOG2(&lc, E, "Maximum request size must be provided\n");
894
                goto arg_fail;
895
        }
896
        cached->max_req_size = str2num(max_req_size);
897
        if (!cached->max_req_size) {
898
                XSEGLOG2(&lc, E, "Invalid syntax: -mrs %s\n", max_req_size);
899
                goto arg_fail;
900
        }
901
        if (cached->max_req_size % BUCKET_SIZE_QUANTUM) {
902
                XSEGLOG2(&lc, E, "Misaligned maximum request size: %s\n",
903
                                max_req_size);
904
                goto arg_fail;
905
        }
906

    
907
        /*Cache size*/
908
        if (!cache_size[0]) {
909
                XSEGLOG2(&lc, E, "Cache size must be provided\n");
910
                goto arg_fail;
911
        }
912
        cached->cache_size = str2num(cache_size);
913
        if (!cached->cache_size) {
914
                XSEGLOG2(&lc, E, "Invalid syntax: -cs %s\n", cache_size);
915
                goto arg_fail;
916
        }
917
        if (cached->cache_size % cached->object_size) {
918
                XSEGLOG2(&lc, E, "Misaligned cache size: %s\n",        cache_size);
919
                goto arg_fail;
920
        }
921

    
922
        /*Blocker port*/
923
        if (bportno < 0){
924
                XSEGLOG2(&lc, E, "Blocker port must be provided");
925
                goto arg_fail;
926
        }
927
        cached->bportno = bportno;
928

    
929

    
930
        /*Change cache_size to number of objects*/
931
        cached->cache_size = cached->cache_size / cached->object_size;
932
        cached->buckets_per_object = cached->object_size / cached->bucket_size;
933

    
934
        print_cached(cached);
935

    
936
        /*Initialize xcache*/
937
        xcache_init(cached->cache, cached->cache_size, &c_ops, peer);
938

    
939
        return 0;
940

    
941
arg_fail:
942
        custom_peer_usage();
943
cio_fail:
944
        for (i = 0; i < peer->nr_ops && peer->peer_reqs[i].priv != NULL; i++)
945
                free(peer->peer_reqs[i].priv);
946
        free(cached->cache);
947
cache_fail:
948
        free(cached);
949
fail:
950
        return -1;
951
}
952

    
953
void custom_peer_finalize(struct peerd *peer)
954
{
955
        //write dirty objects
956
        //or cache_close(cached->cache);
957
        return;
958
}
959

    
960
void custom_peer_usage()
961
{
962
        fprintf(stderr, "Custom peer options: \n"
963
                        "  --------------------------------------------\n"
964
                        "    -cs       | None    | Cache size\n"
965
                        "    -mrs      | None    | Max request size\n"
966
                        "    -os       | 4M      | Object size\n"
967
                        "    -bs       | 4K      | Bucket size\n"
968
                        "    -bp       | None    | Blocker port\n"
969
                        "\n");
970
}