Merge branch 'feature-python' into xseg-refactor
[archipelago] / xseg / tools / archipelago
1 #!/usr/bin/env python
2 # archipelagos tool
3
4 # Copyright 2012 GRNET S.A. All rights reserved.
5 #
6 # Redistribution and use in source and binary forms, with or
7 # without modification, are permitted provided that the following
8 # conditions are met:
9 #
10 #   1. Redistributions of source code must retain the above
11 #      copyright notice, this list of conditions and the following
12 #      disclaimer.
13 #   2. Redistributions in binary form must reproduce the above
14 #      copyright notice, this list of conditions and the following
15 #      disclaimer in the documentation and/or other materials
16 #      provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 # POSSIBILITY OF SUCH DAMAGE.
30 #
31 # The views and conclusions contained in the software and
32 # documentation are those of the authors and should not be
33 # interpreted as representing official policies, either expressed
34 # or implied, of GRNET S.A.
35 #
36
37
38 from xseg.xseg_api import *
39 from xseg.xprotocol import *
40 from ctypes import CFUNCTYPE, cast, c_void_p, addressof, string_at, memmove, \
41     create_string_buffer, pointer, sizeof, POINTER, c_char_p, c_char, byref
42 cb_null_ptrtype = CFUNCTYPE(None, uint32_t)
43
44 import os, sys, subprocess, argparse, time, psutil, signal, errno
45 from subprocess import call, check_call, Popen, PIPE
46
47 DEFAULTS='/etc/default/archipelago'
48 VLMC_LOCK_FILE='/tmp/vlmc.lock'
49 ARCHIP_PREFIX='archip_'
50
51 #system defaults
52 PIDFILE_PATH="/var/run/archipelago"
53 LOGS_PATH="/var/log/archipelago"
54 DEVICE_PREFIX="/dev/xsegbd"
55 XSEGBD_SYSFS="/sys/bus/xsegbd/"
56
57 CHARDEV_NAME="/dev/segdev"
58 CHARDEV_MAJOR=60
59 CHARDEV_MINOR=0
60
61 REQS=512
62
63 FILE_BLOCKER='mt-pfiled'
64 RADOS_BLOCKER='mt-sosd'
65 MAPPER='mt-mapperd'
66 VLMC='st-vlmcd'
67 BLOCKER=''
68
69 available_storage = {'files': FILE_BLOCKER, 'rados': RADOS_BLOCKER}
70
71 peers = []
72 modules = ["xseg", "segdev", "xseg_posix", "xseg_pthread", "xseg_segdev"]
73 xsegbd = "xsegbd"
74
75 XSEGBD_START=0
76 XSEGBD_END=199
77 VPORT_START=200
78 VPORT_END=399
79 BPORT=500
80 MPORT=501
81 MBPORT=502
82 VTOOL=503
83 #RESERVED 511
84
85 #default config
86 SPEC="segdev:xsegbd:512:2048:12"
87
88 NR_OPS_BLOCKERB=""
89 NR_OPS_BLOCKERM=""
90 NR_OPS_VLMC=""
91 NR_OPS_MAPPER=""
92
93 VERBOSITY_BLOCKERB=""
94 VERBOSITY_BLOCKERM=""
95 VERBOSITY_MAPPER=""
96 VERBOSITY_VLMC=""
97
98
99 #mt-pfiled specific options
100 FILED_IMAGES=""
101 FILED_MAPS=""
102 PITHOS=""
103 PITHOSMAPS=""
104
105 #mt-sosd specific options
106 RADOS_POOL_MAPS=""
107 RADOS_POOL_BLOCKS=""
108
109 FIRST_COLUMN_WIDTH = 23
110 SECOND_COLUMN_WIDTH = 23
111
112 def green(s):
113     return '\x1b[32m' + s + '\x1b[0m'
114
115 def red(s):
116     return '\x1b[31m' + s + '\x1b[0m'
117
118 def yellow(s):
119     return '\x1b[33m' + s + '\x1b[0m'
120
121 def pretty_print(cid, status):
122     sys.stdout.write(cid.ljust(FIRST_COLUMN_WIDTH))
123     sys.stdout.write(status.ljust(SECOND_COLUMN_WIDTH))
124     sys.stdout.write('\n')
125     return
126
127 def check_conf():
128     def isExec(file_path):
129         return os.path.isfile(file_path) and os.access(file_path, os.X_OK)
130
131     def validExec(program):
132         for path in os.environ["PATH"].split(os.pathsep):
133             exe_file = os.path.join(path, program)
134             if isExec(exe_file):
135                 return True
136         return False
137
138
139     def validPort(port, limit, name):
140         try:
141             if int(port) >= limit:
142                 print red(str(port) + " >= " + limit)
143                 return False
144         except:
145             print red("Invalid port "+name+" : " + str(port))
146             return False
147
148         return True
149
150
151     if not LOGS_PATH:
152         print red("LOGS_PATH is not set")
153         return False
154     if not PIDFILE_PATH:
155         print red("PIDFILE_PATH is not set")
156         return False
157
158     try:
159         if not os.path.isdir(str(LOGS_PATH)):
160             print red("LOGS_PATH "+str(LOGS_PATH)+" does not exist")
161             return False
162     except:
163         print red("LOGS_PATH doesn't exist or is not a directory")
164         return False
165
166     try:
167         os.makedirs(str(PIDFILE_PATH))
168     except OSError as e:
169         if e.errno == errno.EEXIST:
170             if os.path.isdir(str(PIDFILE_PATH)):
171                 pass
172             else:
173                 print red(str(PIDFILE_PATH) + " is not a directory")
174                 return False
175         else:
176             print red("Cannot create " + str(PIDFILE_PATH))
177             return False
178     except:
179         print red("PIDFILE_PATH is not set")
180         return False
181
182     splitted_spec = str(SPEC).split(':')
183     if len(splitted_spec) < 5:
184         print red("Invalid spec")
185         return False
186
187     xseg_type=splitted_spec[0]
188     xseg_name=splitted_spec[1]
189     xseg_ports=int(splitted_spec[2])
190     xseg_heapsize=int(splitted_spec[3])
191     xseg_align=int(splitted_spec[4])
192
193     if xseg_type != "segdev":
194         print red("Segment type not segdev")
195         return False
196     if xseg_name != "xsegbd":
197         print red("Segment name not equal xsegbd")
198         return False
199     if xseg_align != 12:
200         print red("Wrong alignemt")
201         return False
202
203     for v in [VERBOSITY_BLOCKERB, VERBOSITY_BLOCKERM, VERBOSITY_MAPPER,
204                     VERBOSITY_VLMC]:
205          if v is None:
206              print red("Verbosity missing")
207          try:
208              if (int(v) > 3 or int(v) < 0):
209                  print red("Invalid verbosity " + str(v))
210                  return False
211          except:
212              print red("Invalid verbosity " + str(v))
213              return False
214
215     for n in [NR_OPS_BLOCKERB, NR_OPS_BLOCKERM, NR_OPS_VLMC, NR_OPS_MAPPER]:
216          if n is None:
217              print red("Nr ops missing")
218          try:
219              if (int(n) <= 0):
220                  print red("Invalid nr_ops " + str(n))
221                  return False
222          except:
223              print red("Invalid nr_ops " + str(n))
224              return False
225
226     if not validPort(VTOOL, xseg_ports, "VTOOL"):
227         return False
228     if not validPort(MPORT, xseg_ports, "MPORT"):
229         return False
230     if not validPort(BPORT, xseg_ports, "BPORT"):
231         return False
232     if not validPort(MBPORT, xseg_ports, "MBPORT"):
233         return False
234     if not validPort(VPORT_START, xseg_ports, "VPORT_START"):
235         return False
236     if not validPort(VPORT_END, xseg_ports, "VPORT_END"):
237         return False
238     if not validPort(XSEGBD_START, xseg_ports, "XSEGBD_START"):
239         return False
240     if not validPort(XSEGBD_END, xseg_ports, "XSEGBD_END"):
241         return False
242
243     if not XSEGBD_START < XSEGBD_END:
244         print red("XSEGBD_START should be less than XSEGBD_END")
245         return False
246     if not VPORT_START < VPORT_END:
247         print red("VPORT_START should be less than VPORT_END")
248         return False
249 #TODO check than no other port is set in the above ranges
250
251     global BLOCKER
252     try:
253         BLOCKER = available_storage[str(STORAGE)]
254     except:
255         print red("Invalid storage " + str(STORAGE))
256         print "Available storage: \"" + ', "'.join(available_storage) + "\""
257         return False
258
259     if STORAGE=="files":
260         if FILED_IMAGES and not os.path.isdir(str(FILED_IMAGES)):
261              print red("FILED_IMAGES invalid")
262              return False
263         if FILED_MAPS and not os.path.isdir(str(FILED_MAPS)):
264              print red("FILED_PATH invalid")
265              return False
266         if PITHOS and not os.path.isdir(str(PITHOS)):
267              print red("PITHOS invalid ")
268              return False
269         if PITHOSMAPS and not os.path.isdir(str(PITHOSMAPS)):
270              print red("PITHOSMAPS invalid")
271              return False
272
273     for p in [BLOCKER, MAPPER, VLMC]:
274         if not validExec(p):
275             print red(p + "is not a valid executable")
276             return False
277
278     return True
279
280 def construct_peers():
281     if BLOCKER == "pfiled":
282         peer_blockerb = [BLOCKER,
283                 ["-p" , str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
284                  str(PITHOS), str(FILED_IMAGES), "-d",
285                 "-f", os.path.join(PIDFILE_PATH, "blockerb.pid")],
286                 "blockerb"]
287         peer_blockerm = [BLOCKER,
288                 ["-p" , str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
289                 str(PITHOSMAPS), str(FILED_MAPS), "-d",
290                 "-f", os.path.join(PIDFILE_PATH, "blockerm.pid")],
291                 "blockerm" ]
292     elif BLOCKER == "mt-sosd":
293         peer_blockerb = [BLOCKER,
294                 ["-p" , str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
295                  "--pool", str(RADOS_POOL_BLOCKS), "-v", str(VERBOSITY_BLOCKERB),
296                  "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerb.pid"),
297                  "-l", os.path.join(str(LOGS_PATH), "blockerb.log"),
298                  "-t", "3"],
299                  "blockerb"]
300         peer_blockerm = [BLOCKER,
301                 ["-p" , str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
302                  "--pool", str(RADOS_POOL_MAPS), "-v", str(VERBOSITY_BLOCKERM),
303                  "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerm.pid"),
304                  "-l", os.path.join(str(LOGS_PATH), "blockerm.log"),
305                  "-t", "3"],
306                  "blockerm"]
307     elif BLOCKER == "mt-pfiled":
308         peer_blockerb = [BLOCKER,
309                 ["-p" , str(BPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERB),
310                  "--pithos", str(PITHOS), "--archip", str(FILED_IMAGES),
311              "-v", str(VERBOSITY_BLOCKERB),
312                  "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerb.pid"),
313                  "-l", os.path.join(str(LOGS_PATH), "blockerb.log"),
314                  "-t", str(NR_OPS_BLOCKERB), "--prefix", ARCHIP_PREFIX],
315                  "blockerb"]
316         peer_blockerm = [BLOCKER,
317                 ["-p" , str(MBPORT), "-g", str(SPEC), "-n", str(NR_OPS_BLOCKERM),
318                  "--pithos", str(PITHOSMAPS), "--archip", str(FILED_MAPS),
319              "-v", str(VERBOSITY_BLOCKERM),
320                  "-d", "--pidfile", os.path.join(PIDFILE_PATH, "blockerm.pid"),
321                  "-l", os.path.join(str(LOGS_PATH), "blockerm.log"),
322                  "-t", str(NR_OPS_BLOCKERM), "--prefix", ARCHIP_PREFIX],
323                  "blockerm"]
324     else:
325             sys.exit(-1)
326
327     peer_vlmcd = [VLMC,
328              ["-t" , "1", "-sp",  str(VPORT_START), "-ep", str(VPORT_END),
329               "-g", str(SPEC), "-n", str(NR_OPS_VLMC), "-bp", str(BPORT),
330               "-mp", str(MPORT), "-d", "-v", str(VERBOSITY_VLMC),
331               "--pidfile", os.path.join(PIDFILE_PATH, "vlmcd.pid"),
332               "-l", os.path.join(str(LOGS_PATH), "vlmcd.log")
333               ], "vlmcd"]
334     peer_mapperd = [MAPPER,
335              ["-t" , "1", "-p",  str(MPORT), "-mbp", str(MBPORT),
336               "-g", str(SPEC), "-n", str(NR_OPS_MAPPER), "-bp", str(BPORT),
337               "--pidfile", os.path.join(PIDFILE_PATH, "mapperd.pid"),
338               "-v", str(VERBOSITY_MAPPER), "-d",
339               "-l", os.path.join(str(LOGS_PATH), "mapperd.log")
340               ], "mapperd"]
341
342     peers = []
343     peers.append(peer_blockerb)
344     peers.append(peer_blockerm)
345     peers.append(peer_vlmcd)
346     peers.append(peer_mapperd)
347
348     return peers
349
350
351 def exclusive(fn):
352     def exclusive_args(args):
353         while True:
354             try:
355                 fd = os.open(VLMC_LOCK_FILE, os.O_CREAT|os.O_EXCL|os.O_WRONLY)
356                 break;
357             except OSError, (err, reason):
358                 print >> sys.stderr, reason
359                 if err == errno.EEXIST:
360                     time.sleep(0.05)
361                 else:
362                     raise OSError(err, VLMC_LOCK_FILE + ' ' + reason)
363         try:
364             r = fn(args)
365         finally:
366             os.close(fd)
367             os.unlink(VLMC_LOCK_FILE)
368         return r
369
370     return exclusive_args
371
372 def loadrc(rc):
373     try:
374         if rc == None:
375             execfile(os.path.expanduser(DEFAULTS), globals())
376         else:
377             execfile(rc, globals())
378     except:
379         sys.stderr.write("Cannot read config file\n")
380         sys.exit(1)
381
382     if not check_conf():
383         sys.exit(1)
384
385 def loaded_modules():
386     lines = open("/proc/modules").read().split("\n")
387     modules = [f.split(" ")[0] for f in lines]
388     return modules
389
390 def loaded_module(name):
391     return name in loaded_modules()
392
393 def load_module(name, args):
394     s = "Loading %s " % name
395     sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH))
396     modules = loaded_modules()
397     if name in modules:
398         sys.stdout.write(yellow("Already loaded".ljust(SECOND_COLUMN_WIDTH)))
399         sys.stdout.write("\n")
400         return 0
401     cmd = ["modprobe", "%s" % name]
402     if args:
403         for arg in args:
404             cmd.extend(["%s=%s" % (arg)])
405     try:
406         check_call(cmd, shell=False);
407     except Exception:
408         sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
409         sys.stdout.write("\n")
410         return -1
411     sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
412     sys.stdout.write("\n")
413     return 0
414
415 def unload_module(name):
416     s = "Unloading %s " % name
417     sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH))
418     modules = loaded_modules()
419     if name not in modules:
420         sys.stdout.write(yellow("Not loaded".ljust(SECOND_COLUMN_WIDTH)))
421         sys.stdout.write("\n")
422         return 0
423     cmd = ["modprobe -r %s" % name]
424     try:
425         check_call(cmd, shell=True);
426     except Exception:
427         sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
428         sys.stdout.write("\n")
429         return -1
430     sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
431     sys.stdout.write("\n")
432     return 0
433
434 def create_segment():
435     #fixme blocking....
436     cmd = ["xseg", str(SPEC), "create"]
437     try:
438         check_call(cmd, shell=False);
439     except Exception:
440             sys.stderr.write(red("Cannot create segment. \n"))
441             return -1
442     return 0
443
444 def destroy_segment():
445     #fixme blocking....
446     cmd = ["xseg", str(SPEC), "destroy"]
447     try:
448         check_call(cmd, shell=False);
449     except Exception:
450             sys.stderr.write(red("Cannot destroy segment. \n"))
451             return 0
452     return 0
453
454 def check_running(name, pid = -1):
455     for p in psutil.process_iter():
456         if p.name == name:
457             if pid != -1:
458                 if pid == p.pid:
459                     return pid
460             else:
461                 return pid
462     return -1
463
464 def check_pidfile(name):
465     pidfile = os.path.join(PIDFILE_PATH, name + ".pid")
466     pf = None
467     try:
468         pf = open(pidfile, "r")
469         pid = int(pf.read())
470         pf.close()
471     except:
472         if pf:
473             pf.close()
474         return -1
475
476     return pid
477
478 def start_peer(peer):
479     cmd = [peer[0]] + peer[1]
480     s = "Starting %s " % peer[2]
481     sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH))
482     try:
483         check_call(cmd, shell=False);
484     except Exception:
485         sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
486         sys.stdout.write("\n")
487         return -1
488     sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
489     sys.stdout.write("\n")
490     return 0
491
492 def stop_peer(peer):
493     pid = check_pidfile(peer[2])
494     if pid < 0:
495         pretty_print(peer[2], yellow("not running"))
496         return -1
497
498     s = "Stopping %s " % peer[2]
499     sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH))
500     os.kill(pid, signal.SIGTERM)
501     i = 0
502     while check_running(peer[0], pid) > 0:
503         time.sleep(0.1)
504         i += 1
505         if i > 150:
506             sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
507             sys.stdout.write("\n")
508             return -1
509     sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
510     sys.stdout.write("\n")
511     return 0
512
513 def peer_running(peer):
514     pid = check_pidfile(peer[2])
515     if pid < 0:
516         return -1
517
518     r = check_running(peer[0], pid)
519     if r < 0:
520         pretty_print(peer[2], yellow("Has valid pidfile but does not seem to be active"))
521     return 0
522
523
524 def make_segdev():
525     try:
526         os.stat(str(CHARDEV_NAME))
527         return -2
528     except:
529         pass
530     cmd = ["mknod", str(CHARDEV_NAME), "c", str(CHARDEV_MAJOR), str(CHARDEV_MINOR)]
531     print ' '.join(cmd)
532     try:
533         check_call(cmd, shell=False);
534     except Exception:
535         sys.stderr.write(red("Segdev device creation failed.\n"))
536         return -1
537     return 0
538
539 def remove_segdev():
540     try:
541         os.stat(str(CHARDEV_NAME))
542     except:
543         return -2
544     try:
545         os.unlink(str(CHARDEV_NAME))
546     except:
547         sys.stderr.write(red("Segdev device removal failed.\n"))
548         return -1
549
550 def start_peers(peers):
551     for m in modules:
552         if not loaded_module(m):
553             print red("Cannot start userspace peers. " + m + " module not loaded")
554             return -1
555     for p in peers:
556         if start_peer(p) < 0:
557             return -1
558     return 0
559
560 def stop_peers(peers):
561     for p in reversed(peers):
562         stop_peer(p)
563     return 0
564
565 def start(args):
566     if args.user:
567         return start_peers(peers)
568
569     if status(args) > 0:
570         return -1
571
572     for m in modules:
573         if load_module(m, None) < 0:
574             stop(args)
575             return -1
576     time.sleep(0.5)
577
578     if make_segdev() < 0:
579         stop(args)
580         return -1
581
582     time.sleep(0.5)
583
584     if create_segment() < 0:
585         stop(args)
586         return -1
587
588     time.sleep(0.5)
589
590     if start_peers(peers) < 0:
591         stop(args)
592         return -1
593
594
595     if load_module(xsegbd, xsegbd_args) < 0:
596         stop(args)
597         return -1
598     return 0
599
600 def stop(args):
601     if args.user:
602         return stop_peers(peers)
603     #check devices
604     if vlmc_showmapped(args) > 0:
605         print "Cannot stop archipelago. Mapped volumes exist"
606         return -1
607     if unload_module(xsegbd):
608         return -1
609     r = 0
610
611     stop_peers(peers)
612
613     remove_segdev()
614
615     for m in reversed(modules):
616         unload_module(m)
617     return 0
618
619 def status(args):
620     r = 0
621     if vlmc_showmapped(args) >= 0:
622         r += 1
623     if loaded_module(xsegbd):
624         pretty_print(xsegbd, green('Loaded'))
625         r += 1
626     else:
627         pretty_print(xsegbd, red('Not loaded'))
628     for m in reversed(modules):
629         if loaded_module(m):
630             pretty_print(m, green('Loaded'))
631             r += 1
632         else:
633             pretty_print(m, red('Not loaded'))
634     for p in reversed(peers):
635         if peer_running(p) < 0:
636             pretty_print(p[0], red('not running'))
637         else:
638             pretty_print(p[0], green('running'))
639             r += 1
640     return r
641
642 def restart(args):
643     r = stop(args)
644     if r < 0:
645         return r
646     return start(args)
647
648 class Xseg_ctx(object):
649     ctx = None
650     port = None
651     portno = None
652
653     def __init__(self, spec, portno):
654         xseg_initialize()
655         xconf = xseg_config()
656         xseg_parse_spec(spec, xconf)
657         ctx = xseg_join(xconf.type, xconf.name, "posix", cast(0, cb_null_ptrtype))
658         if not ctx:
659             raise Exception("Cannot join segment")
660         port = xseg_bind_port(ctx, portno, c_void_p(0))
661         if not port:
662             raise Exception("Cannot bind to port")
663         xseg_init_local_signal(ctx, portno)
664         self.ctx = ctx
665         self.port = port
666         self.portno = portno
667
668
669     def __del__(self):
670         return
671
672     def __enter__(self):
673         if not self.ctx:
674             raise Exception("No segment")
675         return self
676
677     def __exit__(self, type_, value, traceback):
678         self.shutdown()
679         return False
680
681     def shutdown(self):
682         if self.ctx:
683             xseg_quit_local_signal(self.ctx, self.portno)
684             xseg_leave(self.ctx)
685         self.ctx = None
686
687 class Request(object):
688     xseg_ctx = None
689     req = None
690
691     def __init__(self, xseg_ctx, dst_portno, targetlen, datalen):
692         ctx = xseg_ctx.ctx
693         if not ctx:
694             raise Exception("No context")
695         req = xseg_get_request(ctx, xseg_ctx.portno, dst_portno, X_ALLOC)
696         if not req:
697             raise Exception("Cannot get request")
698         r = xseg_prep_request(ctx, req, targetlen, datalen)
699         if r < 0:
700             xseg_put_request(ctx, req, xseg_ctx.portno)
701             raise Exception("Cannot prepare request")
702 #        print hex(addressof(req.contents))
703         self.req = req
704         self.xseg_ctx = xseg_ctx
705         return
706
707     def __del__(self):
708         if self.req:
709             if xq_count(byref(self.req.contents.path)) == 0:
710                 xseg_put_request(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno)
711         self.req = None
712         return False
713
714     def __enter__(self):
715         if not self.req:
716             raise Exception("xseg request not set")
717         return self
718
719     def __exit__(self, type_, value, traceback):
720         if self.req:
721             if xq_count(byref(self.req.contents.path)) == 0:
722                 xseg_put_request(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno)
723         self.req = None
724         return False
725
726     def set_op(self, op):
727         self.req.contents.op = op
728
729     def get_op(self):
730         return self.req.contents.op
731
732     def set_offset(self, offset):
733         self.req.contents.offset = offset
734
735     def get_offset(self):
736         return self.req.contents.offset
737
738     def get_size(self):
739         return self.req.contents.size
740
741     def set_size(self, size):
742         self.req.contents.size = size
743
744     def set_flags(self, flags):
745         self.req.contents.flags = flags
746
747     def get_flags(self):
748         return self.req.contents.flags
749
750     def set_target(self, target):
751         """Sets the target of the request, respecting request's targetlen"""
752         if len(target) != self.req.contents.targetlen:
753             return False
754         c_target = xseg_get_target_nonstatic(self.xseg_ctx.ctx, self.req)
755         p_target = create_string_buffer(target)
756 #        print hex(addressof(c_target.contents))
757         memmove(c_target, p_target, len(target))
758         return True
759
760     def get_target(self):
761         """Return a string to the target of the request"""
762         c_target = xseg_get_target_nonstatic(self.xseg_ctx.ctx, self.req)
763 #        print "target_addr " + str(addressof(c_target.contents))
764         return string_at(c_target, self.req.contents.targetlen)
765
766     def set_data(self, data):
767         """Sets requests data. Data should be a xseg protocol structure""" 
768         if sizeof(data) != self.req.contents.datalen:
769             return False
770         c_data = xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req)
771         p_data = pointer(data)
772         memmove(c_data, p_data, self.req.contents.datalen)
773
774         return True
775
776     def get_data(self, _type):
777         """return a pointer to the data buffer of the request, casted to the
778         selected type"""
779 #        print "data addr " +  str(addressof(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req).contents))
780 #        ret = cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req), _type)
781 #        print addressof(ret.contents)
782 #        return ret
783         if _type:
784             return cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),\
785                                                                  POINTER(_type))
786         else:
787             return cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req), \
788                                                                        c_void_p)
789
790     def submit(self):
791         """Submit the associated xseg_request"""
792         p = xseg_submit(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno, X_ALLOC)
793         if p == NoPort:
794             raise Exception
795         xseg_signal(self.xseg_ctx.ctx, p)
796
797     def wait(self):
798         """Wait until the associated xseg_request is responded, discarding any
799         other requests that may be received in the meantime"""
800         while True:
801             received = xseg_receive(self.xseg_ctx.ctx, self.xseg_ctx.portno, 0)
802             if received:
803 #                print addressof(cast(self.req, c_void_p))
804 #                print addressof(cast(received, c_void_p))
805 #                print addressof(self.req.contents)
806 #                print addressof(received.contents)
807                 if addressof(received.contents) == addressof(self.req.contents):
808 #                if addressof(cast(received, c_void_p)) == addressof(cast(self.req, c_void_p)):
809                     break
810                 else:
811                     p = xseg_respond(self.xseg_ctx.ctx, received, self.xseg_ctx.portno, X_ALLOC)
812                     if p == NoPort:
813                         xseg_put_request(self.xseg_ctx.ctx, received,
814                                 self.xseg_ctx.portno)
815                     else:
816                         xseg_signal(self.xseg_ctx.ctx, p)
817             else:
818                 xseg_prepare_wait(self.xseg_ctx.ctx, self.xseg_ctx.portno)
819                 xseg_wait_signal(self.xseg_ctx.ctx, 10000000)
820                 xseg_cancel_wait(self.xseg_ctx.ctx, self.xseg_ctx.portno)
821         return True
822
823     def success(self):
824         return bool((self.req.contents.state & XS_SERVED) and not
825                 (self.req.contents.state & XS_FAILED))
826
827 @exclusive
828 def vlmc_showmapped(args):
829     try:
830         devices = os.listdir(os.path.join(XSEGBD_SYSFS, "devices/"))
831     except:
832         return -1
833
834     print "id\tpool\timage\tsnap\tdevice"
835     if not devices:
836         print "No volumes mapped\n"
837         return 0
838     try:
839         for f in devices:
840             d_id = open(XSEGBD_SYSFS + "devices/" + f + "/id").read().strip()
841             target = open(XSEGBD_SYSFS + "devices/"+ f + "/target").read().strip()
842
843             print "%s\t%s\t%s\t%s\t%s" % (d_id, '-', target, '-', DEVICE_PREFIX +
844             d_id)
845     except Exception, reason:
846         print >> sys.stderr, reason
847         return -2
848     return len(devices)
849
850 def vlmc_showmapped_wrapper(args):
851     r = vlmc_showmapped(args)
852     if r < 0:
853         return r
854     return 0
855
856
857 @exclusive
858 def vlmc_create(args):
859     name = args.name[0]
860     size = args.size
861     snap = args.snap
862
863     if len(name) < 6:
864         print >> sys.stderr, "Name should have at least len 6"
865         sys.exit(-1)
866     if size == None and snap == None:
867         print >> sys.stderr, "At least one of the size/snap args must be provided"
868         sys.exit(-1)
869
870     ret = False
871     xseg_ctx = Xseg_ctx(SPEC, VTOOL)
872     with Request(xseg_ctx, MPORT, len(name), sizeof(xseg_request_clone)) as req:
873         req.set_op(X_CLONE)
874         req.set_size(sizeof(xseg_request_clone))
875         req.set_offset(0)
876         req.set_target(name)
877
878         xclone = xseg_request_clone()
879         if snap:
880             xclone.target = snap
881             xclone.targetlen = len(snap)
882         else:
883             xclone.target = ""
884             xclone.targetlen = 0
885         if size:
886             xclone.size = size << 20
887         else:
888             xclone.size = -1
889
890         req.set_data(xclone)
891         req.submit()
892         req.wait()
893         ret = req.success()
894     xseg_ctx.shutdown()
895     if not ret:
896         sys.stderr.write("vlmc creation failed\n")
897         sys.exit(-1)
898
899 @exclusive
900 def vlmc_snapshot(args):
901     # snapshot
902     name = args.name[0]
903
904     if len(name) < 6:
905         print >> sys.stderr, "Name should have at least len 6"
906         sys.exit(-1)
907
908     ret = False
909     xseg_ctx = Xseg_ctx(SPEC, VTOOL)
910     with Request(xseg_ctx, VPORT_START, len(name), sizeof(xseg_request_snapshot)) as req:
911         req.set_op(X_SNAPSHOT)
912         req.set_size(sizeof(xseg_request_snapshot))
913         req.set_offset(0)
914         req.set_target(name)
915
916         xsnapshot = xseg_request_snapshot()
917         xsnapshot.target = ""
918         xsnapshot.targetlen = 0
919         req.set_data(xsnapshot)
920         req.submit()
921         req.wait()
922         ret = req.success()
923         reply = string_at(req.get_data(xseg_reply_snapshot).contents.target, 64)
924     xseg_ctx.shutdown()
925     if not ret:
926         sys.stderr.write("vlmc snapshot failed\n")
927         sys.exit(-1)
928     sys.stdout.write("Snapshot name: %s\n" % reply)
929     return
930
931
932 def vlmc_list(args):
933     if STORAGE == "rados":
934         cmd = [ 'rados', '-p', '%s' % RADOS_POOL_MAPS, 'ls' ]
935         proc = Popen(cmd, stdout = PIPE)
936         while proc.poll() is None:
937             output = proc.stdout.readline()
938             if output.startswith(ARCHIP_PREFIX) and not output.endswith('_lock\n'):
939                 print output.lstrip(ARCHIP_PREFIX),
940     elif STORAGE == "files":
941         print >> sys.stderr, "Vlmc list not supported for files yet"
942         return 0
943     else:
944         print >> sys.stderr, "Invalid storage"
945         sys.exit(-1)
946
947     return
948
949 @exclusive
950 def vlmc_remove(args):
951     name = args.name[0]
952
953     try:
954         for f in os.listdir(XSEGBD_SYSFS + "devices/"):
955             d_id = open(XSEGBD_SYSFS + "devices/" + f + "/id").read().strip()
956             target = open(XSEGBD_SYSFS + "devices/"+ f + "/target").read().strip()
957             if target == name:
958                 sys.stderr.write("Volume mapped on device %s%s\n" % (DEVICE_PREFIX,
959                                                                 d_id))
960                 sys.exit(-1)
961
962     except Exception, reason:
963         print >> sys.stderr, reason
964         sys.exit(-1)
965
966     ret = False
967     xseg_ctx = Xseg_ctx(SPEC, VTOOL)
968     with Request(xseg_ctx, MPORT, len(name), 0) as req:
969         req.set_op(X_DELETE)
970         req.set_size(0)
971         req.set_offset(0)
972         req.set_target(name)
973         req.submit()
974         req.wait()
975         ret = req.success()
976     xseg_ctx.shutdown()
977     if not ret:
978         sys.stderr.write("vlmc removal failed\n")
979         sys.exit(-1)
980
981
982 @exclusive
983 def vlmc_map(args):
984     if not loaded_module(xsegbd):
985         sys.stderr.write("Xsegbd module not loaded\n")
986         sys.exit(-1)
987     name = args.name[0]
988     prev = XSEGBD_START
989     try:
990         result = [int(open(XSEGBD_SYSFS + "devices/" + f + "/srcport").read().strip()) for f in os.listdir(XSEGBD_SYSFS + "devices/")]
991         result.sort()
992
993         for p in result:
994             if p - prev > 1:
995                break
996             else:
997                prev = p
998
999         port = prev + 1
1000         if port > XSEGBD_END:
1001             print >> sys.stderr, "Max xsegbd devices reached"
1002             sys.exit(-1)
1003         fd = os.open(XSEGBD_SYSFS + "add", os.O_WRONLY)
1004         print >> sys.stderr, "write to %s : %s %d:%d:%d" %( XSEGBD_SYSFS +
1005                         "add", name, port, port - XSEGBD_START + VPORT_START, REQS )
1006         os.write(fd, "%s %d:%d:%d" % (name, port, port - XSEGBD_START + VPORT_START, REQS))
1007         os.close(fd)
1008     except Exception, reason:
1009         print >> sys.stderr, reason
1010         sys.exit(-1)
1011
1012 @exclusive
1013 def vlmc_unmap(args):
1014     if not loaded_module(xsegbd):
1015         sys.stderr.write("Xsegbd module not loaded\n")
1016         sys.exit(-1)
1017     device = args.name[0]
1018     try:
1019         for f in os.listdir(XSEGBD_SYSFS + "devices/"):
1020             d_id = open(XSEGBD_SYSFS + "devices/" + f + "/id").read().strip()
1021             name = open(XSEGBD_SYSFS + "devices/"+ f + "/target").read().strip()
1022             if device == DEVICE_PREFIX + d_id:
1023                 fd = os.open(XSEGBD_SYSFS + "remove", os.O_WRONLY)
1024                 os.write(fd, d_id)
1025                 os.close(fd)
1026
1027                 sys.exit(0)
1028         print >> sys.stderr, "Device %s doesn't exist" % device
1029         sys.exit(-1)
1030     except Exception, reason:
1031         print >> sys.stderr, reason
1032         sys.exit(-1)
1033
1034 # FIXME:
1035 def vlmc_resize(args):
1036     if not loaded_module(xsegbd):
1037         sys.stderr.write("Xsegbd module not loaded\n")
1038         sys.exit(-1)
1039
1040     name = args.name[0]
1041     size = args.size[0]
1042
1043     try:
1044
1045         for f in os.listdir(XSEGBD_SYSFS + "devices/"):
1046             d_id = open(XSEGBD_SYSFS + "devices/" + f + "/id").read().strip()
1047             d_name = open(XSEGBD_SYSFS + "devices/"+ f + "/name").read().strip()
1048             if name == d_name:
1049                 fd = os.open(XSEGBD_SYSFS + "devices/" +  d_id +"/refresh", os.O_WRONLY)
1050                 os.write(fd, "1")
1051                 os.close(fd)
1052
1053         sys.exit(0)
1054     except Exception, reason:
1055         print >> sys.stderr, reason
1056         sys.exit(-1)
1057
1058 @exclusive
1059 def vlmc_lock(args):
1060     name = args.name[0]
1061
1062     if len(name) < 6:
1063         print >> sys.stderr, "Name should have at least len 6"
1064         sys.exit(-1)
1065     name = ARCHIP_PREFIX + name
1066
1067     ret = False
1068     xseg_ctx = Xseg_ctx(SPEC, VTOOL)
1069     with Request(xseg_ctx, MBPORT, len(name), 0) as req:
1070         req.set_op(X_ACQUIRE)
1071         req.set_size(0)
1072         req.set_offset(0)
1073         req.set_flags(XF_NOSYNC)
1074         req.set_target(name)
1075         req.submit()
1076         req.wait()
1077         ret = req.success()
1078     xseg_ctx.shutdown()
1079     if not ret:
1080         sys.stderr.write("vlmc lock failed\n")
1081         sys.exit(-1)
1082     else:
1083         sys.stdout.write("Volume locked\n")
1084
1085 @exclusive
1086 def vlmc_unlock(args):
1087     name = args.name[0]
1088     force = args.force
1089
1090     if len(name) < 6:
1091         print >> sys.stderr, "Name should have at least len 6"
1092         sys.exit(-1)
1093     name = ARCHIP_PREFIX + name
1094
1095     ret = False
1096     xseg_ctx = Xseg_ctx(SPEC, VTOOL)
1097     with Request(xseg_ctx, MBPORT, len(name), 0) as req:
1098         req.set_op(X_RELEASE)
1099         req.set_size(0)
1100         req.set_offset(0)
1101         req.set_target(name)
1102         if force:
1103             req.set_flags(XF_NOSYNC|XF_FORCE)
1104         else:
1105             req.set_flags(XF_NOSYNC)
1106         req.submit()
1107         req.wait()
1108         ret = req.success()
1109     xseg_ctx.shutdown()
1110     if not ret:
1111         sys.stderr.write("vlmc unlock failed\n")
1112         sys.exit(-1)
1113     else:
1114         sys.stdout.write("Volume unlocked\n")
1115
1116 @exclusive
1117 def vlmc_open(args):
1118     name = args.name[0]
1119
1120     if len(name) < 6:
1121         print >> sys.stderr, "Name should have at least len 6"
1122         sys.exit(-1)
1123
1124     ret = False
1125     xseg_ctx = Xseg_ctx(SPEC, VTOOL)
1126     with Request(xseg_ctx, VPORT, len(name), 0) as req:
1127         req.set_op(X_OPEN)
1128         req.set_size(0)
1129         req.set_offset(0)
1130         req.set_target(name)
1131         req.submit()
1132         req.wait()
1133         ret = req.success()
1134     xseg_ctx.shutdown()
1135     if not ret:
1136         sys.stderr.write("vlmc open failed\n")
1137         sys.exit(-1)
1138     else:
1139         sys.stdout.write("Volume opened\n")
1140
1141 @exclusive
1142 def vlmc_close(args):
1143     name = args.name[0]
1144
1145     if len(name) < 6:
1146         print >> sys.stderr, "Name should have at least len 6"
1147         sys.exit(-1)
1148
1149     ret = False
1150     xseg_ctx = Xseg_ctx(SPEC, VTOOL)
1151     with Request(xseg_ctx, VPORT, len(name), 0) as req:
1152         req.set_op(X_CLOSE)
1153         req.set_size(0)
1154         req.set_offset(0)
1155         req.set_target(name)
1156         req.submit()
1157         req.wait()
1158         ret = req.success()
1159     xseg_ctx.shutdown()
1160     if not ret:
1161         sys.stderr.write("vlmc close failed\n")
1162         sys.exit(-1)
1163     else:
1164         sys.stdout.write("Volume closed\n")
1165
1166 def archipelago():
1167     parser = argparse.ArgumentParser(description='Archipelago tool')
1168     parser.add_argument('-c', '--config', type=str, nargs='?', help='config file')
1169     parser.add_argument('-u', '--user',  action='store_true', default=False , help='affect only userspace peers')
1170     subparsers = parser.add_subparsers()
1171
1172     start_parser = subparsers.add_parser('start', help='Start archipelago')
1173     start_parser.set_defaults(func=start)
1174
1175     stop_parser = subparsers.add_parser('stop', help='Stop archipelago')
1176     stop_parser.set_defaults(func=stop)
1177
1178     status_parser = subparsers.add_parser('status', help='Archipelago status')
1179     status_parser.set_defaults(func=status)
1180
1181     restart_parser = subparsers.add_parser('restart', help='Restart archipelago')
1182     restart_parser.set_defaults(func=restart)
1183
1184     return parser
1185
1186 def vlmc():
1187     parser = argparse.ArgumentParser(description='vlmc tool')
1188     parser.add_argument('-c', '--config', type=str, nargs='?', help='config file')
1189     subparsers = parser.add_subparsers()
1190
1191     create_parser = subparsers.add_parser('create', help='Create volume')
1192     #group = create_parser.add_mutually_exclusive_group(required=True)
1193     create_parser.add_argument('-s', '--size', type=int, nargs='?', help='requested size in MB for create')
1194     create_parser.add_argument('--snap', type=str, nargs='?', help='create from snapshot')
1195     create_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1196     create_parser.add_argument('name', type=str, nargs=1, help='volume/device name')
1197     create_parser.set_defaults(func=vlmc_create)
1198
1199     remove_parser = subparsers.add_parser('remove', help='Delete volume')
1200     remove_parser.add_argument('name', type=str, nargs=1, help='volume/device name')
1201     remove_parser.set_defaults(func=vlmc_remove)
1202     remove_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1203
1204     rm_parser = subparsers.add_parser('rm', help='Delete volume')
1205     rm_parser.add_argument('name', type=str, nargs=1, help='volume/device name')
1206     rm_parser.set_defaults(func=vlmc_remove)
1207     rm_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1208
1209     map_parser = subparsers.add_parser('map', help='Map volume')
1210     map_parser.add_argument('name', type=str, nargs=1, help='volume/device name')
1211     map_parser.set_defaults(func=vlmc_map)
1212     map_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1213
1214     unmap_parser = subparsers.add_parser('unmap', help='Unmap volume')
1215     unmap_parser.add_argument('name', type=str, nargs=1, help='volume/device name')
1216     unmap_parser.set_defaults(func=vlmc_unmap)
1217     unmap_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1218
1219     showmapped_parser = subparsers.add_parser('showmapped', help='Show mapped volumes')
1220     showmapped_parser.set_defaults(func=vlmc_showmapped_wrapper)
1221     showmapped_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1222
1223     list_parser = subparsers.add_parser('list', help='List volumes')
1224     list_parser.set_defaults(func=vlmc_list)
1225     list_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1226
1227     snapshot_parser = subparsers.add_parser('snapshot', help='snapshot volume')
1228     #group = snapshot_parser.add_mutually_exclusive_group(required=True)
1229     snapshot_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1230     snapshot_parser.add_argument('name', type=str, nargs=1, help='volume/device name')
1231     snapshot_parser.set_defaults(func=vlmc_snapshot)
1232
1233     ls_parser = subparsers.add_parser('ls', help='List volumes')
1234     ls_parser.set_defaults(func=vlmc_list)
1235     ls_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1236
1237     resize_parser = subparsers.add_parser('resize', help='Resize volume')
1238     resize_parser.add_argument('-s', '--size', type=int, nargs=1, help='requested size in MB for resize')
1239     resize_parser.add_argument('name', type=str, nargs=1, help='volume/device name')
1240     resize_parser.set_defaults(func=vlmc_resize)
1241     resize_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1242
1243     open_parser = subparsers.add_parser('open', help='open volume')
1244     open_parser.add_argument('name', type=str, nargs=1, help='volume/device name')
1245     open_parser.set_defaults(func=vlmc_open)
1246     open_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1247
1248     close_parser = subparsers.add_parser('close', help='close volume')
1249     close_parser.add_argument('name', type=str, nargs=1, help='volume/device name')
1250     close_parser.set_defaults(func=vlmc_close)
1251     close_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1252
1253     lock_parser = subparsers.add_parser('lock', help='lock volume')
1254     lock_parser.add_argument('name', type=str, nargs=1, help='volume/device name')
1255     lock_parser.set_defaults(func=vlmc_lock)
1256     lock_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1257
1258     unlock_parser = subparsers.add_parser('unlock', help='unlock volume')
1259     unlock_parser.add_argument('name', type=str, nargs=1, help='volume/device name')
1260     unlock_parser.add_argument('-f', '--force',  action='store_true', default=False , help='break lock')
1261     unlock_parser.set_defaults(func=vlmc_unlock)
1262     unlock_parser.add_argument('-p', '--pool', type=str, nargs='?', help='for backwards compatiblity with rbd')
1263
1264     return parser
1265
1266 if __name__ == "__main__":
1267     # parse arguments and discpatch to the correct func
1268     try:
1269         parser_func = {
1270             'archipelago' : archipelago,
1271             'vlmc'        : vlmc,
1272         }[os.path.basename(sys.argv[0])]
1273         parser = parser_func()
1274     except Exception as e:
1275         sys.stderr.write("Invalid basename\n")
1276         sys.exit(-1)
1277
1278     args = parser.parse_args()
1279     loadrc(args.config)
1280     if parser_func == archipelago:
1281         peers = construct_peers()
1282         xsegbd_args = [('start_portno', str(XSEGBD_START)), ('end_portno',
1283                 str(XSEGBD_END))]
1284
1285     sys.exit(args.func(args))
1286