make vlmc map volumes from port 0
[archipelago] / xseg / peers / kernel / xsegbd.c
/* xsegbd.c
 *
 */

#include <linux/module.h>
#include <linux/moduleparam.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/fs.h>
#include <linux/errno.h>
#include <linux/timer.h>
#include <linux/types.h>
#include <linux/vmalloc.h>
#include <linux/genhd.h>
#include <linux/blkdev.h>
#include <linux/bio.h>
#include <linux/device.h>
#include <linux/completion.h>
#include <linux/wait.h>
#include <sys/kernel/segdev.h>
#include "xsegbd.h"
#include <xseg/protocol.h>

#define XSEGBD_MINORS 1
/* define max request size to be used in xsegbd */
//FIXME should we make this 4MB instead of 256KB ?
//#define XSEGBD_MAX_REQUEST_SIZE 262144U
#define XSEGBD_MAX_REQUEST_SIZE 4194304U

MODULE_DESCRIPTION("xsegbd");
MODULE_AUTHOR("XSEG");
MODULE_LICENSE("GPL");

static long sector_size = 0;
static long blksize = 512;
static int major = 0;
static int max_dev = 1024;
static char name[XSEGBD_SEGMENT_NAMELEN] = "xsegbd";
static char spec[256] = "segdev:xsegbd:4:1024:12";

module_param(sector_size, long, 0644);
module_param(blksize, long, 0644);
module_param(max_dev, int, 0644);
module_param(major, int, 0644);
module_param_string(name, name, sizeof(name), 0644);
module_param_string(spec, spec, sizeof(spec), 0644);
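
/*
 * Example module load (illustrative values; the module and segment names
 * are the defaults above). The spec string describes the xseg segment to
 * join, and a non-zero sector_size forces the disk size instead of
 * querying the mapper:
 *
 *   insmod xsegbd.ko spec="segdev:xsegbd:4:1024:12" blksize=512
 */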

static struct xsegbd xsegbd;
static struct xsegbd_device **xsegbd_devices; /* indexed by portno */
static DEFINE_MUTEX(xsegbd_mutex);
static DEFINE_SPINLOCK(xsegbd_devices_lock);


struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
{
        struct xsegbd_device *xsegbd_dev = NULL;

        spin_lock(&xsegbd_devices_lock);
        xsegbd_dev = xsegbd_devices[id];
        spin_unlock(&xsegbd_devices_lock);

        return xsegbd_dev;
}

/* ************************* */
/* ***** sysfs helpers ***** */
/* ************************* */

static struct xsegbd_device *dev_to_xsegbd(struct device *dev)
{
        return container_of(dev, struct xsegbd_device, dev);
}

static struct device *xsegbd_get_dev(struct xsegbd_device *xsegbd_dev)
{
        /* FIXME */
        return get_device(&xsegbd_dev->dev);
}

static void xsegbd_put_dev(struct xsegbd_device *xsegbd_dev)
{
        put_device(&xsegbd_dev->dev);
}

/* ************************* */
/* ** XSEG Initialization ** */
/* ************************* */

static void xseg_callback(uint32_t portno);

int xsegbd_xseg_init(void)
{
        int r;

        if (!xsegbd.name[0])
                strncpy(xsegbd.name, name, XSEGBD_SEGMENT_NAMELEN);

        r = xseg_initialize();
        if (r) {
                XSEGLOG("cannot initialize 'segdev' peer");
                goto err;
        }

        r = xseg_parse_spec(spec, &xsegbd.config);
        if (r)
                goto err;

        if (strncmp(xsegbd.config.type, "segdev", 16))
                XSEGLOG("WARNING: unexpected segment type '%s' vs 'segdev'",
                         xsegbd.config.type);

        /* leave it here for now */
        XSEGLOG("joining segment");
        xsegbd.xseg = xseg_join(xsegbd.config.type,
                                xsegbd.config.name,
                                "segdev",
                                xseg_callback);
        if (!xsegbd.xseg) {
                XSEGLOG("cannot find segment");
                r = -ENODEV;
                goto err;
        }

        return 0;
err:
        return r;
}

int xsegbd_xseg_quit(void)
{
        struct segdev *segdev;

        /* make sure to unmap the segment first */
        segdev = segdev_get(0);
        clear_bit(SEGDEV_RESERVED, &segdev->flags);
        xsegbd.xseg->priv->segment_type.ops.unmap(xsegbd.xseg, xsegbd.xseg->segment_size);
        segdev_put(segdev);

        return 0;
}


/* ***************************** */
/* ** Block Device Operations ** */
/* ***************************** */

static int xsegbd_open(struct block_device *bdev, fmode_t mode)
{
        struct gendisk *disk = bdev->bd_disk;
        struct xsegbd_device *xsegbd_dev = disk->private_data;

        xsegbd_get_dev(xsegbd_dev);

        return 0;
}

static int xsegbd_release(struct gendisk *gd, fmode_t mode)
{
        struct xsegbd_device *xsegbd_dev = gd->private_data;

        xsegbd_put_dev(xsegbd_dev);

        return 0;
}

static int xsegbd_ioctl(struct block_device *bdev, fmode_t mode,
                        unsigned int cmd, unsigned long arg)
{
        return -ENOTTY;
}

static const struct block_device_operations xsegbd_ops = {
        .owner          = THIS_MODULE,
        .open           = xsegbd_open,
        .release        = xsegbd_release,
        .ioctl          = xsegbd_ioctl
};


/* *************************** */
/* ** Device Initialization ** */
/* *************************** */

static void xseg_request_fn(struct request_queue *rq);
static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev);
static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev);

static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
{
        int ret = -ENOMEM;
        struct gendisk *disk;
        unsigned int max_request_size_bytes;

        spin_lock_init(&xsegbd_dev->rqlock);

        xsegbd_dev->xsegbd = &xsegbd;

        /* allocates and initializes queue */
        xsegbd_dev->blk_queue = blk_init_queue(xseg_request_fn, &xsegbd_dev->rqlock);
        if (!xsegbd_dev->blk_queue)
                goto out;

        xsegbd_dev->blk_queue->queuedata = xsegbd_dev;

        blk_queue_flush(xsegbd_dev->blk_queue, REQ_FLUSH | REQ_FUA);
        blk_queue_logical_block_size(xsegbd_dev->blk_queue, 512);
        blk_queue_physical_block_size(xsegbd_dev->blk_queue, blksize);
        blk_queue_bounce_limit(xsegbd_dev->blk_queue, BLK_BOUNCE_ANY);

        max_request_size_bytes = XSEGBD_MAX_REQUEST_SIZE;
        blk_queue_max_hw_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 9);
//      blk_queue_max_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 10);
        blk_queue_max_segments(xsegbd_dev->blk_queue, 1024);
        blk_queue_max_segment_size(xsegbd_dev->blk_queue, max_request_size_bytes);
        blk_queue_io_min(xsegbd_dev->blk_queue, max_request_size_bytes);
        blk_queue_io_opt(xsegbd_dev->blk_queue, max_request_size_bytes);

        queue_flag_set_unlocked(QUEUE_FLAG_NONROT, xsegbd_dev->blk_queue);

        /* vkoukis says we don't need partitions */
        xsegbd_dev->gd = disk = alloc_disk(1);
        if (!disk)
                goto out;

        disk->major = xsegbd_dev->major;
        disk->first_minor = 0; // id * XSEGBD_MINORS;
        disk->fops = &xsegbd_ops;
        disk->queue = xsegbd_dev->blk_queue;
        disk->private_data = xsegbd_dev;
        disk->flags |= GENHD_FL_SUPPRESS_PARTITION_INFO;
        snprintf(disk->disk_name, 32, "xsegbd%u", xsegbd_dev->id);

        ret = 0;

        /* allow a non-zero sector_size parameter to override the disk size */
        if (sector_size)
                xsegbd_dev->sectors = sector_size;
        else {
                ret = xsegbd_get_size(xsegbd_dev);
                if (ret)
                        goto out;
        }

        set_capacity(disk, xsegbd_dev->sectors);
        XSEGLOG("xsegbd active...");
        add_disk(disk); /* immediately activates the device */

out:
        /* on error, everything is cleaned up in xsegbd_dev_release */
        return ret;
}

static void xsegbd_dev_release(struct device *dev)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        /* cleanup gendisk and blk_queue the right way */
        if (xsegbd_dev->gd) {
                if (xsegbd_dev->gd->flags & GENHD_FL_UP)
                        del_gendisk(xsegbd_dev->gd);

                xsegbd_mapclose(xsegbd_dev);
        }

        spin_lock(&xsegbd_devices_lock);
        BUG_ON(xsegbd_devices[xsegbd_dev->src_portno] != xsegbd_dev);
        xsegbd_devices[xsegbd_dev->src_portno] = NULL;
        spin_unlock(&xsegbd_devices_lock);

        XSEGLOG("releasing id: %d", xsegbd_dev->id);
//      xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        xseg_quit_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);

        if (xsegbd_dev->blk_queue)
                blk_cleanup_queue(xsegbd_dev->blk_queue);
        if (xsegbd_dev->gd)
                put_disk(xsegbd_dev->gd);

//      if (xseg_free_requests(xsegbd_dev->xseg,
//                      xsegbd_dev->src_portno, xsegbd_dev->nr_requests) < 0)
//              XSEGLOG("Error trying to free requests!\n");

        if (xsegbd_dev->xseg){
                xseg_leave(xsegbd_dev->xseg);
                xsegbd_dev->xseg = NULL;
        }

        unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);

        if (xsegbd_dev->blk_req_pending){
                kfree(xsegbd_dev->blk_req_pending);
                xsegbd_dev->blk_req_pending = NULL;
        }
        xq_free(&xsegbd_dev->blk_queue_pending);
        kfree(xsegbd_dev);
        module_put(THIS_MODULE);
}

/* ******************* */
/* ** Critical Path ** */
/* ******************* */

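/*
 * Data path overview: xseg_request_fn() pulls requests off the block layer
 * queue, copies write data from the bio pages into the shared segment
 * (blk_to_xseg) and submits an xseg request to the peer at dst_portno.
 * When the peer signals completion, xseg_callback() copies read data back
 * out of the segment (xseg_to_blk) and ends the block request.
 */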
static void blk_to_xseg(struct xseg *xseg, struct xseg_request *xreq,
                        struct request *blkreq)
{
        struct bio_vec *bvec;
        struct req_iterator iter;
        uint64_t off = 0;
        char *data = xseg_get_data(xseg, xreq);

        rq_for_each_segment(bvec, blkreq, iter) {
                char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
                memcpy(data + off, bdata, bvec->bv_len);
                off += bvec->bv_len;
                kunmap_atomic(bdata);
        }
}

static void xseg_to_blk(struct xseg *xseg, struct xseg_request *xreq,
                        struct request *blkreq)
{
        struct bio_vec *bvec;
        struct req_iterator iter;
        uint64_t off = 0;
        char *data = xseg_get_data(xseg, xreq);

        rq_for_each_segment(bvec, blkreq, iter) {
                char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
                memcpy(bdata, data + off, bvec->bv_len);
                off += bvec->bv_len;
                kunmap_atomic(bdata);
        }
}

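/*
 * Locking note: the block layer invokes the request function with rqlock
 * held. We drop it up front while allocating xseg requests and copying
 * data, and only re-acquire it around blk_fetch_request() and before
 * returning to the block layer.
 */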
static void xseg_request_fn(struct request_queue *rq)
{
        struct xseg_request *xreq;
        struct xsegbd_device *xsegbd_dev = rq->queuedata;
        struct request *blkreq;
        struct xsegbd_pending *pending;
        xqindex blkreq_idx;
        char *target;
        uint64_t datalen;
        xport p;
        int r;
        unsigned long flags;

        spin_unlock_irq(&xsegbd_dev->rqlock);
        for (;;) {
                if (current_thread_info()->preempt_count || irqs_disabled()){
                        XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu ",
                                        current_thread_info()->preempt_count, irqs_disabled());
                }
                //XSEGLOG("Priority: %d", current_thread_info()->task->prio);
                //XSEGLOG("Static priority: %d", current_thread_info()->task->static_prio);
                //XSEGLOG("Normal priority: %d", current_thread_info()->task->normal_prio);
                //XSEGLOG("Rt_priority: %u", current_thread_info()->task->rt_priority);
                blkreq_idx = Noneidx;
                xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                                xsegbd_dev->dst_portno, X_ALLOC);
                if (!xreq)
                        break;

                blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending,
                                                xsegbd_dev->src_portno);
                if (blkreq_idx == Noneidx)
                        break;

                if (blkreq_idx >= xsegbd_dev->nr_requests) {
                        XSEGLOG("blkreq_idx >= xsegbd_dev->nr_requests");
                        BUG_ON(1);
                        break;
                }

                spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
                blkreq = blk_fetch_request(rq);
                if (!blkreq){
                        spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
                        break;
                }

                if (blkreq->cmd_type != REQ_TYPE_FS) {
                        //FIXME we lose xreq here
                        XSEGLOG("non-fs cmd_type: %u. *shrug*", blkreq->cmd_type);
                        __blk_end_request_all(blkreq, 0);
                        spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
                        continue;
                }
                spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
                if (current_thread_info()->preempt_count || irqs_disabled()){
                        XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu ",
                                        current_thread_info()->preempt_count, irqs_disabled());
                }

                datalen = blk_rq_bytes(blkreq);
                r = xseg_prep_request(xsegbd_dev->xseg, xreq,
                                        xsegbd_dev->targetlen, datalen);
                if (r < 0) {
                        XSEGLOG("couldn't prep request");
                        blk_end_request_err(blkreq, r);
                        BUG_ON(1);
                        break;
                }
                r = -ENOMEM;
                if (xreq->bufferlen - xsegbd_dev->targetlen < datalen){
                        XSEGLOG("malformed req buffers");
                        blk_end_request_err(blkreq, r);
                        BUG_ON(1);
                        break;
                }

                target = xseg_get_target(xsegbd_dev->xseg, xreq);
                strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);

                pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
                pending->dev = xsegbd_dev;
                pending->request = blkreq;
                pending->comp = NULL;

                xreq->size = datalen;
                xreq->offset = blk_rq_pos(blkreq) << 9;
                xreq->priv = (uint64_t) blkreq_idx;

                /*
                if (xreq->offset >= (sector_size << 9))
                        XSEGLOG("sector offset: %lu > %lu, flush:%u, fua:%u",
                                 blk_rq_pos(blkreq), sector_size,
                                 blkreq->cmd_flags & REQ_FLUSH,
                                 blkreq->cmd_flags & REQ_FUA);
                */

                if (blkreq->cmd_flags & REQ_FLUSH)
                        xreq->flags |= XF_FLUSH;

                if (blkreq->cmd_flags & REQ_FUA)
                        xreq->flags |= XF_FUA;

                if (rq_data_dir(blkreq)) {
                        blk_to_xseg(xsegbd_dev->xseg, xreq, blkreq);
                        xreq->op = X_WRITE;
                } else {
                        xreq->op = X_READ;
                }

//              XSEGLOG("%s : %lu (%lu)", xsegbd_dev->target, xreq->offset, xreq->datalen);
                r = -EIO;
                p = xseg_submit(xsegbd_dev->xseg, xreq,
                                        xsegbd_dev->src_portno, X_ALLOC);
                if (p == NoPort) {
                        XSEGLOG("couldn't submit req");
                        WARN_ON(1);
                        blk_end_request_err(blkreq, r);
                        break;
                }
                WARN_ON(xseg_signal(xsegbd_dev->xsegbd->xseg, p) < 0);
        }
        if (xreq)
                BUG_ON(xseg_put_request(xsegbd_dev->xsegbd->xseg, xreq,
                                        xsegbd_dev->src_portno) == -1);
        if (blkreq_idx != Noneidx)
                BUG_ON(xq_append_head(&xsegbd_dev->blk_queue_pending,
                                blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
        spin_lock_irq(&xsegbd_dev->rqlock);
}

int update_dev_sectors_from_request(struct xsegbd_device *xsegbd_dev,
                                        struct xseg_request *xreq)
{
        void *data;

        if (!xreq) {
                XSEGLOG("Invalid xreq");
                return -EIO;
        }

        if (xreq->state & XS_FAILED)
                return -ENOENT;

        if (!(xreq->state & XS_SERVED))
                return -EIO;

        /* check the device before dereferencing it for the data pointer */
        if (!xsegbd_dev) {
                XSEGLOG("Invalid xsegbd_dev");
                return -ENOENT;
        }
        data = xseg_get_data(xsegbd_dev->xseg, xreq);
        if (!data) {
                XSEGLOG("Invalid req data");
                return -EIO;
        }
        xsegbd_dev->sectors = *((uint64_t *) data) / 512ULL;
        return 0;
}

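/*
 * Control requests (X_INFO below, X_CLOSE in xsegbd_mapclose) are
 * synchronous: we store a completion in the pending slot and sleep until
 * xseg_callback() wakes us.
 */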
static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
{
        struct xseg_request *xreq;
        char *target;
        xqindex blkreq_idx;
        struct xsegbd_pending *pending;
        struct completion comp;
        xport p;
        int ret = -EBUSY;

        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
        if (!xreq)
                goto out;

        BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen,
                                sizeof(struct xseg_reply_info)));

        init_completion(&comp);
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
                goto out_put;

        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
        pending->request = NULL;
        pending->comp = &comp;

        xreq->priv = (uint64_t) blkreq_idx;

        target = xseg_get_target(xsegbd_dev->xseg, xreq);
        strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
        xreq->size = xreq->datalen;
        xreq->offset = 0;
        xreq->op = X_INFO;

        xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        p = xseg_submit(xsegbd_dev->xseg, xreq,
                                xsegbd_dev->src_portno, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG("couldn't submit request");
                BUG_ON(1);
                goto out_queue;
        }
        WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
        XSEGLOG("Before wait for completion, comp %lx [%llu]",
                        (unsigned long) pending->comp,
                        (unsigned long long) blkreq_idx);
        wait_for_completion_interruptible(&comp);
        XSEGLOG("Woken up after wait_for_completion_interruptible(), comp: %lx [%llu]",
                        (unsigned long) pending->comp,
                        (unsigned long long) blkreq_idx);
        ret = update_dev_sectors_from_request(xsegbd_dev, xreq);
        XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors);

out_queue:
        pending->dev = NULL;
        pending->comp = NULL;
        xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
out_put:
        BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
out:
        return ret;
}

static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev)
{
        struct xseg_request *xreq;
        char *target;
        xqindex blkreq_idx;
        struct xsegbd_pending *pending;
        struct completion comp;
        xport p;
        int ret = -EBUSY;

        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
        if (!xreq)
                goto out;

        BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen, 0));

        init_completion(&comp);
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
                goto out_put;

        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
        pending->request = NULL;
        pending->comp = &comp;

        xreq->priv = (uint64_t) blkreq_idx;

        target = xseg_get_target(xsegbd_dev->xseg, xreq);
        strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
        xreq->size = xreq->datalen;
        xreq->offset = 0;
        xreq->op = X_CLOSE;

        xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        p = xseg_submit(xsegbd_dev->xseg, xreq,
                                xsegbd_dev->src_portno, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG("couldn't submit request");
                BUG_ON(1);
                goto out_queue;
        }
        WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
        wait_for_completion_interruptible(&comp);
        ret = 0;
        if (xreq->state & XS_FAILED)
                XSEGLOG("Couldn't close disk on mapper");

out_queue:
        pending->dev = NULL;
        pending->comp = NULL;
        xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
out_put:
        BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
out:
        return ret;
}

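/*
 * Runs when the peer signals our port: drain all completed xseg requests,
 * wake any synchronous waiter (X_INFO/X_CLOSE), end the corresponding
 * block requests, and finally kick the request function to resubmit
 * anything that was waiting for free resources.
 */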
static void xseg_callback(xport portno)
{
        struct xsegbd_device *xsegbd_dev;
        struct xseg_request *xreq;
        struct request *blkreq;
        struct xsegbd_pending *pending;
        unsigned long flags;
        xqindex blkreq_idx, ridx;
        int err;

        xsegbd_dev = __xsegbd_get_dev(portno);
        if (!xsegbd_dev) {
                XSEGLOG("portno: %u has no xsegbd device assigned", portno);
                WARN_ON(1);
                return;
        }

        for (;;) {
                xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
                xreq = xseg_receive(xsegbd_dev->xseg, portno, 0);
                if (!xreq)
                        break;

//              xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);

                blkreq_idx = (xqindex) xreq->priv;
                if (blkreq_idx >= xsegbd_dev->nr_requests) {
                        WARN_ON(1);
                        //FIXME maybe put request?
                        continue;
                }

                pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
                if (pending->comp) {
                        /* someone is blocking on this request
                           and will handle it when we wake them up. */
                        complete(pending->comp);
                        /* the request is the blocker's responsibility so
                           we will not put_request(); */
                        continue;
                }

                /* this is now treated as a block I/O request to end */
                blkreq = pending->request;
                pending->request = NULL;
                if (xsegbd_dev != pending->dev) {
                        //FIXME maybe put request?
                        XSEGLOG("xsegbd_dev != pending->dev");
                        WARN_ON(1);
                        continue;
                }
                pending->dev = NULL;
                if (!blkreq){
                        //FIXME maybe put request?
                        XSEGLOG("blkreq does not exist");
                        WARN_ON(1);
                        continue;
                }

                err = -EIO;
                if (!(xreq->state & XS_SERVED))
                        goto blk_end;

                if (xreq->serviced != blk_rq_bytes(blkreq))
                        goto blk_end;

                err = 0;
                if (!rq_data_dir(blkreq)){
                        xseg_to_blk(xsegbd_dev->xseg, xreq, blkreq);
                }
blk_end:
                blk_end_request_all(blkreq, err);

                ridx = xq_append_head(&xsegbd_dev->blk_queue_pending,
                                        blkreq_idx, xsegbd_dev->src_portno);
                if (ridx == Noneidx) {
                        XSEGLOG("couldn't append blkreq_idx");
                        WARN_ON(1);
                }

                if (xseg_put_request(xsegbd_dev->xseg, xreq,
                                                xsegbd_dev->src_portno) < 0){
                        XSEGLOG("couldn't put req");
                        BUG_ON(1);
                }
        }
        if (xsegbd_dev) {
                spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
                xseg_request_fn(xsegbd_dev->blk_queue);
                spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
        }
}


/* sysfs interface */
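
/*
 * Each device exposes read-only attributes (size, major, srcport, dstport,
 * id, reqs, target) and two write-only triggers (refresh, cleanup) under
 * /sys/bus/xsegbd/devices/<id>/. For example, to re-read the volume size
 * from the mapper (path assumes device id 0):
 *
 *   echo 1 > /sys/bus/xsegbd/devices/0/refresh
 */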

static struct bus_type xsegbd_bus_type = {
        .name   = "xsegbd",
};

static ssize_t xsegbd_size_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%llu\n", (unsigned long long) xsegbd_dev->sectors * 512ULL);
}

static ssize_t xsegbd_major_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%d\n", xsegbd_dev->major);
}

static ssize_t xsegbd_srcport_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->src_portno);
}

static ssize_t xsegbd_dstport_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->dst_portno);
}

static ssize_t xsegbd_id_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->id);
}

static ssize_t xsegbd_reqs_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->nr_requests);
}

static ssize_t xsegbd_target_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%s\n", xsegbd_dev->target);
}

static ssize_t xsegbd_image_refresh(struct device *dev,
                                        struct device_attribute *attr,
                                        const char *buf,
                                        size_t size)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
        int rc, ret = size;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

        rc = xsegbd_get_size(xsegbd_dev);
        if (rc < 0) {
                ret = rc;
                goto out;
        }

        set_capacity(xsegbd_dev->gd, xsegbd_dev->sectors);

out:
        mutex_unlock(&xsegbd_mutex);
        return ret;
}

//FIXME
//maybe try callback first, and then do a more invasive cleanup
static ssize_t xsegbd_cleanup(struct device *dev,
                                        struct device_attribute *attr,
                                        const char *buf,
                                        size_t size)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
        int ret = size, i;
        struct request *blkreq = NULL;
        struct xsegbd_pending *pending = NULL;
        struct completion *comp = NULL;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
        xlock_acquire(&xsegbd_dev->blk_queue_pending.lock,
                                xsegbd_dev->src_portno);
        for (i = 0; i < xsegbd_dev->nr_requests; i++) {
                if (!__xq_check(&xsegbd_dev->blk_queue_pending, i)) {
                        pending = &xsegbd_dev->blk_req_pending[i];
                        blkreq = pending->request;
                        pending->request = NULL;
                        comp = pending->comp;
                        pending->comp = NULL;
                        if (blkreq){
                                XSEGLOG("Cleaning up blkreq %lx [%d]", (unsigned long) blkreq, i);
                                blk_end_request_all(blkreq, -EIO);
                        }
                        if (comp){
                                XSEGLOG("Cleaning up comp %lx [%d]", (unsigned long) comp, i);
                                complete(comp);
                        }
                        __xq_append_tail(&xsegbd_dev->blk_queue_pending, i);
                }
        }
        xlock_release(&xsegbd_dev->blk_queue_pending.lock);

        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static DEVICE_ATTR(size, S_IRUGO, xsegbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, xsegbd_major_show, NULL);
static DEVICE_ATTR(srcport, S_IRUGO, xsegbd_srcport_show, NULL);
static DEVICE_ATTR(dstport, S_IRUGO, xsegbd_dstport_show, NULL);
static DEVICE_ATTR(id, S_IRUGO, xsegbd_id_show, NULL);
static DEVICE_ATTR(reqs, S_IRUGO, xsegbd_reqs_show, NULL);
static DEVICE_ATTR(target, S_IRUGO, xsegbd_target_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, xsegbd_image_refresh);
static DEVICE_ATTR(cleanup, S_IWUSR, NULL, xsegbd_cleanup);

static struct attribute *xsegbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_srcport.attr,
        &dev_attr_dstport.attr,
        &dev_attr_id.attr,
        &dev_attr_reqs.attr,
        &dev_attr_target.attr,
        &dev_attr_refresh.attr,
        &dev_attr_cleanup.attr,
        NULL
};

static struct attribute_group xsegbd_attr_group = {
        .attrs = xsegbd_attrs,
};

static const struct attribute_group *xsegbd_attr_groups[] = {
        &xsegbd_attr_group,
        NULL
};

static void xsegbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type xsegbd_device_type = {
        .name           = "xsegbd",
        .groups         = xsegbd_attr_groups,
        .release        = xsegbd_sysfs_dev_release,
};

static void xsegbd_root_dev_release(struct device *dev)
{
}

static struct device xsegbd_root_dev = {
        .init_name      = "xsegbd",
        .release        = xsegbd_root_dev_release,
};

static int xsegbd_bus_add_dev(struct xsegbd_device *xsegbd_dev)
{
        int ret = -ENOMEM;
        struct device *dev;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
        dev = &xsegbd_dev->dev;

        dev->bus = &xsegbd_bus_type;
        dev->type = &xsegbd_device_type;
        dev->parent = &xsegbd_root_dev;
        dev->release = xsegbd_dev_release;
        dev_set_name(dev, "%d", xsegbd_dev->id);

        ret = device_register(dev);

        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static void xsegbd_bus_del_dev(struct xsegbd_device *xsegbd_dev)
{
        device_unregister(&xsegbd_dev->dev);
}

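/*
 * Map a new device. The buffer format is
 * "<target> <src_port>:<dst_port>:<nr_requests>", where src_port is the
 * xseg port the device binds to (and doubles as its id) and dst_port is
 * the mapper peer. Illustrative usage, assuming a volume named "myvolume"
 * mapped through ports 2 and 1 with 128 requests:
 *
 *   echo "myvolume 2:1:128" > /sys/bus/xsegbd/add
 */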
static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
{
        struct xsegbd_device *xsegbd_dev;
        struct xseg_port *port;
        ssize_t ret = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        xsegbd_dev = kzalloc(sizeof(*xsegbd_dev), GFP_KERNEL);
        if (!xsegbd_dev)
                goto out;

        spin_lock_init(&xsegbd_dev->rqlock);
        INIT_LIST_HEAD(&xsegbd_dev->node);

        /* parse cmd */
        if (sscanf(buf, "%" __stringify(XSEGBD_TARGET_NAMELEN) "s "
                        "%d:%d:%d", xsegbd_dev->target, &xsegbd_dev->src_portno,
                        &xsegbd_dev->dst_portno, &xsegbd_dev->nr_requests) < 3) {
                ret = -EINVAL;
                goto out_dev;
        }
        xsegbd_dev->targetlen = strlen(xsegbd_dev->target);

        spin_lock(&xsegbd_devices_lock);
        if (xsegbd_devices[xsegbd_dev->src_portno] != NULL) {
                ret = -EINVAL;
                goto out_unlock;
        }
        xsegbd_devices[xsegbd_dev->src_portno] = xsegbd_dev;
        xsegbd_dev->id = xsegbd_dev->src_portno;
        spin_unlock(&xsegbd_devices_lock);

        XSEGLOG("registering block device major %d", major);
        ret = register_blkdev(major, XSEGBD_NAME);
        if (ret < 0) {
                XSEGLOG("cannot register block device!");
                ret = -EBUSY;
                goto out_delentry;
        }
        xsegbd_dev->major = ret;
        XSEGLOG("registered block device major %d", xsegbd_dev->major);

        ret = xsegbd_bus_add_dev(xsegbd_dev);
        if (ret)
                goto out_blkdev;

        if (!xq_alloc_seq(&xsegbd_dev->blk_queue_pending,
                                xsegbd_dev->nr_requests,
                                xsegbd_dev->nr_requests))
                goto out_bus;

        xsegbd_dev->blk_req_pending = kzalloc(
                        xsegbd_dev->nr_requests * sizeof(struct xsegbd_pending),
                        GFP_KERNEL);
        if (!xsegbd_dev->blk_req_pending)
                goto out_bus;

        XSEGLOG("joining segment");
        //FIXME use xsegbd module config for now
        xsegbd_dev->xseg = xseg_join(xsegbd.config.type,
                                        xsegbd.config.name,
                                        "segdev",
                                        xseg_callback);
        if (!xsegbd_dev->xseg)
                goto out_bus;

        XSEGLOG("%s binding to source port %u (destination %u)", xsegbd_dev->target,
                        xsegbd_dev->src_portno, xsegbd_dev->dst_portno);
        port = xseg_bind_port(xsegbd_dev->xseg, xsegbd_dev->src_portno, NULL);
        if (!port) {
                XSEGLOG("cannot bind to port");
                ret = -EFAULT;
                goto out_bus;
        }

        if (xsegbd_dev->src_portno != xseg_portno(xsegbd_dev->xseg, port)) {
                XSEGLOG("portno != xsegbd_dev->src_portno");
                BUG_ON(1);
                ret = -EFAULT;
                goto out_bus;
        }
        xseg_init_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);

        /* make sure we don't get any requests until we're ready to handle them */
        xseg_cancel_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));

        ret = xsegbd_dev_init(xsegbd_dev);
        if (ret)
                goto out_bus;

        xseg_prepare_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
        return count;

out_bus:
        xsegbd_bus_del_dev(xsegbd_dev);
        return ret;

out_blkdev:
        unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);

out_delentry:
        spin_lock(&xsegbd_devices_lock);
        xsegbd_devices[xsegbd_dev->src_portno] = NULL;

out_unlock:
        spin_unlock(&xsegbd_devices_lock);

out_dev:
        kfree(xsegbd_dev);

out:
        return ret;
}

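/*
 * Unmap a device by writing its id (equal to its source port) to the bus
 * attribute, e.g. for the device mapped above:
 *
 *   echo 2 > /sys/bus/xsegbd/remove
 */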
static ssize_t xsegbd_remove(struct bus_type *bus, const char *buf, size_t count)
{
        struct xsegbd_device *xsegbd_dev = NULL;
        int id, ret;
        unsigned long ul_id;

        ret = strict_strtoul(buf, 10, &ul_id);
        if (ret)
                return ret;

        id = (int) ul_id;
        if (id != ul_id)
                return -EINVAL;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

        ret = count;
        xsegbd_dev = __xsegbd_get_dev(id);
        if (!xsegbd_dev) {
                ret = -ENOENT;
                goto out_unlock;
        }
        xsegbd_bus_del_dev(xsegbd_dev);

out_unlock:
        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static struct bus_attribute xsegbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, xsegbd_add),
        __ATTR(remove, S_IWUSR, NULL, xsegbd_remove),
        __ATTR_NULL
};

static int xsegbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&xsegbd_root_dev);
        if (ret < 0)
                return ret;

        xsegbd_bus_type.bus_attrs = xsegbd_bus_attrs;
        ret = bus_register(&xsegbd_bus_type);
        if (ret < 0)
                device_unregister(&xsegbd_root_dev);

        return ret;
}

static void xsegbd_sysfs_cleanup(void)
{
        bus_unregister(&xsegbd_bus_type);
        device_unregister(&xsegbd_root_dev);
}

/* *************************** */
/* ** Module Initialization ** */
/* *************************** */

static int __init xsegbd_init(void)
{
        int ret = -ENOMEM;

        xsegbd_devices = kzalloc(max_dev * sizeof(struct xsegbd_device *), GFP_KERNEL);
        if (!xsegbd_devices)
                goto out;

        spin_lock_init(&xsegbd_devices_lock);

        ret = xsegbd_xseg_init();
        if (ret)
                goto out_free;

        ret = xsegbd_sysfs_init();
        if (ret)
                goto out_xseg;

        XSEGLOG("initialization complete");

out:
        return ret;

out_xseg:
        xsegbd_xseg_quit();

out_free:
        kfree(xsegbd_devices);

        goto out;
}

static void __exit xsegbd_exit(void)
{
        xsegbd_sysfs_cleanup();
        xsegbd_xseg_quit();
}

module_init(xsegbd_init);
module_exit(xsegbd_exit);