add support for resizable queues
authorFilippos Giannakos <philipgian@grnet.gr>
Fri, 31 Aug 2012 16:56:58 +0000 (19:56 +0300)
committerFilippos Giannakos <philipgian@grnet.gr>
Fri, 31 Aug 2012 16:56:58 +0000 (19:56 +0300)
also fix heap index bug

xseg/xseg/xseg.c
xseg/xseg/xseg.h
xseg/xtypes/xheap.c
xseg/xtypes/xobj.c
xseg/xtypes/xq.c
xseg/xtypes/xq.h
xseg/xtypes/xq_exports.h

index 88e3df2..48e70c9 100644 (file)
@@ -773,7 +773,6 @@ struct xq * __alloc_queue(struct xseg *xseg, uint64_t nr_reqs)
        //initialize queue with max nr of elements it can hold
        q = (struct xq *) mem;
        buf = (void *) (((unsigned long) mem) + sizeof(struct xq));
-       XSEGLOG("elements: %llu\n", bytes/sizeof(xqindex));
        xq_init_empty(q, bytes/sizeof(xqindex), buf); 
 
        return q;
@@ -808,6 +807,9 @@ struct xseg_port *xseg_alloc_port(struct xseg *xseg, uint32_t flags, uint64_t nr
                goto err_reply;
        port->reply_queue = XPTR_MAKE(q, xseg->segment);
 
+       xlock_release(&port->fq_lock);
+       xlock_release(&port->rq_lock);
+       xlock_release(&port->pq_lock);
        port->owner = 0; //should be Noone;
        port->waitcue = 0;
        port->portno = 0; // should be Noport;
@@ -849,7 +851,14 @@ void xseg_free_port(struct xseg *xseg, struct xseg_port *port)
 void* xseg_alloc_buffer(struct xseg *xseg, uint64_t size)
 {
        struct xheap *heap = xseg->heap;
-       return xheap_allocate(heap, size);
+       void *mem = xheap_allocate(heap, size);
+       if (xheap_get_chunk_size(mem) < size) {
+               XSEGLOG("Buffer size %llu instead of %llu\n", 
+                               xheap_get_chunk_size(mem), size);
+               xheap_free(mem);
+               mem = NULL;
+       }
+       return mem;
 }
 
 void xseg_free_buffer(struct xseg *xseg, void *ptr)
@@ -891,6 +900,7 @@ int xseg_signal(struct xseg *xseg, uint32_t portno)
        return type->peer_ops.signal(xseg, portno);
 }
 
+//FIXME wrong types (int vs unsigned long)
 int xseg_alloc_requests(struct xseg *xseg, uint32_t portno, uint32_t nr)
 {
        unsigned long i = 0;
@@ -1054,17 +1064,35 @@ xserial xseg_submit (   struct xseg *xseg, uint32_t portno,
                        struct xseg_request *xreq       )
 {
        xserial serial = NoSerial;
-       xqindex xqi;
-       struct xq *q;
+       xqindex xqi, r;
+       struct xq *q, *newq;
        struct xseg_port *port = xseg_get_port(xseg, portno);
        if (!port)
                goto out;
 
        __update_timestamp(xreq);
        
-       q = XPTR_TAKE(port->request_queue, xseg->segment);
        xqi = XPTR_MAKE(xreq, xseg->segment);
-       serial = xq_append_tail(q, xqi, portno);
+
+       xlock_acquire(&port->rq_lock, portno);
+       q = XPTR_TAKE(port->request_queue, xseg->segment);
+       serial = __xq_append_tail(q, xqi);
+       if (serial == Noneidx) {
+               //TODO make it flag controlled
+               /* double up queue size */
+               newq = __alloc_queue(xseg, xq_count(q)*2);
+               if (!newq)
+                       goto out_rel;
+               r = __xq_resize(q, newq);
+               if (r == Noneidx)
+                       goto out_rel;
+               port->request_queue = XPTR_MAKE(newq, xseg->segment);
+               xheap_free(q);
+               serial = __xq_append_tail(newq, xqi);
+       }
+
+out_rel:
+       xlock_release(&port->rq_lock);
 out:
        return serial;
        
@@ -1079,8 +1107,11 @@ struct xseg_request *xseg_receive(struct xseg *xseg, uint32_t portno)
        if (!port)
                return NULL;
 
+       xlock_acquire(&port->pq_lock, portno);
        q = XPTR_TAKE(port->reply_queue, xseg->segment);
-       xqi = xq_pop_head(q, portno);
+       xqi = __xq_pop_head(q);
+       xlock_release(&port->pq_lock);
+
        if (xqi == Noneidx)
                return NULL;
 
@@ -1098,8 +1129,10 @@ struct xseg_request *xseg_accept(struct xseg *xseg, uint32_t portno)
        struct xseg_port *port = xseg_get_port(xseg, portno);
        if (!port)
                return NULL;
+       xlock_acquire(&port->rq_lock, portno);
        q = XPTR_TAKE(port->request_queue, xseg->segment);
-       xqi = xq_pop_head(q, portno);
+       xqi = __xq_pop_head(q);
+       xlock_release(&port->rq_lock);
        if (xqi == Noneidx)
                return NULL;
 
@@ -1112,16 +1145,32 @@ xserial xseg_respond (  struct xseg *xseg, uint32_t portno,
                        struct xseg_request *xreq  )
 {
        xserial serial = NoSerial;
-       xqindex xqi;
-       struct xq *q;
+       xqindex xqi, r;
+       struct xq *q, *newq;
        struct xseg_port *port = xseg_get_port(xseg, portno);
        if (!port)
                goto out;
 
        
-       q = XPTR_TAKE(port->reply_queue, xseg->segment);
        xqi = XPTR_MAKE(xreq, xseg->segment);
-       serial = xq_append_tail(q, xqi, portno);
+       
+       xlock_acquire(&port->pq_lock, portno);
+       q = XPTR_TAKE(port->reply_queue, xseg->segment);
+       serial = __xq_append_tail(q, xqi);
+       if (serial == Noneidx) {
+               newq = __alloc_queue(xseg, xq_count(q)*2);
+               if (!newq)
+                       goto out_rel;
+               r = __xq_resize(q, newq);
+               if (r == Noneidx)
+                       goto out_rel;
+               port->reply_queue = XPTR_MAKE(newq, xseg->segment);
+               xheap_free(q);
+               serial = __xq_append_tail(newq, xqi);
+       }
+
+out_rel:
+       xlock_release(&port->pq_lock);
 out:
        return serial;
        
index 38e5b78..675c3a6 100644 (file)
@@ -112,6 +112,9 @@ struct xseg_config {
 };
 
 struct xseg_port {
+       struct xlock fq_lock;
+       struct xlock rq_lock;
+       struct xlock pq_lock;
        xptr free_queue;
        xptr request_queue;
        xptr reply_queue;
index 4df5706..db62d29 100644 (file)
@@ -12,13 +12,22 @@ static inline uint64_t __get_alloc_bytes(struct xheap *xheap, uint64_t bytes)
 
 static inline struct xheap_header* __get_header(void *ptr)
 {
-       return (struct xheap_header *) (ptr - sizeof(struct xheap_header));
+       return (struct xheap_header *) ((unsigned long)ptr - sizeof(struct xheap_header));
 }
 
-static inline int __get_index(struct xheap *heap, uint64_t bytes)
+#define __ALLOC 1
+#define __FREE 2
+
+static inline int __get_index(struct xheap *heap, uint64_t bytes, int type)
 {
+       uint32_t alignment_unit = heap->alignment_unit;
        bytes = __get_alloc_bytes(heap, bytes) - sizeof(struct xheap_header);
-       return (sizeof(bytes)*8 - __builtin_clzl(bytes -1));
+       if (bytes < (1<<alignment_unit) * 32)
+               return bytes / (1 << alignment_unit);
+       if (type = __ALLOC)
+               return (32 + sizeof(bytes)*8 - __builtin_clzl(bytes) +1);
+       else 
+               return (32 + sizeof(bytes)*8 - __builtin_clzl(bytes));
 }
 
 uint64_t xheap_get_chunk_size(void *ptr)
@@ -33,7 +42,7 @@ uint64_t xheap_get_chunk_size(void *ptr)
 void* xheap_allocate(struct xheap *heap, uint64_t bytes)
 {
        struct xheap_header *h;
-       int r = __get_index(heap, bytes);
+       int r = __get_index(heap, bytes, __ALLOC);
        void *mem = XPTR(&heap->mem), *addr = NULL;
        xptr *free_list = (xptr *) mem;
        xptr head, next;
@@ -44,6 +53,10 @@ void* xheap_allocate(struct xheap *heap, uint64_t bytes)
        //printf("(r: %d) list[%x]: %lu\n", r, &free_list[r], list);
        if (!head)
                goto alloc;
+       if (head > heap->cur) {
+               XSEGLOG("invalid xptr %llu found in chunk lists\n", head);
+               goto out;
+       }
        next = *(xptr *)(((unsigned long) mem) + head);
        free_list[r] = next;
 //     printf("popped %llu out of list. list is now %llu\n", head, next);
@@ -94,7 +107,7 @@ void xheap_free(void *ptr)
        void *mem = XPTR(&heap->mem);
        uint64_t size = xheap_get_chunk_size(ptr);
        xptr *free_list = (xptr *) mem;
-       int r = __get_index(heap, size);
+       int r = __get_index(heap, size, __FREE);
        //printf("size: %llu, r: %d\n", size, r);
        __add_in_free_list(heap, &free_list[r], ptr);
 //     printf("freed %lx (size: %llu)\n", ptr, __get_header(ptr)->size);
@@ -115,7 +128,7 @@ int xheap_init(struct xheap *heap, uint64_t size, uint32_t alignment_unit, void
        heap->alignment_unit = alignment_unit;
        XPTRSET(&heap->mem, mem);
        
-       r = __get_index(heap, size);
+       r = __get_index(heap, size, __ALLOC);
        
        /* minimum alignment unit required */
        if (heap_page < sizeof(struct xheap_header))
index 6d96229..ccf4088 100644 (file)
@@ -43,8 +43,6 @@ int xobj_alloc_obj(struct xobject_h * obj_h, uint64_t nr)
        struct xheap *heap = XPTR_TAKE(obj_h->heap, container);
        struct xobject *obj = NULL;
 
-       unsigned long i = 0;
-
        uint64_t used, bytes = nr * obj_h->obj_size;
        xptr objptr;
        xhash_t *allocated = XPTR_TAKE(obj_h->allocated, container);
@@ -69,7 +67,6 @@ int xobj_alloc_obj(struct xobject_h * obj_h, uint64_t nr)
                obj->next = XPTR_MAKE(((unsigned long) mem) + used, container); //point to the next obj
 //             printf("foo: %lx\n", &obj->next);
                
-               i++;
 
 retry:
                r = xhash_insert(allocated, objptr, objptr); //keep track of allocated objects
@@ -95,7 +92,6 @@ retry:
                        goto retry;
                }
        }
-       XSEGLOG("allocated %lu elements\n", i);
        if (!obj)
                return -1;
        objptr = obj_h->list;
index 328f776..277b535 100644 (file)
@@ -398,6 +398,38 @@ int xq_check(struct xq *xq, xqindex idx, unsigned long who)
        return r;
 }
 
+xqindex __xq_resize(struct xq *xq, struct xq *newxq)
+{
+       xqindex i, mask, mask_new, head, tail, val;
+       xqindex nr = xq_count(xq);
+
+       if (nr > newxq->size) {
+               return Noneidx;
+       }
+
+       mask = xq->size -1;
+       mask_new = newxq->size -1;
+       head = __xq_peek_head_idx(xq, nr);
+       tail = __xq_append_tail_idx(newxq, nr) + nr -1;
+       for (i = 0; i < nr; i++) {
+               val = XPTR(&xq->queue)[(head + i) & mask];
+               XPTR(&newxq->queue)[(tail - i) & mask_new] = val;
+       }
+       
+       return nr;
+}
+
+xqindex xq_resize(struct xq *xq, struct xq *newxq, unsigned long who)
+{
+       xqindex r = Noneidx;
+       xlock_acquire(&xq->lock, who);
+       xlock_acquire(&newxq->lock, who);
+       r = __xq_resize(xq, newxq);
+       xlock_release(&newxq->lock);
+       xlock_release(&xq->lock);
+       return r;
+}
+
 #ifdef __KERNEL__
 #include <linux/module.h>
 #include <xtypes/xq_exports.h>
index 9bd1f53..c2307e7 100644 (file)
@@ -94,5 +94,12 @@ int       __xq_check      ( struct xq  * xq,
 int         xq_check        ( struct xq  * xq, 
                               xqindex      idx,
                               unsigned long who );
+
+xqindex      __xq_resize     ( struct xq  * xq,
+                              struct xq  * newxq);
+
+xqindex      xq_resize       ( struct xq  * xq,
+                              struct xq  * newxq,
+                              unsigned long who );
 #endif
 
index 4d78009..c7ae803 100644 (file)
@@ -24,3 +24,5 @@ EXPORT_SYMBOL(__xq_peek_tail);
 EXPORT_SYMBOL(xq_peek_tail);
 EXPORT_SYMBOL(__xq_check);
 EXPORT_SYMBOL(xq_check);
+EXPORT_SYMBOL(__xq_resize);
+EXPORT_SYMBOL(xq_resize);