add snapshot functionality in xseg-tool
[archipelago] / xseg / peers / kernel / xsegbd.c
/*
 * Copyright (C) 2012 GRNET S.A.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

/* xsegbd.c
 *
 */

#include <linux/module.h>
#include <linux/moduleparam.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/fs.h>
#include <linux/errno.h>
#include <linux/timer.h>
#include <linux/types.h>
#include <linux/vmalloc.h>
#include <linux/genhd.h>
#include <linux/blkdev.h>
#include <linux/bio.h>
#include <linux/device.h>
#include <linux/completion.h>
#include <linux/wait.h>
#include <sys/kernel/segdev.h>
#include "xsegbd.h"
#include <xseg/protocol.h>

#define XSEGBD_MINORS 1
/* define max request size to be used in xsegbd */
#define XSEGBD_MAX_REQUEST_SIZE 4194304U

MODULE_DESCRIPTION("xsegbd");
MODULE_AUTHOR("XSEG");
MODULE_LICENSE("GPL");

static long sector_size = 0;
static long blksize = 512;
static int major = 0;
static int max_dev = 200;
static long start_portno = 0;
static long end_portno = 199;
static char name[XSEGBD_SEGMENT_NAMELEN] = "xsegbd";
static char spec[256] = "segdev:xsegbd:512:1024:12";

module_param(sector_size, long, 0644);
module_param(blksize, long, 0644);
module_param(start_portno, long, 0644);
module_param(end_portno, long, 0644);
module_param(major, int, 0644);
module_param_string(name, name, sizeof(name), 0644);
module_param_string(spec, spec, sizeof(spec), 0644);

static struct xsegbd xsegbd;
static struct xsegbd_device **xsegbd_devices; /* indexed by device id */
static DEFINE_MUTEX(xsegbd_mutex);
static DEFINE_SPINLOCK(xsegbd_devices_lock);


struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
{
        struct xsegbd_device *xsegbd_dev = NULL;

        spin_lock(&xsegbd_devices_lock);
        xsegbd_dev = xsegbd_devices[id];
        spin_unlock(&xsegbd_devices_lock);

        return xsegbd_dev;
}

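/*
 * Device ids are dense indices into the xsegbd_devices table: each device
 * occupies the slot of its source port, offset by the start_portno module
 * parameter.
 */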
static int src_portno_to_id(xport src_portno)
{
        return (src_portno - start_portno);
}

/* ************************* */
/* ***** sysfs helpers ***** */
/* ************************* */

static struct xsegbd_device *dev_to_xsegbd(struct device *dev)
{
        return container_of(dev, struct xsegbd_device, dev);
}

static struct device *xsegbd_get_dev(struct xsegbd_device *xsegbd_dev)
{
        /* FIXME */
        return get_device(&xsegbd_dev->dev);
}

static void xsegbd_put_dev(struct xsegbd_device *xsegbd_dev)
{
        put_device(&xsegbd_dev->dev);
}

/* ************************* */
/* ** XSEG Initialization ** */
/* ************************* */

static void xseg_callback(uint32_t portno);

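/*
 * Bring up the xseg library and join the shared memory segment described by
 * the "spec" module parameter (segment type and name plus sizing fields,
 * e.g. the default "segdev:xsegbd:512:1024:12"). xseg_callback is registered
 * so peers can signal us when replies arrive.
 */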
int xsegbd_xseg_init(void)
{
        int r;

        if (!xsegbd.name[0])
                strncpy(xsegbd.name, name, XSEGBD_SEGMENT_NAMELEN);

        r = xseg_initialize();
        if (r) {
                XSEGLOG("cannot initialize 'segdev' peer");
                goto err;
        }

        r = xseg_parse_spec(spec, &xsegbd.config);
        if (r)
                goto err;

        if (strncmp(xsegbd.config.type, "segdev", 16))
                XSEGLOG("WARNING: unexpected segment type '%s' vs 'segdev'",
                         xsegbd.config.type);

        /* leave it here for now */
        XSEGLOG("joining segment");
        xsegbd.xseg = xseg_join(        xsegbd.config.type,
                                        xsegbd.config.name,
                                        "segdev",
                                        xseg_callback           );
        if (!xsegbd.xseg) {
                XSEGLOG("cannot find segment");
                r = -ENODEV;
                goto err;
        }

        return 0;
err:
        return r;

}

int xsegbd_xseg_quit(void)
{
        struct segdev *segdev;

        /* make sure to unmap the segment first */
        segdev = segdev_get(0);
        clear_bit(SEGDEV_RESERVED, &segdev->flags);
        xsegbd.xseg->priv->segment_type.ops.unmap(xsegbd.xseg, xsegbd.xseg->segment_size);
        segdev_put(segdev);

        return 0;
}


/* ***************************** */
/* ** Block Device Operations ** */
/* ***************************** */

static int xsegbd_open(struct block_device *bdev, fmode_t mode)
{
        struct gendisk *disk = bdev->bd_disk;
        struct xsegbd_device *xsegbd_dev = disk->private_data;

        xsegbd_get_dev(xsegbd_dev);

        return 0;
}

static int xsegbd_release(struct gendisk *gd, fmode_t mode)
{
        struct xsegbd_device *xsegbd_dev = gd->private_data;

        xsegbd_put_dev(xsegbd_dev);

        return 0;
}

static int xsegbd_ioctl(struct block_device *bdev, fmode_t mode,
                        unsigned int cmd, unsigned long arg)
{
        return -ENOTTY;
}

static const struct block_device_operations xsegbd_ops = {
        .owner          = THIS_MODULE,
        .open           = xsegbd_open,
        .release        = xsegbd_release,
        .ioctl          = xsegbd_ioctl
};


/* *************************** */
/* ** Device Initialization ** */
/* *************************** */

static void xseg_request_fn(struct request_queue *rq);
static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev);
static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev);

static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
{
        int ret = -ENOMEM;
        struct gendisk *disk;
        unsigned int max_request_size_bytes;

        spin_lock_init(&xsegbd_dev->rqlock);

        xsegbd_dev->xsegbd = &xsegbd;

        /* allocates and initializes queue */
        xsegbd_dev->blk_queue = blk_init_queue(xseg_request_fn, &xsegbd_dev->rqlock);
        if (!xsegbd_dev->blk_queue)
                goto out;

        xsegbd_dev->blk_queue->queuedata = xsegbd_dev;

        blk_queue_flush(xsegbd_dev->blk_queue, REQ_FLUSH | REQ_FUA);
        blk_queue_logical_block_size(xsegbd_dev->blk_queue, 512);
        blk_queue_physical_block_size(xsegbd_dev->blk_queue, blksize);
        blk_queue_bounce_limit(xsegbd_dev->blk_queue, BLK_BOUNCE_ANY);

        max_request_size_bytes = XSEGBD_MAX_REQUEST_SIZE;
        blk_queue_max_hw_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 9);
//      blk_queue_max_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 10);
        blk_queue_max_segments(xsegbd_dev->blk_queue, 1024);
        blk_queue_max_segment_size(xsegbd_dev->blk_queue, max_request_size_bytes);
        blk_queue_io_min(xsegbd_dev->blk_queue, max_request_size_bytes);
        blk_queue_io_opt(xsegbd_dev->blk_queue, max_request_size_bytes);

        queue_flag_set_unlocked(QUEUE_FLAG_NONROT, xsegbd_dev->blk_queue);

        /* vkoukis says we don't need partitions */
        xsegbd_dev->gd = disk = alloc_disk(XSEGBD_MINORS);
        if (!disk)
                goto out;

        disk->major = xsegbd_dev->major;
        disk->first_minor = xsegbd_dev->id * XSEGBD_MINORS;
        disk->fops = &xsegbd_ops;
        disk->queue = xsegbd_dev->blk_queue;
        disk->private_data = xsegbd_dev;
        disk->flags |= GENHD_FL_SUPPRESS_PARTITION_INFO;
        snprintf(disk->disk_name, 32, "xsegbd%u", xsegbd_dev->id);

        ret = 0;

        /* allow a non-zero sector_size parameter to override the disk size */
        if (sector_size)
                xsegbd_dev->sectors = sector_size;
        else {
                ret = xsegbd_get_size(xsegbd_dev);
                if (ret)
                        goto out;
        }

        set_capacity(disk, xsegbd_dev->sectors);
        XSEGLOG("xsegbd active...");
        add_disk(disk); /* immediately activates the device */

out:
        /* on error, everything is cleaned up in xsegbd_dev_release */
        return ret;
}

static void xsegbd_dev_release(struct device *dev)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);


        /* cleanup gendisk and blk_queue the right way */
        if (xsegbd_dev->gd) {
                if (xsegbd_dev->gd->flags & GENHD_FL_UP)
                        del_gendisk(xsegbd_dev->gd);

                xsegbd_mapclose(xsegbd_dev);
        }

        spin_lock(&xsegbd_devices_lock);
        BUG_ON(xsegbd_devices[xsegbd_dev->id] != xsegbd_dev);
        xsegbd_devices[xsegbd_dev->id] = NULL;
        spin_unlock(&xsegbd_devices_lock);

        XSEGLOG("releasing id: %d", xsegbd_dev->id);
//      xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        xseg_quit_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);

        if (xsegbd_dev->blk_queue)
                blk_cleanup_queue(xsegbd_dev->blk_queue);
        if (xsegbd_dev->gd)
                put_disk(xsegbd_dev->gd);

//      if (xseg_free_requests(xsegbd_dev->xseg,
//                      xsegbd_dev->src_portno, xsegbd_dev->nr_requests) < 0)
//              XSEGLOG("Error trying to free requests!\n");

        if (xsegbd_dev->xseg){
                xseg_leave(xsegbd_dev->xseg);
                xsegbd_dev->xseg = NULL;
        }

        if (xsegbd_dev->blk_req_pending){
                kfree(xsegbd_dev->blk_req_pending);
                xsegbd_dev->blk_req_pending = NULL;
        }
        xq_free(&xsegbd_dev->blk_queue_pending);
        kfree(xsegbd_dev);
        module_put(THIS_MODULE);
}

/* ******************* */
/* ** Critical Path ** */
/* ******************* */

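/*
 * Data travels through the shared segment, not through the bios themselves,
 * so each block request's pages are copied into (blk_to_xseg) or out of
 * (xseg_to_blk) the xseg request's data buffer, one bio_vec at a time.
 */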
static void blk_to_xseg(struct xseg *xseg, struct xseg_request *xreq,
                        struct request *blkreq)
{
        struct bio_vec *bvec;
        struct req_iterator iter;
        uint64_t off = 0;
        char *data = xseg_get_data(xseg, xreq);
        rq_for_each_segment(bvec, blkreq, iter) {
                char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
                memcpy(data + off, bdata, bvec->bv_len);
                off += bvec->bv_len;
                kunmap_atomic(bdata);
        }
}

static void xseg_to_blk(struct xseg *xseg, struct xseg_request *xreq,
                        struct request *blkreq)
{
        struct bio_vec *bvec;
        struct req_iterator iter;
        uint64_t off = 0;
        char *data = xseg_get_data(xseg, xreq);
        rq_for_each_segment(bvec, blkreq, iter) {
                char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
                memcpy(bdata, data + off, bvec->bv_len);
                off += bvec->bv_len;
                kunmap_atomic(bdata);
        }
}

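/*
 * Strategy (request) function. The block layer calls it with rqlock held;
 * it drops the lock while talking to the segment and retakes it around
 * blk_fetch_request(). For each block request it allocates an xseg request
 * and a pending slot, copies write data in, and submits to the peer port.
 * Whatever was still allocated when the loop broke is handed back at the end.
 */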
static void xseg_request_fn(struct request_queue *rq)
{
        struct xseg_request *xreq;
        struct xsegbd_device *xsegbd_dev = rq->queuedata;
        struct request *blkreq;
        struct xsegbd_pending *pending;
        xqindex blkreq_idx;
        char *target;
        uint64_t datalen;
        xport p;
        int r;
        unsigned long flags;

        spin_unlock_irq(&xsegbd_dev->rqlock);
        for (;;) {
                if (current_thread_info()->preempt_count || irqs_disabled()){
                        XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu ",
                                        current_thread_info()->preempt_count, irqs_disabled());
                }
                //XSEGLOG("Priority: %d", current_thread_info()->task->prio);
                //XSEGLOG("Static priority: %d", current_thread_info()->task->static_prio);
                //XSEGLOG("Normal priority: %d", current_thread_info()->task->normal_prio);
                //XSEGLOG("Rt_priority: %u", current_thread_info()->task->rt_priority);
                blkreq_idx = Noneidx;
                xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                                xsegbd_dev->dst_portno, X_ALLOC);
                if (!xreq)
                        break;

                blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending,
                                                xsegbd_dev->src_portno);
                if (blkreq_idx == Noneidx)
                        break;

                if (blkreq_idx >= xsegbd_dev->nr_requests) {
                        XSEGLOG("blkreq_idx >= xsegbd_dev->nr_requests");
                        BUG_ON(1);
                        break;
                }


                spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
                blkreq = blk_fetch_request(rq);
                if (!blkreq){
                        spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
                        break;
                }

                if (blkreq->cmd_type != REQ_TYPE_FS) {
                        //FIXME we lose xreq here
                        XSEGLOG("non-fs cmd_type: %u. *shrug*", blkreq->cmd_type);
                        __blk_end_request_all(blkreq, 0);
                        spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
                        continue;
                }
                spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
                if (current_thread_info()->preempt_count || irqs_disabled()){
                        XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu ",
                                        current_thread_info()->preempt_count, irqs_disabled());
                }

                datalen = blk_rq_bytes(blkreq);
                r = xseg_prep_request(xsegbd_dev->xseg, xreq,
                                        xsegbd_dev->targetlen, datalen);
                if (r < 0) {
                        XSEGLOG("couldn't prep request");
                        blk_end_request_err(blkreq, r);
                        BUG_ON(1);
                        break;
                }
                r = -ENOMEM;
                if (xreq->bufferlen - xsegbd_dev->targetlen < datalen){
                        XSEGLOG("malformed req buffers");
                        blk_end_request_err(blkreq, r);
                        BUG_ON(1);
                        break;
                }

                target = xseg_get_target(xsegbd_dev->xseg, xreq);
                strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);

                pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
                pending->dev = xsegbd_dev;
                pending->request = blkreq;
                pending->comp = NULL;

                xreq->size = datalen;
                xreq->offset = blk_rq_pos(blkreq) << 9;
                xreq->priv = (uint64_t) blkreq_idx;

                /*
                if (xreq->offset >= (sector_size << 9))
                        XSEGLOG("sector offset: %lu > %lu, flush:%u, fua:%u",
                                 blk_rq_pos(blkreq), sector_size,
                                 blkreq->cmd_flags & REQ_FLUSH,
                                 blkreq->cmd_flags & REQ_FUA);
                */

                if (blkreq->cmd_flags & REQ_FLUSH)
                        xreq->flags |= XF_FLUSH;

                if (blkreq->cmd_flags & REQ_FUA)
                        xreq->flags |= XF_FUA;

                if (rq_data_dir(blkreq)) {
                        blk_to_xseg(xsegbd_dev->xseg, xreq, blkreq);
                        xreq->op = X_WRITE;
                } else {
                        xreq->op = X_READ;
                }


//              XSEGLOG("%s : %lu (%lu)", xsegbd_dev->target, xreq->offset, xreq->datalen);
                r = -EIO;
                p = xseg_submit(xsegbd_dev->xseg, xreq,
                                        xsegbd_dev->src_portno, X_ALLOC);
                if (p == NoPort) {
                        XSEGLOG("couldn't submit req");
                        WARN_ON(1);
                        blk_end_request_err(blkreq, r);
                        break;
                }
                WARN_ON(xseg_signal(xsegbd_dev->xsegbd->xseg, p) < 0);
        }
        if (xreq)
                BUG_ON(xseg_put_request(xsegbd_dev->xsegbd->xseg, xreq,
                                        xsegbd_dev->src_portno) == -1);
        if (blkreq_idx != Noneidx)
                BUG_ON(xq_append_head(&xsegbd_dev->blk_queue_pending,
                                blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
        spin_lock_irq(&xsegbd_dev->rqlock);
}

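/*
 * An X_INFO reply carries the target's size in bytes as its data payload;
 * translate it to 512-byte sectors for the block layer.
 */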
int update_dev_sectors_from_request(    struct xsegbd_device *xsegbd_dev,
                                        struct xseg_request *xreq       )
{
        void *data;
        if (!xreq) {
                XSEGLOG("Invalid xreq");
                return -EIO;
        }
        /* check the device before dereferencing it below */
        if (!xsegbd_dev) {
                XSEGLOG("Invalid xsegbd_dev");
                return -ENOENT;
        }

        if (xreq->state & XS_FAILED)
                return -ENOENT;

        if (!(xreq->state & XS_SERVED))
                return -EIO;

        data = xseg_get_data(xsegbd_dev->xseg, xreq);
        if (!data) {
                XSEGLOG("Invalid req data");
                return -EIO;
        }
        xsegbd_dev->sectors = *((uint64_t *) data) / 512ULL;
        return 0;
}

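/*
 * Synchronous X_INFO round trip: grab a pending slot, attach a completion
 * to it, submit, sleep until xseg_callback completes us, then read the size
 * out of the reply.
 */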
static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
{
        struct xseg_request *xreq;
        char *target;
        xqindex blkreq_idx;
        struct xsegbd_pending *pending;
        struct completion comp;
        xport p;
        int ret = -EBUSY;

        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
        if (!xreq)
                goto out;

        BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen,
                                sizeof(struct xseg_reply_info)));

        init_completion(&comp);
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
                goto out_put;

        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
        pending->request = NULL;
        pending->comp = &comp;


        xreq->priv = (uint64_t) blkreq_idx;

        target = xseg_get_target(xsegbd_dev->xseg, xreq);
        strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
        xreq->size = xreq->datalen;
        xreq->offset = 0;
        xreq->op = X_INFO;

        xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        p = xseg_submit(xsegbd_dev->xseg, xreq,
                                xsegbd_dev->src_portno, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG("couldn't submit request");
                BUG_ON(1);
                goto out_queue;
        }
        WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
        XSEGLOG("Before wait for completion, comp %lx [%llu]", (unsigned long) pending->comp, (unsigned long long) blkreq_idx);
        wait_for_completion_interruptible(&comp);
        XSEGLOG("Woken up after wait_for_completion_interruptible(), comp: %lx [%llu]", (unsigned long) pending->comp, (unsigned long long) blkreq_idx);
        ret = update_dev_sectors_from_request(xsegbd_dev, xreq);
        XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors);

out_queue:
        pending->dev = NULL;
        pending->comp = NULL;
        xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
out_put:
        BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
out:
        return ret;
}

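/*
 * Same synchronous pattern as xsegbd_get_size, but with X_CLOSE: ask the
 * peer (the mapper) to close the target before the device goes away.
 */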
static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev)
{
        struct xseg_request *xreq;
        char *target;
        xqindex blkreq_idx;
        struct xsegbd_pending *pending;
        struct completion comp;
        xport p;
        int ret = -EBUSY;

        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
        if (!xreq)
                goto out;

        BUG_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen, 0));

        init_completion(&comp);
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
                goto out_put;

        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
        pending->request = NULL;
        pending->comp = &comp;


        xreq->priv = (uint64_t) blkreq_idx;

        target = xseg_get_target(xsegbd_dev->xseg, xreq);
        strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
        xreq->size = xreq->datalen;
        xreq->offset = 0;
        xreq->op = X_CLOSE;

        xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        p = xseg_submit(xsegbd_dev->xseg, xreq,
                                xsegbd_dev->src_portno, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG("couldn't submit request");
                BUG_ON(1);
                goto out_queue;
        }
        WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
        wait_for_completion_interruptible(&comp);
        ret = 0;
        if (xreq->state & XS_FAILED)
                XSEGLOG("Couldn't close disk on mapper");

out_queue:
        pending->dev = NULL;
        pending->comp = NULL;
        xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
out_put:
        BUG_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
out:
        return ret;
}

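/*
 * Reply path, run when a peer signals our port. Drain all received requests:
 * slots with a completion attached belong to a synchronous caller
 * (get_size/mapclose) and are just completed; the rest are block requests,
 * which are ended and their resources recycled. Finally kick the request
 * function to push any queued work that was waiting for free slots.
 */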
static void xseg_callback(xport portno)
{
        struct xsegbd_device *xsegbd_dev;
        struct xseg_request *xreq;
        struct request *blkreq;
        struct xsegbd_pending *pending;
        unsigned long flags;
        xqindex blkreq_idx, ridx;
        int err;

        xsegbd_dev = __xsegbd_get_dev(portno);
        if (!xsegbd_dev) {
                XSEGLOG("portno: %u has no xsegbd device assigned", portno);
                WARN_ON(1);
                return;
        }

        for (;;) {
                xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
                xreq = xseg_receive(xsegbd_dev->xseg, portno, 0);
                if (!xreq)
                        break;

//              xseg_cancel_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);

                blkreq_idx = (xqindex) xreq->priv;
                if (blkreq_idx >= xsegbd_dev->nr_requests) {
                        WARN_ON(1);
                        //FIXME maybe put request?
                        continue;
                }

                pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
                if (pending->comp) {
                        /* someone is blocking on this request
                           and will handle it when we wake them up. */
                        complete(pending->comp);
                        /* the request is blocker's responsibility so
                           we will not put_request(); */
                        continue;
                }

                /* this is now treated as a block I/O request to end */
                blkreq = pending->request;
                pending->request = NULL;
                if (xsegbd_dev != pending->dev) {
                        //FIXME maybe put request?
                        XSEGLOG("xsegbd_dev != pending->dev");
                        WARN_ON(1);
                        continue;
                }
                pending->dev = NULL;
                if (!blkreq){
                        //FIXME maybe put request?
                        XSEGLOG("blkreq does not exist");
                        WARN_ON(1);
                        continue;
                }

                err = -EIO;
                if (!(xreq->state & XS_SERVED))
                        goto blk_end;

                if (xreq->serviced != blk_rq_bytes(blkreq))
                        goto blk_end;

                err = 0;
                if (!rq_data_dir(blkreq)){
                        xseg_to_blk(xsegbd_dev->xseg, xreq, blkreq);
                }
blk_end:
                blk_end_request_all(blkreq, err);

                ridx = xq_append_head(&xsegbd_dev->blk_queue_pending,
                                        blkreq_idx, xsegbd_dev->src_portno);
                if (ridx == Noneidx) {
                        XSEGLOG("couldn't append blkreq_idx");
                        WARN_ON(1);
                }

                if (xseg_put_request(xsegbd_dev->xseg, xreq,
                                                xsegbd_dev->src_portno) < 0){
                        XSEGLOG("couldn't put req");
                        BUG_ON(1);
                }
        }
        if (xsegbd_dev) {
                spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
                xseg_request_fn(xsegbd_dev->blk_queue);
                spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
        }
}


/* sysfs interface */

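/*
 * Example usage from userspace (names and port numbers are illustrative;
 * they depend on how the peers were set up):
 *
 *   # map target "myvolume", binding source port 2 to destination port 1,
 *   # with 128 in-flight requests:
 *   echo "myvolume 2:1:128" > /sys/bus/xsegbd/add
 *
 *   # tear the device down again, by id (with the default start_portno of
 *   # 0, the id equals the source port):
 *   echo 2 > /sys/bus/xsegbd/remove
 *
 * The per-device attributes below (size, target, srcport, ...) then appear
 * under /sys/bus/xsegbd/devices/<id>/.
 */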
static struct bus_type xsegbd_bus_type = {
        .name   = "xsegbd",
};

static ssize_t xsegbd_size_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%llu\n", (unsigned long long) xsegbd_dev->sectors * 512ULL);
}

static ssize_t xsegbd_major_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%d\n", xsegbd_dev->major);
}

static ssize_t xsegbd_srcport_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->src_portno);
}

static ssize_t xsegbd_dstport_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->dst_portno);
}

static ssize_t xsegbd_id_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->id);
}

static ssize_t xsegbd_reqs_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->nr_requests);
}

static ssize_t xsegbd_target_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%s\n", xsegbd_dev->target);
}

static ssize_t xsegbd_image_refresh(struct device *dev,
                                        struct device_attribute *attr,
                                        const char *buf,
                                        size_t size)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
        int rc, ret = size;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

        rc = xsegbd_get_size(xsegbd_dev);
        if (rc < 0) {
                ret = rc;
                goto out;
        }

        set_capacity(xsegbd_dev->gd, xsegbd_dev->sectors);

out:
        mutex_unlock(&xsegbd_mutex);
        return ret;
}

//FIXME
//maybe try callback, first and then do a more invasive cleanup
static ssize_t xsegbd_cleanup(struct device *dev,
                                        struct device_attribute *attr,
                                        const char *buf,
                                        size_t size)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
        int ret = size, i;
        struct request *blkreq = NULL;
        struct xsegbd_pending *pending = NULL;
        struct completion *comp = NULL;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
        xlock_acquire(&xsegbd_dev->blk_queue_pending.lock,
                                xsegbd_dev->src_portno);
        for (i = 0; i < xsegbd_dev->nr_requests; i++) {
                if (!__xq_check(&xsegbd_dev->blk_queue_pending, i)) {
                        pending = &xsegbd_dev->blk_req_pending[i];
                        blkreq = pending->request;
                        pending->request = NULL;
                        comp = pending->comp;
                        pending->comp = NULL;
                        if (blkreq){
                                XSEGLOG("Cleaning up blkreq %lx [%d]", (unsigned long) blkreq, i);
                                blk_end_request_all(blkreq, -EIO);
                        }
                        if (comp){
                                XSEGLOG("Cleaning up comp %lx [%d]", (unsigned long) comp, i);
                                complete(comp);
                        }
                        __xq_append_tail(&xsegbd_dev->blk_queue_pending, i);
                }
        }
        xlock_release(&xsegbd_dev->blk_queue_pending.lock);

        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static DEVICE_ATTR(size, S_IRUGO, xsegbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, xsegbd_major_show, NULL);
static DEVICE_ATTR(srcport, S_IRUGO, xsegbd_srcport_show, NULL);
static DEVICE_ATTR(dstport, S_IRUGO, xsegbd_dstport_show, NULL);
static DEVICE_ATTR(id, S_IRUGO, xsegbd_id_show, NULL);
static DEVICE_ATTR(reqs, S_IRUGO, xsegbd_reqs_show, NULL);
static DEVICE_ATTR(target, S_IRUGO, xsegbd_target_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, xsegbd_image_refresh);
static DEVICE_ATTR(cleanup, S_IWUSR, NULL, xsegbd_cleanup);

static struct attribute *xsegbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_srcport.attr,
        &dev_attr_dstport.attr,
        &dev_attr_id.attr,
        &dev_attr_reqs.attr,
        &dev_attr_target.attr,
        &dev_attr_refresh.attr,
        &dev_attr_cleanup.attr,
        NULL
};

static struct attribute_group xsegbd_attr_group = {
        .attrs = xsegbd_attrs,
};

static const struct attribute_group *xsegbd_attr_groups[] = {
        &xsegbd_attr_group,
        NULL
};

static void xsegbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type xsegbd_device_type = {
        .name           = "xsegbd",
        .groups         = xsegbd_attr_groups,
        .release        = xsegbd_sysfs_dev_release,
};

static void xsegbd_root_dev_release(struct device *dev)
{
}

static struct device xsegbd_root_dev = {
        .init_name      = "xsegbd",
        .release        = xsegbd_root_dev_release,
};

static int xsegbd_bus_add_dev(struct xsegbd_device *xsegbd_dev)
{
        int ret = -ENOMEM;
        struct device *dev;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
        dev = &xsegbd_dev->dev;

        dev->bus = &xsegbd_bus_type;
        dev->type = &xsegbd_device_type;
        dev->parent = &xsegbd_root_dev;
        dev->release = xsegbd_dev_release;
        dev_set_name(dev, "%d", xsegbd_dev->id);

        ret = device_register(dev);

        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static void xsegbd_bus_del_dev(struct xsegbd_device *xsegbd_dev)
{
        device_unregister(&xsegbd_dev->dev);
}

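/*
 * Parse an add command of the form "<target> <src>:<dst>:<nr_requests>",
 * claim the device slot for the source port, register the sysfs device,
 * allocate the pending-request bookkeeping, join the segment, bind the
 * source port, and finally create the gendisk.
 */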
static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
{
        struct xsegbd_device *xsegbd_dev;
        struct xseg_port *port;
        ssize_t ret = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        xsegbd_dev = kzalloc(sizeof(*xsegbd_dev), GFP_KERNEL);
        if (!xsegbd_dev)
                goto out;

        spin_lock_init(&xsegbd_dev->rqlock);
        INIT_LIST_HEAD(&xsegbd_dev->node);

        /* parse cmd */
        if (sscanf(buf, "%" __stringify(XSEGBD_TARGET_NAMELEN) "s "
                        "%d:%d:%d", xsegbd_dev->target, &xsegbd_dev->src_portno,
                        &xsegbd_dev->dst_portno, &xsegbd_dev->nr_requests) < 3) {
                ret = -EINVAL;
                goto out_dev;
        }
        xsegbd_dev->targetlen = strlen(xsegbd_dev->target);

        if (xsegbd_dev->src_portno < start_portno || xsegbd_dev->src_portno > end_portno){
                XSEGLOG("Invalid portno");
                ret = -EINVAL;
                goto out_dev;
        }
        xsegbd_dev->id = src_portno_to_id(xsegbd_dev->src_portno);

        spin_lock(&xsegbd_devices_lock);
        if (xsegbd_devices[xsegbd_dev->id] != NULL) {
                ret = -EINVAL;
                goto out_unlock;
        }
        xsegbd_devices[xsegbd_dev->id] = xsegbd_dev;
        spin_unlock(&xsegbd_devices_lock);

        xsegbd_dev->major = major;

        ret = xsegbd_bus_add_dev(xsegbd_dev);
        if (ret)
                goto out_delentry;

        if (!xq_alloc_seq(&xsegbd_dev->blk_queue_pending,
                                xsegbd_dev->nr_requests,
                                xsegbd_dev->nr_requests))
                goto out_bus;

        xsegbd_dev->blk_req_pending = kzalloc(
                        xsegbd_dev->nr_requests * sizeof(struct xsegbd_pending),
                                   GFP_KERNEL);
        if (!xsegbd_dev->blk_req_pending)
                goto out_bus;


        XSEGLOG("joining segment");
        //FIXME use xsegbd module config for now
        xsegbd_dev->xseg = xseg_join(   xsegbd.config.type,
                                        xsegbd.config.name,
                                        "segdev",
                                        xseg_callback           );
        if (!xsegbd_dev->xseg)
                goto out_bus;

        XSEGLOG("%s binding to source port %u (destination %u)", xsegbd_dev->target,
                        xsegbd_dev->src_portno, xsegbd_dev->dst_portno);
        port = xseg_bind_port(xsegbd_dev->xseg, xsegbd_dev->src_portno, NULL);
        if (!port) {
                XSEGLOG("cannot bind to port");
                ret = -EFAULT;

                goto out_bus;
        }

        if (xsegbd_dev->src_portno != xseg_portno(xsegbd_dev->xseg, port)) {
                XSEGLOG("portno != xsegbd_dev->src_portno");
                BUG_ON(1);
                ret = -EFAULT;
                goto out_bus;
        }
        xseg_init_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);


        /* make sure we don't get any requests until we're ready to handle them */
        xseg_cancel_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));

        ret = xsegbd_dev_init(xsegbd_dev);
        if (ret)
                goto out_bus;

        xseg_prepare_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
        return count;

out_bus:
        xsegbd_bus_del_dev(xsegbd_dev);
        return ret;

out_delentry:
        spin_lock(&xsegbd_devices_lock);
        xsegbd_devices[xsegbd_dev->id] = NULL;

out_unlock:
        spin_unlock(&xsegbd_devices_lock);

out_dev:
        kfree(xsegbd_dev);

out:
        module_put(THIS_MODULE);
        return ret;
}

static ssize_t xsegbd_remove(struct bus_type *bus, const char *buf, size_t count)
{
        struct xsegbd_device *xsegbd_dev = NULL;
        int id, ret;
        unsigned long ul_id;

        ret = strict_strtoul(buf, 10, &ul_id);
        if (ret)
                return ret;

        id = (int) ul_id;
        if (id != ul_id)
                return -EINVAL;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

        ret = count;
        xsegbd_dev = __xsegbd_get_dev(id);
        if (!xsegbd_dev) {
                ret = -ENOENT;
                goto out_unlock;
        }
        xsegbd_bus_del_dev(xsegbd_dev);

out_unlock:
        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static struct bus_attribute xsegbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, xsegbd_add),
        __ATTR(remove, S_IWUSR, NULL, xsegbd_remove),
        __ATTR_NULL
};

static int xsegbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&xsegbd_root_dev);
        if (ret < 0)
                return ret;

        xsegbd_bus_type.bus_attrs = xsegbd_bus_attrs;
        ret = bus_register(&xsegbd_bus_type);
        if (ret < 0)
                device_unregister(&xsegbd_root_dev);

        return ret;
}

static void xsegbd_sysfs_cleanup(void)
{
        bus_unregister(&xsegbd_bus_type);
        device_unregister(&xsegbd_root_dev);
}

/* *************************** */
/* ** Module Initialization ** */
/* *************************** */

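/*
 * Module init: size the device table from the port range, register the
 * block major, join the xseg segment, and expose the sysfs interface, in
 * that order; each step unwinds the previous ones on failure.
 */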
static int __init xsegbd_init(void)
{
        int ret = -ENOMEM;
        /* the port range is inclusive, so one device slot per port */
        max_dev = end_portno - start_portno + 1;
        if (max_dev <= 0){
                XSEGLOG("invalid port numbers");
                ret = -EINVAL;
                goto out;
        }
        xsegbd_devices = kzalloc(max_dev * sizeof(struct xsegbd_device *), GFP_KERNEL);
        if (!xsegbd_devices)
                goto out;

        spin_lock_init(&xsegbd_devices_lock);

        XSEGLOG("registering block device major %d", major);
        ret = register_blkdev(major, XSEGBD_NAME);
        if (ret < 0) {
                XSEGLOG("cannot register block device!");
                ret = -EBUSY;
                goto out_free;
        }
        /* register_blkdev() returns the major only when one was dynamically
         * allocated, i.e. when the major parameter was 0 */
        if (!major)
                major = ret;
        XSEGLOG("registered block device major %d", major);

        ret = xsegbd_xseg_init();
        if (ret)
                goto out_unregister;

        ret = xsegbd_sysfs_init();
        if (ret)
                goto out_xseg;

        XSEGLOG("initialization complete");

out:
        return ret;

out_xseg:
        xsegbd_xseg_quit();

out_unregister:
        unregister_blkdev(major, XSEGBD_NAME);

out_free:
        kfree(xsegbd_devices);

        goto out;
}

static void __exit xsegbd_exit(void)
{
        xsegbd_sysfs_cleanup();
        xsegbd_xseg_quit();
        unregister_blkdev(major, XSEGBD_NAME);
}

module_init(xsegbd_init);
module_exit(xsegbd_exit);