Merge branch 'xseg-refactor' of ssh://ray.cslab.ece.ntua.gr/repos/archip into xseg...
[archipelago] / xseg / peers / kernel / xsegbd.c
index fd303b9..c511496 100644 (file)
@@ -45,7 +45,6 @@ module_param(major, int, 0644);
 module_param_string(name, name, sizeof(name), 0644);
 module_param_string(spec, spec, sizeof(spec), 0644);
 
-//static spinlock_t __lock;
 static struct xsegbd xsegbd;
 static struct xsegbd_device **xsegbd_devices; /* indexed by portno */
 static DEFINE_MUTEX(xsegbd_mutex);
@@ -88,7 +87,7 @@ static void xsegbd_put_dev(struct xsegbd_device *xsegbd_dev)
 /* ** XSEG Initialization ** */
 /* ************************* */
 
-static void xseg_callback(struct xseg *xseg, uint32_t portno);
+static void xseg_callback(uint32_t portno);
 
 int xsegbd_xseg_init(void)
 {
@@ -201,7 +200,10 @@ static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
        if (!xsegbd_dev->blk_queue)
                goto out;
 
-       blk_init_allocated_queue(xsegbd_dev->blk_queue, xseg_request_fn, &xsegbd_dev->rqlock);
+       if (!blk_init_allocated_queue(xsegbd_dev->blk_queue, 
+                       xseg_request_fn, &xsegbd_dev->rqlock))
+               goto outqueue;
+
        xsegbd_dev->blk_queue->queuedata = xsegbd_dev;
 
        blk_queue_flush(xsegbd_dev->blk_queue, REQ_FLUSH | REQ_FUA);
@@ -222,12 +224,7 @@ static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
        /* vkoukis says we don't need partitions */
        xsegbd_dev->gd = disk = alloc_disk(1);
        if (!disk)
-               /* FIXME: We call xsegbd_dev_release if something goes wrong, to cleanup
-                * disks/queues/etc.
-                * Would it be better to do the cleanup here, and conditionally cleanup
-                * in dev_release?
-                */
-               goto out;
+               goto outqueue;
 
        disk->major = xsegbd_dev->major;
        disk->first_minor = 0; // id * XSEGBD_MINORS;
@@ -245,7 +242,7 @@ static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
        else {
                ret = xsegbd_get_size(xsegbd_dev);
                if (ret)
-                       goto out;
+                       goto outdisk;
        }
 
        set_capacity(disk, xsegbd_dev->sectors);
@@ -254,13 +251,21 @@ static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
 
        return 0;
 
+
+outdisk:
+       put_disk(xsegbd_dev->gd);
+outqueue:
+       blk_cleanup_queue(xsegbd_dev->blk_queue);
 out:
+       xsegbd_dev->gd = NULL;
        return ret;
 }
 
 static void xsegbd_dev_release(struct device *dev)
 {
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
+       
+       xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
 
        /* cleanup gendisk and blk_queue the right way */
        if (xsegbd_dev->gd) {
@@ -271,16 +276,9 @@ static void xsegbd_dev_release(struct device *dev)
                put_disk(xsegbd_dev->gd);
        }
 
-       /* xsegbd actually does not need to use waiting. 
-        * maybe we can use xseg_cancel_wait for clarity
-        * with the xseg_segdev kernel driver to convert 
-        * this to a noop
-        */
-       xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
-
-       if (xseg_free_requests(xsegbd_dev->xseg, 
-                       xsegbd_dev->src_portno, xsegbd_dev->nr_requests) < 0)
-               XSEGLOG("Error trying to free requests!\n");
+//     if (xseg_free_requests(xsegbd_dev->xseg, 
+//                     xsegbd_dev->src_portno, xsegbd_dev->nr_requests) < 0)
+//             XSEGLOG("Error trying to free requests!\n");
 
 
        unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);
@@ -356,32 +354,53 @@ static void xseg_request_fn(struct request_queue *rq)
                                                xsegbd_dev->src_portno);
                if (blkreq_idx == Noneidx)
                        break;
+               
+               if (blkreq_idx >= xsegbd_dev->nr_requests) {
+                       XSEGLOG("blkreq_idx >= xsegbd_dev->nr_requests");
+                       BUG_ON(1);
+                       break;
+               }
 
                blkreq = blk_fetch_request(rq);
                if (!blkreq)
                        break;
 
                if (blkreq->cmd_type != REQ_TYPE_FS) {
+                       //we lose xreq here
                        XSEGLOG("non-fs cmd_type: %u. *shrug*", blkreq->cmd_type);
                        __blk_end_request_all(blkreq, 0);
+                       continue;
                }
 
-
                datalen = blk_rq_bytes(blkreq);
-               BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, 
-                                       xsegbd_dev->targetlen, datalen));
-               BUG_ON(xreq->bufferlen - xsegbd_dev->targetlen < datalen);
+               r = xseg_prep_request(xsegbd_dev->xseg, xreq, 
+                                       xsegbd_dev->targetlen, datalen);
+               if (r < 0) {
+                       XSEGLOG("couldn't prep request");
+                       __blk_end_request_err(blkreq, r);
+                       BUG_ON(1);
+                       break;
+               }
+               r = -ENOMEM;
+               if (xreq->bufferlen - xsegbd_dev->targetlen < datalen){
+                       XSEGLOG("malformed req buffers");
+                       __blk_end_request_err(blkreq, r);
+                       BUG_ON(1);
+                       break;
+               }
 
                target = xseg_get_target(xsegbd_dev->xseg, xreq);
                strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
-               BUG_ON(blkreq_idx == Noneidx);
+
                pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
                pending->dev = xsegbd_dev;
                pending->request = blkreq;
                pending->comp = NULL;
-               xreq->priv = (uint64_t)blkreq_idx;
+               
                xreq->size = datalen;
                xreq->offset = blk_rq_pos(blkreq) << 9;
+               xreq->priv = (uint64_t) blkreq_idx;
+
                /*
                if (xreq->offset >= (sector_size << 9))
                        XSEGLOG("sector offset: %lu > %lu, flush:%u, fua:%u",
@@ -404,15 +423,16 @@ static void xseg_request_fn(struct request_queue *rq)
                        xreq->op = X_READ;
                }
 
-               //maybe put this in loop start, and on break, 
-               //just do xseg_get_req_data
-               spin_lock(&xsegbd_dev->reqdatalock);
-               r = xseg_set_req_data(xsegbd_dev->xseg, xreq, (void *) blkreq_idx);
-               spin_unlock(&xsegbd_dev->reqdatalock);
-               BUG_ON(r < 0);
 
-               BUG_ON((p = xseg_submit(xsegbd_dev->xseg, xreq, 
-                                       xsegbd_dev->src_portno, X_ALLOC)) == NoPort);
+               r = -EIO;
+               p = xseg_submit(xsegbd_dev->xseg, xreq, 
+                                       xsegbd_dev->src_portno, X_ALLOC);
+               if (p == NoPort) {
+                       XSEGLOG("coundn't submit req");
+                       BUG_ON(1);
+                       __blk_end_request_err(blkreq, r);
+                       break;
+               }
                WARN_ON(xseg_signal(xsegbd_dev->xsegbd->xseg, p) < 0);
        }
        if (xreq)
@@ -420,13 +440,17 @@ static void xseg_request_fn(struct request_queue *rq)
                                        xsegbd_dev->src_portno) == -1);
        if (blkreq_idx != Noneidx)
                BUG_ON(xq_append_head(&xsegbd_dev->blk_queue_pending, 
-                                       blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
+                               blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
 }
 
 int update_dev_sectors_from_request(   struct xsegbd_device *xsegbd_dev,
                                        struct xseg_request *xreq       )
 {
        void *data;
+       if (!xreq) {
+               XSEGLOG("Invalid xreq");
+               return -EIO;
+       }
 
        if (xreq->state & XS_FAILED)
                return -ENOENT;
@@ -435,6 +459,10 @@ int update_dev_sectors_from_request(       struct xsegbd_device *xsegbd_dev,
                return -EIO;
 
        data = xseg_get_data(xsegbd_dev->xseg, xreq);
+       if (!data) {
+               XSEGLOG("Invalid req data");
+               return -EIO;
+       }
        xsegbd_dev->sectors = *((uint64_t *) data) / 512ULL;
        return 0;
 }
@@ -448,6 +476,7 @@ static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
        struct xsegbd_pending *pending;
        struct completion comp;
        xport p;
+       void *data;
        int ret = -EBUSY, r;
        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
@@ -469,78 +498,70 @@ static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
        pending->comp = &comp;
 
        
-       spin_lock(&xsegbd_dev->reqdatalock);
-       r = xseg_set_req_data(xsegbd_dev->xseg, xreq, (void *) blkreq_idx);
-       spin_unlock(&xsegbd_dev->reqdatalock);
-       if (r < 0)
-               goto out_queue;
+       xreq->priv = (uint64_t) blkreq_idx;
 
        target = xseg_get_target(xsegbd_dev->xseg, xreq);
        strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
        xreq->size = datalen;
        xreq->offset = 0;
-
        xreq->op = X_INFO;
 
-       /* waiting is not needed.
-        * but it should be better to use xseg_prepare_wait
-        * and the xseg_segdev kernel driver, would be a no op
-        */
-
        xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
-       BUG_ON((p = xseg_submit(xsegbd_dev->xseg, xreq, 
-                                       xsegbd_dev->src_portno, X_ALLOC)) == NoPort);
+       p = xseg_submit(xsegbd_dev->xseg, xreq, 
+                               xsegbd_dev->src_portno, X_ALLOC);
+       if ( p == NoPort) {
+               XSEGLOG("couldn't submit request");
+               BUG_ON(1);
+               goto out_queue;
+       }
        WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
-
+       XSEGLOG("Before wait for completion, xreq %lx", (unsigned long) xreq);
        wait_for_completion_interruptible(&comp);
-       XSEGLOG("Woken up after wait_for_completion_interruptible()\n");
+       XSEGLOG("Woken up after wait_for_completion_interruptible(), xreq: %lx", (unsigned long) xreq);
        ret = update_dev_sectors_from_request(xsegbd_dev, xreq);
-       XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors);
+       //XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors);
 out:
-       BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) < 0);
+       BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
        return ret;
 
 out_queue:
+       pending->dev = NULL;
+       pending->comp = NULL;
        xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
        
        goto out;
 }
 
-static void xseg_callback(struct xseg *xseg, xport portno)
+static void xseg_callback(xport portno)
 {
        struct xsegbd_device *xsegbd_dev;
        struct xseg_request *xreq;
        struct request *blkreq;
        struct xsegbd_pending *pending;
        unsigned long flags;
-       xqindex blkreq_idx;
+       xqindex blkreq_idx, ridx;
        int err;
        void *data;
 
        xsegbd_dev  = __xsegbd_get_dev(portno);
        if (!xsegbd_dev) {
-               WARN_ON(3);
+               XSEGLOG("portno: %u has no xsegbd device assigned", portno);
+               WARN_ON(1);
                return;
        }
 
        for (;;) {
+               xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
                xreq = xseg_receive(xsegbd_dev->xseg, portno);
                if (!xreq)
                        break;
 
-               spin_lock(&xsegbd_dev->reqdatalock);
-               err = xseg_get_req_data(xsegbd_dev->xseg, xreq, &data); 
-               spin_unlock(&xsegbd_dev->reqdatalock);
-               if (err < 0) {
-                       WARN_ON(2);
-                       //maybe put request?
-                       continue;
-               }
+               xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
 
-               blkreq_idx = (xqindex) data;
+               blkreq_idx = (xqindex) xreq->priv;
                if (blkreq_idx >= xsegbd_dev->nr_requests) {
                        WARN_ON(1);
-                       //maybe put request?
+                       //FIXME maybe put request?
                        continue;
                }
 
@@ -557,26 +578,47 @@ static void xseg_callback(struct xseg *xseg, xport portno)
                /* this is now treated as a block I/O request to end */
                blkreq = pending->request;
                pending->request = NULL;
-               //xsegbd_dev = pending->dev;
-               BUG_ON(xsegbd_dev != pending->dev);
+               if (xsegbd_dev != pending->dev) {
+                       //FIXME maybe put request?
+                       XSEGLOG("xsegbd_dev != pending->dev");
+                       BUG_ON(1);
+                       continue;
+               }
                pending->dev = NULL;
-               WARN_ON(!blkreq);
+               if (!blkreq){
+                       //FIXME maybe put request?
+                       XSEGLOG("blkreq does not exist");
+                       BUG_ON(1);
+                       continue;
+               }
 
+               err = -EIO;
                if (!(xreq->state & XS_SERVED))
                        goto blk_end;
 
                if (xreq->serviced != blk_rq_bytes(blkreq))
                        goto blk_end;
 
+               err = 0;
                /* unlock for data transfer? */
-               if (!rq_data_dir(blkreq))
+               if (!rq_data_dir(blkreq)){
                        xseg_to_blk(xsegbd_dev->xseg, xreq, blkreq);
-
-               err = 0;
+               }       
 blk_end:
                blk_end_request_all(blkreq, err);
-               xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
-               BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) < 0);
+               
+               ridx = xq_append_head(&xsegbd_dev->blk_queue_pending, 
+                                       blkreq_idx, xsegbd_dev->src_portno);
+               if (ridx == Noneidx) {
+                       XSEGLOG("couldnt append blkreq_idx");
+                       WARN_ON(1);
+               }
+
+               if (xseg_put_request(xsegbd_dev->xseg, xreq, 
+                                               xsegbd_dev->src_portno) < 0){
+                       XSEGLOG("couldn't put req");
+                       BUG_ON(1);
+               }
        }
 
        if (xsegbd_dev) {
@@ -672,6 +714,40 @@ out:
        return ret;
 }
 
+static ssize_t xsegbd_cleanup(struct device *dev, /* store callback of the write-only "cleanup" device attribute */
+                                       struct device_attribute *attr,
+                                       const char *buf, /* never read: any write triggers the cleanup */
+                                       size_t size) /* returned unchanged, i.e. the whole write is reported as consumed */
+{
+       struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
+       int ret = size, i;
+       struct request *blkreq = NULL;
+       struct xsegbd_pending *pending = NULL;
+       struct completion *comp = NULL;
+
+       mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
+       for (i = 0; i < xsegbd_dev->nr_requests; i++) { /* visit every slot of blk_req_pending */
+               xlock_acquire(&xsegbd_dev->blk_queue_pending.lock, 
+                               xsegbd_dev->src_portno); /* the queue lock is taken and dropped once per slot */
+               if (!__xq_check(&xsegbd_dev->blk_queue_pending, i)) { /* NOTE(review): presumably "index i is not in the free-index queue", i.e. the slot is in flight -- confirm __xq_check semantics */
+                       pending = &xsegbd_dev->blk_req_pending[i];
+                       blkreq = pending->request;
+                       pending->request = NULL; /* detach the block request from the slot before ending it */
+                       comp = pending->comp;
+                       pending->comp = NULL; /* NOTE(review): pending->dev is left as is, unlike the out_queue path of xsegbd_get_size -- confirm this is intended */
+                       if (blkreq)
+                               blk_end_request_all(blkreq, -EIO); /* fail the outstanding block request with -EIO */
+                       if (comp)
+                               complete(comp); /* wake the waiter that registered this completion (xsegbd_get_size stores one in pending->comp) */
+                       __xq_append_tail(&xsegbd_dev->blk_queue_pending, i); /* put index i back on blk_queue_pending */
+               }
+               xlock_release(&xsegbd_dev->blk_queue_pending.lock);
+       }
+
+       mutex_unlock(&xsegbd_mutex);
+       return ret;
+}
+
 static DEVICE_ATTR(size, S_IRUGO, xsegbd_size_show, NULL);
 static DEVICE_ATTR(major, S_IRUGO, xsegbd_major_show, NULL);
 static DEVICE_ATTR(srcport, S_IRUGO, xsegbd_srcport_show, NULL);
@@ -680,6 +756,7 @@ static DEVICE_ATTR(id , S_IRUGO, xsegbd_id_show, NULL);
 static DEVICE_ATTR(reqs , S_IRUGO, xsegbd_reqs_show, NULL);
 static DEVICE_ATTR(target, S_IRUGO, xsegbd_target_show, NULL);
 static DEVICE_ATTR(refresh , S_IWUSR, NULL, xsegbd_image_refresh);
+static DEVICE_ATTR(cleanup , S_IWUSR, NULL, xsegbd_cleanup);
 
 static struct attribute *xsegbd_attrs[] = {
        &dev_attr_size.attr,
@@ -690,6 +767,7 @@ static struct attribute *xsegbd_attrs[] = {
        &dev_attr_reqs.attr,
        &dev_attr_target.attr,
        &dev_attr_refresh.attr,
+       &dev_attr_cleanup.attr,
        NULL
 };
 
@@ -760,7 +838,6 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
                goto out;
 
        spin_lock_init(&xsegbd_dev->rqlock);
-       spin_lock_init(&xsegbd_dev->reqdatalock);
        INIT_LIST_HEAD(&xsegbd_dev->node);
 
        /* parse cmd */
@@ -826,8 +903,13 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
 
                goto out_xseg;
        }
-       //FIXME rollback here
-       BUG_ON(xsegbd_dev->src_portno != xseg_portno(xsegbd_dev->xseg, port));
+       
+       if (xsegbd_dev->src_portno != xseg_portno(xsegbd_dev->xseg, port)) {
+               XSEGLOG("portno != xsegbd_dev->src_portno");
+               BUG_ON(1);
+               ret = -EFAULT;
+               goto out_xseg;
+       }
        
        /* make sure we don't get any requests until we're ready to handle them */
        xseg_cancel_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
@@ -836,6 +918,7 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
        if (ret)
                goto out_xseg;
 
+       xseg_prepare_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
        return count;
 
 out_xseg:
@@ -849,7 +932,6 @@ out_freeq:
 
 out_bus:
        xsegbd_bus_del_dev(xsegbd_dev);
-
        return ret;
 
 out_blkdev: