Add support for a configurable maximum number of allocated requests and a local request cache
[archipelago] / xseg / peers / kernel / xsegbd.c
index 6ca471e..74d7809 100644 (file)
@@ -26,7 +26,8 @@
 #define XSEGBD_MINORS 1
 /* define max request size to be used in xsegbd */
 //FIXME should we make this 4MB instead of 256KB ?
-#define XSEGBD_MAX_REQUEST_SIZE 262144U
+//#define XSEGBD_MAX_REQUEST_SIZE 262144U
+#define XSEGBD_MAX_REQUEST_SIZE 4194304U
 
 MODULE_DESCRIPTION("xsegbd");
 MODULE_AUTHOR("XSEG");
@@ -52,25 +53,12 @@ static DEFINE_MUTEX(xsegbd_mutex);
 static DEFINE_SPINLOCK(xsegbd_devices_lock);
 
 
-static void __xsegbd_get(struct xsegbd_device *xsegbd_dev)
-{
-       atomic_inc(&xsegbd_dev->usercount);
-}
-
-static void __xsegbd_put(struct xsegbd_device *xsegbd_dev)
-{
-       atomic_dec(&xsegbd_dev->usercount);
-       wake_up(&xsegbd_dev->wq);
-}
-
-static struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
+struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
 {
        struct xsegbd_device *xsegbd_dev = NULL;
 
        spin_lock(&xsegbd_devices_lock);
        xsegbd_dev = xsegbd_devices[id];
-       if (xsegbd_dev)
-               __xsegbd_get(xsegbd_dev);
        spin_unlock(&xsegbd_devices_lock);
 
        return xsegbd_dev;
@@ -210,14 +198,11 @@ static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
 
        xsegbd_dev->xsegbd = &xsegbd;
 
-       xsegbd_dev->blk_queue = blk_alloc_queue(GFP_KERNEL);
+       /* allocates and initializes queue */
+       xsegbd_dev->blk_queue = blk_init_queue(xseg_request_fn, &xsegbd_dev->rqlock);
        if (!xsegbd_dev->blk_queue)
                goto out;
 
-       if (!blk_init_allocated_queue(xsegbd_dev->blk_queue, 
-                       xseg_request_fn, &xsegbd_dev->rqlock))
-               goto outqueue;
-
        xsegbd_dev->blk_queue->queuedata = xsegbd_dev;
 
        blk_queue_flush(xsegbd_dev->blk_queue, REQ_FLUSH | REQ_FUA);
@@ -225,10 +210,11 @@ static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
        blk_queue_physical_block_size(xsegbd_dev->blk_queue, blksize);
        blk_queue_bounce_limit(xsegbd_dev->blk_queue, BLK_BOUNCE_ANY);
        
-       //blk_queue_max_segments(dev->blk_queue, 512);
 
        max_request_size_bytes = XSEGBD_MAX_REQUEST_SIZE;
        blk_queue_max_hw_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 9);
+//     blk_queue_max_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 10);
+       blk_queue_max_segments(xsegbd_dev->blk_queue, 1024);
        blk_queue_max_segment_size(xsegbd_dev->blk_queue, max_request_size_bytes);
        blk_queue_io_min(xsegbd_dev->blk_queue, max_request_size_bytes);
        blk_queue_io_opt(xsegbd_dev->blk_queue, max_request_size_bytes);
@@ -238,7 +224,7 @@ static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
        /* vkoukis says we don't need partitions */
        xsegbd_dev->gd = disk = alloc_disk(1);
        if (!disk)
-               goto outqueue;
+               goto out;
 
        disk->major = xsegbd_dev->major;
        disk->first_minor = 0; // id * XSEGBD_MINORS;
@@ -249,36 +235,27 @@ static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
        snprintf(disk->disk_name, 32, "xsegbd%u", xsegbd_dev->id);
 
        ret = 0;
-       
+
        /* allow a non-zero sector_size parameter to override the disk size */
        if (sector_size)
                xsegbd_dev->sectors = sector_size;
        else {
                ret = xsegbd_get_size(xsegbd_dev);
                if (ret)
-                       goto outdisk;
+                       goto out;
        }
 
        set_capacity(disk, xsegbd_dev->sectors);
        XSEGLOG("xsegbd active...");
        add_disk(disk); /* immediately activates the device */
 
-       return 0;
-
-
-outdisk:
-       put_disk(xsegbd_dev->gd);
-outqueue:
-       blk_cleanup_queue(xsegbd_dev->blk_queue);
 out:
-       xsegbd_dev->blk_queue = NULL;
-       xsegbd_dev->gd = NULL;
+       /* on error, everything is cleaned up in xsegbd_dev_release */
        return ret;
 }
 
 static void xsegbd_dev_release(struct device *dev)
 {
-       int ret;
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
 
 
@@ -287,37 +264,40 @@ static void xsegbd_dev_release(struct device *dev)
                if (xsegbd_dev->gd->flags & GENHD_FL_UP)
                        del_gendisk(xsegbd_dev->gd);
 
-               put_disk(xsegbd_dev->gd);
                xsegbd_mapclose(xsegbd_dev);
        }
-       
+
        spin_lock(&xsegbd_devices_lock);
        BUG_ON(xsegbd_devices[xsegbd_dev->src_portno] != xsegbd_dev);
        xsegbd_devices[xsegbd_dev->src_portno] = NULL;
        spin_unlock(&xsegbd_devices_lock);
-       
-//     xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
-       /* wait for all pending operations on device to end */
-       wait_event(xsegbd_dev->wq, atomic_read(&xsegbd_dev->usercount) <= 0);
+
        XSEGLOG("releasing id: %d", xsegbd_dev->id);
+//     xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
+       xseg_quit_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);
+
        if (xsegbd_dev->blk_queue)
                blk_cleanup_queue(xsegbd_dev->blk_queue);
-
+       if (xsegbd_dev->gd)
+               put_disk(xsegbd_dev->gd);
 
 //     if (xseg_free_requests(xsegbd_dev->xseg, 
 //                     xsegbd_dev->src_portno, xsegbd_dev->nr_requests) < 0)
 //             XSEGLOG("Error trying to free requests!\n");
 
+       if (xsegbd_dev->xseg){
+               xseg_leave(xsegbd_dev->xseg);
+               xsegbd_dev->xseg = NULL;
+       }
 
-       //FIXME xseg_leave to free_up resources ?
        unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);
 
-       if (xsegbd_dev->blk_req_pending)
+       if (xsegbd_dev->blk_req_pending){
                kfree(xsegbd_dev->blk_req_pending);
+               xsegbd_dev->blk_req_pending = NULL;
+       }
        xq_free(&xsegbd_dev->blk_queue_pending);
-
        kfree(xsegbd_dev);
-
        module_put(THIS_MODULE);
 }
 
@@ -368,8 +348,6 @@ static void xseg_request_fn(struct request_queue *rq)
        int r;
        unsigned long flags;
 
-       __xsegbd_get(xsegbd_dev);
-
        spin_unlock_irq(&xsegbd_dev->rqlock);
        for (;;) {
                if (current_thread_info()->preempt_count || irqs_disabled()){
@@ -390,14 +368,14 @@ static void xseg_request_fn(struct request_queue *rq)
                                                xsegbd_dev->src_portno);
                if (blkreq_idx == Noneidx)
                        break;
-               
+
                if (blkreq_idx >= xsegbd_dev->nr_requests) {
                        XSEGLOG("blkreq_idx >= xsegbd_dev->nr_requests");
                        BUG_ON(1);
                        break;
                }
 
-               
+
                spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
                blkreq = blk_fetch_request(rq);
                if (!blkreq){
@@ -406,7 +384,7 @@ static void xseg_request_fn(struct request_queue *rq)
                }
 
                if (blkreq->cmd_type != REQ_TYPE_FS) {
-                       //we lose xreq here
+                       //FIXME we lose xreq here
                        XSEGLOG("non-fs cmd_type: %u. *shrug*", blkreq->cmd_type);
                        __blk_end_request_all(blkreq, 0);
                        spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
@@ -442,7 +420,7 @@ static void xseg_request_fn(struct request_queue *rq)
                pending->dev = xsegbd_dev;
                pending->request = blkreq;
                pending->comp = NULL;
-               
+
                xreq->size = datalen;
                xreq->offset = blk_rq_pos(blkreq) << 9;
                xreq->priv = (uint64_t) blkreq_idx;
@@ -462,7 +440,6 @@ static void xseg_request_fn(struct request_queue *rq)
                        xreq->flags |= XF_FUA;
 
                if (rq_data_dir(blkreq)) {
-                       /* unlock for data transfers? */
                        blk_to_xseg(xsegbd_dev->xseg, xreq, blkreq);
                        xreq->op = X_WRITE;
                } else {
@@ -470,18 +447,17 @@ static void xseg_request_fn(struct request_queue *rq)
                }
 
 
+//             XSEGLOG("%s : %lu (%lu)", xsegbd_dev->target, xreq->offset, xreq->datalen);
                r = -EIO;
                p = xseg_submit(xsegbd_dev->xseg, xreq, 
                                        xsegbd_dev->src_portno, X_ALLOC);
                if (p == NoPort) {
                        XSEGLOG("coundn't submit req");
-                       BUG_ON(1);
+                       WARN_ON(1);
                        blk_end_request_err(blkreq, r);
                        break;
                }
                WARN_ON(xseg_signal(xsegbd_dev->xsegbd->xseg, p) < 0);
-               /* xsegbd_get here. will be put on receive */
-               __xsegbd_get(xsegbd_dev);
        }
        if (xreq)
                BUG_ON(xseg_put_request(xsegbd_dev->xsegbd->xseg, xreq, 
@@ -490,7 +466,6 @@ static void xseg_request_fn(struct request_queue *rq)
                BUG_ON(xq_append_head(&xsegbd_dev->blk_queue_pending, 
                                blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
        spin_lock_irq(&xsegbd_dev->rqlock);
-       __xsegbd_put(xsegbd_dev);
 }
 
 int update_dev_sectors_from_request(   struct xsegbd_device *xsegbd_dev,
@@ -525,15 +500,11 @@ static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
 {
        struct xseg_request *xreq;
        char *target;
-       uint64_t datalen;
        xqindex blkreq_idx;
        struct xsegbd_pending *pending;
        struct completion comp;
        xport p;
-       void *data;
-       int ret = -EBUSY, r;
-
-       __xsegbd_get(xsegbd_dev);
+       int ret = -EBUSY;
 
        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
@@ -547,13 +518,13 @@ static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
                goto out_put;
-       
+
        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
        pending->request = NULL;
        pending->comp = &comp;
 
-       
+
        xreq->priv = (uint64_t) blkreq_idx;
 
        target = xseg_get_target(xsegbd_dev->xseg, xreq);
@@ -575,34 +546,28 @@ static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
        wait_for_completion_interruptible(&comp);
        XSEGLOG("Woken up after wait_for_completion_interruptible(), comp: %lx [%llu]", (unsigned long) pending->comp, (unsigned long long) blkreq_idx);
        ret = update_dev_sectors_from_request(xsegbd_dev, xreq);
-       //XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors);
-out_put:
-       BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
-out:
-       __xsegbd_put(xsegbd_dev);
-       return ret;
+       XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors);
 
 out_queue:
        pending->dev = NULL;
        pending->comp = NULL;
        xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
-       
-       goto out;
+out_put:
+       BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
+out:
+       return ret;
 }
 
 static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev)
 {
        struct xseg_request *xreq;
        char *target;
-       uint64_t datalen;
        xqindex blkreq_idx;
        struct xsegbd_pending *pending;
        struct completion comp;
        xport p;
-       void *data;
-       int ret = -EBUSY, r;
+       int ret = -EBUSY;
 
-       __xsegbd_get(xsegbd_dev);
        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
        if (!xreq)
@@ -614,13 +579,13 @@ static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev)
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
                goto out_put;
-       
+
        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
        pending->request = NULL;
        pending->comp = &comp;
 
-       
+
        xreq->priv = (uint64_t) blkreq_idx;
 
        target = xseg_get_target(xsegbd_dev->xseg, xreq);
@@ -642,18 +607,15 @@ static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev)
        ret = 0;
        if (xreq->state & XS_FAILED)
                XSEGLOG("Couldn't close disk on mapper");
-out_put:
-       BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
-out:
-       __xsegbd_put(xsegbd_dev);
-       return ret;
 
 out_queue:
        pending->dev = NULL;
        pending->comp = NULL;
        xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
-       
-       goto out;
+out_put:
+       BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
+out:
+       return ret;
 }
 
 static void xseg_callback(xport portno)
@@ -665,7 +627,6 @@ static void xseg_callback(xport portno)
        unsigned long flags;
        xqindex blkreq_idx, ridx;
        int err;
-       void *data;
 
        xsegbd_dev  = __xsegbd_get_dev(portno);
        if (!xsegbd_dev) {
@@ -680,7 +641,7 @@ static void xseg_callback(xport portno)
                if (!xreq)
                        break;
 
-               xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
+//             xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
 
                blkreq_idx = (xqindex) xreq->priv;
                if (blkreq_idx >= xsegbd_dev->nr_requests) {
@@ -696,7 +657,6 @@ static void xseg_callback(xport portno)
                        complete(pending->comp);
                        /* the request is blocker's responsibility so
                           we will not put_request(); */
-
                        continue;
                }
 
@@ -706,14 +666,14 @@ static void xseg_callback(xport portno)
                if (xsegbd_dev != pending->dev) {
                        //FIXME maybe put request?
                        XSEGLOG("xsegbd_dev != pending->dev");
-                       BUG_ON(1);
+                       WARN_ON(1);
                        continue;
                }
                pending->dev = NULL;
                if (!blkreq){
                        //FIXME maybe put request?
                        XSEGLOG("blkreq does not exist");
-                       BUG_ON(1);
+                       WARN_ON(1);
                        continue;
                }
 
@@ -727,10 +687,10 @@ static void xseg_callback(xport portno)
                err = 0;
                if (!rq_data_dir(blkreq)){
                        xseg_to_blk(xsegbd_dev->xseg, xreq, blkreq);
-               }       
+               }
 blk_end:
                blk_end_request_all(blkreq, err);
-               
+
                ridx = xq_append_head(&xsegbd_dev->blk_queue_pending, 
                                        blkreq_idx, xsegbd_dev->src_portno);
                if (ridx == Noneidx) {
@@ -743,13 +703,11 @@ blk_end:
                        XSEGLOG("couldn't put req");
                        BUG_ON(1);
                }
-               __xsegbd_put(xsegbd_dev);
        }
        if (xsegbd_dev) {
                spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
                xseg_request_fn(xsegbd_dev->blk_queue);
                spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
-               __xsegbd_put(xsegbd_dev);
        }
 }
 
@@ -839,6 +797,8 @@ out:
        return ret;
 }
 
+//FIXME
+//maybe try the callback first, and then do a more invasive cleanup
 static ssize_t xsegbd_cleanup(struct device *dev,
                                        struct device_attribute *attr,
                                        const char *buf,
@@ -968,8 +928,6 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
 
        spin_lock_init(&xsegbd_dev->rqlock);
        INIT_LIST_HEAD(&xsegbd_dev->node);
-       init_waitqueue_head(&xsegbd_dev->wq);
-       atomic_set(&xsegbd_dev->usercount, 0);
 
        /* parse cmd */
        if (sscanf(buf, "%" __stringify(XSEGBD_TARGET_NAMELEN) "s "
@@ -1012,7 +970,7 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
                        xsegbd_dev->nr_requests *sizeof(struct xsegbd_pending),
                                   GFP_KERNEL);
        if (!xsegbd_dev->blk_req_pending)
-               goto out_freeq;
+               goto out_bus;
 
        
        XSEGLOG("joining segment");
@@ -1022,9 +980,8 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
                                        "segdev",
                                        xseg_callback           );
        if (!xsegbd_dev->xseg)
-               goto out_freepending;
+               goto out_bus;
        
-
        XSEGLOG("%s binding to source port %u (destination %u)", xsegbd_dev->target,
                        xsegbd_dev->src_portno, xsegbd_dev->dst_portno);
        port = xseg_bind_port(xsegbd_dev->xseg, xsegbd_dev->src_portno, NULL);
@@ -1032,15 +989,16 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
                XSEGLOG("cannot bind to port");
                ret = -EFAULT;
 
-               goto out_xseg;
+               goto out_bus;
        }
        
        if (xsegbd_dev->src_portno != xseg_portno(xsegbd_dev->xseg, port)) {
                XSEGLOG("portno != xsegbd_dev->src_portno");
                BUG_ON(1);
                ret = -EFAULT;
-               goto out_xseg;
+               goto out_bus;
        }
+       xseg_init_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);
 
 
        /* make sure we don't get any requests until we're ready to handle them */
@@ -1048,20 +1006,11 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
 
        ret = xsegbd_dev_init(xsegbd_dev);
        if (ret)
-               goto out_xseg;
+               goto out_bus;
 
        xseg_prepare_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
        return count;
 
-out_xseg:
-       xseg_leave(xsegbd_dev->xseg);
-       
-out_freepending:
-       kfree(xsegbd_dev->blk_req_pending);
-
-out_freeq:
-       xq_free(&xsegbd_dev->blk_queue_pending);
-
 out_bus:
        xsegbd_bus_del_dev(xsegbd_dev);
        return ret;
@@ -1105,7 +1054,6 @@ static ssize_t xsegbd_remove(struct bus_type *bus, const char *buf, size_t count
                ret = -ENOENT;
                goto out_unlock;
        }
-       __xsegbd_put(xsegbd_dev);
        xsegbd_bus_del_dev(xsegbd_dev);
 
 out_unlock: