add X_CLOSE request on xsegbd removal
archipelago/xseg/peers/kernel/xsegbd.c
/* xsegbd.c
 *
 * XSEG block device driver (kernel peer).
 */

#include <linux/module.h>
#include <linux/moduleparam.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/fs.h>
#include <linux/errno.h>
#include <linux/timer.h>
#include <linux/types.h>
#include <linux/vmalloc.h>
#include <linux/genhd.h>
#include <linux/blkdev.h>
#include <linux/bio.h>
#include <linux/device.h>
#include <linux/completion.h>

#include <sys/kernel/segdev.h>
#include "xsegbd.h"
#include <xseg/protocol.h>

#define XSEGBD_MINORS 1
/* define max request size to be used in xsegbd */
//FIXME should we make this 4MB instead of 256KB ?
#define XSEGBD_MAX_REQUEST_SIZE 262144U

MODULE_DESCRIPTION("xsegbd");
MODULE_AUTHOR("XSEG");
MODULE_LICENSE("GPL");

static long sector_size = 0;
static long blksize = 512;
static int major = 0;
static int max_dev = 1024;
static char name[XSEGBD_SEGMENT_NAMELEN] = "xsegbd";
static char spec[256] = "segdev:xsegbd:4:1024:12";

module_param(sector_size, long, 0644);
module_param(blksize, long, 0644);
module_param(max_dev, int, 0644);
module_param(major, int, 0644);
module_param_string(name, name, sizeof(name), 0644);
module_param_string(spec, spec, sizeof(spec), 0644);

static struct xsegbd xsegbd;
static struct xsegbd_device **xsegbd_devices; /* indexed by portno */
static DEFINE_MUTEX(xsegbd_mutex);
static DEFINE_SPINLOCK(xsegbd_devices_lock);

static struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
{
        struct xsegbd_device *xsegbd_dev = NULL;

        spin_lock(&xsegbd_devices_lock);
        xsegbd_dev = xsegbd_devices[id];
        spin_unlock(&xsegbd_devices_lock);

        return xsegbd_dev;
}

/* ************************* */
/* ***** sysfs helpers ***** */
/* ************************* */

static struct xsegbd_device *dev_to_xsegbd(struct device *dev)
{
        return container_of(dev, struct xsegbd_device, dev);
}

static struct device *xsegbd_get_dev(struct xsegbd_device *xsegbd_dev)
{
        /* FIXME */
        return get_device(&xsegbd_dev->dev);
}

static void xsegbd_put_dev(struct xsegbd_device *xsegbd_dev)
{
        put_device(&xsegbd_dev->dev);
}

/* ************************* */
/* ** XSEG Initialization ** */
/* ************************* */

static void xseg_callback(xport portno);

int xsegbd_xseg_init(void)
{
        int r;

        if (!xsegbd.name[0])
                strncpy(xsegbd.name, name, XSEGBD_SEGMENT_NAMELEN);

        r = xseg_initialize();
        if (r) {
                XSEGLOG("cannot initialize 'segdev' peer");
                goto err;
        }

        r = xseg_parse_spec(spec, &xsegbd.config);
        if (r)
                goto err;

        if (strncmp(xsegbd.config.type, "segdev", 16))
                XSEGLOG("WARNING: unexpected segment type '%s' vs 'segdev'",
                         xsegbd.config.type);

        /* leave it here for now */
        XSEGLOG("joining segment");
        xsegbd.xseg = xseg_join(xsegbd.config.type,
                                xsegbd.config.name,
                                "segdev",
                                xseg_callback);
        if (!xsegbd.xseg) {
                XSEGLOG("cannot find segment");
                r = -ENODEV;
                goto err;
        }

        return 0;
err:
        return r;
}

int xsegbd_xseg_quit(void)
{
        struct segdev *segdev;

        /* make sure to unmap the segment first */
        segdev = segdev_get(0);
        clear_bit(SEGDEV_RESERVED, &segdev->flags);
        xsegbd.xseg->priv->segment_type.ops.unmap(xsegbd.xseg, xsegbd.xseg->segment_size);
        segdev_put(segdev);

        return 0;
}


/* ***************************** */
/* ** Block Device Operations ** */
/* ***************************** */

static int xsegbd_open(struct block_device *bdev, fmode_t mode)
{
        struct gendisk *disk = bdev->bd_disk;
        struct xsegbd_device *xsegbd_dev = disk->private_data;

        xsegbd_get_dev(xsegbd_dev);

        return 0;
}

static int xsegbd_release(struct gendisk *gd, fmode_t mode)
{
        struct xsegbd_device *xsegbd_dev = gd->private_data;

        xsegbd_put_dev(xsegbd_dev);

        return 0;
}

static int xsegbd_ioctl(struct block_device *bdev, fmode_t mode,
                        unsigned int cmd, unsigned long arg)
{
        return -ENOTTY;
}

static const struct block_device_operations xsegbd_ops = {
        .owner          = THIS_MODULE,
        .open           = xsegbd_open,
        .release        = xsegbd_release,
        .ioctl          = xsegbd_ioctl
};


/* *************************** */
/* ** Device Initialization ** */
/* *************************** */

static void xseg_request_fn(struct request_queue *rq);
static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev);
static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev);

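/*
 * Set up the request queue and gendisk for one xsegbd device. The disk
 * size comes either from the sector_size module parameter, when set, or
 * from an X_INFO request to the target (xsegbd_get_size()).
 */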
static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
{
        int ret = -ENOMEM;
        struct gendisk *disk;
        unsigned int max_request_size_bytes;

        spin_lock_init(&xsegbd_dev->rqlock);

        xsegbd_dev->xsegbd = &xsegbd;

        xsegbd_dev->blk_queue = blk_alloc_queue(GFP_KERNEL);
        if (!xsegbd_dev->blk_queue)
                goto out;

        if (!blk_init_allocated_queue(xsegbd_dev->blk_queue,
                        xseg_request_fn, &xsegbd_dev->rqlock))
                goto outqueue;

        xsegbd_dev->blk_queue->queuedata = xsegbd_dev;

        blk_queue_flush(xsegbd_dev->blk_queue, REQ_FLUSH | REQ_FUA);
        blk_queue_logical_block_size(xsegbd_dev->blk_queue, 512);
        blk_queue_physical_block_size(xsegbd_dev->blk_queue, blksize);
        blk_queue_bounce_limit(xsegbd_dev->blk_queue, BLK_BOUNCE_ANY);

        //blk_queue_max_segments(dev->blk_queue, 512);

        max_request_size_bytes = XSEGBD_MAX_REQUEST_SIZE;
        blk_queue_max_hw_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 9);
        blk_queue_max_segment_size(xsegbd_dev->blk_queue, max_request_size_bytes);
        blk_queue_io_min(xsegbd_dev->blk_queue, max_request_size_bytes);
        blk_queue_io_opt(xsegbd_dev->blk_queue, max_request_size_bytes);

        queue_flag_set_unlocked(QUEUE_FLAG_NONROT, xsegbd_dev->blk_queue);

        /* vkoukis says we don't need partitions */
        xsegbd_dev->gd = disk = alloc_disk(1);
        if (!disk)
                goto outqueue;

        disk->major = xsegbd_dev->major;
        disk->first_minor = 0; // id * XSEGBD_MINORS;
        disk->fops = &xsegbd_ops;
        disk->queue = xsegbd_dev->blk_queue;
        disk->private_data = xsegbd_dev;
        disk->flags |= GENHD_FL_SUPPRESS_PARTITION_INFO;
        snprintf(disk->disk_name, 32, "xsegbd%u", xsegbd_dev->id);

        ret = 0;

        /* allow a non-zero sector_size parameter to override the disk size */
        if (sector_size)
                xsegbd_dev->sectors = sector_size;
        else {
                ret = xsegbd_get_size(xsegbd_dev);
                if (ret)
                        goto outdisk;
        }

        set_capacity(disk, xsegbd_dev->sectors);
        XSEGLOG("xsegbd active...");
        add_disk(disk); /* immediately activates the device */

        return 0;

outdisk:
        put_disk(xsegbd_dev->gd);
outqueue:
        blk_cleanup_queue(xsegbd_dev->blk_queue);
out:
        xsegbd_dev->gd = NULL;
        return ret;
}

static void xsegbd_dev_release(struct device *dev)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);

        /* cleanup gendisk and blk_queue the right way */
        if (xsegbd_dev->gd) {
                if (xsegbd_dev->gd->flags & GENHD_FL_UP)
                        del_gendisk(xsegbd_dev->gd);

                blk_cleanup_queue(xsegbd_dev->blk_queue);
                put_disk(xsegbd_dev->gd);
                xsegbd_mapclose(xsegbd_dev);
        }

//      if (xseg_free_requests(xsegbd_dev->xseg,
//                      xsegbd_dev->src_portno, xsegbd_dev->nr_requests) < 0)
//              XSEGLOG("Error trying to free requests!\n");

        unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);

        spin_lock(&xsegbd_devices_lock);
        BUG_ON(xsegbd_devices[xsegbd_dev->src_portno] != xsegbd_dev);
        xsegbd_devices[xsegbd_dev->src_portno] = NULL;
        spin_unlock(&xsegbd_devices_lock);

        kfree(xsegbd_dev->blk_req_pending);
        xq_free(&xsegbd_dev->blk_queue_pending);

        kfree(xsegbd_dev);

        module_put(THIS_MODULE);
}

/* ******************* */
/* ** Critical Path ** */
/* ******************* */

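/*
 * Copy between the bio_vec pages of a block request and the data buffer
 * of an xseg request: blk_to_xseg() feeds write payloads in,
 * xseg_to_blk() returns read data.
 */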
static void blk_to_xseg(struct xseg *xseg, struct xseg_request *xreq,
                        struct request *blkreq)
{
        struct bio_vec *bvec;
        struct req_iterator iter;
        uint64_t off = 0;
        char *data = xseg_get_data(xseg, xreq);
        rq_for_each_segment(bvec, blkreq, iter) {
                char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
                memcpy(data + off, bdata, bvec->bv_len);
                off += bvec->bv_len;
                kunmap_atomic(bdata);
        }
}

static void xseg_to_blk(struct xseg *xseg, struct xseg_request *xreq,
                        struct request *blkreq)
{
        struct bio_vec *bvec;
        struct req_iterator iter;
        uint64_t off = 0;
        char *data = xseg_get_data(xseg, xreq);
        rq_for_each_segment(bvec, blkreq, iter) {
                char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
                memcpy(bdata, data + off, bvec->bv_len);
                off += bvec->bv_len;
                kunmap_atomic(bdata);
        }
}

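/*
 * Request function, entered with rqlock held: drain the block layer
 * queue, wrap each struct request in an xseg request and submit it to
 * the destination port. When xseg requests or pending slots run out we
 * put back whatever we grabbed and return; xseg_callback() kicks us
 * again as replies free up resources.
 */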
static void xseg_request_fn(struct request_queue *rq)
{
        struct xseg_request *xreq;
        struct xsegbd_device *xsegbd_dev = rq->queuedata;
        struct request *blkreq;
        struct xsegbd_pending *pending;
        xqindex blkreq_idx;
        char *target;
        uint64_t datalen;
        xport p;
        int r;

        for (;;) {
                blkreq_idx = Noneidx;
                xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                                xsegbd_dev->dst_portno, X_ALLOC);
                if (!xreq)
                        break;

                blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending,
                                                xsegbd_dev->src_portno);
                if (blkreq_idx == Noneidx)
                        break;

                if (blkreq_idx >= xsegbd_dev->nr_requests) {
                        XSEGLOG("blkreq_idx >= xsegbd_dev->nr_requests");
                        BUG_ON(1);
                        break;
                }

                blkreq = blk_fetch_request(rq);
                if (!blkreq)
                        break;

                if (blkreq->cmd_type != REQ_TYPE_FS) {
                        XSEGLOG("non-fs cmd_type: %u. *shrug*", blkreq->cmd_type);
                        __blk_end_request_all(blkreq, 0);
                        /* don't lose the xseg request and the pending slot:
                         * give both back before moving on */
                        BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq,
                                        xsegbd_dev->src_portno) == -1);
                        BUG_ON(xq_append_head(&xsegbd_dev->blk_queue_pending,
                                        blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
                        continue;
                }

                datalen = blk_rq_bytes(blkreq);
                r = xseg_prep_request(xsegbd_dev->xseg, xreq,
                                        xsegbd_dev->targetlen, datalen);
                if (r < 0) {
                        XSEGLOG("couldn't prep request");
                        __blk_end_request_err(blkreq, r);
                        BUG_ON(1);
                        break;
                }
                r = -ENOMEM;
                if (xreq->bufferlen - xsegbd_dev->targetlen < datalen) {
                        XSEGLOG("malformed req buffers");
                        __blk_end_request_err(blkreq, r);
                        BUG_ON(1);
                        break;
                }

                target = xseg_get_target(xsegbd_dev->xseg, xreq);
                strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);

                pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
                pending->dev = xsegbd_dev;
                pending->request = blkreq;
                pending->comp = NULL;

                xreq->size = datalen;
                xreq->offset = blk_rq_pos(blkreq) << 9;
                xreq->priv = (uint64_t) blkreq_idx;

                /*
                if (xreq->offset >= (sector_size << 9))
                        XSEGLOG("sector offset: %lu > %lu, flush:%u, fua:%u",
                                 blk_rq_pos(blkreq), sector_size,
                                 blkreq->cmd_flags & REQ_FLUSH,
                                 blkreq->cmd_flags & REQ_FUA);
                */

                if (blkreq->cmd_flags & REQ_FLUSH)
                        xreq->flags |= XF_FLUSH;

                if (blkreq->cmd_flags & REQ_FUA)
                        xreq->flags |= XF_FUA;

                if (rq_data_dir(blkreq)) {
                        /* unlock for data transfers? */
                        blk_to_xseg(xsegbd_dev->xseg, xreq, blkreq);
                        xreq->op = X_WRITE;
                } else {
                        xreq->op = X_READ;
                }

                r = -EIO;
                p = xseg_submit(xsegbd_dev->xseg, xreq,
                                        xsegbd_dev->src_portno, X_ALLOC);
                if (p == NoPort) {
                        XSEGLOG("couldn't submit req");
                        BUG_ON(1);
                        __blk_end_request_err(blkreq, r);
                        break;
                }
                WARN_ON(xseg_signal(xsegbd_dev->xsegbd->xseg, p) < 0);
        }
        if (xreq)
                BUG_ON(xseg_put_request(xsegbd_dev->xsegbd->xseg, xreq,
                                        xsegbd_dev->src_portno) == -1);
        if (blkreq_idx != Noneidx)
                BUG_ON(xq_append_head(&xsegbd_dev->blk_queue_pending,
                                blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
}

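/*
 * Parse an X_INFO reply: the first eight bytes of the reply data hold
 * the target size in bytes, converted here to 512-byte sectors.
 */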
int update_dev_sectors_from_request(struct xsegbd_device *xsegbd_dev,
                                        struct xseg_request *xreq)
{
        void *data;

        if (!xsegbd_dev) {
                XSEGLOG("Invalid xsegbd_dev");
                return -ENOENT;
        }
        if (!xreq) {
                XSEGLOG("Invalid xreq");
                return -EIO;
        }

        if (xreq->state & XS_FAILED)
                return -ENOENT;

        if (!(xreq->state & XS_SERVED))
                return -EIO;

        data = xseg_get_data(xsegbd_dev->xseg, xreq);
        if (!data) {
                XSEGLOG("Invalid req data");
                return -EIO;
        }
        xsegbd_dev->sectors = *((uint64_t *) data) / 512ULL;
        return 0;
}

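/*
 * Issue a synchronous X_INFO request for the target and sleep on a
 * completion until xseg_callback() wakes us; used to size the gendisk.
 */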
static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
{
        struct xseg_request *xreq;
        char *target;
        xqindex blkreq_idx;
        struct xsegbd_pending *pending;
        struct completion comp;
        xport p;
        int ret = -EBUSY;

        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
        if (!xreq)
                return ret;

        BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen,
                                sizeof(struct xseg_reply_info)));

        init_completion(&comp);
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
                goto out;

        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
        pending->request = NULL;
        pending->comp = &comp;

        xreq->priv = (uint64_t) blkreq_idx;

        target = xseg_get_target(xsegbd_dev->xseg, xreq);
        strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
        xreq->size = xreq->datalen;
        xreq->offset = 0;
        xreq->op = X_INFO;

        xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        p = xseg_submit(xsegbd_dev->xseg, xreq,
                                xsegbd_dev->src_portno, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG("couldn't submit request");
                BUG_ON(1);
                goto out_queue;
        }
        WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
        XSEGLOG("Before wait for completion, comp %lx [%llu]",
                        (unsigned long) pending->comp,
                        (unsigned long long) blkreq_idx);
        wait_for_completion_interruptible(&comp);
        XSEGLOG("Woken up after wait_for_completion_interruptible(), comp: %lx [%llu]",
                        (unsigned long) pending->comp,
                        (unsigned long long) blkreq_idx);
        ret = update_dev_sectors_from_request(xsegbd_dev, xreq);
        //XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors);
out:
        BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
        return ret;

out_queue:
        pending->dev = NULL;
        pending->comp = NULL;
        xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);

        goto out;
}

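/*
 * Issue a synchronous X_CLOSE request so the mapper closes the target
 * mapping; called from xsegbd_dev_release() when the device goes away.
 */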
static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev)
{
        struct xseg_request *xreq;
        char *target;
        xqindex blkreq_idx;
        struct xsegbd_pending *pending;
        struct completion comp;
        xport p;
        int ret = -EBUSY;

        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
        if (!xreq)
                return ret;

        /* an X_CLOSE request carries no data, only the target name */
        BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen, 0));

        init_completion(&comp);
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
                goto out;

        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
        pending->request = NULL;
        pending->comp = &comp;

        xreq->priv = (uint64_t) blkreq_idx;

        target = xseg_get_target(xsegbd_dev->xseg, xreq);
        strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
        xreq->size = xreq->datalen;
        xreq->offset = 0;
        xreq->op = X_CLOSE;

        xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        p = xseg_submit(xsegbd_dev->xseg, xreq,
                                xsegbd_dev->src_portno, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG("couldn't submit request");
                BUG_ON(1);
                goto out_queue;
        }
        WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
        wait_for_completion_interruptible(&comp);
        ret = 0;
        if (xreq->state & XS_FAILED)
                XSEGLOG("Couldn't close disk on mapper");
out:
        BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
        return ret;

out_queue:
        pending->dev = NULL;
        pending->comp = NULL;
        xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);

        goto out;
}

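/*
 * Segment callback for our port: receive completed xseg requests, wake
 * any synchronous waiter (X_INFO/X_CLOSE) or end the corresponding
 * block request, then re-run the request function in case it had
 * stalled waiting for resources.
 */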
static void xseg_callback(xport portno)
{
        struct xsegbd_device *xsegbd_dev;
        struct xseg_request *xreq;
        struct request *blkreq;
        struct xsegbd_pending *pending;
        unsigned long flags;
        xqindex blkreq_idx, ridx;
        int err;

        xsegbd_dev = __xsegbd_get_dev(portno);
        if (!xsegbd_dev) {
                XSEGLOG("portno: %u has no xsegbd device assigned", portno);
                WARN_ON(1);
                return;
        }

        for (;;) {
                xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
                xreq = xseg_receive(xsegbd_dev->xseg, portno);
                if (!xreq)
                        break;

                xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);

                blkreq_idx = (xqindex) xreq->priv;
                if (blkreq_idx >= xsegbd_dev->nr_requests) {
                        WARN_ON(1);
                        //FIXME maybe put request?
                        continue;
                }

                pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
                if (pending->comp) {
                        /* someone is blocking on this request
                           and will handle it when we wake them up. */
                        complete(pending->comp);
                        /* the request is blocker's responsibility so
                           we will not put_request(); */
                        continue;
                }

                /* this is now treated as a block I/O request to end */
                blkreq = pending->request;
                pending->request = NULL;
                if (xsegbd_dev != pending->dev) {
                        //FIXME maybe put request?
                        XSEGLOG("xsegbd_dev != pending->dev");
                        BUG_ON(1);
                        continue;
                }
                pending->dev = NULL;
                if (!blkreq) {
                        //FIXME maybe put request?
                        XSEGLOG("blkreq does not exist");
                        BUG_ON(1);
                        continue;
                }

                err = -EIO;
                if (!(xreq->state & XS_SERVED))
                        goto blk_end;

                if (xreq->serviced != blk_rq_bytes(blkreq))
                        goto blk_end;

                err = 0;
                /* unlock for data transfer? */
                if (!rq_data_dir(blkreq))
                        xseg_to_blk(xsegbd_dev->xseg, xreq, blkreq);
blk_end:
                blk_end_request_all(blkreq, err);

                ridx = xq_append_head(&xsegbd_dev->blk_queue_pending,
                                        blkreq_idx, xsegbd_dev->src_portno);
                if (ridx == Noneidx) {
                        XSEGLOG("couldn't append blkreq_idx");
                        WARN_ON(1);
                }

                if (xseg_put_request(xsegbd_dev->xseg, xreq,
                                                xsegbd_dev->src_portno) < 0) {
                        XSEGLOG("couldn't put req");
                        BUG_ON(1);
                }
        }

        spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
        xseg_request_fn(xsegbd_dev->blk_queue);
        spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
}


/* sysfs interface */

static struct bus_type xsegbd_bus_type = {
        .name   = "xsegbd",
};

static ssize_t xsegbd_size_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%llu\n", (unsigned long long) xsegbd_dev->sectors * 512ULL);
}

static ssize_t xsegbd_major_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%d\n", xsegbd_dev->major);
}

static ssize_t xsegbd_srcport_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->src_portno);
}

static ssize_t xsegbd_dstport_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->dst_portno);
}

static ssize_t xsegbd_id_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->id);
}

static ssize_t xsegbd_reqs_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->nr_requests);
}

static ssize_t xsegbd_target_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%s\n", xsegbd_dev->target);
}

static ssize_t xsegbd_image_refresh(struct device *dev,
                                        struct device_attribute *attr,
                                        const char *buf,
                                        size_t size)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
        int rc, ret = size;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

        rc = xsegbd_get_size(xsegbd_dev);
        if (rc < 0) {
                ret = rc;
                goto out;
        }

        set_capacity(xsegbd_dev->gd, xsegbd_dev->sectors);

out:
        mutex_unlock(&xsegbd_mutex);
        return ret;
}

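/*
 * Forcibly complete every in-flight request (the 'cleanup' attribute):
 * pending block requests are ended with -EIO and blocked waiters are
 * woken, so the device can be torn down even if replies never arrive.
 */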
static ssize_t xsegbd_cleanup(struct device *dev,
                                        struct device_attribute *attr,
                                        const char *buf,
                                        size_t size)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
        int ret = size, i;
        struct request *blkreq = NULL;
        struct xsegbd_pending *pending = NULL;
        struct completion *comp = NULL;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
        xlock_acquire(&xsegbd_dev->blk_queue_pending.lock,
                                xsegbd_dev->src_portno);
        for (i = 0; i < xsegbd_dev->nr_requests; i++) {
                if (!__xq_check(&xsegbd_dev->blk_queue_pending, i)) {
                        pending = &xsegbd_dev->blk_req_pending[i];
                        blkreq = pending->request;
                        pending->request = NULL;
                        comp = pending->comp;
                        pending->comp = NULL;
                        if (blkreq) {
                                XSEGLOG("Cleaning up blkreq %lx [%d]",
                                                (unsigned long) blkreq, i);
                                blk_end_request_all(blkreq, -EIO);
                        }
                        if (comp) {
                                XSEGLOG("Cleaning up comp %lx [%d]",
                                                (unsigned long) comp, i);
                                complete(comp);
                        }
                        __xq_append_tail(&xsegbd_dev->blk_queue_pending, i);
                }
        }
        xlock_release(&xsegbd_dev->blk_queue_pending.lock);

        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static DEVICE_ATTR(size, S_IRUGO, xsegbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, xsegbd_major_show, NULL);
static DEVICE_ATTR(srcport, S_IRUGO, xsegbd_srcport_show, NULL);
static DEVICE_ATTR(dstport, S_IRUGO, xsegbd_dstport_show, NULL);
static DEVICE_ATTR(id, S_IRUGO, xsegbd_id_show, NULL);
static DEVICE_ATTR(reqs, S_IRUGO, xsegbd_reqs_show, NULL);
static DEVICE_ATTR(target, S_IRUGO, xsegbd_target_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, xsegbd_image_refresh);
static DEVICE_ATTR(cleanup, S_IWUSR, NULL, xsegbd_cleanup);

static struct attribute *xsegbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_srcport.attr,
        &dev_attr_dstport.attr,
        &dev_attr_id.attr,
        &dev_attr_reqs.attr,
        &dev_attr_target.attr,
        &dev_attr_refresh.attr,
        &dev_attr_cleanup.attr,
        NULL
};

static struct attribute_group xsegbd_attr_group = {
        .attrs = xsegbd_attrs,
};

static const struct attribute_group *xsegbd_attr_groups[] = {
        &xsegbd_attr_group,
        NULL
};

static void xsegbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type xsegbd_device_type = {
        .name           = "xsegbd",
        .groups         = xsegbd_attr_groups,
        .release        = xsegbd_sysfs_dev_release,
};

static void xsegbd_root_dev_release(struct device *dev)
{
}

static struct device xsegbd_root_dev = {
        .init_name      = "xsegbd",
        .release        = xsegbd_root_dev_release,
};

static int xsegbd_bus_add_dev(struct xsegbd_device *xsegbd_dev)
{
        int ret = -ENOMEM;
        struct device *dev;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
        dev = &xsegbd_dev->dev;

        dev->bus = &xsegbd_bus_type;
        dev->type = &xsegbd_device_type;
        dev->parent = &xsegbd_root_dev;
        dev->release = xsegbd_dev_release;
        dev_set_name(dev, "%d", xsegbd_dev->id);

        ret = device_register(dev);

        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static void xsegbd_bus_del_dev(struct xsegbd_device *xsegbd_dev)
{
        device_unregister(&xsegbd_dev->dev);
}

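/*
 * Parse an 'add' command of the form
 *   <target> <src_portno>:<dst_portno>:<nr_requests>
 * (with the default bus name, written to /sys/bus/xsegbd/add) and bring
 * up the corresponding block device.
 */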
static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
{
        struct xsegbd_device *xsegbd_dev;
        struct xseg_port *port;
        ssize_t ret = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        xsegbd_dev = kzalloc(sizeof(*xsegbd_dev), GFP_KERNEL);
        if (!xsegbd_dev)
                goto out;

        spin_lock_init(&xsegbd_dev->rqlock);
        INIT_LIST_HEAD(&xsegbd_dev->node);

        /* parse cmd */
        if (sscanf(buf, "%" __stringify(XSEGBD_TARGET_NAMELEN) "s "
                        "%d:%d:%d", xsegbd_dev->target, &xsegbd_dev->src_portno,
                        &xsegbd_dev->dst_portno, &xsegbd_dev->nr_requests) < 3) {
                ret = -EINVAL;
                goto out_dev;
        }
        xsegbd_dev->targetlen = strlen(xsegbd_dev->target);

        spin_lock(&xsegbd_devices_lock);
        if (xsegbd_devices[xsegbd_dev->src_portno] != NULL) {
                ret = -EINVAL;
                goto out_unlock;
        }
        xsegbd_devices[xsegbd_dev->src_portno] = xsegbd_dev;
        xsegbd_dev->id = xsegbd_dev->src_portno;
        spin_unlock(&xsegbd_devices_lock);

        XSEGLOG("registering block device major %d", major);
        ret = register_blkdev(major, XSEGBD_NAME);
        if (ret < 0) {
                XSEGLOG("cannot register block device!");
                ret = -EBUSY;
                goto out_delentry;
        }
        xsegbd_dev->major = ret;
        XSEGLOG("registered block device major %d", xsegbd_dev->major);

        ret = xsegbd_bus_add_dev(xsegbd_dev);
        if (ret)
                goto out_blkdev;

        /* don't return 0 from the store method if an allocation fails */
        ret = -ENOMEM;
        if (!xq_alloc_seq(&xsegbd_dev->blk_queue_pending,
                                xsegbd_dev->nr_requests,
                                xsegbd_dev->nr_requests))
                goto out_bus;

        xsegbd_dev->blk_req_pending = kzalloc(
                        xsegbd_dev->nr_requests * sizeof(struct xsegbd_pending),
                        GFP_KERNEL);
        if (!xsegbd_dev->blk_req_pending)
                goto out_freeq;

        XSEGLOG("joining segment");
        //FIXME use xsegbd module config for now
        xsegbd_dev->xseg = xseg_join(xsegbd.config.type,
                                        xsegbd.config.name,
                                        "segdev",
                                        xseg_callback);
        if (!xsegbd_dev->xseg) {
                ret = -ENODEV;
                goto out_freepending;
        }

        XSEGLOG("binding to source port %u (destination %u)",
                        xsegbd_dev->src_portno, xsegbd_dev->dst_portno);
        port = xseg_bind_port(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        if (!port) {
                XSEGLOG("cannot bind to port");
                ret = -EFAULT;
                goto out_xseg;
        }

        if (xsegbd_dev->src_portno != xseg_portno(xsegbd_dev->xseg, port)) {
                XSEGLOG("portno != xsegbd_dev->src_portno");
                BUG_ON(1);
                ret = -EFAULT;
                goto out_xseg;
        }

        /* make sure we don't get any requests until we're ready to handle them */
        xseg_cancel_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));

        ret = xsegbd_dev_init(xsegbd_dev);
        if (ret)
                goto out_xseg;

        xseg_prepare_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
        return count;

out_xseg:
        xseg_leave(xsegbd_dev->xseg);

out_freepending:
        kfree(xsegbd_dev->blk_req_pending);

out_freeq:
        xq_free(&xsegbd_dev->blk_queue_pending);

out_bus:
        xsegbd_bus_del_dev(xsegbd_dev);
        return ret;

out_blkdev:
        unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);

out_delentry:
        spin_lock(&xsegbd_devices_lock);
        xsegbd_devices[xsegbd_dev->src_portno] = NULL;

out_unlock:
        spin_unlock(&xsegbd_devices_lock);

out_dev:
        kfree(xsegbd_dev);

out:
        return ret;
}

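/*
 * Handle 'remove': the id written to the attribute is the device's
 * source port number. Unregistering the device drops the last reference
 * and xsegbd_dev_release() performs the actual teardown, including the
 * X_CLOSE to the mapper.
 */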
static ssize_t xsegbd_remove(struct bus_type *bus, const char *buf, size_t count)
{
        struct xsegbd_device *xsegbd_dev = NULL;
        int id, ret;
        unsigned long ul_id;

        ret = strict_strtoul(buf, 10, &ul_id);
        if (ret)
                return ret;

        id = (int) ul_id;
        if (id != ul_id)
                return -EINVAL;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

        ret = count;
        xsegbd_dev = __xsegbd_get_dev(id);
        if (!xsegbd_dev) {
                ret = -ENOENT;
                goto out_unlock;
        }
        xsegbd_bus_del_dev(xsegbd_dev);

out_unlock:
        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static struct bus_attribute xsegbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, xsegbd_add),
        __ATTR(remove, S_IWUSR, NULL, xsegbd_remove),
        __ATTR_NULL
};

static int xsegbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&xsegbd_root_dev);
        if (ret < 0)
                return ret;

        xsegbd_bus_type.bus_attrs = xsegbd_bus_attrs;
        ret = bus_register(&xsegbd_bus_type);
        if (ret < 0)
                device_unregister(&xsegbd_root_dev);

        return ret;
}

static void xsegbd_sysfs_cleanup(void)
{
        bus_unregister(&xsegbd_bus_type);
        device_unregister(&xsegbd_root_dev);
}

/* *************************** */
/* ** Module Initialization ** */
/* *************************** */

static int __init xsegbd_init(void)
{
        int ret = -ENOMEM;

        xsegbd_devices = kzalloc(max_dev * sizeof(struct xsegbd_device *), GFP_KERNEL);
        if (!xsegbd_devices)
                goto out;

        spin_lock_init(&xsegbd_devices_lock);

        ret = xsegbd_xseg_init();
        if (ret)
                goto out_free;

        ret = xsegbd_sysfs_init();
        if (ret)
                goto out_xseg;

        XSEGLOG("initialization complete");

out:
        return ret;

out_xseg:
        xsegbd_xseg_quit();

out_free:
        kfree(xsegbd_devices);

        goto out;
}

static void __exit xsegbd_exit(void)
{
        xsegbd_sysfs_cleanup();
        xsegbd_xseg_quit();
}

module_init(xsegbd_init);
module_exit(xsegbd_exit);