fix xsegbd nr of peers problem
authorGiannakos Filippos <philipgian@cslab.ece.ntua.gr>
Wed, 10 Oct 2012 16:13:22 +0000 (19:13 +0300)
committerGiannakos Filippos <philipgian@cslab.ece.ntua.gr>
Wed, 10 Oct 2012 16:13:22 +0000 (19:13 +0300)
12 files changed:
xseg/drivers/kernel/xseg_posix.c
xseg/drivers/kernel/xseg_pthread.c
xseg/drivers/kernel/xseg_segdev.c
xseg/drivers/user/xseg_posix.c
xseg/drivers/user/xseg_pthread.c
xseg/drivers/user/xseg_segdev.c
xseg/launch
xseg/peers/kernel/xsegbd.c
xseg/peers/kernel/xsegbd.h
xseg/peers/user/speer.c
xseg/xseg/xseg.c
xseg/xseg/xseg.h

index d9c2aeb..18d60a7 100644 (file)
@@ -38,12 +38,12 @@ static void posix_remote_signal_quit(void)
        return;
 }
 
-static int posix_local_signal_init(void)
+static int posix_local_signal_init(struct xseg *xseg, xport portno)
 {
        return -1;
 }
 
-static void posix_local_signal_quit(void)
+static void posix_local_signal_quit(struct xseg *xseg, xport portno)
 {
        return;
 }
index 63776b1..9317bb3 100644 (file)
@@ -39,12 +39,12 @@ static void pthread_remote_signal_quit(void)
        return;
 }
 
-static int pthread_local_signal_init(void)
+static int pthread_local_signal_init(struct xseg *xseg, xport portno)
 {
        return -1;
 }
 
-static void pthread_local_signal_quit(void)
+static void pthread_local_signal_quit(struct xseg *xseg, xport portno)
 {
        return;
 }
index 98ec75a..078b938 100644 (file)
 #include <sys/kernel/segdev.h>
 #include <sys/util.h>
 #include <drivers/xseg_segdev.h>
-
+#include <peers/kernel/xsegbd.h>
 MODULE_DESCRIPTION("xseg_segdev");
 MODULE_AUTHOR("XSEG");
 MODULE_LICENSE("GPL");
 
-/* for now, support only one peer */
-static struct xseg *xsegments[1];
+/* FIXME */
+static struct xseg *xsegments[65536];
 static unsigned int nr_xsegments = 1;
 
 struct segpriv {
@@ -117,8 +117,8 @@ static void *segdev_map(const char *name, uint64_t size, struct xseg *seg)
        if (priv->segno >= nr_xsegments)
                goto out;
 
-       if (seg)
-               xsegments[priv->segno] = seg;
+//     if (seg)
+//             xsegments[priv->segno] = seg;
 
        xseg = (void *)dev->segment;
 out:
@@ -137,7 +137,7 @@ static void segdev_unmap(void *ptr, uint64_t size)
        if (priv->segno >= nr_xsegments)
                goto out;
 
-       xsegments[priv->segno] = NULL;
+//     xsegments[priv->segno] = NULL;
 
 out:
        /* unmap() releases the reference taken by map() */
@@ -153,21 +153,27 @@ static void segdev_callback(struct segdev *dev, xport portno)
        struct xseg_private *xpriv;
        struct xseg_port *port;
        struct segdev_signal_desc *ssd;
+
+       xseg = xsegments[portno];
+       if (!xseg)
+               return;
        if (priv->segno >= nr_xsegments)
                return;
 
-       xseg = xsegments[priv->segno];
        xpriv = xseg->priv;
        port = xseg_get_port(xseg, portno);
        if (!port)
                return;
        ssd = xseg_get_signal_desc(xseg, port);
-       if (!ssd || !ssd->waitcue)
+       if (!ssd || !ssd->waitcue){
+               XSEGLOG("portno %u has waitcue == 0", portno);
                return;
+       }
 
        if (xpriv->wakeup) {
                xpriv->wakeup(portno);
        }
+       return;
 }
 
 static struct xseg_type xseg_segdev = {
@@ -192,41 +198,16 @@ static void segdev_remote_signal_quit(void)
        return;
 }
 
-static int segdev_local_signal_init(void)
+static int segdev_local_signal_init(struct xseg *xseg, xport portno)
 {
-       struct segdev *segdev = segdev_get(0);
-       struct segpriv *segpriv;
-       int r = IS_ERR(segdev) ? PTR_ERR(segdev) : 0;
-       if (r)
-               goto out;
-
-       r = -EADDRINUSE;
-       if (xsegments[0]) 
-               goto out;
-
-       r = -ENOMEM;
-       segpriv = kmalloc(sizeof(struct segpriv), GFP_KERNEL);
-       if (!segpriv)
-               goto out;
-
-       segpriv->segno = 0;
-       segdev->callback = segdev_callback;
-       segdev->priv = segpriv;
-       r = 0;
-out:
-       segdev_put(segdev);
-       return r;
+       //assert xsegments[portno] == NULL;
+       xsegments[portno] = xseg;
 }
 
-static void segdev_local_signal_quit(void)
+static void segdev_local_signal_quit(struct xseg *xseg, xport portno)
 {
-       struct segdev *segdev = segdev_get(0);
-       int r = IS_ERR(segdev) ? PTR_ERR(segdev) : 0;
-       xsegments[0] = NULL;
-       if (!r)
-               segdev->callback = NULL;
-
-       segdev_put(segdev);
+       //assert xsegments[portno] == xseg;
+       xsegments[portno] = NULL;
        return;
 }
 
@@ -349,6 +330,8 @@ static struct xseg_peer xseg_peer_segdev = {
 
 static int segdev_init(void)
 {
+       struct segdev *segdev;
+       struct segpriv *segpriv;
        int r;
 
        XSEGLOG("registering xseg types");
@@ -359,15 +342,27 @@ static int segdev_init(void)
        r = xseg_register_peer(&xseg_peer_segdev);
        if (r)
                goto err1;
-
-       r = segdev_local_signal_init();
+       
+       segdev = segdev_get(0);
+       r = IS_ERR(segdev) ? PTR_ERR(segdev) : 0;
        if (r)
                goto err2;
 
+       r = -ENOMEM;
+       segpriv = kmalloc(sizeof(struct segpriv), GFP_KERNEL);
+       if (!segpriv)
+               goto err3;
+
+       segpriv->segno = 0;
+       segdev->callback = segdev_callback;
+       segdev->priv = segpriv;
+
        return 0;
 
+err3:
+       segdev_put(segdev);
 err2:
-       segdev_local_signal_quit();
+       xseg_unregister_peer(xseg_peer_segdev.name);
 err1:
        xseg_unregister_type(xseg_segdev.name);
 err0:
@@ -376,14 +371,15 @@ err0:
 
 static int segdev_quit(void)
 {
-       struct segdev *segdev;
-
-       /* make sure to unmap the segment first */
-       segdev = segdev_get(0);
-       clear_bit(SEGDEV_RESERVED, &segdev->flags);
-       segdev_put(segdev);
-
-       segdev_local_signal_quit();
+       struct segdev *segdev = segdev_get(0);
+       int r = IS_ERR(segdev) ? PTR_ERR(segdev) : 0;
+       if (!r){
+               /* make sure to unmap the segment first */
+               clear_bit(SEGDEV_RESERVED, &segdev->flags);
+               segdev->callback = NULL;
+               //FIXME what aboud segdev->priv?
+               segdev_put(segdev);
+       }
        xseg_unregister_peer(xseg_peer_segdev.name);
        xseg_unregister_type(xseg_segdev.name);
 
index 2e0bac2..e5545bb 100644 (file)
@@ -101,7 +101,7 @@ static void handler(int signum)
 static sigset_t savedset, set;
 static pid_t pid;
 
-static int posix_local_signal_init(void)
+static int posix_local_signal_init(struct xseg *xseg, xport portno)
 {
        void (*h)(int);
        int r;
@@ -120,7 +120,7 @@ static int posix_local_signal_init(void)
        return 0;
 }
 
-static void posix_local_signal_quit(void)
+static void posix_local_signal_quit(struct xseg *xseg, xport portno)
 {
        pid = 0;
        signal(SIGIO, SIG_DFL);
index a814164..1c1fc53 100644 (file)
@@ -150,7 +150,7 @@ static void keys_init(void)
        } while (0)
 
 /* must be called by each thread */
-static int pthread_local_signal_init(void)
+static int pthread_local_signal_init(struct xseg *xseg, xport portno)
 {
        int r;
        pid_t pid;
@@ -217,7 +217,7 @@ err1:
 }
 
 /* should be called by each thread which had initialized signals */
-static void pthread_local_signal_quit(void)
+static void pthread_local_signal_quit(struct xseg *xseg, xport portno)
 {
        sigset_t *savedset;
        struct sigaction *old_act;
index 6dd76e1..bb69b7d 100644 (file)
@@ -149,12 +149,12 @@ static struct xseg_type xseg_segdev = {
        "segdev"
 };
 
-static int segdev_local_signal_init(void)
+static int segdev_local_signal_init(struct xseg *xseg, xport portno)
 {
        return -1;
 }
 
-static void segdev_local_signal_quit(void)
+static void segdev_local_signal_quit(struct xseg *xseg, xport portno)
 {
        return;
 }
@@ -193,8 +193,10 @@ static int segdev_signal(struct xseg *xseg, uint32_t portno)
        if (!ssd)
                return -1;
 
-       if (!ssd->waitcue)
+       if (!ssd->waitcue){
+               XSEGLOG("portno %u has waitcue == 0", portno);
                return 0;
+       }
        else
                return write(opendev(), &portno, sizeof(portno));
 }
index 77da522..55c8cfe 100755 (executable)
@@ -120,7 +120,7 @@ function spawn_vlmcd {
        pgrep -f "peers/user/st-vlmcd" || \
        "${XSEG_HOME}/peers/user/st-vlmcd" -t 1 -sp "$VPORT_START" \
        -ep "$VPORT_END" -bp "$BPORT" -mp "$MPORT" -g \
-       "${SPEC}" -n ${NR_OPS} &> "${XSEG_LOGS}/vlmcd-${HOSTNAME}" &
+       "${SPEC}" -n ${NR_OPS} -v 1 &> "${XSEG_LOGS}/vlmcd-${HOSTNAME}" &
 #      alloc_requests "$VPORT:0" 128
 }
 
index 6ca471e..0c173ec 100644 (file)
@@ -52,18 +52,18 @@ static DEFINE_MUTEX(xsegbd_mutex);
 static DEFINE_SPINLOCK(xsegbd_devices_lock);
 
 
-static void __xsegbd_get(struct xsegbd_device *xsegbd_dev)
+void __xsegbd_get(struct xsegbd_device *xsegbd_dev)
 {
        atomic_inc(&xsegbd_dev->usercount);
 }
 
-static void __xsegbd_put(struct xsegbd_device *xsegbd_dev)
+void __xsegbd_put(struct xsegbd_device *xsegbd_dev)
 {
-       atomic_dec(&xsegbd_dev->usercount);
-       wake_up(&xsegbd_dev->wq);
+       if (atomic_dec_and_test(&xsegbd_dev->usercount))
+               wake_up(&xsegbd_dev->wq);
 }
 
-static struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
+struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
 {
        struct xsegbd_device *xsegbd_dev = NULL;
 
@@ -297,6 +297,7 @@ static void xsegbd_dev_release(struct device *dev)
        spin_unlock(&xsegbd_devices_lock);
        
 //     xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
+       xseg_quit_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        /* wait for all pending operations on device to end */
        wait_event(xsegbd_dev->wq, atomic_read(&xsegbd_dev->usercount) <= 0);
        XSEGLOG("releasing id: %d", xsegbd_dev->id);
@@ -471,17 +472,18 @@ static void xseg_request_fn(struct request_queue *rq)
 
 
                r = -EIO;
+               /* xsegbd_get here. will be put on receive */
+               __xsegbd_get(xsegbd_dev);
                p = xseg_submit(xsegbd_dev->xseg, xreq, 
                                        xsegbd_dev->src_portno, X_ALLOC);
                if (p == NoPort) {
                        XSEGLOG("coundn't submit req");
-                       BUG_ON(1);
+                       WARN_ON(1);
                        blk_end_request_err(blkreq, r);
+                       __xsegbd_put(xsegbd_dev);
                        break;
                }
                WARN_ON(xseg_signal(xsegbd_dev->xsegbd->xseg, p) < 0);
-               /* xsegbd_get here. will be put on receive */
-               __xsegbd_get(xsegbd_dev);
        }
        if (xreq)
                BUG_ON(xseg_put_request(xsegbd_dev->xsegbd->xseg, xreq, 
@@ -680,7 +682,7 @@ static void xseg_callback(xport portno)
                if (!xreq)
                        break;
 
-               xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
+//             xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
 
                blkreq_idx = (xqindex) xreq->priv;
                if (blkreq_idx >= xsegbd_dev->nr_requests) {
@@ -839,6 +841,7 @@ out:
        return ret;
 }
 
+//FIXME
 static ssize_t xsegbd_cleanup(struct device *dev,
                                        struct device_attribute *attr,
                                        const char *buf,
@@ -1023,8 +1026,8 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
                                        xseg_callback           );
        if (!xsegbd_dev->xseg)
                goto out_freepending;
+       __sync_synchronize();
        
-
        XSEGLOG("%s binding to source port %u (destination %u)", xsegbd_dev->target,
                        xsegbd_dev->src_portno, xsegbd_dev->dst_portno);
        port = xseg_bind_port(xsegbd_dev->xseg, xsegbd_dev->src_portno, NULL);
@@ -1041,6 +1044,7 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
                ret = -EFAULT;
                goto out_xseg;
        }
+       xseg_init_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);
 
 
        /* make sure we don't get any requests until we're ready to handle them */
@@ -1048,11 +1052,13 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
 
        ret = xsegbd_dev_init(xsegbd_dev);
        if (ret)
-               goto out_xseg;
+               goto out_signal;
 
        xseg_prepare_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
        return count;
 
+out_signal:
+       xseg_quit_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);
 out_xseg:
        xseg_leave(xsegbd_dev->xseg);
        
index 3e1c763..f043116 100644 (file)
@@ -48,4 +48,7 @@ struct xsegbd_device {
        wait_queue_head_t wq;
 };
 
+void __xsegbd_get(struct xsegbd_device *xsegbd_dev);
+void __xsegbd_put(struct xsegbd_device *xsegbd_dev);
+struct xsegbd_device *__xsegbd_get_dev(unsigned long id);
 #endif
index c84b4e9..343866d 100644 (file)
@@ -169,7 +169,8 @@ void fail(struct peerd *peer, struct peer_req *pr)
        req->state |= XS_FAILED;
        //xseg_set_req_data(peer->xseg, pr->req, NULL);
        p = xseg_respond(peer->xseg, req, pr->portno, X_ALLOC);
-       xseg_signal(peer->xseg, p);
+       if (xseg_signal(peer->xseg, p) < 0)
+               XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
        free_peer_req(peer, pr);
        wake_up_next_thread(peer);
 }
@@ -188,7 +189,8 @@ void complete(struct peerd *peer, struct peer_req *pr)
        //timersub(&resp_end, &resp_start, &resp_end);
        //timeradd(&resp_end, &resp_accum, &resp_accum);
        //printf("xseg_signal: %u\n", p);
-       xseg_signal(peer->xseg, p);
+       if (xseg_signal(peer->xseg, p) < 0)
+               XSEGLOG2(&lc, W, "Cannot signal portno %u", p);
        free_peer_req(peer, pr);
        wake_up_next_thread(peer);
 }
index fc3f207..1a4bec6 100644 (file)
@@ -778,6 +778,7 @@ struct xseg *xseg_join(     char *segtypename,
        xseg->shared = XPTR_TAKE(__xseg->shared, __xseg);
        xseg->segment_size = size;
        xseg->segment = __xseg;
+       __sync_synchronize();
 
        r = xseg_validate_pointers(xseg);
        if (r) {
@@ -828,8 +829,22 @@ void xseg_leave(struct xseg *xseg)
 struct xseg_port* xseg_get_port(struct xseg *xseg, uint32_t portno)
 {
        xptr p;
+       if (!xseg) {
+               XSEGLOG("xseg == NULL");
+               return NULL;
+       }
+       if (!xseg->segment) {
+               XSEGLOG("xseg->segment == NULL");
+               return NULL;
+       }
        if (!__validate_port(xseg, portno))
                return NULL;
+       if (pointer_ok((unsigned long)(xseg->ports), (unsigned long)xseg->segment,
+                               xseg->segment_size, "ports"))
+               return NULL;
+       if (pointer_ok((unsigned long)(xseg->ports + portno), (unsigned long)xseg->segment,
+                               xseg->segment_size, "ports[portno]"))
+               return NULL;
        p = xseg->ports[portno];
        if (p)
                return XPTR_TAKE(p, xseg->segment);
@@ -995,7 +1010,7 @@ int xseg_init_local_signal(struct xseg *xseg, xport portno)
        if (!type)
                return -1;
 
-       return type->peer_ops.local_signal_init();
+       return type->peer_ops.local_signal_init(xseg, portno);
 }
 
 void xseg_quit_local_signal(struct xseg *xseg, xport portno)
@@ -1009,7 +1024,7 @@ void xseg_quit_local_signal(struct xseg *xseg, xport portno)
        if (!type)
                return;
 
-       type->peer_ops.local_signal_quit();
+       type->peer_ops.local_signal_quit(xseg, portno);
 }
 
 //FIXME doesn't increase alloced reqs
index 745ab77..a67f2e1 100644 (file)
@@ -99,8 +99,8 @@ struct xseg_peer_operations {
        void  (*free_data)(struct xseg *xseg, void *data);
        void *(*alloc_signal_desc)(struct xseg *xseg, void *data);
        void  (*free_signal_desc)(struct xseg *xseg, void *data, void *sd);
-       int   (*local_signal_init)(void);
-       void  (*local_signal_quit)(void);
+       int   (*local_signal_init)(struct xseg *xseg, xport portno);
+       void  (*local_signal_quit)(struct xseg *xseg, xport portno);
        int   (*remote_signal_init)(void);
        void  (*remote_signal_quit)(void);
        int   (*signal_join)(struct xseg *xseg);