add get/put support in xsegbd_dev
authorGiannakos Filippos <philipgian@cslab.ece.ntua.gr>
Fri, 5 Oct 2012 12:52:58 +0000 (15:52 +0300)
committerGiannakos Filippos <philipgian@cslab.ece.ntua.gr>
Fri, 5 Oct 2012 12:52:58 +0000 (15:52 +0300)
xseg/peers/kernel/xsegbd.c
xseg/peers/kernel/xsegbd.h
xseg/peers/user/mpeer.c
xseg/peers/user/mt-mapperd.c
xseg/peers/user/mt-vlmcd.c
xseg/xseg/xseg.c

index 7939892..3a1479c 100644 (file)
@@ -18,7 +18,7 @@
 #include <linux/bio.h>
 #include <linux/device.h>
 #include <linux/completion.h>
-
+#include <linux/wait.h>
 #include <sys/kernel/segdev.h>
 #include "xsegbd.h"
 #include <xseg/protocol.h>
@@ -52,6 +52,16 @@ static DEFINE_MUTEX(xsegbd_mutex);
 static DEFINE_SPINLOCK(xsegbd_devices_lock);
 
 
+static void __xsegbd_get(struct xsegbd_device *xsegbd_dev)
+{
+       atomic_inc(&xsegbd_dev->usercount);
+}
+
+static void __xsegbd_put(struct xsegbd_device *xsegbd_dev)
+{
+       atomic_dec(&xsegbd_dev->usercount);
+       wake_up(&xsegbd_dev->wq);
+}
 
 static struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
 {
@@ -59,6 +69,8 @@ static struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
 
        spin_lock(&xsegbd_devices_lock);
        xsegbd_dev = xsegbd_devices[id];
+       if (xsegbd_dev)
+               __xsegbd_get(xsegbd_dev);
        spin_unlock(&xsegbd_devices_lock);
 
        return xsegbd_dev;
@@ -258,6 +270,7 @@ outdisk:
        put_disk(xsegbd_dev->gd);
 outqueue:
        blk_cleanup_queue(xsegbd_dev->blk_queue);
+       xsegbd_dev->blk_queue = NULL;
 out:
        xsegbd_dev->gd = NULL;
        return ret;
@@ -265,8 +278,9 @@ out:
 
 static void xsegbd_dev_release(struct device *dev)
 {
+       int ret;
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
-       
+
        xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
 
        /* cleanup gendisk and blk_queue the right way */
@@ -274,23 +288,29 @@ static void xsegbd_dev_release(struct device *dev)
                if (xsegbd_dev->gd->flags & GENHD_FL_UP)
                        del_gendisk(xsegbd_dev->gd);
 
-               blk_cleanup_queue(xsegbd_dev->blk_queue);
                put_disk(xsegbd_dev->gd);
                xsegbd_mapclose(xsegbd_dev);
        }
+
+       spin_lock(&xsegbd_devices_lock);
+       BUG_ON(xsegbd_devices[xsegbd_dev->src_portno] != xsegbd_dev);
+       xsegbd_devices[xsegbd_dev->src_portno] = NULL;
+       spin_unlock(&xsegbd_devices_lock);
+
+       /* wait for all pending operations on device to end */
+       wait_event(xsegbd_dev->wq, atomic_read(&xsegbd_dev->usercount) <= 1);
+       if (xsegbd_dev->blk_queue)
+               blk_cleanup_queue(xsegbd_dev->blk_queue);
+
 
 //     if (xseg_free_requests(xsegbd_dev->xseg, 
 //                     xsegbd_dev->src_portno, xsegbd_dev->nr_requests) < 0)
 //             XSEGLOG("Error trying to free requests!\n");
 
 
+	//FIXME call xseg_leave to free up resources?
        unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);
 
-       spin_lock(&xsegbd_devices_lock);
-       BUG_ON(xsegbd_devices[xsegbd_dev->src_portno] != xsegbd_dev);
-       xsegbd_devices[xsegbd_dev->src_portno] = NULL;
-       spin_unlock(&xsegbd_devices_lock);
-
        if (xsegbd_dev->blk_req_pending)
                kfree(xsegbd_dev->blk_req_pending);
        xq_free(&xsegbd_dev->blk_queue_pending);
@@ -347,6 +367,8 @@ static void xseg_request_fn(struct request_queue *rq)
        int r;
        unsigned long flags;
 
+       __xsegbd_get(xsegbd_dev);
+
        spin_unlock_irq(&xsegbd_dev->rqlock);
        for (;;) {
                if (current_thread_info()->preempt_count || irqs_disabled()){
@@ -465,6 +487,7 @@ static void xseg_request_fn(struct request_queue *rq)
                BUG_ON(xq_append_head(&xsegbd_dev->blk_queue_pending, 
                                blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
        spin_lock_irq(&xsegbd_dev->rqlock);
+       __xsegbd_put(xsegbd_dev);
 }
 
 int update_dev_sectors_from_request(   struct xsegbd_device *xsegbd_dev,
@@ -506,10 +529,13 @@ static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
        xport p;
        void *data;
        int ret = -EBUSY, r;
+
+       __xsegbd_get(xsegbd_dev);
+
        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
        if (!xreq)
-               return ret;
+               goto out;
 
        BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen, 
                                sizeof(struct xseg_reply_info)));
@@ -517,7 +543,7 @@ static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
        init_completion(&comp);
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
-               goto out;
+               goto out_put;
        
        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
@@ -547,8 +573,10 @@ static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
        XSEGLOG("Woken up after wait_for_completion_interruptible(), comp: %lx [%llu]", (unsigned long) pending->comp, (unsigned long long) blkreq_idx);
        ret = update_dev_sectors_from_request(xsegbd_dev, xreq);
        //XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors);
-out:
+out_put:
        BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
+out:
+       __xsegbd_put(xsegbd_dev);
        return ret;
 
 out_queue:
@@ -570,17 +598,19 @@ static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev)
        xport p;
        void *data;
        int ret = -EBUSY, r;
+
+       __xsegbd_get(xsegbd_dev);
        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
        if (!xreq)
-               return ret;;
+               goto out;
 
        BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen, 0));
 
        init_completion(&comp);
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
-               goto out;
+               goto out_put;
        
        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
@@ -609,8 +639,10 @@ static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev)
        ret = 0;
        if (xreq->state & XS_FAILED)
                XSEGLOG("Couldn't close disk on mapper");
-out:
+out_put:
        BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
+out:
+       __xsegbd_put(xsegbd_dev);
        return ret;
 
 out_queue:
@@ -661,6 +693,7 @@ static void xseg_callback(xport portno)
                        complete(pending->comp);
                        /* the request is blocker's responsibility so
                           we will not put_request(); */
+
                        continue;
                }
 
@@ -713,6 +746,7 @@ blk_end:
                spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
                xseg_request_fn(xsegbd_dev->blk_queue);
                spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
+               __xsegbd_put(xsegbd_dev);
        }
 }
 
@@ -931,6 +965,8 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
 
        spin_lock_init(&xsegbd_dev->rqlock);
        INIT_LIST_HEAD(&xsegbd_dev->node);
+       init_waitqueue_head(&xsegbd_dev->wq);
+       atomic_set(&xsegbd_dev->usercount, 0);
 
        /* parse cmd */
        if (sscanf(buf, "%" __stringify(XSEGBD_TARGET_NAMELEN) "s "
@@ -1002,7 +1038,8 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
                ret = -EFAULT;
                goto out_xseg;
        }
-       
+
+
        /* make sure we don't get any requests until we're ready to handle them */
        xseg_cancel_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
 
@@ -1060,6 +1097,7 @@ static ssize_t xsegbd_remove(struct bus_type *bus, const char *buf, size_t count
        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
 
        ret = count;
+	//FIXME when to __xsegbd_put the reference taken by __xsegbd_get_dev?
        xsegbd_dev = __xsegbd_get_dev(id);
        if (!xsegbd_dev) {
                ret = -ENOENT;
index dc0c7b1..3e1c763 100644 (file)
@@ -8,6 +8,7 @@
 
 #include <linux/kernel.h>
 #include <linux/types.h>
+#include <linux/wait.h>
 #include <xseg/xseg.h>
 #include <xtypes/xq.h>
 
@@ -43,6 +44,8 @@ struct xsegbd_device {
        struct list_head node;
        char target[XSEGBD_TARGET_NAMELEN + 1];
        uint32_t targetlen;
+       atomic_t usercount;
+       wait_queue_head_t wq;
 };
 
 #endif
index 2f62e63..830c073 100644 (file)
@@ -515,6 +515,8 @@ int main(int argc, char *argv[])
 
        //TODO err check
        peer = peerd_init(nr_ops, spec, portno, nr_threads, defer_portno);
+       if (!peer)
+               return -1;
        r = custom_peer_init(peer, argc, argv);
        if (r < 0)
                return -1;
index 2042e0c..f51719d 100644 (file)
@@ -12,6 +12,8 @@
 #include <fcntl.h>
 #include <gcrypt.h>
 #include <errno.h>
+#include <sched.h>
+#include <sys/syscall.h>
 
 GCRY_THREAD_OPTION_PTHREAD_IMPL;
 
@@ -1965,6 +1967,10 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                }
        }
 
+       const struct sched_param param = { .sched_priority = 99 };
+       sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
+
+
 //     test_map(peer);
 
        return 0;
index 4fec96d..4c32209 100644 (file)
@@ -4,6 +4,8 @@
 #include <xseg/xseg.h>
 #include <xseg/protocol.h>
 #include <mpeer.h>
+#include <sched.h>
+#include <sys/syscall.h>
 
 enum io_state_enum {
        ACCEPTED = 0,
@@ -359,5 +361,8 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                }
        }
 
+       const struct sched_param param = { .sched_priority = 99 };
+       sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
+
        return 0;
 }
index 53b2996..8bb6220 100644 (file)
@@ -564,6 +564,7 @@ static long initialize_segment(struct xseg *xseg, struct xseg_config *cfg)
        mem = xheap_allocate(heap, xseg->max_peer_types * sizeof(xptr));
        if (!mem)
                return -1;
+       memset(mem, 0, xheap_get_chunk_size(mem));
        shared->peer_type_data = (xptr *) XPTR_MAKE(mem, segment);
 
        memcpy(&xseg->config, cfg, sizeof(struct xseg_config));
@@ -762,6 +763,7 @@ struct xseg *xseg_join(     char *segtypename,
                //FIXME wrong err handling
                goto err_unmap;
        }
+       memset(priv->peer_type_data, 0, sizeof(void *) * xseg->max_peer_types);
 
        xseg->priv = priv;
        xseg->config = __xseg->config;
@@ -820,6 +822,7 @@ void xseg_leave(struct xseg *xseg)
        __unlock_domain();
 
        type->ops.unmap(xseg->segment, xseg->segment_size);
+       //FIXME free xseg?
 }
 
 struct xseg_port* xseg_get_port(struct xseg *xseg, uint32_t portno)