Add initial support for benchmarking xseg
[archipelago] / xseg / peers / kernel / xsegbd.c
/*
 * Copyright (C) 2012 GRNET S.A.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

/* xsegbd.c
 *
 */
#include <linux/module.h>
#include <linux/moduleparam.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/fs.h>
#include <linux/errno.h>
#include <linux/timer.h>
#include <linux/types.h>
#include <linux/vmalloc.h>
#include <linux/genhd.h>
#include <linux/blkdev.h>
#include <linux/bio.h>
#include <linux/device.h>
#include <linux/completion.h>
#include <linux/wait.h>
#include <sys/kernel/segdev.h>
#include "xsegbd.h"
#include <xseg/protocol.h>

#define XSEGBD_MINORS 1
/* maximum request size (in bytes) used by xsegbd: 4194304 = 4 MiB */
#define XSEGBD_MAX_REQUEST_SIZE 4194304U

MODULE_DESCRIPTION("xsegbd");
MODULE_AUTHOR("XSEG");
MODULE_LICENSE("GPL");

static long sector_size = 0;
static long blksize = 512;
static int major = 0;
static int max_dev = 200;
static long start_portno = 0;
static long end_portno = 199;
static char name[XSEGBD_SEGMENT_NAMELEN] = "xsegbd";
static char spec[256] = "segdev:xsegbd:512:1024:12";

module_param(sector_size, long, 0644);
module_param(blksize, long, 0644);
module_param(start_portno, long, 0644);
module_param(end_portno, long, 0644);
module_param(major, int, 0644);
module_param_string(name, name, sizeof(name), 0644);
module_param_string(spec, spec, sizeof(spec), 0644);
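
/*
 * Example module load (illustrative only; the exact spec values depend on
 * how the xseg segment was created and are parsed by xseg_parse_spec()):
 *
 *   insmod xsegbd.ko start_portno=0 end_portno=199 \
 *          spec="segdev:xsegbd:512:1024:12"
 *
 * The spec string names the segment type and segment name; the trailing
 * numeric fields describe the segment geometry and end up in xsegbd.config.
 */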

static struct xsegbd xsegbd;
static struct xsegbd_device **xsegbd_devices; /* indexed by device id (src_portno - start_portno) */
static DEFINE_MUTEX(xsegbd_mutex);
static DEFINE_SPINLOCK(xsegbd_devices_lock);


struct xsegbd_device *__xsegbd_get_dev(unsigned long id)
{
        struct xsegbd_device *xsegbd_dev = NULL;

        /* ids at or beyond max_dev have no slot in the array */
        if (id >= max_dev)
                return NULL;

        spin_lock(&xsegbd_devices_lock);
        xsegbd_dev = xsegbd_devices[id];
        spin_unlock(&xsegbd_devices_lock);

        return xsegbd_dev;
}

static long src_portno_to_id(xport src_portno)
{
        return (src_portno - start_portno);
}

/* ************************* */
/* ***** sysfs helpers ***** */
/* ************************* */

static struct xsegbd_device *dev_to_xsegbd(struct device *dev)
{
        return container_of(dev, struct xsegbd_device, dev);
}

static struct device *xsegbd_get_dev(struct xsegbd_device *xsegbd_dev)
{
        /* FIXME */
        return get_device(&xsegbd_dev->dev);
}

static void xsegbd_put_dev(struct xsegbd_device *xsegbd_dev)
{
        put_device(&xsegbd_dev->dev);
}

/* ************************* */
/* ** XSEG Initialization ** */
/* ************************* */

static void xseg_callback(xport portno);

int xsegbd_xseg_init(void)
{
        int r;

        if (!xsegbd.name[0])
                strncpy(xsegbd.name, name, XSEGBD_SEGMENT_NAMELEN);

        r = xseg_initialize();
        if (r) {
                XSEGLOG("cannot initialize 'segdev' peer");
                goto err;
        }

        r = xseg_parse_spec(spec, &xsegbd.config);
        if (r)
                goto err;

        if (strncmp(xsegbd.config.type, "segdev", 16))
                XSEGLOG("WARNING: unexpected segment type '%s' vs 'segdev'",
                         xsegbd.config.type);

        /* leave it here for now */
        XSEGLOG("joining segment");
        xsegbd.xseg = xseg_join(xsegbd.config.type, xsegbd.config.name,
                                "segdev", xseg_callback);
        if (!xsegbd.xseg) {
                XSEGLOG("cannot find segment");
                r = -ENODEV;
                goto err;
        }

        return 0;
err:
        return r;
}

int xsegbd_xseg_quit(void)
{
        /* make sure to unmap the segment first */
        xseg_leave(xsegbd.xseg);

        return 0;
}


/* ***************************** */
/* ** Block Device Operations ** */
/* ***************************** */

static int xsegbd_open(struct block_device *bdev, fmode_t mode)
{
        int ret = 0, id;
        struct gendisk *disk = bdev->bd_disk;
        struct xsegbd_device *xsegbd_dev;

        id = (long)disk->private_data;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

        spin_lock(&xsegbd_devices_lock);
        xsegbd_dev = xsegbd_devices[id];
        spin_unlock(&xsegbd_devices_lock);
        if (!xsegbd_dev){
                ret = -ENOENT;
                goto out;
        }

        xsegbd_get_dev(xsegbd_dev);
        xsegbd_dev->user_count++;
out:
        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static int xsegbd_release(struct gendisk *gd, fmode_t mode)
{
        int ret = 0, id;
        struct xsegbd_device *xsegbd_dev;

        id = (long)gd->private_data;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

        spin_lock(&xsegbd_devices_lock);
        xsegbd_dev = xsegbd_devices[id];
        spin_unlock(&xsegbd_devices_lock);
        if (!xsegbd_dev){
                ret = -ENOENT;
                goto out;
        }
        if (xsegbd_dev->user_count <= 0){
                XSEGLOG("User count for xsegbd %d not > 0", xsegbd_dev->id);
                WARN_ON(1);
        }

        xsegbd_dev->user_count--;
        xsegbd_put_dev(xsegbd_dev);
out:
        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static int xsegbd_ioctl(struct block_device *bdev, fmode_t mode,
                        unsigned int cmd, unsigned long arg)
{
        return -ENOTTY;
}

static const struct block_device_operations xsegbd_ops = {
        .owner          = THIS_MODULE,
        .open           = xsegbd_open,
        .release        = xsegbd_release,
        .ioctl          = xsegbd_ioctl
};


/* *************************** */
/* ** Device Initialization ** */
/* *************************** */

static void xseg_request_fn(struct request_queue *rq);
static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev);
static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev);

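/*
 * xsegbd_dev_init() wires a new xsegbd device into the block layer: it
 * creates a request queue driven by xseg_request_fn(), caps requests at
 * XSEGBD_MAX_REQUEST_SIZE, allocates a partitionless gendisk, sizes it
 * (either from the sector_size parameter or via an X_INFO request to the
 * peer), and activates it with add_disk().
 */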
static int xsegbd_dev_init(struct xsegbd_device *xsegbd_dev)
{
        int ret = -ENOMEM;
        struct gendisk *disk;
        unsigned int max_request_size_bytes;

        spin_lock_init(&xsegbd_dev->rqlock);

        xsegbd_dev->xsegbd = &xsegbd;

        /* allocates and initializes queue */
        xsegbd_dev->blk_queue = blk_init_queue(xseg_request_fn, &xsegbd_dev->rqlock);
        if (!xsegbd_dev->blk_queue)
                goto out;

        xsegbd_dev->blk_queue->queuedata = xsegbd_dev;

        blk_queue_flush(xsegbd_dev->blk_queue, REQ_FLUSH | REQ_FUA);
        blk_queue_logical_block_size(xsegbd_dev->blk_queue, 512);
        blk_queue_physical_block_size(xsegbd_dev->blk_queue, blksize);
        blk_queue_bounce_limit(xsegbd_dev->blk_queue, BLK_BOUNCE_ANY);

        max_request_size_bytes = XSEGBD_MAX_REQUEST_SIZE;
        blk_queue_max_hw_sectors(xsegbd_dev->blk_queue, max_request_size_bytes >> 9);
        blk_queue_max_segments(xsegbd_dev->blk_queue, 1024);
        blk_queue_max_segment_size(xsegbd_dev->blk_queue, max_request_size_bytes);
        blk_queue_io_min(xsegbd_dev->blk_queue, max_request_size_bytes);
        blk_queue_io_opt(xsegbd_dev->blk_queue, max_request_size_bytes);

        queue_flag_set_unlocked(QUEUE_FLAG_NONROT, xsegbd_dev->blk_queue);

        /* vkoukis says we don't need partitions */
        xsegbd_dev->gd = disk = alloc_disk(XSEGBD_MINORS);
        if (!disk)
                goto out;

        disk->major = xsegbd_dev->major;
        disk->first_minor = xsegbd_dev->id * XSEGBD_MINORS;
        disk->fops = &xsegbd_ops;
        disk->queue = xsegbd_dev->blk_queue;
        disk->private_data = (void *)xsegbd_dev->id;
        disk->flags |= GENHD_FL_SUPPRESS_PARTITION_INFO;
        snprintf(disk->disk_name, 32, "xsegbd%ld", xsegbd_dev->id);

        ret = 0;

        /* allow a non-zero sector_size parameter to override the disk size */
        if (sector_size)
                xsegbd_dev->sectors = sector_size;
        else {
                ret = xsegbd_get_size(xsegbd_dev);
                if (ret)
                        goto out;
        }

        set_capacity(disk, xsegbd_dev->sectors);
        XSEGLOG("xsegbd active...");
        add_disk(disk); /* immediately activates the device */

out:
        /* on error, everything is cleaned up in xsegbd_dev_release */
        return ret;
}

static void xsegbd_dev_release(struct device *dev)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        /* cleanup gendisk and blk_queue the right way */
        if (xsegbd_dev->gd) {
                if (xsegbd_dev->gd->flags & GENHD_FL_UP)
                        del_gendisk(xsegbd_dev->gd);

                xsegbd_mapclose(xsegbd_dev);
        }

        spin_lock(&xsegbd_devices_lock);
        BUG_ON(xsegbd_devices[xsegbd_dev->id] != xsegbd_dev);
        xsegbd_devices[xsegbd_dev->id] = NULL;
        spin_unlock(&xsegbd_devices_lock);

        XSEGLOG("releasing id: %d", xsegbd_dev->id);
        xseg_quit_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);

        if (xsegbd_dev->blk_queue)
                blk_cleanup_queue(xsegbd_dev->blk_queue);
        if (xsegbd_dev->gd)
                put_disk(xsegbd_dev->gd);

        if (xsegbd_dev->xseg){
                xseg_leave(xsegbd_dev->xseg);
                xsegbd_dev->xseg = NULL;
        }

        if (xsegbd_dev->blk_req_pending){
                kfree(xsegbd_dev->blk_req_pending);
                xsegbd_dev->blk_req_pending = NULL;
        }
        xq_free(&xsegbd_dev->blk_queue_pending);
        kfree(xsegbd_dev);
        module_put(THIS_MODULE);
}

/* ******************* */
/* ** Critical Path ** */
/* ******************* */

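/*
 * Data is exchanged with the xseg segment by copying: blk_to_xseg() copies
 * the pages of a write request into the xseg request buffer, and
 * xseg_to_blk() copies a completed read back into the request's pages.
 * Both walk the bio_vecs with kmap_atomic(), so no sleeping is allowed
 * inside the copy loops.
 */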
static void blk_to_xseg(struct xseg *xseg, struct xseg_request *xreq,
                        struct request *blkreq)
{
        struct bio_vec *bvec;
        struct req_iterator iter;
        uint64_t off = 0;
        char *data = xseg_get_data(xseg, xreq);

        rq_for_each_segment(bvec, blkreq, iter) {
                char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
                memcpy(data + off, bdata, bvec->bv_len);
                off += bvec->bv_len;
                kunmap_atomic(bdata);
        }
}

static void xseg_to_blk(struct xseg *xseg, struct xseg_request *xreq,
                        struct request *blkreq)
{
        struct bio_vec *bvec;
        struct req_iterator iter;
        uint64_t off = 0;
        char *data = xseg_get_data(xseg, xreq);

        rq_for_each_segment(bvec, blkreq, iter) {
                char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
                memcpy(bdata, data + off, bvec->bv_len);
                off += bvec->bv_len;
                kunmap_atomic(bdata);
        }
}

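/*
 * Request function for the block queue. The block layer calls this with
 * rqlock held and interrupts disabled; we drop the lock while allocating
 * and submitting xseg requests, and only re-take it around
 * blk_fetch_request() and before returning, as the block layer expects.
 */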
static void xseg_request_fn(struct request_queue *rq)
{
        struct xseg_request *xreq;
        struct xsegbd_device *xsegbd_dev = rq->queuedata;
        struct request *blkreq;
        struct xsegbd_pending *pending;
        xqindex blkreq_idx;
        char *target;
        uint64_t datalen;
        xport p;
        int r;
        unsigned long flags;

        spin_unlock_irq(&xsegbd_dev->rqlock);
        for (;;) {
                if (current_thread_info()->preempt_count || irqs_disabled()){
                        XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu ",
                                        current_thread_info()->preempt_count, irqs_disabled());
                }
                //XSEGLOG("Priority: %d", current_thread_info()->task->prio);
                //XSEGLOG("Static priority: %d", current_thread_info()->task->static_prio);
                //XSEGLOG("Normal priority: %d", current_thread_info()->task->normal_prio);
                //XSEGLOG("Rt_priority: %u", current_thread_info()->task->rt_priority);
                blkreq_idx = Noneidx;
                xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                                xsegbd_dev->dst_portno, X_ALLOC);
                if (!xreq)
                        break;

                blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending,
                                                xsegbd_dev->src_portno);
                if (blkreq_idx == Noneidx)
                        break;

                if (blkreq_idx >= xsegbd_dev->nr_requests) {
                        XSEGLOG("blkreq_idx >= xsegbd_dev->nr_requests");
                        WARN_ON(1);
                        break;
                }

                spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
                blkreq = blk_fetch_request(rq);
                if (!blkreq){
                        spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
                        break;
                }

                if (blkreq->cmd_type != REQ_TYPE_FS) {
                        XSEGLOG("non-fs cmd_type: %u. *shrug*", blkreq->cmd_type);
                        __blk_end_request_all(blkreq, 0);
                        spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
                        /* return the unused xreq and pending slot instead of
                         * leaking them (was a FIXME) */
                        WARN_ON(xseg_put_request(xsegbd_dev->xseg, xreq,
                                        xsegbd_dev->src_portno) == -1);
                        WARN_ON(xq_append_head(&xsegbd_dev->blk_queue_pending,
                                        blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
                        continue;
                }
                spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
                if (current_thread_info()->preempt_count || irqs_disabled()){
                        XSEGLOG("Current thread preempt_count: %d, irqs_disabled(): %lu ",
                                        current_thread_info()->preempt_count, irqs_disabled());
                }

                datalen = blk_rq_bytes(blkreq);
                r = xseg_prep_request(xsegbd_dev->xseg, xreq,
                                        xsegbd_dev->targetlen, datalen);
                if (r < 0) {
                        XSEGLOG("couldn't prep request");
                        blk_end_request_err(blkreq, r);
                        WARN_ON(1);
                        break;
                }
                r = -ENOMEM;
                if (xreq->bufferlen - xsegbd_dev->targetlen < datalen){
                        XSEGLOG("malformed req buffers");
                        blk_end_request_err(blkreq, r);
                        WARN_ON(1);
                        break;
                }

                target = xseg_get_target(xsegbd_dev->xseg, xreq);
                strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);

                pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
                pending->dev = xsegbd_dev;
                pending->request = blkreq;
                pending->comp = NULL;

                xreq->size = datalen;
                xreq->offset = blk_rq_pos(blkreq) << 9;
                xreq->priv = (uint64_t) blkreq_idx;

                /*
                if (xreq->offset >= (sector_size << 9))
                        XSEGLOG("sector offset: %lu > %lu, flush:%u, fua:%u",
                                 blk_rq_pos(blkreq), sector_size,
                                 blkreq->cmd_flags & REQ_FLUSH,
                                 blkreq->cmd_flags & REQ_FUA);
                */

                if (blkreq->cmd_flags & REQ_FLUSH)
                        xreq->flags |= XF_FLUSH;

                if (blkreq->cmd_flags & REQ_FUA)
                        xreq->flags |= XF_FUA;

                if (rq_data_dir(blkreq)) {
                        blk_to_xseg(xsegbd_dev->xseg, xreq, blkreq);
                        xreq->op = X_WRITE;
                } else {
                        xreq->op = X_READ;
                }

//              XSEGLOG("%s : %lu (%lu)", xsegbd_dev->target, xreq->offset, xreq->datalen);
                r = -EIO;
                p = xseg_submit(xsegbd_dev->xseg, xreq,
                                        xsegbd_dev->src_portno, X_ALLOC);
                if (p == NoPort) {
                        XSEGLOG("couldn't submit req");
                        WARN_ON(1);
                        blk_end_request_err(blkreq, r);
                        break;
                }
                WARN_ON(xseg_signal(xsegbd_dev->xsegbd->xseg, p) < 0);
        }
        if (xreq)
                WARN_ON(xseg_put_request(xsegbd_dev->xsegbd->xseg, xreq,
                                        xsegbd_dev->src_portno) == -1);
        if (blkreq_idx != Noneidx)
                WARN_ON(xq_append_head(&xsegbd_dev->blk_queue_pending,
                                blkreq_idx, xsegbd_dev->src_portno) == Noneidx);
        spin_lock_irq(&xsegbd_dev->rqlock);
}

int update_dev_sectors_from_request(struct xsegbd_device *xsegbd_dev,
                                    struct xseg_request *xreq)
{
        void *data;
        struct xseg_reply_info *xreply;

        /* validate both arguments before xsegbd_dev is dereferenced below */
        if (!xsegbd_dev) {
                XSEGLOG("Invalid xsegbd_dev");
                return -ENOENT;
        }
        if (!xreq) {
                XSEGLOG("Invalid xreq");
                return -EIO;
        }

        if (xreq->state & XS_FAILED)
                return -ENOENT;

        if (!(xreq->state & XS_SERVED))
                return -EIO;

        data = xseg_get_data(xsegbd_dev->xseg, xreq);
        if (!data) {
                XSEGLOG("Invalid req data");
                return -EIO;
        }
        xreply = (struct xseg_reply_info *)data;
        xsegbd_dev->sectors = xreply->size / 512ULL;
        return 0;
}

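/*
 * Query the peer for the target's size with a synchronous X_INFO request:
 * a pending slot with a struct completion attached is reserved, the request
 * is submitted, and xseg_callback() completes us when the reply arrives.
 */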
static int xsegbd_get_size(struct xsegbd_device *xsegbd_dev)
{
        struct xseg_request *xreq;
        char *target;
        xqindex blkreq_idx;
        struct xsegbd_pending *pending;
        struct completion comp;
        xport p;
        int ret = -EBUSY;

        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
        if (!xreq)
                goto out;

        WARN_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen,
                                sizeof(struct xseg_reply_info)));

        init_completion(&comp);
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
                goto out_put;

        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
        pending->request = NULL;
        pending->comp = &comp;

        xreq->priv = (uint64_t) blkreq_idx;

        target = xseg_get_target(xsegbd_dev->xseg, xreq);
        strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
        xreq->size = xreq->datalen;
        xreq->offset = 0;
        xreq->op = X_INFO;

        xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        p = xseg_submit(xsegbd_dev->xseg, xreq,
                                xsegbd_dev->src_portno, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG("couldn't submit request");
                WARN_ON(1);
                goto out_queue;
        }
        WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
        XSEGLOG("Before wait for completion, comp %lx [%llu]",
                        (unsigned long) pending->comp, (unsigned long long) blkreq_idx);
        wait_for_completion_interruptible(&comp);
        XSEGLOG("Woken up after wait_for_completion_interruptible(), comp: %lx [%llu]",
                        (unsigned long) pending->comp, (unsigned long long) blkreq_idx);
        ret = update_dev_sectors_from_request(xsegbd_dev, xreq);
        XSEGLOG("get_size: sectors = %ld\n", (long)xsegbd_dev->sectors);

out_queue:
        pending->dev = NULL;
        pending->comp = NULL;
        xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
out_put:
        WARN_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
out:
        return ret;
}

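/*
 * Same synchronous pattern as xsegbd_get_size(), but sends X_CLOSE so the
 * peer can release the target when the device goes away.
 */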
static int xsegbd_mapclose(struct xsegbd_device *xsegbd_dev)
{
        struct xseg_request *xreq;
        char *target;
        xqindex blkreq_idx;
        struct xsegbd_pending *pending;
        struct completion comp;
        xport p;
        int ret = -EBUSY;

        xreq = xseg_get_request(xsegbd_dev->xseg, xsegbd_dev->src_portno,
                        xsegbd_dev->dst_portno, X_ALLOC);
        if (!xreq)
                goto out;

        WARN_ON(xseg_prep_request(xsegbd_dev->xseg, xreq, xsegbd_dev->targetlen, 0));

        init_completion(&comp);
        blkreq_idx = xq_pop_head(&xsegbd_dev->blk_queue_pending, 1);
        if (blkreq_idx == Noneidx)
                goto out_put;

        pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
        pending->dev = xsegbd_dev;
        pending->request = NULL;
        pending->comp = &comp;

        xreq->priv = (uint64_t) blkreq_idx;

        target = xseg_get_target(xsegbd_dev->xseg, xreq);
        strncpy(target, xsegbd_dev->target, xsegbd_dev->targetlen);
        xreq->size = xreq->datalen;
        xreq->offset = 0;
        xreq->op = X_CLOSE;

        xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
        p = xseg_submit(xsegbd_dev->xseg, xreq,
                                xsegbd_dev->src_portno, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG("couldn't submit request");
                WARN_ON(1);
                goto out_queue;
        }
        WARN_ON(xseg_signal(xsegbd_dev->xseg, p) < 0);
        wait_for_completion_interruptible(&comp);
        ret = 0;
        if (xreq->state & XS_FAILED)
                XSEGLOG("Couldn't close disk on mapper");

out_queue:
        pending->dev = NULL;
        pending->comp = NULL;
        xq_append_head(&xsegbd_dev->blk_queue_pending, blkreq_idx, 1);
out_put:
        WARN_ON(xseg_put_request(xsegbd_dev->xseg, xreq, xsegbd_dev->src_portno) == -1);
out:
        return ret;
}

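/*
 * Signal handler for our xseg port: drain every request the peer has
 * returned, wake up any synchronous waiter (X_INFO/X_CLOSE), end the
 * corresponding block requests, and finally kick xseg_request_fn() to
 * submit anything that queued up while no xseg requests were free.
 */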
static void xseg_callback(xport portno)
{
        struct xsegbd_device *xsegbd_dev;
        struct xseg_request *xreq;
        struct request *blkreq;
        struct xsegbd_pending *pending;
        unsigned long flags;
        xqindex blkreq_idx, ridx;
        int err;

        /* xsegbd_devices is indexed by device id, not raw port number */
        xsegbd_dev = __xsegbd_get_dev(src_portno_to_id(portno));
        if (!xsegbd_dev) {
                XSEGLOG("portno: %u has no xsegbd device assigned", portno);
                WARN_ON(1);
                return;
        }

        for (;;) {
                xseg_prepare_wait(xsegbd_dev->xseg, xsegbd_dev->src_portno);
                xreq = xseg_receive(xsegbd_dev->xseg, portno, 0);
                if (!xreq)
                        break;

                blkreq_idx = (xqindex) xreq->priv;
                if (blkreq_idx >= xsegbd_dev->nr_requests) {
                        WARN_ON(1);
                        //FIXME maybe put request?
                        continue;
                }

                pending = &xsegbd_dev->blk_req_pending[blkreq_idx];
                if (pending->comp) {
                        /* someone is blocking on this request
                           and will handle it when we wake them up. */
                        complete(pending->comp);
                        /* the request is the blocker's responsibility, so
                           we will not put_request(); */
                        continue;
                }

                /* this is now treated as a block I/O request to end */
                blkreq = pending->request;
                pending->request = NULL;
                if (xsegbd_dev != pending->dev) {
                        //FIXME maybe put request?
                        XSEGLOG("xsegbd_dev != pending->dev");
                        WARN_ON(1);
                        continue;
                }
                pending->dev = NULL;
                if (!blkreq){
                        //FIXME maybe put request?
                        XSEGLOG("blkreq does not exist");
                        WARN_ON(1);
                        continue;
                }

                err = -EIO;
                if (!(xreq->state & XS_SERVED))
                        goto blk_end;

                if (xreq->serviced != blk_rq_bytes(blkreq))
                        goto blk_end;

                err = 0;
                if (!rq_data_dir(blkreq)){
                        xseg_to_blk(xsegbd_dev->xseg, xreq, blkreq);
                }
blk_end:
                blk_end_request_all(blkreq, err);

                ridx = xq_append_head(&xsegbd_dev->blk_queue_pending,
                                        blkreq_idx, xsegbd_dev->src_portno);
                if (ridx == Noneidx) {
                        XSEGLOG("couldn't append blkreq_idx");
                        WARN_ON(1);
                }

                if (xseg_put_request(xsegbd_dev->xseg, xreq,
                                                xsegbd_dev->src_portno) < 0){
                        XSEGLOG("couldn't put req");
                        WARN_ON(1);
                }
        }

        spin_lock_irqsave(&xsegbd_dev->rqlock, flags);
        xseg_request_fn(xsegbd_dev->blk_queue);
        spin_unlock_irqrestore(&xsegbd_dev->rqlock, flags);
}


/* sysfs interface */
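
/*
 * Each device registered on the xsegbd bus exposes read-only attributes
 * under /sys/bus/xsegbd/devices/<id>/ (size, major, srcport, dstport, id,
 * reqs, target) plus two write-only controls: "refresh" re-reads the target
 * size from the peer, and "cleanup" force-fails stuck pending requests.
 * For example:
 *
 *   cat /sys/bus/xsegbd/devices/0/size
 *   echo 1 > /sys/bus/xsegbd/devices/0/refresh
 */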

static struct bus_type xsegbd_bus_type = {
        .name   = "xsegbd",
};

static ssize_t xsegbd_size_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%llu\n", (unsigned long long) xsegbd_dev->sectors * 512ULL);
}

static ssize_t xsegbd_major_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%d\n", xsegbd_dev->major);
}

static ssize_t xsegbd_srcport_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->src_portno);
}

static ssize_t xsegbd_dstport_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->dst_portno);
}

static ssize_t xsegbd_id_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->id);
}

static ssize_t xsegbd_reqs_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%u\n", (unsigned) xsegbd_dev->nr_requests);
}

static ssize_t xsegbd_target_show(struct device *dev,
                                        struct device_attribute *attr, char *buf)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);

        return sprintf(buf, "%s\n", xsegbd_dev->target);
}

static ssize_t xsegbd_image_refresh(struct device *dev,
                                        struct device_attribute *attr,
                                        const char *buf,
                                        size_t size)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
        int rc, ret = size;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

        rc = xsegbd_get_size(xsegbd_dev);
        if (rc < 0) {
                ret = rc;
                goto out;
        }

        set_capacity(xsegbd_dev->gd, xsegbd_dev->sectors);

out:
        mutex_unlock(&xsegbd_mutex);
        return ret;
}

//FIXME
//maybe try the callback first, and then do a more invasive cleanup
static ssize_t xsegbd_cleanup(struct device *dev,
                                        struct device_attribute *attr,
                                        const char *buf,
                                        size_t size)
{
        struct xsegbd_device *xsegbd_dev = dev_to_xsegbd(dev);
        int ret = size, i;
        struct request *blkreq = NULL;
        struct xsegbd_pending *pending = NULL;
        struct completion *comp = NULL;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
        xlock_acquire(&xsegbd_dev->blk_queue_pending.lock,
                                xsegbd_dev->src_portno);
        for (i = 0; i < xsegbd_dev->nr_requests; i++) {
                if (!__xq_check(&xsegbd_dev->blk_queue_pending, i)) {
                        pending = &xsegbd_dev->blk_req_pending[i];
                        blkreq = pending->request;
                        pending->request = NULL;
                        comp = pending->comp;
                        pending->comp = NULL;
                        if (blkreq){
                                XSEGLOG("Cleaning up blkreq %lx [%d]", (unsigned long) blkreq, i);
                                blk_end_request_all(blkreq, -EIO);
                        }
                        if (comp){
                                XSEGLOG("Cleaning up comp %lx [%d]", (unsigned long) comp, i);
                                complete(comp);
                        }
                        __xq_append_tail(&xsegbd_dev->blk_queue_pending, i);
                }
        }
        xlock_release(&xsegbd_dev->blk_queue_pending.lock);

        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static DEVICE_ATTR(size, S_IRUGO, xsegbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, xsegbd_major_show, NULL);
static DEVICE_ATTR(srcport, S_IRUGO, xsegbd_srcport_show, NULL);
static DEVICE_ATTR(dstport, S_IRUGO, xsegbd_dstport_show, NULL);
static DEVICE_ATTR(id, S_IRUGO, xsegbd_id_show, NULL);
static DEVICE_ATTR(reqs, S_IRUGO, xsegbd_reqs_show, NULL);
static DEVICE_ATTR(target, S_IRUGO, xsegbd_target_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, xsegbd_image_refresh);
static DEVICE_ATTR(cleanup, S_IWUSR, NULL, xsegbd_cleanup);

static struct attribute *xsegbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_srcport.attr,
        &dev_attr_dstport.attr,
        &dev_attr_id.attr,
        &dev_attr_reqs.attr,
        &dev_attr_target.attr,
        &dev_attr_refresh.attr,
        &dev_attr_cleanup.attr,
        NULL
};

static struct attribute_group xsegbd_attr_group = {
        .attrs = xsegbd_attrs,
};

static const struct attribute_group *xsegbd_attr_groups[] = {
        &xsegbd_attr_group,
        NULL
};

static void xsegbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type xsegbd_device_type = {
        .name           = "xsegbd",
        .groups         = xsegbd_attr_groups,
        .release        = xsegbd_sysfs_dev_release,
};

static void xsegbd_root_dev_release(struct device *dev)
{
}

static struct device xsegbd_root_dev = {
        .init_name      = "xsegbd",
        .release        = xsegbd_root_dev_release,
};

static int xsegbd_bus_add_dev(struct xsegbd_device *xsegbd_dev)
{
        int ret = -ENOMEM;
        struct device *dev;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);
        dev = &xsegbd_dev->dev;

        dev->bus = &xsegbd_bus_type;
        dev->type = &xsegbd_device_type;
        dev->parent = &xsegbd_root_dev;
        dev->release = xsegbd_dev_release;
        dev_set_name(dev, "%ld", xsegbd_dev->id);

        ret = device_register(dev);

        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static void xsegbd_bus_del_dev(struct xsegbd_device *xsegbd_dev)
{
        device_unregister(&xsegbd_dev->dev);
}

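/*
 * Handler for writes to /sys/bus/xsegbd/add. The expected format is
 * "<target> <src_portno>:<dst_portno>:<nr_requests>", e.g. (the target
 * name is illustrative):
 *
 *   echo "myvolume 2:1:128" > /sys/bus/xsegbd/add
 *
 * which maps target "myvolume" through source port 2 to the peer on port 1,
 * with up to 128 in-flight requests. The device id (and the /dev/xsegbd<id>
 * block node) is derived as src_portno - start_portno.
 */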
static ssize_t xsegbd_add(struct bus_type *bus, const char *buf, size_t count)
{
        struct xsegbd_device *xsegbd_dev;
        struct xseg_port *port;
        ssize_t ret = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        xsegbd_dev = kzalloc(sizeof(*xsegbd_dev), GFP_KERNEL);
        if (!xsegbd_dev)
                goto out;

        spin_lock_init(&xsegbd_dev->rqlock);
        INIT_LIST_HEAD(&xsegbd_dev->node);
        xsegbd_dev->user_count = 0;

        /* parse cmd */
        if (sscanf(buf, "%" __stringify(XSEGBD_TARGET_NAMELEN) "s "
                        "%d:%d:%d", xsegbd_dev->target, &xsegbd_dev->src_portno,
                        &xsegbd_dev->dst_portno, &xsegbd_dev->nr_requests) < 3) {
                ret = -EINVAL;
                goto out_dev;
        }
        xsegbd_dev->targetlen = strlen(xsegbd_dev->target);

        if (xsegbd_dev->src_portno < start_portno || xsegbd_dev->src_portno > end_portno){
                XSEGLOG("Invalid portno");
                ret = -EINVAL;
                goto out_dev;
        }
        xsegbd_dev->id = src_portno_to_id(xsegbd_dev->src_portno);

        spin_lock(&xsegbd_devices_lock);
        if (xsegbd_devices[xsegbd_dev->id] != NULL) {
                ret = -EINVAL;
                goto out_unlock;
        }
        xsegbd_devices[xsegbd_dev->id] = xsegbd_dev;
        spin_unlock(&xsegbd_devices_lock);

        xsegbd_dev->major = major;

        ret = xsegbd_bus_add_dev(xsegbd_dev);
        if (ret)
                goto out_delentry;

        if (!xq_alloc_seq(&xsegbd_dev->blk_queue_pending,
                                xsegbd_dev->nr_requests,
                                xsegbd_dev->nr_requests))
                goto out_bus;

        xsegbd_dev->blk_req_pending = kzalloc(
                        xsegbd_dev->nr_requests * sizeof(struct xsegbd_pending),
                        GFP_KERNEL);
        if (!xsegbd_dev->blk_req_pending)
                goto out_bus;

        XSEGLOG("joining segment");
        //FIXME use xsegbd module config for now
        xsegbd_dev->xseg = xseg_join(xsegbd.config.type, xsegbd.config.name,
                                        "segdev", xseg_callback);
        if (!xsegbd_dev->xseg)
                goto out_bus;

        XSEGLOG("%s binding to source port %u (destination %u)", xsegbd_dev->target,
                        xsegbd_dev->src_portno, xsegbd_dev->dst_portno);
        port = xseg_bind_port(xsegbd_dev->xseg, xsegbd_dev->src_portno, NULL);
        if (!port) {
                XSEGLOG("cannot bind to port");
                ret = -EFAULT;
                goto out_bus;
        }

        if (xsegbd_dev->src_portno != xseg_portno(xsegbd_dev->xseg, port)) {
                XSEGLOG("portno != xsegbd_dev->src_portno");
                WARN_ON(1);
                ret = -EFAULT;
                goto out_bus;
        }
        xseg_init_local_signal(xsegbd_dev->xseg, xsegbd_dev->src_portno);

        /* make sure we don't get any requests until we're ready to handle them */
        xseg_cancel_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));

        ret = xsegbd_dev_init(xsegbd_dev);
        if (ret)
                goto out_bus;

        xseg_prepare_wait(xsegbd_dev->xseg, xseg_portno(xsegbd_dev->xseg, port));
        return count;

out_bus:
        xsegbd_bus_del_dev(xsegbd_dev);
        return ret;

out_delentry:
        spin_lock(&xsegbd_devices_lock);
        xsegbd_devices[xsegbd_dev->id] = NULL;

out_unlock:
        spin_unlock(&xsegbd_devices_lock);

out_dev:
        kfree(xsegbd_dev);

out:
        module_put(THIS_MODULE);
        return ret;
}

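/*
 * Handler for "echo <id> > /sys/bus/xsegbd/remove"; refuses to remove a
 * device that is still open (user_count > 0).
 */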
static ssize_t xsegbd_remove(struct bus_type *bus, const char *buf, size_t count)
{
        struct xsegbd_device *xsegbd_dev = NULL;
        int id, ret;
        unsigned long ul_id;

        ret = strict_strtoul(buf, 10, &ul_id);
        if (ret)
                return ret;

        id = (int) ul_id;
        if (id != ul_id)
                return -EINVAL;

        mutex_lock_nested(&xsegbd_mutex, SINGLE_DEPTH_NESTING);

        ret = count;
        xsegbd_dev = __xsegbd_get_dev(id);
        if (!xsegbd_dev) {
                ret = -ENOENT;
                goto out_unlock;
        }
        if (xsegbd_dev->user_count > 0){
                ret = -EBUSY;
                goto out_unlock;
        }
        xsegbd_bus_del_dev(xsegbd_dev);

out_unlock:
        mutex_unlock(&xsegbd_mutex);
        return ret;
}

static struct bus_attribute xsegbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, xsegbd_add),
        __ATTR(remove, S_IWUSR, NULL, xsegbd_remove),
        __ATTR_NULL
};

static int xsegbd_sysfs_init(void)
{
        int ret;

        ret = device_register(&xsegbd_root_dev);
        if (ret < 0)
                return ret;

        xsegbd_bus_type.bus_attrs = xsegbd_bus_attrs;
        ret = bus_register(&xsegbd_bus_type);
        if (ret < 0)
                device_unregister(&xsegbd_root_dev);

        return ret;
}

static void xsegbd_sysfs_cleanup(void)
{
        bus_unregister(&xsegbd_bus_type);
        device_unregister(&xsegbd_root_dev);
}

/* *************************** */
/* ** Module Initialization ** */
/* *************************** */

static int __init xsegbd_init(void)
{
        int ret = -ENOMEM;

        /* the port range is inclusive, so it maps to end - start + 1 devices */
        max_dev = end_portno - start_portno + 1;
        if (max_dev <= 0){
                XSEGLOG("invalid port numbers");
                ret = -EINVAL;
                goto out;
        }
        xsegbd_devices = kzalloc(max_dev * sizeof(struct xsegbd_device *), GFP_KERNEL);
        if (!xsegbd_devices)
                goto out;

        spin_lock_init(&xsegbd_devices_lock);

        XSEGLOG("registering block device major %d", major);
        ret = register_blkdev(major, XSEGBD_NAME);
        if (ret < 0) {
                XSEGLOG("cannot register block device!");
                ret = -EBUSY;
                goto out_free;
        }
        /* register_blkdev() returns the allocated major only when asked for
         * a dynamic one (major == 0); keep a user-supplied major intact */
        if (!major)
                major = ret;
        XSEGLOG("registered block device major %d", major);

        ret = xsegbd_xseg_init();
        if (ret)
                goto out_unregister;

        ret = xsegbd_sysfs_init();
        if (ret)
                goto out_xseg;

        XSEGLOG("initialization complete");

out:
        return ret;

out_xseg:
        xsegbd_xseg_quit();

out_unregister:
        unregister_blkdev(major, XSEGBD_NAME);

out_free:
        kfree(xsegbd_devices);

        goto out;
}

static void __exit xsegbd_exit(void)
{
        xsegbd_sysfs_cleanup();
        xsegbd_xseg_quit();
        unregister_blkdev(major, XSEGBD_NAME);
}

module_init(xsegbd_init);
module_exit(xsegbd_exit);