Statistics
| Branch: | Tag: | Revision:

root / xseg / xseg / xseg.c @ d8a852fa

History | View | Annotate | Download (41.5 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#include <xseg/xseg.h>
36
#include <xseg/domain.h>
37
#include <sys/util.h>
38

    
39
#ifndef NULL
40
#define NULL ((void *)0)
41
#endif
42

    
43
#define XSEG_NR_TYPES 16
44
#define XSEG_NR_PEER_TYPES 64
45
#define XSEG_MIN_PAGE_SIZE 4096
46

    
47
static struct xseg_type *__types[XSEG_NR_TYPES];
48
static unsigned int __nr_types;
49
static struct xseg_peer *__peer_types[XSEG_NR_PEER_TYPES];
50
static unsigned int __nr_peer_types;
51

    
52
static void __lock_segment(struct xseg *xseg)
53
{
54
        volatile uint64_t *flags;
55
        flags = &xseg->shared->flags;
56
        while (__sync_fetch_and_or(flags, XSEG_F_LOCK));
57
}
58

    
59
static void __unlock_segment(struct xseg *xseg)
60
{
61
        volatile uint64_t *flags;
62
        flags = &xseg->shared->flags;
63
        __sync_fetch_and_and(flags, ~XSEG_F_LOCK);
64
}
65

    
66
static struct xseg_type *__find_type(const char *name, long *index)
67
{
68
        long i;
69
        for (i = 0; (*index = i) < __nr_types; i++)
70
                if (!strncmp(__types[i]->name, name, XSEG_TNAMESIZE))
71
                        return __types[i];
72
        return NULL;
73
}
74

    
75
static struct xseg_peer *__find_peer_type(const char *name, int64_t *index)
76
{
77
        int64_t i;
78
        for (i = 0; (*index = i) < __nr_peer_types; i++) {
79
                if (!strncmp(__peer_types[i]->name, name, XSEG_TNAMESIZE))
80
                        return __peer_types[i];
81
        }
82
        return NULL;
83
}
84

    
85
void xseg_report_peer_types(void)
86
{
87
        long i;
88
        XSEGLOG("total %u peer types:\n", __nr_peer_types);
89
        for (i = 0; i < __nr_peer_types; i++)
90
                XSEGLOG("%ld: '%s'\n", i, __peer_types[i]->name);
91
}
92

    
93
static struct xseg_type *__find_or_load_type(const char *name)
94
{
95
        long i;
96
        struct xseg_type *type = __find_type(name, &i);
97
        if (type)
98
                return type;
99

    
100
        __load_plugin(name);
101
        return __find_type(name, &i);
102
}
103

    
104
static struct xseg_peer *__find_or_load_peer_type(const char *name)
105
{
106
        int64_t i;
107
        struct xseg_peer *peer_type = __find_peer_type(name, &i);
108
        if (peer_type)
109
                return peer_type;
110

    
111
        __load_plugin(name);
112
        return __find_peer_type(name, &i);
113
}
114

    
115
static struct xseg_peer *__get_peer_type(struct xseg *xseg, uint32_t serial)
116
{
117
        char *name;
118
        struct xseg_peer *type;
119
        struct xseg_private *priv = xseg->priv;
120
        char (*shared_peer_types)[XSEG_TNAMESIZE];
121

    
122
        if (serial >= xseg->max_peer_types) {
123
                XSEGLOG("invalid peer type serial %d >= %d\n",
124
                         serial, xseg->max_peer_types);
125
                return NULL;
126
        }
127

    
128
        type = priv->peer_types[serial];
129
        if (type)
130
                return type;
131

    
132
        /* xseg->shared->peer_types is an append-only array,
133
         * therefore this should be safe
134
         * without either locking or string copying. */
135
        shared_peer_types = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
136
        name = shared_peer_types[serial];
137
        if (!*name) {
138
                XSEGLOG("nonexistent peer type serial %d\n", serial);
139
                return NULL;
140
        }
141

    
142
        type = __find_or_load_peer_type(name);
143
        if (!type)
144
                XSEGLOG("could not find driver for peer type %d [%s]\n",
145
                         serial, name);
146

    
147
        priv->peer_types[serial] = type;
148
        return type;
149
}
150

    
151
static void * __get_peer_type_data(struct xseg *xseg, uint32_t serial)
152
{
153
        char *name;
154
        void *data;
155
        struct xseg_private *priv = xseg->priv;
156
        char (*shared_peer_types)[XSEG_TNAMESIZE];
157
        xptr *shared_peer_type_data;
158

    
159
        if (serial >= xseg->max_peer_types) {
160
                XSEGLOG("invalid peer type serial %d >= %d\n",
161
                         serial, xseg->max_peer_types);
162
                return 0;
163
        }
164

    
165
        data = priv->peer_type_data[serial];
166
        if (data)
167
                return data;
168

    
169
        shared_peer_types = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
170
        name = shared_peer_types[serial];
171
        if (!*name) {
172
                XSEGLOG("nonexistent peer type serial %d\n", serial);
173
                return 0;
174
        }
175
        shared_peer_type_data = XPTR_TAKE(xseg->shared->peer_type_data, xseg->segment);
176

    
177
        priv->peer_type_data[serial] = XPTR_TAKE(shared_peer_type_data[serial], xseg->segment);
178
        return priv->peer_type_data[serial];
179
}
180

    
181
static inline int __validate_port(struct xseg *xseg, uint32_t portno)
182
{
183
        return portno < xseg->config.nr_ports;
184
}
185

    
186
static inline int __validate_ptr(struct xseg *xseg, xptr ptr)
187
{
188
        return ptr < xseg->segment_size;
189
}
190

    
191
/* type:name:nr_ports:nr_requests:request_size:extra_size:page_shift */
192

    
193
#define TOK(s, sp, def) \
194
        (s) = (sp); \
195
        for (;;) { \
196
                switch (*(sp)) { \
197
                case 0: \
198
                        s = (def); \
199
                        break; \
200
                case ':': \
201
                        *(sp)++ = 0; \
202
                        break; \
203
                default: \
204
                        (sp) ++; \
205
                        continue; \
206
                } \
207
                break; \
208
        } \
209

    
210
static unsigned long strul(char *s)
211
{
212
        unsigned long n = 0;
213
        for (;;) {
214
                unsigned char c = *s - '0';
215
                if (c >= 10)
216
                        break;
217
                n = n * 10 + c;
218
                s ++;
219
        }
220
        return n;
221
}
222

    
223
/*
224
static char *strncopy(char *dest, const char *src, uint32_t n)
225
{
226
        uint32_t i;
227
        char c;
228
        for (i = 0; i < n; i++) {
229
                c = src[i];
230
                dest[i] = c;
231
                if (!c)
232
                        break;
233
        }
234
        dest[n-1] = 0;
235
        return dest;
236
}
237
*/
238

    
239
int xseg_parse_spec(char *segspec, struct xseg_config *config)
240
{
241
        /* default: "posix:globalxseg:4:256:12" */
242
        char *s = segspec, *sp = segspec;
243

    
244
        /* type */
245
        TOK(s, sp, "posix");
246
        strncpy(config->type, s, XSEG_TNAMESIZE);
247
        config->type[XSEG_TNAMESIZE-1] = 0;
248

    
249
        /* name */
250
        TOK(s, sp, "globalxseg");
251
        strncpy(config->name, s, XSEG_NAMESIZE);
252
        config->name[XSEG_NAMESIZE-1] = 0;
253

    
254
        /* nr_ports */
255
        TOK(s, sp, "4");
256
        config->nr_ports = strul(s);
257

    
258
        /* heap_size */
259
        TOK(s, sp, "256");
260
        config->heap_size = (uint64_t) (strul(s) * 1024UL * 1024UL);
261

    
262
        /* page_shift */
263
        TOK(s, sp, "12");
264
        config->page_shift = strul(s);
265
        return 0;
266
}
267

    
268
int xseg_register_type(struct xseg_type *type)
269
{
270
        long i;
271
        int r = -1;
272
        struct xseg_type *__type;
273
        __lock_domain();
274
        __type = __find_type(type->name, &i);
275
        if (__type) {
276
                XSEGLOG("type %s already exists\n", type->name);
277
                goto out;
278
        }
279

    
280
        if (__nr_types >= XSEG_NR_TYPES) {
281
                XSEGLOG("maximum type registrations reached: %u\n", __nr_types);
282
                r -= 1;
283
                goto out;
284
        }
285

    
286
        type->name[XSEG_TNAMESIZE-1] = 0;
287
        __types[__nr_types] = type;
288
        __nr_types += 1;
289
        r = 0;
290
out:
291
        __unlock_domain();
292
        return r;
293
}
294

    
295
int xseg_unregister_type(const char *name)
296
{
297
        long i;
298
        int r = -1;
299
        struct xseg_type *__type;
300
        __lock_domain();
301
        __type = __find_type(name, &i);
302
        if (!__type) {
303
                XSEGLOG("segment type '%s' does not exist\n", name);
304
                goto out;
305
        }
306

    
307
        __nr_types -= 1;
308
        __types[i] = __types[__nr_types];
309
        __types[__nr_types] = NULL;
310
        r = 0;
311
out:
312
        __unlock_domain();
313
        return r;
314
}
315

    
316
int xseg_register_peer(struct xseg_peer *peer_type)
317
{
318
        int64_t i;
319
        int r = -1;
320
        struct xseg_peer *type;
321
        __lock_domain();
322
        type = __find_peer_type(peer_type->name, &i);
323
        if (type) {
324
                XSEGLOG("peer type '%s' already exists\n", type->name);
325
                goto out;
326
        }
327

    
328
        if (__nr_peer_types >= XSEG_NR_PEER_TYPES) {
329
                XSEGLOG("maximum peer type registrations reached: %u",
330
                        __nr_peer_types);
331
                r -= 1;
332
                goto out;
333
        }
334

    
335
        if (peer_type->peer_ops.remote_signal_init()) {
336
                XSEGLOG("peer type '%s': signal initialization failed\n",
337
                        peer_type->name);
338
                r -= 1;
339
                goto out;
340
        }
341

    
342
        peer_type->name[XSEG_TNAMESIZE-1] = 0;
343
        __peer_types[__nr_peer_types] = peer_type;
344
        __nr_peer_types += 1;
345
        r = 0;
346

    
347
out:
348
        __unlock_domain();
349
        return r;
350
}
351

    
352
int xseg_unregister_peer(const char *name)
353
{
354
        int64_t i;
355
        struct xseg_peer *driver;
356
        int r = -1;
357
        __lock_domain();
358
        driver = __find_peer_type(name, &i);
359
        if (!driver) {
360
                XSEGLOG("peer type '%s' does not exist\n", name);
361
                goto out;
362
        }
363

    
364
        __nr_peer_types -= 1;
365
        __peer_types[i] = __peer_types[__nr_peer_types];
366
        __peer_types[__nr_peer_types] = NULL;
367
        driver->peer_ops.remote_signal_quit();
368
        r = 0;
369
out:
370
        __unlock_domain();
371
        return r;
372
}
373

    
374
int64_t __enable_driver(struct xseg *xseg, struct xseg_peer *driver)
375
{
376
        int64_t r;
377
        char (*drivers)[XSEG_TNAMESIZE];
378
        xptr *ptd;
379
        uint32_t max_drivers = xseg->max_peer_types;
380
        void *data;
381
        xptr peer_type_data;
382

    
383
        if (xseg->shared->nr_peer_types >= max_drivers) {
384
                XSEGLOG("cannot register '%s': driver namespace full\n",
385
                        driver->name);
386
                return -1;
387
        }
388

    
389
        drivers = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
390
        for (r = 0; r < max_drivers; r++) {
391
                if (!*drivers[r])
392
                        goto bind;
393
                if (!strncmp(drivers[r], driver->name, XSEG_TNAMESIZE)){
394
                        data = __get_peer_type_data(xseg, r);
395
                        goto success;
396
                }
397
        }
398

    
399
        /* Unreachable */
400
        return -666;
401

    
402
bind:
403
        /* assert(xseg->shared->nr_peer_types == r); */
404
        data = driver->peer_ops.alloc_data(xseg);
405
        if (!data)
406
                return -1;
407
        peer_type_data = XPTR_MAKE(data, xseg->segment);
408
        ptd = XPTR_TAKE(xseg->shared->peer_type_data, xseg->segment);
409
        ptd[r] = peer_type_data;
410
        xseg->shared->nr_peer_types = r + 1;
411
        strncpy(drivers[r], driver->name, XSEG_TNAMESIZE);
412
        drivers[r][XSEG_TNAMESIZE-1] = 0;
413

    
414
success:
415
        xseg->priv->peer_types[r] = driver;
416
        xseg->priv->peer_type_data[r] = data;
417
        return r;
418
}
419

    
420
int64_t xseg_enable_driver(struct xseg *xseg, const char *name)
421
{
422
        int64_t r = -1;
423
        struct xseg_peer *driver;
424

    
425
        __lock_domain();
426
        driver = __find_peer_type(name, &r);
427
        if (!driver) {
428
                XSEGLOG("driver '%s' not found\n", name);
429
                goto out;
430
        }
431

    
432
        __lock_segment(xseg);
433
        r = __enable_driver(xseg, driver);
434
        __unlock_segment(xseg);
435
out:
436
        __unlock_domain();
437
        return r;
438
}
439

    
440
int xseg_disable_driver(struct xseg *xseg, const char *name)
441
{
442
        int64_t i;
443
        int r = -1;
444
        struct xseg_private *priv = xseg->priv;
445
        struct xseg_peer *driver;
446
        __lock_domain();
447
        driver =  __find_peer_type(name, &i);
448
        if (!driver) {
449
                XSEGLOG("driver '%s' not found\n", name);
450
                goto out;
451
        }
452

    
453
        for (i = 0; i < xseg->max_peer_types; i++)
454
                if (priv->peer_types[i] == driver)
455
                        priv->peer_types[i] = NULL;
456
        r = 0;
457
out:
458
        __unlock_domain();
459
        return r;
460
}
461

    
462
/* NOTE: calculate_segment_size() and initialize_segment()
463
 * must always be exactly in sync!
464
*/
465

    
466
static uint64_t calculate_segment_size(struct xseg_config *config)
467
{
468
        uint64_t size = 0;
469
        uint32_t page_size, page_shift = config->page_shift;
470

    
471
        /* assert(sizeof(struct xseg) <= (1 << 9)); */
472

    
473
        if (page_shift < 9) {
474
                XSEGLOG("page_shift must be >= %d\n", 9);
475
                return 0;
476
        }
477

    
478
        page_size = 1 << page_shift;
479

    
480
        /* struct xseg itself + struct xheap */
481
        size += 2*page_size + config->heap_size;
482
        size = __align(size, page_shift);
483
        
484
        return size;
485
}
486

    
487
static long initialize_segment(struct xseg *xseg, struct xseg_config *cfg)
488
{
489
        uint32_t page_shift = cfg->page_shift, page_size = 1 << page_shift;
490
        struct xseg_shared *shared;
491
        char *segment = (char *)xseg;
492
        uint64_t size = page_size, i;
493
        void *mem;
494
        struct xheap *heap;
495
        struct xobject_h *obj_h;
496
        int r;
497
        xptr *ports;
498
        xport *gw;
499

    
500

    
501
        if (page_size < XSEG_MIN_PAGE_SIZE)
502
                return -1;
503

    
504
        xseg->segment_size = 2 * page_size + cfg->heap_size;
505
        xseg->segment = (struct xseg *) segment;
506

    
507
        /* build heap */
508
        xseg->heap = (struct xheap *) XPTR_MAKE(segment + size, segment);
509
        size += sizeof(struct xheap);
510
        size = __align(size, page_shift);
511

    
512
        heap = XPTR_TAKE(xseg->heap, segment);
513
        r = xheap_init(heap, cfg->heap_size, page_shift, segment+size);
514
        if (r < 0)
515
                return -1;
516

    
517
        /* build object_handler handler */
518
        mem = xheap_allocate(heap, sizeof(struct xobject_h));
519
        if (!mem)
520
                return -1;
521
        xseg->object_handlers = (struct xobject_h *) XPTR_MAKE(mem, segment);
522
        obj_h = mem;
523
        r = xobj_handler_init(obj_h, segment, MAGIC_OBJH, 
524
                        sizeof(struct xobject_h), heap);
525
        if (r < 0)
526
                return -1;
527

    
528
        //now that we have object handlers handler, use that to allocate
529
        //new object handlers
530
        
531
        //allocate requests handler
532
        mem = xobj_get_obj(obj_h, X_ALLOC);
533
        if (!mem)
534
                return -1;
535
        obj_h = mem;
536
        r = xobj_handler_init(obj_h, segment, MAGIC_REQ, 
537
                        sizeof(struct xseg_request), heap);
538
        if (r < 0)
539
                return -1;
540
        xseg->request_h = (struct xobject_h *) XPTR_MAKE(obj_h, segment);
541
        
542
        //allocate ports handler
543
        obj_h = XPTR_TAKE(xseg->object_handlers, segment);
544
        mem = xobj_get_obj(obj_h, X_ALLOC);
545
        if (!mem)
546
                return -1;
547
        obj_h = mem;
548
        r = xobj_handler_init(obj_h, segment, MAGIC_PORT, 
549
                        sizeof(struct xseg_port), heap);
550
        if (r < 0)
551
                return -1;
552
        xseg->port_h = (struct xobject_h *) XPTR_MAKE(mem, segment);
553

    
554
        //allocate xptr port array to be used as a map
555
        //portno <--> xptr port
556
        mem = xheap_allocate(heap, sizeof(xptr)*cfg->nr_ports);
557
        if (!mem)
558
                return -1;
559
        ports = mem;
560
        for (i = 0; i < cfg->nr_ports; i++) {
561
                ports[i]=0;
562
        }
563
        xseg->ports = (xptr *) XPTR_MAKE(mem, segment);
564

    
565
        //allocate {src,dst} gws
566
        mem = xheap_allocate(heap, sizeof(xport) * cfg->nr_ports);
567
        if (!mem)
568
                return -1;
569
        gw = mem;
570
        for (i = 0; i < cfg->nr_ports; i++) {
571
                gw[i] = NoPort;
572
        }
573
        xseg->path_next = (xport *) XPTR_MAKE(mem, segment);
574

    
575
        mem = xheap_allocate(heap, sizeof(xport) * cfg->nr_ports);
576
        if (!mem)
577
                return -1;
578
        gw = mem;
579
        for (i = 0; i < cfg->nr_ports; i++) {
580
                gw[i] = NoPort;
581
        }
582
        xseg->dst_gw = (xport *) XPTR_MAKE(mem, segment);
583
        
584
        //allocate xseg_shared memory
585
        mem = xheap_allocate(heap, sizeof(struct xseg_shared));
586
        if (!mem)
587
                return -1;
588
        shared = (struct xseg_shared *) mem;
589
        shared->flags = 0;
590
        shared->nr_peer_types = 0;
591
        xseg->shared = (struct xseg_shared *) XPTR_MAKE(mem, segment);
592
        
593
        mem = xheap_allocate(heap, page_size);
594
        if (!mem)
595
                return -1;
596
        shared->peer_types = (char **) XPTR_MAKE(mem, segment);
597
        xseg->max_peer_types = xheap_get_chunk_size(mem) / XSEG_TNAMESIZE;
598
        mem = xheap_allocate(heap, xseg->max_peer_types * sizeof(xptr));
599
        if (!mem)
600
                return -1;
601
        memset(mem, 0, xheap_get_chunk_size(mem));
602
        shared->peer_type_data = (xptr *) XPTR_MAKE(mem, segment);
603

    
604
        memcpy(&xseg->config, cfg, sizeof(struct xseg_config));
605

    
606
        xseg->counters.req_cnt = 0;
607
        xseg->counters.avg_req_lat = 0;
608

    
609
        return 0;
610
}
611

    
612
int xseg_create(struct xseg_config *cfg)
613
{
614
        struct xseg *xseg = NULL;
615
        struct xseg_type *type;
616
        struct xseg_operations *xops;
617
        uint64_t size;
618
        long r;
619

    
620
        type = __find_or_load_type(cfg->type);
621
        if (!type) {
622
                cfg->type[XSEG_TNAMESIZE-1] = 0;
623
                XSEGLOG("type '%s' does not exist\n", cfg->type);
624
                goto out_err;
625
        }
626

    
627
        size = calculate_segment_size(cfg);
628
        if (!size) {
629
                XSEGLOG("invalid config!\n");
630
                goto out_err;
631
        }
632

    
633
        xops = &type->ops;
634
        cfg->name[XSEG_NAMESIZE-1] = 0;
635
        XSEGLOG("creating segment of size %llu\n", size);
636
        r = xops->allocate(cfg->name, size);
637
        if (r) {
638
                XSEGLOG("cannot allocate segment!\n");
639
                goto out_err;
640
        }
641

    
642
        xseg = xops->map(cfg->name, size, NULL);
643
        if (!xseg) {
644
                XSEGLOG("cannot map segment!\n");
645
                goto out_deallocate;
646
        }
647

    
648
        r = initialize_segment(xseg, cfg);
649
        xops->unmap(xseg, size);
650
        if (r) {
651
                XSEGLOG("cannot initilize segment!\n");
652
                goto out_deallocate;
653
        }
654

    
655

    
656
        return 0;
657

    
658
out_deallocate:
659
        xops->deallocate(cfg->name);
660
out_err:
661
        return -1;
662
}
663

    
664
void xseg_destroy(struct xseg *xseg)
665
{
666
        struct xseg_type *type;
667

    
668
        __lock_domain();
669
        type = __find_or_load_type(xseg->config.type);
670
        if (!type) {
671
                XSEGLOG("no segment type '%s'\n", xseg->config.type);
672
                goto out;
673
        }
674

    
675
        /* should destroy() leave() first? */
676
        type->ops.deallocate(xseg->config.name);
677
out:
678
        __unlock_domain();
679
}
680

    
681
//FIXME
682
static int pointer_ok(        unsigned long ptr,
683
                        unsigned long base,
684
                        uint64_t size,
685
                        char *name)
686
{
687
        int ret = !(ptr >= base && ptr < base + size);
688
        if (ret)
689
                XSEGLOG("invalid pointer '->%s' [%llx on %llx]!\n",
690
                        (unsigned long long)ptr,
691
                        (unsigned long long)base,
692
                        name);
693
        return ret;
694
}
695

    
696
#define POINTER_OK(xseg, field, base) \
697
         pointer_ok(        (unsigned long)((xseg)->field), \
698
                        (unsigned long)(base), \
699
                        (xseg)->segment_size, \
700
                        #field)
701

    
702
static int xseg_validate_pointers(struct xseg *xseg)
703
{
704
        int r = 0;
705
        r += POINTER_OK(xseg, object_handlers, xseg->segment);
706
        r += POINTER_OK(xseg, request_h, xseg->segment);
707
        r += POINTER_OK(xseg, port_h, xseg->segment);
708
        r += POINTER_OK(xseg, ports, xseg->segment);
709
        r += POINTER_OK(xseg, heap, xseg->segment);
710
        r += POINTER_OK(xseg, shared, xseg->segment);
711
        return r;
712
}
713

    
714
struct xseg *xseg_join(        char *segtypename,
715
                        char *segname,
716
                        char *peertypename,
717
                        void (*wakeup)
718
                        (        uint32_t portno                ))
719
{
720
        struct xseg *xseg, *__xseg;
721
        uint64_t size;
722
        struct xseg_peer *peertype;
723
        struct xseg_type *segtype;
724
        struct xseg_private *priv;
725
        struct xseg_operations *xops;
726
        struct xseg_peer_operations *pops;
727
        int r;
728

    
729
        __lock_domain();
730

    
731
        peertype = __find_or_load_peer_type(peertypename);
732
        if (!peertype) {
733
                XSEGLOG("Peer type '%s' not found\n", peertypename);
734
                __unlock_domain();
735
                goto err;
736
        }
737

    
738
        segtype = __find_or_load_type(segtypename);
739
        if (!segtype) {
740
                XSEGLOG("Segment type '%s' not found\n", segtypename);
741
                __unlock_domain();
742
                goto err;
743
        }
744

    
745
        __unlock_domain();
746

    
747
        xops = &segtype->ops;
748
        pops = &peertype->peer_ops;
749

    
750
        xseg = pops->malloc(sizeof(struct xseg));
751
        if (!xseg) {
752
                XSEGLOG("Cannot allocate memory");
753
                goto err;
754
        }
755

    
756
        priv = pops->malloc(sizeof(struct xseg_private));
757
        if (!priv) {
758
                XSEGLOG("Cannot allocate memory");
759
                goto err_seg;
760
        }
761

    
762
        __xseg = xops->map(segname, XSEG_MIN_PAGE_SIZE, NULL);
763
        if (!__xseg) {
764
                XSEGLOG("Cannot map segment");
765
                goto err_priv;
766
        }
767

    
768
        size = __xseg->segment_size;
769
        /* XSEGLOG("joined segment of size: %lu\n", (unsigned long)size); */
770
        xops->unmap(__xseg, XSEG_MIN_PAGE_SIZE);
771

    
772
        __xseg = xops->map(segname, size, xseg);
773
        if (!__xseg) {
774
                XSEGLOG("Cannot map segment");
775
                goto err_priv;
776
        }
777

    
778
        priv->segment_type = *segtype;
779
        priv->peer_type = *peertype;
780
        priv->wakeup = wakeup;
781
        priv->req_data = xhash_new(3, 0, INTEGER); //FIXME should be relative to XSEG_DEF_REQS
782
        if (!priv->req_data)
783
                goto err_priv;
784
        xlock_release(&priv->reqdatalock);
785

    
786
        xseg->max_peer_types = __xseg->max_peer_types;
787

    
788
        priv->peer_types = pops->malloc(sizeof(void *) * xseg->max_peer_types);
789
        if (!priv->peer_types) {
790
                XSEGLOG("Cannot allocate memory");
791
                goto err_unmap;
792
        }
793
        memset(priv->peer_types, 0, sizeof(void *) * xseg->max_peer_types);
794
        priv->peer_type_data = pops->malloc(sizeof(void *) * xseg->max_peer_types);
795
        if (!priv->peer_types) {
796
                XSEGLOG("Cannot allocate memory");
797
                //FIXME wrong err handling
798
                goto err_unmap;
799
        }
800
        memset(priv->peer_type_data, 0, sizeof(void *) * xseg->max_peer_types);
801

    
802
        xseg->priv = priv;
803
        xseg->config = __xseg->config;
804
        xseg->version = __xseg->version;
805
        xseg->request_h = XPTR_TAKE(__xseg->request_h, __xseg);
806
        xseg->port_h = XPTR_TAKE(__xseg->port_h, __xseg);
807
        xseg->ports = XPTR_TAKE(__xseg->ports, __xseg);
808
        xseg->path_next = XPTR_TAKE(__xseg->path_next, __xseg);
809
        xseg->dst_gw = XPTR_TAKE(__xseg->dst_gw, __xseg);
810
        xseg->heap = XPTR_TAKE(__xseg->heap, __xseg);
811
        xseg->object_handlers = XPTR_TAKE(__xseg->object_handlers, __xseg);
812
        xseg->shared = XPTR_TAKE(__xseg->shared, __xseg);
813
        xseg->segment_size = size;
814
        xseg->segment = __xseg;
815
        __sync_synchronize();
816

    
817
        r = xseg_validate_pointers(xseg);
818
        if (r) {
819
                XSEGLOG("found %d invalid xseg pointers!\n", r);
820
                goto err_free_types;
821
        }
822

    
823
        /* Do we need this?
824
        r = xops->signal_join(xseg);
825
        if (r) {
826
                XSEGLOG("Cannot attach signaling to segment! (error: %d)\n", r);
827
                goto err_free_types;
828
        }
829
        */
830

    
831
        return xseg;
832

    
833
err_free_types:
834
        pops->mfree(priv->peer_types);
835
err_unmap:
836
        xops->unmap(__xseg, size);
837
        xhash_free(priv->req_data);
838
err_priv:
839
        pops->mfree(priv);
840
err_seg:
841
        pops->mfree(xseg);
842
err:
843
        return NULL;
844
}
845

    
846
void xseg_leave(struct xseg *xseg)
847
{
848
        struct xseg_type *type;
849

    
850
        __lock_domain();
851
        type = __find_or_load_type(xseg->config.type);
852
        if (!type) {
853
                XSEGLOG("no segment type '%s'\n", xseg->config.type);
854
                __unlock_domain();
855
                return;
856
        }
857
        __unlock_domain();
858

    
859
        type->ops.unmap(xseg->segment, xseg->segment_size);
860
        //FIXME free xseg?
861
}
862

    
863
struct xseg_port* xseg_get_port(struct xseg *xseg, uint32_t portno)
864
{
865
        xptr p;
866
        if (!__validate_port(xseg, portno))
867
                return NULL;
868
        p = xseg->ports[portno];
869
        if (p)
870
                return XPTR_TAKE(p, xseg->segment);
871
        else 
872
                return NULL;
873
}
874

    
875
struct xq * __alloc_queue(struct xseg *xseg, uint64_t nr_reqs)
876
{
877
        uint64_t bytes;
878
        void *mem, *buf;
879
        struct xq *q;
880
        struct xheap *heap = xseg->heap;
881

    
882
        //how many bytes to allocate for a queue
883
        bytes = sizeof(struct xq) + nr_reqs*sizeof(xqindex);
884
        mem = xheap_allocate(heap, bytes);
885
        if (!mem)
886
                return NULL;
887

    
888
        //how many bytes did we got, and calculate what's left of buffer
889
        bytes = xheap_get_chunk_size(mem) - sizeof(struct xq);
890

    
891
        //initialize queue with max nr of elements it can hold
892
        q = (struct xq *) mem;
893
        buf = (void *) (((unsigned long) mem) + sizeof(struct xq));
894
        xq_init_empty(q, bytes/sizeof(xqindex), buf); 
895

    
896
        return q;
897
}
898

    
899
//FIXME
900
//maybe add parameters of initial free_queue size and max_alloc_reqs
901
struct xseg_port *xseg_alloc_port(struct xseg *xseg, uint32_t flags, uint64_t nr_reqs)
902
{
903
        struct xq *q;
904
        struct xobject_h *obj_h = xseg->port_h;
905
        struct xseg_port *port = xobj_get_obj(obj_h, flags);
906
        if (!port)
907
                return NULL;
908

    
909
        //alloc free queue
910
        q = __alloc_queue(xseg, nr_reqs);
911
        if (!q)
912
                goto err_free;
913
        port->free_queue = XPTR_MAKE(q, xseg->segment);
914

    
915
        //and for request queue
916
        q = __alloc_queue(xseg, nr_reqs);
917
        if (!q)
918
                goto err_req;
919
        port->request_queue = XPTR_MAKE(q, xseg->segment);
920

    
921
        //and for reply queue
922
        q = __alloc_queue(xseg, nr_reqs);
923
        if (!q)
924
                goto err_reply;
925
        port->reply_queue = XPTR_MAKE(q, xseg->segment);
926

    
927
        xlock_release(&port->fq_lock);
928
        xlock_release(&port->rq_lock);
929
        xlock_release(&port->pq_lock);
930
        xlock_release(&port->port_lock);
931
        port->owner = Noone;
932
        port->portno = NoPort;
933
        port->peer_type = 0; //FIXME what  here ??? NoType??
934
        port->alloc_reqs = 0;
935
        port->max_alloc_reqs = XSEG_DEF_MAX_ALLOCATED_REQS;
936
        port->flags = 0;
937

    
938

    
939
        return port;
940

    
941
err_reply:
942
        xheap_free(XPTR_TAKE(port->request_queue, xseg->segment));
943
        port->request_queue = 0;
944
err_req:
945
        xheap_free(XPTR_TAKE(port->free_queue, xseg->segment));
946
        port->free_queue = 0;
947
err_free:
948
        xobj_put_obj(obj_h, port);
949

    
950
        return NULL;
951
}
952

    
953
void xseg_free_port(struct xseg *xseg, struct xseg_port *port)
954
{
955
        struct xobject_h *obj_h = xseg->port_h;
956

    
957
        if (port->request_queue) {
958
                xheap_free(XPTR_TAKE(port->request_queue, xseg->segment));
959
                port->request_queue = 0;
960
        }
961
        if (port->free_queue) {
962
                xheap_free(XPTR_TAKE(port->free_queue, xseg->segment));
963
                port->free_queue = 0;
964
        }
965
        if (port->reply_queue) {
966
                xheap_free(XPTR_TAKE(port->reply_queue, xseg->segment));
967
                port->reply_queue = 0;
968
        }
969
        xobj_put_obj(obj_h, port);
970
}
971

    
972
void* xseg_alloc_buffer(struct xseg *xseg, uint64_t size)
973
{
974
        struct xheap *heap = xseg->heap;
975
        void *mem = xheap_allocate(heap, size);
976
        if (mem && xheap_get_chunk_size(mem) < size) {
977
                XSEGLOG("Buffer size %llu instead of %llu\n", 
978
                                xheap_get_chunk_size(mem), size);
979
                xheap_free(mem);
980
                mem = NULL;
981
        }
982
        return mem;
983
}
984

    
985
void xseg_free_buffer(struct xseg *xseg, void *ptr)
986
{
987
        xheap_free(ptr);
988
}
989

    
990
int xseg_prepare_wait(struct xseg *xseg, uint32_t portno)
991
{
992
        if (!__validate_port(xseg, portno))
993
                return -1;
994

    
995
        return xseg->priv->peer_type.peer_ops.prepare_wait(xseg, portno);
996
}
997

    
998
int xseg_cancel_wait(struct xseg *xseg, uint32_t portno)
999
{
1000
        if (!__validate_port(xseg, portno))
1001
                return -1;
1002
        return xseg->priv->peer_type.peer_ops.cancel_wait(xseg, portno);
1003
}
1004

    
1005
int xseg_wait_signal(struct xseg *xseg, uint32_t usec_timeout)
1006
{
1007
        return xseg->priv->peer_type.peer_ops.wait_signal(xseg, usec_timeout);
1008
}
1009

    
1010
int xseg_signal(struct xseg *xseg, xport portno)
1011
{
1012
        struct xseg_peer *type;
1013
        struct xseg_port *port = xseg_get_port(xseg, portno);
1014
        if (!port)
1015
                return -1;
1016
        
1017
        type = __get_peer_type(xseg, port->peer_type);
1018
        if (!type)
1019
                return -1;
1020

    
1021
        return type->peer_ops.signal(xseg, portno);
1022
}
1023

    
1024
int xseg_init_local_signal(struct xseg *xseg, xport portno)
1025
{
1026
        struct xseg_peer *type;
1027
        struct xseg_port *port = xseg_get_port(xseg, portno);
1028
        if (!port)
1029
                return -1;
1030
        
1031
        type = __get_peer_type(xseg, port->peer_type);
1032
        if (!type)
1033
                return -1;
1034

    
1035
        return type->peer_ops.local_signal_init(xseg, portno);
1036
}
1037

    
1038
void xseg_quit_local_signal(struct xseg *xseg, xport portno)
1039
{
1040
        struct xseg_peer *type;
1041
        struct xseg_port *port = xseg_get_port(xseg, portno);
1042
        if (!port)
1043
                return;
1044
        
1045
        type = __get_peer_type(xseg, port->peer_type);
1046
        if (!type)
1047
                return;
1048

    
1049
        type->peer_ops.local_signal_quit(xseg, portno);
1050
}
1051

    
1052
//FIXME doesn't increase alloced reqs
1053
//is integer i enough here?
1054
int xseg_alloc_requests(struct xseg *xseg, uint32_t portno, uint32_t nr)
1055
{
1056
        int i = 0;
1057
        xqindex xqi;
1058
        struct xq *q;
1059
        struct xseg_request *req;
1060
        struct xseg_port *port = xseg_get_port(xseg, portno);
1061
        if (!port)
1062
                return -1;
1063

    
1064
        xlock_acquire(&port->fq_lock, portno);
1065
        q = XPTR_TAKE(port->free_queue, xseg->segment);
1066
        while ((req = xobj_get_obj(xseg->request_h, X_ALLOC)) != NULL && i < nr) {
1067
                xqi = XPTR_MAKE(req, xseg->segment);
1068
                xqi = __xq_append_tail(q, xqi);
1069
                if (xqi == Noneidx) {
1070
                        xobj_put_obj(xseg->request_h, req);
1071
                        break;
1072
                }
1073
                i++;
1074
        }
1075
        xlock_release(&port->fq_lock);
1076

    
1077
        if (i == 0)
1078
                i = -1;
1079
        return i;
1080
}
1081

    
1082
int xseg_free_requests(struct xseg *xseg, uint32_t portno, int nr)
1083
{
1084
        int i = 0;
1085
        xqindex xqi;
1086
        struct xq *q;
1087
        struct xseg_request *req;
1088
        struct xseg_port *port = xseg_get_port(xseg, portno);
1089
        if (!port)
1090
                return -1;
1091

    
1092
        xlock_acquire(&port->fq_lock, portno);
1093
        q = XPTR_TAKE(port->free_queue, xseg->segment);
1094
        while ((xqi = __xq_pop_head(q)) != Noneidx && i < nr) {
1095
                req = XPTR_TAKE(xqi, xseg->segment);
1096
                xobj_put_obj(xseg->request_h, (void *) req);
1097
                i++;
1098
        }
1099
        if (i == 0)
1100
                return -1;
1101
        xlock_release(&port->fq_lock);
1102

    
1103
        xlock_acquire(&port->port_lock, portno);
1104
        port->alloc_reqs -= i;
1105
        xlock_release(&port->port_lock);
1106

    
1107
        return i;
1108
}
1109

    
1110
int xseg_prep_ports (struct xseg *xseg, struct xseg_request *xreq,
1111
                        uint32_t src_portno, uint32_t dst_portno)
1112
{
1113
        if (!__validate_port(xseg, src_portno))
1114
                return -1;
1115

    
1116
        if (!__validate_port(xseg, dst_portno))
1117
                return -1;
1118

    
1119
        xreq->src_portno = src_portno;
1120
        xreq->transit_portno = src_portno;
1121
        xreq->dst_portno = dst_portno;
1122
        xreq->effective_dst_portno = dst_portno;
1123

    
1124
        return 0;
1125
}
1126

    
1127
struct xseg_request *xseg_get_request(struct xseg *xseg, xport src_portno,
1128
                                        xport dst_portno, uint32_t flags)
1129
{
1130
        /*
1131
         * Flags:
1132
         * X_ALLOC Allocate more requests if object handler
1133
         *            does not have any avaiable
1134
         * X_LOCAL Use only local - preallocated reqs
1135
         *         (Maybe we want this as default, to give a hint to a peer
1136
         *             how many requests it can have flying)
1137
         */
1138
        struct xseg_request *req = NULL;
1139
        struct xseg_port *port;
1140
        struct xq *q;
1141
        xqindex xqi;
1142
        xptr ptr;
1143

    
1144
        port = xseg_get_port(xseg, src_portno);
1145
        if (!port)
1146
                return NULL;
1147
        //try to allocate from free_queue
1148
        xlock_acquire(&port->fq_lock, src_portno);
1149
        q = XPTR_TAKE(port->free_queue, xseg->segment);
1150
        xqi = __xq_pop_head(q);
1151
        if (xqi != Noneidx){
1152
                xlock_release(&port->fq_lock);
1153
                ptr = xqi;
1154
                req = XPTR_TAKE(ptr, xseg->segment);
1155
                goto done;
1156
        }
1157
        xlock_release(&port->fq_lock);
1158

    
1159
        if (flags & X_LOCAL)
1160
                return NULL;
1161

    
1162
        //else try to allocate from global heap
1163
        //FIXME
1164
        xlock_acquire(&port->port_lock, src_portno);
1165
        if (port->alloc_reqs < port->max_alloc_reqs) {
1166
                req = xobj_get_obj(xseg->request_h, flags & X_ALLOC);
1167
                if (req)
1168
                        port->alloc_reqs++;
1169
        }
1170
        xlock_release(&port->port_lock);
1171
        if (!req)
1172
                return NULL;
1173

    
1174
done:
1175

    
1176
        req->buffer = 0;
1177
        req->bufferlen = 0;
1178
        req->target = 0;
1179
        req->data = 0;
1180
        req->datalen = 0;
1181
        req->targetlen = 0;
1182
        if (xseg_prep_ports(xseg, req, src_portno, dst_portno) < 0) {
1183
                xseg_put_request(xseg, req, src_portno);
1184
                return NULL;
1185
        }
1186
        req->state = 0;
1187
        req->elapsed = 0;
1188
        req->timestamp.tv_sec = 0;
1189
        req->timestamp.tv_usec = 0;
1190
        req->flags = 0;
1191

    
1192
        xq_init_empty(&req->path, MAX_PATH_LEN, req->path_bufs); 
1193

    
1194
        return req;
1195
}
1196

    
1197
//add flags
1198
//do not put request if path not empty or X_FORCE set
1199
int xseg_put_request (struct xseg *xseg, struct xseg_request *xreq,
1200
                        xport portno)
1201
{
1202
        xqindex xqi = XPTR_MAKE(xreq, xseg->segment);
1203
        struct xq *q;
1204
        struct xseg_port *port = xseg_get_port(xseg, xreq->src_portno);
1205
        if (!port) 
1206
                return -1;
1207

    
1208
        if (xreq->buffer){
1209
                void *ptr = XPTR_TAKE(xreq->buffer, xseg->segment);
1210
                xseg_free_buffer(xseg, ptr);
1211
        }
1212
        /* empty path */
1213
        xq_init_empty(&xreq->path, MAX_PATH_LEN, xreq->path_bufs); 
1214
        
1215
        xreq->buffer = 0;
1216
        xreq->bufferlen = 0;
1217
        xreq->target = 0;
1218
        xreq->data = 0;
1219
        xreq->datalen = 0;
1220
        xreq->targetlen = 0;
1221
        xreq->state = 0;
1222
        xreq->src_portno = NoPort;
1223
        xreq->dst_portno = NoPort;
1224
        xreq->transit_portno = NoPort;
1225
        xreq->effective_dst_portno = NoPort;        
1226
        
1227
        if (xreq->elapsed != 0) {
1228
                __lock_segment(xseg);
1229
                ++(xseg->counters.req_cnt);
1230
                xseg->counters.avg_req_lat += xreq->elapsed;
1231
                __unlock_segment(xseg);
1232
        }
1233

    
1234

    
1235
        //try to put it in free_queue of the port
1236
        xlock_acquire(&port->fq_lock, portno);
1237
        q = XPTR_TAKE(port->free_queue, xseg->segment);
1238
        xqi = __xq_append_head(q, xqi);
1239
        xlock_release(&port->fq_lock);
1240
        if (xqi != Noneidx)
1241
                return 0;
1242
        //else return it to segment
1243
        xobj_put_obj(xseg->request_h, (void *) xreq);
1244
        xlock_acquire(&port->port_lock, portno);
1245
        port->alloc_reqs--;
1246
        xlock_release(&port->port_lock);
1247
        return 0;
1248
}
1249

    
1250
int xseg_prep_request ( struct xseg* xseg, struct xseg_request *req,
1251
                        uint32_t targetlen, uint64_t datalen )
1252
{
1253
        uint64_t bufferlen = targetlen + datalen;
1254
        void *buf;
1255
        req->buffer = 0;
1256
        req->bufferlen = 0;
1257
        buf = xseg_alloc_buffer(xseg, bufferlen);
1258
        if (!buf)
1259
                return -1;
1260
        req->bufferlen = xheap_get_chunk_size(buf);
1261
        req->buffer = XPTR_MAKE(buf, xseg->segment);
1262
        
1263
        req->data = req->buffer;
1264
        req->target = req->buffer + req->bufferlen - targetlen;
1265
        req->datalen = datalen;
1266
        req->targetlen = targetlen;
1267
        return 0;
1268
}
1269

    
1270
int xseg_resize_request (struct xseg *xseg, struct xseg_request *req,
1271
                        uint32_t new_targetlen, uint64_t new_datalen)
1272
{
1273
        if (req->bufferlen >= new_datalen + new_targetlen) {
1274
                req->data = req->buffer;
1275
                req->target = req->buffer + req->bufferlen - new_targetlen;
1276
                req->datalen = new_datalen;
1277
                req->targetlen = new_targetlen;
1278
                return 0;
1279
        }
1280

    
1281
        if (req->buffer){
1282
                void *ptr = XPTR_TAKE(req->buffer, xseg->segment);
1283
                xseg_free_buffer(xseg, ptr);
1284
        }
1285
        req->buffer = 0;
1286
        req->bufferlen = 0;
1287
        return xseg_prep_request(xseg, req, new_targetlen, new_datalen);
1288
}
1289

    
1290
static void __update_timestamp(struct xseg_request *xreq)
1291
{
1292
        struct timeval tv;
1293

    
1294
        __get_current_time(&tv);
1295
        if (xreq->timestamp.tv_sec != 0)
1296
                /*
1297
                 * FIXME: Make xreq->elapsed timeval/timespec again to avoid the
1298
                 *                   multiplication?
1299
                 */
1300
                xreq->elapsed += (tv.tv_sec - xreq->timestamp.tv_sec) * 1000000 
1301
                                                + (tv.tv_usec - xreq->timestamp.tv_usec);
1302

    
1303
        xreq->timestamp.tv_sec = tv.tv_sec;
1304
        xreq->timestamp.tv_usec = tv.tv_usec;
1305
}
1306

    
1307
//FIXME should we add NON_BLOCK flag?
1308
xport xseg_submit (struct xseg *xseg, struct xseg_request *xreq,
1309
                        xport portno, uint32_t flags)
1310
{
1311
        xserial serial = NoSerial;
1312
        xqindex xqi, r;
1313
        struct xq *q, *newq;
1314
        xport next, cur;
1315
        struct xseg_port *port;
1316

    
1317
        /* discover where to submit */
1318

    
1319
        if (!__validate_port(xseg, xreq->transit_portno)){
1320
                XSEGLOG("Couldn't validate transit_portno (portno: %lu)",
1321
                                xreq->transit_portno);
1322
                return NoPort;
1323
        }
1324
        if (!__validate_port(xseg, xreq->effective_dst_portno)){
1325
                XSEGLOG("Couldn't validate effective_dst_portno (portno: %lu)",
1326
                                xreq->effective_dst_portno);
1327
                return NoPort;
1328
        }
1329

    
1330
        cur = xreq->transit_portno;
1331
        next = cur;
1332
        //FIXME assert(cur == portno);
1333
        do {
1334
                if (next == xreq->effective_dst_portno){
1335
                        XSEGLOG("Path ended with no one willing to accept");
1336
                        return NoPort;
1337
                }
1338

    
1339
                if (xseg->path_next[next] != NoPort){
1340
                        next = xseg->path_next[next];
1341
                } else {
1342
                        next = xreq->effective_dst_portno;
1343
                }
1344

    
1345
                port = xseg_get_port(xseg, next);
1346
                if (!port){
1347
                        XSEGLOG("Couldnt get port (next :%u)", next);
1348
                        return NoPort;
1349
                }
1350
        } while ((!port->flags & CAN_ACCEPT));
1351

    
1352
        /* submit */
1353

    
1354
        //__update_timestamp(xreq);
1355

    
1356
        xqi = XPTR_MAKE(xreq, xseg->segment);
1357

    
1358
        /* add current port to path */
1359
        serial = __xq_append_head(&xreq->path, cur);
1360
        if (serial == Noneidx){
1361
                XSEGLOG("Couldn't append path head");
1362
                return NoPort;
1363
        }
1364

    
1365
        xlock_acquire(&port->rq_lock, portno);
1366
        q = XPTR_TAKE(port->request_queue, xseg->segment);
1367
        serial = __xq_append_tail(q, xqi);
1368
        if (flags & X_ALLOC && serial == Noneidx) {
1369
                /* double up queue size */
1370
                XSEGLOG("Trying to double up queue");
1371
                newq = __alloc_queue(xseg, xq_size(q)*2);
1372
                if (!newq)
1373
                        goto out_rel;
1374
                r = __xq_resize(q, newq);
1375
                if (r == Noneidx){
1376
                        xheap_free(newq);
1377
                        goto out_rel;
1378
                }
1379
                port->request_queue = XPTR_MAKE(newq, xseg->segment);
1380
                xheap_free(q);
1381
                serial = __xq_append_tail(newq, xqi);
1382
        }
1383

    
1384
out_rel:
1385
        xlock_release(&port->rq_lock);
1386
        if (serial == Noneidx){
1387
                XSEGLOG("Couldn't append request to queue");
1388
                __xq_pop_head(&xreq->path);
1389
                next = NoPort;
1390
        }
1391
        return next;
1392

    
1393
}
1394

    
1395
struct xseg_request *xseg_receive(struct xseg *xseg, xport portno, uint32_t flags)
1396
{
1397
        xqindex xqi;
1398
        xserial serial = NoSerial;
1399
        struct xq *q;
1400
        struct xseg_request *req;
1401
        struct xseg_port *port = xseg_get_port(xseg, portno);
1402
        if (!port)
1403
                return NULL;
1404
retry:
1405
        if (flags & X_NONBLOCK) {
1406
                if (!xlock_try_lock(&port->pq_lock, portno))
1407
                        return NULL;
1408
        } else {
1409
                xlock_acquire(&port->pq_lock, portno);
1410
        }
1411
        q = XPTR_TAKE(port->reply_queue, xseg->segment);
1412
        xqi = __xq_pop_head(q);
1413
        xlock_release(&port->pq_lock);
1414

    
1415
        if (xqi == Noneidx)
1416
                return NULL;
1417

    
1418
        req = XPTR_TAKE(xqi, xseg->segment);
1419
//        __update_timestamp(req);
1420
        serial = __xq_pop_head(&req->path);
1421
        if (serial == Noneidx){
1422
                /* this should never happen */
1423
                XSEGLOG("pop head of path queue returned Noneidx\n");
1424
                goto retry;
1425
        }
1426

    
1427

    
1428
        return req;
1429
}
1430

    
1431
struct xseg_request *xseg_accept(struct xseg *xseg, xport portno, uint32_t flags)
1432
{
1433
        xqindex xqi;
1434
        struct xq *q;
1435
        struct xseg_request *req;
1436
        struct xseg_port *port = xseg_get_port(xseg, portno);
1437
        if (!port)
1438
                return NULL;
1439
        if (flags & X_NONBLOCK) {
1440
                if (!xlock_try_lock(&port->rq_lock, portno))
1441
                        return NULL;
1442
        } else {
1443
                xlock_acquire(&port->rq_lock, portno);
1444
        }
1445

    
1446
        q = XPTR_TAKE(port->request_queue, xseg->segment);
1447
        xqi = __xq_pop_head(q);
1448
        xlock_release(&port->rq_lock);
1449
        if (xqi == Noneidx)
1450
                return NULL;
1451

    
1452
        req = XPTR_TAKE(xqi, xseg->segment);
1453
        req->transit_portno = portno;
1454

    
1455
        return req;
1456
}
1457

    
1458
//FIXME should we add NON_BLOCK flag?
1459
xport xseg_respond (struct xseg *xseg, struct xseg_request *xreq,
1460
                        xport portno, uint32_t flags)
1461
{
1462
        xserial serial = NoSerial;
1463
        xqindex xqi, r;
1464
        struct xq *q, *newq;
1465
        struct xseg_port *port;
1466
        xport dst;
1467

    
1468
retry:
1469
        serial = __xq_peek_head(&xreq->path);
1470
        if (serial == Noneidx)
1471
                return NoPort;
1472
        dst = (xport) serial;
1473

    
1474
        port = xseg_get_port(xseg, dst);
1475
        if (!port)
1476
                return NoPort;
1477
        if (!(port->flags & CAN_RECEIVE)){
1478
                //XSEGLOG("Port %u cannot receive", dst);
1479
                /* Port cannot receive. Try next one in path */
1480
                __xq_pop_head(&xreq->path);
1481
                goto retry;
1482
        }
1483

    
1484
        xqi = XPTR_MAKE(xreq, xseg->segment);
1485

    
1486
        xlock_acquire(&port->pq_lock, portno);
1487
        q = XPTR_TAKE(port->reply_queue, xseg->segment);
1488
        serial = __xq_append_tail(q, xqi);
1489
        if (flags & X_ALLOC && serial == Noneidx) {
1490
                newq = __alloc_queue(xseg, xq_size(q)*2);
1491
                if (!newq)
1492
                        goto out_rel;
1493
                r = __xq_resize(q, newq);
1494
                if (r == Noneidx) {
1495
                        xheap_free(newq);
1496
                        goto out_rel;
1497
                }
1498
                port->reply_queue = XPTR_MAKE(newq, xseg->segment);
1499
                xheap_free(q);
1500
                serial = __xq_append_tail(newq, xqi);
1501
        }
1502

    
1503
out_rel:
1504
        xlock_release(&port->pq_lock);
1505

    
1506
        if (serial == Noneidx)
1507
                dst = NoPort;
1508
        return dst;
1509
        
1510
}
1511

    
1512
xport xseg_forward(struct xseg *xseg, struct xseg_request *req, xport new_dst,
1513
                xport portno, uint32_t flags)
1514
{
1515
        if (!__validate_port(xseg, new_dst)){
1516
                XSEGLOG("Couldn't validate new destination (new_dst %lu)",
1517
                                new_dst);
1518
                return NoPort;
1519
        }
1520
        req->effective_dst_portno = new_dst;
1521
        return xseg_submit(xseg, req, portno, flags);
1522

    
1523
}
1524

    
1525
int xseg_set_path_next(struct xseg *xseg, xport portno, xport next)
1526
{
1527
        if (!__validate_port(xseg, portno))
1528
                return -1;
1529
        if (!__validate_port(xseg, next))
1530
                return -1;
1531
        xseg->path_next[portno] = next;
1532
        return 0;
1533
}
1534

    
1535
int xseg_set_req_data(struct xseg *xseg, struct xseg_request *xreq, void *data)
1536
{
1537
        int r;
1538
        xhash_t *req_data;
1539
        
1540
        xlock_acquire(&xseg->priv->reqdatalock, 1);
1541

    
1542
        req_data = xseg->priv->req_data;
1543
        r = xhash_insert(req_data, (xhashidx) xreq, (xhashidx) data);
1544
        if (r == -XHASH_ERESIZE) {
1545
                req_data = xhash_resize(req_data, xhash_grow_size_shift(req_data), 0, NULL);
1546
                if (req_data) {
1547
                        xseg->priv->req_data = req_data;
1548
                        r = xhash_insert(req_data, (xhashidx) xreq, (xhashidx) data);
1549
                }
1550
        }
1551

    
1552
        xlock_release(&xseg->priv->reqdatalock);
1553
        return r;
1554
}
1555

    
1556
int xseg_get_req_data(struct xseg *xseg, struct xseg_request *xreq, void **data)
1557
{
1558
        int r;
1559
        xhashidx val;
1560
        xhash_t *req_data;
1561
        
1562
        xlock_acquire(&xseg->priv->reqdatalock, 1);
1563

    
1564
        req_data = xseg->priv->req_data;
1565
        //maybe we need a xhash_delete with lookup...
1566
        //maybe we also need a delete that doesn't shrink xhash
1567
        r = xhash_lookup(req_data, (xhashidx) xreq, &val);
1568
        *data = (void *) val;
1569
        if (r >= 0) {
1570
                r = xhash_delete(req_data, (xhashidx) xreq);
1571
                if (r == -XHASH_ERESIZE) {
1572
                        req_data = xhash_resize(req_data, xhash_shrink_size_shift(req_data), 0, NULL);
1573
                        if (req_data){
1574
                                xseg->priv->req_data = req_data;
1575
                                r = xhash_delete(req_data, (xhashidx) xreq);
1576
                        }
1577
                }
1578
        }
1579

    
1580
        xlock_release(&xseg->priv->reqdatalock);
1581
        return r;
1582
}
1583

    
1584
struct xobject_h * xseg_get_objh(struct xseg *xseg, uint32_t magic, uint64_t size)
1585
{
1586
        int r;
1587
        struct xobject_h *obj_h = xobj_get_obj(xseg->object_handlers, X_ALLOC);
1588
        if (!obj_h)
1589
                return NULL;
1590
        r = xobj_handler_init(obj_h, xseg->segment, magic, size, xseg->heap);
1591
        if (r < 0) {
1592
                xobj_put_obj(xseg->object_handlers, obj_h);
1593
                return NULL;
1594
        }
1595
        return obj_h;
1596
}
1597

    
1598
void xseg_put_objh(struct xseg *xseg, struct xobject_h *objh)
1599
{
1600
        xobj_put_obj(xseg->object_handlers, objh);
1601
}
1602

    
1603

    
1604
/*
1605
int xseg_complete_req(struct xseg_request *req)
1606
{
1607
        req->state |= XS_SERVED;
1608
        req->state &= ~XS_FAILED;
1609
}
1610

1611
int xseg_fail_req(struct xseg_request *req)
1612
{
1613
        req->state &= ~XS_SERVED;
1614
        req->state |= XS_FAILED;
1615
}
1616
*/
1617

    
1618
struct xseg_port *xseg_bind_port(struct xseg *xseg, uint32_t req, void * sd)
1619
{
1620
        uint32_t portno, maxno, id = __get_id(), force;
1621
        struct xseg_port *port = NULL;
1622
        void *peer_data, *sigdesc;
1623
        int64_t driver;
1624
        int r;
1625

    
1626
        if (req >= xseg->config.nr_ports) {
1627
                portno = 0;
1628
                maxno = xseg->config.nr_ports;
1629
                force = 0;
1630
        } else {
1631
                portno = req;
1632
                maxno = req + 1;
1633
                force = 1;
1634
        }
1635

    
1636
        __lock_segment(xseg);
1637
        for (; portno < maxno; portno++) {
1638
                if (!xseg->ports[portno]) {
1639
                        port = xseg_alloc_port(xseg, X_ALLOC, XSEG_DEF_REQS);
1640
                        if (!port)
1641
                                goto out;
1642
                } else if (force) {
1643
                        port = xseg_get_port(xseg, portno);
1644
                        if (!port)
1645
                                goto out;        
1646
                } else {
1647
                        continue;
1648
                }
1649
                driver = __enable_driver(xseg, &xseg->priv->peer_type);
1650
                if (driver < 0)
1651
                        break;
1652
                if (!sd){
1653
                        peer_data = __get_peer_type_data(xseg, (uint64_t) driver);
1654
                        if (!peer_data)
1655
                                break;
1656
                        sigdesc = xseg->priv->peer_type.peer_ops.alloc_signal_desc(xseg, peer_data);
1657
                        if (!sigdesc)
1658
                                break;
1659
                        r = xseg->priv->peer_type.peer_ops.init_signal_desc(xseg, sigdesc);
1660
                        if (r < 0){
1661
                                xseg->priv->peer_type.peer_ops.free_signal_desc(xseg, peer_data, sigdesc);
1662
                                break;
1663
                        }
1664
                        port->signal_desc = XPTR_MAKE(sigdesc, xseg->segment);
1665
                } else {
1666
                        port->signal_desc = XPTR_MAKE(sd, xseg->segment);
1667
                }
1668
                port->peer_type = (uint64_t)driver;
1669
                port->owner = id;
1670
                port->portno = portno;
1671
                port->flags = CAN_ACCEPT | CAN_RECEIVE;
1672
                xseg->ports[portno] = XPTR_MAKE(port, xseg->segment);
1673
                goto out;
1674
        }
1675
        if (port) {
1676
                xseg_free_port(xseg, port);
1677
                xseg->ports[portno] = 0;
1678
                port = NULL;
1679
        }
1680
out:
1681
        __unlock_segment(xseg);
1682
        return port;
1683
}
1684

    
1685
/*
1686
 * set the limit of requests, a port can allocate.
1687
 *
1688
 * this limit should be greater than the number of requests a port can cache
1689
 * locally on its free_queue, and less than the hard limit imposed by the
1690
 * segment.
1691
 *
1692
 * maybe make it drop excess requests
1693
 */
1694
int xseg_set_max_requests(struct xseg *xseg, xport portno, uint64_t nr_reqs)
1695
{
1696
        int r = -1;
1697
        struct xseg_port *port;
1698
        struct xq *q;
1699
        if (nr_reqs > XSEG_MAX_ALLOCATED_REQS)
1700
                return -1;
1701

    
1702
        port = xseg_get_port(xseg, portno);
1703
        if (!port)
1704
                return -1;
1705

    
1706
        xlock_acquire(&port->fq_lock, portno);
1707
        q = XPTR_TAKE(port->free_queue, xseg->segment);
1708
        if (xq_size(q) <= nr_reqs){
1709
                port->max_alloc_reqs = nr_reqs;
1710
                r = 0;
1711
        }
1712
        xlock_release(&port->fq_lock);
1713

    
1714
        /* no lock here 
1715
         * if theres is a get_request in progress, it is not critical to enforce
1716
         * the new limit.
1717
         */
1718
        port->max_alloc_reqs = nr_reqs;
1719
        return r;
1720
}
1721

    
1722
uint64_t xseg_get_max_requests(struct xseg *xseg, xport portno)
1723
{
1724
        struct xseg_port *port = xseg_get_port(xseg, portno);
1725
        if (!port)
1726
                return -1;
1727
        return port->max_alloc_reqs;
1728
}
1729

    
1730
uint64_t xseg_get_allocated_requests(struct xseg *xseg, xport portno)
1731
{
1732
        struct xseg_port *port = xseg_get_port(xseg, portno);
1733
        if (!port)
1734
                return -1;
1735
        return port->alloc_reqs;
1736
}
1737

    
1738
/*
1739
 * set free_queue size, aka the local "cached" requests a port can have
1740
 * it should be smaller than port->max_alloc_reqs?
1741
 *
1742
 */
1743
int xseg_set_freequeue_size(struct xseg *xseg, xport portno, xqindex size,
1744
                                uint32_t flags)
1745
{
1746
        int ret = 0;
1747
        xqindex xqi,r;
1748
        struct xq *q, *newq;
1749
        struct xseg_request *xreq;
1750
        struct xseg_port *port = xseg_get_port(xseg, portno);
1751
        if (!port)
1752
                return -1;
1753

    
1754
        newq = __alloc_queue(xseg, size);
1755
        if (!newq)
1756
                return -1;
1757

    
1758
        if (flags & X_NONBLOCK) {
1759
                if (!xlock_try_lock(&port->fq_lock, portno))
1760
                        return -1;
1761
        } else {
1762
                xlock_acquire(&port->fq_lock, portno);
1763
        }
1764

    
1765
        q = XPTR_TAKE(port->free_queue, xseg->segment);
1766

    
1767
        /* put requests that don't fit in the new queue */
1768
        while (xq_count(q) > xq_size(newq)){
1769
                xqi = __xq_pop_head(q);
1770
                if (xqi != Noneidx){
1771
                        xreq = XPTR_TAKE(xqi, xseg->segment);
1772
                        xobj_put_obj(xseg->request_h, (void *) xreq);
1773
                }
1774
        }
1775

    
1776
        r = __xq_resize(q, newq);
1777
        if (r == Noneidx){
1778
                xheap_free(newq);
1779
                ret = -1;
1780
                goto out_rel;
1781
        }
1782
        port->free_queue = XPTR_MAKE(newq, xseg->segment);
1783
        xheap_free(q);
1784
        ret = 0;
1785

    
1786
out_rel:
1787
        xlock_release(&port->fq_lock);
1788
        return ret;
1789
}
1790

    
1791
int xseg_leave_port(struct xseg *xseg, struct xseg_port *port)
1792
{
1793
        /* To be implemented */
1794
        return -1;
1795
}
1796

    
1797
int xseg_initialize(void)
1798
{
1799
        return __xseg_preinit();        /* with or without lock ? */
1800
}
1801

    
1802
int xseg_finalize(void)
1803
{
1804
        /* finalize not supported yet */
1805
        return -1;
1806
}
1807

    
1808

    
1809
char* xseg_get_data_nonstatic(struct xseg* xseg, struct xseg_request *req)
1810
{
1811
        return xseg_get_data(xseg, req);
1812
}
1813

    
1814
char* xseg_get_target_nonstatic(struct xseg* xseg, struct xseg_request *req)
1815
{
1816
        return xseg_get_target(xseg, req);
1817
}
1818

    
1819

    
1820
#ifdef __KERNEL__
1821
#include <linux/module.h>
1822
#include <xseg/xseg_exports.h>
1823
#endif
1824