2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
35 #include <xseg/xseg.h>
36 #include <xseg/domain.h>
40 #define NULL ((void *)0)
43 #define XSEG_NR_TYPES 16
44 #define XSEG_NR_PEER_TYPES 64
45 #define XSEG_MIN_PAGE_SIZE 4096
47 static struct xseg_type *__types[XSEG_NR_TYPES];
48 static unsigned int __nr_types;
49 static struct xseg_peer *__peer_types[XSEG_NR_PEER_TYPES];
50 static unsigned int __nr_peer_types;
/* Acquire the segment-wide lock: spin until the XSEG_F_LOCK bit in
 * shared->flags is observed clear while atomically setting it. */
52 static void __lock_segment(struct xseg *xseg)
54 	volatile uint64_t *flags;
55 	flags = &xseg->shared->flags;
	/* __sync_fetch_and_or returns the previous value: non-zero means the
	 * lock bit was already held, so keep spinning. */
56 	while (__sync_fetch_and_or(flags, XSEG_F_LOCK));
/* Release the segment-wide lock by atomically clearing XSEG_F_LOCK. */
59 static void __unlock_segment(struct xseg *xseg)
61 	volatile uint64_t *flags;
62 	flags = &xseg->shared->flags;
63 	__sync_fetch_and_and(flags, ~XSEG_F_LOCK);
/* Linear search of the locally registered segment types by name.
 * On return *index holds the slot examined last (the match position on
 * success, __nr_types on failure). */
66 static struct xseg_type *__find_type(const char *name, long *index)
69 	for (i = 0; (*index = i) < __nr_types; i++)
70 		if (!strncmp(__types[i]->name, name, XSEG_TNAMESIZE))
/* Linear search of the locally registered peer (driver) types by name.
 * *index is left at the matching slot, or __nr_peer_types if not found. */
75 static struct xseg_peer *__find_peer_type(const char *name, int64_t *index)
78 	for (i = 0; (*index = i) < __nr_peer_types; i++) {
79 		if (!strncmp(__peer_types[i]->name, name, XSEG_TNAMESIZE))
80 			return __peer_types[i];
/* Log every locally registered peer type (diagnostic helper). */
85 void xseg_report_peer_types(void)
88 	XSEGLOG("total %u peer types:\n", __nr_peer_types);
89 	for (i = 0; i < __nr_peer_types; i++)
90 		XSEGLOG("%ld: '%s'\n", i, __peer_types[i]->name);
/* Look up a segment type by name; if absent, attempt to load its module
 * (loading happens between the two lookups, in lines not shown here) and
 * retry the lookup once. */
93 static struct xseg_type *__find_or_load_type(const char *name)
96 	struct xseg_type *type = __find_type(name, &i);
101 	return __find_type(name, &i);
/* Same pattern as __find_or_load_type, but for peer (driver) types. */
104 static struct xseg_peer *__find_or_load_peer_type(const char *name)
107 	struct xseg_peer *peer_type = __find_peer_type(name, &i);
112 	return __find_peer_type(name, &i);
/* Resolve the driver for a shared peer-type serial number.
 * Checks the per-process cache (priv->peer_types) first; on a miss,
 * reads the driver name from the shared, append-only name table and
 * loads/caches the corresponding local driver. Returns NULL (in lines
 * not visible here) on invalid serial or missing driver. */
115 static struct xseg_peer *__get_peer_type(struct xseg *xseg, uint32_t serial)
118 	struct xseg_peer *type;
119 	struct xseg_private *priv = xseg->priv;
120 	char (*shared_peer_types)[XSEG_TNAMESIZE];
122 	if (serial >= xseg->max_peer_types) {
123 		XSEGLOG("invalid peer type serial %d >= %d\n",
124 			serial, xseg->max_peer_types);
	/* fast path: already resolved for this process */
128 	type = priv->peer_types[serial];
132 	/* xseg->shared->peer_types is an append-only array,
133 	 * therefore this should be safe
134 	 * without either locking or string copying. */
135 	shared_peer_types = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
136 	name = shared_peer_types[serial];
138 		XSEGLOG("nonexistent peer type serial %d\n", serial);
142 	type = __find_or_load_peer_type(name);
144 		XSEGLOG("could not find driver for peer type %d [%s]\n",
	/* cache for subsequent lookups */
147 	priv->peer_types[serial] = type;
/* Return the per-driver private data blob for a peer-type serial.
 * Mirrors __get_peer_type: per-process cache first, then the shared
 * peer_type_data xptr table, converted to a local pointer and cached. */
151 static void * __get_peer_type_data(struct xseg *xseg, uint32_t serial)
155 	struct xseg_private *priv = xseg->priv;
156 	char (*shared_peer_types)[XSEG_TNAMESIZE];
157 	xptr *shared_peer_type_data;
159 	if (serial >= xseg->max_peer_types) {
160 		XSEGLOG("invalid peer type serial %d >= %d\n",
161 			serial, xseg->max_peer_types);
	/* fast path: already translated for this process */
165 	data = priv->peer_type_data[serial];
169 	shared_peer_types = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
170 	name = shared_peer_types[serial];
172 		XSEGLOG("nonexistent peer type serial %d\n", serial);
175 	shared_peer_type_data = XPTR_TAKE(xseg->shared->peer_type_data, xseg->segment);
	/* translate the shared xptr into a process-local pointer and cache it */
177 	priv->peer_type_data[serial] = XPTR_TAKE(shared_peer_type_data[serial], xseg->segment);
178 	return priv->peer_type_data[serial];
/* True iff portno is within the configured port range of this segment. */
181 static inline int __validate_port(struct xseg *xseg, uint32_t portno)
183 	return portno < xseg->config.nr_ports;
/* True iff the segment-relative pointer lies inside the segment. */
186 static inline int __validate_ptr(struct xseg *xseg, xptr ptr)
188 	return ptr < xseg->segment_size;
191 /* type:name:nr_ports:nr_requests:request_size:extra_size:page_shift */
193 #define TOK(s, sp, def) \
/* Parse an unsigned decimal number from s (hand-rolled strtoul-like
 * helper; most of its body is not visible in this listing). */
210 static unsigned long strul(char *s)
214 	unsigned char c = *s - '0';
/* Bounded string copy helper (body mostly not visible here).
 * NOTE(review): presumably guarantees NUL-termination unlike strncpy —
 * confirm against the full source before relying on it. */
224 static char *strncopy(char *dest, const char *src, uint32_t n)
228 	for (i = 0; i < n; i++) {
/* Parse a colon-separated segment spec string into *config.
 * Format (see comment above in file):
 *   type:name:nr_ports:nr_requests:request_size:extra_size:page_shift
 * Missing tokens fall back to defaults via the TOK() macro. */
239 int xseg_parse_spec(char *segspec, struct xseg_config *config)
241 	/* default: "posix:globalxseg:4:256:12" */
242 	char *s = segspec, *sp = segspec;
246 	strncpy(config->type, s, XSEG_TNAMESIZE);
	/* force NUL-termination: strncpy does not guarantee it */
247 	config->type[XSEG_TNAMESIZE-1] = 0;
250 	TOK(s, sp, "globalxseg");
251 	strncpy(config->name, s, XSEG_NAMESIZE);
252 	config->name[XSEG_NAMESIZE-1] = 0;
256 	config->nr_ports = strul(s);
	/* heap size is specified in MiB on the spec string */
260 	config->heap_size = (uint64_t) (strul(s) * 1024UL * 1024UL);
264 	config->page_shift = strul(s);
/* Register a segment type in the process-local __types[] table.
 * Fails if the name already exists or the table is full. */
268 int xseg_register_type(struct xseg_type *type)
272 	struct xseg_type *__type;
274 	__type = __find_type(type->name, &i);
276 		XSEGLOG("type %s already exists\n", type->name);
280 	if (__nr_types >= XSEG_NR_TYPES) {
281 		XSEGLOG("maximum type registrations reached: %u\n", __nr_types);
	/* defensively NUL-terminate the registered name */
286 	type->name[XSEG_TNAMESIZE-1] = 0;
287 	__types[__nr_types] = type;
/* Remove a segment type from __types[] by name.
 * Uses swap-with-last removal; the __nr_types decrement happens on a
 * line not visible in this listing. */
295 int xseg_unregister_type(const char *name)
299 	struct xseg_type *__type;
301 	__type = __find_type(name, &i);
303 		XSEGLOG("segment type '%s' does not exist\n", name);
308 	__types[i] = __types[__nr_types];
309 	__types[__nr_types] = NULL;
/* Register a peer (driver) type in __peer_types[].
 * Rejects duplicates and table overflow, and runs the driver's
 * remote-signal initialization before publishing it. */
316 int xseg_register_peer(struct xseg_peer *peer_type)
320 	struct xseg_peer *type;
322 	type = __find_peer_type(peer_type->name, &i);
324 		XSEGLOG("peer type '%s' already exists\n", type->name);
328 	if (__nr_peer_types >= XSEG_NR_PEER_TYPES) {
329 		XSEGLOG("maximum peer type registrations reached: %u",
	/* non-zero return from the driver init aborts the registration */
335 	if (peer_type->peer_ops.remote_signal_init()) {
336 		XSEGLOG("peer type '%s': signal initialization failed\n",
342 	peer_type->name[XSEG_TNAMESIZE-1] = 0;
343 	__peer_types[__nr_peer_types] = peer_type;
344 	__nr_peer_types += 1;
/* Remove a peer type from __peer_types[] by name (swap-with-last) and
 * let the driver tear down its remote signaling. */
352 int xseg_unregister_peer(const char *name)
355 	struct xseg_peer *driver;
358 	driver = __find_peer_type(name, &i);
360 		XSEGLOG("peer type '%s' does not exist\n", name);
364 	__nr_peer_types -= 1;
365 	__peer_types[i] = __peer_types[__nr_peer_types];
366 	__peer_types[__nr_peer_types] = NULL;
367 	driver->peer_ops.remote_signal_quit();
/* Publish a driver into the segment's shared driver tables and return
 * its serial (index). If the name already exists in the shared table
 * the existing slot is reused; otherwise a new slot is appended, the
 * driver's shared data is allocated, and the local caches are primed.
 * Caller must hold the segment lock (see xseg_enable_driver). */
374 int64_t __enable_driver(struct xseg *xseg, struct xseg_peer *driver)
377 	char (*drivers)[XSEG_TNAMESIZE];
379 	uint32_t max_drivers = xseg->max_peer_types;
383 	if (xseg->shared->nr_peer_types >= max_drivers) {
384 		XSEGLOG("cannot register '%s': driver namespace full\n",
389 	drivers = XPTR_TAKE(xseg->shared->peer_types, xseg->segment);
	/* scan for an existing entry with the same name */
390 	for (r = 0; r < max_drivers; r++) {
393 		if (!strncmp(drivers[r], driver->name, XSEG_TNAMESIZE)){
394 			data = __get_peer_type_data(xseg, r);
403 	/* assert(xseg->shared->nr_peer_types == r); */
	/* new entry: allocate shared driver data and append name + xptr */
404 	data = driver->peer_ops.alloc_data(xseg);
407 	peer_type_data = XPTR_MAKE(data, xseg->segment);
408 	ptd = XPTR_TAKE(xseg->shared->peer_type_data, xseg->segment);
409 	ptd[r] = peer_type_data;
410 	xseg->shared->nr_peer_types = r + 1;
411 	strncpy(drivers[r], driver->name, XSEG_TNAMESIZE);
412 	drivers[r][XSEG_TNAMESIZE-1] = 0;
	/* prime the per-process caches for this serial */
415 	xseg->priv->peer_types[r] = driver;
416 	xseg->priv->peer_type_data[r] = data;
/* Locked wrapper: look up a locally registered driver by name and
 * enable it on the segment via __enable_driver. Returns the driver
 * serial or a negative value on error. */
420 int64_t xseg_enable_driver(struct xseg *xseg, const char *name)
423 	struct xseg_peer *driver;
426 	driver = __find_peer_type(name, &r);
428 		XSEGLOG("driver '%s' not found\n", name);
432 	__lock_segment(xseg);
433 	r = __enable_driver(xseg, driver);
434 	__unlock_segment(xseg);
/* Drop all local cache references to a named driver.
 * Only clears this process's priv->peer_types slots; the shared
 * (append-only) tables are left untouched. */
440 int xseg_disable_driver(struct xseg *xseg, const char *name)
444 	struct xseg_private *priv = xseg->priv;
445 	struct xseg_peer *driver;
447 	driver = __find_peer_type(name, &i);
449 		XSEGLOG("driver '%s' not found\n", name);
453 	for (i = 0; i < xseg->max_peer_types; i++)
454 		if (priv->peer_types[i] == driver)
455 			priv->peer_types[i] = NULL;
462 /* NOTE: calculate_segment_size() and initialize_segment()
463 * must always be exactly in sync!
/* Compute the total shared-segment size implied by *config.
 * Must stay exactly in sync with initialize_segment() (see note above):
 * two pages (struct xseg + xheap header area) plus the heap, aligned
 * up to the configured page size. */
466 static uint64_t calculate_segment_size(struct xseg_config *config)
469 	uint32_t page_size, page_shift = config->page_shift;
471 	/* assert(sizeof(struct xseg) <= (1 << 9)); */
	/* a page must at least hold struct xseg, hence the 2^9 floor */
473 	if (page_shift < 9) {
474 		XSEGLOG("page_shift must be >= %d\n", 9);
478 	page_size = 1 << page_shift;
480 	/* struct xseg itself + struct xheap */
481 	size += 2*page_size + config->heap_size;
482 	size = __align(size, page_shift);
/* Lay out a freshly mapped segment: heap, object handlers, request and
 * port handlers, the port map, path/gateway arrays, and the shared area
 * with the peer-type tables. All cross-process pointers are stored as
 * segment-relative xptrs via XPTR_MAKE. Must mirror
 * calculate_segment_size() exactly. */
487 static long initialize_segment(struct xseg *xseg, struct xseg_config *cfg)
489 	uint32_t page_shift = cfg->page_shift, page_size = 1 << page_shift;
490 	struct xseg_shared *shared;
491 	char *segment = (char *)xseg;
	/* 'size' tracks the running offset; the first page holds struct xseg */
492 	uint64_t size = page_size, i;
495 	struct xobject_h *obj_h;
501 	if (page_size < XSEG_MIN_PAGE_SIZE)
504 	xseg->segment_size = 2 * page_size + cfg->heap_size;
505 	xseg->segment = (struct xseg *) segment;
508 	xseg->heap = (struct xheap *) XPTR_MAKE(segment + size, segment);
509 	size += sizeof(struct xheap);
510 	size = __align(size, page_shift);
512 	heap = XPTR_TAKE(xseg->heap, segment);
513 	r = xheap_init(heap, cfg->heap_size, page_shift, segment+size);
517 	/* build object_handler handler */
518 	mem = xheap_allocate(heap, sizeof(struct xobject_h));
521 	xseg->object_handlers = (struct xobject_h *) XPTR_MAKE(mem, segment);
523 	r = xobj_handler_init(obj_h, segment, MAGIC_OBJH,
524 			sizeof(struct xobject_h), heap);
528 	//now that we have object handlers handler, use that to allocate
529 	//new object handlers
531 	//allocate requests handler
532 	mem = xobj_get_obj(obj_h, X_ALLOC);
536 	r = xobj_handler_init(obj_h, segment, MAGIC_REQ,
537 			sizeof(struct xseg_request), heap);
540 	xseg->request_h = (struct xobject_h *) XPTR_MAKE(obj_h, segment);
542 	//allocate ports handler
543 	obj_h = XPTR_TAKE(xseg->object_handlers, segment);
544 	mem = xobj_get_obj(obj_h, X_ALLOC);
548 	r = xobj_handler_init(obj_h, segment, MAGIC_PORT,
549 			sizeof(struct xseg_port), heap);
552 	xseg->port_h = (struct xobject_h *) XPTR_MAKE(mem, segment);
554 	//allocate xptr port array to be used as a map
555 	//portno <--> xptr port
556 	mem = xheap_allocate(heap, sizeof(xptr)*cfg->nr_ports);
560 	for (i = 0; i < cfg->nr_ports; i++) {
563 	xseg->ports = (xptr *) XPTR_MAKE(mem, segment);
565 	//allocate {src,dst} gws
566 	mem = xheap_allocate(heap, sizeof(xport) * cfg->nr_ports);
570 	for (i = 0; i < cfg->nr_ports; i++) {
573 	xseg->path_next = (xport *) XPTR_MAKE(mem, segment);
575 	mem = xheap_allocate(heap, sizeof(xport) * cfg->nr_ports);
579 	for (i = 0; i < cfg->nr_ports; i++) {
582 	xseg->dst_gw = (xport *) XPTR_MAKE(mem, segment);
584 	//allocate xseg_shared memory
585 	mem = xheap_allocate(heap, sizeof(struct xseg_shared));
588 	shared = (struct xseg_shared *) mem;
590 	shared->nr_peer_types = 0;
591 	xseg->shared = (struct xseg_shared *) XPTR_MAKE(mem, segment);
	/* peer-type name table: one page, capacity derives from chunk size */
593 	mem = xheap_allocate(heap, page_size);
596 	shared->peer_types = (char **) XPTR_MAKE(mem, segment);
597 	xseg->max_peer_types = xheap_get_chunk_size(mem) / XSEG_TNAMESIZE;
598 	mem = xheap_allocate(heap, xseg->max_peer_types * sizeof(xptr));
601 	memset(mem, 0, xheap_get_chunk_size(mem));
602 	shared->peer_type_data = (xptr *) XPTR_MAKE(mem, segment);
604 	memcpy(&xseg->config, cfg, sizeof(struct xseg_config));
606 	xseg->counters.req_cnt = 0;
607 	xseg->counters.avg_req_lat = 0;
/* Create a new shared segment from *cfg: resolve the segment type,
 * compute the required size, allocate and map the backing storage, and
 * initialize the in-segment layout. On initialization failure the
 * segment is unmapped and deallocated. */
612 int xseg_create(struct xseg_config *cfg)
614 	struct xseg *xseg = NULL;
615 	struct xseg_type *type;
616 	struct xseg_operations *xops;
620 	type = __find_or_load_type(cfg->type);
622 		cfg->type[XSEG_TNAMESIZE-1] = 0;
623 		XSEGLOG("type '%s' does not exist\n", cfg->type);
627 	size = calculate_segment_size(cfg);
629 		XSEGLOG("invalid config!\n");
634 	cfg->name[XSEG_NAMESIZE-1] = 0;
635 	XSEGLOG("creating segment of size %llu\n", size);
636 	r = xops->allocate(cfg->name, size);
638 		XSEGLOG("cannot allocate segment!\n");
642 	xseg = xops->map(cfg->name, size, NULL);
644 		XSEGLOG("cannot map segment!\n");
	/* the creator's mapping is only needed for initialization */
648 	r = initialize_segment(xseg, cfg);
649 	xops->unmap(xseg, size);
651 		XSEGLOG("cannot initilize segment!\n");
	/* error path: tear down the backing storage */
659 	xops->deallocate(cfg->name);
/* Destroy a segment's backing storage via its type's deallocate op. */
664 void xseg_destroy(struct xseg *xseg)
666 	struct xseg_type *type;
669 	type = __find_or_load_type(xseg->config.type);
671 		XSEGLOG("no segment type '%s'\n", xseg->config.type);
675 	/* should destroy() leave() first? */
676 	type->ops.deallocate(xseg->config.name);
/* Check that ptr falls inside [base, base+size); returns non-zero
 * (and logs) when the pointer is OUT of range.
 * NOTE(review): the format string passes three %llx-style args but the
 * visible call supplies a field-name string first — argument/format
 * agreement cannot be confirmed from this listing. */
682 static int pointer_ok(	unsigned long ptr,
687 	int ret = !(ptr >= base && ptr < base + size);
689 		XSEGLOG("invalid pointer '->%s' [%llx on %llx]!\n",
690 			(unsigned long long)ptr,
691 			(unsigned long long)base,
696 #define POINTER_OK(xseg, field, base) \
697 pointer_ok( (unsigned long)((xseg)->field), \
698 (unsigned long)(base), \
699 (xseg)->segment_size, \
/* Validate that every translated segment pointer lands inside the
 * mapped segment; returns the count of invalid pointers (0 == all ok). */
702 static int xseg_validate_pointers(struct xseg *xseg)
705 	r += POINTER_OK(xseg, object_handlers, xseg->segment);
706 	r += POINTER_OK(xseg, request_h, xseg->segment);
707 	r += POINTER_OK(xseg, port_h, xseg->segment);
708 	r += POINTER_OK(xseg, ports, xseg->segment);
709 	r += POINTER_OK(xseg, heap, xseg->segment);
710 	r += POINTER_OK(xseg, shared, xseg->segment);
/* Attach the calling process to an existing segment.
 * Resolves the peer and segment types, maps the segment twice (first a
 * single page to read segment_size, then fully), builds a process-local
 * struct xseg with all shared xptrs translated to local pointers, and
 * joins the type's signaling. Returns the local xseg handle. */
714 struct xseg *xseg_join(	char *segtypename,
720 	struct xseg *xseg, *__xseg;
722 	struct xseg_peer *peertype;
723 	struct xseg_type *segtype;
724 	struct xseg_private *priv;
725 	struct xseg_operations *xops;
726 	struct xseg_peer_operations *pops;
731 	peertype = __find_or_load_peer_type(peertypename);
733 		XSEGLOG("Peer type '%s' not found\n", peertypename);
738 	segtype = __find_or_load_type(segtypename);
740 		XSEGLOG("Segment type '%s' not found\n", segtypename);
747 	xops = &segtype->ops;
748 	pops = &peertype->peer_ops;
750 	xseg = pops->malloc(sizeof(struct xseg));
752 		XSEGLOG("Cannot allocate memory");
756 	priv = pops->malloc(sizeof(struct xseg_private));
758 		XSEGLOG("Cannot allocate memory");
	/* first, minimal mapping just to learn the real segment size */
762 	__xseg = xops->map(segname, XSEG_MIN_PAGE_SIZE, NULL);
764 		XSEGLOG("Cannot map segment");
768 	size = __xseg->segment_size;
769 	/* XSEGLOG("joined segment of size: %lu\n", (unsigned long)size); */
770 	xops->unmap(__xseg, XSEG_MIN_PAGE_SIZE);
772 	__xseg = xops->map(segname, size, xseg);
774 		XSEGLOG("Cannot map segment");
778 	priv->segment_type = *segtype;
779 	priv->peer_type = *peertype;
780 	priv->wakeup = wakeup;
781 	priv->req_data = xhash_new(3, INTEGER); //FIXME should be relative to XSEG_DEF_REQS
784 	xlock_release(&priv->reqdatalock);
786 	xseg->max_peer_types = __xseg->max_peer_types;
788 	priv->peer_types = pops->malloc(sizeof(void *) * xseg->max_peer_types);
789 	if (!priv->peer_types) {
790 		XSEGLOG("Cannot allocate memory");
793 	memset(priv->peer_types, 0, sizeof(void *) * xseg->max_peer_types);
794 	priv->peer_type_data = pops->malloc(sizeof(void *) * xseg->max_peer_types);
	/* NOTE(review): copy-paste bug — this re-checks peer_types; it should
	 * check the peer_type_data allocation made on the line above. */
795 	if (!priv->peer_types) {
796 		XSEGLOG("Cannot allocate memory");
797 		//FIXME wrong err handling
800 	memset(priv->peer_type_data, 0, sizeof(void *) * xseg->max_peer_types);
	/* translate all shared xptrs into process-local pointers */
803 	xseg->config = __xseg->config;
804 	xseg->version = __xseg->version;
805 	xseg->request_h = XPTR_TAKE(__xseg->request_h, __xseg);
806 	xseg->port_h = XPTR_TAKE(__xseg->port_h, __xseg);
807 	xseg->ports = XPTR_TAKE(__xseg->ports, __xseg);
808 	xseg->path_next = XPTR_TAKE(__xseg->path_next, __xseg);
809 	xseg->dst_gw = XPTR_TAKE(__xseg->dst_gw, __xseg);
810 	xseg->heap = XPTR_TAKE(__xseg->heap, __xseg);
811 	xseg->object_handlers = XPTR_TAKE(__xseg->object_handlers, __xseg);
812 	xseg->shared = XPTR_TAKE(__xseg->shared, __xseg);
813 	xseg->segment_size = size;
814 	xseg->segment = __xseg;
	/* full memory barrier before validating/publishing the handle */
815 	__sync_synchronize();
817 	r = xseg_validate_pointers(xseg);
819 		XSEGLOG("found %d invalid xseg pointers!\n", r);
824 	r = xops->signal_join(xseg);
826 		XSEGLOG("Cannot attach signaling to segment! (error: %d)\n", r);
	/* error unwinding */
834 	pops->mfree(priv->peer_types);
836 	xops->unmap(__xseg, size);
837 	xhash_free(priv->req_data);
/* Detach from a joined segment by unmapping it via its type ops. */
846 void xseg_leave(struct xseg *xseg)
848 	struct xseg_type *type;
851 	type = __find_or_load_type(xseg->config.type);
853 		XSEGLOG("no segment type '%s'\n", xseg->config.type);
859 	type->ops.unmap(xseg->segment, xseg->segment_size);
/* Return the local pointer for a bound port, or fail (in lines not
 * shown) when the port number is invalid or the slot is empty. */
863 struct xseg_port* xseg_get_port(struct xseg *xseg, uint32_t portno)
866 	if (!__validate_port(xseg, portno))
868 	p = xseg->ports[portno];
870 	return XPTR_TAKE(p, xseg->segment);
/* Allocate an xq in the segment heap sized for nr_reqs entries, using
 * whatever extra space the heap chunk actually provides, and return it
 * initialized empty. The queue buffer lives immediately after the xq
 * header within the same chunk. */
875 struct xq * __alloc_queue(struct xseg *xseg, uint64_t nr_reqs)
880 	struct xheap *heap = xseg->heap;
882 	//how many bytes to allocate for a queue
883 	bytes = sizeof(struct xq) + nr_reqs*sizeof(xqindex);
884 	mem = xheap_allocate(heap, bytes);
888 	//how many bytes did we got, and calculate what's left of buffer
889 	bytes = xheap_get_chunk_size(mem) - sizeof(struct xq);
891 	//initialize queue with max nr of elements it can hold
892 	q = (struct xq *) mem;
893 	buf = (void *) (((unsigned long) mem) + sizeof(struct xq));
894 	xq_init_empty(q, bytes/sizeof(xqindex), buf);
900 //maybe add parameters of initial free_queue size and max_alloc_reqs
/* Allocate a port object plus its three queues (free, request, reply),
 * each sized for nr_reqs, and initialize its locks and fields. On queue
 * allocation failure the already-created queues are freed and the port
 * object is returned to its handler. */
901 struct xseg_port *xseg_alloc_port(struct xseg *xseg, uint32_t flags, uint64_t nr_reqs)
904 	struct xobject_h *obj_h = xseg->port_h;
905 	struct xseg_port *port = xobj_get_obj(obj_h, flags);
910 	q = __alloc_queue(xseg, nr_reqs);
913 	port->free_queue = XPTR_MAKE(q, xseg->segment);
915 	//and for request queue
916 	q = __alloc_queue(xseg, nr_reqs);
919 	port->request_queue = XPTR_MAKE(q, xseg->segment);
921 	//and for reply queue
922 	q = __alloc_queue(xseg, nr_reqs);
925 	port->reply_queue = XPTR_MAKE(q, xseg->segment);
	/* start all port locks in the released state */
927 	xlock_release(&port->fq_lock);
928 	xlock_release(&port->rq_lock);
929 	xlock_release(&port->pq_lock);
930 	xlock_release(&port->port_lock);
932 	port->portno = NoPort;
933 	port->peer_type = 0; //FIXME what here ??? NoType??
934 	port->alloc_reqs = 0;
935 	port->max_alloc_reqs = XSEG_DEF_MAX_ALLOCATED_REQS;
	/* error unwinding: free queues created so far, then the port */
942 	xheap_free(XPTR_TAKE(port->request_queue, xseg->segment));
943 	port->request_queue = 0;
945 	xheap_free(XPTR_TAKE(port->free_queue, xseg->segment));
946 	port->free_queue = 0;
948 	xobj_put_obj(obj_h, port);
/* Free a port's three queues (if present) and return the port object
 * to the port handler. */
953 void xseg_free_port(struct xseg *xseg, struct xseg_port *port)
955 	struct xobject_h *obj_h = xseg->port_h;
957 	if (port->request_queue) {
958 		xheap_free(XPTR_TAKE(port->request_queue, xseg->segment));
959 		port->request_queue = 0;
961 	if (port->free_queue) {
962 		xheap_free(XPTR_TAKE(port->free_queue, xseg->segment));
963 		port->free_queue = 0;
965 	if (port->reply_queue) {
966 		xheap_free(XPTR_TAKE(port->reply_queue, xseg->segment));
967 		port->reply_queue = 0;
969 	xobj_put_obj(obj_h, port);
/* Allocate a buffer from the segment heap; logs if the heap hands back
 * a chunk smaller than requested (handling of that case is in lines
 * not visible here). */
972 void* xseg_alloc_buffer(struct xseg *xseg, uint64_t size)
974 	struct xheap *heap = xseg->heap;
975 	void *mem = xheap_allocate(heap, size);
976 	if (mem && xheap_get_chunk_size(mem) < size) {
977 		XSEGLOG("Buffer size %llu instead of %llu\n",
978 			xheap_get_chunk_size(mem), size);
985 void xseg_free_buffer(struct xseg *xseg, void *ptr)
/* Delegate wait preparation for a port to this process's peer driver. */
990 int xseg_prepare_wait(struct xseg *xseg, uint32_t portno)
992 	if (!__validate_port(xseg, portno))
995 	return xseg->priv->peer_type.peer_ops.prepare_wait(xseg, portno);
/* Cancel a previously prepared wait on a port via the peer driver. */
998 int xseg_cancel_wait(struct xseg *xseg, uint32_t portno)
1000 	if (!__validate_port(xseg, portno))
1002 	return xseg->priv->peer_type.peer_ops.cancel_wait(xseg, portno);
/* Block (up to usec_timeout) for a signal, via the peer driver. */
1005 int xseg_wait_signal(struct xseg *xseg, uint32_t usec_timeout)
1007 	return xseg->priv->peer_type.peer_ops.wait_signal(xseg, usec_timeout);
/* Signal the peer bound to portno, using the driver recorded in the
 * port's peer_type serial (not necessarily this process's driver). */
1010 int xseg_signal(struct xseg *xseg, xport portno)
1012 	struct xseg_peer *type;
1013 	struct xseg_port *port = xseg_get_port(xseg, portno);
1017 	type = __get_peer_type(xseg, port->peer_type);
1021 	return type->peer_ops.signal(xseg, portno);
/* Initialize local signaling for a port via its recorded driver. */
1024 int xseg_init_local_signal(struct xseg *xseg, xport portno)
1026 	struct xseg_peer *type;
1027 	struct xseg_port *port = xseg_get_port(xseg, portno);
1031 	type = __get_peer_type(xseg, port->peer_type);
1035 	return type->peer_ops.local_signal_init(xseg, portno);
/* Tear down local signaling for a port via its recorded driver. */
1038 void xseg_quit_local_signal(struct xseg *xseg, xport portno)
1040 	struct xseg_peer *type;
1041 	struct xseg_port *port = xseg_get_port(xseg, portno);
1045 	type = __get_peer_type(xseg, port->peer_type);
1049 	type->peer_ops.local_signal_quit(xseg, portno);
1052 //FIXME doesn't increase alloced reqs
1053 //is integer i enough here?
/* Pre-populate a port's free queue with up to nr requests taken from
 * the global request handler, under the free-queue lock. A request that
 * cannot be appended is returned to the handler. */
1054 int xseg_alloc_requests(struct xseg *xseg, uint32_t portno, uint32_t nr)
1059 	struct xseg_request *req;
1060 	struct xseg_port *port = xseg_get_port(xseg, portno);
1064 	xlock_acquire(&port->fq_lock, portno);
1065 	q = XPTR_TAKE(port->free_queue, xseg->segment);
1066 	while ((req = xobj_get_obj(xseg->request_h, X_ALLOC)) != NULL && i < nr) {
1067 		xqi = XPTR_MAKE(req, xseg->segment);
1068 		xqi = __xq_append_tail(q, xqi);
1069 		if (xqi == Noneidx) {
1070 			xobj_put_obj(xseg->request_h, req);
1075 	xlock_release(&port->fq_lock);
/* Drain up to nr requests from a port's free queue back to the global
 * request handler, then decrement the port's alloc_reqs counter by the
 * number actually freed (under port_lock). */
1082 int xseg_free_requests(struct xseg *xseg, uint32_t portno, int nr)
1087 	struct xseg_request *req;
1088 	struct xseg_port *port = xseg_get_port(xseg, portno);
1092 	xlock_acquire(&port->fq_lock, portno);
1093 	q = XPTR_TAKE(port->free_queue, xseg->segment);
1094 	while ((xqi = __xq_pop_head(q)) != Noneidx && i < nr) {
1095 		req = XPTR_TAKE(xqi, xseg->segment);
1096 		xobj_put_obj(xseg->request_h, (void *) req);
1101 	xlock_release(&port->fq_lock);
1103 	xlock_acquire(&port->port_lock, portno);
1104 	port->alloc_reqs -= i;
1105 	xlock_release(&port->port_lock);
/* Validate both endpoints and stamp the request's routing fields:
 * source/transit start at src_portno, destination/effective destination
 * at dst_portno. */
1110 int xseg_prep_ports (struct xseg *xseg, struct xseg_request *xreq,
1111 			uint32_t src_portno, uint32_t dst_portno)
1113 	if (!__validate_port(xseg, src_portno))
1116 	if (!__validate_port(xseg, dst_portno))
1119 	xreq->src_portno = src_portno;
1120 	xreq->transit_portno = src_portno;
1121 	xreq->dst_portno = dst_portno;
1122 	xreq->effective_dst_portno = dst_portno;
/* Obtain a request for src_portno -> dst_portno.
 * First tries the source port's cached free queue; if that is empty and
 * X_LOCAL is not set, falls back to the global request handler, bounded
 * by the port's max_alloc_reqs. The returned request has its routing
 * fields, timestamp, and path queue reset. */
1127 struct xseg_request *xseg_get_request(struct xseg *xseg, xport src_portno,
1128 					xport dst_portno, uint32_t flags)
1132 	 * X_ALLOC: Allocate more requests if object handler
1133 	 * 	    does not have any avaiable
1134 	 * X_LOCAL: Use only local - preallocated reqs
1135 	 * 	    (Maybe we want this as default, to give a hint to a peer
1136 	 * 	     how many requests it can have flying)
1138 	struct xseg_request *req = NULL;
1139 	struct xseg_port *port;
1144 	port = xseg_get_port(xseg, src_portno);
1147 	//try to allocate from free_queue
1148 	xlock_acquire(&port->fq_lock, src_portno);
1149 	q = XPTR_TAKE(port->free_queue, xseg->segment);
1150 	xqi = __xq_pop_head(q);
1151 	if (xqi != Noneidx){
1152 		xlock_release(&port->fq_lock);
1154 		req = XPTR_TAKE(ptr, xseg->segment);
1157 	xlock_release(&port->fq_lock);
1159 	if (flags & X_LOCAL)
1162 	//else try to allocate from global heap
1164 	xlock_acquire(&port->port_lock, src_portno);
1165 	if (port->alloc_reqs < port->max_alloc_reqs) {
1166 		req = xobj_get_obj(xseg->request_h, flags & X_ALLOC);
1170 	xlock_release(&port->port_lock);
1182 	if (xseg_prep_ports(xseg, req, src_portno, dst_portno) < 0) {
1183 		xseg_put_request(xseg, req, src_portno);
1188 	req->timestamp.tv_sec = 0;
1189 	req->timestamp.tv_usec = 0;
	/* reset the transit path recorded for this request */
1192 	xq_init_empty(&req->path, MAX_PATH_LEN, req->path_bufs);
1198 //do not put request if path not empty or X_FORCE set
/* Release a request: free its data buffer, scrub its fields, fold any
 * elapsed time into the segment latency counters, then try to park it
 * on the source port's free queue; if that fails it goes back to the
 * global request handler and the port's alloc_reqs is decremented. */
1199 int xseg_put_request (struct xseg *xseg, struct xseg_request *xreq,
1202 	xqindex xqi = XPTR_MAKE(xreq, xseg->segment);
1204 	struct xseg_port *port = xseg_get_port(xseg, xreq->src_portno);
1209 		void *ptr = XPTR_TAKE(xreq->buffer, xseg->segment);
1210 		xseg_free_buffer(xseg, ptr);
1213 	xq_init_empty(&xreq->path, MAX_PATH_LEN, xreq->path_bufs);
1216 	xreq->bufferlen = 0;
1220 	xreq->targetlen = 0;
1222 	xreq->src_portno = NoPort;
1223 	xreq->dst_portno = NoPort;
1224 	xreq->transit_portno = NoPort;
1225 	xreq->effective_dst_portno = NoPort;
	/* accumulate global latency statistics under the segment lock */
1227 	if (xreq->elapsed != 0) {
1228 		__lock_segment(xseg);
1229 		++(xseg->counters.req_cnt);
1230 		xseg->counters.avg_req_lat += xreq->elapsed;
1231 		__unlock_segment(xseg);
1235 	//try to put it in free_queue of the port
1236 	xlock_acquire(&port->fq_lock, portno);
1237 	q = XPTR_TAKE(port->free_queue, xseg->segment);
1238 	xqi = __xq_append_head(q, xqi);
1239 	xlock_release(&port->fq_lock);
1242 	//else return it to segment
1243 	xobj_put_obj(xseg->request_h, (void *) xreq);
1244 	xlock_acquire(&port->port_lock, portno);
1246 	xlock_release(&port->port_lock);
/* Allocate and lay out the request's buffer: data at the start,
 * target name at the very end (buffer + bufferlen - targetlen).
 * bufferlen reflects the actual heap chunk size, which may exceed
 * targetlen + datalen. */
1250 int xseg_prep_request ( struct xseg* xseg, struct xseg_request *req,
1251 			uint32_t targetlen, uint64_t datalen )
1253 	uint64_t bufferlen = targetlen + datalen;
1257 	buf = xseg_alloc_buffer(xseg, bufferlen);
1260 	req->bufferlen = xheap_get_chunk_size(buf);
1261 	req->buffer = XPTR_MAKE(buf, xseg->segment);
1263 	req->data = req->buffer;
1264 	req->target = req->buffer + req->bufferlen - targetlen;
1265 	req->datalen = datalen;
1266 	req->targetlen = targetlen;
/* Resize a request's buffer. If the existing buffer already fits the
 * new target+data sizes, just re-partition it in place; otherwise free
 * it and delegate to xseg_prep_request for a fresh allocation. */
1270 int xseg_resize_request (struct xseg *xseg, struct xseg_request *req,
1271 			uint32_t new_targetlen, uint64_t new_datalen)
1273 	if (req->bufferlen >= new_datalen + new_targetlen) {
1274 		req->data = req->buffer;
1275 		req->target = req->buffer + req->bufferlen - new_targetlen;
1276 		req->datalen = new_datalen;
1277 		req->targetlen = new_targetlen;
1282 	void *ptr = XPTR_TAKE(req->buffer, xseg->segment);
1283 	xseg_free_buffer(xseg, ptr);
1287 	return xseg_prep_request(xseg, req, new_targetlen, new_datalen);
/* Accumulate elapsed microseconds since the last stamp into
 * xreq->elapsed (only when a previous stamp exists), then restamp. */
1290 static void __update_timestamp(struct xseg_request *xreq)
1294 	__get_current_time(&tv);
1295 	if (xreq->timestamp.tv_sec != 0)
1297 		 * FIXME: Make xreq->elapsed timeval/timespec again to avoid the
1300 		xreq->elapsed += (tv.tv_sec - xreq->timestamp.tv_sec) * 1000000
1301 				+ (tv.tv_usec - xreq->timestamp.tv_usec);
1303 	xreq->timestamp.tv_sec = tv.tv_sec;
1304 	xreq->timestamp.tv_usec = tv.tv_usec;
1307 //FIXME should we add NON_BLOCK flag?
/* Submit a request toward its effective destination.
 * Walks the path_next chain from the transit port until it finds a port
 * willing to accept, records the current port on the request's path
 * stack, and appends the request to the chosen port's request queue —
 * growing (doubling) the queue when full and X_ALLOC is set. On append
 * failure, the path entry pushed earlier is popped back off. */
1308 xport xseg_submit (struct xseg *xseg, struct xseg_request *xreq,
1309 			xport portno, uint32_t flags)
1311 	xserial serial = NoSerial;
1313 	struct xq *q, *newq;
1315 	struct xseg_port *port;
1317 	/* discover where to submit */
1319 	if (!__validate_port(xseg, xreq->transit_portno)){
1320 		XSEGLOG("Couldn't validate transit_portno (portno: %lu)",
1321 				xreq->transit_portno);
1324 	if (!__validate_port(xseg, xreq->effective_dst_portno)){
1325 		XSEGLOG("Couldn't validate effective_dst_portno (portno: %lu)",
1326 				xreq->effective_dst_portno);
1330 	cur = xreq->transit_portno;
1332 	//FIXME assert(cur == portno);
1334 		if (next == xreq->effective_dst_portno){
1335 			XSEGLOG("Path ended with no one willing to accept");
1339 		if (xseg->path_next[next] != NoPort){
1340 			next = xseg->path_next[next];
1342 			next = xreq->effective_dst_portno;
1345 		port = xseg_get_port(xseg, next);
1347 			XSEGLOG("Couldnt get port (next :%u)", next);
	/* NOTE(review): '!' binds tighter than '&', so this evaluates
	 * (!port->flags) & CAN_ACCEPT — almost certainly intended
	 * !(port->flags & CAN_ACCEPT). As written the loop condition is
	 * wrong whenever port->flags is non-zero. */
1350 	} while ((!port->flags & CAN_ACCEPT));
1354 	//__update_timestamp(xreq);
1356 	xqi = XPTR_MAKE(xreq, xseg->segment);
1358 	/* add current port to path */
1359 	serial = __xq_append_head(&xreq->path, cur);
1360 	if (serial == Noneidx){
1361 		XSEGLOG("Couldn't append path head");
1365 	xlock_acquire(&port->rq_lock, portno);
1366 	q = XPTR_TAKE(port->request_queue, xseg->segment);
1367 	serial = __xq_append_tail(q, xqi);
1368 	if (flags & X_ALLOC && serial == Noneidx) {
1369 		/* double up queue size */
1370 		XSEGLOG("Trying to double up queue");
1371 		newq = __alloc_queue(xseg, xq_size(q)*2);
1374 		r = __xq_resize(q, newq);
1379 		port->request_queue = XPTR_MAKE(newq, xseg->segment);
1381 		serial = __xq_append_tail(newq, xqi);
1385 	xlock_release(&port->rq_lock);
1386 	if (serial == Noneidx){
1387 		XSEGLOG("Couldn't append request to queue");
	/* undo the path push made above */
1388 		__xq_pop_head(&xreq->path);
/* Pop a completed request from portno's reply queue.
 * With X_NONBLOCK set, a contended pq_lock makes it return immediately
 * instead of spinning. The head of the request's path stack (the port
 * that forwarded it here) is popped on the way out. */
1395 struct xseg_request *xseg_receive(struct xseg *xseg, xport portno, uint32_t flags)
1398 	xserial serial = NoSerial;
1400 	struct xseg_request *req;
1401 	struct xseg_port *port = xseg_get_port(xseg, portno);
1405 	if (flags & X_NONBLOCK) {
1406 		if (!xlock_try_lock(&port->pq_lock, portno))
1409 		xlock_acquire(&port->pq_lock, portno);
1411 	q = XPTR_TAKE(port->reply_queue, xseg->segment);
1412 	xqi = __xq_pop_head(q);
1413 	xlock_release(&port->pq_lock);
1418 	req = XPTR_TAKE(xqi, xseg->segment);
1419 	// __update_timestamp(req);
1420 	serial = __xq_pop_head(&req->path);
1421 	if (serial == Noneidx){
1422 		/* this should never happen */
1423 		XSEGLOG("pop head of path queue returned Noneidx\n");
/* Pop an incoming request from portno's request queue (non-blocking if
 * X_NONBLOCK) and mark this port as the request's current transit hop. */
1431 struct xseg_request *xseg_accept(struct xseg *xseg, xport portno, uint32_t flags)
1435 	struct xseg_request *req;
1436 	struct xseg_port *port = xseg_get_port(xseg, portno);
1439 	if (flags & X_NONBLOCK) {
1440 		if (!xlock_try_lock(&port->rq_lock, portno))
1443 		xlock_acquire(&port->rq_lock, portno);
1446 	q = XPTR_TAKE(port->request_queue, xseg->segment);
1447 	xqi = __xq_pop_head(q);
1448 	xlock_release(&port->rq_lock);
1452 	req = XPTR_TAKE(xqi, xseg->segment);
1453 	req->transit_portno = portno;
1458 //FIXME should we add NON_BLOCK flag?
/* Send a request back along its recorded path.
 * Peeks the path head for the next return hop; hops that cannot receive
 * are popped and skipped. The request is appended to the chosen port's
 * reply queue, doubling the queue under X_ALLOC when it is full. */
1459 xport xseg_respond (struct xseg *xseg, struct xseg_request *xreq,
1460 			xport portno, uint32_t flags)
1462 	xserial serial = NoSerial;
1464 	struct xq *q, *newq;
1465 	struct xseg_port *port;
1469 	serial = __xq_peek_head(&xreq->path);
1470 	if (serial == Noneidx)
1472 	dst = (xport) serial;
1474 	port = xseg_get_port(xseg, dst);
1477 	if (!(port->flags & CAN_RECEIVE)){
1478 		//XSEGLOG("Port %u cannot receive", dst);
1479 		/* Port cannot receive. Try next one in path */
1480 		__xq_pop_head(&xreq->path);
1484 	xqi = XPTR_MAKE(xreq, xseg->segment);
1486 	xlock_acquire(&port->pq_lock, portno);
1487 	q = XPTR_TAKE(port->reply_queue, xseg->segment);
1488 	serial = __xq_append_tail(q, xqi);
1489 	if (flags & X_ALLOC && serial == Noneidx) {
1490 		newq = __alloc_queue(xseg, xq_size(q)*2);
1493 		r = __xq_resize(q, newq);
1498 		port->reply_queue = XPTR_MAKE(newq, xseg->segment);
1500 		serial = __xq_append_tail(newq, xqi);
1504 	xlock_release(&port->pq_lock);
1506 	if (serial == Noneidx)
/* Redirect a request to a new effective destination, then submit it. */
1512 xport xseg_forward(struct xseg *xseg, struct xseg_request *req, xport new_dst,
1513 			xport portno, uint32_t flags)
1515 	if (!__validate_port(xseg, new_dst)){
1516 		XSEGLOG("Couldn't validate new destination (new_dst %lu)",
1520 	req->effective_dst_portno = new_dst;
1521 	return xseg_submit(xseg, req, portno, flags);
/* Set the routing table entry: requests transiting portno go to next. */
1525 int xseg_set_path_next(struct xseg *xseg, xport portno, xport next)
1527 	if (!__validate_port(xseg, portno))
1529 	if (!__validate_port(xseg, next))
1531 	xseg->path_next[portno] = next;
/* Associate opaque data with a request in the process-local req_data
 * hash, growing the hash on ERESIZE, all under reqdatalock. */
1535 int xseg_set_req_data(struct xseg *xseg, struct xseg_request *xreq, void *data)
1540 	xlock_acquire(&xseg->priv->reqdatalock, 1);
1542 	req_data = xseg->priv->req_data;
1543 	r = xhash_insert(req_data, (xhashidx) xreq, (xhashidx) data);
1544 	if (r == -XHASH_ERESIZE) {
1545 		req_data = xhash_resize(req_data, xhash_grow_size_shift(req_data), NULL);
1547 		xseg->priv->req_data = req_data;
1548 		r = xhash_insert(req_data, (xhashidx) xreq, (xhashidx) data);
1552 	xlock_release(&xseg->priv->reqdatalock);
/* Look up and remove a request's associated data from the req_data
 * hash (shrinking the hash on ERESIZE), under reqdatalock. *data gets
 * the value found by the lookup. */
1556 int xseg_get_req_data(struct xseg *xseg, struct xseg_request *xreq, void **data)
1562 	xlock_acquire(&xseg->priv->reqdatalock, 1);
1564 	req_data = xseg->priv->req_data;
1565 	//maybe we need a xhash_delete with lookup...
1566 	//maybe we also need a delete that doesn't shrink xhash
1567 	r = xhash_lookup(req_data, (xhashidx) xreq, &val);
1568 	*data = (void *) val;
1570 	r = xhash_delete(req_data, (xhashidx) xreq);
1571 	if (r == -XHASH_ERESIZE) {
1572 		req_data = xhash_resize(req_data, xhash_shrink_size_shift(req_data), NULL);
1574 		xseg->priv->req_data = req_data;
1575 		r = xhash_delete(req_data, (xhashidx) xreq);
1580 	xlock_release(&xseg->priv->reqdatalock);
/* Allocate and initialize a new in-segment object handler for objects
 * of the given magic and size; the handler object is returned to the
 * pool if initialization fails. */
1584 struct xobject_h * xseg_get_objh(struct xseg *xseg, uint32_t magic, uint64_t size)
1587 	struct xobject_h *obj_h = xobj_get_obj(xseg->object_handlers, X_ALLOC);
1590 	r = xobj_handler_init(obj_h, xseg->segment, magic, size, xseg->heap);
1592 		xobj_put_obj(xseg->object_handlers, obj_h);
/* Return an object handler obtained via xseg_get_objh() to the pool. */
1598 void xseg_put_objh(struct xseg *xseg, struct xobject_h *objh)
1600 xobj_put_obj(xseg->object_handlers, objh);
/*
 * Mark a request as successfully served: set XS_SERVED and clear any
 * previously-set XS_FAILED flag. Counterpart of xseg_fail_req().
 */
1605 int xseg_complete_req(struct xseg_request *req)
1607 req->state |= XS_SERVED;
1608 req->state &= ~XS_FAILED;
/*
 * Mark a request as failed: clear XS_SERVED and set XS_FAILED.
 * Counterpart of xseg_complete_req().
 */
1611 int xseg_fail_req(struct xseg_request *req)
1613 req->state &= ~XS_SERVED;
1614 req->state |= XS_FAILED;
/*
 * Bind the caller to a segment port.
 * 'req' is the requested port number (values >= nr_ports appear to mean
 * "any free port" — the branch handling is elided in this listing; confirm).
 * Scans the port table under the segment lock, allocating a fresh port for
 * an empty slot or reusing an existing one, enables the caller's peer-type
 * driver, and installs a signal descriptor: a freshly allocated one when
 * 'sd' is NULL (elided condition — confirm), otherwise the caller-supplied
 * 'sd'. On success the port is published in xseg->ports[] with
 * CAN_ACCEPT | CAN_RECEIVE set; on failure the port is freed and the slot
 * cleared. Returns the bound port, or NULL on failure (elided — confirm).
 */
1618 struct xseg_port *xseg_bind_port(struct xseg *xseg, uint32_t req, void * sd)
1620 uint32_t portno, maxno, id = __get_id(), force;
1621 struct xseg_port *port = NULL;
1622 void *peer_data, *sigdesc;
1626 if (req >= xseg->config.nr_ports) {
1628 maxno = xseg->config.nr_ports;
/* serialize port-table scanning and publication across peers */
1636 __lock_segment(xseg);
1637 for (; portno < maxno; portno++) {
1638 if (!xseg->ports[portno]) {
1639 port = xseg_alloc_port(xseg, X_ALLOC, XSEG_DEF_REQS);
1643 port = xseg_get_port(xseg, portno);
/* make sure this peer's driver is registered with the segment */
1649 driver = __enable_driver(xseg, &xseg->priv->peer_type);
1653 peer_data = __get_peer_type_data(xseg, (uint64_t) driver);
1656 sigdesc = xseg->priv->peer_type.peer_ops.alloc_signal_desc(xseg, peer_data);
1659 r = xseg->priv->peer_type.peer_ops.init_signal_desc(xseg, sigdesc);
/* init failed: release the descriptor we just allocated */
1661 xseg->priv->peer_type.peer_ops.free_signal_desc(xseg, peer_data, sigdesc);
1664 port->signal_desc = XPTR_MAKE(sigdesc, xseg->segment);
1666 port->signal_desc = XPTR_MAKE(sd, xseg->segment);
1668 port->peer_type = (uint64_t)driver;
1670 port->portno = portno;
1671 port->flags = CAN_ACCEPT | CAN_RECEIVE;
/* publish the fully-initialized port in the shared table */
1672 xseg->ports[portno] = XPTR_MAKE(port, xseg->segment);
/* failure path: undo allocation and clear the slot */
1676 xseg_free_port(xseg, port);
1677 xseg->ports[portno] = 0;
1681 __unlock_segment(xseg);
1686 * set the limit of requests a port can allocate.
1688 * this limit should be greater than the number of requests a port can cache
1689 * locally on its free_queue, and less than the hard limit imposed by the
1692 * maybe make it drop excess requests
/*
 * Set the maximum number of requests a port may allocate.
 * Rejects values above the hard limit XSEG_MAX_ALLOCATED_REQS. The new
 * limit is applied under fq_lock only if the current free_queue still
 * fits within it; otherwise it is applied without the lock (see comment
 * near the end — enforcement against an in-flight get_request is relaxed).
 * NOTE(review): return values are elided in this listing — confirm.
 */
1694 int xseg_set_max_requests(struct xseg *xseg, xport portno, uint64_t nr_reqs)
1697 struct xseg_port *port;
1699 if (nr_reqs > XSEG_MAX_ALLOCATED_REQS)
1702 port = xseg_get_port(xseg, portno);
1706 xlock_acquire(&port->fq_lock, portno);
1707 q = XPTR_TAKE(port->free_queue, xseg->segment);
1708 if (xq_size(q) <= nr_reqs){
1709 port->max_alloc_reqs = nr_reqs;
1712 xlock_release(&port->fq_lock);
1715 * if there is a get_request in progress, it is not critical to enforce
1718 port->max_alloc_reqs = nr_reqs;
/*
 * Return the request-allocation limit of a port.
 * NOTE(review): the error path for an invalid port is elided — confirm.
 */
1722 uint64_t xseg_get_max_requests(struct xseg *xseg, xport portno)
1724 struct xseg_port *port = xseg_get_port(xseg, portno);
1727 return port->max_alloc_reqs;
/*
 * Return the number of requests currently allocated by a port.
 * NOTE(review): the error path for an invalid port is elided — confirm.
 */
1730 uint64_t xseg_get_allocated_requests(struct xseg *xseg, xport portno)
1732 struct xseg_port *port = xseg_get_port(xseg, portno);
1735 return port->alloc_reqs;
1739 * set the free_queue size, i.e. the number of locally "cached" requests a port can hold
1740 * it should be smaller than port->max_alloc_reqs?
/*
 * Resize a port's free_queue (its local cache of pre-allocated requests).
 * Allocates a replacement queue of the requested size, takes fq_lock
 * (try-lock only when X_NONBLOCK is set), drains any requests that no
 * longer fit back to the segment request handler, migrates the remaining
 * entries with __xq_resize(), and publishes the new queue.
 * NOTE(review): return values and some error paths are elided — confirm.
 */
1743 int xseg_set_freequeue_size(struct xseg *xseg, xport portno, xqindex size,
1748 struct xq *q, *newq;
1749 struct xseg_request *xreq;
1750 struct xseg_port *port = xseg_get_port(xseg, portno);
1754 newq = __alloc_queue(xseg, size);
1758 if (flags & X_NONBLOCK) {
/* caller asked not to block: give up if the lock is contended */
1759 if (!xlock_try_lock(&port->fq_lock, portno))
1762 xlock_acquire(&port->fq_lock, portno);
1765 q = XPTR_TAKE(port->free_queue, xseg->segment);
1767 /* put requests that don't fit in the new queue */
1768 while (xq_count(q) > xq_size(newq)){
1769 xqi = __xq_pop_head(q);
1770 if (xqi != Noneidx){
1771 xreq = XPTR_TAKE(xqi, xseg->segment);
/* release the excess request back to the shared request handler */
1772 xobj_put_obj(xseg->request_h, (void *) xreq);
1776 r = __xq_resize(q, newq);
1782 port->free_queue = XPTR_MAKE(newq, xseg->segment);
1787 xlock_release(&port->fq_lock);
/* Unbind from a port. Currently a stub — no teardown is performed. */
1791 int xseg_leave_port(struct xseg *xseg, struct xseg_port *port)
1793 /* To be implemented */
/* Library-level initialization; delegates to __xseg_preinit(). */
1797 int xseg_initialize(void)
1799 return __xseg_preinit(); /* with or without lock ? */
/* Library-level teardown. Currently a no-op stub. */
1802 int xseg_finalize(void)
1804 /* finalize not supported yet */
/*
 * Out-of-line wrapper around xseg_get_data() so external callers can use
 * it without the (presumably static/inline) definition — confirm intent.
 */
1809 char* xseg_get_data_nonstatic(struct xseg* xseg, struct xseg_request *req)
1811 return xseg_get_data(xseg, req);
/*
 * Out-of-line wrapper around xseg_get_target() so external callers can use
 * it without the (presumably static/inline) definition — confirm intent.
 */
1814 char* xseg_get_target_nonstatic(struct xseg* xseg, struct xseg_request *req)
1816 return xseg_get_target(xseg, req);
1821 #include <linux/module.h>
1822 #include <xseg/xseg_exports.h>