seperate malloced struct xseg from mmapped segment
[archipelago] / xseg / xseg / xseg.c
1 #include <xseg/xseg.h>
2 #include <xseg/domain.h>
3 #include <sys/util.h>
4
5 #ifndef NULL
6 #define NULL ((void *)0)
7 #endif
8
9 #define XSEG_NR_TYPES 16
10 #define XSEG_NR_PEER_TYPES 64
11 #define XSEG_MIN_PAGE_SIZE 4096
12
13 static struct xseg_type *__types[XSEG_NR_TYPES];
14 static unsigned int __nr_types;
15 static struct xseg_peer *__peer_types[XSEG_NR_PEER_TYPES];
16 static unsigned int __nr_peer_types;
17
18 static void __lock_segment(struct xseg *xseg)
19 {
20         volatile uint64_t *flags;
21         flags = &xseg->shared->flags;
22         while (__sync_fetch_and_or(flags, XSEG_F_LOCK));
23 }
24
25 static void __unlock_segment(struct xseg *xseg)
26 {
27         volatile uint64_t *flags;
28         flags = &xseg->shared->flags;
29         __sync_fetch_and_and(flags, ~XSEG_F_LOCK);
30 }
31
32 static struct xseg_type *__find_type(const char *name, long *index)
33 {
34         long i;
35         for (i = 0; (*index = i) < __nr_types; i++)
36                 if (!strncmp(__types[i]->name, name, XSEG_TNAMESIZE))
37                         return __types[i];
38         return NULL;
39 }
40
41 static struct xseg_peer *__find_peer_type(const char *name, int64_t *index)
42 {
43         int64_t i;
44         for (i = 0; (*index = i) < __nr_peer_types; i++) {
45                 if (!strncmp(__peer_types[i]->name, name, XSEG_TNAMESIZE))
46                         return __peer_types[i];
47         }
48         return NULL;
49 }
50
51 void xseg_report_peer_types(void)
52 {
53         long i;
54         XSEGLOG("total %u peer types:\n", __nr_peer_types);
55         for (i = 0; i < __nr_peer_types; i++)
56                 XSEGLOG("%ld: '%s'\n", i, __peer_types[i]->name);
57 }
58
59 static struct xseg_type *__find_or_load_type(const char *name)
60 {
61         long i;
62         struct xseg_type *type = __find_type(name, &i);
63         if (type)
64                 return type;
65
66         __load_plugin(name);
67         return __find_type(name, &i);
68 }
69
70 static struct xseg_peer *__find_or_load_peer_type(const char *name)
71 {
72         int64_t i;
73         struct xseg_peer *peer_type = __find_peer_type(name, &i);
74         if (peer_type)
75                 return peer_type;
76
77         __load_plugin(name);
78         return __find_peer_type(name, &i);
79 }
80
81 static struct xseg_peer *__get_peer_type(struct xseg *xseg, uint32_t serial)
82 {
83         char *name;
84         struct xseg_peer *type;
85         struct xseg_private *priv = xseg->priv;
86         char (*shared_peer_types)[XSEG_TNAMESIZE];
87
88         if (serial >= xseg->max_peer_types) {
89                 XSEGLOG("invalid peer type serial %d >= %d\n",
90                          serial, xseg->max_peer_types);
91                 return NULL;
92         }
93
94         type = priv->peer_types[serial];
95         if (type)
96                 return type;
97
98         /* xseg->shared->peer_types is an append-only array,
99          * therefore this should be safe
100          * without either locking or string copying. */
101         shared_peer_types = XSEG_TAKE_PTR(xseg->shared->peer_types, xseg->segment);
102         name = shared_peer_types[serial];
103         if (!*name) {
104                 XSEGLOG("nonexistent peer type serial %d\n", serial);
105                 return NULL;
106         }
107
108         type = __find_or_load_peer_type(name);
109         if (!type)
110                 XSEGLOG("could not find driver for peer type %d [%s]\n",
111                          serial, name);
112
113         priv->peer_types[serial] = type;
114         return type;
115 }
116
117 static inline int __validate_port(struct xseg *xseg, uint32_t portno)
118 {
119         return portno < xseg->config.nr_ports;
120 }
121
122 static inline int __validate_ptr(struct xseg *xseg, xptr ptr)
123 {
124         return ptr < xseg->segment_size;
125 }
126
127 /* type:name:nr_ports:nr_requests:request_size:extra_size:page_shift */
128
129 #define TOK(s, sp, def) \
130         (s) = (sp); \
131         for (;;) { \
132                 switch (*(sp)) { \
133                 case 0: \
134                         s = (def); \
135                         break; \
136                 case ':': \
137                         *(sp)++ = 0; \
138                         break; \
139                 default: \
140                         (sp) ++; \
141                         continue; \
142                 } \
143                 break; \
144         } \
145
146 static unsigned long strul(char *s)
147 {
148         unsigned long n = 0;
149         for (;;) {
150                 unsigned char c = *s - '0';
151                 if (c >= 10)
152                         break;
153                 n = n * 10 + c;
154                 s ++;
155         }
156         return n;
157 }
158
159 /*
160 static char *strncopy(char *dest, const char *src, uint32_t n)
161 {
162         uint32_t i;
163         char c;
164         for (i = 0; i < n; i++) {
165                 c = src[i];
166                 dest[i] = c;
167                 if (!c)
168                         break;
169         }
170         dest[n-1] = 0;
171         return dest;
172 }
173 */
174
175 int xseg_parse_spec(char *segspec, struct xseg_config *config)
176 {
177         /* default: "posix:globalxseg:4:512:64:1024:12" */
178         char *s = segspec, *sp = segspec;
179
180         /* type */
181         TOK(s, sp, "posix");
182         strncpy(config->type, s, XSEG_TNAMESIZE);
183         config->type[XSEG_TNAMESIZE-1] = 0;
184
185         /* name */
186         TOK(s, sp, "globalxseg");
187         strncpy(config->name, s, XSEG_NAMESIZE);
188         config->name[XSEG_NAMESIZE-1] = 0;
189
190         /* nr_ports */
191         TOK(s, sp, "4");
192         config->nr_ports = strul(s);
193
194         /* nr_requests */
195         TOK(s, sp, "512");
196         config->nr_requests = strul(s);
197
198         /* request_size */
199         TOK(s, sp, "64");
200         config->request_size = strul(s);
201
202         /* extra_size */
203         TOK(s, sp, "128");
204         config->extra_size = strul(s);
205
206         /* page_shift */
207         TOK(s, sp, "12");
208         config->page_shift = strul(s);
209         return 0;
210 }
211
212 int xseg_register_type(struct xseg_type *type)
213 {
214         long i;
215         int r = -1;
216         struct xseg_type *__type;
217         __lock_domain();
218         __type = __find_type(type->name, &i);
219         if (__type) {
220                 XSEGLOG("type %s already exists\n", type->name);
221                 goto out;
222         }
223
224         if (__nr_types >= XSEG_NR_TYPES) {
225                 XSEGLOG("maximum type registrations reached: %u\n", __nr_types);
226                 r -= 1;
227                 goto out;
228         }
229
230         type->name[XSEG_TNAMESIZE-1] = 0;
231         __types[__nr_types] = type;
232         __nr_types += 1;
233         r = 0;
234 out:
235         __unlock_domain();
236         return r;
237 }
238
239 int xseg_unregister_type(const char *name)
240 {
241         long i;
242         int r = -1;
243         struct xseg_type *__type;
244         __lock_domain();
245         __type = __find_type(name, &i);
246         if (!__type) {
247                 XSEGLOG("segment type '%s' does not exist\n", name);
248                 goto out;
249         }
250
251         __nr_types -= 1;
252         __types[i] = __types[__nr_types];
253         __types[__nr_types] = NULL;
254         r = 0;
255 out:
256         __unlock_domain();
257         return r;
258 }
259
260 int xseg_register_peer(struct xseg_peer *peer_type)
261 {
262         int64_t i;
263         int r = -1;
264         struct xseg_peer *type;
265         __lock_domain();
266         type = __find_peer_type(peer_type->name, &i);
267         if (type) {
268                 XSEGLOG("peer type '%s' already exists\n", type->name);
269                 goto out;
270         }
271
272         if (__nr_peer_types >= XSEG_NR_PEER_TYPES) {
273                 XSEGLOG("maximum peer type registrations reached: %u",
274                         __nr_peer_types);
275                 r -= 1;
276                 goto out;
277         }
278
279         if (peer_type->peer_ops.signal_init()) {
280                 XSEGLOG("peer type '%s': signal initialization failed\n",
281                         peer_type->name);
282                 r -= 1;
283                 goto out;
284         }
285
286         peer_type->name[XSEG_TNAMESIZE-1] = 0;
287         __peer_types[__nr_peer_types] = peer_type;
288         __nr_peer_types += 1;
289         r = 0;
290
291 out:
292         __unlock_domain();
293         return r;
294 }
295
296 int xseg_unregister_peer(const char *name)
297 {
298         int64_t i;
299         struct xseg_peer *driver;
300         int r = -1;
301         __lock_domain();
302         driver = __find_peer_type(name, &i);
303         if (!driver) {
304                 XSEGLOG("peer type '%s' does not exist\n", name);
305                 goto out;
306         }
307
308         __nr_peer_types -= 1;
309         __peer_types[i] = __peer_types[__nr_peer_types];
310         __peer_types[__nr_peer_types] = NULL;
311         driver->peer_ops.signal_quit();
312         r = 0;
313 out:
314         __unlock_domain();
315         return r;
316 }
317
318 int64_t __enable_driver(struct xseg *xseg, struct xseg_peer *driver)
319 {
320         int64_t r;
321         char (*drivers)[XSEG_TNAMESIZE];
322         uint32_t max_drivers = xseg->max_peer_types;
323
324         if (xseg->shared->nr_peer_types >= max_drivers) {
325                 XSEGLOG("cannot register '%s': driver namespace full\n",
326                         driver->name);
327                 return -1;
328         }
329
330         drivers = XSEG_TAKE_PTR(xseg->shared->peer_types, xseg->segment);
331         for (r = 0; r < max_drivers; r++) {
332                 if (!*drivers[r])
333                         goto bind;
334                 if (!strncmp(drivers[r], driver->name, XSEG_TNAMESIZE))
335                         goto success;
336         }
337
338         /* Unreachable */
339         return -666;
340
341 bind:
342         /* assert(xseg->shared->nr_peer_types == r); */
343         xseg->shared->nr_peer_types = r + 1;
344         strncpy(drivers[r], driver->name, XSEG_TNAMESIZE);
345         drivers[r][XSEG_TNAMESIZE-1] = 0;
346
347 success:
348         xseg->priv->peer_types[r] = driver;
349         return r;
350 }
351
352 int64_t xseg_enable_driver(struct xseg *xseg, const char *name)
353 {
354         int64_t r = -1;
355         struct xseg_peer *driver;
356
357         __lock_domain();
358         driver = __find_peer_type(name, &r);
359         if (!driver) {
360                 XSEGLOG("driver '%s' not found\n", name);
361                 goto out;
362         }
363
364         __lock_segment(xseg);
365         r = __enable_driver(xseg, driver);
366         __unlock_segment(xseg);
367 out:
368         __unlock_domain();
369         return r;
370 }
371
372 int xseg_disable_driver(struct xseg *xseg, const char *name)
373 {
374         int64_t i;
375         int r = -1;
376         struct xseg_private *priv = xseg->priv;
377         struct xseg_peer *driver;
378         __lock_domain();
379         driver =  __find_peer_type(name, &i);
380         if (!driver) {
381                 XSEGLOG("driver '%s' not found\n", name);
382                 goto out;
383         }
384
385         for (i = 0; i < xseg->max_peer_types; i++)
386                 if (priv->peer_types[i] == driver)
387                         priv->peer_types[i] = NULL;
388         r = 0;
389 out:
390         __unlock_domain();
391         return r;
392 }
393
394 /* NOTE: calculate_segment_size() and initialize_segment()
395  * must always be exactly in sync!
396 */
397
398 static uint64_t calculate_segment_size(struct xseg_config *config)
399 {
400         uint64_t size = 0;
401         uint32_t page_size, page_shift = config->page_shift;
402
403         /* assert(sizeof(struct xseg) <= (1 << 9)); */
404
405         if (page_shift < 9) {
406                 XSEGLOG("page_shift must be >= %d\n", 9);
407                 return 0;
408         }
409
410         page_size = 1 << page_shift;
411
412         /* struct xseg itself */
413         size += page_size + config->heap_size;
414         size = __align(size, page_shift);
415         
416         return size;
417 }
418
419 static long initialize_segment(struct xseg *xseg, struct xseg_config *cfg)
420 {
421         uint32_t page_shift = cfg->page_shift, page_size = 1 << page_shift;
422         struct xseg_shared *shared;
423         char *segment = (char *)xseg;
424         struct xq *q;
425         void *qmem;
426         uint64_t bodysize, size = page_size, i;
427         xptr mem;
428         struct xseg_heap *heap;
429         xhash_t *xhash;
430         struct xseg_object_handler *obj_h;
431
432
433         if (page_size < XSEG_MIN_PAGE_SIZE)
434                 return -1;
435
436         xseg->segment_size = size;
437         xseg->segment = segment;
438
439         /* build heap */
440         xseg->heap = XSEG_MAKE_PTR(segment + size, segment);
441         size += sizeof(xseg_heap);
442         size = __align(size, page_shift);
443
444         heap = XSEG_TAKE_PTR(xseg->heap, segment);
445         heap->size = config->heap_size;
446         heap->start = XSEG_MAKE_PTR(segment+size, segment);
447         heap->cur = heap->start;
448
449         /* build object_handler handler */
450         mem = xseg_allocate(heap, sizeof(struct xseg_object_handler));
451         if (!mem)
452                 return -1;
453         xseg->object_handlers = mem;
454         obj_h = XSEG_TAKE_PTR(xseg->object_handlers, segment);
455         r = xseg_init_object_handler(segment, obj_h, MAGIC_OBJH, 
456                         sizeof(struct xseg_object_handler), xseg->heap);
457         if (!r)
458                 return -1;
459
460         //now that we have object handlers handler, use that to allocate
461         //new object handlers
462         
463         //allocate requests handler
464         mem = xseg_get_obj(obj_h, X_ALLOC);
465         if (!mem)
466                 return -1;
467         obj_h = XSEG_TAKE_PTR(mem, segment);
468         r = xseg_init_object_handler(segment, obj_h, MAGIC_REQ, 
469                         sizeof(struct xseg_request), xseg->heap);
470         if (!r)
471                 return -1;
472         xseg->requests = mem;
473         
474         //allocate ports handler
475         obj_h = XSEG_TAKE_PTR(xseg->object_handlers, segment);
476         mem = xseg_get_obj(obj_h, X_ALLOC);
477         if (!mem)
478                 return -1;
479         obj_h = XSEG_TAKE_PTR(mem, segment);
480         r = xseg_init_object_handler(segment, obj_h, MAGIC_PORT, 
481                         sizeof(struct xseg_port), xseg->heap);
482         if (!r)
483                 return -1;
484         xseg->ports = mem;
485         
486         //allocate buffers4K handler
487         obj_h = XSEG_TAKE_PTR(xseg->object_handlers, segment);
488         mem = xseg_get_obj(obj_h, X_ALLOC);
489         if (!mem)
490                 return -1;
491         obj_h = XSEG_TAKE_PTR(mem, segment);
492         r = xseg_init_object_handler(segment, obj_h, MAGIC_4K, 
493                         4096, xseg->heap);
494         if (!r)
495                 return -1;
496         xseg->buffers4K = mem;
497
498         //allocate buffers256K handler
499         obj_h = XSEG_TAKE_PTR(xseg->object_handlers, segment);
500         mem = xseg_get_obj(obj_h, X_ALLOC);
501         if (!mem)
502                 return -1;
503         obj_h = XSEG_TAKE_PTR(mem, segment);
504         r = xseg_init_object_handler(segment, obj_h, MAGIC_256K, 
505                         256*1024, xseg->heap);
506         if (!r)
507                 return -1;
508         xseg->buffers256K = mem;
509
510         //allocate buffers4M handler
511         obj_h = XSEG_TAKE_PTR(xseg->object_handlers, segment);
512         mem = xseg_get_obj(obj_h, X_ALLOC);
513         if (!mem)
514                 return -1;
515         obj_h = XSEG_TAKE_PTR(mem, segment);
516         r = xseg_init_object_handler(segment, obj_h, MAGIC_4M, 
517                         4096*1024, xseg->heap);
518         if (!r)
519                 return -1;
520         xseg->buffers4M = mem;
521
522         //allocate xseg_shared memory
523         mem = xseg_allocate(heap, sizeof(struct xseg_shared));
524         if (!mem)
525                 return -1;
526         shared = (struct xseg_shared *) XSEG_TAKE_PTR(mem, segment);
527         shared->flags = 0;
528         shared->nr_peer_types = 0;
529         xseg->shared = mem;
530         
531         mem = xseg_allocate(heap, page_size);
532         if (!mem)
533                 return -1;
534         shared->peer_types = mem;
535         xseg->max_peer_types = get_alloc_bytes(page_size) / XSEG_TNAMESIZE;
536
537         memcpy(&xseg->config, cfg, sizeof(struct xseg_config));
538
539         xseg->counters.req_cnt = 0;
540         xseg->counters.avg_req_lat = 0;
541
542         return 0;
543 }
544
545 int xseg_create(struct xseg_config *cfg)
546 {
547         struct xseg *xseg = NULL;
548         struct xseg_type *type;
549         struct xseg_operations *xops;
550         uint64_t size;
551         long r;
552
553         type = __find_or_load_type(cfg->type);
554         if (!type) {
555                 cfg->type[XSEG_TNAMESIZE-1] = 0;
556                 XSEGLOG("type '%s' does not exist\n", cfg->type);
557                 goto out_err;
558         }
559
560         size = calculate_segment_size(cfg);
561         if (!size) {
562                 XSEGLOG("invalid config!\n");
563                 goto out_err;
564         }
565
566         xops = &type->ops;
567         cfg->name[XSEG_NAMESIZE-1] = 0;
568         r = xops->allocate(cfg->name, size);
569         if (r) {
570                 XSEGLOG("cannot allocate segment!\n");
571                 goto out_err;
572         }
573
574         xseg = xops->map(cfg->name, size, NULL);
575         if (!xseg) {
576                 XSEGLOG("cannot map segment!\n");
577                 goto out_deallocate;
578         }
579
580         r = initialize_segment(xseg, cfg);
581         xops->unmap(xseg, size);
582         if (r) {
583                 XSEGLOG("cannot initilize segment!\n");
584                 goto out_deallocate;
585         }
586
587
588         return 0;
589
590 out_deallocate:
591         xops->deallocate(cfg->name);
592 out_err:
593         return -1;
594 }
595
596 void xseg_destroy(struct xseg *xseg)
597 {
598         struct xseg_type *type;
599
600         __lock_domain();
601         type = __find_or_load_type(xseg->config.type);
602         if (!type) {
603                 XSEGLOG("no segment type '%s'\n", xseg->config.type);
604                 goto out;
605         }
606
607         /* should destroy() leave() first? */
608         type->ops.deallocate(xseg->config.name);
609 out:
610         __unlock_domain();
611 }
612
613 static int pointer_ok(  unsigned long ptr,
614                         unsigned long base,
615                         uint64_t size,
616                         char *name)
617 {
618         int ret = !(ptr >= base && ptr < base + size);
619         if (ret)
620                 XSEGLOG("invalid pointer '->%s' [%llx on %llx]!\n",
621                         (unsigned long long)ptr,
622                         (unsigned long long)base,
623                         name);
624         return ret;
625 }
626
627 #define POINTER_OK(xseg, field, base) \
628          pointer_ok(    (unsigned long)((xseg)->field), \
629                         (unsigned long)(base), \
630                         (xseg)->segment_size, \
631                         #field)
632
633 static int xseg_validate_pointers(struct xseg *xseg)
634 {
635         int r = 0;
636         r += POINTER_OK(xseg, requests, xseg->segment);
637         r += POINTER_OK(xseg, free_requests, xseg->segment);
638         r += POINTER_OK(xseg, ports, xseg->segment);
639         r += POINTER_OK(xseg, buffers, xseg->segment);
640         r += POINTER_OK(xseg, extra, xseg->segment);
641         r += POINTER_OK(xseg, shared, xseg->segment);
642         return r;
643 }
644
645 struct xseg *xseg_join( char *segtypename,
646                         char *segname,
647                         char *peertypename,
648                         void (*wakeup)
649                         (       struct xseg *xseg,
650                                 uint32_t portno         ))
651 {
652         struct xseg *xseg, *__xseg;
653         uint64_t size;
654         struct xseg_peer *peertype;
655         struct xseg_type *segtype;
656         struct xseg_private *priv;
657         struct xseg_operations *xops;
658         struct xseg_peer_operations *pops;
659         int r;
660
661         __lock_domain();
662
663         peertype = __find_or_load_peer_type(peertypename);
664         if (!peertype) {
665                 XSEGLOG("Peer type '%s' not found\n", peertypename);
666                 __unlock_domain();
667                 goto err;
668         }
669
670         segtype = __find_or_load_type(segtypename);
671         if (!segtype) {
672                 XSEGLOG("Segment type '%s' not found\n", segtypename);
673                 __unlock_domain();
674                 goto err;
675         }
676
677         __unlock_domain();
678
679         xops = &segtype->ops;
680         pops = &peertype->peer_ops;
681
682         xseg = pops->malloc(sizeof(struct xseg));
683         if (!xseg) {
684                 XSEGLOG("Cannot allocate memory");
685                 goto err;
686         }
687
688         priv = pops->malloc(sizeof(struct xseg_private));
689         if (!priv) {
690                 XSEGLOG("Cannot allocate memory");
691                 goto err_seg;
692         }
693
694         __xseg = xops->map(segname, XSEG_MIN_PAGE_SIZE, NULL);
695         if (!__xseg) {
696                 XSEGLOG("Cannot map segment");
697                 goto err_priv;
698         }
699
700         size = __xseg->segment_size;
701         /* XSEGLOG("joined segment of size: %lu\n", (unsigned long)size); */
702         xops->unmap(__xseg, XSEG_MIN_PAGE_SIZE);
703
704         __xseg = xops->map(segname, size, xseg);
705         if (!__xseg) {
706                 XSEGLOG("Cannot map segment");
707                 goto err_priv;
708         }
709
710         priv->segment_type = *segtype;
711         priv->peer_type = *peertype;
712         priv->wakeup = wakeup;
713         xseg->max_peer_types = __xseg->max_peer_types;
714
715         priv->peer_types = pops->malloc(sizeof(void *) * xseg->max_peer_types);
716         if (!priv->peer_types) {
717                 XSEGLOG("Cannot allocate memory");
718                 goto err_unmap;
719         }
720         memset(priv->peer_types, 0, sizeof(void *) * xseg->max_peer_types);
721
722         xseg->priv = priv;
723         xseg->config = __xseg->config;
724         xseg->version = __xseg->version;
725         xseg->requests = XSEG_TAKE_PTR(__xseg->requests, __xseg);
726         xseg->free_requests = XSEG_TAKE_PTR(__xseg->free_requests, __xseg);
727         xseg->ports = XSEG_TAKE_PTR(__xseg->ports, __xseg);
728         xseg->buffers = XSEG_TAKE_PTR(__xseg->buffers, __xseg);
729         xseg->extra = XSEG_TAKE_PTR(__xseg->extra, __xseg);
730         xseg->shared = XSEG_TAKE_PTR(__xseg->shared, __xseg);
731         xseg->segment_size = size;
732         xseg->segment = __xseg;
733
734         r = xseg_validate_pointers(xseg);
735         if (r) {
736                 XSEGLOG("found %d invalid xseg pointers!\n", r);
737                 goto err_free_types;
738         }
739
740         /* Do we need this?
741         r = xops->signal_join(xseg);
742         if (r) {
743                 XSEGLOG("Cannot attach signaling to segment! (error: %d)\n", r);
744                 goto err_free_types;
745         }
746         */
747
748         return xseg;
749
750 err_free_types:
751         pops->mfree(priv->peer_types);
752 err_unmap:
753         xops->unmap(__xseg, size);
754 err_priv:
755         pops->mfree(priv);
756 err_seg:
757         pops->mfree(xseg);
758 err:
759         return NULL;
760 }
761
762 void xseg_leave(struct xseg *xseg)
763 {
764         struct xseg_type *type;
765
766         __lock_domain();
767         type = __find_or_load_type(xseg->config.type);
768         if (!type) {
769                 XSEGLOG("no segment type '%s'\n", xseg->config.type);
770                 __unlock_domain();
771                 return;
772         }
773         __unlock_domain();
774
775         type->ops.unmap(xseg->segment, xseg->segment_size);
776 }
777
778 int xseg_prepare_wait(struct xseg *xseg, uint32_t portno)
779 {
780         if (!__validate_port(xseg, portno))
781                 return -1;
782
783         return xseg->priv->peer_type.peer_ops.prepare_wait(xseg, portno);
784 }
785
786 int xseg_cancel_wait(struct xseg *xseg, uint32_t portno)
787 {
788         if (!__validate_port(xseg, portno))
789                 return -1;
790         return xseg->priv->peer_type.peer_ops.cancel_wait(xseg, portno);
791 }
792
793 int xseg_wait_signal(struct xseg *xseg, uint32_t usec_timeout)
794 {
795         return xseg->priv->peer_type.peer_ops.wait_signal(xseg, usec_timeout);
796 }
797
798 int xseg_signal(struct xseg *xseg, uint32_t portno)
799 {
800         struct xseg_peer *type;
801         struct xseg_port *port;
802         if (!__validate_port(xseg, portno))
803                 return -1;
804
805         port = &xseg->ports[portno];
806         type = __get_peer_type(xseg, port->peer_type);
807         if (!type)
808                 return -1;
809
810         return type->peer_ops.signal(xseg, portno);
811 }
812
813 int xseg_alloc_requests(struct xseg *xseg, uint32_t portno, uint32_t nr)
814 {
815         struct xseg_port *port;
816         if (!__validate_port(xseg, portno))
817                 return -1;
818
819         port = &xseg->ports[portno];
820         return xq_head_to_tail(xseg->free_requests, &port->free_queue, nr, portno);
821 }
822
823 int xseg_free_requests(struct xseg *xseg, uint32_t portno, int nr)
824 {
825         struct xseg_port *port;
826         if (!__validate_port(xseg, portno))
827                 return -1;
828
829         port = &xseg->ports[portno];
830         return xq_head_to_tail(&port->free_queue, xseg->free_requests, nr, portno);
831 }
832
833 struct xseg_request *xseg_get_request(struct xseg *xseg, uint32_t portno)
834 {
835         struct xseg_request *req;
836         struct xseg_port *port;
837         xqindex xqi;
838         if (!__validate_port(xseg, portno))
839                 return NULL;
840
841         port = &xseg->ports[portno];
842         xqi = xq_pop_head(&port->free_queue, portno);
843         if (xqi == Noneidx)
844                 return NULL;
845
846         req = &xseg->requests[xqi];
847         req->portno = portno;
848
849         req->elapsed = 0;
850         req->timestamp.tv_sec = 0;
851         req->timestamp.tv_usec = 0;
852
853         return req;
854 }
855
856 int xseg_put_request (  struct xseg *xseg,
857                         uint32_t portno,
858                         struct xseg_request *xreq )
859 {
860         xqindex xqi = xreq - xseg->requests;
861         xreq->data = xreq->buffer;
862         xreq->datalen = xreq->bufferlen;
863         xreq->target = NULL;
864         xreq->targetlen = 0;
865
866         if (xreq->elapsed != 0) {
867                 __lock_segment(xseg);
868                 ++(xseg->counters.req_cnt);
869                 xseg->counters.avg_req_lat += xreq->elapsed;
870                 __unlock_segment(xseg);
871         }
872
873         return xq_append_head(&xseg->ports[portno].free_queue, xqi, portno) == Noneidx;
874 }
875
876 int xseg_prep_request ( struct xseg_request *req,
877                         uint32_t targetlen, uint64_t datalen )
878 {
879         if (targetlen + datalen > req->bufferlen)
880                 return -1;
881
882         req->data = req->buffer;
883         req->target = req->buffer + req->bufferlen - targetlen;
884         req->datalen = datalen;
885         req->targetlen = targetlen;
886         return 0;
887 }
888
889 static void __update_timestamp(struct xseg_request *xreq)
890 {
891         struct timeval tv;
892
893         __get_current_time(&tv);
894         if (xreq->timestamp.tv_sec != 0)
895                 /*
896                  * FIXME: Make xreq->elapsed timeval/timespec again to avoid the
897                  *                multiplication?
898                  */
899                 xreq->elapsed += (tv.tv_sec - xreq->timestamp.tv_sec) * 1000000 
900                                                 + (tv.tv_usec - xreq->timestamp.tv_usec);
901
902         xreq->timestamp.tv_sec = tv.tv_sec;
903         xreq->timestamp.tv_usec = tv.tv_usec;
904 }
905
906 xserial xseg_submit (   struct xseg *xseg, uint32_t portno,
907                         struct xseg_request *xreq       )
908 {
909         xserial serial = NoSerial;
910         xqindex xqi;
911         struct xseg_port *port;
912         if (!__validate_port(xseg, portno))
913                 goto out;
914
915         __update_timestamp(xreq);
916
917         port = &xseg->ports[portno];
918         xqi = xreq - xseg->requests;
919         serial = xq_append_tail(&port->request_queue, xqi, portno);
920 out:
921         return serial;
922 }
923
924 struct xseg_request *xseg_receive(struct xseg *xseg, uint32_t portno)
925 {
926         xqindex xqi;
927         struct xseg_port *port;
928         if (!__validate_port(xseg, portno))
929                 return NULL;
930
931         port = &xseg->ports[portno];
932         xqi = xq_pop_head(&port->reply_queue, portno);
933         if (xqi == Noneidx)
934                 return NULL;
935
936         __update_timestamp(&xseg->requests[xqi]);
937
938         return xseg->requests + xqi;
939 }
940
941 struct xseg_request *xseg_accept(struct xseg *xseg, uint32_t portno)
942 {
943         xqindex xqi;
944         struct xseg_port *port;
945         if (!__validate_port(xseg, portno))
946                 return NULL;
947
948         port = &xseg->ports[portno];
949         xqi = xq_pop_head(&port->request_queue, portno);
950         if (xqi == Noneidx)
951                 return NULL;
952
953         return xseg->requests + xqi;
954 }
955
956 xserial xseg_respond (  struct xseg *xseg, uint32_t portno,
957                         struct xseg_request *xreq  )
958 {
959         xserial serial = NoSerial;
960         xqindex xqi;
961         struct xseg_port *port;
962         if (!__validate_port(xseg, portno))
963                 goto out;
964
965         port = &xseg->ports[portno];
966         xqi = xreq - xseg->requests;
967         serial = xq_append_tail(&port->reply_queue, xqi, portno);
968 out:
969         return serial;
970 }
971
972
973 struct xseg_port *xseg_bind_port(struct xseg *xseg, uint32_t req)
974 {
975         uint32_t portno, maxno, id = __get_id(), force;
976         struct xseg_port *port;
977
978         if (req >= xseg->config.nr_ports) {
979                 portno = 0;
980                 maxno = xseg->config.nr_ports;
981                 force = 0;
982         } else {
983                 portno = req;
984                 maxno = req + 1;
985                 force = 1;
986         }
987
988         __lock_segment(xseg);
989         for (; portno < maxno; portno++) {
990                 int64_t driver;
991                 port = &xseg->ports[portno];
992                 if (port->owner && !force)
993                         continue;
994                 driver = __enable_driver(xseg, &xseg->priv->peer_type);
995                 if (driver < 0)
996                         break;
997                 port->peer_type = (uint64_t)driver;
998                 port->owner = id;
999                 goto out;
1000         }
1001         port = NULL;
1002 out:
1003         __unlock_segment(xseg);
1004         return port;
1005 }
1006
1007
1008 int xseg_initialize(void)
1009 {
1010         return __xseg_preinit();        /* with or without lock ? */
1011 }
1012
1013 int xseg_finalize(void)
1014 {
1015         /* finalize not supported yet */
1016         return -1;
1017 }
1018
1019 #define X_ALLOC ((uint32_t) (1 << 0))
1020
1021 /*
1022  * xseg -> address of malloced struct xseg, each peer takes on join
1023  * segment -> address of mmapped segment
1024  */
1025
1026
1027 xptr xseg_get_obj(struct xseg_object_handler * obj_h, uint32_t flags)
1028 {
1029         struct xseg *segment = XPTR(obj_h->segment);
1030         struct xseg_object *obj;
1031         xptr list, objptr;
1032 retry:
1033         while (obj_h->list) {
1034                 list = obj_h->list;
1035                 obj = XSEG_TAKE_PTR(list, segment);
1036                 objptr = obj->next;
1037                 if (__sync_bool_compare_and_swap(&obj_h->list, list, objptr)) {
1038                         return list;
1039                 }
1040         }
1041         if (!(flags & X_ALLOC)) 
1042                 return 0;
1043         if (xlock_try_lock(&obj_h->lock, 1)) {
1044                 //allocate minimum 64 objects
1045                 xseg_alloc_obj(obj_h, 64);
1046                 xlock_release(&obj_h->lock);
1047         }
1048         goto retry;
1049 }
1050
1051 void xseg_put_obj(struct xseg_object_handler * obj_h, struct xseg_object *obj)
1052 {
1053         struct xseg *segment = XPTR(obj_h->segment);
1054         xptr list, objptr = XSEG_MAKE_PTR(obj, segment);
1055         do {
1056                 list = obj_h->list;
1057                 obj->next = list;
1058         } while(__sync_bool_compare_and_swap(&obj_h->list, list, objptr));
1059 }
1060
1061 uint64_t get_alloc_bytes(uint64_t bytes)
1062 {
1063         return __get_alloc_bytes(bytes) - sizeof(struct free_space_header);
1064 }
1065
1066 uint64_t __get_alloc_bytes(uint64_t bytes)
1067 {
1068         return __align(bytes + sizeof(struct free_space_header), 12);
1069 }
1070
1071 //should be called with object_handler lock held
1072 int xseg_alloc_obj(struct xseg_object_handler *obj_h, uint64_t nr)
1073 {
1074         struct xseg *segment = XPTR(&obj_h->segment);
1075         struct xseg_heap *heap = XSEG_TAKE_PTR(obj_h->heap, segment);
1076         uint64_t used, bytes = nr * obj_h->size;
1077         xptr objptr, mem = xseg_allocate(heap, bytes);
1078         struct xseg_object *obj;
1079         xhash_t *allocated = XSEG_TAKE_PTR(obj_h->allocated, segment);
1080         int r;
1081
1082         if (!xptr)
1083                 //alloc error
1084                 return -1;
1085         bytes = get_alloc_bytes(bytes);
1086         used = 0;
1087         while (used + obj_h->size < bytes) {
1088                 objptr = xptr+used;
1089                 obj = XSEG_TAKE_PTR(objptr, xseg);
1090                 used += obj_h->size;
1091                 obj->magic = obj_h->magic;
1092                 obj->size = obj_h->size;
1093                 obj->next = xptr + used; //point to the next obj
1094                 r = xhash_insert(allocated, objptr, objptr); //keep track of allocated objects
1095                 //ugly
1096                 if (r == -XHASH_ERESIZE) {
1097                         ul_t sizeshift = grow_size_shift(allocated);
1098                         uint64_t size;
1099                         xhash_t *new;
1100                         xptr newptr, oldptr;
1101                         size = xhash_get_alloc_size(sizeshift);
1102                         newptr = xseg_allocate(heap, size);
1103                         if (!newptr) {
1104                                 xseg_free(heap, xptr);
1105                                 return -1;
1106                         }
1107                         new = XSEG_TAKE_PTR(newptr, segment);
1108                         xhash_resize(allocated, sizeshift, new);
1109                         
1110                         oldptr = XSEG_MAKE_PTR(allocated, segment);
1111                         xseg_free(heap, oldptr);
1112                         allocated = new;
1113                         obj_h->allocated = XSEG_MAKE_PTR(allocated, segment);
1114                 }
1115         }
1116         obj->next = 0; //list is null terminated
1117         do {
1118                 //assert obj_h->list == 0
1119                 ojbptr = obj_h->list;
1120         }while(!__sync_bool_compare_and_swap(&obj_handler->list, objptr, xptr));
1121         return 0;
1122 }
1123
1124 xptr xseg_allocate(struct xseg_heap *heap, uint64_t bytes)
1125 {
1126         struct xseg *segment = XPTR(&heap->segment);
1127         struct xseg_free_space_header *fsh;
1128         xptr ret = 0;
1129
1130         bytes = __get_alloc_bytes(bytes);
1131         do {
1132                 if ((heap->cur - heap->start) > bytes)
1133                         return ret;
1134                 ret = xseg_heap->cur;
1135         } while (!__sync_bool_compare_and_swap(&heap->cur, ret, (xptr) cur + bytes));
1136
1137         fsh = (struct xseg_free_space_header *) XSEG_TAKE_PTR(segment, ret);
1138         fsh->size = bytes;
1139         ret += sizeof(struct xseg_free_space_header);
1140         return ret;
1141 }
1142
1143 void xseg_free(struct xseg_heap *heap, xptr ptr)
1144 {
1145         struct xseg *segment = XPTR(&heap->segment);
1146         struct xseg_free_space_header *fsh;
1147         uint64_t size = XSEG_TAKE_PTR(segment, ptr);
1148         //split space to objects
1149 }
1150
1151 int xseg_init_object_handler(struct xseg *xseg, struct xseg_object_handler *obj_h, 
1152                 uint32_t magic, uint64_t size, xptr heap)
1153 {
1154         struct xseg_heap *xheap = XSEG_TAKE_PTR(heap, xseg->segment);
1155         obj_h->magic = magic;
1156         obj_h->obj_size = size;
1157         //use 18 as min size shift for all new hashtables, cause we align 
1158         //memory to 4K. minsize 19 would give us two pages because of the
1159         //free memory header.
1160         mem = xseg_allocate(xheap, xhash_get_alloc_size(18));
1161         if (!mem)
1162                 return -1;
1163         xhash = XSEG_TAKE_PTR(mem, xseg->segment);
1164         xhash_init(xhash, 18);
1165         obj_h->allocated = mem;
1166         obj_h->list = 0;
1167         obj_h->flags = 0;
1168         obj_h->heap = heap;
1169         XPTRSET(&obj_h->segment, xseg->segment);
1170         xlock_release(&obj_h->lock);
1171         return 0;
1172 }
1173
1174 int xseg_init_port(struct xseg *xseg, struct xseg_port *port)
1175 {
1176         xptr mem;
1177         struct xseg_heap *heap = XSEG_TAKE_PTR(xseg->heap, xseg->segment);
1178         struct xq *q;
1179         char *buf;
1180         uint64_t bytes;
1181         //each port starts with minimum 512 requests;
1182         //TODO make it configurable
1183         //TODO since max number of requests is not fixed
1184         //      maybe we should make xqs expand when necessary
1185         uint64_t nr_reqs = 512;
1186
1187         //how many bytes to allocate for a queue
1188         bytes = sizeof(struct xq) + nr_reqs*sizeof(xqindex);
1189         mem = xseg_allocate(heap, bytes);
1190         if (!mem)
1191                 return -1;
1192         //how many did we got, and calculate what's left of buffer
1193         bytes = get_alloc_bytes(bytes) - sizeof(struct xq);
1194         port->free_queue = mem;
1195         //initialize queue with max nr it can hold
1196         q = (struct xq *)XSEG_TAKE_PTR(&port->free_queue, xseg->segment);
1197         buf = XSEG_TAKE_PTR(mem + sizeof(struct xq), xseg->segment);
1198         xq_init_empty(q, bytes/sizeof(xqindex), buf); 
1199
1200         //and for request queue
1201         bytes = sizeof(struct xq) + nr_reqs*sizeof(xqindex);
1202         mem = xseg_allocate(heap, bytes);
1203         if (!mem) 
1204                 goto err_req;
1205         bytes = get_alloc_bytes(bytes) - sizeof(struct xq);
1206         port->request_queue = mem;
1207         q = (struct xq *)XSEG_TAKE_PTR(&port->request_queue, xseg->segment);
1208         buf = XSEG_TAKE_PTR(mem + sizeof(struct xq), xseg->segment);
1209         xq_init_empty(q, bytes/sizeof(xqindex), buf); 
1210         
1211         //and for reply_queue
1212         bytes = sizeof(struct xq) + nr_reqs*sizeof(xqindex);
1213         mem = xseg_allocate(heap, bytes);
1214         if (!mem)
1215                 goto err_reply;
1216         bytes = get_alloc_bytes(bytes) - sizeof(struct xq);
1217         port->reply_queue = mem;
1218         q = (struct xq *)XSEG_TAKE_PTR(&port->reply_queue, xseg->segment);
1219         buf = XSEG_TAKE_PTR(mem + sizeof(struct xq), xseg->segment);
1220         xq_init_empty(q, bytes/sizeof(xqindex), buf);
1221
1222         return 0;
1223
1224 err_reply:
1225         xseg_free(heap, port->request_queue);
1226         port->request_queue = 0;
1227 err_req:
1228         xseg_free(heap, port->free_queue);
1229         port->free_queue = 0;
1230
1231         return -1;
1232         
1233 }
1234
1235 void xseg_put_port(struct xseg *xseg, struct xseg_port *port)
1236 {
1237         struct xseg_heap *heap = XSEG_TAKE_PTR(xseg->heap, xseg->segment);
1238
1239         if (port->request_queue) {
1240                 xseg_free(heap, port->request_queue);
1241                 port->request_queue = 0;
1242         }
1243         if (port->free_queue) {
1244                 xseg_free(heap, port->free_queue);
1245                 port->free_queue = 0;
1246         }
1247         if (port->reply_queue) {
1248                 xseg_free(heap, port->reply_queue);
1249                 port->reply_queue = 0;
1250         }
1251
1252         xseg_put_obj(obj_h, port);
1253 }
1254
1255 #ifdef __KERNEL__
1256 #include <linux/module.h>
1257 #include <xseg/xseg_exports.h>
1258 #endif
1259