+/*
+ * Copyright (C) 2012 GRNET S.A.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301, USA.
+ */
+
/* xsegbd.c
*
*/
#include <linux/bio.h>
#include <linux/device.h>
#include <linux/completion.h>
-
+#include <linux/wait.h>
#include <sys/kernel/segdev.h>
#include "xsegbd.h"
+#include <xseg/protocol.h>
#define XSEGBD_MINORS 1
+/* define max request size to be used in xsegbd */
+#define XSEGBD_MAX_REQUEST_SIZE 4194304U
MODULE_DESCRIPTION("xsegbd");
MODULE_AUTHOR("XSEG");
static long sector_size = 0;
static long blksize = 512;
static int major = 0;
-static int max_nr_pending = 1024;
+static int max_dev = 200;
+static long start_portno = 0;
+static long end_portno = 199;
static char name[XSEGBD_SEGMENT_NAMELEN] = "xsegbd";
-static char spec[256] = "segdev:xsegbd:4:512:64:1024:12";
+static char spec[256] = "segdev:xsegbd:512:1024:12";
module_param(sector_size, long, 0644);
module_param(blksize, long, 0644);
-module_param(max_nr_pending, int, 0644);
+module_param(start_portno, long, 0644);
+module_param(end_portno, long, 0644);
module_param(major, int, 0644);
module_param_string(name, name, sizeof(name), 0644);
module_param_string(spec, spec, sizeof(spec), 0644);
-struct pending {
- struct request *request;
- struct completion *comp;
- struct xsegbd_device *dev;
-};
-
-static struct xq blk_queue_pending;
-static struct pending *blk_req_pending;
-static unsigned int nr_pending;
-static spinlock_t __lock;
static struct xsegbd xsegbd;
+static struct xsegbd_device **xsegbd_devices; /* indexed by portno */
static DEFINE_MUTEX(xsegbd_mutex);
-static LIST_HEAD(xsegbd_dev_list);
-static DEFINE_SPINLOCK(xsegbd_dev_list_lock);
+static DEFINE_SPINLOCK(xsegbd_devices_lock);
+
+
+/* Fetch the xsegbd device registered at slot 'id' of the global
+ * xsegbd_devices[] table, taking xsegbd_devices_lock for the lookup.
+ * Returns NULL when the slot is empty.
+ * NOTE(review): 'id' is not bounds-checked against max_dev here; callers
+ * must guarantee it is in range -- confirm all call sites do so.
+ */
+struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
+{
+	struct xsegbd_device *xsegbd_dev = NULL;
+
+	spin_lock(&xsegbd_devices_lock);
+	xsegbd_dev = xsegbd_devices[id];
+	spin_unlock(&xsegbd_devices_lock);
+
+	return xsegbd_dev;
+}
+
+/* Translate an xseg source port number into its device-table id:
+ * ids are zero-based offsets within the configured
+ * [start_portno, end_portno] range (see module parameters above). */
+static int src_portno_to_id(xport src_portno)
+{
+	return (src_portno - start_portno);
+}
/* ************************* */
/* ***** sysfs helpers ***** */
/* ** XSEG Initialization ** */
/* ************************* */
-static void xseg_callback(struct xseg *xseg, uint32_t portno);
+static void xseg_callback(uint32_t portno);
int xsegbd_xseg_init(void)
{
XSEGLOG("WARNING: unexpected segment type '%s' vs 'segdev'",
xsegbd.config.type);
+ /* leave it here for now */
XSEGLOG("joining segment");
xsegbd.xseg = xseg_join( xsegbd.config.type,
xsegbd.config.name,
static void xseg_request_fn(struct request_queue *rq);
static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev);
+static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev);
static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
{
struct gendisk *disk;
unsigned int max_request_size_bytes;
- spin_lock_init(&xsegbd_dev->lock);
+ spin_lock_init(&xsegbd_dev->rqlock);
xsegbd_dev->xsegbd = &xsegbd;
- xsegbd_dev->blk_queue = blk_alloc_queue(GFP_KERNEL);
+ /* allocates and initializes queue */
+ xsegbd_dev->blk_queue = blk_init_queue(xseg_request_fn, &xsegbd_dev->rqlock);
if (!xsegbd_dev->blk_queue)
goto out;
- blk_init_allocated_queue(xsegbd_dev->blk_queue, xseg_request_fn, &xsegbd_dev->lock);
xsegbd_dev->blk_queue->queuedata = xsegbd_dev;
blk_queue_flush(xsegbd_dev->blk_queue, REQ_FLUSH | REQ_FUA);
blk_queue_physical_block_size(xsegbd_dev->blk_queue, blksize);
blk_queue_bounce_limit(xsegbd_dev->blk_queue, BLK_BOUNCE_ANY);
- //blk_queue_max_segments(dev->blk_queue, 512);
- /* calculate maximum block request size
- * request size in pages * page_size
- * leave one page in buffer for name
- */
- max_request_size_bytes =
- (unsigned int) (xsegbd.config.request_size - 1) *
- ( 1 << xsegbd.config.page_shift) ;
+
+ max_request_size_bytes = XSEGBD_MAX_REQUEST_SIZE;
blk_queue_max_hw_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 9);
+// blk_queue_max_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 10);
+ blk_queue_max_segments(xsegbd_dev->blk_queue, 1024);
blk_queue_max_segment_size(xsegbd_dev->blk_queue, max_request_size_bytes);
blk_queue_io_min(xsegbd_dev->blk_queue, max_request_size_bytes);
blk_queue_io_opt(xsegbd_dev->blk_queue, max_request_size_bytes);
queue_flag_set_unlocked(QUEUE_FLAG_NONROT, xsegbd_dev->blk_queue);
/* vkoukis says we don't need partitions */
- xsegbd_dev->gd = disk = alloc_disk(1);
+ xsegbd_dev->gd = disk = alloc_disk(XSEGBD_MINORS);
if (!disk)
- /* FIXME: We call xsegbd_dev_release if something goes wrong, to cleanup
- * disks/queues/etc.
- * Would it be better to do the cleanup here, and conditionally cleanup
- * in dev_release?
- */
goto out;
disk->major = xsegbd_dev->major;
- disk->first_minor = 0; // id * XSEGBD_MINORS;
+ disk->first_minor = xsegbd_dev->id * XSEGBD_MINORS;
disk->fops = &xsegbd_ops;
disk->queue = xsegbd_dev->blk_queue;
disk->private_data = xsegbd_dev;
snprintf(disk->disk_name, 32, "xsegbd%u", xsegbd_dev->id);
ret = 0;
- spin_lock_irq(&__lock);
- if (nr_pending + xsegbd_dev->nr_requests > max_nr_pending)
- ret = -ENOBUFS;
- else
- nr_pending += xsegbd_dev->nr_requests;
- spin_unlock_irq(&__lock);
-
- if (ret)
- goto out;
/* allow a non-zero sector_size parameter to override the disk size */
if (sector_size)
XSEGLOG("xsegbd active...");
add_disk(disk); /* immediately activates the device */
- return 0;
-
out:
+ /* on error, everything is cleaned up in xsegbd_dev_release */
return ret;
}
static void xsegbd_dev_release(struct device *dev)
{
struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
- struct xseg_port *port;
+
/* cleanup gendisk and blk_queue the right way */
if (xsegbd_dev->gd) {
if (xsegbd_dev->gd->flags & GENHD_FL_UP)
del_gendisk(xsegbd_dev->gd);
- blk_cleanup_queue(xsegbd_dev->blk_queue);
- put_disk(xsegbd_dev->gd);
+ xsegbd_mapclose(xsegbd_dev);
}
- /* reset the port's waitcue (aka cancel_wait) */
- port = &xsegbd.xseg->ports[xsegbd_dev->src_portno];
- port->waitcue = (long) NULL;
+ spin_lock(&xsegbd_devices_lock);
+ BUG_ON(xsegbd_devices[xsegbd_dev->id] != xsegbd_dev);
+ xsegbd_devices[xsegbd_dev->id] = NULL;
+ spin_unlock(&xsegbd_devices_lock);
- if (xseg_free_requests(xsegbd.xseg, xsegbd_dev->src_portno, xsegbd_dev->nr_requests) != 0)
- XSEGLOG("Error trying to free requests!\n");
+ XSEGLOG("releasing id: %d", xsegbd_dev->id);
+// xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
+ xseg_quit_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);
- WARN_ON(nr_pending < xsegbd_dev->nr_requests);
- spin_lock_irq(&__lock);
- nr_pending -= xsegbd_dev->nr_requests;
- spin_unlock_irq(&__lock);
+ if (xsegbd_dev->blk_queue)
+ blk_cleanup_queue(xsegbd_dev->blk_queue);
+ if (xsegbd_dev->gd)
+ put_disk(xsegbd_dev->gd);
- unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);
+// if (xseg_free_requests(xsegbd_dev->xseg,
+// xsegbd_dev->src_portno, xsegbd_dev->nr_requests) < 0)
+// XSEGLOG("Error trying to free requests!\n");
- spin_lock(&xsegbd_dev_list_lock);
- list_del_init(&xsegbd_dev->node);
- spin_unlock(&xsegbd_dev_list_lock);
- kfree(xsegbd_dev);
+ if (xsegbd_dev->xseg){
+ xseg_leave(xsegbd_dev->xseg);
+ xsegbd_dev->xseg = NULL;
+ }
+ if (xsegbd_dev->blk_req_pending){
+ kfree(xsegbd_dev->blk_req_pending);
+ xsegbd_dev->blk_req_pending = NULL;
+ }
+ xq_free(&xsegbd_dev->blk_queue_pending);
+ kfree(xsegbd_dev);
module_put(THIS_MODULE);
}
struct bio_vec *bvec;
struct req_iterator iter;
uint64_t off = 0;
- char *data = XSEG_TAKE_PTR(xreq->data, xseg->segment);
+ char *data = xseg_get_data(xseg, xreq);
rq_for_each_segment(bvec, blkreq, iter) {
char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
memcpy(data + off, bdata, bvec->bv_len);
struct bio_vec *bvec;
struct req_iterator iter;
uint64_t off = 0;
- char *data = XSEG_TAKE_PTR(xreq->data, xseg->segment);
+ char *data = xseg_get_data(xseg, xreq);
rq_for_each_segment(bvec, blkreq, iter) {
char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
memcpy(bdata, data + off, bvec->bv_len);
struct xseg_request *xreq;
struct xsegbd_device *xsegbd_dev = rq->queuedata;
struct request *blkreq;
- struct pending *pending;
+ struct xsegbd_pending *pending;
xqindex blkreq_idx;
char *target;
uint64_t datalen;
+ xport p;
+ int r;
+ unsigned long flags;
+ spin_unlock_irq(&xsegbd_dev->rqlock);
for (;;) {
- xreq = xseg_get_request(xsegbd.xseg, xsegbd_dev->src_portno);
+ if (current_thread_info()->preempt_count || irqs_disabled()){
+ XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu ",
+ current_thread_info()->preempt_count, irqs_disabled());
+ }
+ //XSEGLOG("Priority: %d", current_thread_info()->task->prio);
+ //XSEGLOG("Static priority: %d", current_thread_info()->task->static_prio);
+ //XSEGLOG("Normal priority: %d", current_thread_info()->task->normal_prio);
+ //XSEGLOG("Rt_priority: %u", current_thread_info()->task->rt_priority);
+ blkreq_idx = Noneidx;
+ xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
+ xsegbd_dev->dst_portno, X_ALLOC);
if (!xreq)
break;
+ blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending,
+ xsegbd_dev->src_portno);
+ if (blkreq_idx == Noneidx)
+ break;
+
+ if (blkreq_idx >= xsegbd_dev->nr_requests) {
+ XSEGLOG("blkreq_idx >= xsegbd_dev->nr_requests");
+ WARN_ON(1);
+ break;
+ }
+
+
+ spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
blkreq = blk_fetch_request(rq);
- if (!blkreq)
+ if (!blkreq){
+ spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
break;
+ }
if (blkreq->cmd_type != REQ_TYPE_FS) {
+ //FIXME we lose xreq here
XSEGLOG("non-fs cmd_type: %u. *shrug*", blkreq->cmd_type);
__blk_end_request_all(blkreq, 0);
+ spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
+ continue;
+ }
+ spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
+ if (current_thread_info()->preempt_count || irqs_disabled()){
+ XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu ",
+ current_thread_info()->preempt_count, irqs_disabled());
}
-
datalen = blk_rq_bytes(blkreq);
- BUG_ON(xreq->bufferlen - xsegbd_dev->targetlen < datalen);
- BUG_ON(xseg_prep_request(xreq, xsegbd_dev->targetlen, datalen));
+ r = xseg_prep_request(xsegbd_dev->xseg, xreq,
+ xsegbd_dev->targetlen, datalen);
+ if (r < 0) {
+ XSEGLOG("couldn't prep request");
+ blk_end_request_err(blkreq, r);
+ WARN_ON(1);
+ break;
+ }
+ r = -ENOMEM;
+ if (xreq->bufferlen - xsegbd_dev->targetlen < datalen){
+ XSEGLOG("malformed req buffers");
+ blk_end_request_err(blkreq, r);
+ WARN_ON(1);
+ break;
+ }
- target = XSEG_TAKE_PTR(xreq->target, xsegbd.xseg->segment);
+ target = xseg_get_target(xsegbd_dev->xseg, xreq);
strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
- blkreq_idx = xq_pop_head(&blk_queue_pending);
- BUG_ON(blkreq_idx == Noneidx);
- pending = &blk_req_pending[blkreq_idx];
+
+ pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
pending->dev = xsegbd_dev;
pending->request = blkreq;
pending->comp = NULL;
- xreq->priv = (uint64_t)blkreq_idx;
+
xreq->size = datalen;
xreq->offset = blk_rq_pos(blkreq) << 9;
+ xreq->priv = (uint64_t) blkreq_idx;
+
/*
if (xreq->offset >= (sector_size << 9))
XSEGLOG("sector offset: %lu > %lu, flush:%u, fua:%u",
xreq->flags |= XF_FUA;
if (rq_data_dir(blkreq)) {
- /* unlock for data transfers? */
- blk_to_xseg(xsegbd.xseg, xreq, blkreq);
+ blk_to_xseg(xsegbd_dev->xseg, xreq, blkreq);
xreq->op = X_WRITE;
} else {
xreq->op = X_READ;
}
- BUG_ON(xseg_submit(xsegbd.xseg, xsegbd_dev->dst_portno, xreq) == NoSerial);
- }
- WARN_ON(xseg_signal(xsegbd_dev->xsegbd->xseg, xsegbd_dev->dst_portno) < 0);
+// XSEGLOG("%s : %lu (%lu)", xsegbd_dev->target, xreq->offset, xreq->datalen);
+ r = -EIO;
+ p = xseg_submit(xsegbd_dev->xseg, xreq,
+ xsegbd_dev->src_portno, X_ALLOC);
+ if (p == NoPort) {
+ XSEGLOG("coundn't submit req");
+ WARN_ON(1);
+ blk_end_request_err(blkreq, r);
+ break;
+ }
+ WARN_ON(xseg_signal(xsegbd_dev->xsegbd->xseg, p) < 0);
+ }
if (xreq)
- BUG_ON(xseg_put_request(xsegbd_dev->xsegbd->xseg, xsegbd_dev->src_portno, xreq) == NoSerial);
+ WARN_ON(xseg_put_request(xsegbd_dev->xsegbd->xseg, xreq,
+ xsegbd_dev->src_portno) == -1);
+ if (blkreq_idx != Noneidx)
+ WARN_ON(xq_append_head(&xsegbd_dev->blk_queue_pending,
+ blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
+ spin_lock_irq(&xsegbd_dev->rqlock);
}
+/* Parse an X_INFO reply carried by xreq and update xsegbd_dev->sectors
+ * (device size expressed in 512-byte sectors).
+ * Returns 0 on success, -ENOENT if the request failed or the device is
+ * invalid, -EIO on an unserved request or bad reply data.
+ */
int update_dev_sectors_from_request(	struct xsegbd_device *xsegbd_dev,
					struct xseg_request *xreq	)
{
	void *data;
+	/* Validate arguments before touching them. The device check must
+	 * come first: xseg_get_data() below dereferences xsegbd_dev->xseg,
+	 * so checking !xsegbd_dev only afterwards (as before) could never
+	 * prevent the NULL dereference it was meant to catch. */
+	if (!xsegbd_dev) {
+		XSEGLOG("Invalid xsegbd_dev");
+		return -ENOENT;
+	}
+	if (!xreq) {
+		XSEGLOG("Invalid xreq");
+		return -EIO;
+	}
	if (xreq->state & XS_FAILED)
		return -ENOENT;
	if (!(xreq->state & XS_SERVED))
		return -EIO;
-	data = XSEG_TAKE_PTR(xreq->data, xsegbd.xseg->segment);
+	data = xseg_get_data(xsegbd_dev->xseg, xreq);
+	if (!data) {
+		XSEGLOG("Invalid req data");
+		return -EIO;
+	}
+	/* reply payload is the size in bytes; convert to 512B sectors */
	xsegbd_dev->sectors = *((uint64_t *) data) / 512ULL;
	return 0;
}
+/* Synchronously query the peer for the target's size with an X_INFO
+ * request and, on success, store it via update_dev_sectors_from_request().
+ * Returns 0 on success, -EBUSY when no xseg request or pending slot is
+ * available, or the error from update_dev_sectors_from_request().
+ */
static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
{
	struct xseg_request *xreq;
-	struct xseg_port *port;
	char *target;
-	uint64_t datalen;
	xqindex blkreq_idx;
-	struct pending *pending;
+	struct xsegbd_pending *pending;
	struct completion comp;
+	xport p;
	int ret = -EBUSY;
+	/* allocate an xseg request travelling src_portno -> dst_portno */
-	xreq = xseg_get_request(xsegbd.xseg, xsegbd_dev->src_portno);
+	xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
+			xsegbd_dev->dst_portno, X_ALLOC);
	if (!xreq)
		goto out;
-	datalen = sizeof(uint64_t);
-	BUG_ON(xreq->bufferlen - xsegbd_dev->targetlen < datalen);
-	BUG_ON(xseg_prep_request(xreq, xsegbd_dev->targetlen, datalen));
+	WARN_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen,
+				sizeof(struct xseg_reply_info)));
	init_completion(&comp);
+	/* reserve a pending slot so xseg_callback() can complete us.
+	 * NOTE(review): pops with owner id 1, unlike the I/O path which
+	 * uses src_portno -- confirm this is intentional. */
-	blkreq_idx = xq_pop_head(&blk_queue_pending);
-	BUG_ON(blkreq_idx == Noneidx);
-	pending = &blk_req_pending[blkreq_idx];
+	blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
+	if (blkreq_idx == Noneidx)
+		goto out_put;
+
+	pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
	pending->dev = xsegbd_dev;
	pending->request = NULL;
	pending->comp = ∁
-	xreq->priv = (uint64_t)blkreq_idx;
-	target = XSEG_TAKE_PTR(xreq->target, xsegbd.xseg->segment);
+
+	/* the slot index rides in xreq->priv so the callback can find us */
+	xreq->priv = (uint64_t) blkreq_idx;
+
+	target = xseg_get_target(xsegbd_dev->xseg, xreq);
	strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
-	xreq->size = datalen;
+	xreq->size = xreq->datalen;
	xreq->offset = 0;
-
	xreq->op = X_INFO;
-	port = &xsegbd.xseg->ports[xsegbd_dev->src_portno];
-	port->waitcue = (uint64_t)(long)xsegbd_dev;
-
-	BUG_ON(xseg_submit(xsegbd.xseg, xsegbd_dev->dst_portno, xreq) == NoSerial);
-	WARN_ON(xseg_signal(xsegbd.xseg, xsegbd_dev->dst_portno) < 0);
-
+	/* declare interest in replies before submitting, then kick the peer */
+	xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
+	p = xseg_submit(xsegbd_dev->xseg, xreq,
+			xsegbd_dev->src_portno, X_ALLOC);
+	if ( p == NoPort) {
+		XSEGLOG("couldn't submit request");
+		WARN_ON(1);
+		goto out_queue;
+	}
+	WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
+	XSEGLOG("Before wait for completion, comp %lx [%llu]", (unsigned long) pending->comp, (unsigned long long) blkreq_idx);
+	/* block (interruptibly) until xseg_callback() completes us */
	wait_for_completion_interruptible(&comp);
-	XSEGLOG("Woken up after wait_for_completion_interruptible()\n");
+	XSEGLOG("Woken up after wait_for_completion_interruptible(), comp: %lx [%llu]", (unsigned long) pending->comp, (unsigned long long) blkreq_idx);
	ret = update_dev_sectors_from_request(xsegbd_dev, xreq);
	XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors);
+
+out_queue:
+	/* clear and return the pending slot, then release the xseg request */
+	pending->dev = NULL;
+	pending->comp = NULL;
+	xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
+out_put:
+	WARN_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
out:
-	BUG_ON(xseg_put_request(xsegbd.xseg, xsegbd_dev->src_portno, xreq) == NoSerial);
	return ret;
}
-static void xseg_callback(struct xseg *xseg, uint32_t portno)
+/* Synchronously send an X_CLOSE request for this device's target, telling
+ * the mapper peer to close the disk. Best-effort: a failed close is only
+ * logged. Mirrors the xsegbd_get_size() request/wait/cleanup protocol.
+ * Returns 0 once a reply (even a failed one) is received, -EBUSY when no
+ * xseg request or pending slot could be obtained.
+ */
+static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev)
{
-	struct xsegbd_device *xsegbd_dev = NULL, *old_dev = NULL;
+	struct xseg_request *xreq;
+	char *target;
+	xqindex blkreq_idx;
+	struct xsegbd_pending *pending;
+	struct completion comp;
+	xport p;
+	int ret = -EBUSY;
+
+	xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
+			xsegbd_dev->dst_portno, X_ALLOC);
+	if (!xreq)
+		goto out;
+
+	/* X_CLOSE carries no payload: target name only, zero datalen */
+	WARN_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen, 0));
+
+	init_completion(&comp);
+	/* reserve a pending slot so xseg_callback() can complete us.
+	 * NOTE(review): owner id 1 here, like xsegbd_get_size() -- confirm. */
+	blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
+	if (blkreq_idx == Noneidx)
+		goto out_put;
+
+	pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
+	pending->dev = xsegbd_dev;
+	pending->request = NULL;
+	pending->comp = ∁
+
+
+	/* the slot index rides in xreq->priv so the callback can find us */
+	xreq->priv = (uint64_t) blkreq_idx;
+
+	target = xseg_get_target(xsegbd_dev->xseg, xreq);
+	strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
+	xreq->size = xreq->datalen;
+	xreq->offset = 0;
+	xreq->op = X_CLOSE;
+
+	/* declare interest in replies before submitting, then kick the peer */
+	xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
+	p = xseg_submit(xsegbd_dev->xseg, xreq,
+			xsegbd_dev->src_portno, X_ALLOC);
+	if ( p == NoPort) {
+		XSEGLOG("couldn't submit request");
+		WARN_ON(1);
+		goto out_queue;
+	}
+	WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
+	wait_for_completion_interruptible(&comp);
+	ret = 0;
+	if (xreq->state & XS_FAILED)
+		XSEGLOG("Couldn't close disk on mapper");
+
+out_queue:
+	/* clear and return the pending slot, then release the xseg request */
+	pending->dev = NULL;
+	pending->comp = NULL;
+	xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
+out_put:
+	WARN_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
+out:
+	return ret;
+}
+
+static void xseg_callback(xport portno)
+{
+ struct xsegbd_device *xsegbd_dev;
struct xseg_request *xreq;
struct request *blkreq;
- struct pending *pending;
+ struct xsegbd_pending *pending;
unsigned long flags;
- uint32_t blkreq_idx;
+ xqindex blkreq_idx, ridx;
int err;
+	/* xsegbd_devices[] is indexed by device id, not by raw port number:
+	 * devices are registered at id = src_portno - start_portno (see
+	 * xsegbd_add), so convert before the lookup. Also range-check the
+	 * port first, since __xsegbd_get_dev() does no bounds checking and
+	 * an out-of-range port would read past the table. */
+	if (portno < start_portno || portno > end_portno) {
+		XSEGLOG("portno: %u out of configured port range", portno);
+		WARN_ON(1);
+		return;
+	}
+	xsegbd_dev = __xsegbd_get_dev(src_portno_to_id(portno));
+	if (!xsegbd_dev) {
+		XSEGLOG("portno: %u has no xsegbd device assigned", portno);
+		WARN_ON(1);
+		return;
+	}
+
for (;;) {
- xreq = xseg_receive(xseg, portno);
+ xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
+ xreq = xseg_receive(xsegbd_dev->xseg, portno, 0);
if (!xreq)
break;
- /* we rely upon our peers to not have touched ->priv */
- blkreq_idx = (uint64_t)xreq->priv;
- if (blkreq_idx >= max_nr_pending) {
+// xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
+
+ blkreq_idx = (xqindex) xreq->priv;
+ if (blkreq_idx >= xsegbd_dev->nr_requests) {
WARN_ON(1);
+ //FIXME maybe put request?
continue;
}
- pending = &blk_req_pending[blkreq_idx];
+ pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
if (pending->comp) {
/* someone is blocking on this request
and will handle it when we wake them up. */
/* this is now treated as a block I/O request to end */
blkreq = pending->request;
pending->request = NULL;
- xsegbd_dev = pending->dev;
+ if (xsegbd_dev != pending->dev) {
+ //FIXME maybe put request?
+ XSEGLOG("xsegbd_dev != pending->dev");
+ WARN_ON(1);
+ continue;
+ }
pending->dev = NULL;
- WARN_ON(!blkreq);
-
- if ((xsegbd_dev != old_dev) && old_dev) {
- spin_lock_irqsave(&old_dev->lock, flags);
- xseg_request_fn(old_dev->blk_queue);
- spin_unlock_irqrestore(&old_dev->lock, flags);
+ if (!blkreq){
+ //FIXME maybe put request?
+ XSEGLOG("blkreq does not exist");
+ WARN_ON(1);
+ continue;
}
- old_dev = xsegbd_dev;
-
+ err = -EIO;
if (!(xreq->state & XS_SERVED))
goto blk_end;
if (xreq->serviced != blk_rq_bytes(blkreq))
goto blk_end;
- /* unlock for data transfer? */
- if (!rq_data_dir(blkreq))
- xseg_to_blk(xseg, xreq, blkreq);
-
err = 0;
+ if (!rq_data_dir(blkreq)){
+ xseg_to_blk(xsegbd_dev->xseg, xreq, blkreq);
+ }
blk_end:
blk_end_request_all(blkreq, err);
- xq_append_head(&blk_queue_pending, blkreq_idx);
- BUG_ON(xseg_put_request(xseg, xreq->portno, xreq) == NoSerial);
- }
+ ridx = xq_append_head(&xsegbd_dev->blk_queue_pending,
+ blkreq_idx, xsegbd_dev->src_portno);
+ if (ridx == Noneidx) {
+ XSEGLOG("couldnt append blkreq_idx");
+ WARN_ON(1);
+ }
+
+ if (xseg_put_request(xsegbd_dev->xseg, xreq,
+ xsegbd_dev->src_portno) < 0){
+ XSEGLOG("couldn't put req");
+ WARN_ON(1);
+ }
+ }
if (xsegbd_dev) {
- spin_lock_irqsave(&xsegbd_dev->lock, flags);
+ spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
xseg_request_fn(xsegbd_dev->blk_queue);
- spin_unlock_irqrestore(&xsegbd_dev->lock, flags);
+ spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
}
}
return ret;
}
+/* FIXME(review): consider first retrying delivery via xseg_callback() and
+ * only then falling back to this more invasive forced cleanup. */
+/* sysfs 'cleanup' store handler: force-complete every outstanding request
+ * of this device. Walks all pending slots; any slot not currently free in
+ * blk_queue_pending is considered in flight: its block request (if any) is
+ * ended with -EIO, any waiter blocked on a completion is woken, and the
+ * slot is returned to the free queue. Returns 'size' (the written length). */
+static ssize_t xsegbd_cleanup(struct device *dev,
+			struct device_attribute *attr,
+			const char *buf,
+			size_t size)
+{
+	struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
+	int ret = size, i;
+	struct request *blkreq = NULL;
+	struct xsegbd_pending *pending = NULL;
+	struct completion *comp = NULL;
+
+	mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
+	/* hold the pending-queue lock so no slot changes state under us */
+	xlock_acquire(&xsegbd_dev->blk_queue_pending.lock,
+				xsegbd_dev->src_portno);
+	for (i = 0; i < xsegbd_dev->nr_requests; i++) {
+		/* slot i absent from the free queue => request in flight */
+		if (!__xq_check(&xsegbd_dev->blk_queue_pending, i)) {
+			pending = &xsegbd_dev->blk_req_pending[i];
+			blkreq = pending->request;
+			pending->request = NULL;
+			comp = pending->comp;
+			pending->comp = NULL;
+			if (blkreq){
+				XSEGLOG("Cleaning up blkreq %lx [%d]", (unsigned long) blkreq, i);
+				blk_end_request_all(blkreq, -EIO);
+			}
+			if (comp){
+				XSEGLOG("Cleaning up comp %lx [%d]", (unsigned long) comp, i);
+				complete(comp);
+			}
+			__xq_append_tail(&xsegbd_dev->blk_queue_pending, i);
+		}
+	}
+	xlock_release(&xsegbd_dev->blk_queue_pending.lock);
+
+	mutex_unlock(&xsegbd_mutex);
+	return ret;
+}
+
static DEVICE_ATTR(size, S_IRUGO, xsegbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, xsegbd_major_show, NULL);
static DEVICE_ATTR(srcport, S_IRUGO, xsegbd_srcport_show, NULL);
static DEVICE_ATTR(reqs , S_IRUGO, xsegbd_reqs_show, NULL);
static DEVICE_ATTR(target, S_IRUGO, xsegbd_target_show, NULL);
static DEVICE_ATTR(refresh , S_IWUSR, NULL, xsegbd_image_refresh);
+static DEVICE_ATTR(cleanup , S_IWUSR, NULL, xsegbd_cleanup);
static struct attribute *xsegbd_attrs[] = {
&dev_attr_size.attr,
&dev_attr_reqs.attr,
&dev_attr_target.attr,
&dev_attr_refresh.attr,
+ &dev_attr_cleanup.attr,
NULL
};
static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
{
struct xsegbd_device *xsegbd_dev;
- struct xseg_port *xport;
+ struct xseg_port *port;
ssize_t ret = -ENOMEM;
- int new_id = 0;
- struct list_head *tmp;
if (!try_module_get(THIS_MODULE))
return -ENODEV;
if (!xsegbd_dev)
goto out;
- spin_lock_init(&xsegbd_dev->lock);
+ spin_lock_init(&xsegbd_dev->rqlock);
INIT_LIST_HEAD(&xsegbd_dev->node);
/* parse cmd */
}
xsegbd_dev->targetlen = strlen(xsegbd_dev->target);
- spin_lock(&xsegbd_dev_list_lock);
-
- list_for_each(tmp, &xsegbd_dev_list) {
- struct xsegbd_device *entry;
-
- entry = list_entry(tmp, struct xsegbd_device, node);
-
- if (entry->src_portno == xsegbd_dev->src_portno) {
- ret = -EINVAL;
- goto out_unlock;
- }
+	/* the source port must fall inside the module's configured range,
+	 * since the device id is derived from it */
+	if (xsegbd_dev->src_portno < start_portno || xsegbd_dev->src_portno > end_portno){
+		XSEGLOG("Invalid portno");
+		ret = -EINVAL;
+		goto out_dev;
+	}
+	xsegbd_dev->id = src_portno_to_id(xsegbd_dev->src_portno);
- if (entry->id >= new_id)
- new_id = entry->id + 1;
+ spin_lock(&xsegbd_devices_lock);
+ if (xsegbd_devices[xsegbd_dev->id] != NULL) {
+ ret = -EINVAL;
+ goto out_unlock;
}
+ xsegbd_devices[xsegbd_dev->id] = xsegbd_dev;
+ spin_unlock(&xsegbd_devices_lock);
+
+ xsegbd_dev->major = major;
- xsegbd_dev->id = new_id;
+ ret = xsegbd_bus_add_dev(xsegbd_dev);
+ if (ret)
+ goto out_delentry;
- list_add_tail(&xsegbd_dev->node, &xsegbd_dev_list);
+ if (!xq_alloc_seq(&xsegbd_dev->blk_queue_pending,
+ xsegbd_dev->nr_requests,
+ xsegbd_dev->nr_requests))
+ goto out_bus;
- spin_unlock(&xsegbd_dev_list_lock);
+ xsegbd_dev->blk_req_pending = kzalloc(
+ xsegbd_dev->nr_requests *sizeof(struct xsegbd_pending),
+ GFP_KERNEL);
+ if (!xsegbd_dev->blk_req_pending)
+ goto out_bus;
- XSEGLOG("registering block device major %d", major);
- ret = register_blkdev(major, XSEGBD_NAME);
- if (ret < 0) {
- XSEGLOG("cannot register block device!");
- ret = -EBUSY;
- goto out_delentry;
- }
- xsegbd_dev->major = ret;
- XSEGLOG("registered block device major %d", xsegbd_dev->major);
- ret = xsegbd_bus_add_dev(xsegbd_dev);
- if (ret)
- goto out_blkdev;
+ XSEGLOG("joining segment");
+ //FIXME use xsebd module config for now
+ xsegbd_dev->xseg = xseg_join( xsegbd.config.type,
+ xsegbd.config.name,
+ "segdev",
+ xseg_callback );
+ if (!xsegbd_dev->xseg)
+ goto out_bus;
- XSEGLOG("binding to source port %u (destination %u)",
+ XSEGLOG("%s binding to source port %u (destination %u)", xsegbd_dev->target,
xsegbd_dev->src_portno, xsegbd_dev->dst_portno);
- xport = xseg_bind_port(xsegbd.xseg, xsegbd_dev->src_portno);
- if (!xport) {
+ port = xseg_bind_port(xsegbd_dev->xseg, xsegbd_dev->src_portno, NULL);
+ if (!port) {
XSEGLOG("cannot bind to port");
ret = -EFAULT;
goto out_bus;
}
- /* make sure we don't get any requests until we're ready to handle them */
- xport->waitcue = (long) NULL;
-
- XSEGLOG("allocating %u requests", xsegbd_dev->nr_requests);
- if (xseg_alloc_requests(xsegbd.xseg, xsegbd_dev->src_portno, xsegbd_dev->nr_requests)) {
- XSEGLOG("cannot allocate requests");
+
+ if (xsegbd_dev->src_portno != xseg_portno(xsegbd_dev->xseg, port)) {
+ XSEGLOG("portno != xsegbd_dev->src_portno");
+ WARN_ON(1);
ret = -EFAULT;
-
goto out_bus;
}
+ xseg_init_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);
+
+
+ /* make sure we don't get any requests until we're ready to handle them */
+ xseg_cancel_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
ret = xsegbd_dev_init(xsegbd_dev);
if (ret)
goto out_bus;
+ xseg_prepare_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
return count;
out_bus:
xsegbd_bus_del_dev(xsegbd_dev);
-
return ret;
-out_blkdev:
- unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);
-
out_delentry:
- spin_lock(&xsegbd_dev_list_lock);
- list_del_init(&xsegbd_dev->node);
+ spin_lock(&xsegbd_devices_lock);
+ xsegbd_devices[xsegbd_dev->id] = NULL;
out_unlock:
- spin_unlock(&xsegbd_dev_list_lock);
+ spin_unlock(&xsegbd_devices_lock);
out_dev:
kfree(xsegbd_dev);
out:
+ module_put(THIS_MODULE);
return ret;
}
-static struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
-{
- struct list_head *tmp;
- struct xsegbd_device *xsegbd_dev;
-
-
- spin_lock(&xsegbd_dev_list_lock);
- list_for_each(tmp, &xsegbd_dev_list) {
- xsegbd_dev = list_entry(tmp, struct xsegbd_device, node);
- if (xsegbd_dev->id == id) {
- spin_unlock(&xsegbd_dev_list_lock);
- return xsegbd_dev;
- }
- }
- spin_unlock(&xsegbd_dev_list_lock);
- return NULL;
-}
-
static ssize_t xsegbd_remove(struct bus_type *bus, const char *buf, size_t count)
{
struct xsegbd_device *xsegbd_dev = NULL;
ret = -ENOENT;
goto out_unlock;
}
-
xsegbd_bus_del_dev(xsegbd_dev);
out_unlock:
static int __init xsegbd_init(void)
{
int ret = -ENOMEM;
-
- if (!xq_alloc_seq(&blk_queue_pending, max_nr_pending, max_nr_pending))
+	/* ids are inclusive offsets into [start_portno, end_portno], so the
+	 * table needs end - start + 1 slots (defaults 0..199 -> 200 devices,
+	 * matching max_dev's initial value). Without the +1 the last valid
+	 * id indexed one past the allocation. */
+	max_dev = end_portno - start_portno + 1;
+	if (max_dev <= 0){
+		XSEGLOG("invalid port numbers");
+		ret = -EINVAL;
+		goto out;
+	}
+	/* array of pointers to struct xsegbd_device (the element type was
+	 * previously misspelled 'struct xsegbd_devices *' -- same size, but
+	 * a pointer to a nonexistent struct) */
+	xsegbd_devices = kzalloc(max_dev * sizeof(struct xsegbd_device *), GFP_KERNEL);
+	if (!xsegbd_devices)
	goto out;
-	blk_req_pending = kzalloc(sizeof(struct pending) * max_nr_pending, GFP_KERNEL);
-	if (!blk_req_pending)
-		goto out_queue;
+	spin_lock_init(&xsegbd_devices_lock);
+
+	XSEGLOG("registering block device major %d", major);
+	ret = register_blkdev(major, XSEGBD_NAME);
+	if (ret < 0) {
+		XSEGLOG("cannot register block device!");
+		ret = -EBUSY;
+		goto out_free;
+	}
+	major = ret;
+	XSEGLOG("registered block device major %d", major);
ret = -ENOSYS;
ret = xsegbd_xseg_init();
if (ret)
- goto out_pending;
+ goto out_unregister;
ret = xsegbd_sysfs_init();
if (ret)
out_xseg:
xsegbd_xseg_quit();
-out_pending:
- kfree(blk_req_pending);
-out_queue:
- xq_free(&blk_queue_pending);
+
+out_unregister:
+ unregister_blkdev(major, XSEGBD_NAME);
+
+out_free:
+ kfree(xsegbd_devices);
+
goto out;
}
{
xsegbd_sysfs_cleanup();
xsegbd_xseg_quit();
+ unregister_blkdev(major, XSEGBD_NAME);
}
module_init(xsegbd_init);