X-Git-Url: https://code.grnet.gr/git/archipelago/blobdiff_plain/4720c1500ce2b2686d6fcc4e5e719169a173cfc6..2d8188235729e9cac09c9d003cd4aa1200ab6c80:/xseg/peers/kernel/xsegbd.c diff --git a/xseg/peers/kernel/xsegbd.c b/xseg/peers/kernel/xsegbd.c index f42f0fd..38aa85f 100644 --- a/xseg/peers/kernel/xsegbd.c +++ b/xseg/peers/kernel/xsegbd.c @@ -1,3 +1,22 @@ +/* + * Copyright (C) 2012 GRNET S.A. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + /* xsegbd.c * */ @@ -18,11 +37,14 @@ #include #include #include - +#include #include #include "xsegbd.h" +#include #define XSEGBD_MINORS 1 +/* define max request size to be used in xsegbd */ +#define XSEGBD_MAX_REQUEST_SIZE 4194304U MODULE_DESCRIPTION("xsegbd"); MODULE_AUTHOR("XSEG"); @@ -31,31 +53,41 @@ MODULE_LICENSE("GPL"); static long sector_size = 0; static long blksize = 512; static int major = 0; -static int max_nr_pending = 1024; +static int max_dev = 200; +static long start_portno = 0; +static long end_portno = 199; static char name[XSEGBD_SEGMENT_NAMELEN] = "xsegbd"; -static char spec[256] = "segdev:xsegbd:4:512:64:1024:12"; +static char spec[256] = "segdev:xsegbd:512:1024:12"; module_param(sector_size, long, 0644); module_param(blksize, long, 0644); -module_param(max_nr_pending, int, 0644); +module_param(start_portno, long, 0644); +module_param(end_portno, long, 0644); module_param(major, int, 0644); module_param_string(name, name, sizeof(name), 0644); module_param_string(spec, spec, sizeof(spec), 0644); -struct pending { - struct request *request; - struct completion *comp; - struct xsegbd_device *dev; -}; - -static struct xq blk_queue_pending; -static struct pending *blk_req_pending; -static unsigned int nr_pending; -static spinlock_t __lock; static struct xsegbd xsegbd; +static struct xsegbd_device **xsegbd_devices; /* indexed by portno */ static DEFINE_MUTEX(xsegbd_mutex); -static LIST_HEAD(xsegbd_dev_list); -static DEFINE_SPINLOCK(xsegbd_dev_list_lock); +static DEFINE_SPINLOCK(xsegbd_devices_lock); + + +struct xsegbd_device *__xsegbd_get_dev(unsigned long id) +{ + struct xsegbd_device *xsegbd_dev = NULL; + + spin_lock(&xsegbd_devices_lock); + xsegbd_dev = xsegbd_devices[id]; + spin_unlock(&xsegbd_devices_lock); + + return xsegbd_dev; +} + +static int src_portno_to_id(xport src_portno) +{ + return (src_portno - start_portno); +} /* ************************* */ /* ***** sysfs helpers ***** */ @@ -81,7 +113,7 @@ static void xsegbd_put_dev(struct xsegbd_device *xsegbd_dev) /* ** XSEG Initialization ** */ /* ************************* */ -static void xseg_callback(struct xseg *xseg, uint32_t portno); +static void xseg_callback(uint32_t portno); int xsegbd_xseg_init(void) { @@ -104,6 +136,7 @@ int xsegbd_xseg_init(void) XSEGLOG("WARNING: unexpected segment type '%s' vs 'segdev'", xsegbd.config.type); + /* leave it here for now */ XSEGLOG("joining segment"); xsegbd.xseg = xseg_join( xsegbd.config.type, xsegbd.config.name, @@ -178,6 +211,7 @@ static const struct block_device_operations xsegbd_ops = { static void xseg_request_fn(struct request_queue *rq); static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev); +static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev); static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev) { @@ -185,15 +219,15 @@ static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev) struct gendisk *disk; unsigned int max_request_size_bytes; - spin_lock_init(&xsegbd_dev->lock); + spin_lock_init(&xsegbd_dev->rqlock); xsegbd_dev->xsegbd = &xsegbd; - xsegbd_dev->blk_queue = blk_alloc_queue(GFP_KERNEL); + /* allocates and initializes queue */ + xsegbd_dev->blk_queue = blk_init_queue(xseg_request_fn, &xsegbd_dev->rqlock); if (!xsegbd_dev->blk_queue) goto out; - blk_init_allocated_queue(xsegbd_dev->blk_queue, xseg_request_fn, &xsegbd_dev->lock); xsegbd_dev->blk_queue->queuedata = xsegbd_dev; blk_queue_flush(xsegbd_dev->blk_queue, REQ_FLUSH | REQ_FUA); @@ -201,15 +235,11 @@ static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev) blk_queue_physical_block_size(xsegbd_dev->blk_queue, blksize); blk_queue_bounce_limit(xsegbd_dev->blk_queue, BLK_BOUNCE_ANY); - //blk_queue_max_segments(dev->blk_queue, 512); - /* calculate maximum block request size - * request size in pages * page_size - * leave one page in buffer for name - */ - max_request_size_bytes = - (unsigned int) (xsegbd.config.request_size - 1) * - ( 1 << xsegbd.config.page_shift) ; + + max_request_size_bytes = XSEGBD_MAX_REQUEST_SIZE; blk_queue_max_hw_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 9); +// blk_queue_max_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 10); + blk_queue_max_segments(xsegbd_dev->blk_queue, 1024); blk_queue_max_segment_size(xsegbd_dev->blk_queue, max_request_size_bytes); blk_queue_io_min(xsegbd_dev->blk_queue, max_request_size_bytes); blk_queue_io_opt(xsegbd_dev->blk_queue, max_request_size_bytes); @@ -217,12 +247,12 @@ static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev) queue_flag_set_unlocked(QUEUE_FLAG_NONROT, xsegbd_dev->blk_queue); /* vkoukis says we don't need partitions */ - xsegbd_dev->gd = disk = alloc_disk(1); + xsegbd_dev->gd = disk = alloc_disk(XSEGBD_MINORS); if (!disk) - goto out_disk; + goto out; disk->major = xsegbd_dev->major; - disk->first_minor = 0; // id * XSEGBD_MINORS; + disk->first_minor = xsegbd_dev->id * XSEGBD_MINORS; disk->fops = &xsegbd_ops; disk->queue = xsegbd_dev->blk_queue; disk->private_data = xsegbd_dev; @@ -230,15 +260,6 @@ static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev) snprintf(disk->disk_name, 32, "xsegbd%u", xsegbd_dev->id); ret = 0; - spin_lock_irq(&__lock); - if (nr_pending + xsegbd_dev->nr_requests > max_nr_pending) - ret = -ENOBUFS; - else - nr_pending += xsegbd_dev->nr_requests; - spin_unlock_irq(&__lock); - - if (ret) - goto out_disk; /* allow a non-zero sector_size parameter to override the disk size */ if (sector_size) @@ -246,54 +267,60 @@ static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev) else { ret = xsegbd_get_size(xsegbd_dev); if (ret) - goto out_disk; + goto out; } - set_capacity(disk, xsegbd_dev->sectors); XSEGLOG("xsegbd active..."); add_disk(disk); /* immediately activates the device */ - return 0; - -out_disk: - put_disk(disk); out: + /* on error, everything is cleaned up in xsegbd_dev_release */ return ret; } static void xsegbd_dev_release(struct device *dev) { struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev); - struct xseg_port *port; + /* cleanup gendisk and blk_queue the right way */ if (xsegbd_dev->gd) { if (xsegbd_dev->gd->flags & GENHD_FL_UP) del_gendisk(xsegbd_dev->gd); - blk_cleanup_queue(xsegbd_dev->blk_queue); - put_disk(xsegbd_dev->gd); + xsegbd_mapclose(xsegbd_dev); } - /* reset the port's waitcue (aka cancel_wait) */ - port = &xsegbd.xseg->ports[xsegbd_dev->src_portno]; - port->waitcue = (long) NULL; + spin_lock(&xsegbd_devices_lock); + BUG_ON(xsegbd_devices[xsegbd_dev->id] != xsegbd_dev); + xsegbd_devices[xsegbd_dev->id] = NULL; + spin_unlock(&xsegbd_devices_lock); - xseg_free_requests(xsegbd.xseg, xsegbd_dev->src_portno, xsegbd_dev->nr_requests); + XSEGLOG("releasing id: %d", xsegbd_dev->id); +// xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno); + xseg_quit_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno); - WARN_ON(nr_pending < xsegbd_dev->nr_requests); - spin_lock_irq(&__lock); - nr_pending -= xsegbd_dev->nr_requests; - spin_unlock_irq(&__lock); + if (xsegbd_dev->blk_queue) + blk_cleanup_queue(xsegbd_dev->blk_queue); + if (xsegbd_dev->gd) + put_disk(xsegbd_dev->gd); - unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME); +// if (xseg_free_requests(xsegbd_dev->xseg, +// xsegbd_dev->src_portno, xsegbd_dev->nr_requests) < 0) +// XSEGLOG("Error trying to free requests!\n"); - spin_lock(&xsegbd_dev_list_lock); - list_del_init(&xsegbd_dev->node); - spin_unlock(&xsegbd_dev_list_lock); - kfree(xsegbd_dev); + if (xsegbd_dev->xseg){ + xseg_leave(xsegbd_dev->xseg); + xsegbd_dev->xseg = NULL; + } + if (xsegbd_dev->blk_req_pending){ + kfree(xsegbd_dev->blk_req_pending); + xsegbd_dev->blk_req_pending = NULL; + } + xq_free(&xsegbd_dev->blk_queue_pending); + kfree(xsegbd_dev); module_put(THIS_MODULE); } @@ -307,7 +334,7 @@ static void blk_to_xseg(struct xseg *xseg, struct xseg_request *xreq, struct bio_vec *bvec; struct req_iterator iter; uint64_t off = 0; - char *data = XSEG_TAKE_PTR(xreq->data, xseg->segment); + char *data = xseg_get_data(xseg, xreq); rq_for_each_segment(bvec, blkreq, iter) { char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset; memcpy(data + off, bdata, bvec->bv_len); @@ -322,7 +349,7 @@ static void xseg_to_blk(struct xseg *xseg, struct xseg_request *xreq, struct bio_vec *bvec; struct req_iterator iter; uint64_t off = 0; - char *data = XSEG_TAKE_PTR(xreq->data, xseg->segment); + char *data = xseg_get_data(xseg, xreq); rq_for_each_segment(bvec, blkreq, iter) { char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset; memcpy(bdata, data + off, bvec->bv_len); @@ -336,41 +363,91 @@ static void xseg_request_fn(struct request_queue *rq) struct xseg_request *xreq; struct xsegbd_device *xsegbd_dev = rq->queuedata; struct request *blkreq; - struct pending *pending; + struct xsegbd_pending *pending; xqindex blkreq_idx; - char *name; - uint64_t datasize; + char *target; + uint64_t datalen; + xport p; + int r; + unsigned long flags; + spin_unlock_irq(&xsegbd_dev->rqlock); for (;;) { - xreq = xseg_get_request(xsegbd.xseg, xsegbd_dev->src_portno); + if (current_thread_info()->preempt_count || irqs_disabled()){ + XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu ", + current_thread_info()->preempt_count, irqs_disabled()); + } + //XSEGLOG("Priority: %d", current_thread_info()->task->prio); + //XSEGLOG("Static priority: %d", current_thread_info()->task->static_prio); + //XSEGLOG("Normal priority: %d", current_thread_info()->task->normal_prio); + //XSEGLOG("Rt_priority: %u", current_thread_info()->task->rt_priority); + blkreq_idx = Noneidx; + xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno, + xsegbd_dev->dst_portno, X_ALLOC); if (!xreq) break; + blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, + xsegbd_dev->src_portno); + if (blkreq_idx == Noneidx) + break; + + if (blkreq_idx >= xsegbd_dev->nr_requests) { + XSEGLOG("blkreq_idx >= xsegbd_dev->nr_requests"); + WARN_ON(1); + break; + } + + + spin_lock_irqsave(&xsegbd_dev->rqlock, flags); blkreq = blk_fetch_request(rq); - if (!blkreq) + if (!blkreq){ + spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags); break; + } if (blkreq->cmd_type != REQ_TYPE_FS) { + //FIXME we lose xreq here XSEGLOG("non-fs cmd_type: %u. *shrug*", blkreq->cmd_type); __blk_end_request_all(blkreq, 0); + spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags); + continue; + } + spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags); + if (current_thread_info()->preempt_count || irqs_disabled()){ + XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu ", + current_thread_info()->preempt_count, irqs_disabled()); } + datalen = blk_rq_bytes(blkreq); + r = xseg_prep_request(xsegbd_dev->xseg, xreq, + xsegbd_dev->targetlen, datalen); + if (r < 0) { + XSEGLOG("couldn't prep request"); + blk_end_request_err(blkreq, r); + WARN_ON(1); + break; + } + r = -ENOMEM; + if (xreq->bufferlen - xsegbd_dev->targetlen < datalen){ + XSEGLOG("malformed req buffers"); + blk_end_request_err(blkreq, r); + WARN_ON(1); + break; + } - datasize = blk_rq_bytes(blkreq); - BUG_ON(xreq->buffersize - xsegbd_dev->namesize < datasize); - BUG_ON(xseg_prep_request(xreq, xsegbd_dev->namesize, datasize)); + target = xseg_get_target(xsegbd_dev->xseg, xreq); + strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen); - name = XSEG_TAKE_PTR(xreq->name, xsegbd.xseg->segment); - strncpy(name, xsegbd_dev->name, xsegbd_dev->namesize); - blkreq_idx = xq_pop_head(&blk_queue_pending); - BUG_ON(blkreq_idx == None); - pending = &blk_req_pending[blkreq_idx]; + pending = &xsegbd_dev->blk_req_pending[blkreq_idx]; pending->dev = xsegbd_dev; pending->request = blkreq; pending->comp = NULL; - xreq->priv = (uint64_t)blkreq_idx; - xreq->size = datasize; + + xreq->size = datalen; xreq->offset = blk_rq_pos(blkreq) << 9; + xreq->priv = (uint64_t) blkreq_idx; + /* if (xreq->offset >= (sector_size << 9)) XSEGLOG("sector offset: %lu > %lu, flush:%u, fua:%u", @@ -386,29 +463,42 @@ static void xseg_request_fn(struct request_queue *rq) xreq->flags |= XF_FUA; if (rq_data_dir(blkreq)) { - /* unlock for data transfers? */ - blk_to_xseg(xsegbd.xseg, xreq, blkreq); + blk_to_xseg(xsegbd_dev->xseg, xreq, blkreq); xreq->op = X_WRITE; } else { xreq->op = X_READ; } - BUG_ON(xseg_submit(xsegbd.xseg, xsegbd_dev->dst_portno, xreq) == NoSerial); - } - /* TODO: - * This is going to happen at least once. - * Add a WARN_ON when debugging find out why it happens more than once. - */ - xseg_signal(xsegbd_dev->xsegbd->xseg, xsegbd_dev->dst_portno); +// XSEGLOG("%s : %lu (%lu)", xsegbd_dev->target, xreq->offset, xreq->datalen); + r = -EIO; + p = xseg_submit(xsegbd_dev->xseg, xreq, + xsegbd_dev->src_portno, X_ALLOC); + if (p == NoPort) { + XSEGLOG("coundn't submit req"); + WARN_ON(1); + blk_end_request_err(blkreq, r); + break; + } + WARN_ON(xseg_signal(xsegbd_dev->xsegbd->xseg, p) < 0); + } if (xreq) - xseg_put_request(xsegbd_dev->xsegbd->xseg, xsegbd_dev->src_portno, xreq); + WARN_ON(xseg_put_request(xsegbd_dev->xsegbd->xseg, xreq, + xsegbd_dev->src_portno) == -1); + if (blkreq_idx != Noneidx) + WARN_ON(xq_append_head(&xsegbd_dev->blk_queue_pending, + blkreq_idx, xsegbd_dev->src_portno) == Noneidx); + spin_lock_irq(&xsegbd_dev->rqlock); } int update_dev_sectors_from_request( struct xsegbd_device *xsegbd_dev, struct xseg_request *xreq ) { void *data; + if (!xreq) { + XSEGLOG("Invalid xreq"); + return -EIO; + } if (xreq->state & XS_FAILED) return -ENOENT; @@ -416,7 +506,15 @@ int update_dev_sectors_from_request( struct xsegbd_device *xsegbd_dev, if (!(xreq->state & XS_SERVED)) return -EIO; - data = XSEG_TAKE_PTR(xreq->data, xsegbd.xseg->segment); + data = xseg_get_data(xsegbd_dev->xseg, xreq); + if (!data) { + XSEGLOG("Invalid req data"); + return -EIO; + } + if (!xsegbd_dev) { + XSEGLOG("Invalid xsegbd_dev"); + return -ENOENT; + } xsegbd_dev->sectors = *((uint64_t *) data) / 512ULL; return 0; } @@ -424,76 +522,158 @@ int update_dev_sectors_from_request( struct xsegbd_device *xsegbd_dev, static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev) { struct xseg_request *xreq; - struct xseg_port *port; - char *name; - uint64_t datasize; + char *target; xqindex blkreq_idx; - struct pending *pending; + struct xsegbd_pending *pending; struct completion comp; + xport p; int ret = -EBUSY; - xreq = xseg_get_request(xsegbd.xseg, xsegbd_dev->src_portno); + xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno, + xsegbd_dev->dst_portno, X_ALLOC); if (!xreq) goto out; - datasize = sizeof(uint64_t); - BUG_ON(xreq->buffersize - xsegbd_dev->namesize < datasize); - BUG_ON(xseg_prep_request(xreq, xsegbd_dev->namesize, datasize)); + WARN_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen, + sizeof(struct xseg_reply_info))); init_completion(&comp); - blkreq_idx = xq_pop_head(&blk_queue_pending); - BUG_ON(blkreq_idx == None); - pending = &blk_req_pending[blkreq_idx]; + blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1); + if (blkreq_idx == Noneidx) + goto out_put; + + pending = &xsegbd_dev->blk_req_pending[blkreq_idx]; pending->dev = xsegbd_dev; pending->request = NULL; pending->comp = ∁ - xreq->priv = (uint64_t)blkreq_idx; - - name = XSEG_TAKE_PTR(xreq->name, xsegbd.xseg->segment); - strncpy(name, xsegbd_dev->name, xsegbd_dev->namesize); - xreq->size = datasize; - xreq->offset = 0; - xreq->op = X_INFO; - port = &xsegbd.xseg->ports[xsegbd_dev->src_portno]; - port->waitcue = (uint64_t)(long)xsegbd_dev; + xreq->priv = (uint64_t) blkreq_idx; - BUG_ON(xseg_submit(xsegbd.xseg, xsegbd_dev->dst_portno, xreq) == NoSerial); - xseg_signal(xsegbd.xseg, xsegbd_dev->dst_portno); + target = xseg_get_target(xsegbd_dev->xseg, xreq); + strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen); + xreq->size = xreq->datalen; + xreq->offset = 0; + xreq->op = X_INFO; + xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno); + p = xseg_submit(xsegbd_dev->xseg, xreq, + xsegbd_dev->src_portno, X_ALLOC); + if ( p == NoPort) { + XSEGLOG("couldn't submit request"); + WARN_ON(1); + goto out_queue; + } + WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0); + XSEGLOG("Before wait for completion, comp %lx [%llu]", (unsigned long) pending->comp, (unsigned long long) blkreq_idx); wait_for_completion_interruptible(&comp); - XSEGLOG("Woken up after wait_for_completion_interruptible()\n"); + XSEGLOG("Woken up after wait_for_completion_interruptible(), comp: %lx [%llu]", (unsigned long) pending->comp, (unsigned long long) blkreq_idx); ret = update_dev_sectors_from_request(xsegbd_dev, xreq); XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors); + +out_queue: + pending->dev = NULL; + pending->comp = NULL; + xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1); +out_put: + WARN_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1); out: - xseg_put_request(xsegbd.xseg, xsegbd_dev->src_portno, xreq); return ret; } -static void xseg_callback(struct xseg *xseg, uint32_t portno) +static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev) { - struct xsegbd_device *xsegbd_dev = NULL, *old_dev = NULL; + struct xseg_request *xreq; + char *target; + xqindex blkreq_idx; + struct xsegbd_pending *pending; + struct completion comp; + xport p; + int ret = -EBUSY; + + xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno, + xsegbd_dev->dst_portno, X_ALLOC); + if (!xreq) + goto out; + + WARN_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen, 0)); + + init_completion(&comp); + blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1); + if (blkreq_idx == Noneidx) + goto out_put; + + pending = &xsegbd_dev->blk_req_pending[blkreq_idx]; + pending->dev = xsegbd_dev; + pending->request = NULL; + pending->comp = ∁ + + + xreq->priv = (uint64_t) blkreq_idx; + + target = xseg_get_target(xsegbd_dev->xseg, xreq); + strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen); + xreq->size = xreq->datalen; + xreq->offset = 0; + xreq->op = X_CLOSE; + + xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno); + p = xseg_submit(xsegbd_dev->xseg, xreq, + xsegbd_dev->src_portno, X_ALLOC); + if ( p == NoPort) { + XSEGLOG("couldn't submit request"); + WARN_ON(1); + goto out_queue; + } + WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0); + wait_for_completion_interruptible(&comp); + ret = 0; + if (xreq->state & XS_FAILED) + XSEGLOG("Couldn't close disk on mapper"); + +out_queue: + pending->dev = NULL; + pending->comp = NULL; + xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1); +out_put: + WARN_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1); +out: + return ret; +} + +static void xseg_callback(xport portno) +{ + struct xsegbd_device *xsegbd_dev; struct xseg_request *xreq; struct request *blkreq; - struct pending *pending; + struct xsegbd_pending *pending; unsigned long flags; - uint32_t blkreq_idx; + xqindex blkreq_idx, ridx; int err; + xsegbd_dev = __xsegbd_get_dev(portno); + if (!xsegbd_dev) { + XSEGLOG("portno: %u has no xsegbd device assigned", portno); + WARN_ON(1); + return; + } + for (;;) { - xreq = xseg_receive(xseg, portno); + xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno); + xreq = xseg_receive(xsegbd_dev->xseg, portno, 0); if (!xreq) break; - /* we rely upon our peers to not have touched ->priv */ - blkreq_idx = (uint64_t)xreq->priv; - if (blkreq_idx >= max_nr_pending) { +// xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno); + + blkreq_idx = (xqindex) xreq->priv; + if (blkreq_idx >= xsegbd_dev->nr_requests) { WARN_ON(1); + //FIXME maybe put request? continue; } - pending = &blk_req_pending[blkreq_idx]; + pending = &xsegbd_dev->blk_req_pending[blkreq_idx]; if (pending->comp) { /* someone is blocking on this request and will handle it when we wake them up. */ @@ -506,39 +686,51 @@ static void xseg_callback(struct xseg *xseg, uint32_t portno) /* this is now treated as a block I/O request to end */ blkreq = pending->request; pending->request = NULL; - xsegbd_dev = pending->dev; + if (xsegbd_dev != pending->dev) { + //FIXME maybe put request? + XSEGLOG("xsegbd_dev != pending->dev"); + WARN_ON(1); + continue; + } pending->dev = NULL; - WARN_ON(!blkreq); - - if ((xsegbd_dev != old_dev) && old_dev) { - spin_lock_irqsave(&old_dev->lock, flags); - xseg_request_fn(old_dev->blk_queue); - spin_unlock_irqrestore(&old_dev->lock, flags); + if (!blkreq){ + //FIXME maybe put request? + XSEGLOG("blkreq does not exist"); + WARN_ON(1); + continue; } - old_dev = xsegbd_dev; - + err = -EIO; if (!(xreq->state & XS_SERVED)) goto blk_end; if (xreq->serviced != blk_rq_bytes(blkreq)) goto blk_end; - /* unlock for data transfer? */ - if (!rq_data_dir(blkreq)) - xseg_to_blk(xseg, xreq, blkreq); - err = 0; + if (!rq_data_dir(blkreq)){ + xseg_to_blk(xsegbd_dev->xseg, xreq, blkreq); + } blk_end: blk_end_request_all(blkreq, err); - xq_append_head(&blk_queue_pending, blkreq_idx); - xseg_put_request(xseg, xreq->portno, xreq); - } + ridx = xq_append_head(&xsegbd_dev->blk_queue_pending, + blkreq_idx, xsegbd_dev->src_portno); + if (ridx == Noneidx) { + XSEGLOG("couldnt append blkreq_idx"); + WARN_ON(1); + } + + if (xseg_put_request(xsegbd_dev->xseg, xreq, + xsegbd_dev->src_portno) < 0){ + XSEGLOG("couldn't put req"); + WARN_ON(1); + } + } if (xsegbd_dev) { - spin_lock_irqsave(&xsegbd_dev->lock, flags); + spin_lock_irqsave(&xsegbd_dev->rqlock, flags); xseg_request_fn(xsegbd_dev->blk_queue); - spin_unlock_irqrestore(&xsegbd_dev->lock, flags); + spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags); } } @@ -597,12 +789,75 @@ static ssize_t xsegbd_reqs_show(struct device *dev, return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->nr_requests); } -static ssize_t xsegbd_name_show(struct device *dev, +static ssize_t xsegbd_target_show(struct device *dev, struct device_attribute *attr, char *buf) { struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev); - return sprintf(buf, "%s\n", xsegbd_dev->name); + return sprintf(buf, "%s\n", xsegbd_dev->target); +} + +static ssize_t xsegbd_image_refresh(struct device *dev, + struct device_attribute *attr, + const char *buf, + size_t size) +{ + struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev); + int rc, ret = size; + + mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING); + + rc = xsegbd_get_size(xsegbd_dev); + if (rc < 0) { + ret = rc; + goto out; + } + + set_capacity(xsegbd_dev->gd, xsegbd_dev->sectors); + +out: + mutex_unlock(&xsegbd_mutex); + return ret; +} + +//FIXME +//maybe try callback, first and then do a more invasive cleanup +static ssize_t xsegbd_cleanup(struct device *dev, + struct device_attribute *attr, + const char *buf, + size_t size) +{ + struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev); + int ret = size, i; + struct request *blkreq = NULL; + struct xsegbd_pending *pending = NULL; + struct completion *comp = NULL; + + mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING); + xlock_acquire(&xsegbd_dev->blk_queue_pending.lock, + xsegbd_dev->src_portno); + for (i = 0; i < xsegbd_dev->nr_requests; i++) { + if (!__xq_check(&xsegbd_dev->blk_queue_pending, i)) { + pending = &xsegbd_dev->blk_req_pending[i]; + blkreq = pending->request; + pending->request = NULL; + comp = pending->comp; + pending->comp = NULL; + if (blkreq){ + XSEGLOG("Cleaning up blkreq %lx [%d]", (unsigned long) blkreq, i); + blk_end_request_all(blkreq, -EIO); + } + if (comp){ + XSEGLOG("Cleaning up comp %lx [%d]", (unsigned long) comp, i); + complete(comp); + } + __xq_append_tail(&xsegbd_dev->blk_queue_pending, i); + } + } + xlock_release(&xsegbd_dev->blk_queue_pending.lock); + + mutex_unlock(&xsegbd_mutex); + return ret; } static DEVICE_ATTR(size, S_IRUGO, xsegbd_size_show, NULL); @@ -611,7 +866,9 @@ static DEVICE_ATTR(srcport, S_IRUGO, xsegbd_srcport_show, NULL); static DEVICE_ATTR(dstport, S_IRUGO, xsegbd_dstport_show, NULL); static DEVICE_ATTR(id , S_IRUGO, xsegbd_id_show, NULL); static DEVICE_ATTR(reqs , S_IRUGO, xsegbd_reqs_show, NULL); -static DEVICE_ATTR(name , S_IRUGO, xsegbd_name_show, NULL); +static DEVICE_ATTR(target, S_IRUGO, xsegbd_target_show, NULL); +static DEVICE_ATTR(refresh , S_IWUSR, NULL, xsegbd_image_refresh); +static DEVICE_ATTR(cleanup , S_IWUSR, NULL, xsegbd_cleanup); static struct attribute *xsegbd_attrs[] = { &dev_attr_size.attr, @@ -620,7 +877,9 @@ static struct attribute *xsegbd_attrs[] = { &dev_attr_dstport.attr, &dev_attr_id.attr, &dev_attr_reqs.attr, - &dev_attr_name.attr, + &dev_attr_target.attr, + &dev_attr_refresh.attr, + &dev_attr_cleanup.attr, NULL }; @@ -680,10 +939,8 @@ static void xsegbd_bus_del_dev(struct xsegbd_device *xsegbd_dev) static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count) { struct xsegbd_device *xsegbd_dev; - struct xseg_port *xport; + struct xseg_port *port; ssize_t ret = -ENOMEM; - int new_id = 0; - struct list_head *tmp; if (!try_module_get(THIS_MODULE)) return -ENODEV; @@ -692,127 +949,115 @@ static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count) if (!xsegbd_dev) goto out; - spin_lock_init(&xsegbd_dev->lock); + spin_lock_init(&xsegbd_dev->rqlock); INIT_LIST_HEAD(&xsegbd_dev->node); /* parse cmd */ if (sscanf(buf, "%" __stringify(XSEGBD_TARGET_NAMELEN) "s " - "%d:%d:%d", xsegbd_dev->name, &xsegbd_dev->src_portno, + "%d:%d:%d", xsegbd_dev->target, &xsegbd_dev->src_portno, &xsegbd_dev->dst_portno, &xsegbd_dev->nr_requests) < 3) { ret = -EINVAL; goto out_dev; } - xsegbd_dev->namesize = strlen(xsegbd_dev->name); - - spin_lock(&xsegbd_dev_list_lock); - - list_for_each(tmp, &xsegbd_dev_list) { - struct xsegbd_device *entry; - - entry = list_entry(tmp, struct xsegbd_device, node); + xsegbd_dev->targetlen = strlen(xsegbd_dev->target); - if (entry->src_portno == xsegbd_dev->src_portno) { - ret = -EINVAL; - goto out_unlock; - } + if (xsegbd_dev->src_portno < start_portno || xsegbd_dev->src_portno > end_portno){ + XSEGLOG("Invadid portno"); + ret = -EINVAL; + goto out_dev; + } + xsegbd_dev->id = src_portno_to_id(xsegbd_dev->src_portno); - if (entry->id >= new_id) - new_id = entry->id + 1; + spin_lock(&xsegbd_devices_lock); + if (xsegbd_devices[xsegbd_dev->id] != NULL) { + ret = -EINVAL; + goto out_unlock; } + xsegbd_devices[xsegbd_dev->id] = xsegbd_dev; + spin_unlock(&xsegbd_devices_lock); - xsegbd_dev->id = new_id; + xsegbd_dev->major = major; - list_add_tail(&xsegbd_dev->node, &xsegbd_dev_list); + ret = xsegbd_bus_add_dev(xsegbd_dev); + if (ret) + goto out_delentry; - spin_unlock(&xsegbd_dev_list_lock); + if (!xq_alloc_seq(&xsegbd_dev->blk_queue_pending, + xsegbd_dev->nr_requests, + xsegbd_dev->nr_requests)) + goto out_bus; - XSEGLOG("registering block device major %d", major); - ret = register_blkdev(major, XSEGBD_NAME); - if (ret < 0) { - XSEGLOG("cannot register block device!"); - ret = -EBUSY; - goto out_delentry; - } - xsegbd_dev->major = ret; - XSEGLOG("registered block device major %d", xsegbd_dev->major); + xsegbd_dev->blk_req_pending = kzalloc( + xsegbd_dev->nr_requests *sizeof(struct xsegbd_pending), + GFP_KERNEL); + if (!xsegbd_dev->blk_req_pending) + goto out_bus; - ret = xsegbd_bus_add_dev(xsegbd_dev); - if (ret) - goto out_blkdev; - XSEGLOG("binding to source port %u (destination %u)", + XSEGLOG("joining segment"); + //FIXME use xsebd module config for now + xsegbd_dev->xseg = xseg_join( xsegbd.config.type, + xsegbd.config.name, + "segdev", + xseg_callback ); + if (!xsegbd_dev->xseg) + goto out_bus; + + XSEGLOG("%s binding to source port %u (destination %u)", xsegbd_dev->target, xsegbd_dev->src_portno, xsegbd_dev->dst_portno); - xport = xseg_bind_port(xsegbd.xseg, xsegbd_dev->src_portno); - if (!xport) { + port = xseg_bind_port(xsegbd_dev->xseg, xsegbd_dev->src_portno, NULL); + if (!port) { XSEGLOG("cannot bind to port"); ret = -EFAULT; goto out_bus; } - /* make sure we don't get any requests until we're ready to handle them */ - xport->waitcue = (long) NULL; - - XSEGLOG("allocating %u requests", xsegbd_dev->nr_requests); - if (xseg_alloc_requests(xsegbd.xseg, xsegbd_dev->src_portno, xsegbd_dev->nr_requests)) { - XSEGLOG("cannot allocate requests"); + + if (xsegbd_dev->src_portno != xseg_portno(xsegbd_dev->xseg, port)) { + XSEGLOG("portno != xsegbd_dev->src_portno"); + WARN_ON(1); ret = -EFAULT; - goto out_bus; } + xseg_init_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno); + + + /* make sure we don't get any requests until we're ready to handle them */ + xseg_cancel_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port)); ret = xsegbd_dev_init(xsegbd_dev); if (ret) goto out_bus; + xseg_prepare_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port)); return count; out_bus: xsegbd_bus_del_dev(xsegbd_dev); - return ret; -out_blkdev: - unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME); - out_delentry: - spin_lock(&xsegbd_dev_list_lock); - list_del_init(&xsegbd_dev->node); + spin_lock(&xsegbd_devices_lock); + xsegbd_devices[xsegbd_dev->id] = NULL; out_unlock: - spin_unlock(&xsegbd_dev_list_lock); + spin_unlock(&xsegbd_devices_lock); out_dev: kfree(xsegbd_dev); out: + module_put(THIS_MODULE); return ret; } -static struct xsegbd_device *__xsegbd_get_dev(unsigned long id) -{ - struct list_head *tmp; - struct xsegbd_device *xsegbd_dev; - - - spin_lock(&xsegbd_dev_list_lock); - list_for_each(tmp, &xsegbd_dev_list) { - xsegbd_dev = list_entry(tmp, struct xsegbd_device, node); - if (xsegbd_dev->id == id) { - spin_unlock(&xsegbd_dev_list_lock); - return xsegbd_dev; - } - } - spin_unlock(&xsegbd_dev_list_lock); - return NULL; -} - static ssize_t xsegbd_remove(struct bus_type *bus, const char *buf, size_t count) { struct xsegbd_device *xsegbd_dev = NULL; int id, ret; unsigned long ul_id; - ret = kstrtoul(buf, 10, &ul_id); + ret = strict_strtoul(buf, 10, &ul_id); if (ret) return ret; @@ -828,7 +1073,6 @@ static ssize_t xsegbd_remove(struct bus_type *bus, const char *buf, size_t count ret = -ENOENT; goto out_unlock; } - xsegbd_bus_del_dev(xsegbd_dev); out_unlock: @@ -871,18 +1115,32 @@ static void xsegbd_sysfs_cleanup(void) static int __init xsegbd_init(void) { int ret = -ENOMEM; - - if (!xq_alloc_seq(&blk_queue_pending, max_nr_pending, max_nr_pending)) + max_dev = end_portno - start_portno; + if (max_dev < 0){ + XSEGLOG("invalid port numbers"); + ret = -EINVAL; + goto out; + } + xsegbd_devices = kzalloc(max_dev * sizeof(struct xsegbd_devices *), GFP_KERNEL); + if (!xsegbd_devices) goto out; - blk_req_pending = kzalloc(sizeof(struct pending) * max_nr_pending, GFP_KERNEL); - if (!blk_req_pending) - goto out_queue; + spin_lock_init(&xsegbd_devices_lock); + + XSEGLOG("registering block device major %d", major); + ret = register_blkdev(major, XSEGBD_NAME); + if (ret < 0) { + XSEGLOG("cannot register block device!"); + ret = -EBUSY; + goto out_free; + } + major = ret; + XSEGLOG("registered block device major %d", major); ret = -ENOSYS; ret = xsegbd_xseg_init(); if (ret) - goto out_pending; + goto out_unregister; ret = xsegbd_sysfs_init(); if (ret) @@ -895,10 +1153,13 @@ out: out_xseg: xsegbd_xseg_quit(); -out_pending: - kfree(blk_req_pending); -out_queue: - xq_free(&blk_queue_pending); + +out_unregister: + unregister_blkdev(major, XSEGBD_NAME); + +out_free: + kfree(xsegbd_devices); + goto out; } @@ -906,6 +1167,7 @@ static void __exit xsegbd_exit(void) { xsegbd_sysfs_cleanup(); xsegbd_xseg_quit(); + unregister_blkdev(major, XSEGBD_NAME); } module_init(xsegbd_init);