2 #include <xseg/domain.h>
6 #define NULL ((void *)0)
9 #define XSEG_NR_TYPES 16
10 #define XSEG_NR_PEER_TYPES 64
11 #define XSEG_MIN_PAGE_SIZE 4096
/* Process-local registries of loaded segment types and peer driver types.
 * Both arrays are append-mostly; unregister swaps the last entry into the
 * freed slot (see xseg_unregister_type/xseg_unregister_peer). */
static struct xseg_type *__types[XSEG_NR_TYPES];
static unsigned int __nr_types;
static struct xseg_peer *__peer_types[XSEG_NR_PEER_TYPES];
static unsigned int __nr_peer_types;
/* Acquire the segment-wide spinlock by setting XSEG_F_LOCK in the shared
 * flags word. __sync_fetch_and_or returns the *previous* value: a non-zero
 * result means another holder already had the bit set, so spin. */
static void __lock_segment(struct xseg *xseg)
    volatile uint64_t *flags;
    flags = &xseg->shared->flags;
    while (__sync_fetch_and_or(flags, XSEG_F_LOCK));

/* Release the segment-wide spinlock by atomically clearing XSEG_F_LOCK. */
static void __unlock_segment(struct xseg *xseg)
    volatile uint64_t *flags;
    flags = &xseg->shared->flags;
    __sync_fetch_and_and(flags, ~XSEG_F_LOCK);
/* Linear search of the local type registry by name.
 * On return *index holds the position of the match, or the registry size
 * if not found (the loop condition stores i into *index on every pass). */
static struct xseg_type *__find_type(const char *name, long *index)
    for (i = 0; (*index = i) < __nr_types; i++)
        if (!strncmp(__types[i]->name, name, XSEG_TNAMESIZE))

/* Same lookup for peer (driver) types; *index behaves as in __find_type. */
static struct xseg_peer *__find_peer_type(const char *name, int64_t *index)
    for (i = 0; (*index = i) < __nr_peer_types; i++) {
        if (!strncmp(__peer_types[i]->name, name, XSEG_TNAMESIZE))
            return __peer_types[i];
/* Log every locally registered peer type (diagnostic aid). */
void xseg_report_peer_types(void)
    XSEGLOG("total %u peer types:\n", __nr_peer_types);
    for (i = 0; i < __nr_peer_types; i++)
        XSEGLOG("%ld: '%s'\n", i, __peer_types[i]->name);

/* Look the segment type up locally; on miss, (elided code presumably loads
 * the driver module — TODO confirm) then retry the lookup once. */
static struct xseg_type *__find_or_load_type(const char *name)
    struct xseg_type *type = __find_type(name, &i);
    return __find_type(name, &i);

/* Peer-type counterpart of __find_or_load_type. */
static struct xseg_peer *__find_or_load_peer_type(const char *name)
    struct xseg_peer *peer_type = __find_peer_type(name, &i);
    return __find_peer_type(name, &i);
/* Resolve a peer-type serial (as stored in a port) to the local driver.
 * Fast path: the per-process cache priv->peer_types[serial].
 * Slow path: read the driver name from the shared segment and load it. */
static struct xseg_peer *__get_peer_type(struct xseg *xseg, uint32_t serial)
    struct xseg_peer *type;
    struct xseg_private *priv = xseg->priv;
    char (*shared_peer_types)[XSEG_TNAMESIZE];

    /* bounds-check the serial against the shared name table */
    if (serial >= xseg->max_peer_types) {
        XSEGLOG("invalid peer type serial %d >= %d\n",
            serial, xseg->max_peer_types);

    type = priv->peer_types[serial];

    /* xseg->shared->peer_types is an append-only array,
     * therefore this should be safe
     * without either locking or string copying. */
    shared_peer_types = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
    name = shared_peer_types[serial];
        XSEGLOG("nonexistent peer type serial %d\n", serial);

    type = __find_or_load_peer_type(name);
        XSEGLOG("could not find driver for peer type %d [%s]\n",
    /* cache for subsequent lookups of the same serial */
    priv->peer_types[serial] = type;
/* Resolve a peer-type serial to its per-driver shared data blob.
 * Mirrors __get_peer_type: local cache first, then the shared
 * peer_type_data xptr table, converting the xptr into a local pointer. */
static void * __get_peer_type_data(struct xseg *xseg, uint32_t serial)
    struct xseg_private *priv = xseg->priv;
    char (*shared_peer_types)[XSEG_TNAMESIZE];
    xptr *shared_peer_type_data;

    if (serial >= xseg->max_peer_types) {
        XSEGLOG("invalid peer type serial %d >= %d\n",
            serial, xseg->max_peer_types);

    data = priv->peer_type_data[serial];  /* cached translation */

    shared_peer_types = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
    name = shared_peer_types[serial];
        XSEGLOG("nonexistent peer type serial %d\n", serial);

    shared_peer_type_data = XPTR_TAKE(xseg->shared->peer_type_data, xseg->segment);
    /* translate the stored xptr into this process' address space and cache */
    priv->peer_type_data[serial] = XPTR_TAKE(shared_peer_type_data[serial], xseg->segment);
    return priv->peer_type_data[serial];
/* True iff portno indexes a configured port of this segment. */
static inline int __validate_port(struct xseg *xseg, uint32_t portno)
    return portno < xseg->config.nr_ports;

/* True iff the relative pointer falls inside the mapped segment. */
static inline int __validate_ptr(struct xseg *xseg, xptr ptr)
    return ptr < xseg->segment_size;
/* Segment spec format: type:name:nr_ports:nr_requests:request_size:extra_size:page_shift */

/* Tokenizer macro used by xseg_parse_spec: advances s/sp to the next ':'
 * delimited field, substituting def when the field is empty (body elided
 * in this excerpt). */
#define TOK(s, sp, def) \

/* Minimal manual strtoul replacement: accumulates decimal digits.
 * NOTE(review): performs no overflow or error reporting — callers get 0/garbage
 * on bad input; consider strtoul if this ever parses untrusted specs. */
static unsigned long strul(char *s)
    unsigned char c = *s - '0';

/* Bounded string copy helper (semantics partly elided in this excerpt). */
static char *strncopy(char *dest, const char *src, uint32_t n)
    for (i = 0; i < n; i++) {
/* Parse a "type:name:nr_ports:heap_size_mb:page_shift" spec string into
 * config. Missing fields take the defaults shown below. segspec is
 * tokenized in place (TOK mutates it). */
int xseg_parse_spec(char *segspec, struct xseg_config *config)
    /* default: "posix:globalxseg:4:256:12" */
    char *s = segspec, *sp = segspec;

    strncpy(config->type, s, XSEG_TNAMESIZE);
    config->type[XSEG_TNAMESIZE-1] = 0;  /* force NUL-termination */

    TOK(s, sp, "globalxseg");
    strncpy(config->name, s, XSEG_NAMESIZE);
    config->name[XSEG_NAMESIZE-1] = 0;

    config->nr_ports = strul(s);

    /* heap size is given in MiB on the spec string */
    config->heap_size = (uint64_t) (strul(s) * 1024UL * 1024UL);

    config->page_shift = strul(s);
/* Register a segment type in the local registry.
 * Fails if the name already exists or the registry is full. */
int xseg_register_type(struct xseg_type *type)
    struct xseg_type *__type;

    __type = __find_type(type->name, &i);
        XSEGLOG("type %s already exists\n", type->name);

    if (__nr_types >= XSEG_NR_TYPES) {
        XSEGLOG("maximum type registrations reached: %u\n", __nr_types);

    type->name[XSEG_TNAMESIZE-1] = 0;  /* ensure name is NUL-terminated */
    __types[__nr_types] = type;

/* Remove a segment type from the registry. Order is not preserved:
 * the last entry is moved into the vacated slot. */
int xseg_unregister_type(const char *name)
    struct xseg_type *__type;

    __type = __find_type(name, &i);
        XSEGLOG("segment type '%s' does not exist\n", name);

    __types[i] = __types[__nr_types];
    __types[__nr_types] = NULL;
/* Register a peer (signalling driver) type locally. Initializes the
 * driver's remote-signal machinery before publishing it. */
int xseg_register_peer(struct xseg_peer *peer_type)
    struct xseg_peer *type;

    type = __find_peer_type(peer_type->name, &i);
        XSEGLOG("peer type '%s' already exists\n", type->name);

    if (__nr_peer_types >= XSEG_NR_PEER_TYPES) {
        XSEGLOG("maximum peer type registrations reached: %u",

    /* driver must be able to signal remote peers before it is usable */
    if (peer_type->peer_ops.remote_signal_init()) {
        XSEGLOG("peer type '%s': signal initialization failed\n",

    peer_type->name[XSEG_TNAMESIZE-1] = 0;  /* force NUL-termination */
    __peer_types[__nr_peer_types] = peer_type;
    __nr_peer_types += 1;

/* Unregister a peer type: swap-remove from the registry, then let the
 * driver tear down its remote-signal state. */
int xseg_unregister_peer(const char *name)
    struct xseg_peer *driver;

    driver = __find_peer_type(name, &i);
        XSEGLOG("peer type '%s' does not exist\n", name);

    __nr_peer_types -= 1;
    __peer_types[i] = __peer_types[__nr_peer_types];
    __peer_types[__nr_peer_types] = NULL;
    driver->peer_ops.remote_signal_quit();
/* Publish a driver into the shared segment's append-only driver table and
 * return its serial (index). If the driver name is already present, reuse
 * that slot. Caller must hold the segment lock (see xseg_enable_driver). */
int64_t __enable_driver(struct xseg *xseg, struct xseg_peer *driver)
    char (*drivers)[XSEG_TNAMESIZE];
    uint32_t max_drivers = xseg->max_peer_types;

    if (xseg->shared->nr_peer_types >= max_drivers) {
        XSEGLOG("cannot register '%s': driver namespace full\n",

    drivers = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
    /* scan for an existing entry with this name */
    for (r = 0; r < max_drivers; r++) {
        if (!strncmp(drivers[r], driver->name, XSEG_TNAMESIZE)){
            data = __get_peer_type_data(xseg, r);

    /* not found: append at the first free slot r */
    /* assert(xseg->shared->nr_peer_types == r); */
    data = driver->peer_ops.alloc_data(xseg);

    peer_type_data = XPTR_MAKE(data, xseg->segment);
    ptd = XPTR_TAKE(xseg->shared->peer_type_data, xseg->segment);
    ptd[r] = peer_type_data;
    xseg->shared->nr_peer_types = r + 1;
    /* write the name last; readers treat the table as append-only */
    strncpy(drivers[r], driver->name, XSEG_TNAMESIZE);
    drivers[r][XSEG_TNAMESIZE-1] = 0;

    /* prime the local caches for this serial */
    xseg->priv->peer_types[r] = driver;
    xseg->priv->peer_type_data[r] = data;
/* Locked wrapper around __enable_driver: look the driver up by name and
 * publish it into the shared segment. Returns the driver serial or error. */
int64_t xseg_enable_driver(struct xseg *xseg, const char *name)
    struct xseg_peer *driver;

    driver = __find_peer_type(name, &r);
        XSEGLOG("driver '%s' not found\n", name);

    __lock_segment(xseg);
    r = __enable_driver(xseg, driver);
    __unlock_segment(xseg);

/* Drop every local cache entry that points at the named driver.
 * Does not touch the shared (append-only) driver table. */
int xseg_disable_driver(struct xseg *xseg, const char *name)
    struct xseg_private *priv = xseg->priv;
    struct xseg_peer *driver;

    driver = __find_peer_type(name, &i);
        XSEGLOG("driver '%s' not found\n", name);

    for (i = 0; i < xseg->max_peer_types; i++)
        if (priv->peer_types[i] == driver)
            priv->peer_types[i] = NULL;
/* NOTE: calculate_segment_size() and initialize_segment()
 * must always be exactly in sync! */

/* Compute the total byte size of a segment for the given config:
 * one page for struct xseg, one for struct xheap, plus the heap itself,
 * rounded up to the page size. page_shift must be >= 9 so that
 * struct xseg fits in a page (see the elided assert). */
static uint64_t calculate_segment_size(struct xseg_config *config)
    uint32_t page_size, page_shift = config->page_shift;

    /* assert(sizeof(struct xseg) <= (1 << 9)); */

    if (page_shift < 9) {
        XSEGLOG("page_shift must be >= %d\n", 9);

    page_size = 1 << page_shift;

    /* struct xseg itself + struct xheap */
    size += 2*page_size + config->heap_size;
    size = __align(size, page_shift);
/* Lay out a freshly mapped segment: heap, object handlers, the request and
 * port handlers, the port map, src/dst gateway arrays, and the shared
 * control area. All cross-process pointers are stored as segment-relative
 * xptrs via XPTR_MAKE. Must mirror calculate_segment_size() exactly. */
static long initialize_segment(struct xseg *xseg, struct xseg_config *cfg)
    uint32_t page_shift = cfg->page_shift, page_size = 1 << page_shift;
    struct xseg_shared *shared;
    char *segment = (char *)xseg;
    uint64_t size = page_size, i;
    struct xobject_h *obj_h;

    if (page_size < XSEG_MIN_PAGE_SIZE)

    xseg->segment_size = 2 * page_size + cfg->heap_size;
    xseg->segment = (struct xseg *) segment;

    /* the heap lives right after the first (struct xseg) page */
    xseg->heap = (struct xheap *) XPTR_MAKE(segment + size, segment);
    size += sizeof(struct xheap);
    size = __align(size, page_shift);

    heap = XPTR_TAKE(xseg->heap, segment);
    r = xheap_init(heap, cfg->heap_size, page_shift, segment+size);

    /* build object_handler handler */
    mem = xheap_allocate(heap, sizeof(struct xobject_h));
    xseg->object_handlers = (struct xobject_h *) XPTR_MAKE(mem, segment);
    r = xobj_handler_init(obj_h, segment, MAGIC_OBJH,
            sizeof(struct xobject_h), heap);

    /* now that we have the object-handlers handler, use it to allocate
     * new object handlers */

    /* allocate requests handler */
    mem = xobj_get_obj(obj_h, X_ALLOC);
    r = xobj_handler_init(obj_h, segment, MAGIC_REQ,
            sizeof(struct xseg_request), heap);
    xseg->request_h = (struct xobject_h *) XPTR_MAKE(obj_h, segment);

    /* allocate ports handler */
    obj_h = XPTR_TAKE(xseg->object_handlers, segment);
    mem = xobj_get_obj(obj_h, X_ALLOC);
    r = xobj_handler_init(obj_h, segment, MAGIC_PORT,
            sizeof(struct xseg_port), heap);
    xseg->port_h = (struct xobject_h *) XPTR_MAKE(mem, segment);

    /* allocate xptr port array to be used as a map: portno <--> xptr port */
    mem = xheap_allocate(heap, sizeof(xptr)*cfg->nr_ports);
    for (i = 0; i < cfg->nr_ports; i++) {
    xseg->ports = (xptr *) XPTR_MAKE(mem, segment);

    /* allocate {src,dst} gateway arrays, one xport per port */
    mem = xheap_allocate(heap, sizeof(xport) * cfg->nr_ports);
    for (i = 0; i < cfg->nr_ports; i++) {
    xseg->src_gw = (xport *) XPTR_MAKE(mem, segment);

    mem = xheap_allocate(heap, sizeof(xport) * cfg->nr_ports);
    for (i = 0; i < cfg->nr_ports; i++) {
    xseg->dst_gw = (xport *) XPTR_MAKE(mem, segment);

    /* allocate xseg_shared memory (flags, driver tables) */
    mem = xheap_allocate(heap, sizeof(struct xseg_shared));
    shared = (struct xseg_shared *) mem;
    shared->nr_peer_types = 0;
    xseg->shared = (struct xseg_shared *) XPTR_MAKE(mem, segment);

    /* a full page of driver-name slots determines max_peer_types */
    mem = xheap_allocate(heap, page_size);
    shared->peer_types = (char **) XPTR_MAKE(mem, segment);
    xseg->max_peer_types = xheap_get_chunk_size(mem) / XSEG_TNAMESIZE;

    mem = xheap_allocate(heap, xseg->max_peer_types * sizeof(xptr));
    memset(mem, 0, xheap_get_chunk_size(mem));
    shared->peer_type_data = (xptr *) XPTR_MAKE(mem, segment);

    memcpy(&xseg->config, cfg, sizeof(struct xseg_config));

    xseg->counters.req_cnt = 0;
    xseg->counters.avg_req_lat = 0;
/* Create a new segment: resolve the segment type, size it, allocate and
 * map the backing store, initialize the layout, then unmap. On
 * initialization failure the backing store is deallocated again. */
int xseg_create(struct xseg_config *cfg)
    struct xseg *xseg = NULL;
    struct xseg_type *type;
    struct xseg_operations *xops;

    type = __find_or_load_type(cfg->type);
        cfg->type[XSEG_TNAMESIZE-1] = 0;  /* sanitize before logging */
        XSEGLOG("type '%s' does not exist\n", cfg->type);

    size = calculate_segment_size(cfg);
        XSEGLOG("invalid config!\n");

    cfg->name[XSEG_NAMESIZE-1] = 0;
    XSEGLOG("creating segment of size %llu\n", size);
    r = xops->allocate(cfg->name, size);
        XSEGLOG("cannot allocate segment!\n");

    xseg = xops->map(cfg->name, size, NULL);
        XSEGLOG("cannot map segment!\n");

    r = initialize_segment(xseg, cfg);
    xseg->unmap happens unconditionally after init; see below */
    xops->unmap(xseg, size);
        /* NOTE(review): log message has a typo ("initilize") — fix when
         * this function can be edited in full. */
        XSEGLOG("cannot initilize segment!\n");

    /* error path: tear the backing store back down */
    xops->deallocate(cfg->name);
/* Destroy a segment's backing store via its type's deallocate op. */
void xseg_destroy(struct xseg *xseg)
    struct xseg_type *type;

    type = __find_or_load_type(xseg->config.type);
        XSEGLOG("no segment type '%s'\n", xseg->config.type);

    /* should destroy() leave() first? */
    type->ops.deallocate(xseg->config.name);
/* Return non-zero (error) when ptr does NOT fall inside [base, base+size).
 * Used to sanity-check translated segment pointers after a join. */
static int pointer_ok( unsigned long ptr,
    int ret = !(ptr >= base && ptr < base + size);
        /* NOTE(review): format string takes a name string ('->%s') but the
         * visible argument list passes only integers — argument for %s is
         * elided or missing; verify against the full source. */
        XSEGLOG("invalid pointer '->%s' [%llx on %llx]!\n",
            (unsigned long long)ptr,
            (unsigned long long)base,

/* Check one xseg field against the segment bounds; stringifies the field
 * name for diagnostics (elided trailing argument in this excerpt). */
#define POINTER_OK(xseg, field, base) \
    pointer_ok( (unsigned long)((xseg)->field), \
            (unsigned long)(base), \
            (xseg)->segment_size, \

/* Validate all translated structural pointers of a joined segment.
 * Returns the number of invalid pointers found (0 == all good). */
static int xseg_validate_pointers(struct xseg *xseg)
    r += POINTER_OK(xseg, object_handlers, xseg->segment);
    r += POINTER_OK(xseg, request_h, xseg->segment);
    r += POINTER_OK(xseg, port_h, xseg->segment);
    r += POINTER_OK(xseg, ports, xseg->segment);
    r += POINTER_OK(xseg, heap, xseg->segment);
    r += POINTER_OK(xseg, shared, xseg->segment);
/* Join an existing segment: map it (first a single page to learn its real
 * size, then fully), build the process-local struct xseg with all xptrs
 * translated into local pointers, and attach signalling.
 * Returns the local xseg handle, with priv pointing at per-process state. */
struct xseg *xseg_join( char *segtypename,
    struct xseg *xseg, *__xseg;
    struct xseg_peer *peertype;
    struct xseg_type *segtype;
    struct xseg_private *priv;
    struct xseg_operations *xops;
    struct xseg_peer_operations *pops;

    peertype = __find_or_load_peer_type(peertypename);
        XSEGLOG("Peer type '%s' not found\n", peertypename);

    segtype = __find_or_load_type(segtypename);
        XSEGLOG("Segment type '%s' not found\n", segtypename);

    xops = &segtype->ops;
    pops = &peertype->peer_ops;

    xseg = pops->malloc(sizeof(struct xseg));
        XSEGLOG("Cannot allocate memory");

    priv = pops->malloc(sizeof(struct xseg_private));
        XSEGLOG("Cannot allocate memory");

    /* map one page first, just to read the true segment size */
    __xseg = xops->map(segname, XSEG_MIN_PAGE_SIZE, NULL);
        XSEGLOG("Cannot map segment");

    size = __xseg->segment_size;
    /* XSEGLOG("joined segment of size: %lu\n", (unsigned long)size); */
    xops->unmap(__xseg, XSEG_MIN_PAGE_SIZE);

    /* remap at full size */
    __xseg = xops->map(segname, size, xseg);
        XSEGLOG("Cannot map segment");

    priv->segment_type = *segtype;  /* copied by value into local state */
    priv->peer_type = *peertype;
    priv->wakeup = wakeup;
    priv->req_data = xhash_new(3, INTEGER); //FIXME should be relative to XSEG_DEF_REQS
    xlock_release(&priv->reqdatalock);  /* start the lock in released state */

    xseg->max_peer_types = __xseg->max_peer_types;

    priv->peer_types = pops->malloc(sizeof(void *) * xseg->max_peer_types);
    if (!priv->peer_types) {
        XSEGLOG("Cannot allocate memory");
    memset(priv->peer_types, 0, sizeof(void *) * xseg->max_peer_types);
    priv->peer_type_data = pops->malloc(sizeof(void *) * xseg->max_peer_types);
    /* NOTE(review): copy-paste bug — this re-checks peer_types instead of
     * the just-allocated peer_type_data; an allocation failure here goes
     * undetected. Should be: if (!priv->peer_type_data) */
    if (!priv->peer_types) {
        XSEGLOG("Cannot allocate memory");
        //FIXME wrong err handling
    memset(priv->peer_type_data, 0, sizeof(void *) * xseg->max_peer_types);

    /* translate every shared xptr into a local pointer */
    xseg->config = __xseg->config;
    xseg->version = __xseg->version;
    xseg->request_h = XPTR_TAKE(__xseg->request_h, __xseg);
    xseg->port_h = XPTR_TAKE(__xseg->port_h, __xseg);
    xseg->ports = XPTR_TAKE(__xseg->ports, __xseg);
    xseg->src_gw = XPTR_TAKE(__xseg->src_gw, __xseg);
    xseg->dst_gw = XPTR_TAKE(__xseg->dst_gw, __xseg);
    xseg->heap = XPTR_TAKE(__xseg->heap, __xseg);
    xseg->object_handlers = XPTR_TAKE(__xseg->object_handlers, __xseg);
    xseg->shared = XPTR_TAKE(__xseg->shared, __xseg);
    xseg->segment_size = size;
    xseg->segment = __xseg;

    r = xseg_validate_pointers(xseg);
        XSEGLOG("found %d invalid xseg pointers!\n", r);

    r = xops->signal_join(xseg);
        XSEGLOG("Cannot attach signaling to segment! (error: %d)\n", r);

    /* unwind on error (labels elided in this excerpt) */
    pops->mfree(priv->peer_types);
    xops->unmap(__xseg, size);
    xhash_free(priv->req_data);
/* Detach from a joined segment: unmap the shared mapping via the type ops.
 * (Local priv/xseg frees are elided in this excerpt.) */
void xseg_leave(struct xseg *xseg)
    struct xseg_type *type;

    type = __find_or_load_type(xseg->config.type);
        XSEGLOG("no segment type '%s'\n", xseg->config.type);

    type->ops.unmap(xseg->segment, xseg->segment_size);
/* Translate a port number to a local struct xseg_port pointer.
 * Returns the translated pointer; validation failure path elided. */
struct xseg_port* xseg_get_port(struct xseg *xseg, uint32_t portno)
    if (!__validate_port(xseg, portno))
    p = xseg->ports[portno];
    return XPTR_TAKE(p, xseg->segment);
/* Allocate an xq able to hold at least nr_reqs entries from the segment
 * heap. The queue header and its index buffer share one heap chunk; the
 * queue capacity is derived from the actual chunk size granted. */
struct xq * __alloc_queue(struct xseg *xseg, uint64_t nr_reqs)
    struct xheap *heap = xseg->heap;

    /* bytes to request: header + nr_reqs indices */
    bytes = sizeof(struct xq) + nr_reqs*sizeof(xqindex);
    mem = xheap_allocate(heap, bytes);

    /* the heap may grant more than asked; use all of it for entries */
    bytes = xheap_get_chunk_size(mem) - sizeof(struct xq);

    /* initialize queue with the max nr of elements it can hold */
    q = (struct xq *) mem;
    buf = (void *) (((unsigned long) mem) + sizeof(struct xq));
    xq_init_empty(q, bytes/sizeof(xqindex), buf);
/* Allocate and initialize a port object: three queues (free, request,
 * reply) each sized for nr_reqs, plus the port's locks and counters.
 * On partial failure the already-allocated queues are freed (tail). */
struct xseg_port *xseg_alloc_port(struct xseg *xseg, uint32_t flags, uint64_t nr_reqs)
    struct xobject_h *obj_h = xseg->port_h;
    struct xseg_port *port = xobj_get_obj(obj_h, flags);

    q = __alloc_queue(xseg, nr_reqs);
    port->free_queue = XPTR_MAKE(q, xseg->segment);

    /* and for request queue */
    q = __alloc_queue(xseg, nr_reqs);
    port->request_queue = XPTR_MAKE(q, xseg->segment);

    /* and for reply queue */
    q = __alloc_queue(xseg, nr_reqs);
    port->reply_queue = XPTR_MAKE(q, xseg->segment);

    /* initialize all per-port locks in the released state */
    xlock_release(&port->fq_lock);
    xlock_release(&port->rq_lock);
    xlock_release(&port->pq_lock);
    xlock_release(&port->port_lock);
    port->owner = 0; //should be Noone;
    port->portno = 0; // should be Noport;
    port->peer_type = 0; //FIXME what here ??
    port->alloc_reqs = 0;
    port->max_alloc_reqs = 512; //FIXME

    /* error unwinding: free whatever queues were created, then the port */
    xheap_free(XPTR_TAKE(port->request_queue, xseg->segment));
    port->request_queue = 0;
    xheap_free(XPTR_TAKE(port->free_queue, xseg->segment));
    port->free_queue = 0;
    xobj_put_obj(obj_h, port);
/* Free a port: release each of its queues back to the heap (zeroing the
 * xptr afterwards), then return the port object to its handler. */
void xseg_free_port(struct xseg *xseg, struct xseg_port *port)
    struct xobject_h *obj_h = xseg->port_h;

    if (port->request_queue) {
        xheap_free(XPTR_TAKE(port->request_queue, xseg->segment));
        port->request_queue = 0;
    if (port->free_queue) {
        xheap_free(XPTR_TAKE(port->free_queue, xseg->segment));
        port->free_queue = 0;
    if (port->reply_queue) {
        xheap_free(XPTR_TAKE(port->reply_queue, xseg->segment));
        port->reply_queue = 0;
    xobj_put_obj(obj_h, port);
/* Allocate a data buffer of at least `size` bytes from the segment heap.
 * Logs when the heap hands back a smaller chunk than requested. */
void* xseg_alloc_buffer(struct xseg *xseg, uint64_t size)
    struct xheap *heap = xseg->heap;
    void *mem = xheap_allocate(heap, size);
    if (mem && xheap_get_chunk_size(mem) < size) {
        XSEGLOG("Buffer size %llu instead of %llu\n",
                xheap_get_chunk_size(mem), size);

/* Return a buffer obtained from xseg_alloc_buffer to the segment heap
 * (body elided in this excerpt). */
void xseg_free_buffer(struct xseg *xseg, void *ptr)
/* Thin wrappers dispatching to this process' own peer-type signal ops. */

/* Arm signalling on portno before blocking in xseg_wait_signal. */
int xseg_prepare_wait(struct xseg *xseg, uint32_t portno)
    if (!__validate_port(xseg, portno))
    return xseg->priv->peer_type.peer_ops.prepare_wait(xseg, portno);

/* Disarm a previously prepared wait on portno. */
int xseg_cancel_wait(struct xseg *xseg, uint32_t portno)
    if (!__validate_port(xseg, portno))
    return xseg->priv->peer_type.peer_ops.cancel_wait(xseg, portno);

/* Block until signalled or the timeout (usec) elapses. */
int xseg_wait_signal(struct xseg *xseg, uint32_t usec_timeout)
    return xseg->priv->peer_type.peer_ops.wait_signal(xseg, usec_timeout);

/* Signal the owner of portno, using the *target port's* peer type
 * (looked up by serial) rather than our own. */
int xseg_signal(struct xseg *xseg, xport portno)
    struct xseg_peer *type;
    struct xseg_port *port = xseg_get_port(xseg, portno);
    type = __get_peer_type(xseg, port->peer_type);
    return type->peer_ops.signal(xseg, portno);

/* Initialize local (in-process) signalling for the port's peer type. */
int xseg_init_local_signal(struct xseg *xseg, xport portno)
    struct xseg_peer *type;
    struct xseg_port *port = xseg_get_port(xseg, portno);
    type = __get_peer_type(xseg, port->peer_type);
    return type->peer_ops.local_signal_init();

/* Tear down local signalling for the port's peer type. */
void xseg_quit_local_signal(struct xseg *xseg, xport portno)
    struct xseg_peer *type;
    struct xseg_port *port = xseg_get_port(xseg, portno);
    type = __get_peer_type(xseg, port->peer_type);
    type->peer_ops.local_signal_quit();
//FIXME doesn't increase alloced reqs
//is integer i enough here?
/* Pre-populate a port's free queue with up to nr requests taken from the
 * segment-wide request handler. Stops early if the handler runs dry or
 * the queue fills up (the unappendable request is returned). */
int xseg_alloc_requests(struct xseg *xseg, uint32_t portno, uint32_t nr)
    struct xseg_request *req;
    struct xseg_port *port = xseg_get_port(xseg, portno);

    q = XPTR_TAKE(port->free_queue, xseg->segment);
    while ((req = xobj_get_obj(xseg->request_h, X_ALLOC)) != NULL && i < nr) {
        xqi = XPTR_MAKE(req, xseg->segment);
        xqi = xq_append_tail(q, xqi, portno);
        if (xqi == Noneidx) {
            /* queue full: give the object back and stop */
            xobj_put_obj(xseg->request_h, req);

/* Drain up to nr requests from a port's free queue back to the segment
 * request handler, then decrement the port's alloc_reqs accounting under
 * the port lock. */
int xseg_free_requests(struct xseg *xseg, uint32_t portno, int nr)
    struct xseg_request *req;
    struct xseg_port *port = xseg_get_port(xseg, portno);

    q = XPTR_TAKE(port->free_queue, xseg->segment);
    while ((xqi = xq_pop_head(q, portno)) != Noneidx && i < nr) {
        req = XPTR_TAKE(xqi, xseg->segment);
        xobj_put_obj(xseg->request_h, (void *) req);

    xlock_acquire(&port->port_lock, portno);
    port->alloc_reqs -= i;
    xlock_release(&port->port_lock);
/* Stamp a request with its source and destination ports. The transit
 * ports start equal to the endpoints and are rewritten as the request
 * hops through gateways (see xseg_submit/xseg_accept). */
int xseg_prep_ports (struct xseg *xseg, struct xseg_request *xreq,
        uint32_t src_portno, uint32_t dst_portno)
    if (!__validate_port(xseg, src_portno))

    if (!__validate_port(xseg, dst_portno))

    xreq->src_portno = src_portno;
    xreq->src_transit_portno = src_portno;
    xreq->dst_portno = dst_portno;
    xreq->dst_transit_portno = dst_portno;
/* Obtain a request on behalf of src_portno, destined for dst_portno.
 * First tries the port's preallocated free queue; unless X_LOCAL is set,
 * falls back to the global request handler (bounded by max_alloc_reqs,
 * tracked under the port lock). The request is reset before return. */
struct xseg_request *xseg_get_request(struct xseg *xseg, xport src_portno,
                    xport dst_portno, uint32_t flags)
    /* flags:
     * X_ALLOC Allocate more requests if object handler
     *     does not have any avaiable
     * X_LOCAL Use only local - preallocated reqs
     *     (Maybe we want this as default, to give a hint to a peer
     *     how many requests it can have flying)
     */
    struct xseg_request *req = NULL;
    struct xseg_port *port;

    port = xseg_get_port(xseg, src_portno);

    /* try to allocate from free_queue */
    q = XPTR_TAKE(port->free_queue, xseg->segment);
    xqi = xq_pop_head(q, src_portno);
    if (xqi != Noneidx){
        req = XPTR_TAKE(ptr, xseg->segment);

    if (flags & X_LOCAL)

    /* else try to allocate from global heap, respecting the per-port cap */
    xlock_acquire(&port->port_lock, src_portno);
    if (port->alloc_reqs < port->max_alloc_reqs) {
        req = xobj_get_obj(xseg->request_h, flags & X_ALLOC);
    xlock_release(&port->port_lock);

    if (xseg_prep_ports(xseg, req, src_portno, dst_portno) < 0) {
        xseg_put_request(xseg, req, src_portno);

    req->timestamp.tv_sec = 0;
    req->timestamp.tv_usec = 0;

    /* fresh, empty gateway path for this request */
    xq_init_empty(&req->path, MAX_PATH_LEN, req->path_bufs);
/* Return a request: free its data buffer, scrub all fields, merge its
 * latency sample into the segment-wide counters, then try to park it on
 * the owning port's free queue — otherwise hand it back to the global
 * handler and adjust the port's alloc accounting. */
int xseg_put_request (struct xseg *xseg, struct xseg_request *xreq,
    xqindex xqi = XPTR_MAKE(xreq, xseg->segment);
    struct xseg_port *port = xseg_get_port(xseg, xreq->src_portno);

    void *ptr = XPTR_TAKE(xreq->buffer, xseg->segment);
    xseg_free_buffer(xseg, ptr);

    /* reset the path queue and all per-request fields */
    xq_init_empty(&xreq->path, MAX_PATH_LEN, xreq->path_bufs);

    xreq->bufferlen = 0;
    xreq->targetlen = 0;
    xreq->src_portno = NoPort;
    xreq->dst_portno = NoPort;
    xreq->src_transit_portno = NoPort;
    xreq->dst_transit_portno = NoPort;

    /* fold the request's measured latency into the global stats,
     * under the segment lock since counters are shared */
    if (xreq->elapsed != 0) {
        __lock_segment(xseg);
        ++(xseg->counters.req_cnt);
        xseg->counters.avg_req_lat += xreq->elapsed;
        __unlock_segment(xseg);

    /* try to put it in free_queue of the port */
    q = XPTR_TAKE(port->free_queue, xseg->segment);
    xqi = xq_append_head(q, xqi, portno);

    /* else return it to segment */
    xobj_put_obj(xseg->request_h, (void *) xreq);
    xlock_acquire(&port->port_lock, portno);
    xlock_release(&port->port_lock);
/* Allocate and partition a request's data buffer: data occupies the
 * front, the target name is placed at the very end of the chunk.
 * bufferlen records the actual chunk size granted by the heap. */
int xseg_prep_request ( struct xseg* xseg, struct xseg_request *req,
            uint32_t targetlen, uint64_t datalen )
    uint64_t bufferlen = targetlen + datalen;

    buf = xseg_alloc_buffer(xseg, bufferlen);
    req->bufferlen = xheap_get_chunk_size(buf);
    req->buffer = XPTR_MAKE(buf, xseg->segment);

    /* data at the start, target at the tail of the buffer */
    req->data = req->buffer;
    req->target = req->buffer + req->bufferlen - targetlen;
    req->datalen = datalen;
    req->targetlen = targetlen;

/* Resize a request's buffer layout. If the existing buffer is large
 * enough, just repartition it in place; otherwise free it and fall back
 * to a fresh xseg_prep_request allocation. */
int xseg_resize_request (struct xseg *xseg, struct xseg_request *req,
            uint32_t new_targetlen, uint64_t new_datalen)
    if (req->bufferlen >= new_datalen + new_targetlen) {
        req->data = req->buffer;
        req->target = req->buffer + req->bufferlen - new_targetlen;
        req->datalen = new_datalen;
        req->targetlen = new_targetlen;

    void *ptr = XPTR_TAKE(req->buffer, xseg->segment);
    xseg_free_buffer(xseg, ptr);

    return xseg_prep_request(xseg, req, new_targetlen, new_datalen);
/* Accumulate elapsed microseconds since the request's last timestamp into
 * xreq->elapsed, then stamp the current time. A zero tv_sec marks a
 * request that has not been timestamped yet (skip accumulation). */
static void __update_timestamp(struct xseg_request *xreq)
    __get_current_time(&tv);
    if (xreq->timestamp.tv_sec != 0)
        /*
         * FIXME: Make xreq->elapsed timeval/timespec again to avoid the
         */
        xreq->elapsed += (tv.tv_sec - xreq->timestamp.tv_sec) * 1000000
                + (tv.tv_usec - xreq->timestamp.tv_usec);

    xreq->timestamp.tv_sec = tv.tv_sec;
    xreq->timestamp.tv_usec = tv.tv_usec;
/* Submit a request toward its destination. The next hop is resolved via
 * the src/dst gateway arrays (src gateways apply until the request leaves
 * its source, then dst gateways). The current port is pushed onto the
 * request's path so replies can retrace it. With X_ALLOC, a full target
 * queue is transparently doubled. Returns the next-hop port. */
xport xseg_submit (struct xseg *xseg, struct xseg_request *xreq,
        xport portno, uint32_t flags)
    xserial serial = NoSerial;
    struct xq *q, *newq;
    struct xseg_port *port;

    /* discover next and current ports */
    if (!__validate_port(xseg, xreq->src_transit_portno)){
        XSEGLOG("couldn't validate src_transit_portno");
    next = xseg->src_gw[xreq->src_transit_portno];
    if (next != xreq->src_portno) {
        cur = xreq->src_transit_portno;

    if (!__validate_port(xseg, xreq->dst_transit_portno)){
        XSEGLOG("couldn't validate dst_transit_portno");
    next = xseg->dst_gw[xreq->dst_transit_portno];
    if (xreq->dst_transit_portno == xreq->dst_portno)
        cur = xreq->src_transit_portno;
        cur = xreq->dst_transit_portno;

    port = xseg_get_port(xseg, next);
        XSEGLOG("couldnt get port (next :%u)", next);

    __update_timestamp(xreq);

    xqi = XPTR_MAKE(xreq, xseg->segment);

    /* add current port to path (popped again if the append below fails) */
    serial = __xq_append_head(&xreq->path, cur);
    if (serial == Noneidx){
        XSEGLOG("couldn't append path head");

    xlock_acquire(&port->rq_lock, portno);
    q = XPTR_TAKE(port->request_queue, xseg->segment);
    serial = __xq_append_tail(q, xqi);
    if (flags & X_ALLOC && serial == Noneidx) {
        /* double up queue size */
        XSEGLOG("trying to double up queue");
        newq = __alloc_queue(xseg, xq_size(q)*2);
        r = __xq_resize(q, newq);
        port->request_queue = XPTR_MAKE(newq, xseg->segment);
        serial = __xq_append_tail(newq, xqi);

    xlock_release(&port->rq_lock);
    if (serial == Noneidx){
        XSEGLOG("couldn't append request to queue");
        __xq_pop_head(&xreq->path);  /* undo the path push */
/* Pop one reply from portno's reply queue. With X_NONBLOCK the pq_lock is
 * only try-locked. The port that forwarded the request is popped off the
 * request's path (a Noneidx pop indicates path corruption). */
struct xseg_request *xseg_receive(struct xseg *xseg, xport portno, uint32_t flags)
    xserial serial = NoSerial;
    struct xseg_request *req;
    struct xseg_port *port = xseg_get_port(xseg, portno);

    if (flags & X_NONBLOCK) {
        if (!xlock_try_lock(&port->pq_lock, portno))
    xlock_acquire(&port->pq_lock, portno);

    q = XPTR_TAKE(port->reply_queue, xseg->segment);
    xqi = __xq_pop_head(q);
    xlock_release(&port->pq_lock);

    req = XPTR_TAKE(xqi, xseg->segment);
    __update_timestamp(req);
    serial = __xq_pop_head(&req->path);
    if (serial == Noneidx){
        /* this should never happen */
        XSEGLOG("pop head of path queue returned Noneidx\n");

/* Pop one incoming request from portno's request queue (try-lock under
 * X_NONBLOCK). The matching transit port is advanced to this port so the
 * next xseg_submit hop resolves correctly. */
struct xseg_request *xseg_accept(struct xseg *xseg, xport portno, uint32_t flags)
    struct xseg_request *req;
    struct xseg_port *port = xseg_get_port(xseg, portno);

    if (flags & X_NONBLOCK) {
        if (!xlock_try_lock(&port->rq_lock, portno))
    xlock_acquire(&port->rq_lock, portno);

    q = XPTR_TAKE(port->request_queue, xseg->segment);
    xqi = __xq_pop_head(q);
    xlock_release(&port->rq_lock);

    req = XPTR_TAKE(xqi, xseg->segment);
    /* update whichever transit side routed the request to this port */
    if (xseg->src_gw[req->src_transit_portno] == portno)
        req->src_transit_portno = portno;
        req->dst_transit_portno = portno;
/* Send a reply back along the request's recorded path: peek the previous
 * hop off xreq->path and append the request to that port's reply queue,
 * doubling the queue under X_ALLOC if it is full. Returns the port
 * signalled, or failure when the path is empty/append fails. */
xport xseg_respond (struct xseg *xseg, struct xseg_request *xreq,
        xport portno, uint32_t flags)
    xserial serial = NoSerial;
    struct xq *q, *newq;
    struct xseg_port *port;

    /* the head of the path is the hop the reply must go to */
    serial = __xq_peek_head(&xreq->path);
    if (serial == Noneidx)
    dst = (xport) serial;

    port = xseg_get_port(xseg, dst);

    xqi = XPTR_MAKE(xreq, xseg->segment);

    xlock_acquire(&port->pq_lock, portno);
    q = XPTR_TAKE(port->reply_queue, xseg->segment);
    serial = __xq_append_tail(q, xqi);
    if (flags & X_ALLOC && serial == Noneidx) {
        /* queue full: double it and retry the append */
        newq = __alloc_queue(xseg, xq_size(q)*2);
        r = __xq_resize(q, newq);
        port->reply_queue = XPTR_MAKE(newq, xseg->segment);
        serial = __xq_append_tail(newq, xqi);

    xlock_release(&port->pq_lock);

    if (serial == Noneidx)
/* Plain (unsynchronized) write of portno's source gateway. */
xport xseg_set_srcgw(struct xseg *xseg, xport portno, xport srcgw)
    if (!__validate_port(xseg, portno))
    xseg->src_gw[portno] = srcgw;

/* Atomically swap portno's source gateway via CAS loop, returning the
 * previous value.
 * NOTE(review): the write to src_gw[srcgw] inside the loop (indexing by
 * srcgw, not portno) looks like a bug or at least needs justification —
 * confirm against the full source before relying on it. */
xport xseg_getandset_srcgw(struct xseg *xseg, xport portno, xport srcgw)
    prev_portno = xseg->src_gw[portno];
    xseg->src_gw[srcgw] = prev_portno;
    }while(!(__sync_bool_compare_and_swap(&xseg->src_gw[portno], prev_portno, srcgw)));

/* Plain (unsynchronized) write of portno's destination gateway. */
xport xseg_set_dstgw(struct xseg *xseg, xport portno, xport dstgw)
    if (!__validate_port(xseg, portno))
    xseg->dst_gw[portno] = dstgw;

/* Atomic swap of portno's destination gateway; same caveat as the
 * src variant regarding the dst_gw[dstgw] write. */
xport xseg_getandset_dstgw(struct xseg *xseg, xport portno, xport dstgw)
    prev_portno = xseg->dst_gw[portno];
    xseg->dst_gw[dstgw] = prev_portno;
    }while(!(__sync_bool_compare_and_swap(&xseg->dst_gw[portno], prev_portno, dstgw)));
/* Associate opaque data with a request in the process-local hash,
 * growing the hash on XHASH_ERESIZE. Serialized by reqdatalock. */
int xseg_set_req_data(struct xseg *xseg, struct xseg_request *xreq, void *data)
    xlock_acquire(&xseg->priv->reqdatalock, 1);

    req_data = xseg->priv->req_data;
    r = xhash_insert(req_data, (xhashidx) xreq, (xhashidx) data);
    if (r == -XHASH_ERESIZE) {
        /* grow and retry once */
        req_data = xhash_resize(req_data, xhash_grow_size_shift(req_data), NULL);
        xseg->priv->req_data = req_data;
        r = xhash_insert(req_data, (xhashidx) xreq, (xhashidx) data);

    xlock_release(&xseg->priv->reqdatalock);

/* Look up and remove the data associated with a request, shrinking the
 * hash when deletion reports XHASH_ERESIZE. Serialized by reqdatalock. */
int xseg_get_req_data(struct xseg *xseg, struct xseg_request *xreq, void **data)
    xlock_acquire(&xseg->priv->reqdatalock, 1);

    req_data = xseg->priv->req_data;
    //maybe we need a xhash_delete with lookup...
    //maybe we also need a delete that doesn't shrink xhash
    r = xhash_lookup(req_data, (xhashidx) xreq, &val);
    *data = (void *) val;
    r = xhash_delete(req_data, (xhashidx) xreq);
    if (r == -XHASH_ERESIZE) {
        req_data = xhash_resize(req_data, xhash_shrink_size_shift(req_data), NULL);
        xseg->priv->req_data = req_data;
        r = xhash_delete(req_data, (xhashidx) xreq);

    xlock_release(&xseg->priv->reqdatalock);
/* Allocate and initialize a new object handler of the given magic and
 * object size from the segment's master handler; undone on init failure. */
struct xobject_h * xseg_get_objh(struct xseg *xseg, uint32_t magic, uint64_t size)
    struct xobject_h *obj_h = xobj_get_obj(xseg->object_handlers, X_ALLOC);
    r = xobj_handler_init(obj_h, xseg->segment, magic, size, xseg->heap);
        xobj_put_obj(xseg->object_handlers, obj_h);

/* Return an object handler to the master handler. */
void xseg_put_objh(struct xseg *xseg, struct xobject_h *objh)
    xobj_put_obj(xseg->object_handlers, objh);
/* Mark a request as successfully served (clears any failure flag). */
int xseg_complete_req(struct xseg_request *req)
    req->state |= XS_SERVED;
    req->state &= ~XS_FAILED;

/* Mark a request as failed (clears the served flag). */
int xseg_fail_req(struct xseg_request *req)
    req->state &= ~XS_SERVED;
    req->state |= XS_FAILED;
/* Bind this process to a port. If req names a valid port it is requested
 * specifically; otherwise the first free slot is scanned. Runs under the
 * segment lock: allocates a port object, enables our peer driver in the
 * shared table, sets up the signal descriptor (caller-supplied sd or a
 * freshly allocated one), then publishes the port in xseg->ports. */
struct xseg_port *xseg_bind_port(struct xseg *xseg, uint32_t req, void * sd)
    uint32_t portno, maxno, id = __get_id(), force;
    struct xseg_port *port = NULL;

    if (req >= xseg->config.nr_ports) {
        /* no specific port requested: scan the whole range */
        maxno = xseg->config.nr_ports;

    __lock_segment(xseg);
    for (; portno < maxno; portno++) {
        if (!xseg->ports[portno]) {
            /* free slot: create a fresh port */
            port = xseg_alloc_port(xseg, X_ALLOC, XSEG_DEF_REQS);
            port = xseg_get_port(xseg, portno);

    /* make sure our peer driver has a serial in the shared table */
    driver = __enable_driver(xseg, &xseg->priv->peer_type);

    void *peer_data = __get_peer_type_data(xseg, (uint64_t) driver);

    /* no caller-supplied descriptor: allocate and init one */
    void *sigdesc = xseg->priv->peer_type.peer_ops.alloc_signal_desc(xseg, peer_data);
    int r = xseg->priv->peer_type.peer_ops.init_signal_desc(xseg, sigdesc);
        xseg->priv->peer_type.peer_ops.free_signal_desc(xseg, peer_data, sigdesc);
    port->signal_desc = XPTR_MAKE(sigdesc, xseg->segment);

    port->signal_desc = XPTR_MAKE(sd, xseg->segment);

    port->peer_type = (uint64_t)driver;
    port->portno = portno;
    /* publish: after this store other peers can see/signal the port */
    xseg->ports[portno] = XPTR_MAKE(port, xseg->segment);

    /* error path: undo the allocation */
    xseg_free_port(xseg, port);

    __unlock_segment(xseg);
/* Release a bound port (stub). */
int xseg_leave_port(struct xseg *xseg, struct xseg_port *port)
    /* To be implemented */

/* One-time library initialization. */
int xseg_initialize(void)
    return __xseg_preinit(); /* with or without lock ? */

/* Library teardown (stub). */
int xseg_finalize(void)
    /* finalize not supported yet */
1652 #include <linux/module.h>
1653 #include <xseg/xseg_exports.h>