add compile time options to xlock
[archipelago] / xseg / peers / kernel / xsegbd.c
1 /* xsegbd.c
2  *
3  */
4
5 #include <linux/module.h>
6 #include <linux/moduleparam.h>
7 #include <linux/init.h>
8 #include <linux/sched.h>
9 #include <linux/kernel.h>
10 #include <linux/slab.h>
11 #include <linux/fs.h>
12 #include <linux/errno.h>
13 #include <linux/timer.h>
14 #include <linux/types.h>
15 #include <linux/vmalloc.h>
16 #include <linux/genhd.h>
17 #include <linux/blkdev.h>
18 #include <linux/bio.h>
19 #include <linux/device.h>
20 #include <linux/completion.h>
21 #include <linux/wait.h>
22 #include <sys/kernel/segdev.h>
23 #include "xsegbd.h"
24 #include <xseg/protocol.h>
25
26 #define XSEGBD_MINORS 1
27 /* define max request size to be used in xsegbd */
28 //FIXME should we make this 4MB instead of 256KB ?
29 #define XSEGBD_MAX_REQUEST_SIZE 262144U
30
31 MODULE_DESCRIPTION("xsegbd");
32 MODULE_AUTHOR("XSEG");
33 MODULE_LICENSE("GPL");
34
/* Module parameters — settable at insmod time, visible in sysfs (0644). */
static long sector_size = 0;	/* non-zero overrides the size reported by the mapper (see xsegbd_dev_init) */
static long blksize = 512;	/* physical block size advertised to the block layer */
static int major = 0;		/* requested block major — TODO confirm 0 means dynamic allocation in probe code */
static int max_dev = 1024;	/* maximum number of xsegbd devices */
static char name[XSEGBD_SEGMENT_NAMELEN] = "xsegbd";	/* default xseg segment name */
static char spec[256] = "segdev:xsegbd:4:1024:12";	/* segment spec string, parsed by xseg_parse_spec() */

module_param(sector_size, long, 0644);
module_param(blksize, long, 0644);
module_param(max_dev, int, 0644);
module_param(major, int, 0644);
module_param_string(name, name, sizeof(name), 0644);
module_param_string(spec, spec, sizeof(spec), 0644);

static struct xsegbd xsegbd;			/* driver-global xseg state */
static struct xsegbd_device **xsegbd_devices;	/* indexed by portno */
static DEFINE_MUTEX(xsegbd_mutex);		/* serializes sysfs store handlers */
static DEFINE_SPINLOCK(xsegbd_devices_lock);	/* protects the xsegbd_devices table */
53
54
/* Take a usercount reference on @xsegbd_dev; paired with __xsegbd_put(). */
void __xsegbd_get(struct xsegbd_device *xsegbd_dev)
{
	atomic_inc(&xsegbd_dev->usercount);
}
59
/*
 * Drop a reference taken with __xsegbd_get().  When the count hits zero,
 * wake sleepers on the device waitqueue (the wait_event() in
 * xsegbd_dev_release() waits for usercount <= 0).
 */
void __xsegbd_put(struct xsegbd_device *xsegbd_dev)
{
	if (atomic_dec_and_test(&xsegbd_dev->usercount))
		wake_up(&xsegbd_dev->wq);
}
65
66 struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
67 {
68         struct xsegbd_device *xsegbd_dev = NULL;
69
70         spin_lock(&xsegbd_devices_lock);
71         xsegbd_dev = xsegbd_devices[id];
72         if (xsegbd_dev)
73                 __xsegbd_get(xsegbd_dev);
74         spin_unlock(&xsegbd_devices_lock);
75
76         return xsegbd_dev;
77 }
78
79 /* ************************* */
80 /* ***** sysfs helpers ***** */
81 /* ************************* */
82
/* Map a struct device embedded in an xsegbd_device back to its container. */
static struct xsegbd_device *dev_to_xsegbd(struct device *dev)
{
	return container_of(dev, struct xsegbd_device, dev);
}
87
/*
 * Take a driver-model reference on the embedded struct device.
 * NOTE(review): the original FIXME is unexplained — presumably whether
 * this should also bump usercount; confirm against open/release pairing.
 */
static struct device *xsegbd_get_dev(struct xsegbd_device *xsegbd_dev)
{
	/* FIXME */
	return get_device(&xsegbd_dev->dev);
}
93
/*
 * Drop the driver-model reference taken by xsegbd_get_dev().  The last
 * put triggers xsegbd_dev_release() via the device release callback.
 */
static void xsegbd_put_dev(struct xsegbd_device *xsegbd_dev)
{
	put_device(&xsegbd_dev->dev);
}
98
99 /* ************************* */
100 /* ** XSEG Initialization ** */
101 /* ************************* */
102
103 static void xseg_callback(uint32_t portno);
104
105 int xsegbd_xseg_init(void)
106 {
107         int r;
108
109         if (!xsegbd.name[0])
110                 strncpy(xsegbd.name, name, XSEGBD_SEGMENT_NAMELEN);
111
112         r = xseg_initialize();
113         if (r) {
114                 XSEGLOG("cannot initialize 'segdev' peer");
115                 goto err;
116         }
117
118         r = xseg_parse_spec(spec, &xsegbd.config);
119         if (r)
120                 goto err;
121
122         if (strncmp(xsegbd.config.type, "segdev", 16))
123                 XSEGLOG("WARNING: unexpected segment type '%s' vs 'segdev'",
124                          xsegbd.config.type);
125
126         /* leave it here for now */
127         XSEGLOG("joining segment");
128         xsegbd.xseg = xseg_join(        xsegbd.config.type,
129                                         xsegbd.config.name,
130                                         "segdev",
131                                         xseg_callback           );
132         if (!xsegbd.xseg) {
133                 XSEGLOG("cannot find segment");
134                 r = -ENODEV;
135                 goto err;
136         }
137
138         return 0;
139 err:
140         return r;
141
142 }
143
/*
 * Tear down this peer's view of the segment: drop the segdev
 * reservation, then unmap the segment through its type ops.
 * NOTE(review): segdev_get(0) is not checked for a NULL/error return
 * before dereference — confirm it cannot fail in this context.
 */
int xsegbd_xseg_quit(void)
{
	struct segdev *segdev;

	/* make sure to unmap the segment first */
	segdev = segdev_get(0);
	clear_bit(SEGDEV_RESERVED, &segdev->flags);
	xsegbd.xseg->priv->segment_type.ops.unmap(xsegbd.xseg, xsegbd.xseg->segment_size);
	segdev_put(segdev);

	return 0;
}
156
157
158 /* ***************************** */
159 /* ** Block Device Operations ** */
160 /* ***************************** */
161
162 static int xsegbd_open(struct block_device *bdev, fmode_t mode)
163 {
164         struct gendisk *disk = bdev->bd_disk;
165         struct xsegbd_device *xsegbd_dev = disk->private_data;
166
167         xsegbd_get_dev(xsegbd_dev);
168
169         return 0;
170 }
171
172 static int xsegbd_release(struct gendisk *gd, fmode_t mode)
173 {
174         struct xsegbd_device *xsegbd_dev = gd->private_data;
175
176         xsegbd_put_dev(xsegbd_dev);
177
178         return 0;
179 }
180
/* No ioctls are supported; always report "inappropriate ioctl". */
static int xsegbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	return -ENOTTY;
}
186
/* Block-device entry points registered on each gendisk. */
static const struct block_device_operations xsegbd_ops = {
	.owner		= THIS_MODULE,
	.open		= xsegbd_open,
	.release	= xsegbd_release,
	.ioctl		= xsegbd_ioctl
};
193
194
195 /* *************************** */
196 /* ** Device Initialization ** */
197 /* *************************** */
198
199 static void xseg_request_fn(struct request_queue *rq);
200 static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev);
201 static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev);
202
/*
 * Set up the block-layer side of one xsegbd device: allocate and
 * initialize the request queue, configure queue limits, create the
 * gendisk, determine capacity, and activate the disk.  Returns 0 on
 * success, negative errno on failure; on error the queue/disk pointers
 * are torn down and cleared.
 */
static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
{
	int ret = -ENOMEM;
	struct gendisk *disk;
	unsigned int max_request_size_bytes;

	spin_lock_init(&xsegbd_dev->rqlock);

	xsegbd_dev->xsegbd = &xsegbd;

	xsegbd_dev->blk_queue = blk_alloc_queue(GFP_KERNEL);
	if (!xsegbd_dev->blk_queue)
		goto out;

	/* attach our request_fn; rqlock becomes the queue lock */
	if (!blk_init_allocated_queue(xsegbd_dev->blk_queue,
			xseg_request_fn, &xsegbd_dev->rqlock))
		goto outqueue;

	xsegbd_dev->blk_queue->queuedata = xsegbd_dev;

	/* advertise FLUSH/FUA support; forwarded as XF_FLUSH/XF_FUA in xseg_request_fn() */
	blk_queue_flush(xsegbd_dev->blk_queue, REQ_FLUSH | REQ_FUA);
	blk_queue_logical_block_size(xsegbd_dev->blk_queue, 512);
	blk_queue_physical_block_size(xsegbd_dev->blk_queue, blksize);
	blk_queue_bounce_limit(xsegbd_dev->blk_queue, BLK_BOUNCE_ANY);

	//blk_queue_max_segments(dev->blk_queue, 512);

	/* cap a single request at XSEGBD_MAX_REQUEST_SIZE bytes (>>9: bytes to 512B sectors) */
	max_request_size_bytes = XSEGBD_MAX_REQUEST_SIZE;
	blk_queue_max_hw_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 9);
	blk_queue_max_segment_size(xsegbd_dev->blk_queue, max_request_size_bytes);
	blk_queue_io_min(xsegbd_dev->blk_queue, max_request_size_bytes);
	blk_queue_io_opt(xsegbd_dev->blk_queue, max_request_size_bytes);

	/* not a rotational device: disable the elevator's rotational heuristics */
	queue_flag_set_unlocked(QUEUE_FLAG_NONROT, xsegbd_dev->blk_queue);

	/* vkoukis says we don't need partitions */
	xsegbd_dev->gd = disk = alloc_disk(1);
	if (!disk)
		goto outqueue;

	disk->major = xsegbd_dev->major;
	disk->first_minor = 0; // id * XSEGBD_MINORS;
	disk->fops = &xsegbd_ops;
	disk->queue = xsegbd_dev->blk_queue;
	disk->private_data = xsegbd_dev;
	disk->flags |= GENHD_FL_SUPPRESS_PARTITION_INFO;
	snprintf(disk->disk_name, 32, "xsegbd%u", xsegbd_dev->id);

	ret = 0;

	/* allow a non-zero sector_size parameter to override the disk size */
	if (sector_size)
		xsegbd_dev->sectors = sector_size;
	else {
		/* otherwise ask the mapper via a synchronous X_INFO request */
		ret = xsegbd_get_size(xsegbd_dev);
		if (ret)
			goto outdisk;
	}

	set_capacity(disk, xsegbd_dev->sectors);
	XSEGLOG("xsegbd active...");
	add_disk(disk); /* immediately activates the device */

	return 0;


outdisk:
	put_disk(xsegbd_dev->gd);
outqueue:
	blk_cleanup_queue(xsegbd_dev->blk_queue);
out:
	xsegbd_dev->blk_queue = NULL;
	xsegbd_dev->gd = NULL;
	return ret;
}
278
279 static void xsegbd_dev_release(struct device *dev)
280 {
281         int ret;
282         struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
283
284
285         /* cleanup gendisk and blk_queue the right way */
286         if (xsegbd_dev->gd) {
287                 if (xsegbd_dev->gd->flags & GENHD_FL_UP)
288                         del_gendisk(xsegbd_dev->gd);
289
290                 put_disk(xsegbd_dev->gd);
291                 xsegbd_mapclose(xsegbd_dev);
292         }
293         
294         spin_lock(&xsegbd_devices_lock);
295         BUG_ON(xsegbd_devices[xsegbd_dev->src_portno] != xsegbd_dev);
296         xsegbd_devices[xsegbd_dev->src_portno] = NULL;
297         spin_unlock(&xsegbd_devices_lock);
298         
299 //      xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
300         xseg_quit_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);
301         /* wait for all pending operations on device to end */
302         wait_event(xsegbd_dev->wq, atomic_read(&xsegbd_dev->usercount) <= 0);
303         XSEGLOG("releasing id: %d", xsegbd_dev->id);
304         if (xsegbd_dev->blk_queue)
305                 blk_cleanup_queue(xsegbd_dev->blk_queue);
306
307
308 //      if (xseg_free_requests(xsegbd_dev->xseg, 
309 //                      xsegbd_dev->src_portno, xsegbd_dev->nr_requests) < 0)
310 //              XSEGLOG("Error trying to free requests!\n");
311
312
313         //FIXME xseg_leave to free_up resources ?
314         unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);
315
316         if (xsegbd_dev->blk_req_pending)
317                 kfree(xsegbd_dev->blk_req_pending);
318         xq_free(&xsegbd_dev->blk_queue_pending);
319
320         kfree(xsegbd_dev);
321
322         module_put(THIS_MODULE);
323 }
324
325 /* ******************* */
326 /* ** Critical Path ** */
327 /* ******************* */
328
/*
 * Write path: copy the data pages of @blkreq into the xseg request's
 * data buffer, segment by segment.  Uses kmap_atomic, so no sleeping is
 * allowed between map and unmap.
 */
static void blk_to_xseg(struct xseg *xseg, struct xseg_request *xreq,
			struct request *blkreq)
{
	struct bio_vec *bvec;
	struct req_iterator iter;
	uint64_t off = 0;	/* running offset into the xseg data buffer */
	char *data = xseg_get_data(xseg, xreq);
	rq_for_each_segment(bvec, blkreq, iter) {
		char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
		memcpy(data + off, bdata, bvec->bv_len);
		off += bvec->bv_len;
		kunmap_atomic(bdata);
	}
}
343
/*
 * Read path: copy reply data from the xseg request's buffer back into
 * the data pages of @blkreq.  Mirror image of blk_to_xseg(); same
 * kmap_atomic constraints apply.
 */
static void xseg_to_blk(struct xseg *xseg, struct xseg_request *xreq,
			struct request *blkreq)
{
	struct bio_vec *bvec;
	struct req_iterator iter;
	uint64_t off = 0;	/* running offset into the xseg data buffer */
	char *data = xseg_get_data(xseg, xreq);
	rq_for_each_segment(bvec, blkreq, iter) {
		char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
		memcpy(bdata, data + off, bvec->bv_len);
		off += bvec->bv_len;
		kunmap_atomic(bdata);
	}
}
358
/*
 * Block-layer request function.  Called with rqlock held and IRQs off;
 * drops the lock for the duration of the loop and re-takes it before
 * returning.  Translates each struct request into an xseg request
 * submitted to the mapper; completion is asynchronous, handled in
 * xseg_callback().
 */
static void xseg_request_fn(struct request_queue *rq)
{
	struct xseg_request *xreq;
	struct xsegbd_device *xsegbd_dev = rq->queuedata;
	struct request *blkreq;
	struct xsegbd_pending *pending;
	xqindex blkreq_idx;
	char *target;
	uint64_t datalen;
	xport p;
	int r;
	unsigned long flags;

	__xsegbd_get(xsegbd_dev);

	/* drop the queue lock; re-taken only around blk_fetch_request below */
	spin_unlock_irq(&xsegbd_dev->rqlock);
	for (;;) {
		/* debug: log if running with preemption disabled or IRQs off */
		if (current_thread_info()->preempt_count || irqs_disabled()){
			XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu ",
					current_thread_info()->preempt_count, irqs_disabled());
		}
		//XSEGLOG("Priority: %d", current_thread_info()->task->prio);
		//XSEGLOG("Static priority: %d", current_thread_info()->task->static_prio);
		//XSEGLOG("Normal priority: %d", current_thread_info()->task->normal_prio);
		//XSEGLOG("Rt_priority: %u", current_thread_info()->task->rt_priority);
		blkreq_idx = Noneidx;
		/* allocate an xseg request slot on our source port */
		xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
				xsegbd_dev->dst_portno, X_ALLOC);
		if (!xreq)
			break;

		/* reserve a pending-table slot for the in-flight request */
		blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending,
						xsegbd_dev->src_portno);
		if (blkreq_idx == Noneidx)
			break;

		if (blkreq_idx >= xsegbd_dev->nr_requests) {
			XSEGLOG("blkreq_idx >= xsegbd_dev->nr_requests");
			BUG_ON(1);
			break;
		}


		spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
		blkreq = blk_fetch_request(rq);
		if (!blkreq){
			spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
			break;
		}

		if (blkreq->cmd_type != REQ_TYPE_FS) {
			//we lose xreq here
			XSEGLOG("non-fs cmd_type: %u. *shrug*", blkreq->cmd_type);
			__blk_end_request_all(blkreq, 0);
			spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
			continue;
		}
		spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
		/* debug: same preemption/IRQ sanity log as at loop entry */
		if (current_thread_info()->preempt_count || irqs_disabled()){
			XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu ",
					current_thread_info()->preempt_count, irqs_disabled());
		}

		datalen = blk_rq_bytes(blkreq);
		r = xseg_prep_request(xsegbd_dev->xseg, xreq,
					xsegbd_dev->targetlen, datalen);
		if (r < 0) {
			XSEGLOG("couldn't prep request");
			blk_end_request_err(blkreq, r);
			BUG_ON(1);
			break;
		}
		r = -ENOMEM;
		if (xreq->bufferlen - xsegbd_dev->targetlen < datalen){
			XSEGLOG("malformed req buffers");
			blk_end_request_err(blkreq, r);
			BUG_ON(1);
			break;
		}

		/* fill in the target name for the mapper */
		target = xseg_get_target(xsegbd_dev->xseg, xreq);
		strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);

		/* record the blk request so xseg_callback() can complete it */
		pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
		pending->dev = xsegbd_dev;
		pending->request = blkreq;
		pending->comp = NULL;

		xreq->size = datalen;
		xreq->offset = blk_rq_pos(blkreq) << 9;	/* sectors -> bytes */
		xreq->priv = (uint64_t) blkreq_idx;	/* index travels with the request */

		/*
		if (xreq->offset >= (sector_size << 9))
			XSEGLOG("sector offset: %lu > %lu, flush:%u, fua:%u",
				 blk_rq_pos(blkreq), sector_size,
				 blkreq->cmd_flags & REQ_FLUSH,
				 blkreq->cmd_flags & REQ_FUA);
		*/

		if (blkreq->cmd_flags & REQ_FLUSH)
			xreq->flags |= XF_FLUSH;

		if (blkreq->cmd_flags & REQ_FUA)
			xreq->flags |= XF_FUA;

		if (rq_data_dir(blkreq)) {
			/* unlock for data transfers? */
			blk_to_xseg(xsegbd_dev->xseg, xreq, blkreq);
			xreq->op = X_WRITE;
		} else {
			xreq->op = X_READ;
		}


//		XSEGLOG("%s : %lu (%lu)", xsegbd_dev->target, xreq->offset, xreq->datalen);
		r = -EIO;
		/* xsegbd_get here. will be put on receive */
		__xsegbd_get(xsegbd_dev);
		p = xseg_submit(xsegbd_dev->xseg, xreq,
					xsegbd_dev->src_portno, X_ALLOC);
		if (p == NoPort) {
			XSEGLOG("coundn't submit req");
			WARN_ON(1);
			blk_end_request_err(blkreq, r);
			__xsegbd_put(xsegbd_dev);
			break;
		}
		WARN_ON(xseg_signal(xsegbd_dev->xsegbd->xseg, p) < 0);
	}
	/* on early exit, return any unsubmitted xreq / unused pending slot */
	if (xreq)
		BUG_ON(xseg_put_request(xsegbd_dev->xsegbd->xseg, xreq,
					xsegbd_dev->src_portno) == -1);
	if (blkreq_idx != Noneidx)
		BUG_ON(xq_append_head(&xsegbd_dev->blk_queue_pending,
				blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
	/* re-take the queue lock; the block layer expects it held on return */
	spin_lock_irq(&xsegbd_dev->rqlock);
	__xsegbd_put(xsegbd_dev);
}
498
499 int update_dev_sectors_from_request(    struct xsegbd_device *xsegbd_dev,
500                                         struct xseg_request *xreq       )
501 {
502         void *data;
503         if (!xreq) {
504                 XSEGLOG("Invalid xreq");
505                 return -EIO;
506         }
507
508         if (xreq->state & XS_FAILED)
509                 return -ENOENT;
510
511         if (!(xreq->state & XS_SERVED))
512                 return -EIO;
513
514         data = xseg_get_data(xsegbd_dev->xseg, xreq);
515         if (!data) {
516                 XSEGLOG("Invalid req data");
517                 return -EIO;
518         }
519         if (!xsegbd_dev) {
520                 XSEGLOG("Invalid xsegbd_dev");
521                 return -ENOENT;
522         }
523         xsegbd_dev->sectors = *((uint64_t *) data) / 512ULL;
524         return 0;
525 }
526
527 static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
528 {
529         struct xseg_request *xreq;
530         char *target;
531         uint64_t datalen;
532         xqindex blkreq_idx;
533         struct xsegbd_pending *pending;
534         struct completion comp;
535         xport p;
536         void *data;
537         int ret = -EBUSY, r;
538
539         __xsegbd_get(xsegbd_dev);
540
541         xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
542                         xsegbd_dev->dst_portno, X_ALLOC);
543         if (!xreq)
544                 goto out;
545
546         BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen, 
547                                 sizeof(struct xseg_reply_info)));
548
549         init_completion(&comp);
550         blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
551         if (blkreq_idx == Noneidx)
552                 goto out_put;
553         
554         pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
555         pending->dev = xsegbd_dev;
556         pending->request = NULL;
557         pending->comp = &comp;
558
559         
560         xreq->priv = (uint64_t) blkreq_idx;
561
562         target = xseg_get_target(xsegbd_dev->xseg, xreq);
563         strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
564         xreq->size = xreq->datalen;
565         xreq->offset = 0;
566         xreq->op = X_INFO;
567
568         xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
569         p = xseg_submit(xsegbd_dev->xseg, xreq, 
570                                 xsegbd_dev->src_portno, X_ALLOC);
571         if ( p == NoPort) {
572                 XSEGLOG("couldn't submit request");
573                 BUG_ON(1);
574                 goto out_queue;
575         }
576         WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
577         XSEGLOG("Before wait for completion, comp %lx [%llu]", (unsigned long) pending->comp, (unsigned long long) blkreq_idx);
578         wait_for_completion_interruptible(&comp);
579         XSEGLOG("Woken up after wait_for_completion_interruptible(), comp: %lx [%llu]", (unsigned long) pending->comp, (unsigned long long) blkreq_idx);
580         ret = update_dev_sectors_from_request(xsegbd_dev, xreq);
581         XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors);
582 out_put:
583         BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
584 out:
585         __xsegbd_put(xsegbd_dev);
586         return ret;
587
588 out_queue:
589         pending->dev = NULL;
590         pending->comp = NULL;
591         xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
592         
593         goto out;
594 }
595
596 static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev)
597 {
598         struct xseg_request *xreq;
599         char *target;
600         uint64_t datalen;
601         xqindex blkreq_idx;
602         struct xsegbd_pending *pending;
603         struct completion comp;
604         xport p;
605         void *data;
606         int ret = -EBUSY, r;
607
608         __xsegbd_get(xsegbd_dev);
609         xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
610                         xsegbd_dev->dst_portno, X_ALLOC);
611         if (!xreq)
612                 goto out;
613
614         BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen, 0));
615
616         init_completion(&comp);
617         blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
618         if (blkreq_idx == Noneidx)
619                 goto out_put;
620         
621         pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
622         pending->dev = xsegbd_dev;
623         pending->request = NULL;
624         pending->comp = &comp;
625
626         
627         xreq->priv = (uint64_t) blkreq_idx;
628
629         target = xseg_get_target(xsegbd_dev->xseg, xreq);
630         strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
631         xreq->size = xreq->datalen;
632         xreq->offset = 0;
633         xreq->op = X_CLOSE;
634
635         xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
636         p = xseg_submit(xsegbd_dev->xseg, xreq, 
637                                 xsegbd_dev->src_portno, X_ALLOC);
638         if ( p == NoPort) {
639                 XSEGLOG("couldn't submit request");
640                 BUG_ON(1);
641                 goto out_queue;
642         }
643         WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
644         wait_for_completion_interruptible(&comp);
645         ret = 0;
646         if (xreq->state & XS_FAILED)
647                 XSEGLOG("Couldn't close disk on mapper");
648 out_put:
649         BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
650 out:
651         __xsegbd_put(xsegbd_dev);
652         return ret;
653
654 out_queue:
655         pending->dev = NULL;
656         pending->comp = NULL;
657         xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
658         
659         goto out;
660 }
661
/*
 * xseg receive callback for @portno.  Drains completed xseg requests:
 * requests with a sync waiter (pending->comp set) are complete()d and
 * left for the waiter to put; block I/O requests are ended and their
 * pending slot and xseg request are recycled.  Finally kicks
 * xseg_request_fn() to submit any queued work.
 */
static void xseg_callback(xport portno)
{
	struct xsegbd_device *xsegbd_dev;
	struct xseg_request *xreq;
	struct request *blkreq;
	struct xsegbd_pending *pending;
	unsigned long flags;
	xqindex blkreq_idx, ridx;
	int err;
	void *data;

	xsegbd_dev  = __xsegbd_get_dev(portno);
	if (!xsegbd_dev) {
		XSEGLOG("portno: %u has no xsegbd device assigned", portno);
		WARN_ON(1);
		return;
	}

	for (;;) {
		xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
		xreq = xseg_receive(xsegbd_dev->xseg, portno, 0);
		if (!xreq)
			break;

//		xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);

		/* priv carries the pending-table index set at submit time */
		blkreq_idx = (xqindex) xreq->priv;
		if (blkreq_idx >= xsegbd_dev->nr_requests) {
			WARN_ON(1);
			//FIXME maybe put request?
			continue;
		}

		pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
		if (pending->comp) {
			/* someone is blocking on this request
			   and will handle it when we wake them up. */
			complete(pending->comp);
			/* the request is blocker's responsibility so
			   we will not put_request(); */

			continue;
		}

		/* this is now treated as a block I/O request to end */
		blkreq = pending->request;
		pending->request = NULL;
		if (xsegbd_dev != pending->dev) {
			//FIXME maybe put request?
			XSEGLOG("xsegbd_dev != pending->dev");
			BUG_ON(1);
			continue;
		}
		pending->dev = NULL;
		if (!blkreq){
			//FIXME maybe put request?
			XSEGLOG("blkreq does not exist");
			BUG_ON(1);
			continue;
		}

		/* fail the blk request unless it was fully served */
		err = -EIO;
		if (!(xreq->state & XS_SERVED))
			goto blk_end;

		if (xreq->serviced != blk_rq_bytes(blkreq))
			goto blk_end;

		err = 0;
		/* read path: copy reply data back into the bio pages */
		if (!rq_data_dir(blkreq)){
			xseg_to_blk(xsegbd_dev->xseg, xreq, blkreq);
		}
blk_end:
		blk_end_request_all(blkreq, err);

		/* recycle the pending slot and the xseg request */
		ridx = xq_append_head(&xsegbd_dev->blk_queue_pending,
					blkreq_idx, xsegbd_dev->src_portno);
		if (ridx == Noneidx) {
			XSEGLOG("couldnt append blkreq_idx");
			WARN_ON(1);
		}

		if (xseg_put_request(xsegbd_dev->xseg, xreq,
						xsegbd_dev->src_portno) < 0){
			XSEGLOG("couldn't put req");
			BUG_ON(1);
		}
		/* drop the reference taken at submit time in xseg_request_fn() */
		__xsegbd_put(xsegbd_dev);
	}
	if (xsegbd_dev) {
		/* resubmit queued block requests now that slots freed up */
		spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
		xseg_request_fn(xsegbd_dev->blk_queue);
		spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
		/* drop the reference from __xsegbd_get_dev() */
		__xsegbd_put(xsegbd_dev);
	}
}
758
759
760 /* sysfs interface */
761
/* Bus under which all xsegbd devices are registered (/sys/bus/xsegbd). */
static struct bus_type xsegbd_bus_type = {
	.name	= "xsegbd",
};
765
766 static ssize_t xsegbd_size_show(struct device *dev,
767                                         struct device_attribute *attr, char *buf)
768 {
769         struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
770
771         return sprintf(buf, "%llu\n", (unsigned long long) xsegbd_dev->sectors * 512ULL);
772 }
773
774 static ssize_t xsegbd_major_show(struct device *dev,
775                                         struct device_attribute *attr, char *buf)
776 {
777         struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
778
779         return sprintf(buf, "%d\n", xsegbd_dev->major);
780 }
781
782 static ssize_t xsegbd_srcport_show(struct device *dev,
783                                         struct device_attribute *attr, char *buf)
784 {
785         struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
786
787         return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->src_portno);
788 }
789
790 static ssize_t xsegbd_dstport_show(struct device *dev,
791                                         struct device_attribute *attr, char *buf)
792 {
793         struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
794
795         return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->dst_portno);
796 }
797
798 static ssize_t xsegbd_id_show(struct device *dev,
799                                         struct device_attribute *attr, char *buf)
800 {
801         struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
802
803         return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->id);
804 }
805
806 static ssize_t xsegbd_reqs_show(struct device *dev,
807                                         struct device_attribute *attr, char *buf)
808 {
809         struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
810
811         return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->nr_requests);
812 }
813
814 static ssize_t xsegbd_target_show(struct device *dev,
815                                         struct device_attribute *attr, char *buf)
816 {
817         struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
818
819         return sprintf(buf, "%s\n", xsegbd_dev->target);
820 }
821
822 static ssize_t xsegbd_image_refresh(struct device *dev,
823                                         struct device_attribute *attr,
824                                         const char *buf,
825                                         size_t size)
826 {
827         struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
828         int rc, ret = size;
829
830         mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
831
832         rc = xsegbd_get_size(xsegbd_dev);
833         if (rc < 0) {
834                 ret = rc;
835                 goto out;
836         }
837
838         set_capacity(xsegbd_dev->gd, xsegbd_dev->sectors);
839
840 out:
841         mutex_unlock(&xsegbd_mutex);
842         return ret;
843 }
844
//FIXME
/*
 * sysfs "cleanup" store: force-complete every outstanding request.
 * Walks the pending table under the xq lock; any slot whose index is
 * not currently in the free queue is drained — its blk request is ended
 * with -EIO and any sync waiter is completed — then the index is
 * returned to the free queue.
 * NOTE(review): blk_end_request_all()/complete() are called while
 * holding the xq lock — confirm this is safe in this context.
 */
static ssize_t xsegbd_cleanup(struct device *dev,
					struct device_attribute *attr,
					const char *buf,
					size_t size)
{
	struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
	int ret = size, i;
	struct request *blkreq = NULL;
	struct xsegbd_pending *pending = NULL;
	struct completion *comp = NULL;

	mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
	xlock_acquire(&xsegbd_dev->blk_queue_pending.lock,
				xsegbd_dev->src_portno);
	for (i = 0; i < xsegbd_dev->nr_requests; i++) {
		/* a slot NOT in the free queue is in flight — drain it */
		if (!__xq_check(&xsegbd_dev->blk_queue_pending, i)) {
			pending = &xsegbd_dev->blk_req_pending[i];
			blkreq = pending->request;
			pending->request = NULL;
			comp = pending->comp;
			pending->comp = NULL;
			if (blkreq){
				XSEGLOG("Cleaning up blkreq %lx [%d]", (unsigned long) blkreq, i);
				blk_end_request_all(blkreq, -EIO);
			}
			if (comp){
				XSEGLOG("Cleaning up comp %lx [%d]", (unsigned long) comp, i);
				complete(comp);
			}
			__xq_append_tail(&xsegbd_dev->blk_queue_pending, i);
		}
	}
	xlock_release(&xsegbd_dev->blk_queue_pending.lock);

	mutex_unlock(&xsegbd_mutex);
	return ret;
}
883
/* read-only attributes exposed per device under sysfs */
static DEVICE_ATTR(size, S_IRUGO, xsegbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, xsegbd_major_show, NULL);
static DEVICE_ATTR(srcport, S_IRUGO, xsegbd_srcport_show, NULL);
static DEVICE_ATTR(dstport, S_IRUGO, xsegbd_dstport_show, NULL);
static DEVICE_ATTR(id , S_IRUGO, xsegbd_id_show, NULL);
static DEVICE_ATTR(reqs , S_IRUGO, xsegbd_reqs_show, NULL);
static DEVICE_ATTR(target, S_IRUGO, xsegbd_target_show, NULL);
/* write-only controls: "refresh" re-reads the target size,
 * "cleanup" force-fails all pending requests (see xsegbd_cleanup) */
static DEVICE_ATTR(refresh , S_IWUSR, NULL, xsegbd_image_refresh);
static DEVICE_ATTR(cleanup , S_IWUSR, NULL, xsegbd_cleanup);

/* attribute table wired into the device type below */
static struct attribute *xsegbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_srcport.attr,
	&dev_attr_dstport.attr,
	&dev_attr_id.attr,
	&dev_attr_reqs.attr,
	&dev_attr_target.attr,
	&dev_attr_refresh.attr,
	&dev_attr_cleanup.attr,
	NULL
};

static struct attribute_group xsegbd_attr_group = {
	.attrs = xsegbd_attrs,
};

static const struct attribute_group *xsegbd_attr_groups[] = {
	&xsegbd_attr_group,
	NULL
};
915
/* empty release hook: per-device teardown is handled elsewhere
 * (xsegbd_dev_release is installed on the device itself) */
static void xsegbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type xsegbd_device_type = {
	.name		= "xsegbd",
	.groups		= xsegbd_attr_groups,
	.release	= xsegbd_sysfs_dev_release,
};
925
/* the root device is static, so its release callback has nothing to free */
static void xsegbd_root_dev_release(struct device *dev)
{
}

/* parent device for all xsegbd devices (/sys/devices/xsegbd) */
static struct device xsegbd_root_dev = {
	.init_name	= "xsegbd",
	.release	= xsegbd_root_dev_release,
};
934
935 static int xsegbd_bus_add_dev(struct xsegbd_device *xsegbd_dev)
936 {
937         int ret = -ENOMEM;
938         struct device *dev;
939
940         mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
941         dev = &xsegbd_dev->dev;
942
943         dev->bus = &xsegbd_bus_type;
944         dev->type = &xsegbd_device_type;
945         dev->parent = &xsegbd_root_dev;
946         dev->release = xsegbd_dev_release;
947         dev_set_name(dev, "%d", xsegbd_dev->id);
948
949         ret = device_register(dev);
950
951         mutex_unlock(&xsegbd_mutex);
952         return ret;
953 }
954
/* Unregister the device; the final put triggers xsegbd_dev_release. */
static void xsegbd_bus_del_dev(struct xsegbd_device *xsegbd_dev)
{
	device_unregister(&xsegbd_dev->dev);
}
959
960 static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
961 {
962         struct xsegbd_device *xsegbd_dev;
963         struct xseg_port *port;
964         ssize_t ret = -ENOMEM;
965
966         if (!try_module_get(THIS_MODULE))
967                 return -ENODEV;
968
969         xsegbd_dev = kzalloc(sizeof(*xsegbd_dev), GFP_KERNEL);
970         if (!xsegbd_dev)
971                 goto out;
972
973         spin_lock_init(&xsegbd_dev->rqlock);
974         INIT_LIST_HEAD(&xsegbd_dev->node);
975         init_waitqueue_head(&xsegbd_dev->wq);
976         atomic_set(&xsegbd_dev->usercount, 0);
977
978         /* parse cmd */
979         if (sscanf(buf, "%" __stringify(XSEGBD_TARGET_NAMELEN) "s "
980                         "%d:%d:%d", xsegbd_dev->target, &xsegbd_dev->src_portno,
981                         &xsegbd_dev->dst_portno, &xsegbd_dev->nr_requests) < 3) {
982                 ret = -EINVAL;
983                 goto out_dev;
984         }
985         xsegbd_dev->targetlen = strlen(xsegbd_dev->target);
986
987         spin_lock(&xsegbd_devices_lock);
988         if (xsegbd_devices[xsegbd_dev->src_portno] != NULL) {
989                 ret = -EINVAL;
990                 goto out_unlock;
991         }
992         xsegbd_devices[xsegbd_dev->src_portno] = xsegbd_dev;
993         xsegbd_dev->id = xsegbd_dev->src_portno;
994         spin_unlock(&xsegbd_devices_lock);
995
996         XSEGLOG("registering block device major %d", major);
997         ret = register_blkdev(major, XSEGBD_NAME);
998         if (ret < 0) {
999                 XSEGLOG("cannot register block device!");
1000                 ret = -EBUSY;
1001                 goto out_delentry;
1002         }
1003         xsegbd_dev->major = ret;
1004         XSEGLOG("registered block device major %d", xsegbd_dev->major);
1005
1006         ret = xsegbd_bus_add_dev(xsegbd_dev);
1007         if (ret)
1008                 goto out_blkdev;
1009
1010         if (!xq_alloc_seq(&xsegbd_dev->blk_queue_pending, 
1011                                 xsegbd_dev->nr_requests,
1012                                 xsegbd_dev->nr_requests))
1013                 goto out_bus;
1014
1015         xsegbd_dev->blk_req_pending = kzalloc(
1016                         xsegbd_dev->nr_requests *sizeof(struct xsegbd_pending),
1017                                    GFP_KERNEL);
1018         if (!xsegbd_dev->blk_req_pending)
1019                 goto out_freeq;
1020
1021         
1022         XSEGLOG("joining segment");
1023         //FIXME use xsebd module config for now
1024         xsegbd_dev->xseg = xseg_join(   xsegbd.config.type,
1025                                         xsegbd.config.name,
1026                                         "segdev",
1027                                         xseg_callback           );
1028         if (!xsegbd_dev->xseg)
1029                 goto out_freepending;
1030         __sync_synchronize();
1031         
1032         XSEGLOG("%s binding to source port %u (destination %u)", xsegbd_dev->target,
1033                         xsegbd_dev->src_portno, xsegbd_dev->dst_portno);
1034         port = xseg_bind_port(xsegbd_dev->xseg, xsegbd_dev->src_portno, NULL);
1035         if (!port) {
1036                 XSEGLOG("cannot bind to port");
1037                 ret = -EFAULT;
1038
1039                 goto out_xseg;
1040         }
1041         
1042         if (xsegbd_dev->src_portno != xseg_portno(xsegbd_dev->xseg, port)) {
1043                 XSEGLOG("portno != xsegbd_dev->src_portno");
1044                 BUG_ON(1);
1045                 ret = -EFAULT;
1046                 goto out_xseg;
1047         }
1048         xseg_init_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);
1049
1050
1051         /* make sure we don't get any requests until we're ready to handle them */
1052         xseg_cancel_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
1053
1054         ret = xsegbd_dev_init(xsegbd_dev);
1055         if (ret)
1056                 goto out_signal;
1057
1058         xseg_prepare_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
1059         return count;
1060
1061 out_signal:
1062         xseg_quit_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);
1063 out_xseg:
1064         xseg_leave(xsegbd_dev->xseg);
1065         
1066 out_freepending:
1067         kfree(xsegbd_dev->blk_req_pending);
1068
1069 out_freeq:
1070         xq_free(&xsegbd_dev->blk_queue_pending);
1071
1072 out_bus:
1073         xsegbd_bus_del_dev(xsegbd_dev);
1074         return ret;
1075
1076 out_blkdev:
1077         unregister_blkdev(xsegbd_dev->major, XSEGBD_NAME);
1078
1079 out_delentry:
1080         spin_lock(&xsegbd_devices_lock);
1081         xsegbd_devices[xsegbd_dev->src_portno] = NULL;
1082
1083 out_unlock:
1084         spin_unlock(&xsegbd_devices_lock);
1085
1086 out_dev:
1087         kfree(xsegbd_dev);
1088
1089 out:
1090         return ret;
1091 }
1092
1093 static ssize_t xsegbd_remove(struct bus_type *bus, const char *buf, size_t count)
1094 {
1095         struct xsegbd_device *xsegbd_dev = NULL;
1096         int id, ret;
1097         unsigned long ul_id;
1098
1099         ret = strict_strtoul(buf, 10, &ul_id);
1100         if (ret)
1101                 return ret;
1102
1103         id = (int) ul_id;
1104         if (id != ul_id)
1105                 return -EINVAL;
1106
1107         mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
1108
1109         ret = count;
1110         xsegbd_dev = __xsegbd_get_dev(id);
1111         if (!xsegbd_dev) {
1112                 ret = -ENOENT;
1113                 goto out_unlock;
1114         }
1115         __xsegbd_put(xsegbd_dev);
1116         xsegbd_bus_del_dev(xsegbd_dev);
1117
1118 out_unlock:
1119         mutex_unlock(&xsegbd_mutex);
1120         return ret;
1121 }
1122
/* bus-level control files: /sys/bus/xsegbd/{add,remove} */
static struct bus_attribute xsegbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, xsegbd_add),
	__ATTR(remove, S_IWUSR, NULL, xsegbd_remove),
	__ATTR_NULL
};
1128
1129 static int xsegbd_sysfs_init(void)
1130 {
1131         int ret;
1132
1133         ret = device_register(&xsegbd_root_dev);
1134         if (ret < 0)
1135                 return ret;
1136
1137         xsegbd_bus_type.bus_attrs = xsegbd_bus_attrs;
1138         ret = bus_register(&xsegbd_bus_type);
1139         if (ret < 0)
1140                 device_unregister(&xsegbd_root_dev);
1141
1142         return ret;
1143 }
1144
/* Undo xsegbd_sysfs_init in reverse order: bus first, then root device. */
static void xsegbd_sysfs_cleanup(void)
{
	bus_unregister(&xsegbd_bus_type);
	device_unregister(&xsegbd_root_dev);
}
1150
1151 /* *************************** */
1152 /* ** Module Initialization ** */
1153 /* *************************** */
1154
1155 static int __init xsegbd_init(void)
1156 {
1157         int ret = -ENOMEM;
1158         xsegbd_devices = kzalloc(max_dev * sizeof(struct xsegbd_devices *), GFP_KERNEL);
1159         if (!xsegbd_devices)
1160                 goto out;
1161
1162         spin_lock_init(&xsegbd_devices_lock);
1163
1164         ret = -ENOSYS;
1165         ret = xsegbd_xseg_init();
1166         if (ret)
1167                 goto out_free;
1168
1169         ret = xsegbd_sysfs_init();
1170         if (ret)
1171                 goto out_xseg;
1172
1173         XSEGLOG("initialization complete");
1174
1175 out:
1176         return ret;
1177
1178 out_xseg:
1179         xsegbd_xseg_quit();
1180         
1181 out_free:
1182         kfree(xsegbd_devices);
1183
1184         goto out;
1185 }
1186
/* Module exit: tear down sysfs, then leave the xseg layer.
 * NOTE(review): xsegbd_devices is not kfree'd here -- presumably freed
 * elsewhere or intentionally leaked on unload; verify. */
static void __exit xsegbd_exit(void)
{
	xsegbd_sysfs_cleanup();
	xsegbd_xseg_quit();
}

module_init(xsegbd_init);
module_exit(xsegbd_exit);
1195