8bb62209c38b1ac66eef57ed40266f43b193c9b7
[archipelago] / xseg / xseg / xseg.c
1 #include <xseg/xseg.h>
2 #include <xseg/domain.h>
3 #include <sys/util.h>
4
5 #ifndef NULL
6 #define NULL ((void *)0)
7 #endif
8
9 #define XSEG_NR_TYPES 16
10 #define XSEG_NR_PEER_TYPES 64
11 #define XSEG_MIN_PAGE_SIZE 4096
12
13 static struct xseg_type *__types[XSEG_NR_TYPES];
14 static unsigned int __nr_types;
15 static struct xseg_peer *__peer_types[XSEG_NR_PEER_TYPES];
16 static unsigned int __nr_peer_types;
17
18 static void __lock_segment(struct xseg *xseg)
19 {
20         volatile uint64_t *flags;
21         flags = &xseg->shared->flags;
22         while (__sync_fetch_and_or(flags, XSEG_F_LOCK));
23 }
24
25 static void __unlock_segment(struct xseg *xseg)
26 {
27         volatile uint64_t *flags;
28         flags = &xseg->shared->flags;
29         __sync_fetch_and_and(flags, ~XSEG_F_LOCK);
30 }
31
32 static struct xseg_type *__find_type(const char *name, long *index)
33 {
34         long i;
35         for (i = 0; (*index = i) < __nr_types; i++)
36                 if (!strncmp(__types[i]->name, name, XSEG_TNAMESIZE))
37                         return __types[i];
38         return NULL;
39 }
40
41 static struct xseg_peer *__find_peer_type(const char *name, int64_t *index)
42 {
43         int64_t i;
44         for (i = 0; (*index = i) < __nr_peer_types; i++) {
45                 if (!strncmp(__peer_types[i]->name, name, XSEG_TNAMESIZE))
46                         return __peer_types[i];
47         }
48         return NULL;
49 }
50
51 void xseg_report_peer_types(void)
52 {
53         long i;
54         XSEGLOG("total %u peer types:\n", __nr_peer_types);
55         for (i = 0; i < __nr_peer_types; i++)
56                 XSEGLOG("%ld: '%s'\n", i, __peer_types[i]->name);
57 }
58
59 static struct xseg_type *__find_or_load_type(const char *name)
60 {
61         long i;
62         struct xseg_type *type = __find_type(name, &i);
63         if (type)
64                 return type;
65
66         __load_plugin(name);
67         return __find_type(name, &i);
68 }
69
70 static struct xseg_peer *__find_or_load_peer_type(const char *name)
71 {
72         int64_t i;
73         struct xseg_peer *peer_type = __find_peer_type(name, &i);
74         if (peer_type)
75                 return peer_type;
76
77         __load_plugin(name);
78         return __find_peer_type(name, &i);
79 }
80
81 static struct xseg_peer *__get_peer_type(struct xseg *xseg, uint32_t serial)
82 {
83         char *name;
84         struct xseg_peer *type;
85         struct xseg_private *priv = xseg->priv;
86         char (*shared_peer_types)[XSEG_TNAMESIZE];
87
88         if (serial >= xseg->max_peer_types) {
89                 XSEGLOG("invalid peer type serial %d >= %d\n",
90                          serial, xseg->max_peer_types);
91                 return NULL;
92         }
93
94         type = priv->peer_types[serial];
95         if (type)
96                 return type;
97
98         /* xseg->shared->peer_types is an append-only array,
99          * therefore this should be safe
100          * without either locking or string copying. */
101         shared_peer_types = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
102         name = shared_peer_types[serial];
103         if (!*name) {
104                 XSEGLOG("nonexistent peer type serial %d\n", serial);
105                 return NULL;
106         }
107
108         type = __find_or_load_peer_type(name);
109         if (!type)
110                 XSEGLOG("could not find driver for peer type %d [%s]\n",
111                          serial, name);
112
113         priv->peer_types[serial] = type;
114         return type;
115 }
116
117 static void * __get_peer_type_data(struct xseg *xseg, uint32_t serial)
118 {
119         char *name;
120         void *data;
121         struct xseg_private *priv = xseg->priv;
122         char (*shared_peer_types)[XSEG_TNAMESIZE];
123         xptr *shared_peer_type_data;
124
125         if (serial >= xseg->max_peer_types) {
126                 XSEGLOG("invalid peer type serial %d >= %d\n",
127                          serial, xseg->max_peer_types);
128                 return 0;
129         }
130
131         data = priv->peer_type_data[serial];
132         if (data)
133                 return data;
134
135         shared_peer_types = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
136         name = shared_peer_types[serial];
137         if (!*name) {
138                 XSEGLOG("nonexistent peer type serial %d\n", serial);
139                 return 0;
140         }
141         shared_peer_type_data = XPTR_TAKE(xseg->shared->peer_type_data, xseg->segment);
142
143         priv->peer_type_data[serial] = XPTR_TAKE(shared_peer_type_data[serial], xseg->segment);
144         return priv->peer_type_data[serial];
145 }
146
147 static inline int __validate_port(struct xseg *xseg, uint32_t portno)
148 {
149         return portno < xseg->config.nr_ports;
150 }
151
152 static inline int __validate_ptr(struct xseg *xseg, xptr ptr)
153 {
154         return ptr < xseg->segment_size;
155 }
156
157 /* type:name:nr_ports:nr_requests:request_size:extra_size:page_shift */
158
159 #define TOK(s, sp, def) \
160         (s) = (sp); \
161         for (;;) { \
162                 switch (*(sp)) { \
163                 case 0: \
164                         s = (def); \
165                         break; \
166                 case ':': \
167                         *(sp)++ = 0; \
168                         break; \
169                 default: \
170                         (sp) ++; \
171                         continue; \
172                 } \
173                 break; \
174         } \
175
176 static unsigned long strul(char *s)
177 {
178         unsigned long n = 0;
179         for (;;) {
180                 unsigned char c = *s - '0';
181                 if (c >= 10)
182                         break;
183                 n = n * 10 + c;
184                 s ++;
185         }
186         return n;
187 }
188
189 /*
190 static char *strncopy(char *dest, const char *src, uint32_t n)
191 {
192         uint32_t i;
193         char c;
194         for (i = 0; i < n; i++) {
195                 c = src[i];
196                 dest[i] = c;
197                 if (!c)
198                         break;
199         }
200         dest[n-1] = 0;
201         return dest;
202 }
203 */
204
205 int xseg_parse_spec(char *segspec, struct xseg_config *config)
206 {
207         /* default: "posix:globalxseg:4:256:12" */
208         char *s = segspec, *sp = segspec;
209
210         /* type */
211         TOK(s, sp, "posix");
212         strncpy(config->type, s, XSEG_TNAMESIZE);
213         config->type[XSEG_TNAMESIZE-1] = 0;
214
215         /* name */
216         TOK(s, sp, "globalxseg");
217         strncpy(config->name, s, XSEG_NAMESIZE);
218         config->name[XSEG_NAMESIZE-1] = 0;
219
220         /* nr_ports */
221         TOK(s, sp, "4");
222         config->nr_ports = strul(s);
223
224         /* heap_size */
225         TOK(s, sp, "256");
226         config->heap_size = (uint64_t) (strul(s) * 1024UL * 1024UL);
227
228         /* page_shift */
229         TOK(s, sp, "12");
230         config->page_shift = strul(s);
231         return 0;
232 }
233
234 int xseg_register_type(struct xseg_type *type)
235 {
236         long i;
237         int r = -1;
238         struct xseg_type *__type;
239         __lock_domain();
240         __type = __find_type(type->name, &i);
241         if (__type) {
242                 XSEGLOG("type %s already exists\n", type->name);
243                 goto out;
244         }
245
246         if (__nr_types >= XSEG_NR_TYPES) {
247                 XSEGLOG("maximum type registrations reached: %u\n", __nr_types);
248                 r -= 1;
249                 goto out;
250         }
251
252         type->name[XSEG_TNAMESIZE-1] = 0;
253         __types[__nr_types] = type;
254         __nr_types += 1;
255         r = 0;
256 out:
257         __unlock_domain();
258         return r;
259 }
260
261 int xseg_unregister_type(const char *name)
262 {
263         long i;
264         int r = -1;
265         struct xseg_type *__type;
266         __lock_domain();
267         __type = __find_type(name, &i);
268         if (!__type) {
269                 XSEGLOG("segment type '%s' does not exist\n", name);
270                 goto out;
271         }
272
273         __nr_types -= 1;
274         __types[i] = __types[__nr_types];
275         __types[__nr_types] = NULL;
276         r = 0;
277 out:
278         __unlock_domain();
279         return r;
280 }
281
282 int xseg_register_peer(struct xseg_peer *peer_type)
283 {
284         int64_t i;
285         int r = -1;
286         struct xseg_peer *type;
287         __lock_domain();
288         type = __find_peer_type(peer_type->name, &i);
289         if (type) {
290                 XSEGLOG("peer type '%s' already exists\n", type->name);
291                 goto out;
292         }
293
294         if (__nr_peer_types >= XSEG_NR_PEER_TYPES) {
295                 XSEGLOG("maximum peer type registrations reached: %u",
296                         __nr_peer_types);
297                 r -= 1;
298                 goto out;
299         }
300
301         if (peer_type->peer_ops.remote_signal_init()) {
302                 XSEGLOG("peer type '%s': signal initialization failed\n",
303                         peer_type->name);
304                 r -= 1;
305                 goto out;
306         }
307
308         peer_type->name[XSEG_TNAMESIZE-1] = 0;
309         __peer_types[__nr_peer_types] = peer_type;
310         __nr_peer_types += 1;
311         r = 0;
312
313 out:
314         __unlock_domain();
315         return r;
316 }
317
318 int xseg_unregister_peer(const char *name)
319 {
320         int64_t i;
321         struct xseg_peer *driver;
322         int r = -1;
323         __lock_domain();
324         driver = __find_peer_type(name, &i);
325         if (!driver) {
326                 XSEGLOG("peer type '%s' does not exist\n", name);
327                 goto out;
328         }
329
330         __nr_peer_types -= 1;
331         __peer_types[i] = __peer_types[__nr_peer_types];
332         __peer_types[__nr_peer_types] = NULL;
333         driver->peer_ops.remote_signal_quit();
334         r = 0;
335 out:
336         __unlock_domain();
337         return r;
338 }
339
340 int64_t __enable_driver(struct xseg *xseg, struct xseg_peer *driver)
341 {
342         int64_t r;
343         char (*drivers)[XSEG_TNAMESIZE];
344         xptr *ptd;
345         uint32_t max_drivers = xseg->max_peer_types;
346         void *data;
347         xptr peer_type_data;
348
349         if (xseg->shared->nr_peer_types >= max_drivers) {
350                 XSEGLOG("cannot register '%s': driver namespace full\n",
351                         driver->name);
352                 return -1;
353         }
354
355         drivers = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
356         for (r = 0; r < max_drivers; r++) {
357                 if (!*drivers[r])
358                         goto bind;
359                 if (!strncmp(drivers[r], driver->name, XSEG_TNAMESIZE)){
360                         data = __get_peer_type_data(xseg, r);
361                         goto success;
362                 }
363         }
364
365         /* Unreachable */
366         return -666;
367
368 bind:
369         /* assert(xseg->shared->nr_peer_types == r); */
370         data = driver->peer_ops.alloc_data(xseg);
371         if (!data)
372                 return -1;
373         peer_type_data = XPTR_MAKE(data, xseg->segment);
374         ptd = XPTR_TAKE(xseg->shared->peer_type_data, xseg->segment);
375         ptd[r] = peer_type_data;
376         xseg->shared->nr_peer_types = r + 1;
377         strncpy(drivers[r], driver->name, XSEG_TNAMESIZE);
378         drivers[r][XSEG_TNAMESIZE-1] = 0;
379
380 success:
381         xseg->priv->peer_types[r] = driver;
382         xseg->priv->peer_type_data[r] = data;
383         return r;
384 }
385
386 int64_t xseg_enable_driver(struct xseg *xseg, const char *name)
387 {
388         int64_t r = -1;
389         struct xseg_peer *driver;
390
391         __lock_domain();
392         driver = __find_peer_type(name, &r);
393         if (!driver) {
394                 XSEGLOG("driver '%s' not found\n", name);
395                 goto out;
396         }
397
398         __lock_segment(xseg);
399         r = __enable_driver(xseg, driver);
400         __unlock_segment(xseg);
401 out:
402         __unlock_domain();
403         return r;
404 }
405
406 int xseg_disable_driver(struct xseg *xseg, const char *name)
407 {
408         int64_t i;
409         int r = -1;
410         struct xseg_private *priv = xseg->priv;
411         struct xseg_peer *driver;
412         __lock_domain();
413         driver =  __find_peer_type(name, &i);
414         if (!driver) {
415                 XSEGLOG("driver '%s' not found\n", name);
416                 goto out;
417         }
418
419         for (i = 0; i < xseg->max_peer_types; i++)
420                 if (priv->peer_types[i] == driver)
421                         priv->peer_types[i] = NULL;
422         r = 0;
423 out:
424         __unlock_domain();
425         return r;
426 }
427
428 /* NOTE: calculate_segment_size() and initialize_segment()
429  * must always be exactly in sync!
430 */
431
432 static uint64_t calculate_segment_size(struct xseg_config *config)
433 {
434         uint64_t size = 0;
435         uint32_t page_size, page_shift = config->page_shift;
436
437         /* assert(sizeof(struct xseg) <= (1 << 9)); */
438
439         if (page_shift < 9) {
440                 XSEGLOG("page_shift must be >= %d\n", 9);
441                 return 0;
442         }
443
444         page_size = 1 << page_shift;
445
446         /* struct xseg itself + struct xheap */
447         size += 2*page_size + config->heap_size;
448         size = __align(size, page_shift);
449         
450         return size;
451 }
452
453 static long initialize_segment(struct xseg *xseg, struct xseg_config *cfg)
454 {
455         uint32_t page_shift = cfg->page_shift, page_size = 1 << page_shift;
456         struct xseg_shared *shared;
457         char *segment = (char *)xseg;
458         uint64_t size = page_size, i;
459         void *mem;
460         struct xheap *heap;
461         struct xobject_h *obj_h;
462         int r;
463         xptr *ports;
464         xport *gw;
465
466
467         if (page_size < XSEG_MIN_PAGE_SIZE)
468                 return -1;
469
470         xseg->segment_size = 2 * page_size + cfg->heap_size;
471         xseg->segment = (struct xseg *) segment;
472
473         /* build heap */
474         xseg->heap = (struct xheap *) XPTR_MAKE(segment + size, segment);
475         size += sizeof(struct xheap);
476         size = __align(size, page_shift);
477
478         heap = XPTR_TAKE(xseg->heap, segment);
479         r = xheap_init(heap, cfg->heap_size, page_shift, segment+size);
480         if (r < 0)
481                 return -1;
482
483         /* build object_handler handler */
484         mem = xheap_allocate(heap, sizeof(struct xobject_h));
485         if (!mem)
486                 return -1;
487         xseg->object_handlers = (struct xobject_h *) XPTR_MAKE(mem, segment);
488         obj_h = mem;
489         r = xobj_handler_init(obj_h, segment, MAGIC_OBJH, 
490                         sizeof(struct xobject_h), heap);
491         if (r < 0)
492                 return -1;
493
494         //now that we have object handlers handler, use that to allocate
495         //new object handlers
496         
497         //allocate requests handler
498         mem = xobj_get_obj(obj_h, X_ALLOC);
499         if (!mem)
500                 return -1;
501         obj_h = mem;
502         r = xobj_handler_init(obj_h, segment, MAGIC_REQ, 
503                         sizeof(struct xseg_request), heap);
504         if (r < 0)
505                 return -1;
506         xseg->request_h = (struct xobject_h *) XPTR_MAKE(obj_h, segment);
507         
508         //allocate ports handler
509         obj_h = XPTR_TAKE(xseg->object_handlers, segment);
510         mem = xobj_get_obj(obj_h, X_ALLOC);
511         if (!mem)
512                 return -1;
513         obj_h = mem;
514         r = xobj_handler_init(obj_h, segment, MAGIC_PORT, 
515                         sizeof(struct xseg_port), heap);
516         if (r < 0)
517                 return -1;
518         xseg->port_h = (struct xobject_h *) XPTR_MAKE(mem, segment);
519
520         //allocate xptr port array to be used as a map
521         //portno <--> xptr port
522         mem = xheap_allocate(heap, sizeof(xptr)*cfg->nr_ports);
523         if (!mem)
524                 return -1;
525         ports = mem;
526         for (i = 0; i < cfg->nr_ports; i++) {
527                 ports[i]=0;
528         }
529         xseg->ports = (xptr *) XPTR_MAKE(mem, segment);
530
531         //allocate {src,dst} gws
532         mem = xheap_allocate(heap, sizeof(xport) * cfg->nr_ports);
533         if (!mem)
534                 return -1;
535         gw = mem;
536         for (i = 0; i < cfg->nr_ports; i++) {
537                 gw[i] = i;
538         }
539         xseg->src_gw = (xport *) XPTR_MAKE(mem, segment);
540
541         mem = xheap_allocate(heap, sizeof(xport) * cfg->nr_ports);
542         if (!mem)
543                 return -1;
544         gw = mem;
545         for (i = 0; i < cfg->nr_ports; i++) {
546                 gw[i] = i;
547         }
548         xseg->dst_gw = (xport *) XPTR_MAKE(mem, segment);
549         
550         //allocate xseg_shared memory
551         mem = xheap_allocate(heap, sizeof(struct xseg_shared));
552         if (!mem)
553                 return -1;
554         shared = (struct xseg_shared *) mem;
555         shared->flags = 0;
556         shared->nr_peer_types = 0;
557         xseg->shared = (struct xseg_shared *) XPTR_MAKE(mem, segment);
558         
559         mem = xheap_allocate(heap, page_size);
560         if (!mem)
561                 return -1;
562         shared->peer_types = (char **) XPTR_MAKE(mem, segment);
563         xseg->max_peer_types = xheap_get_chunk_size(mem) / XSEG_TNAMESIZE;
564         mem = xheap_allocate(heap, xseg->max_peer_types * sizeof(xptr));
565         if (!mem)
566                 return -1;
567         memset(mem, 0, xheap_get_chunk_size(mem));
568         shared->peer_type_data = (xptr *) XPTR_MAKE(mem, segment);
569
570         memcpy(&xseg->config, cfg, sizeof(struct xseg_config));
571
572         xseg->counters.req_cnt = 0;
573         xseg->counters.avg_req_lat = 0;
574
575         return 0;
576 }
577
578 int xseg_create(struct xseg_config *cfg)
579 {
580         struct xseg *xseg = NULL;
581         struct xseg_type *type;
582         struct xseg_operations *xops;
583         uint64_t size;
584         long r;
585
586         type = __find_or_load_type(cfg->type);
587         if (!type) {
588                 cfg->type[XSEG_TNAMESIZE-1] = 0;
589                 XSEGLOG("type '%s' does not exist\n", cfg->type);
590                 goto out_err;
591         }
592
593         size = calculate_segment_size(cfg);
594         if (!size) {
595                 XSEGLOG("invalid config!\n");
596                 goto out_err;
597         }
598
599         xops = &type->ops;
600         cfg->name[XSEG_NAMESIZE-1] = 0;
601         XSEGLOG("creating segment of size %llu\n", size);
602         r = xops->allocate(cfg->name, size);
603         if (r) {
604                 XSEGLOG("cannot allocate segment!\n");
605                 goto out_err;
606         }
607
608         xseg = xops->map(cfg->name, size, NULL);
609         if (!xseg) {
610                 XSEGLOG("cannot map segment!\n");
611                 goto out_deallocate;
612         }
613
614         r = initialize_segment(xseg, cfg);
615         xops->unmap(xseg, size);
616         if (r) {
617                 XSEGLOG("cannot initilize segment!\n");
618                 goto out_deallocate;
619         }
620
621
622         return 0;
623
624 out_deallocate:
625         xops->deallocate(cfg->name);
626 out_err:
627         return -1;
628 }
629
630 void xseg_destroy(struct xseg *xseg)
631 {
632         struct xseg_type *type;
633
634         __lock_domain();
635         type = __find_or_load_type(xseg->config.type);
636         if (!type) {
637                 XSEGLOG("no segment type '%s'\n", xseg->config.type);
638                 goto out;
639         }
640
641         /* should destroy() leave() first? */
642         type->ops.deallocate(xseg->config.name);
643 out:
644         __unlock_domain();
645 }
646
647 //FIXME
648 static int pointer_ok(  unsigned long ptr,
649                         unsigned long base,
650                         uint64_t size,
651                         char *name)
652 {
653         int ret = !(ptr >= base && ptr < base + size);
654         if (ret)
655                 XSEGLOG("invalid pointer '->%s' [%llx on %llx]!\n",
656                         (unsigned long long)ptr,
657                         (unsigned long long)base,
658                         name);
659         return ret;
660 }
661
662 #define POINTER_OK(xseg, field, base) \
663          pointer_ok(    (unsigned long)((xseg)->field), \
664                         (unsigned long)(base), \
665                         (xseg)->segment_size, \
666                         #field)
667
668 static int xseg_validate_pointers(struct xseg *xseg)
669 {
670         int r = 0;
671         r += POINTER_OK(xseg, object_handlers, xseg->segment);
672         r += POINTER_OK(xseg, request_h, xseg->segment);
673         r += POINTER_OK(xseg, port_h, xseg->segment);
674         r += POINTER_OK(xseg, ports, xseg->segment);
675         r += POINTER_OK(xseg, heap, xseg->segment);
676         r += POINTER_OK(xseg, shared, xseg->segment);
677         return r;
678 }
679
680 struct xseg *xseg_join( char *segtypename,
681                         char *segname,
682                         char *peertypename,
683                         void (*wakeup)
684                         (       uint32_t portno         ))
685 {
686         struct xseg *xseg, *__xseg;
687         uint64_t size;
688         struct xseg_peer *peertype;
689         struct xseg_type *segtype;
690         struct xseg_private *priv;
691         struct xseg_operations *xops;
692         struct xseg_peer_operations *pops;
693         int r;
694
695         __lock_domain();
696
697         peertype = __find_or_load_peer_type(peertypename);
698         if (!peertype) {
699                 XSEGLOG("Peer type '%s' not found\n", peertypename);
700                 __unlock_domain();
701                 goto err;
702         }
703
704         segtype = __find_or_load_type(segtypename);
705         if (!segtype) {
706                 XSEGLOG("Segment type '%s' not found\n", segtypename);
707                 __unlock_domain();
708                 goto err;
709         }
710
711         __unlock_domain();
712
713         xops = &segtype->ops;
714         pops = &peertype->peer_ops;
715
716         xseg = pops->malloc(sizeof(struct xseg));
717         if (!xseg) {
718                 XSEGLOG("Cannot allocate memory");
719                 goto err;
720         }
721
722         priv = pops->malloc(sizeof(struct xseg_private));
723         if (!priv) {
724                 XSEGLOG("Cannot allocate memory");
725                 goto err_seg;
726         }
727
728         __xseg = xops->map(segname, XSEG_MIN_PAGE_SIZE, NULL);
729         if (!__xseg) {
730                 XSEGLOG("Cannot map segment");
731                 goto err_priv;
732         }
733
734         size = __xseg->segment_size;
735         /* XSEGLOG("joined segment of size: %lu\n", (unsigned long)size); */
736         xops->unmap(__xseg, XSEG_MIN_PAGE_SIZE);
737
738         __xseg = xops->map(segname, size, xseg);
739         if (!__xseg) {
740                 XSEGLOG("Cannot map segment");
741                 goto err_priv;
742         }
743
744         priv->segment_type = *segtype;
745         priv->peer_type = *peertype;
746         priv->wakeup = wakeup;
747         priv->req_data = xhash_new(3, INTEGER); //FIXME should be relative to XSEG_DEF_REQS
748         if (!priv->req_data)
749                 goto err_priv;
750         xlock_release(&priv->reqdatalock);
751
752         xseg->max_peer_types = __xseg->max_peer_types;
753
754         priv->peer_types = pops->malloc(sizeof(void *) * xseg->max_peer_types);
755         if (!priv->peer_types) {
756                 XSEGLOG("Cannot allocate memory");
757                 goto err_unmap;
758         }
759         memset(priv->peer_types, 0, sizeof(void *) * xseg->max_peer_types);
760         priv->peer_type_data = pops->malloc(sizeof(void *) * xseg->max_peer_types);
761         if (!priv->peer_types) {
762                 XSEGLOG("Cannot allocate memory");
763                 //FIXME wrong err handling
764                 goto err_unmap;
765         }
766         memset(priv->peer_type_data, 0, sizeof(void *) * xseg->max_peer_types);
767
768         xseg->priv = priv;
769         xseg->config = __xseg->config;
770         xseg->version = __xseg->version;
771         xseg->request_h = XPTR_TAKE(__xseg->request_h, __xseg);
772         xseg->port_h = XPTR_TAKE(__xseg->port_h, __xseg);
773         xseg->ports = XPTR_TAKE(__xseg->ports, __xseg);
774         xseg->src_gw = XPTR_TAKE(__xseg->src_gw, __xseg);
775         xseg->dst_gw = XPTR_TAKE(__xseg->dst_gw, __xseg);
776         xseg->heap = XPTR_TAKE(__xseg->heap, __xseg);
777         xseg->object_handlers = XPTR_TAKE(__xseg->object_handlers, __xseg);
778         xseg->shared = XPTR_TAKE(__xseg->shared, __xseg);
779         xseg->segment_size = size;
780         xseg->segment = __xseg;
781
782         r = xseg_validate_pointers(xseg);
783         if (r) {
784                 XSEGLOG("found %d invalid xseg pointers!\n", r);
785                 goto err_free_types;
786         }
787
788         /* Do we need this?
789         r = xops->signal_join(xseg);
790         if (r) {
791                 XSEGLOG("Cannot attach signaling to segment! (error: %d)\n", r);
792                 goto err_free_types;
793         }
794         */
795
796         return xseg;
797
798 err_free_types:
799         pops->mfree(priv->peer_types);
800 err_unmap:
801         xops->unmap(__xseg, size);
802         xhash_free(priv->req_data);
803 err_priv:
804         pops->mfree(priv);
805 err_seg:
806         pops->mfree(xseg);
807 err:
808         return NULL;
809 }
810
811 void xseg_leave(struct xseg *xseg)
812 {
813         struct xseg_type *type;
814
815         __lock_domain();
816         type = __find_or_load_type(xseg->config.type);
817         if (!type) {
818                 XSEGLOG("no segment type '%s'\n", xseg->config.type);
819                 __unlock_domain();
820                 return;
821         }
822         __unlock_domain();
823
824         type->ops.unmap(xseg->segment, xseg->segment_size);
825         //FIXME free xseg?
826 }
827
828 struct xseg_port* xseg_get_port(struct xseg *xseg, uint32_t portno)
829 {
830         xptr p;
831         if (!__validate_port(xseg, portno))
832                 return NULL;
833         p = xseg->ports[portno];
834         if (p)
835                 return XPTR_TAKE(p, xseg->segment);
836         else 
837                 return NULL;
838 }
839
840 struct xq * __alloc_queue(struct xseg *xseg, uint64_t nr_reqs)
841 {
842         uint64_t bytes;
843         void *mem, *buf;
844         struct xq *q;
845         struct xheap *heap = xseg->heap;
846
847         //how many bytes to allocate for a queue
848         bytes = sizeof(struct xq) + nr_reqs*sizeof(xqindex);
849         mem = xheap_allocate(heap, bytes);
850         if (!mem)
851                 return NULL;
852
853         //how many bytes did we got, and calculate what's left of buffer
854         bytes = xheap_get_chunk_size(mem) - sizeof(struct xq);
855
856         //initialize queue with max nr of elements it can hold
857         q = (struct xq *) mem;
858         buf = (void *) (((unsigned long) mem) + sizeof(struct xq));
859         xq_init_empty(q, bytes/sizeof(xqindex), buf); 
860
861         return q;
862 }
863
864 struct xseg_port *xseg_alloc_port(struct xseg *xseg, uint32_t flags, uint64_t nr_reqs)
865 {
866         struct xq *q;
867         struct xobject_h *obj_h = xseg->port_h;
868         struct xseg_port *port = xobj_get_obj(obj_h, flags);
869         if (!port)
870                 return NULL;
871         
872         //alloc free queue
873         q = __alloc_queue(xseg, nr_reqs);
874         if (!q)
875                 goto err_free;
876         port->free_queue = XPTR_MAKE(q, xseg->segment);
877
878         //and for request queue
879         q = __alloc_queue(xseg, nr_reqs);
880         if (!q)
881                 goto err_req;
882         port->request_queue = XPTR_MAKE(q, xseg->segment);
883
884         //and for reply queue
885         q = __alloc_queue(xseg, nr_reqs);
886         if (!q)
887                 goto err_reply;
888         port->reply_queue = XPTR_MAKE(q, xseg->segment);
889
890         xlock_release(&port->fq_lock);
891         xlock_release(&port->rq_lock);
892         xlock_release(&port->pq_lock);
893         xlock_release(&port->port_lock);
894         port->owner = 0; //should be Noone;
895         port->portno = 0; // should be Noport;
896         port->peer_type = 0; //FIXME what  here ??
897         port->alloc_reqs = 0;
898         port->max_alloc_reqs = 512; //FIXME 
899
900
901         return port;
902
903 err_reply:
904         xheap_free(XPTR_TAKE(port->request_queue, xseg->segment));
905         port->request_queue = 0;
906 err_req:
907         xheap_free(XPTR_TAKE(port->free_queue, xseg->segment));
908         port->free_queue = 0;
909 err_free:
910         xobj_put_obj(obj_h, port);
911
912         return NULL;
913         
914 }
915
916 void xseg_free_port(struct xseg *xseg, struct xseg_port *port)
917 {
918         struct xobject_h *obj_h = xseg->port_h;
919
920         if (port->request_queue) {
921                 xheap_free(XPTR_TAKE(port->request_queue, xseg->segment));
922                 port->request_queue = 0;
923         }
924         if (port->free_queue) {
925                 xheap_free(XPTR_TAKE(port->free_queue, xseg->segment));
926                 port->free_queue = 0;
927         }
928         if (port->reply_queue) {
929                 xheap_free(XPTR_TAKE(port->reply_queue, xseg->segment));
930                 port->reply_queue = 0;
931         }
932         xobj_put_obj(obj_h, port);
933 }
934
935 void* xseg_alloc_buffer(struct xseg *xseg, uint64_t size)
936 {
937         struct xheap *heap = xseg->heap;
938         void *mem = xheap_allocate(heap, size);
939         if (mem && xheap_get_chunk_size(mem) < size) {
940                 XSEGLOG("Buffer size %llu instead of %llu\n", 
941                                 xheap_get_chunk_size(mem), size);
942                 xheap_free(mem);
943                 mem = NULL;
944         }
945         return mem;
946 }
947
948 void xseg_free_buffer(struct xseg *xseg, void *ptr)
949 {
950         xheap_free(ptr);
951 }
952
953 int xseg_prepare_wait(struct xseg *xseg, uint32_t portno)
954 {
955         if (!__validate_port(xseg, portno))
956                 return -1;
957
958         return xseg->priv->peer_type.peer_ops.prepare_wait(xseg, portno);
959 }
960
961 int xseg_cancel_wait(struct xseg *xseg, uint32_t portno)
962 {
963         if (!__validate_port(xseg, portno))
964                 return -1;
965         return xseg->priv->peer_type.peer_ops.cancel_wait(xseg, portno);
966 }
967
968 int xseg_wait_signal(struct xseg *xseg, uint32_t usec_timeout)
969 {
970         return xseg->priv->peer_type.peer_ops.wait_signal(xseg, usec_timeout);
971 }
972
973 int xseg_signal(struct xseg *xseg, xport portno)
974 {
975         struct xseg_peer *type;
976         struct xseg_port *port = xseg_get_port(xseg, portno);
977         if (!port)
978                 return -1;
979         
980         type = __get_peer_type(xseg, port->peer_type);
981         if (!type)
982                 return -1;
983
984         return type->peer_ops.signal(xseg, portno);
985 }
986
987 int xseg_init_local_signal(struct xseg *xseg, xport portno)
988 {
989         struct xseg_peer *type;
990         struct xseg_port *port = xseg_get_port(xseg, portno);
991         if (!port)
992                 return -1;
993         
994         type = __get_peer_type(xseg, port->peer_type);
995         if (!type)
996                 return -1;
997
998         return type->peer_ops.local_signal_init();
999 }
1000
1001 void xseg_quit_local_signal(struct xseg *xseg, xport portno)
1002 {
1003         struct xseg_peer *type;
1004         struct xseg_port *port = xseg_get_port(xseg, portno);
1005         if (!port)
1006                 return;
1007         
1008         type = __get_peer_type(xseg, port->peer_type);
1009         if (!type)
1010                 return;
1011
1012         type->peer_ops.local_signal_quit();
1013 }
1014
1015 //FIXME doesn't increase alloced reqs
1016 //is integer i enough here?
1017 int xseg_alloc_requests(struct xseg *xseg, uint32_t portno, uint32_t nr)
1018 {
1019         int i = 0;
1020         xqindex xqi;
1021         struct xq *q;
1022         struct xseg_request *req;
1023         struct xseg_port *port = xseg_get_port(xseg, portno);
1024         if (!port)
1025                 return -1;
1026
1027         q = XPTR_TAKE(port->free_queue, xseg->segment);
1028         while ((req = xobj_get_obj(xseg->request_h, X_ALLOC)) != NULL && i < nr) {
1029                 xqi = XPTR_MAKE(req, xseg->segment);
1030                 xqi = xq_append_tail(q, xqi, portno);
1031                 if (xqi == Noneidx) {
1032                         xobj_put_obj(xseg->request_h, req);
1033                         break;
1034                 }
1035                 i++;
1036         }
1037
1038         if (i == 0)
1039                 i = -1;
1040         return i;
1041 }
1042
1043 int xseg_free_requests(struct xseg *xseg, uint32_t portno, int nr)
1044 {
1045         int i = 0;
1046         xqindex xqi;
1047         struct xq *q;
1048         struct xseg_request *req;
1049         struct xseg_port *port = xseg_get_port(xseg, portno);
1050         if (!port)
1051                 return -1;
1052
1053         q = XPTR_TAKE(port->free_queue, xseg->segment);
1054         while ((xqi = xq_pop_head(q, portno)) != Noneidx && i < nr) {
1055                 req = XPTR_TAKE(xqi, xseg->segment);
1056                 xobj_put_obj(xseg->request_h, (void *) req);
1057                 i++;
1058         }
1059         if (i == 0)
1060                 return -1;
1061         xlock_acquire(&port->port_lock, portno);
1062         port->alloc_reqs -= i;
1063         xlock_release(&port->port_lock);
1064
1065         return i;
1066 }
1067
1068 int xseg_prep_ports (struct xseg *xseg, struct xseg_request *xreq, 
1069                         uint32_t src_portno, uint32_t dst_portno)
1070 {
1071         if (!__validate_port(xseg, src_portno))
1072                 return -1;
1073
1074         if (!__validate_port(xseg, dst_portno))
1075                 return -1;
1076
1077         xreq->src_portno = src_portno;
1078         xreq->src_transit_portno = src_portno;
1079         xreq->dst_portno = dst_portno;
1080         xreq->dst_transit_portno = dst_portno;
1081
1082         return 0;
1083 }
1084
1085 struct xseg_request *xseg_get_request(struct xseg *xseg, xport src_portno, 
1086                                         xport dst_portno, uint32_t flags)
1087 {
1088         /*
1089          * Flags:
1090          * X_ALLOC Allocate more requests if object handler 
1091          *         does not have any avaiable
1092          * X_LOCAL Use only local - preallocated reqs
1093          *         (Maybe we want this as default, to give a hint to a peer
1094          *          how many requests it can have flying)
1095          */
1096         struct xseg_request *req = NULL;
1097         struct xseg_port *port;
1098         struct xq *q;
1099         xqindex xqi;
1100         xptr ptr;
1101
1102         port = xseg_get_port(xseg, src_portno);
1103         if (!port)
1104                 return NULL;
1105         //try to allocate from free_queue
1106         q = XPTR_TAKE(port->free_queue, xseg->segment);
1107         xqi = xq_pop_head(q, src_portno);
1108         if (xqi != Noneidx){
1109                 ptr = xqi;
1110                 req = XPTR_TAKE(ptr, xseg->segment);
1111                 goto done;
1112         }
1113
1114         if (flags & X_LOCAL)
1115                 return NULL;
1116
1117         //else try to allocate from global heap
1118         //FIXME
1119         xlock_acquire(&port->port_lock, src_portno);
1120         if (port->alloc_reqs < port->max_alloc_reqs) {
1121                 req = xobj_get_obj(xseg->request_h, flags & X_ALLOC);
1122                 if (req)
1123                         port->alloc_reqs++;
1124         }
1125         xlock_release(&port->port_lock);
1126         if (!req)
1127                 return NULL;
1128
1129 done:
1130
1131         req->buffer = 0;
1132         req->bufferlen = 0;
1133         req->target = 0;
1134         req->data = 0;
1135         req->datalen = 0;
1136         req->targetlen = 0;
1137         if (xseg_prep_ports(xseg, req, src_portno, dst_portno) < 0) {
1138                 xseg_put_request(xseg, req, src_portno);
1139                 return NULL;
1140         }
1141         req->state = 0;
1142         req->elapsed = 0;
1143         req->timestamp.tv_sec = 0;
1144         req->timestamp.tv_usec = 0;
1145
1146         xq_init_empty(&req->path, MAX_PATH_LEN, req->path_bufs); 
1147
1148         return req;
1149 }
1150
1151 int xseg_put_request (struct xseg *xseg, struct xseg_request *xreq,
1152                         xport portno)
1153 {
1154         xqindex xqi = XPTR_MAKE(xreq, xseg->segment);
1155         struct xq *q;
1156         struct xseg_port *port = xseg_get_port(xseg, xreq->src_portno);
1157         if (!port) 
1158                 return -1;
1159
1160         if (xreq->buffer){
1161                 void *ptr = XPTR_TAKE(xreq->buffer, xseg->segment);
1162                 xseg_free_buffer(xseg, ptr);
1163         }
1164         /* empty path */
1165         xq_init_empty(&xreq->path, MAX_PATH_LEN, xreq->path_bufs); 
1166         
1167         xreq->buffer = 0;
1168         xreq->bufferlen = 0;
1169         xreq->target = 0;
1170         xreq->data = 0;
1171         xreq->datalen = 0;
1172         xreq->targetlen = 0;
1173         xreq->state = 0;
1174         xreq->src_portno = NoPort;
1175         xreq->dst_portno = NoPort;
1176         xreq->src_transit_portno = NoPort;
1177         xreq->dst_transit_portno = NoPort;      
1178         
1179         if (xreq->elapsed != 0) {
1180                 __lock_segment(xseg);
1181                 ++(xseg->counters.req_cnt);
1182                 xseg->counters.avg_req_lat += xreq->elapsed;
1183                 __unlock_segment(xseg);
1184         }
1185
1186
1187         //try to put it in free_queue of the port
1188         q = XPTR_TAKE(port->free_queue, xseg->segment);
1189         xqi = xq_append_head(q, xqi, portno);
1190         if (xqi != Noneidx)
1191                 return 0;
1192         //else return it to segment
1193         xobj_put_obj(xseg->request_h, (void *) xreq);
1194         xlock_acquire(&port->port_lock, portno);
1195         port->alloc_reqs--;
1196         xlock_release(&port->port_lock);
1197         return 0;
1198 }
1199
1200 int xseg_prep_request ( struct xseg* xseg, struct xseg_request *req,
1201                         uint32_t targetlen, uint64_t datalen )
1202 {
1203         uint64_t bufferlen = targetlen + datalen;
1204         void *buf;
1205         req->buffer = 0;
1206         req->bufferlen = 0;
1207         buf = xseg_alloc_buffer(xseg, bufferlen);
1208         if (!buf)
1209                 return -1;
1210         req->bufferlen = xheap_get_chunk_size(buf);
1211         req->buffer = XPTR_MAKE(buf, xseg->segment);
1212         
1213         req->data = req->buffer;
1214         req->target = req->buffer + req->bufferlen - targetlen;
1215         req->datalen = datalen;
1216         req->targetlen = targetlen;
1217         return 0;
1218 }
1219
1220 int xseg_resize_request (struct xseg *xseg, struct xseg_request *req,
1221                         uint32_t new_targetlen, uint64_t new_datalen)
1222 {
1223         if (req->bufferlen >= new_datalen + new_targetlen) {
1224                 req->data = req->buffer;
1225                 req->target = req->buffer + req->bufferlen - new_targetlen;
1226                 req->datalen = new_datalen;
1227                 req->targetlen = new_targetlen;
1228                 return 0;
1229         }
1230
1231         if (req->buffer){
1232                 void *ptr = XPTR_TAKE(req->buffer, xseg->segment);
1233                 xseg_free_buffer(xseg, ptr);
1234         }
1235         req->buffer = 0;
1236         req->bufferlen = 0;
1237         return xseg_prep_request(xseg, req, new_targetlen, new_datalen);
1238 }
1239
1240 static void __update_timestamp(struct xseg_request *xreq)
1241 {
1242         struct timeval tv;
1243
1244         __get_current_time(&tv);
1245         if (xreq->timestamp.tv_sec != 0)
1246                 /*
1247                  * FIXME: Make xreq->elapsed timeval/timespec again to avoid the
1248                  *                multiplication?
1249                  */
1250                 xreq->elapsed += (tv.tv_sec - xreq->timestamp.tv_sec) * 1000000 
1251                                                 + (tv.tv_usec - xreq->timestamp.tv_usec);
1252
1253         xreq->timestamp.tv_sec = tv.tv_sec;
1254         xreq->timestamp.tv_usec = tv.tv_usec;
1255 }
1256
1257 xport xseg_submit (struct xseg *xseg, struct xseg_request *xreq, 
1258                         xport portno, uint32_t flags)
1259 {
1260         xserial serial = NoSerial;
1261         xqindex xqi, r;
1262         struct xq *q, *newq;
1263         xport next, cur;
1264         struct xseg_port *port;
1265
1266         /* discover next and current ports */
1267         if (!__validate_port(xseg, xreq->src_transit_portno)){
1268                 XSEGLOG("couldn't validate src_transit_portno");
1269                 return NoPort;
1270         }
1271         next = xseg->src_gw[xreq->src_transit_portno];
1272         if (next != xreq->src_portno) {
1273                 cur = xreq->src_transit_portno;
1274                 goto submit;
1275         }
1276         
1277         if (!__validate_port(xseg, xreq->dst_transit_portno)){
1278                 XSEGLOG("couldn't validate dst_transit_portno");
1279                 return NoPort;
1280         }
1281         next = xseg->dst_gw[xreq->dst_transit_portno];
1282         if (xreq->dst_transit_portno == xreq->dst_portno)
1283                 cur = xreq->src_transit_portno; 
1284         else 
1285                 cur = xreq->dst_transit_portno;
1286
1287
1288 submit:
1289         port = xseg_get_port(xseg, next);
1290         if (!port){
1291                 XSEGLOG("couldnt get port (next :%u)", next);
1292                 return NoPort;
1293         }
1294
1295         __update_timestamp(xreq);
1296         
1297         xqi = XPTR_MAKE(xreq, xseg->segment);
1298
1299         /* add current port to path */
1300         serial = __xq_append_head(&xreq->path, cur);
1301         if (serial == Noneidx){
1302                 XSEGLOG("couldn't append path head");
1303                 return NoPort;
1304         }
1305
1306         xlock_acquire(&port->rq_lock, portno);
1307         q = XPTR_TAKE(port->request_queue, xseg->segment);
1308         serial = __xq_append_tail(q, xqi);
1309         if (flags & X_ALLOC && serial == Noneidx) {
1310                 /* double up queue size */
1311                 XSEGLOG("trying to double up queue");
1312                 newq = __alloc_queue(xseg, xq_size(q)*2);
1313                 if (!newq)
1314                         goto out_rel;
1315                 r = __xq_resize(q, newq);
1316                 if (r == Noneidx){
1317                         xheap_free(newq);
1318                         goto out_rel;
1319                 }
1320                 port->request_queue = XPTR_MAKE(newq, xseg->segment);
1321                 xheap_free(q);
1322                 serial = __xq_append_tail(newq, xqi);
1323         }
1324
1325 out_rel:
1326         xlock_release(&port->rq_lock);
1327         if (serial == Noneidx){
1328                 XSEGLOG("couldn't append request to queue");
1329                 __xq_pop_head(&xreq->path);
1330                 next = NoPort;
1331         }
1332 out:
1333         return next;
1334         
1335 }
1336
1337 struct xseg_request *xseg_receive(struct xseg *xseg, xport portno)
1338 {
1339         xqindex xqi;
1340         xserial serial = NoSerial;
1341         struct xq *q;
1342         struct xseg_request *req;
1343         struct xseg_port *port = xseg_get_port(xseg, portno);
1344         if (!port)
1345                 return NULL;
1346 retry:
1347         xlock_acquire(&port->pq_lock, portno);
1348         q = XPTR_TAKE(port->reply_queue, xseg->segment);
1349         xqi = __xq_pop_head(q);
1350         xlock_release(&port->pq_lock);
1351
1352         if (xqi == Noneidx)
1353                 return NULL;
1354
1355         req = XPTR_TAKE(xqi, xseg->segment);
1356         __update_timestamp(req);
1357         serial = __xq_pop_head(&req->path);
1358         if (serial == Noneidx){
1359                 /* this should never happen */
1360                 XSEGLOG("pop head of path queue returned Noneidx\n");
1361                 goto retry;
1362         }
1363
1364
1365         return req;
1366 }
1367
1368 struct xseg_request *xseg_accept(struct xseg *xseg, xport portno)
1369 {
1370         xqindex xqi;
1371         struct xq *q;
1372         struct xseg_request *req;
1373         struct xseg_port *port = xseg_get_port(xseg, portno);
1374         if (!port)
1375                 return NULL;
1376         xlock_acquire(&port->rq_lock, portno);
1377         q = XPTR_TAKE(port->request_queue, xseg->segment);
1378         xqi = __xq_pop_head(q);
1379         xlock_release(&port->rq_lock);
1380         if (xqi == Noneidx)
1381                 return NULL;
1382
1383         req = XPTR_TAKE(xqi, xseg->segment);
1384
1385         if (xseg->src_gw[req->src_transit_portno] == portno)
1386                 req->src_transit_portno = portno;
1387         else
1388                 req->dst_transit_portno = portno;
1389
1390
1391         return req;
1392 }
1393
1394 xport xseg_respond (struct xseg *xseg, struct xseg_request *xreq,
1395                         xport portno, uint32_t flags)
1396 {
1397         xserial serial = NoSerial;
1398         xqindex xqi, r;
1399         struct xq *q, *newq;
1400         struct xseg_port *port;
1401         xport dst;
1402
1403         serial = __xq_peek_head(&xreq->path);
1404         if (serial == Noneidx)
1405                 return NoPort;
1406         dst = (xport) serial;
1407         
1408         port = xseg_get_port(xseg, dst);
1409         if (!port)
1410                 return NoPort;
1411         
1412         xqi = XPTR_MAKE(xreq, xseg->segment);
1413         
1414         xlock_acquire(&port->pq_lock, portno);
1415         q = XPTR_TAKE(port->reply_queue, xseg->segment);
1416         serial = __xq_append_tail(q, xqi);
1417         if (flags & X_ALLOC && serial == Noneidx) {
1418                 newq = __alloc_queue(xseg, xq_size(q)*2);
1419                 if (!newq) 
1420                         goto out_rel;
1421                 r = __xq_resize(q, newq);
1422                 if (r == Noneidx) {
1423                         xheap_free(newq);
1424                         goto out_rel;
1425                 }
1426                 port->reply_queue = XPTR_MAKE(newq, xseg->segment);
1427                 xheap_free(q);
1428                 serial = __xq_append_tail(newq, xqi);
1429         }
1430
1431 out_rel:
1432         xlock_release(&port->pq_lock);
1433         
1434         if (serial == Noneidx)
1435                 dst = NoPort;
1436         return dst;
1437         
1438 }
1439
1440 xport xseg_set_srcgw(struct xseg *xseg, xport portno, xport srcgw)
1441 {
1442         if (!__validate_port(xseg, portno))
1443                 return NoPort;
1444         xseg->src_gw[portno] = srcgw;
1445         return srcgw;
1446 }
1447
1448 xport xseg_getandset_srcgw(struct xseg *xseg, xport portno, xport srcgw)
1449 {
1450         xport prev_portno;
1451         do {
1452                 prev_portno = xseg->src_gw[portno];
1453                 xseg->src_gw[srcgw] = prev_portno;
1454         }while(!(__sync_bool_compare_and_swap(&xseg->src_gw[portno], prev_portno, srcgw)));
1455         return prev_portno; 
1456 }
1457
1458 xport xseg_set_dstgw(struct xseg *xseg, xport portno, xport dstgw)
1459 {
1460         if (!__validate_port(xseg, portno))
1461                 return NoPort;
1462         xseg->dst_gw[portno] = dstgw;
1463         return dstgw;
1464 }
1465
1466 xport xseg_getandset_dstgw(struct xseg *xseg, xport portno, xport dstgw)
1467 {
1468         xport prev_portno;
1469         do {
1470                 prev_portno = xseg->dst_gw[portno];
1471                 xseg->dst_gw[dstgw] = prev_portno;
1472         }while(!(__sync_bool_compare_and_swap(&xseg->dst_gw[portno], prev_portno, dstgw)));
1473         return prev_portno;
1474 }
1475
1476 int xseg_set_req_data(struct xseg *xseg, struct xseg_request *xreq, void *data)
1477 {
1478         int r;
1479         xhash_t *req_data;
1480         
1481         xlock_acquire(&xseg->priv->reqdatalock, 1);
1482
1483         req_data = xseg->priv->req_data;
1484         r = xhash_insert(req_data, (xhashidx) xreq, (xhashidx) data);
1485         if (r == -XHASH_ERESIZE) {
1486                 req_data = xhash_resize(req_data, xhash_grow_size_shift(req_data), NULL);
1487                 if (req_data) {
1488                         xseg->priv->req_data = req_data;
1489                         r = xhash_insert(req_data, (xhashidx) xreq, (xhashidx) data);
1490                 }
1491         }
1492
1493         xlock_release(&xseg->priv->reqdatalock);
1494         return r;
1495 }
1496
1497 int xseg_get_req_data(struct xseg *xseg, struct xseg_request *xreq, void **data)
1498 {
1499         int r;
1500         xhashidx val;
1501         xhash_t *req_data;
1502         
1503         xlock_acquire(&xseg->priv->reqdatalock, 1);
1504
1505         req_data = xseg->priv->req_data;
1506         //maybe we need a xhash_delete with lookup...
1507         //maybe we also need a delete that doesn't shrink xhash
1508         r = xhash_lookup(req_data, (xhashidx) xreq, &val);
1509         *data = (void *) val;
1510         if (r >= 0) {
1511                 r = xhash_delete(req_data, (xhashidx) xreq);
1512                 if (r == -XHASH_ERESIZE) {
1513                         req_data = xhash_resize(req_data, xhash_shrink_size_shift(req_data), NULL);
1514                         if (req_data){
1515                                 xseg->priv->req_data = req_data;
1516                                 r = xhash_delete(req_data, (xhashidx) xreq);
1517                         }
1518                 }
1519         }
1520
1521         xlock_release(&xseg->priv->reqdatalock);
1522         return r;
1523 }
1524
1525 struct xobject_h * xseg_get_objh(struct xseg *xseg, uint32_t magic, uint64_t size)
1526 {
1527         int r;
1528         struct xobject_h *obj_h = xobj_get_obj(xseg->object_handlers, X_ALLOC);
1529         if (!obj_h)
1530                 return NULL;
1531         r = xobj_handler_init(obj_h, xseg->segment, magic, size, xseg->heap);
1532         if (r < 0) {
1533                 xobj_put_obj(xseg->object_handlers, obj_h);
1534                 return NULL;
1535         }
1536         return obj_h;
1537 }
1538
1539 void xseg_put_objh(struct xseg *xseg, struct xobject_h *objh)
1540 {
1541         xobj_put_obj(xseg->object_handlers, objh);
1542 }
1543
1544
1545 /*
1546 int xseg_complete_req(struct xseg_request *req)
1547 {
1548         req->state |= XS_SERVED;
1549         req->state &= ~XS_FAILED;
1550 }
1551
1552 int xseg_fail_req(struct xseg_request *req)
1553 {
1554         req->state &= ~XS_SERVED;
1555         req->state |= XS_FAILED;
1556 }
1557 */
1558
1559 struct xseg_port *xseg_bind_port(struct xseg *xseg, uint32_t req, void * sd)
1560 {
1561         uint32_t portno, maxno, id = __get_id(), force;
1562         struct xseg_port *port = NULL;
1563
1564         if (req >= xseg->config.nr_ports) {
1565                 portno = 0;
1566                 maxno = xseg->config.nr_ports;
1567                 force = 0;
1568         } else {
1569                 portno = req;
1570                 maxno = req + 1;
1571                 force = 1;
1572         }
1573
1574         __lock_segment(xseg);
1575         for (; portno < maxno; portno++) {
1576                 int64_t driver;
1577                 if (!xseg->ports[portno]) {
1578                         port = xseg_alloc_port(xseg, X_ALLOC, XSEG_DEF_REQS);
1579                         if (!port)
1580                                 goto out;
1581                 } else if (force) {
1582                         port = xseg_get_port(xseg, portno);
1583                         if (!port)
1584                                 goto out;       
1585                 } else {
1586                         continue;
1587                 }
1588                 driver = __enable_driver(xseg, &xseg->priv->peer_type);
1589                 if (driver < 0)
1590                         break;
1591                 if (!sd){
1592                         void *peer_data = __get_peer_type_data(xseg, (uint64_t) driver);
1593                         if (!peer_data)
1594                                 break;
1595                         void *sigdesc = xseg->priv->peer_type.peer_ops.alloc_signal_desc(xseg, peer_data);
1596                         if (!sigdesc)
1597                                 break;
1598                         int r = xseg->priv->peer_type.peer_ops.init_signal_desc(xseg, sigdesc);
1599                         if (r < 0){
1600                                 xseg->priv->peer_type.peer_ops.free_signal_desc(xseg, peer_data, sigdesc);
1601                                 break;
1602                         }
1603                         port->signal_desc = XPTR_MAKE(sigdesc, xseg->segment);
1604                 } else {
1605                         port->signal_desc = XPTR_MAKE(sd, xseg->segment);
1606                 }
1607                 port->peer_type = (uint64_t)driver;
1608                 port->owner = id;
1609                 port->portno = portno;
1610                 xseg->ports[portno] = XPTR_MAKE(port, xseg->segment);
1611                 goto out;
1612         }
1613         if (port) {
1614                 xseg_free_port(xseg, port);
1615                 port = NULL;
1616         }
1617 out:
1618         __unlock_segment(xseg);
1619         return port;
1620 }
1621
1622 int xseg_leave_port(struct xseg *xseg, struct xseg_port *port)
1623 {
1624         /* To be implemented */
1625         return -1;
1626 }
1627
1628 int xseg_initialize(void)
1629 {
1630         return __xseg_preinit();        /* with or without lock ? */
1631 }
1632
1633 int xseg_finalize(void)
1634 {
1635         /* finalize not supported yet */
1636         return -1;
1637 }
1638
1639
1640 #ifdef __KERNEL__
1641 #include <linux/module.h>
1642 #include <xseg/xseg_exports.h>
1643 #endif
1644