/* xsegbd.c
 *
 * xseg block device driver (kernel peer)
 * from the archipelago tree: xseg/peers/kernel/xsegbd.c
 */

#include <linux/module.h>
#include <linux/moduleparam.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/fs.h>
#include <linux/errno.h>
#include <linux/timer.h>
#include <linux/types.h>
#include <linux/vmalloc.h>
#include <linux/genhd.h>
#include <linux/blkdev.h>
#include <linux/bio.h>
#include <linux/device.h>
#include <linux/completion.h>
#include <linux/wait.h>
#include <sys/kernel/segdev.h>
#include "xsegbd.h"
#include <xseg/protocol.h>

#define XSEGBD_MINORS 1
/* define max request size to be used in xsegbd */
//FIXME should we make this 4MB instead of 256KB ?
#define XSEGBD_MAX_REQUEST_SIZE 262144U

MODULE_DESCRIPTION("xsegbd");
MODULE_AUTHOR("XSEG");
MODULE_LICENSE("GPL");

static long sector_size = 0;
static long blksize = 512;
static int major = 0;
static int max_dev = 1024;
static char name[XSEGBD_SEGMENT_NAMELEN] = "xsegbd";
static char spec[256] = "segdev:xsegbd:4:1024:12";

module_param(sector_size, long, 0644);
module_param(blksize, long, 0644);
module_param(max_dev, int, 0644);
module_param(major, int, 0644);
module_param_string(name, name, sizeof(name), 0644);
module_param_string(spec, spec, sizeof(spec), 0644);
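
/*
 * Example module load (a sketch: the parameter names are the module
 * params above; the spec string is parsed by xseg_parse_spec(), segment
 * type and name first, and only those two fields are used directly in
 * this file, the rest are segment sizing parameters):
 *
 *   insmod xsegbd.ko spec="segdev:xsegbd:4:1024:12" max_dev=1024
 *
 * A non-zero sector_size overrides the size reported by the storage
 * backend; see xsegbd_dev_init().
 */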

static struct xsegbd xsegbd;
static struct xsegbd_device **xsegbd_devices; /* indexed by portno */
static DEFINE_MUTEX(xsegbd_mutex);
static DEFINE_SPINLOCK(xsegbd_devices_lock);


/* reference counting of in-flight users of an xsegbd device */
static void __xsegbd_get(struct xsegbd_device *xsegbd_dev)
{
	atomic_inc(&xsegbd_dev->usercount);
}

static void __xsegbd_put(struct xsegbd_device *xsegbd_dev)
{
	atomic_dec(&xsegbd_dev->usercount);
	wake_up(&xsegbd_dev->wq);
}

/* look up the device assigned to port @id and take a reference on it */
static struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
{
	struct xsegbd_device *xsegbd_dev = NULL;

	spin_lock(&xsegbd_devices_lock);
	xsegbd_dev = xsegbd_devices[id];
	if (xsegbd_dev)
		__xsegbd_get(xsegbd_dev);
	spin_unlock(&xsegbd_devices_lock);

	return xsegbd_dev;
}

/* ************************* */
/* ***** sysfs helpers ***** */
/* ************************* */

static struct xsegbd_device *dev_to_xsegbd(struct device *dev)
{
	return container_of(dev, struct xsegbd_device, dev);
}

static struct device *xsegbd_get_dev(struct xsegbd_device *xsegbd_dev)
{
	/* FIXME */
	return get_device(&xsegbd_dev->dev);
}

static void xsegbd_put_dev(struct xsegbd_device *xsegbd_dev)
{
	put_device(&xsegbd_dev->dev);
}

/* ************************* */
/* ** XSEG Initialization ** */
/* ************************* */

static void xseg_callback(xport portno);

int xsegbd_xseg_init(void)
{
	int r;

	if (!xsegbd.name[0])
		strncpy(xsegbd.name, name, XSEGBD_SEGMENT_NAMELEN);

	r = xseg_initialize();
	if (r) {
		XSEGLOG("cannot initialize 'segdev' peer");
		goto err;
	}

	r = xseg_parse_spec(spec, &xsegbd.config);
	if (r)
		goto err;

	if (strncmp(xsegbd.config.type, "segdev", 16))
		XSEGLOG("WARNING: unexpected segment type '%s' vs 'segdev'",
			xsegbd.config.type);

	/* leave it here for now */
	XSEGLOG("joining segment");
	xsegbd.xseg = xseg_join(xsegbd.config.type,
				xsegbd.config.name,
				"segdev",
				xseg_callback);
	if (!xsegbd.xseg) {
		XSEGLOG("cannot find segment");
		r = -ENODEV;
		goto err;
	}

	return 0;
err:
	return r;
}

int xsegbd_xseg_quit(void)
{
	struct segdev *segdev;

	/* make sure to unmap the segment first */
	segdev = segdev_get(0);
	clear_bit(SEGDEV_RESERVED, &segdev->flags);
	xsegbd.xseg->priv->segment_type.ops.unmap(xsegbd.xseg, xsegbd.xseg->segment_size);
	segdev_put(segdev);

	return 0;
}


/* ***************************** */
/* ** Block Device Operations ** */
/* ***************************** */

static int xsegbd_open(struct block_device *bdev, fmode_t mode)
{
	struct gendisk *disk = bdev->bd_disk;
	struct xsegbd_device *xsegbd_dev = disk->private_data;

	xsegbd_get_dev(xsegbd_dev);

	return 0;
}

static int xsegbd_release(struct gendisk *gd, fmode_t mode)
{
	struct xsegbd_device *xsegbd_dev = gd->private_data;

	xsegbd_put_dev(xsegbd_dev);

	return 0;
}

static int xsegbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	return -ENOTTY;
}

static const struct block_device_operations xsegbd_ops = {
	.owner		= THIS_MODULE,
	.open		= xsegbd_open,
	.release	= xsegbd_release,
	.ioctl		= xsegbd_ioctl
};


/* *************************** */
/* ** Device Initialization ** */
/* *************************** */

static void xseg_request_fn(struct request_queue *rq);
static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev);
static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev);

static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
{
	int ret = -ENOMEM;
	struct gendisk *disk;
	unsigned int max_request_size_bytes;

	spin_lock_init(&xsegbd_dev->rqlock);

	xsegbd_dev->xsegbd = &xsegbd;

	xsegbd_dev->blk_queue = blk_alloc_queue(GFP_KERNEL);
	if (!xsegbd_dev->blk_queue)
		goto out;

	if (!blk_init_allocated_queue(xsegbd_dev->blk_queue,
			xseg_request_fn, &xsegbd_dev->rqlock))
		goto outqueue;

	xsegbd_dev->blk_queue->queuedata = xsegbd_dev;

	blk_queue_flush(xsegbd_dev->blk_queue, REQ_FLUSH | REQ_FUA);
	blk_queue_logical_block_size(xsegbd_dev->blk_queue, 512);
	blk_queue_physical_block_size(xsegbd_dev->blk_queue, blksize);
	blk_queue_bounce_limit(xsegbd_dev->blk_queue, BLK_BOUNCE_ANY);

	//blk_queue_max_segments(dev->blk_queue, 512);

	max_request_size_bytes = XSEGBD_MAX_REQUEST_SIZE;
	blk_queue_max_hw_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 9);
	blk_queue_max_segment_size(xsegbd_dev->blk_queue, max_request_size_bytes);
	blk_queue_io_min(xsegbd_dev->blk_queue, max_request_size_bytes);
	blk_queue_io_opt(xsegbd_dev->blk_queue, max_request_size_bytes);

	queue_flag_set_unlocked(QUEUE_FLAG_NONROT, xsegbd_dev->blk_queue);

	/* vkoukis says we don't need partitions */
	xsegbd_dev->gd = disk = alloc_disk(1);
	if (!disk)
		goto outqueue;

	disk->major = xsegbd_dev->major;
	disk->first_minor = 0; // id * XSEGBD_MINORS;
	disk->fops = &xsegbd_ops;
	disk->queue = xsegbd_dev->blk_queue;
	disk->private_data = xsegbd_dev;
	disk->flags |= GENHD_FL_SUPPRESS_PARTITION_INFO;
	snprintf(disk->disk_name, 32, "xsegbd%u", xsegbd_dev->id);

	ret = 0;

	/* allow a non-zero sector_size parameter to override the disk size */
	if (sector_size)
		xsegbd_dev->sectors = sector_size;
	else {
		ret = xsegbd_get_size(xsegbd_dev);
		if (ret)
			goto outdisk;
	}

	set_capacity(disk, xsegbd_dev->sectors);
	XSEGLOG("xsegbd active...");
	add_disk(disk); /* immediately activates the device */

	return 0;

outdisk:
	put_disk(xsegbd_dev->gd);
outqueue:
	blk_cleanup_queue(xsegbd_dev->blk_queue);
	xsegbd_dev->blk_queue = NULL;
out:
	xsegbd_dev->gd = NULL;
	return ret;
}

static void xsegbd_dev_release(struct device *dev)
{
	struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

	/* xseg may be NULL if we are releasing a half-constructed device
	   from an xsegbd_add() error path */
	if (xsegbd_dev->xseg)
		xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);

	/* cleanup gendisk and blk_queue the right way */
	if (xsegbd_dev->gd) {
		if (xsegbd_dev->gd->flags & GENHD_FL_UP)
			del_gendisk(xsegbd_dev->gd);

		put_disk(xsegbd_dev->gd);
		xsegbd_mapclose(xsegbd_dev);
	}

	spin_lock(&xsegbd_devices_lock);
	BUG_ON(xsegbd_devices[xsegbd_dev->src_portno] != xsegbd_dev);
	xsegbd_devices[xsegbd_dev->src_portno] = NULL;
	spin_unlock(&xsegbd_devices_lock);

	/* wait for all pending operations on the device to end */
	wait_event(xsegbd_dev->wq, atomic_read(&xsegbd_dev->usercount) <= 1);
	if (xsegbd_dev->blk_queue)
		blk_cleanup_queue(xsegbd_dev->blk_queue);

//	if (xseg_free_requests(xsegbd_dev->xseg,
//			xsegbd_dev->src_portno, xsegbd_dev->nr_requests) < 0)
//		XSEGLOG("Error trying to free requests!\n");

	//FIXME xseg_leave to free_up resources ?
	unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);

	kfree(xsegbd_dev->blk_req_pending);
	xq_free(&xsegbd_dev->blk_queue_pending);

	kfree(xsegbd_dev);

	module_put(THIS_MODULE);
}

/* ******************* */
/* ** Critical Path ** */
/* ******************* */
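
/*
 * Round trip of a block request, as implemented below: xseg_request_fn()
 * pops a free slot from blk_queue_pending and allocates an xseg request
 * on our source port, fills in target/offset/size from the block request
 * (copying the data in for writes), and submits it towards the
 * destination port. When the peer answers, xseg_callback() finds the
 * pending slot again through xreq->priv, copies data back for reads,
 * ends the block request, and returns both the slot and the xseg request
 * to their free pools.
 */
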
/* copy the data of a block request into the xseg request buffer */
static void blk_to_xseg(struct xseg *xseg, struct xseg_request *xreq,
			struct request *blkreq)
{
	struct bio_vec *bvec;
	struct req_iterator iter;
	uint64_t off = 0;
	char *data = xseg_get_data(xseg, xreq);

	rq_for_each_segment(bvec, blkreq, iter) {
		char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
		memcpy(data + off, bdata, bvec->bv_len);
		off += bvec->bv_len;
		kunmap_atomic(bdata);
	}
}

/* copy the data of a served xseg request back into the block request */
static void xseg_to_blk(struct xseg *xseg, struct xseg_request *xreq,
			struct request *blkreq)
{
	struct bio_vec *bvec;
	struct req_iterator iter;
	uint64_t off = 0;
	char *data = xseg_get_data(xseg, xreq);

	rq_for_each_segment(bvec, blkreq, iter) {
		char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
		memcpy(bdata, data + off, bvec->bv_len);
		off += bvec->bv_len;
		kunmap_atomic(bdata);
	}
}

static void xseg_request_fn(struct request_queue *rq)
{
	struct xseg_request *xreq;
	struct xsegbd_device *xsegbd_dev = rq->queuedata;
	struct request *blkreq;
	struct xsegbd_pending *pending;
	xqindex blkreq_idx;
	char *target;
	uint64_t datalen;
	xport p;
	int r;
	unsigned long flags;

	__xsegbd_get(xsegbd_dev);

	spin_unlock_irq(&xsegbd_dev->rqlock);
	for (;;) {
		if (current_thread_info()->preempt_count || irqs_disabled()){
			XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu",
					current_thread_info()->preempt_count,
					(unsigned long) irqs_disabled());
		}
		//XSEGLOG("Priority: %d", current_thread_info()->task->prio);
		//XSEGLOG("Static priority: %d", current_thread_info()->task->static_prio);
		//XSEGLOG("Normal priority: %d", current_thread_info()->task->normal_prio);
		//XSEGLOG("Rt_priority: %u", current_thread_info()->task->rt_priority);
		blkreq_idx = Noneidx;
		xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
				xsegbd_dev->dst_portno, X_ALLOC);
		if (!xreq)
			break;

		blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending,
						xsegbd_dev->src_portno);
		if (blkreq_idx == Noneidx)
			break;

		if (blkreq_idx >= xsegbd_dev->nr_requests) {
			XSEGLOG("blkreq_idx >= xsegbd_dev->nr_requests");
			BUG_ON(1);
			break;
		}

		spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
		blkreq = blk_fetch_request(rq);
		if (!blkreq){
			spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
			break;
		}

		if (blkreq->cmd_type != REQ_TYPE_FS) {
			//FIXME we leak xreq and blkreq_idx here
			XSEGLOG("non-fs cmd_type: %u. *shrug*", blkreq->cmd_type);
			__blk_end_request_all(blkreq, 0);
			spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
			continue;
		}
		spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
		if (current_thread_info()->preempt_count || irqs_disabled()){
			XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu",
					current_thread_info()->preempt_count,
					(unsigned long) irqs_disabled());
		}

		datalen = blk_rq_bytes(blkreq);
		r = xseg_prep_request(xsegbd_dev->xseg, xreq,
					xsegbd_dev->targetlen, datalen);
		if (r < 0) {
			XSEGLOG("couldn't prep request");
			blk_end_request_err(blkreq, r);
			BUG_ON(1);
			break;
		}
		r = -ENOMEM;
		if (xreq->bufferlen - xsegbd_dev->targetlen < datalen){
			XSEGLOG("malformed req buffers");
			blk_end_request_err(blkreq, r);
			BUG_ON(1);
			break;
		}

		target = xseg_get_target(xsegbd_dev->xseg, xreq);
		strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);

		pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
		pending->dev = xsegbd_dev;
		pending->request = blkreq;
		pending->comp = NULL;

		xreq->size = datalen;
		xreq->offset = blk_rq_pos(blkreq) << 9;
		xreq->priv = (uint64_t) blkreq_idx;

		/*
		if (xreq->offset >= (sector_size << 9))
			XSEGLOG("sector offset: %lu > %lu, flush:%u, fua:%u",
				 blk_rq_pos(blkreq), sector_size,
				 blkreq->cmd_flags & REQ_FLUSH,
				 blkreq->cmd_flags & REQ_FUA);
		*/

		if (blkreq->cmd_flags & REQ_FLUSH)
			xreq->flags |= XF_FLUSH;

		if (blkreq->cmd_flags & REQ_FUA)
			xreq->flags |= XF_FUA;

		if (rq_data_dir(blkreq)) {
			/* unlock for data transfers? */
			blk_to_xseg(xsegbd_dev->xseg, xreq, blkreq);
			xreq->op = X_WRITE;
		} else {
			xreq->op = X_READ;
		}

		r = -EIO;
		p = xseg_submit(xsegbd_dev->xseg, xreq,
					xsegbd_dev->src_portno, X_ALLOC);
		if (p == NoPort) {
			XSEGLOG("couldn't submit req");
			BUG_ON(1);
			blk_end_request_err(blkreq, r);
			break;
		}
		WARN_ON(xseg_signal(xsegbd_dev->xsegbd->xseg, p) < 0);
	}
	/* return any resources we grabbed but did not use */
	if (xreq)
		BUG_ON(xseg_put_request(xsegbd_dev->xsegbd->xseg, xreq,
					xsegbd_dev->src_portno) == -1);
	if (blkreq_idx != Noneidx)
		BUG_ON(xq_append_head(&xsegbd_dev->blk_queue_pending,
				blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
	spin_lock_irq(&xsegbd_dev->rqlock);
	__xsegbd_put(xsegbd_dev);
}

int update_dev_sectors_from_request(struct xsegbd_device *xsegbd_dev,
				struct xseg_request *xreq)
{
	void *data;

	/* check xsegbd_dev before it is dereferenced below */
	if (!xsegbd_dev) {
		XSEGLOG("Invalid xsegbd_dev");
		return -ENOENT;
	}
	if (!xreq) {
		XSEGLOG("Invalid xreq");
		return -EIO;
	}

	if (xreq->state & XS_FAILED)
		return -ENOENT;

	if (!(xreq->state & XS_SERVED))
		return -EIO;

	data = xseg_get_data(xsegbd_dev->xseg, xreq);
	if (!data) {
		XSEGLOG("Invalid req data");
		return -EIO;
	}
	/* the X_INFO reply starts with the target size in bytes */
	xsegbd_dev->sectors = *((uint64_t *) data) / 512ULL;
	return 0;
}

static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
{
	struct xseg_request *xreq;
	char *target;
	xqindex blkreq_idx;
	struct xsegbd_pending *pending;
	struct completion comp;
	xport p;
	int ret = -EBUSY;

	__xsegbd_get(xsegbd_dev);

	xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
			xsegbd_dev->dst_portno, X_ALLOC);
	if (!xreq)
		goto out;

	BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen,
				sizeof(struct xseg_reply_info)));

	init_completion(&comp);
	blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
	if (blkreq_idx == Noneidx)
		goto out_put;

	pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
	pending->dev = xsegbd_dev;
	pending->request = NULL;
	pending->comp = &comp;

	xreq->priv = (uint64_t) blkreq_idx;

	target = xseg_get_target(xsegbd_dev->xseg, xreq);
	strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
	xreq->size = xreq->datalen;
	xreq->offset = 0;
	xreq->op = X_INFO;

	xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
	p = xseg_submit(xsegbd_dev->xseg, xreq,
				xsegbd_dev->src_portno, X_ALLOC);
	if (p == NoPort) {
		XSEGLOG("couldn't submit request");
		BUG_ON(1);
		goto out_queue;
	}
	WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
	XSEGLOG("Before wait for completion, comp %lx [%llu]",
			(unsigned long) pending->comp,
			(unsigned long long) blkreq_idx);
	wait_for_completion_interruptible(&comp);
	XSEGLOG("Woken up after wait_for_completion_interruptible(), comp: %lx [%llu]",
			(unsigned long) pending->comp,
			(unsigned long long) blkreq_idx);
	ret = update_dev_sectors_from_request(xsegbd_dev, xreq);
	//XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors);
out_put:
	BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
out:
	__xsegbd_put(xsegbd_dev);
	return ret;

out_queue:
	pending->dev = NULL;
	pending->comp = NULL;
	xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);

	goto out;
}
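
/*
 * X_INFO (above) and X_CLOSE (below) are synchronous control requests:
 * the pending slot carries a struct completion instead of a block
 * request, and xseg_callback() completes it rather than ending block
 * I/O; the waiter then puts the xseg request itself.
 */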

static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev)
{
	struct xseg_request *xreq;
	char *target;
	xqindex blkreq_idx;
	struct xsegbd_pending *pending;
	struct completion comp;
	xport p;
	int ret = -EBUSY;

	__xsegbd_get(xsegbd_dev);
	xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
			xsegbd_dev->dst_portno, X_ALLOC);
	if (!xreq)
		goto out;

	BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen, 0));

	init_completion(&comp);
	blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
	if (blkreq_idx == Noneidx)
		goto out_put;

	pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
	pending->dev = xsegbd_dev;
	pending->request = NULL;
	pending->comp = &comp;

	xreq->priv = (uint64_t) blkreq_idx;

	target = xseg_get_target(xsegbd_dev->xseg, xreq);
	strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
	xreq->size = xreq->datalen;
	xreq->offset = 0;
	xreq->op = X_CLOSE;

	xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
	p = xseg_submit(xsegbd_dev->xseg, xreq,
				xsegbd_dev->src_portno, X_ALLOC);
	if (p == NoPort) {
		XSEGLOG("couldn't submit request");
		BUG_ON(1);
		goto out_queue;
	}
	WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
	wait_for_completion_interruptible(&comp);
	ret = 0;
	if (xreq->state & XS_FAILED)
		XSEGLOG("Couldn't close disk on mapper");
out_put:
	BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
out:
	__xsegbd_put(xsegbd_dev);
	return ret;

out_queue:
	pending->dev = NULL;
	pending->comp = NULL;
	xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);

	goto out;
}

static void xseg_callback(xport portno)
{
	struct xsegbd_device *xsegbd_dev;
	struct xseg_request *xreq;
	struct request *blkreq;
	struct xsegbd_pending *pending;
	unsigned long flags;
	xqindex blkreq_idx, ridx;
	int err;

	xsegbd_dev = __xsegbd_get_dev(portno);
	if (!xsegbd_dev) {
		XSEGLOG("portno: %u has no xsegbd device assigned", portno);
		WARN_ON(1);
		return;
	}

	for (;;) {
		xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
		xreq = xseg_receive(xsegbd_dev->xseg, portno);
		if (!xreq)
			break;

		xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);

		blkreq_idx = (xqindex) xreq->priv;
		if (blkreq_idx >= xsegbd_dev->nr_requests) {
			WARN_ON(1);
			//FIXME maybe put request?
			continue;
		}

		pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
		if (pending->comp) {
			/* someone is blocking on this request
			   and will handle it when we wake them up. */
			complete(pending->comp);
			/* the request is blocker's responsibility so
			   we will not put_request(); */
			continue;
		}

		/* this is now treated as a block I/O request to end */
		blkreq = pending->request;
		pending->request = NULL;
		if (xsegbd_dev != pending->dev) {
			//FIXME maybe put request?
			XSEGLOG("xsegbd_dev != pending->dev");
			BUG_ON(1);
			continue;
		}
		pending->dev = NULL;
		if (!blkreq){
			//FIXME maybe put request?
			XSEGLOG("blkreq does not exist");
			BUG_ON(1);
			continue;
		}

		err = -EIO;
		if (!(xreq->state & XS_SERVED))
			goto blk_end;

		if (xreq->serviced != blk_rq_bytes(blkreq))
			goto blk_end;

		err = 0;
		/* unlock for data transfer? */
		if (!rq_data_dir(blkreq)){
			xseg_to_blk(xsegbd_dev->xseg, xreq, blkreq);
		}
blk_end:
		blk_end_request_all(blkreq, err);

		ridx = xq_append_head(&xsegbd_dev->blk_queue_pending,
					blkreq_idx, xsegbd_dev->src_portno);
		if (ridx == Noneidx) {
			XSEGLOG("couldn't append blkreq_idx");
			WARN_ON(1);
		}

		if (xseg_put_request(xsegbd_dev->xseg, xreq,
						xsegbd_dev->src_portno) < 0){
			XSEGLOG("couldn't put req");
			BUG_ON(1);
		}
	}

	/* kick the request function to submit any queued block requests */
	spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
	xseg_request_fn(xsegbd_dev->blk_queue);
	spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
	__xsegbd_put(xsegbd_dev);
}


/* sysfs interface */

static struct bus_type xsegbd_bus_type = {
	.name	= "xsegbd",
};

static ssize_t xsegbd_size_show(struct device *dev,
					struct device_attribute *attr, char *buf)
{
	struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

	return sprintf(buf, "%llu\n", (unsigned long long) xsegbd_dev->sectors * 512ULL);
}

static ssize_t xsegbd_major_show(struct device *dev,
					struct device_attribute *attr, char *buf)
{
	struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

	return sprintf(buf, "%d\n", xsegbd_dev->major);
}

static ssize_t xsegbd_srcport_show(struct device *dev,
					struct device_attribute *attr, char *buf)
{
	struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

	return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->src_portno);
}

static ssize_t xsegbd_dstport_show(struct device *dev,
					struct device_attribute *attr, char *buf)
{
	struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

	return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->dst_portno);
}

static ssize_t xsegbd_id_show(struct device *dev,
					struct device_attribute *attr, char *buf)
{
	struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

	return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->id);
}

static ssize_t xsegbd_reqs_show(struct device *dev,
					struct device_attribute *attr, char *buf)
{
	struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

	return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->nr_requests);
}

static ssize_t xsegbd_target_show(struct device *dev,
					struct device_attribute *attr, char *buf)
{
	struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

	return sprintf(buf, "%s\n", xsegbd_dev->target);
}

static ssize_t xsegbd_image_refresh(struct device *dev,
					struct device_attribute *attr,
					const char *buf,
					size_t size)
{
	struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
	int rc, ret = size;

	mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

	rc = xsegbd_get_size(xsegbd_dev);
	if (rc < 0) {
		ret = rc;
		goto out;
	}

	set_capacity(xsegbd_dev->gd, xsegbd_dev->sectors);

out:
	mutex_unlock(&xsegbd_mutex);
	return ret;
}

static ssize_t xsegbd_cleanup(struct device *dev,
					struct device_attribute *attr,
					const char *buf,
					size_t size)
{
	struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
	int ret = size, i;
	struct request *blkreq = NULL;
	struct xsegbd_pending *pending = NULL;
	struct completion *comp = NULL;

	mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
	xlock_acquire(&xsegbd_dev->blk_queue_pending.lock,
				xsegbd_dev->src_portno);
	/* fail every in-flight slot and return it to the free queue */
	for (i = 0; i < xsegbd_dev->nr_requests; i++) {
		if (!__xq_check(&xsegbd_dev->blk_queue_pending, i)) {
			pending = &xsegbd_dev->blk_req_pending[i];
			blkreq = pending->request;
			pending->request = NULL;
			comp = pending->comp;
			pending->comp = NULL;
			if (blkreq){
				XSEGLOG("Cleaning up blkreq %lx [%d]", (unsigned long) blkreq, i);
				blk_end_request_all(blkreq, -EIO);
			}
			if (comp){
				XSEGLOG("Cleaning up comp %lx [%d]", (unsigned long) comp, i);
				complete(comp);
			}
			__xq_append_tail(&xsegbd_dev->blk_queue_pending, i);
		}
	}
	xlock_release(&xsegbd_dev->blk_queue_pending.lock);

	mutex_unlock(&xsegbd_mutex);
	return ret;
}

static DEVICE_ATTR(size, S_IRUGO, xsegbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, xsegbd_major_show, NULL);
static DEVICE_ATTR(srcport, S_IRUGO, xsegbd_srcport_show, NULL);
static DEVICE_ATTR(dstport, S_IRUGO, xsegbd_dstport_show, NULL);
static DEVICE_ATTR(id, S_IRUGO, xsegbd_id_show, NULL);
static DEVICE_ATTR(reqs, S_IRUGO, xsegbd_reqs_show, NULL);
static DEVICE_ATTR(target, S_IRUGO, xsegbd_target_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, xsegbd_image_refresh);
static DEVICE_ATTR(cleanup, S_IWUSR, NULL, xsegbd_cleanup);
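
/*
 * Per-device attributes appear under the xsegbd bus in sysfs (paths shown
 * for illustration; <id> is the device name set in xsegbd_bus_add_dev()):
 *
 *   cat /sys/bus/xsegbd/devices/<id>/size          # capacity in bytes
 *   cat /sys/bus/xsegbd/devices/<id>/target        # backing target name
 *   echo 1 > /sys/bus/xsegbd/devices/<id>/refresh  # re-read size from backend
 *   echo 1 > /sys/bus/xsegbd/devices/<id>/cleanup  # fail stuck pending requests
 */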

static struct attribute *xsegbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_srcport.attr,
	&dev_attr_dstport.attr,
	&dev_attr_id.attr,
	&dev_attr_reqs.attr,
	&dev_attr_target.attr,
	&dev_attr_refresh.attr,
	&dev_attr_cleanup.attr,
	NULL
};

static struct attribute_group xsegbd_attr_group = {
	.attrs = xsegbd_attrs,
};

static const struct attribute_group *xsegbd_attr_groups[] = {
	&xsegbd_attr_group,
	NULL
};

static void xsegbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type xsegbd_device_type = {
	.name		= "xsegbd",
	.groups		= xsegbd_attr_groups,
	.release	= xsegbd_sysfs_dev_release,
};

static void xsegbd_root_dev_release(struct device *dev)
{
}

static struct device xsegbd_root_dev = {
	.init_name	= "xsegbd",
	.release	= xsegbd_root_dev_release,
};

static int xsegbd_bus_add_dev(struct xsegbd_device *xsegbd_dev)
{
	int ret;
	struct device *dev;

	mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
	dev = &xsegbd_dev->dev;

	dev->bus = &xsegbd_bus_type;
	dev->type = &xsegbd_device_type;
	dev->parent = &xsegbd_root_dev;
	dev->release = xsegbd_dev_release;
	dev_set_name(dev, "%d", xsegbd_dev->id);

	ret = device_register(dev);

	mutex_unlock(&xsegbd_mutex);
	return ret;
}

static void xsegbd_bus_del_dev(struct xsegbd_device *xsegbd_dev)
{
	device_unregister(&xsegbd_dev->dev);
}

static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
{
	struct xsegbd_device *xsegbd_dev;
	struct xseg_port *port;
	ssize_t ret = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	xsegbd_dev = kzalloc(sizeof(*xsegbd_dev), GFP_KERNEL);
	if (!xsegbd_dev)
		goto out;

	spin_lock_init(&xsegbd_dev->rqlock);
	INIT_LIST_HEAD(&xsegbd_dev->node);
	init_waitqueue_head(&xsegbd_dev->wq);
	atomic_set(&xsegbd_dev->usercount, 0);

	/* parse cmd: "<target> <src_port>:<dst_port>:<nr_requests>";
	   all four fields are required */
	if (sscanf(buf, "%" __stringify(XSEGBD_TARGET_NAMELEN) "s "
			"%d:%d:%d", xsegbd_dev->target, &xsegbd_dev->src_portno,
			&xsegbd_dev->dst_portno, &xsegbd_dev->nr_requests) < 4) {
		ret = -EINVAL;
		goto out_dev;
	}
	xsegbd_dev->targetlen = strlen(xsegbd_dev->target);

	/* src_portno indexes xsegbd_devices[], so bound it by max_dev */
	if ((unsigned long) xsegbd_dev->src_portno >= (unsigned long) max_dev) {
		ret = -EINVAL;
		goto out_dev;
	}

	spin_lock(&xsegbd_devices_lock);
	if (xsegbd_devices[xsegbd_dev->src_portno] != NULL) {
		ret = -EINVAL;
		goto out_unlock;
	}
	xsegbd_devices[xsegbd_dev->src_portno] = xsegbd_dev;
	xsegbd_dev->id = xsegbd_dev->src_portno;
	spin_unlock(&xsegbd_devices_lock);

	XSEGLOG("registering block device major %d", major);
	ret = register_blkdev(major, XSEGBD_NAME);
	if (ret < 0) {
		XSEGLOG("cannot register block device!");
		ret = -EBUSY;
		goto out_delentry;
	}
	xsegbd_dev->major = ret;
	XSEGLOG("registered block device major %d", xsegbd_dev->major);

	ret = xsegbd_bus_add_dev(xsegbd_dev);
	if (ret)
		goto out_blkdev;

	ret = -ENOMEM;
	if (!xq_alloc_seq(&xsegbd_dev->blk_queue_pending,
				xsegbd_dev->nr_requests,
				xsegbd_dev->nr_requests))
		goto out_bus;

	xsegbd_dev->blk_req_pending = kcalloc(xsegbd_dev->nr_requests,
				sizeof(struct xsegbd_pending), GFP_KERNEL);
	if (!xsegbd_dev->blk_req_pending)
		goto out_freeq;

	XSEGLOG("joining segment");
	//FIXME use xsegbd module config for now
	xsegbd_dev->xseg = xseg_join(xsegbd.config.type,
					xsegbd.config.name,
					"segdev",
					xseg_callback);
	if (!xsegbd_dev->xseg) {
		ret = -ENODEV;
		goto out_freepending;
	}

	XSEGLOG("binding to source port %u (destination %u)",
			xsegbd_dev->src_portno, xsegbd_dev->dst_portno);
	port = xseg_bind_port(xsegbd_dev->xseg, xsegbd_dev->src_portno, NULL);
	if (!port) {
		XSEGLOG("cannot bind to port");
		ret = -EFAULT;
		goto out_xseg;
	}

	if (xsegbd_dev->src_portno != xseg_portno(xsegbd_dev->xseg, port)) {
		XSEGLOG("portno != xsegbd_dev->src_portno");
		BUG_ON(1);
		ret = -EFAULT;
		goto out_xseg;
	}

	/* make sure we don't get any requests until we're ready to handle them */
	xseg_cancel_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));

	ret = xsegbd_dev_init(xsegbd_dev);
	if (ret)
		goto out_xseg;

	xseg_prepare_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
	return count;

out_xseg:
	xseg_leave(xsegbd_dev->xseg);
	xsegbd_dev->xseg = NULL; /* release() checks this before cancel_wait */

out_freepending:
	kfree(xsegbd_dev->blk_req_pending);
	xsegbd_dev->blk_req_pending = NULL; /* xsegbd_dev_release() kfrees it too */

out_freeq:
	xq_free(&xsegbd_dev->blk_queue_pending);

out_bus:
	/* device_unregister() ends up in xsegbd_dev_release(), which frees
	   xsegbd_dev and unregisters the block device */
	xsegbd_bus_del_dev(xsegbd_dev);
	return ret;

out_blkdev:
	unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);

out_delentry:
	spin_lock(&xsegbd_devices_lock);
	xsegbd_devices[xsegbd_dev->src_portno] = NULL;

out_unlock:
	spin_unlock(&xsegbd_devices_lock);

out_dev:
	kfree(xsegbd_dev);

out:
	return ret;
}

static ssize_t xsegbd_remove(struct bus_type *bus, const char *buf, size_t count)
{
	struct xsegbd_device *xsegbd_dev = NULL;
	int id, ret;
	unsigned long ul_id;

	ret = strict_strtoul(buf, 10, &ul_id);
	if (ret)
		return ret;

	id = (int) ul_id;
	if (id != ul_id)
		return -EINVAL;

	/* id indexes xsegbd_devices[], so bound it by max_dev */
	if (ul_id >= (unsigned long) max_dev)
		return -EINVAL;

	mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

	ret = count;
	//FIXME when to put dev?
	xsegbd_dev = __xsegbd_get_dev(id);
	if (!xsegbd_dev) {
		ret = -ENOENT;
		goto out_unlock;
	}
	xsegbd_bus_del_dev(xsegbd_dev);

out_unlock:
	mutex_unlock(&xsegbd_mutex);
	return ret;
}

static struct bus_attribute xsegbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, xsegbd_add),
	__ATTR(remove, S_IWUSR, NULL, xsegbd_remove),
	__ATTR_NULL
};
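
/*
 * Bus-level controls (illustrative values; the add format matches the
 * sscanf in xsegbd_add(), and remove takes the id parsed in
 * xsegbd_remove(), which equals the source port):
 *
 *   echo "mytarget 2:1:128" > /sys/bus/xsegbd/add   # target src:dst:nr_requests
 *   echo 2 > /sys/bus/xsegbd/remove                 # id == source port
 */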

static int xsegbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&xsegbd_root_dev);
	if (ret < 0)
		return ret;

	xsegbd_bus_type.bus_attrs = xsegbd_bus_attrs;
	ret = bus_register(&xsegbd_bus_type);
	if (ret < 0)
		device_unregister(&xsegbd_root_dev);

	return ret;
}

static void xsegbd_sysfs_cleanup(void)
{
	bus_unregister(&xsegbd_bus_type);
	device_unregister(&xsegbd_root_dev);
}

/* *************************** */
/* ** Module Initialization ** */
/* *************************** */

static int __init xsegbd_init(void)
{
	int ret = -ENOMEM;

	/* array of pointers, hence sizeof(struct xsegbd_device *) */
	xsegbd_devices = kcalloc(max_dev, sizeof(struct xsegbd_device *), GFP_KERNEL);
	if (!xsegbd_devices)
		goto out;

	ret = xsegbd_xseg_init();
	if (ret)
		goto out_free;

	ret = xsegbd_sysfs_init();
	if (ret)
		goto out_xseg;

	XSEGLOG("initialization complete");

out:
	return ret;

out_xseg:
	xsegbd_xseg_quit();

out_free:
	kfree(xsegbd_devices);

	goto out;
}

static void __exit xsegbd_exit(void)
{
	xsegbd_sysfs_cleanup();
	xsegbd_xseg_quit();
}

module_init(xsegbd_init);
module_exit(xsegbd_exit);