Disable update stamp during request transfer.
[archipelago] / xseg / xseg / xseg.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <xseg/xseg.h>
36 #include <xseg/domain.h>
37 #include <sys/util.h>
38
39 #ifndef NULL
40 #define NULL ((void *)0)
41 #endif
42
43 #define XSEG_NR_TYPES 16
44 #define XSEG_NR_PEER_TYPES 64
45 #define XSEG_MIN_PAGE_SIZE 4096
46
47 static struct xseg_type *__types[XSEG_NR_TYPES];
48 static unsigned int __nr_types;
49 static struct xseg_peer *__peer_types[XSEG_NR_PEER_TYPES];
50 static unsigned int __nr_peer_types;
51
52 static void __lock_segment(struct xseg *xseg)
53 {
54         volatile uint64_t *flags;
55         flags = &xseg->shared->flags;
56         while (__sync_fetch_and_or(flags, XSEG_F_LOCK));
57 }
58
59 static void __unlock_segment(struct xseg *xseg)
60 {
61         volatile uint64_t *flags;
62         flags = &xseg->shared->flags;
63         __sync_fetch_and_and(flags, ~XSEG_F_LOCK);
64 }
65
66 static struct xseg_type *__find_type(const char *name, long *index)
67 {
68         long i;
69         for (i = 0; (*index = i) < __nr_types; i++)
70                 if (!strncmp(__types[i]->name, name, XSEG_TNAMESIZE))
71                         return __types[i];
72         return NULL;
73 }
74
75 static struct xseg_peer *__find_peer_type(const char *name, int64_t *index)
76 {
77         int64_t i;
78         for (i = 0; (*index = i) < __nr_peer_types; i++) {
79                 if (!strncmp(__peer_types[i]->name, name, XSEG_TNAMESIZE))
80                         return __peer_types[i];
81         }
82         return NULL;
83 }
84
85 void xseg_report_peer_types(void)
86 {
87         long i;
88         XSEGLOG("total %u peer types:\n", __nr_peer_types);
89         for (i = 0; i < __nr_peer_types; i++)
90                 XSEGLOG("%ld: '%s'\n", i, __peer_types[i]->name);
91 }
92
93 static struct xseg_type *__find_or_load_type(const char *name)
94 {
95         long i;
96         struct xseg_type *type = __find_type(name, &i);
97         if (type)
98                 return type;
99
100         __load_plugin(name);
101         return __find_type(name, &i);
102 }
103
104 static struct xseg_peer *__find_or_load_peer_type(const char *name)
105 {
106         int64_t i;
107         struct xseg_peer *peer_type = __find_peer_type(name, &i);
108         if (peer_type)
109                 return peer_type;
110
111         __load_plugin(name);
112         return __find_peer_type(name, &i);
113 }
114
115 static struct xseg_peer *__get_peer_type(struct xseg *xseg, uint32_t serial)
116 {
117         char *name;
118         struct xseg_peer *type;
119         struct xseg_private *priv = xseg->priv;
120         char (*shared_peer_types)[XSEG_TNAMESIZE];
121
122         if (serial >= xseg->max_peer_types) {
123                 XSEGLOG("invalid peer type serial %d >= %d\n",
124                          serial, xseg->max_peer_types);
125                 return NULL;
126         }
127
128         type = priv->peer_types[serial];
129         if (type)
130                 return type;
131
132         /* xseg->shared->peer_types is an append-only array,
133          * therefore this should be safe
134          * without either locking or string copying. */
135         shared_peer_types = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
136         name = shared_peer_types[serial];
137         if (!*name) {
138                 XSEGLOG("nonexistent peer type serial %d\n", serial);
139                 return NULL;
140         }
141
142         type = __find_or_load_peer_type(name);
143         if (!type)
144                 XSEGLOG("could not find driver for peer type %d [%s]\n",
145                          serial, name);
146
147         priv->peer_types[serial] = type;
148         return type;
149 }
150
151 static void * __get_peer_type_data(struct xseg *xseg, uint32_t serial)
152 {
153         char *name;
154         void *data;
155         struct xseg_private *priv = xseg->priv;
156         char (*shared_peer_types)[XSEG_TNAMESIZE];
157         xptr *shared_peer_type_data;
158
159         if (serial >= xseg->max_peer_types) {
160                 XSEGLOG("invalid peer type serial %d >= %d\n",
161                          serial, xseg->max_peer_types);
162                 return 0;
163         }
164
165         data = priv->peer_type_data[serial];
166         if (data)
167                 return data;
168
169         shared_peer_types = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
170         name = shared_peer_types[serial];
171         if (!*name) {
172                 XSEGLOG("nonexistent peer type serial %d\n", serial);
173                 return 0;
174         }
175         shared_peer_type_data = XPTR_TAKE(xseg->shared->peer_type_data, xseg->segment);
176
177         priv->peer_type_data[serial] = XPTR_TAKE(shared_peer_type_data[serial], xseg->segment);
178         return priv->peer_type_data[serial];
179 }
180
181 static inline int __validate_port(struct xseg *xseg, uint32_t portno)
182 {
183         return portno < xseg->config.nr_ports;
184 }
185
186 static inline int __validate_ptr(struct xseg *xseg, xptr ptr)
187 {
188         return ptr < xseg->segment_size;
189 }
190
191 /* type:name:nr_ports:nr_requests:request_size:extra_size:page_shift */
192
193 #define TOK(s, sp, def) \
194         (s) = (sp); \
195         for (;;) { \
196                 switch (*(sp)) { \
197                 case 0: \
198                         s = (def); \
199                         break; \
200                 case ':': \
201                         *(sp)++ = 0; \
202                         break; \
203                 default: \
204                         (sp) ++; \
205                         continue; \
206                 } \
207                 break; \
208         } \
209
210 static unsigned long strul(char *s)
211 {
212         unsigned long n = 0;
213         for (;;) {
214                 unsigned char c = *s - '0';
215                 if (c >= 10)
216                         break;
217                 n = n * 10 + c;
218                 s ++;
219         }
220         return n;
221 }
222
223 /*
224 static char *strncopy(char *dest, const char *src, uint32_t n)
225 {
226         uint32_t i;
227         char c;
228         for (i = 0; i < n; i++) {
229                 c = src[i];
230                 dest[i] = c;
231                 if (!c)
232                         break;
233         }
234         dest[n-1] = 0;
235         return dest;
236 }
237 */
238
239 int xseg_parse_spec(char *segspec, struct xseg_config *config)
240 {
241         /* default: "posix:globalxseg:4:256:12" */
242         char *s = segspec, *sp = segspec;
243
244         /* type */
245         TOK(s, sp, "posix");
246         strncpy(config->type, s, XSEG_TNAMESIZE);
247         config->type[XSEG_TNAMESIZE-1] = 0;
248
249         /* name */
250         TOK(s, sp, "globalxseg");
251         strncpy(config->name, s, XSEG_NAMESIZE);
252         config->name[XSEG_NAMESIZE-1] = 0;
253
254         /* nr_ports */
255         TOK(s, sp, "4");
256         config->nr_ports = strul(s);
257
258         /* heap_size */
259         TOK(s, sp, "256");
260         config->heap_size = (uint64_t) (strul(s) * 1024UL * 1024UL);
261
262         /* page_shift */
263         TOK(s, sp, "12");
264         config->page_shift = strul(s);
265         return 0;
266 }
267
268 int xseg_register_type(struct xseg_type *type)
269 {
270         long i;
271         int r = -1;
272         struct xseg_type *__type;
273         __lock_domain();
274         __type = __find_type(type->name, &i);
275         if (__type) {
276                 XSEGLOG("type %s already exists\n", type->name);
277                 goto out;
278         }
279
280         if (__nr_types >= XSEG_NR_TYPES) {
281                 XSEGLOG("maximum type registrations reached: %u\n", __nr_types);
282                 r -= 1;
283                 goto out;
284         }
285
286         type->name[XSEG_TNAMESIZE-1] = 0;
287         __types[__nr_types] = type;
288         __nr_types += 1;
289         r = 0;
290 out:
291         __unlock_domain();
292         return r;
293 }
294
295 int xseg_unregister_type(const char *name)
296 {
297         long i;
298         int r = -1;
299         struct xseg_type *__type;
300         __lock_domain();
301         __type = __find_type(name, &i);
302         if (!__type) {
303                 XSEGLOG("segment type '%s' does not exist\n", name);
304                 goto out;
305         }
306
307         __nr_types -= 1;
308         __types[i] = __types[__nr_types];
309         __types[__nr_types] = NULL;
310         r = 0;
311 out:
312         __unlock_domain();
313         return r;
314 }
315
316 int xseg_register_peer(struct xseg_peer *peer_type)
317 {
318         int64_t i;
319         int r = -1;
320         struct xseg_peer *type;
321         __lock_domain();
322         type = __find_peer_type(peer_type->name, &i);
323         if (type) {
324                 XSEGLOG("peer type '%s' already exists\n", type->name);
325                 goto out;
326         }
327
328         if (__nr_peer_types >= XSEG_NR_PEER_TYPES) {
329                 XSEGLOG("maximum peer type registrations reached: %u",
330                         __nr_peer_types);
331                 r -= 1;
332                 goto out;
333         }
334
335         if (peer_type->peer_ops.remote_signal_init()) {
336                 XSEGLOG("peer type '%s': signal initialization failed\n",
337                         peer_type->name);
338                 r -= 1;
339                 goto out;
340         }
341
342         peer_type->name[XSEG_TNAMESIZE-1] = 0;
343         __peer_types[__nr_peer_types] = peer_type;
344         __nr_peer_types += 1;
345         r = 0;
346
347 out:
348         __unlock_domain();
349         return r;
350 }
351
352 int xseg_unregister_peer(const char *name)
353 {
354         int64_t i;
355         struct xseg_peer *driver;
356         int r = -1;
357         __lock_domain();
358         driver = __find_peer_type(name, &i);
359         if (!driver) {
360                 XSEGLOG("peer type '%s' does not exist\n", name);
361                 goto out;
362         }
363
364         __nr_peer_types -= 1;
365         __peer_types[i] = __peer_types[__nr_peer_types];
366         __peer_types[__nr_peer_types] = NULL;
367         driver->peer_ops.remote_signal_quit();
368         r = 0;
369 out:
370         __unlock_domain();
371         return r;
372 }
373
374 int64_t __enable_driver(struct xseg *xseg, struct xseg_peer *driver)
375 {
376         int64_t r;
377         char (*drivers)[XSEG_TNAMESIZE];
378         xptr *ptd;
379         uint32_t max_drivers = xseg->max_peer_types;
380         void *data;
381         xptr peer_type_data;
382
383         if (xseg->shared->nr_peer_types >= max_drivers) {
384                 XSEGLOG("cannot register '%s': driver namespace full\n",
385                         driver->name);
386                 return -1;
387         }
388
389         drivers = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
390         for (r = 0; r < max_drivers; r++) {
391                 if (!*drivers[r])
392                         goto bind;
393                 if (!strncmp(drivers[r], driver->name, XSEG_TNAMESIZE)){
394                         data = __get_peer_type_data(xseg, r);
395                         goto success;
396                 }
397         }
398
399         /* Unreachable */
400         return -666;
401
402 bind:
403         /* assert(xseg->shared->nr_peer_types == r); */
404         data = driver->peer_ops.alloc_data(xseg);
405         if (!data)
406                 return -1;
407         peer_type_data = XPTR_MAKE(data, xseg->segment);
408         ptd = XPTR_TAKE(xseg->shared->peer_type_data, xseg->segment);
409         ptd[r] = peer_type_data;
410         xseg->shared->nr_peer_types = r + 1;
411         strncpy(drivers[r], driver->name, XSEG_TNAMESIZE);
412         drivers[r][XSEG_TNAMESIZE-1] = 0;
413
414 success:
415         xseg->priv->peer_types[r] = driver;
416         xseg->priv->peer_type_data[r] = data;
417         return r;
418 }
419
420 int64_t xseg_enable_driver(struct xseg *xseg, const char *name)
421 {
422         int64_t r = -1;
423         struct xseg_peer *driver;
424
425         __lock_domain();
426         driver = __find_peer_type(name, &r);
427         if (!driver) {
428                 XSEGLOG("driver '%s' not found\n", name);
429                 goto out;
430         }
431
432         __lock_segment(xseg);
433         r = __enable_driver(xseg, driver);
434         __unlock_segment(xseg);
435 out:
436         __unlock_domain();
437         return r;
438 }
439
440 int xseg_disable_driver(struct xseg *xseg, const char *name)
441 {
442         int64_t i;
443         int r = -1;
444         struct xseg_private *priv = xseg->priv;
445         struct xseg_peer *driver;
446         __lock_domain();
447         driver =  __find_peer_type(name, &i);
448         if (!driver) {
449                 XSEGLOG("driver '%s' not found\n", name);
450                 goto out;
451         }
452
453         for (i = 0; i < xseg->max_peer_types; i++)
454                 if (priv->peer_types[i] == driver)
455                         priv->peer_types[i] = NULL;
456         r = 0;
457 out:
458         __unlock_domain();
459         return r;
460 }
461
462 /* NOTE: calculate_segment_size() and initialize_segment()
463  * must always be exactly in sync!
464 */
465
466 static uint64_t calculate_segment_size(struct xseg_config *config)
467 {
468         uint64_t size = 0;
469         uint32_t page_size, page_shift = config->page_shift;
470
471         /* assert(sizeof(struct xseg) <= (1 << 9)); */
472
473         if (page_shift < 9) {
474                 XSEGLOG("page_shift must be >= %d\n", 9);
475                 return 0;
476         }
477
478         page_size = 1 << page_shift;
479
480         /* struct xseg itself + struct xheap */
481         size += 2*page_size + config->heap_size;
482         size = __align(size, page_shift);
483         
484         return size;
485 }
486
487 static long initialize_segment(struct xseg *xseg, struct xseg_config *cfg)
488 {
489         uint32_t page_shift = cfg->page_shift, page_size = 1 << page_shift;
490         struct xseg_shared *shared;
491         char *segment = (char *)xseg;
492         uint64_t size = page_size, i;
493         void *mem;
494         struct xheap *heap;
495         struct xobject_h *obj_h;
496         int r;
497         xptr *ports;
498         xport *gw;
499
500
501         if (page_size < XSEG_MIN_PAGE_SIZE)
502                 return -1;
503
504         xseg->segment_size = 2 * page_size + cfg->heap_size;
505         xseg->segment = (struct xseg *) segment;
506
507         /* build heap */
508         xseg->heap = (struct xheap *) XPTR_MAKE(segment + size, segment);
509         size += sizeof(struct xheap);
510         size = __align(size, page_shift);
511
512         heap = XPTR_TAKE(xseg->heap, segment);
513         r = xheap_init(heap, cfg->heap_size, page_shift, segment+size);
514         if (r < 0)
515                 return -1;
516
517         /* build object_handler handler */
518         mem = xheap_allocate(heap, sizeof(struct xobject_h));
519         if (!mem)
520                 return -1;
521         xseg->object_handlers = (struct xobject_h *) XPTR_MAKE(mem, segment);
522         obj_h = mem;
523         r = xobj_handler_init(obj_h, segment, MAGIC_OBJH, 
524                         sizeof(struct xobject_h), heap);
525         if (r < 0)
526                 return -1;
527
528         //now that we have object handlers handler, use that to allocate
529         //new object handlers
530         
531         //allocate requests handler
532         mem = xobj_get_obj(obj_h, X_ALLOC);
533         if (!mem)
534                 return -1;
535         obj_h = mem;
536         r = xobj_handler_init(obj_h, segment, MAGIC_REQ, 
537                         sizeof(struct xseg_request), heap);
538         if (r < 0)
539                 return -1;
540         xseg->request_h = (struct xobject_h *) XPTR_MAKE(obj_h, segment);
541         
542         //allocate ports handler
543         obj_h = XPTR_TAKE(xseg->object_handlers, segment);
544         mem = xobj_get_obj(obj_h, X_ALLOC);
545         if (!mem)
546                 return -1;
547         obj_h = mem;
548         r = xobj_handler_init(obj_h, segment, MAGIC_PORT, 
549                         sizeof(struct xseg_port), heap);
550         if (r < 0)
551                 return -1;
552         xseg->port_h = (struct xobject_h *) XPTR_MAKE(mem, segment);
553
554         //allocate xptr port array to be used as a map
555         //portno <--> xptr port
556         mem = xheap_allocate(heap, sizeof(xptr)*cfg->nr_ports);
557         if (!mem)
558                 return -1;
559         ports = mem;
560         for (i = 0; i < cfg->nr_ports; i++) {
561                 ports[i]=0;
562         }
563         xseg->ports = (xptr *) XPTR_MAKE(mem, segment);
564
565         //allocate {src,dst} gws
566         mem = xheap_allocate(heap, sizeof(xport) * cfg->nr_ports);
567         if (!mem)
568                 return -1;
569         gw = mem;
570         for (i = 0; i < cfg->nr_ports; i++) {
571                 gw[i] = NoPort;
572         }
573         xseg->path_next = (xport *) XPTR_MAKE(mem, segment);
574
575         mem = xheap_allocate(heap, sizeof(xport) * cfg->nr_ports);
576         if (!mem)
577                 return -1;
578         gw = mem;
579         for (i = 0; i < cfg->nr_ports; i++) {
580                 gw[i] = NoPort;
581         }
582         xseg->dst_gw = (xport *) XPTR_MAKE(mem, segment);
583         
584         //allocate xseg_shared memory
585         mem = xheap_allocate(heap, sizeof(struct xseg_shared));
586         if (!mem)
587                 return -1;
588         shared = (struct xseg_shared *) mem;
589         shared->flags = 0;
590         shared->nr_peer_types = 0;
591         xseg->shared = (struct xseg_shared *) XPTR_MAKE(mem, segment);
592         
593         mem = xheap_allocate(heap, page_size);
594         if (!mem)
595                 return -1;
596         shared->peer_types = (char **) XPTR_MAKE(mem, segment);
597         xseg->max_peer_types = xheap_get_chunk_size(mem) / XSEG_TNAMESIZE;
598         mem = xheap_allocate(heap, xseg->max_peer_types * sizeof(xptr));
599         if (!mem)
600                 return -1;
601         memset(mem, 0, xheap_get_chunk_size(mem));
602         shared->peer_type_data = (xptr *) XPTR_MAKE(mem, segment);
603
604         memcpy(&xseg->config, cfg, sizeof(struct xseg_config));
605
606         xseg->counters.req_cnt = 0;
607         xseg->counters.avg_req_lat = 0;
608
609         return 0;
610 }
611
612 int xseg_create(struct xseg_config *cfg)
613 {
614         struct xseg *xseg = NULL;
615         struct xseg_type *type;
616         struct xseg_operations *xops;
617         uint64_t size;
618         long r;
619
620         type = __find_or_load_type(cfg->type);
621         if (!type) {
622                 cfg->type[XSEG_TNAMESIZE-1] = 0;
623                 XSEGLOG("type '%s' does not exist\n", cfg->type);
624                 goto out_err;
625         }
626
627         size = calculate_segment_size(cfg);
628         if (!size) {
629                 XSEGLOG("invalid config!\n");
630                 goto out_err;
631         }
632
633         xops = &type->ops;
634         cfg->name[XSEG_NAMESIZE-1] = 0;
635         XSEGLOG("creating segment of size %llu\n", size);
636         r = xops->allocate(cfg->name, size);
637         if (r) {
638                 XSEGLOG("cannot allocate segment!\n");
639                 goto out_err;
640         }
641
642         xseg = xops->map(cfg->name, size, NULL);
643         if (!xseg) {
644                 XSEGLOG("cannot map segment!\n");
645                 goto out_deallocate;
646         }
647
648         r = initialize_segment(xseg, cfg);
649         xops->unmap(xseg, size);
650         if (r) {
651                 XSEGLOG("cannot initilize segment!\n");
652                 goto out_deallocate;
653         }
654
655
656         return 0;
657
658 out_deallocate:
659         xops->deallocate(cfg->name);
660 out_err:
661         return -1;
662 }
663
664 void xseg_destroy(struct xseg *xseg)
665 {
666         struct xseg_type *type;
667
668         __lock_domain();
669         type = __find_or_load_type(xseg->config.type);
670         if (!type) {
671                 XSEGLOG("no segment type '%s'\n", xseg->config.type);
672                 goto out;
673         }
674
675         /* should destroy() leave() first? */
676         type->ops.deallocate(xseg->config.name);
677 out:
678         __unlock_domain();
679 }
680
681 //FIXME
682 static int pointer_ok(  unsigned long ptr,
683                         unsigned long base,
684                         uint64_t size,
685                         char *name)
686 {
687         int ret = !(ptr >= base && ptr < base + size);
688         if (ret)
689                 XSEGLOG("invalid pointer '->%s' [%llx on %llx]!\n",
690                         (unsigned long long)ptr,
691                         (unsigned long long)base,
692                         name);
693         return ret;
694 }
695
696 #define POINTER_OK(xseg, field, base) \
697          pointer_ok(    (unsigned long)((xseg)->field), \
698                         (unsigned long)(base), \
699                         (xseg)->segment_size, \
700                         #field)
701
702 static int xseg_validate_pointers(struct xseg *xseg)
703 {
704         int r = 0;
705         r += POINTER_OK(xseg, object_handlers, xseg->segment);
706         r += POINTER_OK(xseg, request_h, xseg->segment);
707         r += POINTER_OK(xseg, port_h, xseg->segment);
708         r += POINTER_OK(xseg, ports, xseg->segment);
709         r += POINTER_OK(xseg, heap, xseg->segment);
710         r += POINTER_OK(xseg, shared, xseg->segment);
711         return r;
712 }
713
714 struct xseg *xseg_join( char *segtypename,
715                         char *segname,
716                         char *peertypename,
717                         void (*wakeup)
718                         (       uint32_t portno         ))
719 {
720         struct xseg *xseg, *__xseg;
721         uint64_t size;
722         struct xseg_peer *peertype;
723         struct xseg_type *segtype;
724         struct xseg_private *priv;
725         struct xseg_operations *xops;
726         struct xseg_peer_operations *pops;
727         int r;
728
729         __lock_domain();
730
731         peertype = __find_or_load_peer_type(peertypename);
732         if (!peertype) {
733                 XSEGLOG("Peer type '%s' not found\n", peertypename);
734                 __unlock_domain();
735                 goto err;
736         }
737
738         segtype = __find_or_load_type(segtypename);
739         if (!segtype) {
740                 XSEGLOG("Segment type '%s' not found\n", segtypename);
741                 __unlock_domain();
742                 goto err;
743         }
744
745         __unlock_domain();
746
747         xops = &segtype->ops;
748         pops = &peertype->peer_ops;
749
750         xseg = pops->malloc(sizeof(struct xseg));
751         if (!xseg) {
752                 XSEGLOG("Cannot allocate memory");
753                 goto err;
754         }
755
756         priv = pops->malloc(sizeof(struct xseg_private));
757         if (!priv) {
758                 XSEGLOG("Cannot allocate memory");
759                 goto err_seg;
760         }
761
762         __xseg = xops->map(segname, XSEG_MIN_PAGE_SIZE, NULL);
763         if (!__xseg) {
764                 XSEGLOG("Cannot map segment");
765                 goto err_priv;
766         }
767
768         size = __xseg->segment_size;
769         /* XSEGLOG("joined segment of size: %lu\n", (unsigned long)size); */
770         xops->unmap(__xseg, XSEG_MIN_PAGE_SIZE);
771
772         __xseg = xops->map(segname, size, xseg);
773         if (!__xseg) {
774                 XSEGLOG("Cannot map segment");
775                 goto err_priv;
776         }
777
778         priv->segment_type = *segtype;
779         priv->peer_type = *peertype;
780         priv->wakeup = wakeup;
781         priv->req_data = xhash_new(3, INTEGER); //FIXME should be relative to XSEG_DEF_REQS
782         if (!priv->req_data)
783                 goto err_priv;
784         xlock_release(&priv->reqdatalock);
785
786         xseg->max_peer_types = __xseg->max_peer_types;
787
788         priv->peer_types = pops->malloc(sizeof(void *) * xseg->max_peer_types);
789         if (!priv->peer_types) {
790                 XSEGLOG("Cannot allocate memory");
791                 goto err_unmap;
792         }
793         memset(priv->peer_types, 0, sizeof(void *) * xseg->max_peer_types);
794         priv->peer_type_data = pops->malloc(sizeof(void *) * xseg->max_peer_types);
795         if (!priv->peer_types) {
796                 XSEGLOG("Cannot allocate memory");
797                 //FIXME wrong err handling
798                 goto err_unmap;
799         }
800         memset(priv->peer_type_data, 0, sizeof(void *) * xseg->max_peer_types);
801
802         xseg->priv = priv;
803         xseg->config = __xseg->config;
804         xseg->version = __xseg->version;
805         xseg->request_h = XPTR_TAKE(__xseg->request_h, __xseg);
806         xseg->port_h = XPTR_TAKE(__xseg->port_h, __xseg);
807         xseg->ports = XPTR_TAKE(__xseg->ports, __xseg);
808         xseg->path_next = XPTR_TAKE(__xseg->path_next, __xseg);
809         xseg->dst_gw = XPTR_TAKE(__xseg->dst_gw, __xseg);
810         xseg->heap = XPTR_TAKE(__xseg->heap, __xseg);
811         xseg->object_handlers = XPTR_TAKE(__xseg->object_handlers, __xseg);
812         xseg->shared = XPTR_TAKE(__xseg->shared, __xseg);
813         xseg->segment_size = size;
814         xseg->segment = __xseg;
815         __sync_synchronize();
816
817         r = xseg_validate_pointers(xseg);
818         if (r) {
819                 XSEGLOG("found %d invalid xseg pointers!\n", r);
820                 goto err_free_types;
821         }
822
823         /* Do we need this?
824         r = xops->signal_join(xseg);
825         if (r) {
826                 XSEGLOG("Cannot attach signaling to segment! (error: %d)\n", r);
827                 goto err_free_types;
828         }
829         */
830
831         return xseg;
832
833 err_free_types:
834         pops->mfree(priv->peer_types);
835 err_unmap:
836         xops->unmap(__xseg, size);
837         xhash_free(priv->req_data);
838 err_priv:
839         pops->mfree(priv);
840 err_seg:
841         pops->mfree(xseg);
842 err:
843         return NULL;
844 }
845
846 void xseg_leave(struct xseg *xseg)
847 {
848         struct xseg_type *type;
849
850         __lock_domain();
851         type = __find_or_load_type(xseg->config.type);
852         if (!type) {
853                 XSEGLOG("no segment type '%s'\n", xseg->config.type);
854                 __unlock_domain();
855                 return;
856         }
857         __unlock_domain();
858
859         type->ops.unmap(xseg->segment, xseg->segment_size);
860         //FIXME free xseg?
861 }
862
863 struct xseg_port* xseg_get_port(struct xseg *xseg, uint32_t portno)
864 {
865         xptr p;
866         if (!__validate_port(xseg, portno))
867                 return NULL;
868         p = xseg->ports[portno];
869         if (p)
870                 return XPTR_TAKE(p, xseg->segment);
871         else 
872                 return NULL;
873 }
874
875 struct xq * __alloc_queue(struct xseg *xseg, uint64_t nr_reqs)
876 {
877         uint64_t bytes;
878         void *mem, *buf;
879         struct xq *q;
880         struct xheap *heap = xseg->heap;
881
882         //how many bytes to allocate for a queue
883         bytes = sizeof(struct xq) + nr_reqs*sizeof(xqindex);
884         mem = xheap_allocate(heap, bytes);
885         if (!mem)
886                 return NULL;
887
888         //how many bytes did we got, and calculate what's left of buffer
889         bytes = xheap_get_chunk_size(mem) - sizeof(struct xq);
890
891         //initialize queue with max nr of elements it can hold
892         q = (struct xq *) mem;
893         buf = (void *) (((unsigned long) mem) + sizeof(struct xq));
894         xq_init_empty(q, bytes/sizeof(xqindex), buf); 
895
896         return q;
897 }
898
899 //FIXME
900 //maybe add parameters of initial free_queue size and max_alloc_reqs
901 struct xseg_port *xseg_alloc_port(struct xseg *xseg, uint32_t flags, uint64_t nr_reqs)
902 {
903         struct xq *q;
904         struct xobject_h *obj_h = xseg->port_h;
905         struct xseg_port *port = xobj_get_obj(obj_h, flags);
906         if (!port)
907                 return NULL;
908
909         //alloc free queue
910         q = __alloc_queue(xseg, nr_reqs);
911         if (!q)
912                 goto err_free;
913         port->free_queue = XPTR_MAKE(q, xseg->segment);
914
915         //and for request queue
916         q = __alloc_queue(xseg, nr_reqs);
917         if (!q)
918                 goto err_req;
919         port->request_queue = XPTR_MAKE(q, xseg->segment);
920
921         //and for reply queue
922         q = __alloc_queue(xseg, nr_reqs);
923         if (!q)
924                 goto err_reply;
925         port->reply_queue = XPTR_MAKE(q, xseg->segment);
926
927         xlock_release(&port->fq_lock);
928         xlock_release(&port->rq_lock);
929         xlock_release(&port->pq_lock);
930         xlock_release(&port->port_lock);
931         port->owner = Noone;
932         port->portno = NoPort;
933         port->peer_type = 0; //FIXME what  here ??? NoType??
934         port->alloc_reqs = 0;
935         port->max_alloc_reqs = XSEG_DEF_MAX_ALLOCATED_REQS;
936         port->flags = 0;
937
938
939         return port;
940
941 err_reply:
942         xheap_free(XPTR_TAKE(port->request_queue, xseg->segment));
943         port->request_queue = 0;
944 err_req:
945         xheap_free(XPTR_TAKE(port->free_queue, xseg->segment));
946         port->free_queue = 0;
947 err_free:
948         xobj_put_obj(obj_h, port);
949
950         return NULL;
951 }
952
953 void xseg_free_port(struct xseg *xseg, struct xseg_port *port)
954 {
955         struct xobject_h *obj_h = xseg->port_h;
956
957         if (port->request_queue) {
958                 xheap_free(XPTR_TAKE(port->request_queue, xseg->segment));
959                 port->request_queue = 0;
960         }
961         if (port->free_queue) {
962                 xheap_free(XPTR_TAKE(port->free_queue, xseg->segment));
963                 port->free_queue = 0;
964         }
965         if (port->reply_queue) {
966                 xheap_free(XPTR_TAKE(port->reply_queue, xseg->segment));
967                 port->reply_queue = 0;
968         }
969         xobj_put_obj(obj_h, port);
970 }
971
972 void* xseg_alloc_buffer(struct xseg *xseg, uint64_t size)
973 {
974         struct xheap *heap = xseg->heap;
975         void *mem = xheap_allocate(heap, size);
976         if (mem && xheap_get_chunk_size(mem) < size) {
977                 XSEGLOG("Buffer size %llu instead of %llu\n", 
978                                 xheap_get_chunk_size(mem), size);
979                 xheap_free(mem);
980                 mem = NULL;
981         }
982         return mem;
983 }
984
985 void xseg_free_buffer(struct xseg *xseg, void *ptr)
986 {
987         xheap_free(ptr);
988 }
989
990 int xseg_prepare_wait(struct xseg *xseg, uint32_t portno)
991 {
992         if (!__validate_port(xseg, portno))
993                 return -1;
994
995         return xseg->priv->peer_type.peer_ops.prepare_wait(xseg, portno);
996 }
997
998 int xseg_cancel_wait(struct xseg *xseg, uint32_t portno)
999 {
1000         if (!__validate_port(xseg, portno))
1001                 return -1;
1002         return xseg->priv->peer_type.peer_ops.cancel_wait(xseg, portno);
1003 }
1004
1005 int xseg_wait_signal(struct xseg *xseg, uint32_t usec_timeout)
1006 {
1007         return xseg->priv->peer_type.peer_ops.wait_signal(xseg, usec_timeout);
1008 }
1009
1010 int xseg_signal(struct xseg *xseg, xport portno)
1011 {
1012         struct xseg_peer *type;
1013         struct xseg_port *port = xseg_get_port(xseg, portno);
1014         if (!port)
1015                 return -1;
1016         
1017         type = __get_peer_type(xseg, port->peer_type);
1018         if (!type)
1019                 return -1;
1020
1021         return type->peer_ops.signal(xseg, portno);
1022 }
1023
1024 int xseg_init_local_signal(struct xseg *xseg, xport portno)
1025 {
1026         struct xseg_peer *type;
1027         struct xseg_port *port = xseg_get_port(xseg, portno);
1028         if (!port)
1029                 return -1;
1030         
1031         type = __get_peer_type(xseg, port->peer_type);
1032         if (!type)
1033                 return -1;
1034
1035         return type->peer_ops.local_signal_init(xseg, portno);
1036 }
1037
1038 void xseg_quit_local_signal(struct xseg *xseg, xport portno)
1039 {
1040         struct xseg_peer *type;
1041         struct xseg_port *port = xseg_get_port(xseg, portno);
1042         if (!port)
1043                 return;
1044         
1045         type = __get_peer_type(xseg, port->peer_type);
1046         if (!type)
1047                 return;
1048
1049         type->peer_ops.local_signal_quit(xseg, portno);
1050 }
1051
1052 //FIXME doesn't increase alloced reqs
1053 //is integer i enough here?
1054 int xseg_alloc_requests(struct xseg *xseg, uint32_t portno, uint32_t nr)
1055 {
1056         int i = 0;
1057         xqindex xqi;
1058         struct xq *q;
1059         struct xseg_request *req;
1060         struct xseg_port *port = xseg_get_port(xseg, portno);
1061         if (!port)
1062                 return -1;
1063
1064         xlock_acquire(&port->fq_lock, portno);
1065         q = XPTR_TAKE(port->free_queue, xseg->segment);
1066         while ((req = xobj_get_obj(xseg->request_h, X_ALLOC)) != NULL && i < nr) {
1067                 xqi = XPTR_MAKE(req, xseg->segment);
1068                 xqi = __xq_append_tail(q, xqi);
1069                 if (xqi == Noneidx) {
1070                         xobj_put_obj(xseg->request_h, req);
1071                         break;
1072                 }
1073                 i++;
1074         }
1075         xlock_release(&port->fq_lock);
1076
1077         if (i == 0)
1078                 i = -1;
1079         return i;
1080 }
1081
1082 int xseg_free_requests(struct xseg *xseg, uint32_t portno, int nr)
1083 {
1084         int i = 0;
1085         xqindex xqi;
1086         struct xq *q;
1087         struct xseg_request *req;
1088         struct xseg_port *port = xseg_get_port(xseg, portno);
1089         if (!port)
1090                 return -1;
1091
1092         xlock_acquire(&port->fq_lock, portno);
1093         q = XPTR_TAKE(port->free_queue, xseg->segment);
1094         while ((xqi = __xq_pop_head(q)) != Noneidx && i < nr) {
1095                 req = XPTR_TAKE(xqi, xseg->segment);
1096                 xobj_put_obj(xseg->request_h, (void *) req);
1097                 i++;
1098         }
1099         if (i == 0)
1100                 return -1;
1101         xlock_release(&port->fq_lock);
1102
1103         xlock_acquire(&port->port_lock, portno);
1104         port->alloc_reqs -= i;
1105         xlock_release(&port->port_lock);
1106
1107         return i;
1108 }
1109
1110 int xseg_prep_ports (struct xseg *xseg, struct xseg_request *xreq,
1111                         uint32_t src_portno, uint32_t dst_portno)
1112 {
1113         if (!__validate_port(xseg, src_portno))
1114                 return -1;
1115
1116         if (!__validate_port(xseg, dst_portno))
1117                 return -1;
1118
1119         xreq->src_portno = src_portno;
1120         xreq->transit_portno = src_portno;
1121         xreq->dst_portno = dst_portno;
1122         xreq->effective_dst_portno = dst_portno;
1123
1124         return 0;
1125 }
1126
1127 struct xseg_request *xseg_get_request(struct xseg *xseg, xport src_portno,
1128                                         xport dst_portno, uint32_t flags)
1129 {
1130         /*
1131          * Flags:
1132          * X_ALLOC Allocate more requests if object handler
1133          *         does not have any avaiable
1134          * X_LOCAL Use only local - preallocated reqs
1135          *         (Maybe we want this as default, to give a hint to a peer
1136          *          how many requests it can have flying)
1137          */
1138         struct xseg_request *req = NULL;
1139         struct xseg_port *port;
1140         struct xq *q;
1141         xqindex xqi;
1142         xptr ptr;
1143
1144         port = xseg_get_port(xseg, src_portno);
1145         if (!port)
1146                 return NULL;
1147         //try to allocate from free_queue
1148         xlock_acquire(&port->fq_lock, src_portno);
1149         q = XPTR_TAKE(port->free_queue, xseg->segment);
1150         xqi = __xq_pop_head(q);
1151         if (xqi != Noneidx){
1152                 xlock_release(&port->fq_lock);
1153                 ptr = xqi;
1154                 req = XPTR_TAKE(ptr, xseg->segment);
1155                 goto done;
1156         }
1157         xlock_release(&port->fq_lock);
1158
1159         if (flags & X_LOCAL)
1160                 return NULL;
1161
1162         //else try to allocate from global heap
1163         //FIXME
1164         xlock_acquire(&port->port_lock, src_portno);
1165         if (port->alloc_reqs < port->max_alloc_reqs) {
1166                 req = xobj_get_obj(xseg->request_h, flags & X_ALLOC);
1167                 if (req)
1168                         port->alloc_reqs++;
1169         }
1170         xlock_release(&port->port_lock);
1171         if (!req)
1172                 return NULL;
1173
1174 done:
1175
1176         req->buffer = 0;
1177         req->bufferlen = 0;
1178         req->target = 0;
1179         req->data = 0;
1180         req->datalen = 0;
1181         req->targetlen = 0;
1182         if (xseg_prep_ports(xseg, req, src_portno, dst_portno) < 0) {
1183                 xseg_put_request(xseg, req, src_portno);
1184                 return NULL;
1185         }
1186         req->state = 0;
1187         req->elapsed = 0;
1188         req->timestamp.tv_sec = 0;
1189         req->timestamp.tv_usec = 0;
1190         req->flags = 0;
1191
1192         xq_init_empty(&req->path, MAX_PATH_LEN, req->path_bufs); 
1193
1194         return req;
1195 }
1196
1197 //add flags
1198 //do not put request if path not empty or X_FORCE set
1199 int xseg_put_request (struct xseg *xseg, struct xseg_request *xreq,
1200                         xport portno)
1201 {
1202         xqindex xqi = XPTR_MAKE(xreq, xseg->segment);
1203         struct xq *q;
1204         struct xseg_port *port = xseg_get_port(xseg, xreq->src_portno);
1205         if (!port) 
1206                 return -1;
1207
1208         if (xreq->buffer){
1209                 void *ptr = XPTR_TAKE(xreq->buffer, xseg->segment);
1210                 xseg_free_buffer(xseg, ptr);
1211         }
1212         /* empty path */
1213         xq_init_empty(&xreq->path, MAX_PATH_LEN, xreq->path_bufs); 
1214         
1215         xreq->buffer = 0;
1216         xreq->bufferlen = 0;
1217         xreq->target = 0;
1218         xreq->data = 0;
1219         xreq->datalen = 0;
1220         xreq->targetlen = 0;
1221         xreq->state = 0;
1222         xreq->src_portno = NoPort;
1223         xreq->dst_portno = NoPort;
1224         xreq->transit_portno = NoPort;
1225         xreq->effective_dst_portno = NoPort;    
1226         
1227         if (xreq->elapsed != 0) {
1228                 __lock_segment(xseg);
1229                 ++(xseg->counters.req_cnt);
1230                 xseg->counters.avg_req_lat += xreq->elapsed;
1231                 __unlock_segment(xseg);
1232         }
1233
1234
1235         //try to put it in free_queue of the port
1236         xlock_acquire(&port->fq_lock, portno);
1237         q = XPTR_TAKE(port->free_queue, xseg->segment);
1238         xqi = __xq_append_head(q, xqi);
1239         xlock_release(&port->fq_lock);
1240         if (xqi != Noneidx)
1241                 return 0;
1242         //else return it to segment
1243         xobj_put_obj(xseg->request_h, (void *) xreq);
1244         xlock_acquire(&port->port_lock, portno);
1245         port->alloc_reqs--;
1246         xlock_release(&port->port_lock);
1247         return 0;
1248 }
1249
1250 int xseg_prep_request ( struct xseg* xseg, struct xseg_request *req,
1251                         uint32_t targetlen, uint64_t datalen )
1252 {
1253         uint64_t bufferlen = targetlen + datalen;
1254         void *buf;
1255         req->buffer = 0;
1256         req->bufferlen = 0;
1257         buf = xseg_alloc_buffer(xseg, bufferlen);
1258         if (!buf)
1259                 return -1;
1260         req->bufferlen = xheap_get_chunk_size(buf);
1261         req->buffer = XPTR_MAKE(buf, xseg->segment);
1262         
1263         req->data = req->buffer;
1264         req->target = req->buffer + req->bufferlen - targetlen;
1265         req->datalen = datalen;
1266         req->targetlen = targetlen;
1267         return 0;
1268 }
1269
1270 int xseg_resize_request (struct xseg *xseg, struct xseg_request *req,
1271                         uint32_t new_targetlen, uint64_t new_datalen)
1272 {
1273         if (req->bufferlen >= new_datalen + new_targetlen) {
1274                 req->data = req->buffer;
1275                 req->target = req->buffer + req->bufferlen - new_targetlen;
1276                 req->datalen = new_datalen;
1277                 req->targetlen = new_targetlen;
1278                 return 0;
1279         }
1280
1281         if (req->buffer){
1282                 void *ptr = XPTR_TAKE(req->buffer, xseg->segment);
1283                 xseg_free_buffer(xseg, ptr);
1284         }
1285         req->buffer = 0;
1286         req->bufferlen = 0;
1287         return xseg_prep_request(xseg, req, new_targetlen, new_datalen);
1288 }
1289
1290 static void __update_timestamp(struct xseg_request *xreq)
1291 {
1292         struct timeval tv;
1293
1294         __get_current_time(&tv);
1295         if (xreq->timestamp.tv_sec != 0)
1296                 /*
1297                  * FIXME: Make xreq->elapsed timeval/timespec again to avoid the
1298                  *                multiplication?
1299                  */
1300                 xreq->elapsed += (tv.tv_sec - xreq->timestamp.tv_sec) * 1000000 
1301                                                 + (tv.tv_usec - xreq->timestamp.tv_usec);
1302
1303         xreq->timestamp.tv_sec = tv.tv_sec;
1304         xreq->timestamp.tv_usec = tv.tv_usec;
1305 }
1306
1307 //FIXME should we add NON_BLOCK flag?
1308 xport xseg_submit (struct xseg *xseg, struct xseg_request *xreq,
1309                         xport portno, uint32_t flags)
1310 {
1311         xserial serial = NoSerial;
1312         xqindex xqi, r;
1313         struct xq *q, *newq;
1314         xport next, cur;
1315         struct xseg_port *port;
1316
1317         /* discover where to submit */
1318
1319         if (!__validate_port(xseg, xreq->transit_portno)){
1320                 XSEGLOG("Couldn't validate transit_portno (portno: %lu)",
1321                                 xreq->transit_portno);
1322                 return NoPort;
1323         }
1324         if (!__validate_port(xseg, xreq->effective_dst_portno)){
1325                 XSEGLOG("Couldn't validate effective_dst_portno (portno: %lu)",
1326                                 xreq->effective_dst_portno);
1327                 return NoPort;
1328         }
1329
1330         cur = xreq->transit_portno;
1331         next = cur;
1332         //FIXME assert(cur == portno);
1333         do {
1334                 if (next == xreq->effective_dst_portno){
1335                         XSEGLOG("Path ended with no one willing to accept");
1336                         return NoPort;
1337                 }
1338
1339                 if (xseg->path_next[next] != NoPort){
1340                         next = xseg->path_next[next];
1341                 } else {
1342                         next = xreq->effective_dst_portno;
1343                 }
1344
1345                 port = xseg_get_port(xseg, next);
1346                 if (!port){
1347                         XSEGLOG("Couldnt get port (next :%u)", next);
1348                         return NoPort;
1349                 }
1350         } while ((!port->flags & CAN_ACCEPT));
1351
1352         /* submit */
1353
1354         //__update_timestamp(xreq);
1355
1356         xqi = XPTR_MAKE(xreq, xseg->segment);
1357
1358         /* add current port to path */
1359         serial = __xq_append_head(&xreq->path, cur);
1360         if (serial == Noneidx){
1361                 XSEGLOG("Couldn't append path head");
1362                 return NoPort;
1363         }
1364
1365         xlock_acquire(&port->rq_lock, portno);
1366         q = XPTR_TAKE(port->request_queue, xseg->segment);
1367         serial = __xq_append_tail(q, xqi);
1368         if (flags & X_ALLOC && serial == Noneidx) {
1369                 /* double up queue size */
1370                 XSEGLOG("Trying to double up queue");
1371                 newq = __alloc_queue(xseg, xq_size(q)*2);
1372                 if (!newq)
1373                         goto out_rel;
1374                 r = __xq_resize(q, newq);
1375                 if (r == Noneidx){
1376                         xheap_free(newq);
1377                         goto out_rel;
1378                 }
1379                 port->request_queue = XPTR_MAKE(newq, xseg->segment);
1380                 xheap_free(q);
1381                 serial = __xq_append_tail(newq, xqi);
1382         }
1383
1384 out_rel:
1385         xlock_release(&port->rq_lock);
1386         if (serial == Noneidx){
1387                 XSEGLOG("Couldn't append request to queue");
1388                 __xq_pop_head(&xreq->path);
1389                 next = NoPort;
1390         }
1391         return next;
1392
1393 }
1394
1395 struct xseg_request *xseg_receive(struct xseg *xseg, xport portno, uint32_t flags)
1396 {
1397         xqindex xqi;
1398         xserial serial = NoSerial;
1399         struct xq *q;
1400         struct xseg_request *req;
1401         struct xseg_port *port = xseg_get_port(xseg, portno);
1402         if (!port)
1403                 return NULL;
1404 retry:
1405         if (flags & X_NONBLOCK) {
1406                 if (!xlock_try_lock(&port->pq_lock, portno))
1407                         return NULL;
1408         } else {
1409                 xlock_acquire(&port->pq_lock, portno);
1410         }
1411         q = XPTR_TAKE(port->reply_queue, xseg->segment);
1412         xqi = __xq_pop_head(q);
1413         xlock_release(&port->pq_lock);
1414
1415         if (xqi == Noneidx)
1416                 return NULL;
1417
1418         req = XPTR_TAKE(xqi, xseg->segment);
1419 //      __update_timestamp(req);
1420         serial = __xq_pop_head(&req->path);
1421         if (serial == Noneidx){
1422                 /* this should never happen */
1423                 XSEGLOG("pop head of path queue returned Noneidx\n");
1424                 goto retry;
1425         }
1426
1427
1428         return req;
1429 }
1430
1431 struct xseg_request *xseg_accept(struct xseg *xseg, xport portno, uint32_t flags)
1432 {
1433         xqindex xqi;
1434         struct xq *q;
1435         struct xseg_request *req;
1436         struct xseg_port *port = xseg_get_port(xseg, portno);
1437         if (!port)
1438                 return NULL;
1439         if (flags & X_NONBLOCK) {
1440                 if (!xlock_try_lock(&port->rq_lock, portno))
1441                         return NULL;
1442         } else {
1443                 xlock_acquire(&port->rq_lock, portno);
1444         }
1445
1446         q = XPTR_TAKE(port->request_queue, xseg->segment);
1447         xqi = __xq_pop_head(q);
1448         xlock_release(&port->rq_lock);
1449         if (xqi == Noneidx)
1450                 return NULL;
1451
1452         req = XPTR_TAKE(xqi, xseg->segment);
1453         req->transit_portno = portno;
1454
1455         return req;
1456 }
1457
1458 //FIXME should we add NON_BLOCK flag?
1459 xport xseg_respond (struct xseg *xseg, struct xseg_request *xreq,
1460                         xport portno, uint32_t flags)
1461 {
1462         xserial serial = NoSerial;
1463         xqindex xqi, r;
1464         struct xq *q, *newq;
1465         struct xseg_port *port;
1466         xport dst;
1467
1468 retry:
1469         serial = __xq_peek_head(&xreq->path);
1470         if (serial == Noneidx)
1471                 return NoPort;
1472         dst = (xport) serial;
1473
1474         port = xseg_get_port(xseg, dst);
1475         if (!port)
1476                 return NoPort;
1477         if (!(port->flags & CAN_RECEIVE)){
1478                 //XSEGLOG("Port %u cannot receive", dst);
1479                 /* Port cannot receive. Try next one in path */
1480                 __xq_pop_head(&xreq->path);
1481                 goto retry;
1482         }
1483
1484         xqi = XPTR_MAKE(xreq, xseg->segment);
1485
1486         xlock_acquire(&port->pq_lock, portno);
1487         q = XPTR_TAKE(port->reply_queue, xseg->segment);
1488         serial = __xq_append_tail(q, xqi);
1489         if (flags & X_ALLOC && serial == Noneidx) {
1490                 newq = __alloc_queue(xseg, xq_size(q)*2);
1491                 if (!newq)
1492                         goto out_rel;
1493                 r = __xq_resize(q, newq);
1494                 if (r == Noneidx) {
1495                         xheap_free(newq);
1496                         goto out_rel;
1497                 }
1498                 port->reply_queue = XPTR_MAKE(newq, xseg->segment);
1499                 xheap_free(q);
1500                 serial = __xq_append_tail(newq, xqi);
1501         }
1502
1503 out_rel:
1504         xlock_release(&port->pq_lock);
1505
1506         if (serial == Noneidx)
1507                 dst = NoPort;
1508         return dst;
1509         
1510 }
1511
1512 xport xseg_forward(struct xseg *xseg, struct xseg_request *req, xport new_dst,
1513                 xport portno, uint32_t flags)
1514 {
1515         if (!__validate_port(xseg, new_dst)){
1516                 XSEGLOG("Couldn't validate new destination (new_dst %lu)",
1517                                 new_dst);
1518                 return NoPort;
1519         }
1520         req->effective_dst_portno = new_dst;
1521         return xseg_submit(xseg, req, portno, flags);
1522
1523 }
1524
1525 int xseg_set_path_next(struct xseg *xseg, xport portno, xport next)
1526 {
1527         if (!__validate_port(xseg, portno))
1528                 return -1;
1529         if (!__validate_port(xseg, next))
1530                 return -1;
1531         xseg->path_next[portno] = next;
1532         return 0;
1533 }
1534
1535 int xseg_set_req_data(struct xseg *xseg, struct xseg_request *xreq, void *data)
1536 {
1537         int r;
1538         xhash_t *req_data;
1539         
1540         xlock_acquire(&xseg->priv->reqdatalock, 1);
1541
1542         req_data = xseg->priv->req_data;
1543         r = xhash_insert(req_data, (xhashidx) xreq, (xhashidx) data);
1544         if (r == -XHASH_ERESIZE) {
1545                 req_data = xhash_resize(req_data, xhash_grow_size_shift(req_data), NULL);
1546                 if (req_data) {
1547                         xseg->priv->req_data = req_data;
1548                         r = xhash_insert(req_data, (xhashidx) xreq, (xhashidx) data);
1549                 }
1550         }
1551
1552         xlock_release(&xseg->priv->reqdatalock);
1553         return r;
1554 }
1555
1556 int xseg_get_req_data(struct xseg *xseg, struct xseg_request *xreq, void **data)
1557 {
1558         int r;
1559         xhashidx val;
1560         xhash_t *req_data;
1561         
1562         xlock_acquire(&xseg->priv->reqdatalock, 1);
1563
1564         req_data = xseg->priv->req_data;
1565         //maybe we need a xhash_delete with lookup...
1566         //maybe we also need a delete that doesn't shrink xhash
1567         r = xhash_lookup(req_data, (xhashidx) xreq, &val);
1568         *data = (void *) val;
1569         if (r >= 0) {
1570                 r = xhash_delete(req_data, (xhashidx) xreq);
1571                 if (r == -XHASH_ERESIZE) {
1572                         req_data = xhash_resize(req_data, xhash_shrink_size_shift(req_data), NULL);
1573                         if (req_data){
1574                                 xseg->priv->req_data = req_data;
1575                                 r = xhash_delete(req_data, (xhashidx) xreq);
1576                         }
1577                 }
1578         }
1579
1580         xlock_release(&xseg->priv->reqdatalock);
1581         return r;
1582 }
1583
1584 struct xobject_h * xseg_get_objh(struct xseg *xseg, uint32_t magic, uint64_t size)
1585 {
1586         int r;
1587         struct xobject_h *obj_h = xobj_get_obj(xseg->object_handlers, X_ALLOC);
1588         if (!obj_h)
1589                 return NULL;
1590         r = xobj_handler_init(obj_h, xseg->segment, magic, size, xseg->heap);
1591         if (r < 0) {
1592                 xobj_put_obj(xseg->object_handlers, obj_h);
1593                 return NULL;
1594         }
1595         return obj_h;
1596 }
1597
1598 void xseg_put_objh(struct xseg *xseg, struct xobject_h *objh)
1599 {
1600         xobj_put_obj(xseg->object_handlers, objh);
1601 }
1602
1603
1604 /*
1605 int xseg_complete_req(struct xseg_request *req)
1606 {
1607         req->state |= XS_SERVED;
1608         req->state &= ~XS_FAILED;
1609 }
1610
1611 int xseg_fail_req(struct xseg_request *req)
1612 {
1613         req->state &= ~XS_SERVED;
1614         req->state |= XS_FAILED;
1615 }
1616 */
1617
1618 struct xseg_port *xseg_bind_port(struct xseg *xseg, uint32_t req, void * sd)
1619 {
1620         uint32_t portno, maxno, id = __get_id(), force;
1621         struct xseg_port *port = NULL;
1622         void *peer_data, *sigdesc;
1623         int64_t driver;
1624         int r;
1625
1626         if (req >= xseg->config.nr_ports) {
1627                 portno = 0;
1628                 maxno = xseg->config.nr_ports;
1629                 force = 0;
1630         } else {
1631                 portno = req;
1632                 maxno = req + 1;
1633                 force = 1;
1634         }
1635
1636         __lock_segment(xseg);
1637         for (; portno < maxno; portno++) {
1638                 if (!xseg->ports[portno]) {
1639                         port = xseg_alloc_port(xseg, X_ALLOC, XSEG_DEF_REQS);
1640                         if (!port)
1641                                 goto out;
1642                 } else if (force) {
1643                         port = xseg_get_port(xseg, portno);
1644                         if (!port)
1645                                 goto out;       
1646                 } else {
1647                         continue;
1648                 }
1649                 driver = __enable_driver(xseg, &xseg->priv->peer_type);
1650                 if (driver < 0)
1651                         break;
1652                 if (!sd){
1653                         peer_data = __get_peer_type_data(xseg, (uint64_t) driver);
1654                         if (!peer_data)
1655                                 break;
1656                         sigdesc = xseg->priv->peer_type.peer_ops.alloc_signal_desc(xseg, peer_data);
1657                         if (!sigdesc)
1658                                 break;
1659                         r = xseg->priv->peer_type.peer_ops.init_signal_desc(xseg, sigdesc);
1660                         if (r < 0){
1661                                 xseg->priv->peer_type.peer_ops.free_signal_desc(xseg, peer_data, sigdesc);
1662                                 break;
1663                         }
1664                         port->signal_desc = XPTR_MAKE(sigdesc, xseg->segment);
1665                 } else {
1666                         port->signal_desc = XPTR_MAKE(sd, xseg->segment);
1667                 }
1668                 port->peer_type = (uint64_t)driver;
1669                 port->owner = id;
1670                 port->portno = portno;
1671                 port->flags = CAN_ACCEPT | CAN_RECEIVE;
1672                 xseg->ports[portno] = XPTR_MAKE(port, xseg->segment);
1673                 goto out;
1674         }
1675         if (port) {
1676                 xseg_free_port(xseg, port);
1677                 xseg->ports[portno] = 0;
1678                 port = NULL;
1679         }
1680 out:
1681         __unlock_segment(xseg);
1682         return port;
1683 }
1684
1685 /*
1686  * set the limit of requests, a port can allocate.
1687  *
1688  * this limit should be greater than the number of requests a port can cache
1689  * locally on its free_queue, and less than the hard limit imposed by the
1690  * segment.
1691  *
1692  * maybe make it drop excess requests
1693  */
1694 int xseg_set_max_requests(struct xseg *xseg, xport portno, uint64_t nr_reqs)
1695 {
1696         int r = -1;
1697         struct xseg_port *port;
1698         struct xq *q;
1699         if (nr_reqs > XSEG_MAX_ALLOCATED_REQS)
1700                 return -1;
1701
1702         port = xseg_get_port(xseg, portno);
1703         if (!port)
1704                 return -1;
1705
1706         xlock_acquire(&port->fq_lock, portno);
1707         q = XPTR_TAKE(port->free_queue, xseg->segment);
1708         if (xq_size(q) <= nr_reqs){
1709                 port->max_alloc_reqs = nr_reqs;
1710                 r = 0;
1711         }
1712         xlock_release(&port->fq_lock);
1713
1714         /* no lock here 
1715          * if theres is a get_request in progress, it is not critical to enforce
1716          * the new limit.
1717          */
1718         port->max_alloc_reqs = nr_reqs;
1719         return r;
1720 }
1721
1722 uint64_t xseg_get_max_requests(struct xseg *xseg, xport portno)
1723 {
1724         struct xseg_port *port = xseg_get_port(xseg, portno);
1725         if (!port)
1726                 return -1;
1727         return port->max_alloc_reqs;
1728 }
1729
1730 uint64_t xseg_get_allocated_requests(struct xseg *xseg, xport portno)
1731 {
1732         struct xseg_port *port = xseg_get_port(xseg, portno);
1733         if (!port)
1734                 return -1;
1735         return port->alloc_reqs;
1736 }
1737
1738 /*
1739  * set free_queue size, aka the local "cached" requests a port can have
1740  * it should be smaller than port->max_alloc_reqs?
1741  *
1742  */
1743 int xseg_set_freequeue_size(struct xseg *xseg, xport portno, xqindex size,
1744                                 uint32_t flags)
1745 {
1746         int ret = 0;
1747         xqindex xqi,r;
1748         struct xq *q, *newq;
1749         struct xseg_request *xreq;
1750         struct xseg_port *port = xseg_get_port(xseg, portno);
1751         if (!port)
1752                 return -1;
1753
1754         newq = __alloc_queue(xseg, size);
1755         if (!newq)
1756                 return -1;
1757
1758         if (flags & X_NONBLOCK) {
1759                 if (!xlock_try_lock(&port->fq_lock, portno))
1760                         return -1;
1761         } else {
1762                 xlock_acquire(&port->fq_lock, portno);
1763         }
1764
1765         q = XPTR_TAKE(port->free_queue, xseg->segment);
1766
1767         /* put requests that don't fit in the new queue */
1768         while (xq_count(q) > xq_size(newq)){
1769                 xqi = __xq_pop_head(q);
1770                 if (xqi != Noneidx){
1771                         xreq = XPTR_TAKE(xqi, xseg->segment);
1772                         xobj_put_obj(xseg->request_h, (void *) xreq);
1773                 }
1774         }
1775
1776         r = __xq_resize(q, newq);
1777         if (r == Noneidx){
1778                 xheap_free(newq);
1779                 ret = -1;
1780                 goto out_rel;
1781         }
1782         port->free_queue = XPTR_MAKE(newq, xseg->segment);
1783         xheap_free(q);
1784         ret = 0;
1785
1786 out_rel:
1787         xlock_release(&port->fq_lock);
1788         return ret;
1789 }
1790
1791 int xseg_leave_port(struct xseg *xseg, struct xseg_port *port)
1792 {
1793         /* To be implemented */
1794         return -1;
1795 }
1796
1797 int xseg_initialize(void)
1798 {
1799         return __xseg_preinit();        /* with or without lock ? */
1800 }
1801
1802 int xseg_finalize(void)
1803 {
1804         /* finalize not supported yet */
1805         return -1;
1806 }
1807
1808
1809 char* xseg_get_data_nonstatic(struct xseg* xseg, struct xseg_request *req)
1810 {
1811         return xseg_get_data(xseg, req);
1812 }
1813
1814 char* xseg_get_target_nonstatic(struct xseg* xseg, struct xseg_request *req)
1815 {
1816         return xseg_get_target(xseg, req);
1817 }
1818
1819
1820 #ifdef __KERNEL__
1821 #include <linux/module.h>
1822 #include <xseg/xseg_exports.h>
1823 #endif
1824