make peers verbose by default
[archipelago] / xseg / peers / kernel / xsegbd.c
/* xsegbd.c
 *
 */

#include <linux/module.h>
#include <linux/moduleparam.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/fs.h>
#include <linux/errno.h>
#include <linux/timer.h>
#include <linux/types.h>
#include <linux/vmalloc.h>
#include <linux/genhd.h>
#include <linux/blkdev.h>
#include <linux/bio.h>
#include <linux/device.h>
#include <linux/completion.h>
#include <linux/wait.h>
#include <sys/kernel/segdev.h>
#include "xsegbd.h"
#include <xseg/protocol.h>

#define XSEGBD_MINORS 1
/* maximum request size used by xsegbd: 4 MB (raised from the original 256 KB) */
#define XSEGBD_MAX_REQUEST_SIZE 4194304U

MODULE_DESCRIPTION("xsegbd");
MODULE_AUTHOR("XSEG");
MODULE_LICENSE("GPL");

static long sector_size = 0;
static long blksize = 512;
static int major = 0;
static int max_dev = 1024;
static char name[XSEGBD_SEGMENT_NAMELEN] = "xsegbd";
static char spec[256] = "segdev:xsegbd:4:1024:12";

module_param(sector_size, long, 0644);
module_param(blksize, long, 0644);
module_param(max_dev, int, 0644);
module_param(major, int, 0644);
module_param_string(name, name, sizeof(name), 0644);
module_param_string(spec, spec, sizeof(spec), 0644);

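/*
 * Illustrative usage (not part of the driver): loading the module with an
 * explicit segment spec. Parameter names match the module_param*()
 * declarations above; the values are placeholders. With major=0 the kernel
 * assigns a free major dynamically at register_blkdev() time.
 *
 *   insmod xsegbd.ko spec="segdev:xsegbd:16:1024:12" major=0 max_dev=1024
 */
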
static struct xsegbd xsegbd;
static struct xsegbd_device **xsegbd_devices; /* indexed by portno */
static DEFINE_MUTEX(xsegbd_mutex);
static DEFINE_SPINLOCK(xsegbd_devices_lock);

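/*
 * Reference counting: usercount tracks in-flight users of an xsegbd_device.
 * __xsegbd_put() wakes sleepers on the device waitqueue once the count drops
 * to zero; xsegbd_dev_release() relies on this to wait for all pending
 * operations before tearing the device down.
 */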
void __xsegbd_get(struct xsegbd_device *xsegbd_dev)
{
        atomic_inc(&xsegbd_dev->usercount);
}

void __xsegbd_put(struct xsegbd_device *xsegbd_dev)
{
        if (atomic_dec_and_test(&xsegbd_dev->usercount))
                wake_up(&xsegbd_dev->wq);
}

struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
{
        struct xsegbd_device *xsegbd_dev = NULL;

        spin_lock(&xsegbd_devices_lock);
        xsegbd_dev = xsegbd_devices[id];
        if (xsegbd_dev)
                __xsegbd_get(xsegbd_dev);
        spin_unlock(&xsegbd_devices_lock);

        return xsegbd_dev;
}

/* ************************* */
/* ***** sysfs helpers ***** */
/* ************************* */

static struct xsegbd_device *dev_to_xsegbd(struct device *dev)
{
        return container_of(dev, struct xsegbd_device, dev);
}

static struct device *xsegbd_get_dev(struct xsegbd_device *xsegbd_dev)
{
        /* FIXME */
        return get_device(&xsegbd_dev->dev);
}

static void xsegbd_put_dev(struct xsegbd_device *xsegbd_dev)
{
        put_device(&xsegbd_dev->dev);
}

/* ************************* */
/* ** XSEG Initialization ** */
/* ************************* */

static void xseg_callback(xport portno);

int xsegbd_xseg_init(void)
{
        int r;

        if (!xsegbd.name[0])
                strncpy(xsegbd.name, name, XSEGBD_SEGMENT_NAMELEN);

        r = xseg_initialize();
        if (r) {
                XSEGLOG("cannot initialize 'segdev' peer");
                goto err;
        }

        r = xseg_parse_spec(spec, &xsegbd.config);
        if (r)
                goto err;

        if (strncmp(xsegbd.config.type, "segdev", 16))
                XSEGLOG("WARNING: unexpected segment type '%s' vs 'segdev'",
                        xsegbd.config.type);

        /* leave it here for now */
        XSEGLOG("joining segment");
        xsegbd.xseg = xseg_join(xsegbd.config.type, xsegbd.config.name,
                                "segdev", xseg_callback);
        if (!xsegbd.xseg) {
                XSEGLOG("cannot find segment");
                r = -ENODEV;
                goto err;
        }

        return 0;
err:
        return r;
}

int xsegbd_xseg_quit(void)
{
        struct segdev *segdev;

        /* make sure to unmap the segment first */
        segdev = segdev_get(0);
        clear_bit(SEGDEV_RESERVED, &segdev->flags);
        xsegbd.xseg->priv->segment_type.ops.unmap(xsegbd.xseg, xsegbd.xseg->segment_size);
        segdev_put(segdev);

        return 0;
}

/* ***************************** */
/* ** Block Device Operations ** */
/* ***************************** */

static int xsegbd_open(struct block_device *bdev, fmode_t mode)
{
        struct gendisk *disk = bdev->bd_disk;
        struct xsegbd_device *xsegbd_dev = disk->private_data;

        xsegbd_get_dev(xsegbd_dev);

        return 0;
}

static int xsegbd_release(struct gendisk *gd, fmode_t mode)
{
        struct xsegbd_device *xsegbd_dev = gd->private_data;

        xsegbd_put_dev(xsegbd_dev);

        return 0;
}

static int xsegbd_ioctl(struct block_device *bdev, fmode_t mode,
                        unsigned int cmd, unsigned long arg)
{
        return -ENOTTY;
}

static const struct block_device_operations xsegbd_ops = {
        .owner          = THIS_MODULE,
        .open           = xsegbd_open,
        .release        = xsegbd_release,
        .ioctl          = xsegbd_ioctl,
};


/* *************************** */
/* ** Device Initialization ** */
/* *************************** */

static void xseg_request_fn(struct request_queue *rq);
static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev);
static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev);

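/*
 * Device bring-up: allocate and configure the request queue, cap request
 * sizes at XSEGBD_MAX_REQUEST_SIZE, allocate the gendisk, size it (either
 * from the sector_size module parameter or via an X_INFO query in
 * xsegbd_get_size()), then activate it with add_disk().
 */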
static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
{
        int ret = -ENOMEM;
        struct gendisk *disk;
        unsigned int max_request_size_bytes;

        spin_lock_init(&xsegbd_dev->rqlock);

        xsegbd_dev->xsegbd = &xsegbd;

        xsegbd_dev->blk_queue = blk_alloc_queue(GFP_KERNEL);
        if (!xsegbd_dev->blk_queue)
                goto out;

        if (!blk_init_allocated_queue(xsegbd_dev->blk_queue,
                        xseg_request_fn, &xsegbd_dev->rqlock))
                goto outqueue;

        xsegbd_dev->blk_queue->queuedata = xsegbd_dev;

        blk_queue_flush(xsegbd_dev->blk_queue, REQ_FLUSH | REQ_FUA);
        blk_queue_logical_block_size(xsegbd_dev->blk_queue, 512);
        blk_queue_physical_block_size(xsegbd_dev->blk_queue, blksize);
        blk_queue_bounce_limit(xsegbd_dev->blk_queue, BLK_BOUNCE_ANY);

        //blk_queue_max_segments(dev->blk_queue, 512);

        max_request_size_bytes = XSEGBD_MAX_REQUEST_SIZE;
        blk_queue_max_hw_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 9);
//      blk_queue_max_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 10);
        blk_queue_max_segment_size(xsegbd_dev->blk_queue, max_request_size_bytes);
        blk_queue_io_min(xsegbd_dev->blk_queue, max_request_size_bytes);
        blk_queue_io_opt(xsegbd_dev->blk_queue, max_request_size_bytes);

        queue_flag_set_unlocked(QUEUE_FLAG_NONROT, xsegbd_dev->blk_queue);

        /* vkoukis says we don't need partitions */
        xsegbd_dev->gd = disk = alloc_disk(XSEGBD_MINORS);
        if (!disk)
                goto outqueue;

        disk->major = xsegbd_dev->major;
        disk->first_minor = 0; // id * XSEGBD_MINORS;
        disk->fops = &xsegbd_ops;
        disk->queue = xsegbd_dev->blk_queue;
        disk->private_data = xsegbd_dev;
        disk->flags |= GENHD_FL_SUPPRESS_PARTITION_INFO;
        snprintf(disk->disk_name, 32, "xsegbd%u", xsegbd_dev->id);

        ret = 0;

        /* allow a non-zero sector_size parameter to override the disk size */
        if (sector_size)
                xsegbd_dev->sectors = sector_size;
        else {
                ret = xsegbd_get_size(xsegbd_dev);
                if (ret)
                        goto outdisk;
        }

        set_capacity(disk, xsegbd_dev->sectors);
        XSEGLOG("xsegbd active...");
        add_disk(disk); /* immediately activates the device */

        return 0;

outdisk:
        put_disk(xsegbd_dev->gd);
outqueue:
        blk_cleanup_queue(xsegbd_dev->blk_queue);
out:
        xsegbd_dev->blk_queue = NULL;
        xsegbd_dev->gd = NULL;
        return ret;
}

static void xsegbd_dev_release(struct device *dev)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        /* cleanup gendisk and blk_queue the right way */
        if (xsegbd_dev->gd) {
                if (xsegbd_dev->gd->flags & GENHD_FL_UP)
                        del_gendisk(xsegbd_dev->gd);

                put_disk(xsegbd_dev->gd);
                xsegbd_mapclose(xsegbd_dev);
        }

        spin_lock(&xsegbd_devices_lock);
        BUG_ON(xsegbd_devices[xsegbd_dev->src_portno] != xsegbd_dev);
        xsegbd_devices[xsegbd_dev->src_portno] = NULL;
        spin_unlock(&xsegbd_devices_lock);

//      xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        xseg_quit_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        /* wait for all pending operations on device to end */
        wait_event(xsegbd_dev->wq, atomic_read(&xsegbd_dev->usercount) <= 0);
        XSEGLOG("releasing id: %d", xsegbd_dev->id);
        if (xsegbd_dev->blk_queue)
                blk_cleanup_queue(xsegbd_dev->blk_queue);

//      if (xseg_free_requests(xsegbd_dev->xseg,
//                      xsegbd_dev->src_portno, xsegbd_dev->nr_requests) < 0)
//              XSEGLOG("Error trying to free requests!\n");

        //FIXME xseg_leave to free up resources ?
        unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);

        kfree(xsegbd_dev->blk_req_pending);     /* kfree(NULL) is a no-op */
        xq_free(&xsegbd_dev->blk_queue_pending);

        kfree(xsegbd_dev);

        module_put(THIS_MODULE);
}

/* ******************* */
/* ** Critical Path ** */
/* ******************* */

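/*
 * Data path: for writes, blk_to_xseg() copies the bio payload of the block
 * request into the xseg request buffer inside the shared segment; for reads,
 * xseg_to_blk() copies the served data back out. Pages are mapped with
 * kmap_atomic(), so these loops must not sleep.
 */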
static void blk_to_xseg(struct xseg *xseg, struct xseg_request *xreq,
                        struct request *blkreq)
{
        struct bio_vec *bvec;
        struct req_iterator iter;
        uint64_t off = 0;
        char *data = xseg_get_data(xseg, xreq);
        rq_for_each_segment(bvec, blkreq, iter) {
                char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
                memcpy(data + off, bdata, bvec->bv_len);
                off += bvec->bv_len;
                kunmap_atomic(bdata);
        }
}

static void xseg_to_blk(struct xseg *xseg, struct xseg_request *xreq,
                        struct request *blkreq)
{
        struct bio_vec *bvec;
        struct req_iterator iter;
        uint64_t off = 0;
        char *data = xseg_get_data(xseg, xreq);
        rq_for_each_segment(bvec, blkreq, iter) {
                char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
                memcpy(bdata, data + off, bvec->bv_len);
                off += bvec->bv_len;
                kunmap_atomic(bdata);
        }
}

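/*
 * Queue request function: entered by the block layer with rqlock held. The
 * lock is dropped immediately and re-taken only around blk_fetch_request()
 * and before returning; the preempt_count/irqs_disabled() checks below are
 * debug aids flagging contexts in which the xseg calls would be unsafe.
 */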
static void xseg_request_fn(struct request_queue *rq)
{
        struct xseg_request *xreq;
        struct xsegbd_device *xsegbd_dev = rq->queuedata;
        struct request *blkreq;
        struct xsegbd_pending *pending;
        xqindex blkreq_idx;
        char *target;
        uint64_t datalen;
        xport p;
        int r;
        unsigned long flags;

        __xsegbd_get(xsegbd_dev);

        spin_unlock_irq(&xsegbd_dev->rqlock);
        for (;;) {
                if (current_thread_info()->preempt_count || irqs_disabled()){
                        XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %d",
                                        current_thread_info()->preempt_count, irqs_disabled());
                }
                //XSEGLOG("Priority: %d", current_thread_info()->task->prio);
                //XSEGLOG("Static priority: %d", current_thread_info()->task->static_prio);
                //XSEGLOG("Normal priority: %d", current_thread_info()->task->normal_prio);
                //XSEGLOG("Rt_priority: %u", current_thread_info()->task->rt_priority);
                blkreq_idx = Noneidx;
                xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                                xsegbd_dev->dst_portno, X_ALLOC);
                if (!xreq)
                        break;

                blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending,
                                                xsegbd_dev->src_portno);
                if (blkreq_idx == Noneidx)
                        break;

                if (blkreq_idx >= xsegbd_dev->nr_requests) {
                        XSEGLOG("blkreq_idx >= xsegbd_dev->nr_requests");
                        BUG_ON(1);
                        break;
                }

                spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
                blkreq = blk_fetch_request(rq);
                if (!blkreq){
                        spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
                        break;
                }

                if (blkreq->cmd_type != REQ_TYPE_FS) {
                        //we lose xreq here
                        XSEGLOG("non-fs cmd_type: %u. *shrug*", blkreq->cmd_type);
                        __blk_end_request_all(blkreq, 0);
                        spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
                        continue;
                }
                spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
                if (current_thread_info()->preempt_count || irqs_disabled()){
                        XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %d",
                                        current_thread_info()->preempt_count, irqs_disabled());
                }

                datalen = blk_rq_bytes(blkreq);
                r = xseg_prep_request(xsegbd_dev->xseg, xreq,
                                        xsegbd_dev->targetlen, datalen);
                if (r < 0) {
                        XSEGLOG("couldn't prep request");
                        blk_end_request_err(blkreq, r);
                        BUG_ON(1);
                        break;
                }
                r = -ENOMEM;
                if (xreq->bufferlen - xsegbd_dev->targetlen < datalen){
                        XSEGLOG("malformed req buffers");
                        blk_end_request_err(blkreq, r);
                        BUG_ON(1);
                        break;
                }

                target = xseg_get_target(xsegbd_dev->xseg, xreq);
                strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);

                pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
                pending->dev = xsegbd_dev;
                pending->request = blkreq;
                pending->comp = NULL;

                xreq->size = datalen;
                xreq->offset = blk_rq_pos(blkreq) << 9;
                xreq->priv = (uint64_t) blkreq_idx;

                /*
                if (xreq->offset >= (sector_size << 9))
                        XSEGLOG("sector offset: %lu > %lu, flush:%u, fua:%u",
                                 blk_rq_pos(blkreq), sector_size,
                                 blkreq->cmd_flags & REQ_FLUSH,
                                 blkreq->cmd_flags & REQ_FUA);
                */

                if (blkreq->cmd_flags & REQ_FLUSH)
                        xreq->flags |= XF_FLUSH;

                if (blkreq->cmd_flags & REQ_FUA)
                        xreq->flags |= XF_FUA;

                if (rq_data_dir(blkreq)) {
                        /* unlock for data transfers? */
                        blk_to_xseg(xsegbd_dev->xseg, xreq, blkreq);
                        xreq->op = X_WRITE;
                } else {
                        xreq->op = X_READ;
                }

//              XSEGLOG("%s : %lu (%lu)", xsegbd_dev->target, xreq->offset, xreq->datalen);
                r = -EIO;
                /* xsegbd_get here. will be put on receive */
                __xsegbd_get(xsegbd_dev);
                p = xseg_submit(xsegbd_dev->xseg, xreq,
                                        xsegbd_dev->src_portno, X_ALLOC);
                if (p == NoPort) {
                        XSEGLOG("couldn't submit req");
                        WARN_ON(1);
                        blk_end_request_err(blkreq, r);
                        __xsegbd_put(xsegbd_dev);
                        break;
                }
                WARN_ON(xseg_signal(xsegbd_dev->xsegbd->xseg, p) < 0);
        }
        if (xreq)
                BUG_ON(xseg_put_request(xsegbd_dev->xsegbd->xseg, xreq,
                                        xsegbd_dev->src_portno) == -1);
        if (blkreq_idx != Noneidx)
                BUG_ON(xq_append_head(&xsegbd_dev->blk_queue_pending,
                                blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
        spin_lock_irq(&xsegbd_dev->rqlock);
        __xsegbd_put(xsegbd_dev);
}

int update_dev_sectors_from_request(struct xsegbd_device *xsegbd_dev,
                                    struct xseg_request *xreq)
{
        void *data;

        if (!xsegbd_dev) {
                XSEGLOG("Invalid xsegbd_dev");
                return -ENOENT;
        }
        if (!xreq) {
                XSEGLOG("Invalid xreq");
                return -EIO;
        }

        if (xreq->state & XS_FAILED)
                return -ENOENT;

        if (!(xreq->state & XS_SERVED))
                return -EIO;

        data = xseg_get_data(xsegbd_dev->xseg, xreq);
        if (!data) {
                XSEGLOG("Invalid req data");
                return -EIO;
        }
        xsegbd_dev->sectors = *((uint64_t *) data) / 512ULL;
        return 0;
}

static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
{
        struct xseg_request *xreq;
        char *target;
        xqindex blkreq_idx;
        struct xsegbd_pending *pending;
        struct completion comp;
        xport p;
        int ret = -EBUSY;

        __xsegbd_get(xsegbd_dev);

        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
        if (!xreq)
                goto out;

        BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen,
                                sizeof(struct xseg_reply_info)));

        init_completion(&comp);
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
                goto out_put;

        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
        pending->request = NULL;
        pending->comp = &comp;

        xreq->priv = (uint64_t) blkreq_idx;

        target = xseg_get_target(xsegbd_dev->xseg, xreq);
        strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
        xreq->size = xreq->datalen;
        xreq->offset = 0;
        xreq->op = X_INFO;

        xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        p = xseg_submit(xsegbd_dev->xseg, xreq,
                                xsegbd_dev->src_portno, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG("couldn't submit request");
                BUG_ON(1);
                goto out_queue;
        }
        WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
        XSEGLOG("Before wait for completion, comp %lx [%llu]",
                        (unsigned long) pending->comp,
                        (unsigned long long) blkreq_idx);
        wait_for_completion_interruptible(&comp);
        XSEGLOG("Woken up after wait_for_completion_interruptible(), comp: %lx [%llu]",
                        (unsigned long) pending->comp,
                        (unsigned long long) blkreq_idx);
        ret = update_dev_sectors_from_request(xsegbd_dev, xreq);
        XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors);
out_put:
        BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
out:
        __xsegbd_put(xsegbd_dev);
        return ret;

out_queue:
        pending->dev = NULL;
        pending->comp = NULL;
        xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);

        goto out;
}

static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev)
{
        struct xseg_request *xreq;
        char *target;
        xqindex blkreq_idx;
        struct xsegbd_pending *pending;
        struct completion comp;
        xport p;
        int ret = -EBUSY;

        __xsegbd_get(xsegbd_dev);
        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
        if (!xreq)
                goto out;

        BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen, 0));

        init_completion(&comp);
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
                goto out_put;

        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
        pending->request = NULL;
        pending->comp = &comp;

        xreq->priv = (uint64_t) blkreq_idx;

        target = xseg_get_target(xsegbd_dev->xseg, xreq);
        strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
        xreq->size = xreq->datalen;
        xreq->offset = 0;
        xreq->op = X_CLOSE;

        xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        p = xseg_submit(xsegbd_dev->xseg, xreq,
                                xsegbd_dev->src_portno, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG("couldn't submit request");
                BUG_ON(1);
                goto out_queue;
        }
        WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
        wait_for_completion_interruptible(&comp);
        ret = 0;
        if (xreq->state & XS_FAILED)
                XSEGLOG("Couldn't close disk on mapper");
out_put:
        BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
out:
        __xsegbd_put(xsegbd_dev);
        return ret;

out_queue:
        pending->dev = NULL;
        pending->comp = NULL;
        xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);

        goto out;
}

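/*
 * Completion path: xseg_callback() runs when a peer signals our port. Each
 * received xseg request carries its blk_req_pending index in xreq->priv; an
 * entry with a completion belongs to a synchronous waiter (xsegbd_get_size()
 * or xsegbd_mapclose()), anything else is an ordinary block request to be
 * ended and its slot recycled.
 */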
static void xseg_callback(xport portno)
{
        struct xsegbd_device *xsegbd_dev;
        struct xseg_request *xreq;
        struct request *blkreq;
        struct xsegbd_pending *pending;
        unsigned long flags;
        xqindex blkreq_idx, ridx;
        int err;

        xsegbd_dev = __xsegbd_get_dev(portno);
        if (!xsegbd_dev) {
                XSEGLOG("portno: %u has no xsegbd device assigned", portno);
                WARN_ON(1);
                return;
        }

        for (;;) {
                xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
                xreq = xseg_receive(xsegbd_dev->xseg, portno, 0);
                if (!xreq)
                        break;

//              xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);

                blkreq_idx = (xqindex) xreq->priv;
                if (blkreq_idx >= xsegbd_dev->nr_requests) {
                        WARN_ON(1);
                        //FIXME maybe put request?
                        continue;
                }

                pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
                if (pending->comp) {
                        /* someone is blocking on this request
                           and will handle it when we wake them up. */
                        complete(pending->comp);
                        /* the request is blocker's responsibility so
                           we will not put_request(); */

                        continue;
                }

                /* this is now treated as a block I/O request to end */
                blkreq = pending->request;
                pending->request = NULL;
                if (xsegbd_dev != pending->dev) {
                        //FIXME maybe put request?
                        XSEGLOG("xsegbd_dev != pending->dev");
                        BUG_ON(1);
                        continue;
                }
                pending->dev = NULL;
                if (!blkreq){
                        //FIXME maybe put request?
                        XSEGLOG("blkreq does not exist");
                        BUG_ON(1);
                        continue;
                }

                err = -EIO;
                if (!(xreq->state & XS_SERVED))
                        goto blk_end;

                if (xreq->serviced != blk_rq_bytes(blkreq))
                        goto blk_end;

                err = 0;
                if (!rq_data_dir(blkreq)){
                        xseg_to_blk(xsegbd_dev->xseg, xreq, blkreq);
                }
blk_end:
                blk_end_request_all(blkreq, err);

                ridx = xq_append_head(&xsegbd_dev->blk_queue_pending,
                                        blkreq_idx, xsegbd_dev->src_portno);
                if (ridx == Noneidx) {
                        XSEGLOG("couldn't append blkreq_idx");
                        WARN_ON(1);
                }

                if (xseg_put_request(xsegbd_dev->xseg, xreq,
                                                xsegbd_dev->src_portno) < 0){
                        XSEGLOG("couldn't put req");
                        BUG_ON(1);
                }
                __xsegbd_put(xsegbd_dev);
        }
        if (xsegbd_dev) {
                spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
                xseg_request_fn(xsegbd_dev->blk_queue);
                spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
                __xsegbd_put(xsegbd_dev);
        }
}


/* sysfs interface */

static struct bus_type xsegbd_bus_type = {
        .name   = "xsegbd",
};

static ssize_t xsegbd_size_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%llu\n", (unsigned long long) xsegbd_dev->sectors * 512ULL);
}

static ssize_t xsegbd_major_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%d\n", xsegbd_dev->major);
}

static ssize_t xsegbd_srcport_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->src_portno);
}

static ssize_t xsegbd_dstport_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->dst_portno);
}

static ssize_t xsegbd_id_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->id);
}

static ssize_t xsegbd_reqs_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->nr_requests);
}

static ssize_t xsegbd_target_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%s\n", xsegbd_dev->target);
}

static ssize_t xsegbd_image_refresh(struct device *dev,
                                        struct device_attribute *attr,
                                        const char *buf,
                                        size_t size)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
        int rc, ret = size;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

        rc = xsegbd_get_size(xsegbd_dev);
        if (rc < 0) {
                ret = rc;
                goto out;
        }

        set_capacity(xsegbd_dev->gd, xsegbd_dev->sectors);

out:
        mutex_unlock(&xsegbd_mutex);
        return ret;
}

//FIXME
//maybe try callback first, and then do a more invasive cleanup
//DO NOT forget to put device!!
static ssize_t xsegbd_cleanup(struct device *dev,
                                        struct device_attribute *attr,
                                        const char *buf,
                                        size_t size)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
        int ret = size, i;
        struct request *blkreq = NULL;
        struct xsegbd_pending *pending = NULL;
        struct completion *comp = NULL;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
        xlock_acquire(&xsegbd_dev->blk_queue_pending.lock,
                                xsegbd_dev->src_portno);
        for (i = 0; i < xsegbd_dev->nr_requests; i++) {
                if (!__xq_check(&xsegbd_dev->blk_queue_pending, i)) {
                        pending = &xsegbd_dev->blk_req_pending[i];
                        blkreq = pending->request;
                        pending->request = NULL;
                        comp = pending->comp;
                        pending->comp = NULL;
                        if (blkreq){
                                XSEGLOG("Cleaning up blkreq %lx [%d]", (unsigned long) blkreq, i);
                                blk_end_request_all(blkreq, -EIO);
                        }
                        if (comp){
                                XSEGLOG("Cleaning up comp %lx [%d]", (unsigned long) comp, i);
                                complete(comp);
                        }
                        __xq_append_tail(&xsegbd_dev->blk_queue_pending, i);
                }
        }
        xlock_release(&xsegbd_dev->blk_queue_pending.lock);

        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static DEVICE_ATTR(size, S_IRUGO, xsegbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, xsegbd_major_show, NULL);
static DEVICE_ATTR(srcport, S_IRUGO, xsegbd_srcport_show, NULL);
static DEVICE_ATTR(dstport, S_IRUGO, xsegbd_dstport_show, NULL);
static DEVICE_ATTR(id, S_IRUGO, xsegbd_id_show, NULL);
static DEVICE_ATTR(reqs, S_IRUGO, xsegbd_reqs_show, NULL);
static DEVICE_ATTR(target, S_IRUGO, xsegbd_target_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, xsegbd_image_refresh);
static DEVICE_ATTR(cleanup, S_IWUSR, NULL, xsegbd_cleanup);

static struct attribute *xsegbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_srcport.attr,
        &dev_attr_dstport.attr,
        &dev_attr_id.attr,
        &dev_attr_reqs.attr,
        &dev_attr_target.attr,
        &dev_attr_refresh.attr,
        &dev_attr_cleanup.attr,
        NULL
};

static struct attribute_group xsegbd_attr_group = {
        .attrs = xsegbd_attrs,
};

static const struct attribute_group *xsegbd_attr_groups[] = {
        &xsegbd_attr_group,
        NULL
};

static void xsegbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type xsegbd_device_type = {
        .name           = "xsegbd",
        .groups         = xsegbd_attr_groups,
        .release        = xsegbd_sysfs_dev_release,
};

static void xsegbd_root_dev_release(struct device *dev)
{
}

static struct device xsegbd_root_dev = {
        .init_name      = "xsegbd",
        .release        = xsegbd_root_dev_release,
};

static int xsegbd_bus_add_dev(struct xsegbd_device *xsegbd_dev)
{
        int ret = -ENOMEM;
        struct device *dev;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
        dev = &xsegbd_dev->dev;

        dev->bus = &xsegbd_bus_type;
        dev->type = &xsegbd_device_type;
        dev->parent = &xsegbd_root_dev;
        dev->release = xsegbd_dev_release;
        dev_set_name(dev, "%d", xsegbd_dev->id);

        ret = device_register(dev);

        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static void xsegbd_bus_del_dev(struct xsegbd_device *xsegbd_dev)
{
        device_unregister(&xsegbd_dev->dev);
}

static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
{
        struct xsegbd_device *xsegbd_dev;
        struct xseg_port *port;
        ssize_t ret = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        xsegbd_dev = kzalloc(sizeof(*xsegbd_dev), GFP_KERNEL);
        if (!xsegbd_dev)
                goto out;

        spin_lock_init(&xsegbd_dev->rqlock);
        INIT_LIST_HEAD(&xsegbd_dev->node);
        init_waitqueue_head(&xsegbd_dev->wq);
        atomic_set(&xsegbd_dev->usercount, 0);

        /* parse cmd */
        if (sscanf(buf, "%" __stringify(XSEGBD_TARGET_NAMELEN) "s "
                        "%d:%d:%d", xsegbd_dev->target, &xsegbd_dev->src_portno,
                        &xsegbd_dev->dst_portno, &xsegbd_dev->nr_requests) < 3) {
                ret = -EINVAL;
                goto out_dev;
        }
        xsegbd_dev->targetlen = strlen(xsegbd_dev->target);

        spin_lock(&xsegbd_devices_lock);
        if (xsegbd_devices[xsegbd_dev->src_portno] != NULL) {
                ret = -EINVAL;
                goto out_unlock;
        }
        xsegbd_devices[xsegbd_dev->src_portno] = xsegbd_dev;
        xsegbd_dev->id = xsegbd_dev->src_portno;
        spin_unlock(&xsegbd_devices_lock);

        XSEGLOG("registering block device major %d", major);
        ret = register_blkdev(major, XSEGBD_NAME);
        if (ret < 0) {
                XSEGLOG("cannot register block device!");
                ret = -EBUSY;
                goto out_delentry;
        }
        xsegbd_dev->major = ret;
        XSEGLOG("registered block device major %d", xsegbd_dev->major);

        ret = xsegbd_bus_add_dev(xsegbd_dev);
        if (ret)
                goto out_blkdev;

        ret = -ENOMEM;
        if (!xq_alloc_seq(&xsegbd_dev->blk_queue_pending,
                                xsegbd_dev->nr_requests,
                                xsegbd_dev->nr_requests))
                goto out_bus;

        xsegbd_dev->blk_req_pending = kcalloc(xsegbd_dev->nr_requests,
                                        sizeof(struct xsegbd_pending),
                                        GFP_KERNEL);
        if (!xsegbd_dev->blk_req_pending)
                goto out_freeq;

        XSEGLOG("joining segment");
        //FIXME use the xsegbd module config for now
        xsegbd_dev->xseg = xseg_join(xsegbd.config.type,
                                     xsegbd.config.name,
                                     "segdev",
                                     xseg_callback);
        if (!xsegbd_dev->xseg) {
                ret = -ENODEV;
                goto out_freepending;
        }
        __sync_synchronize();

        XSEGLOG("%s binding to source port %u (destination %u)", xsegbd_dev->target,
                        xsegbd_dev->src_portno, xsegbd_dev->dst_portno);
        port = xseg_bind_port(xsegbd_dev->xseg, xsegbd_dev->src_portno, NULL);
        if (!port) {
                XSEGLOG("cannot bind to port");
                ret = -EFAULT;
                goto out_xseg;
        }

        if (xsegbd_dev->src_portno != xseg_portno(xsegbd_dev->xseg, port)) {
                XSEGLOG("portno != xsegbd_dev->src_portno");
                BUG_ON(1);
                ret = -EFAULT;
                goto out_xseg;
        }
        xseg_init_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);

        /* make sure we don't get any requests until we're ready to handle them */
        xseg_cancel_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));

        ret = xsegbd_dev_init(xsegbd_dev);
        if (ret)
                goto out_signal;

        xseg_prepare_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
        return count;

out_signal:
        xseg_quit_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);
out_xseg:
        xseg_leave(xsegbd_dev->xseg);

out_freepending:
        kfree(xsegbd_dev->blk_req_pending);

out_freeq:
        xq_free(&xsegbd_dev->blk_queue_pending);

out_bus:
        xsegbd_bus_del_dev(xsegbd_dev);
        return ret;

out_blkdev:
        unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);

out_delentry:
        spin_lock(&xsegbd_devices_lock);
        xsegbd_devices[xsegbd_dev->src_portno] = NULL;

out_unlock:
        spin_unlock(&xsegbd_devices_lock);

out_dev:
        kfree(xsegbd_dev);

out:
        return ret;
}

static ssize_t xsegbd_remove(struct bus_type *bus, const char *buf, size_t count)
{
        struct xsegbd_device *xsegbd_dev = NULL;
        int id, ret;
        unsigned long ul_id;

        ret = strict_strtoul(buf, 10, &ul_id);
        if (ret)
                return ret;

        id = (int) ul_id;
        if (id != ul_id)
                return -EINVAL;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

        ret = count;
        xsegbd_dev = __xsegbd_get_dev(id);
        if (!xsegbd_dev) {
                ret = -ENOENT;
                goto out_unlock;
        }
        __xsegbd_put(xsegbd_dev);
        xsegbd_bus_del_dev(xsegbd_dev);

out_unlock:
        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static struct bus_attribute xsegbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, xsegbd_add),
        __ATTR(remove, S_IWUSR, NULL, xsegbd_remove),
        __ATTR_NULL
};
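
/*
 * Illustrative usage (not part of the driver): creating and removing a device
 * through the bus attributes above. The add string is parsed as
 * "<target> <src_port>:<dst_port>:<nr_requests>" and the device id equals the
 * source port; names and numbers here are placeholders.
 *
 *   echo "myvolume 2:1:128" > /sys/bus/xsegbd/add
 *   echo 2 > /sys/bus/xsegbd/remove
 */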

static int xsegbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&xsegbd_root_dev);
        if (ret < 0)
                return ret;

        xsegbd_bus_type.bus_attrs = xsegbd_bus_attrs;
        ret = bus_register(&xsegbd_bus_type);
        if (ret < 0)
                device_unregister(&xsegbd_root_dev);

        return ret;
}

static void xsegbd_sysfs_cleanup(void)
{
        bus_unregister(&xsegbd_bus_type);
        device_unregister(&xsegbd_root_dev);
}

/* *************************** */
/* ** Module Initialization ** */
/* *************************** */

static int __init xsegbd_init(void)
{
        int ret = -ENOMEM;

        xsegbd_devices = kcalloc(max_dev, sizeof(struct xsegbd_device *),
                                 GFP_KERNEL);
        if (!xsegbd_devices)
                goto out;

        spin_lock_init(&xsegbd_devices_lock);

        ret = xsegbd_xseg_init();
        if (ret)
                goto out_free;

        ret = xsegbd_sysfs_init();
        if (ret)
                goto out_xseg;

        XSEGLOG("initialization complete");

out:
        return ret;

out_xseg:
        xsegbd_xseg_quit();

out_free:
        kfree(xsegbd_devices);

        goto out;
}

static void __exit xsegbd_exit(void)
{
        xsegbd_sysfs_cleanup();
        xsegbd_xseg_quit();
}

module_init(xsegbd_init);
module_exit(xsegbd_exit);