add support for requests larger than one segment, plus xseg signaling.
[archipelago] / xseg / sys / xsegbd.c
/* xsegbd.c
 *
 * xseg block device driver
 */

#include <linux/module.h>
#include <linux/moduleparam.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/fs.h>
#include <linux/errno.h>
#include <linux/timer.h>
#include <linux/types.h>
#include <linux/vmalloc.h>
#include <linux/genhd.h>
#include <linux/blkdev.h>
#include <linux/bio.h>

#include "xsegdev.h"
#include "xsegbd.h"

#define XSEGBD_MINORS 1

MODULE_DESCRIPTION("xsegbd");
MODULE_AUTHOR("XSEG");
MODULE_LICENSE("GPL");

static long sector_size = 200000;
static long blksize = 512;
static int major = 0;
static char name[XSEGBD_VOLUME_NAMELEN] = "xsegbd";
static char spec[256] = "xsegdev:xsegbd:4:512:64:1024:12";
static int src_portno = 0, dst_portno = 1, nr_requests = 128;

module_param(sector_size, long, 0644);
module_param(blksize, long, 0644);
module_param(major, int, 0644);
module_param(src_portno, int, 0644);
module_param(dst_portno, int, 0644);
module_param(nr_requests, int, 0644);
module_param_string(name, name, sizeof(name), 0644);
module_param_string(spec, spec, sizeof(spec), 0644);

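/*
 * Example module load (hypothetical values, shown for illustration only):
 *
 *   insmod xsegbd.ko spec="xsegdev:xsegbd:4:512:64:1024:12" \
 *          src_portno=0 dst_portno=1 nr_requests=128
 *
 * nr_requests is capped at init time to the number of requests the
 * segment itself was configured with.
 */
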
static volatile int count;
struct semaphore xsegbd_lock;
static struct xsegbd xsegbd;


/* ********************* */
/* ** XSEG Operations ** */
/* ********************* */

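/*
 * These callbacks back struct xseg_type for the "xsegdev" segment type:
 * plain kernel allocations for the small control structures, and the
 * xsegdev character device for creating, mapping and destroying the
 * shared segment itself.
 */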
static void *xsegdev_malloc(uint64_t size)
{
        return kmalloc((size_t)size, GFP_KERNEL);
}

static void *xsegdev_realloc(void *mem, uint64_t size)
{
        return krealloc(mem, (size_t)size, GFP_KERNEL);
}

static void xsegdev_mfree(void *ptr)
{
        kfree(ptr);
}

static long xsegdev_allocate(const char *name, uint64_t size)
{
        int r;
        struct xsegdev *xsegdev = xsegdev_get(0);

        r = IS_ERR(xsegdev) ? PTR_ERR(xsegdev) : 0;
        if (r) {
                XSEGLOG("cannot acquire xsegdev");
                goto err;
        }

        if (xsegdev->segment) {
                XSEGLOG("destroying existing xsegdev segment");
                r = xsegdev_destroy_segment(xsegdev);
                if (r)
                        goto err_put;
        }

        XSEGLOG("creating xsegdev segment size %llu", size);
        r = xsegdev_create_segment(xsegdev, size, 1);
        if (r)
                goto err_put;

        xsegdev->segsize = size;
        xsegdev_put(xsegdev);
        return 0;

err_put:
        /* drop the device reference on failure paths past xsegdev_get() */
        xsegdev_put(xsegdev);
err:
        return r;
}

static long xsegdev_deallocate(const char *name)
{
        struct xsegdev *xsegdev = xsegdev_get(0);
        int r = IS_ERR(xsegdev) ? PTR_ERR(xsegdev) : 0;
        if (r)
                return r;

        clear_bit(XSEGDEV_RESERVED, &xsegdev->flags);
        XSEGLOG("destroying segment");
        r = xsegdev_destroy_segment(xsegdev);
        if (r)
                XSEGLOG("   ...failed");
        xsegdev_put(xsegdev);
        return r;
}

static long xseg_callback(void *arg);

static void *xsegdev_map(const char *name, uint64_t size)
{
        struct xseg *xseg = NULL;
        struct xsegdev *dev = xsegdev_get(0);
        int r;
        r = IS_ERR(dev) ? PTR_ERR(dev) : 0;
        if (r)
                goto out;

        if (!dev->segment)
                goto put_out;

        if (size > dev->segsize)
                goto put_out;

        if (dev->callback) /* in use */
                goto put_out;

        dev->callback = xseg_callback;
        dev->callarg = &xsegbd;
        xseg = (void *)dev->segment;

put_out:
        xsegdev_put(dev);
out:
        return xseg;
}
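
/*
 * Note: mapping the segment also installs xseg_callback() as the xsegdev
 * notification callback; xsegdev_unmap() below removes it again.
 */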

static void xsegdev_unmap(void *ptr, uint64_t size)
{
        struct xsegdev *xsegdev = xsegdev_get(0);
        int r = IS_ERR(xsegdev) ? PTR_ERR(xsegdev) : 0;
        if (r)
                return;

        xsegdev->callarg = NULL;
        xsegdev->callback = NULL;
        xsegdev_put(xsegdev);
}

static struct xseg_type xseg_xsegdev = {
        /* xseg operations */
        {
                .malloc = xsegdev_malloc,
                .realloc = xsegdev_realloc,
                .mfree = xsegdev_mfree,
                .allocate = xsegdev_allocate,
                .deallocate = xsegdev_deallocate,
                .map = xsegdev_map,
                .unmap = xsegdev_unmap
        },
        /* name */
        "xsegdev"
};

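/*
 * Peer drivers. The "posix" peer lets the kernel signal userspace peers:
 * only signal() does real work, delivering SIGIO to the pid a peer has
 * stored in its port's waitcue. The wait-side and allocation hooks are
 * stubs, since this module never blocks on a posix port or allocates
 * on its behalf.
 */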
static int posix_signal_init(void)
{
        return 0;
}

static void posix_signal_quit(void) { }

static int posix_prepare_wait(struct xseg_port *port)
{
        return 0;
}

static int posix_cancel_wait(struct xseg_port *port)
{
        return 0;
}

static int posix_wait_signal(struct xseg_port *port, uint32_t timeout)
{
        return 0;
}

static int posix_signal(struct xseg_port *port)
{
        struct pid *pid;
        struct task_struct *task;
        int ret = -ENOENT;

        rcu_read_lock();
        /* the waiting peer left its pid in the port's waitcue */
        pid = find_vpid((pid_t)port->waitcue);
        if (!pid)
                goto out;
        task = pid_task(pid, PIDTYPE_PID);
        if (!task)
                goto out;

        ret = send_sig(SIGIO, task, 1);
out:
        rcu_read_unlock();
        return ret;
}

static void *posix_malloc(uint64_t size)
{
        return NULL;
}

static void *posix_realloc(void *mem, uint64_t size)
{
        return NULL;
}

static void posix_mfree(void *mem) { }

static struct xseg_peer xseg_peer_posix = {
        /* xseg signal operations */
        {
                .signal_init = posix_signal_init,
                .signal_quit = posix_signal_quit,
                .cancel_wait = posix_cancel_wait,
                .prepare_wait = posix_prepare_wait,
                .wait_signal = posix_wait_signal,
                .signal = posix_signal,
                .malloc = posix_malloc,
                .realloc = posix_realloc,
                .mfree = posix_mfree
        },
        /* name */
        "posix"
};

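/*
 * The "xsegdev" peer is registered so in-kernel ports have a peer type,
 * but all of its signal operations fail: kernel-side notification goes
 * through the xsegdev callback installed in xsegdev_map() instead.
 */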
static int xsegdev_signal_init(void)
{
        return 0;
}

static void xsegdev_signal_quit(void) { }

static int xsegdev_prepare_wait(struct xseg_port *port)
{
        return -1;
}

static int xsegdev_cancel_wait(struct xseg_port *port)
{
        return -1;
}

static int xsegdev_wait_signal(struct xseg_port *port, uint32_t timeout)
{
        return -1;
}

static int xsegdev_signal(struct xseg_port *port)
{
        return -1;
}

static struct xseg_peer xseg_peer_xsegdev = {
        /* xseg signal operations */
        {
                .signal_init = xsegdev_signal_init,
                .signal_quit = xsegdev_signal_quit,
                .cancel_wait = xsegdev_cancel_wait,
                .prepare_wait = xsegdev_prepare_wait,
                .wait_signal = xsegdev_wait_signal,
                .signal = xsegdev_signal,
                .malloc = xsegdev_malloc,
                .realloc = xsegdev_realloc,
                .mfree = xsegdev_mfree
        },
        /* name */
        "xsegdev"
};

/* ************************* */
/* ** XSEG Initialization ** */
/* ************************* */

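/*
 * Bring-up sequence: register the segment type and both peer drivers,
 * initialize the xsegdev peer, parse the spec string, then create and
 * join the shared segment, bind the source port and preallocate
 * requests on it.
 */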
int xsegbd_xseg_init(struct xsegbd *dev)
{
        struct xseg_port *xport;
        int r;

        if (!dev->name[0])
                strncpy(dev->name, name, XSEGBD_VOLUME_NAMELEN);

        XSEGLOG("registering xseg types");
        dev->namesize = strlen(dev->name);
        r = xseg_register_type(&xseg_xsegdev);
        if (r)
                goto err0;

        r = xseg_register_peer(&xseg_peer_posix);
        if (r)
                goto err1;

        r = xseg_register_peer(&xseg_peer_xsegdev);
        if (r)
                goto err2;

        r = xseg_initialize("xsegdev");
        if (r) {
                XSEGLOG("cannot initialize 'xsegdev' peer");
                goto err3;
        }

        r = xseg_parse_spec(spec, &dev->config);
        if (r)
                goto err3;

        if (strncmp(dev->config.type, "xsegdev", 16))
                XSEGLOG("WARNING: unexpected segment type '%s' vs 'xsegdev'",
                        dev->config.type);

        XSEGLOG("creating segment");
        r = xseg_create(&dev->config);
        if (r) {
                XSEGLOG("cannot create segment");
                goto err3;
        }

        XSEGLOG("joining segment");
        dev->xseg = xseg_join("xsegdev", "xsegbd");
        if (!dev->xseg) {
                XSEGLOG("cannot join segment");
                r = -EFAULT;
                goto err3;
        }

        XSEGLOG("binding to source port %u (destination %u)",
                src_portno, dst_portno);
        xport = xseg_bind_port(dev->xseg, src_portno);
        if (!xport) {
                XSEGLOG("cannot bind to port");
                dev->xseg = NULL;
                r = -EFAULT;
                goto err3;
        }
        dev->src_portno = xseg_portno(dev->xseg, xport);
        dev->dst_portno = dst_portno;

        /* never ask for more requests than the segment was built with */
        if (nr_requests > dev->xseg->config.nr_requests)
                nr_requests = dev->xseg->config.nr_requests;

        if (xseg_alloc_requests(dev->xseg, src_portno, nr_requests)) {
                XSEGLOG("cannot allocate requests");
                dev->xseg = NULL;
                r = -EFAULT;
                goto err3;
        }

        return 0;
err3:
        xseg_unregister_peer(xseg_peer_xsegdev.name);
err2:
        xseg_unregister_peer(xseg_peer_posix.name);
err1:
        xseg_unregister_type(xseg_xsegdev.name);
err0:
        return r;
}

int xsegbd_xseg_quit(struct xsegbd *dev)
{
        xseg_destroy(dev->xseg);
        dev->xseg = NULL;
        return 0;
}


/* ***************************** */
/* ** Block Device Operations ** */
/* ***************************** */

static int xsegbd_open(struct block_device *bdev, fmode_t mode)
{
        int ret = down_interruptible(&xsegbd_lock);
        if (ret == 0) {
                count++;
                up(&xsegbd_lock);
        }
        return ret;
}

static int xsegbd_release(struct gendisk *gd, fmode_t mode)
{
        int ret = down_interruptible(&xsegbd_lock);
        if (ret == 0) {
                count--;
                up(&xsegbd_lock);
        }
        return ret;
}

static int xsegbd_ioctl(struct block_device *bdev, fmode_t mode,
                        unsigned int cmd, unsigned long arg)
{
        return -ENOTTY;
}

static const struct block_device_operations xsegbd_ops = {
        .owner          = THIS_MODULE,
        .open           = xsegbd_open,
        .release        = xsegbd_release,
        .ioctl          = xsegbd_ioctl
};


/* *************************** */
/* ** Device Initialization ** */
/* *************************** */

static void xseg_request_fn(struct request_queue *rq);

static int xsegbd_dev_init(struct xsegbd *dev, int id, sector_t size)
{
        int ret = -ENOMEM;
        struct gendisk *disk;
        unsigned int max_request_size_bytes;

        spin_lock_init(&dev->lock);

        dev->id = id;
        ret = xsegbd_xseg_init(dev);
        if (ret < 0)
                goto out;

        /* every failure from here on is an allocation failure */
        ret = -ENOMEM;
        dev->blk_queue = blk_alloc_queue(GFP_KERNEL);
        if (!dev->blk_queue)
                goto free_xseg;

        blk_init_allocated_queue(dev->blk_queue, xseg_request_fn, &dev->lock);
        dev->blk_queue->queuedata = dev;

        blk_queue_flush(dev->blk_queue, REQ_FLUSH | REQ_FUA);
        blk_queue_logical_block_size(dev->blk_queue, 512);
        blk_queue_physical_block_size(dev->blk_queue, blksize);
        blk_queue_bounce_limit(dev->blk_queue, BLK_BOUNCE_ANY);

        /* blk_queue_max_segments(dev->blk_queue, 512); */
        /*
         * Calculate the maximum block request size:
         * request size in pages * page size,
         * leaving one page in the buffer for the name.
         */
        max_request_size_bytes = (unsigned int)(dev->config.request_size - 1) *
                                 (1 << dev->config.page_shift);
        blk_queue_max_hw_sectors(dev->blk_queue, max_request_size_bytes >> 9);
        blk_queue_max_segment_size(dev->blk_queue, max_request_size_bytes);
        blk_queue_io_min(dev->blk_queue, max_request_size_bytes);
        blk_queue_io_opt(dev->blk_queue, max_request_size_bytes);
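
        /*
         * For illustration (hypothetical values): with request_size = 64
         * pages and page_shift = 12, this gives (64 - 1) * 4096 = 258048
         * bytes, i.e. 504 sectors of 512 bytes per block request.
         */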

        queue_flag_set_unlocked(QUEUE_FLAG_NONROT, dev->blk_queue);

        /* vkoukis says we don't need partitions */
        dev->gd = disk = alloc_disk(1);
        if (!disk)
                goto out_free_queue;

        disk->major = major;
        disk->first_minor = id * XSEGBD_MINORS;
        disk->fops = &xsegbd_ops;
        disk->queue = dev->blk_queue;
        disk->private_data = dev;
        disk->flags |= GENHD_FL_SUPPRESS_PARTITION_INFO;
        snprintf(disk->disk_name, 32, "xsegbd%c", 'a' + id);

        if (!xq_alloc_seq(&dev->blk_queue_pending, nr_requests, nr_requests))
                goto out_free_disk;

        dev->blk_req_pending = kmalloc(sizeof(struct request *) * nr_requests,
                                       GFP_KERNEL);
        if (!dev->blk_req_pending)
                goto out_free_pending;

        dev->sectors = size;
        set_capacity(disk, dev->sectors);

        add_disk(disk); /* immediately activates the device */

out:
        return ret;

out_free_pending:
        xq_free(&dev->blk_queue_pending);

out_free_disk:
        put_disk(disk);

out_free_queue:
        blk_cleanup_queue(dev->blk_queue);

free_xseg:
        xsegbd_xseg_quit(dev);
        goto out;
}

static int xsegbd_dev_destroy(struct xsegbd *dev)
{
        xq_free(&dev->blk_queue_pending);
        kfree(dev->blk_req_pending);
        del_gendisk(dev->gd);
        put_disk(dev->gd);
        blk_cleanup_queue(dev->blk_queue);
        xsegbd_xseg_quit(dev);
        return 0;
}


/* *************************** */
/* ** Module Initialization ** */
/* *************************** */

static int __init xsegbd_init(void)
{
        int ret;

        sema_init(&xsegbd_lock, 1);

        XSEGLOG("registering block device major %d", major);
        ret = register_blkdev(major, XSEGBD_NAME);
        if (ret < 0) {
                XSEGLOG("cannot register block device!");
                ret = -EBUSY;
                goto out;
        }
        /* register_blkdev() returns the major only when asked to pick one */
        if (ret > 0)
                major = ret;
        XSEGLOG("registered block device major %d", major);

        XSEGLOG("initializing device");
        ret = xsegbd_dev_init(&xsegbd, 0, sector_size);
        if (ret < 0) {
                XSEGLOG("cannot initialize device!");
                goto unregister;
        }

        XSEGLOG("initialization complete");
out:
        return ret;

unregister:
        unregister_blkdev(major, XSEGBD_NAME);
        goto out;
}

static void __exit xsegbd_exit(void)
{
        unregister_blkdev(major, XSEGBD_NAME);

        xseg_disable_driver(xsegbd.xseg, "posix");
        xseg_unregister_peer("posix");
        xseg_disable_driver(xsegbd.xseg, "xsegdev");
        xseg_unregister_peer("xsegdev");

        xsegbd_dev_destroy(&xsegbd);
        xseg_unregister_type("xsegdev");
}

module_init(xsegbd_init);
module_exit(xsegbd_exit);


/* ******************* */
/* ** Critical Path ** */
/* ******************* */

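/*
 * Data path overview: xseg_request_fn() pulls requests off the block
 * queue, copies write payloads into the shared segment, and submits
 * them to the destination port; xseg_callback() runs when the peer
 * responds, copies read payloads back into the bio pages, and
 * completes the block request.
 */
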
/* copy the pages of a block request into the xseg request's data buffer */
static void blk_to_xseg(struct xseg *xseg, struct xseg_request *xreq,
                        struct request *blkreq)
{
        struct bio_vec *bvec;
        struct req_iterator iter;
        uint64_t off = 0;
        char *data = XSEG_TAKE_PTR(xreq->data, xseg->segment);

        rq_for_each_segment(bvec, blkreq, iter) {
                char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
                memcpy(data + off, bdata, bvec->bv_len);
                off += bvec->bv_len;
                kunmap_atomic(bdata);
        }
}

/* copy the xseg request's data buffer back into the block request's pages */
static void xseg_to_blk(struct xseg *xseg, struct xseg_request *xreq,
                        struct request *blkreq)
{
        struct bio_vec *bvec;
        struct req_iterator iter;
        uint64_t off = 0;
        char *data = XSEG_TAKE_PTR(xreq->data, xseg->segment);

        rq_for_each_segment(bvec, blkreq, iter) {
                char *bdata = kmap_atomic(bvec->bv_page) + bvec->bv_offset;
                memcpy(bdata, data + off, bvec->bv_len);
                off += bvec->bv_len;
                kunmap_atomic(bdata);
        }
}

static void xseg_request_fn(struct request_queue *rq)
{
        struct xseg_request *xreq;
        struct xsegbd *dev = rq->queuedata;
        struct request *blkreq;
        xqindex blkreq_idx;
        char *name;
        uint64_t datasize;

        for (;;) {
                xreq = xseg_get_request(dev->xseg, dev->src_portno);
                if (!xreq)
                        break;

                blkreq = blk_fetch_request(rq);
                if (!blkreq)
                        break;

                if (blkreq->cmd_type != REQ_TYPE_FS) {
                        XSEGLOG("non-fs cmd_type: %u. *shrug*", blkreq->cmd_type);
                        __blk_end_request_all(blkreq, 0);
                        /* don't submit the request we just completed;
                         * return the xseg request and move on */
                        xseg_put_request(dev->xseg, dev->src_portno, xreq);
                        continue;
                }

                datasize = blk_rq_bytes(blkreq);
                BUG_ON(xreq->buffersize - dev->namesize < datasize);
                BUG_ON(xseg_prep_request(xreq, dev->namesize, datasize));

                name = XSEG_TAKE_PTR(xreq->name, dev->xseg->segment);
                strncpy(name, dev->name, dev->namesize);
                blkreq_idx = xq_pop_head(&dev->blk_queue_pending);
                BUG_ON(blkreq_idx == None);
                /* WARN_ON(dev->blk_req_pending[blkreq_idx]); */
                dev->blk_req_pending[blkreq_idx] = blkreq;
                xreq->priv = (void *)(unsigned long)blkreq_idx;
                xreq->size = datasize;
                xreq->offset = blk_rq_pos(blkreq) << 9;
                /*
                if (xreq->offset >= (sector_size << 9))
                        XSEGLOG("sector offset: %lu > %lu, flush:%u, fua:%u",
                                 blk_rq_pos(blkreq), sector_size,
                                 blkreq->cmd_flags & REQ_FLUSH,
                                 blkreq->cmd_flags & REQ_FUA);
                */

                if (blkreq->cmd_flags & REQ_FLUSH)
                        xreq->flags |= XF_FLUSH;

                if (blkreq->cmd_flags & REQ_FUA)
                        xreq->flags |= XF_FUA;

                if (rq_data_dir(blkreq)) {
                        /* unlock for data transfers? */
                        blk_to_xseg(dev->xseg, xreq, blkreq);
                        xreq->op = X_WRITE;
                } else {
                        xreq->op = X_READ;
                }

                BUG_ON(xseg_submit(dev->xseg, dev->dst_portno, xreq) == NoSerial);
        }
        /* This is going to happen at least once.
         * TODO: find out why it happens more than once. */
        WARN_ON(xseg_signal(dev->xseg, dev->dst_portno));
        if (xreq)
                xseg_put_request(dev->xseg, dev->src_portno, xreq);
}

static long xseg_callback(void *arg)
{
        struct xsegbd *dev = arg;
        struct xseg_request *xreq;
        struct request *blkreq;
        unsigned long flags;
        xqindex blkreq_idx;
        int err;

        for (;;) {
                xreq = xseg_receive(dev->xseg, dev->src_portno);
                if (!xreq)
                        break;

                /* we rely upon our peers to not have touched ->priv */
                blkreq_idx = (xqindex)(unsigned long)xreq->priv;
                if (blkreq_idx < 0 || blkreq_idx >= nr_requests) {
                        XSEGLOG("invalid request index: %u! Ignoring.", blkreq_idx);
                        goto xseg_put;
                }

                blkreq = dev->blk_req_pending[blkreq_idx];
                /* WARN_ON(!blkreq); */
                err = -EIO;

                if (!(xreq->state & XS_SERVED))
                        goto blk_end;

                if (xreq->serviced != blk_rq_bytes(blkreq))
                        goto blk_end;

                /* unlock for data transfer? */
                if (!rq_data_dir(blkreq))
                        xseg_to_blk(dev->xseg, xreq, blkreq);

                err = 0;
blk_end:
                blk_end_request_all(blkreq, err);
                xq_append_head(&dev->blk_queue_pending, blkreq_idx);
xseg_put:
                xseg_put_request(dev->xseg, xreq->portno, xreq);
        }

        /* freed xseg requests may allow more block requests to proceed */
        spin_lock_irqsave(&dev->lock, flags);
        xseg_request_fn(dev->blk_queue);
        spin_unlock_irqrestore(&dev->lock, flags);
        return 0;
}
