Change configuration file format
[archipelago] / xseg / tools / archipelago / archipelago / common.py
1 #!/usr/bin/env python
2
3 # Copyright 2012 GRNET S.A. All rights reserved.
4 #
5 # Redistribution and use in source and binary forms, with or
6 # without modification, are permitted provided that the following
7 # conditions are met:
8 #
9 #   1. Redistributions of source code must retain the above
10 #      copyright notice, this list of conditions and the following
11 #      disclaimer.
12 #
13 #   2. Redistributions in binary form must reproduce the above
14 #      copyright notice, this list of conditions and the following
15 #      disclaimer in the documentation and/or other materials
16 #      provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 # POSSIBILITY OF SUCH DAMAGE.
30 #
31 # The views and conclusions contained in the software and
32 # documentation are those of the authors and should not be
33 # interpreted as representing official policies, either expressed
34 # or implied, of GRNET S.A.
35 #
36
37
38 from xseg.xseg_api import *
39 from xseg.xprotocol import *
40 from ctypes import CFUNCTYPE, cast, c_void_p, addressof, string_at, memmove, \
41     create_string_buffer, pointer, sizeof, POINTER, byref, c_int, c_char, Structure
42 import ctypes
43 cb_null_ptrtype = CFUNCTYPE(None, uint32_t)
44
45 import os
46 import sys
47 import time
48 import psutil
49 import errno
50 import signal
51 from subprocess import check_call
52 from collections import namedtuple
53 import socket
54 import random
55 from select import select
56 import ConfigParser
57
58 random.seed()
59 hostname = socket.gethostname()
60
61 valid_role_types = ['file_blocker', 'rados_blocker', 'mapperd', 'vlmcd']
62 valid_segment_types = ['segdev', 'posix']
63
64 peers = dict()
65 xsegbd_args = []
66 segment = None
67 modules = ['xseg', 'segdev', 'xseg_posix', 'xseg_pthread', 'xseg_segdev']
68 xsegbd = 'xsegbd'
69
70 BIN_DIR = '/usr/bin/'
71 DEFAULTS = '/etc/archipelago/archipelago.conf'
72
73 #system defaults
74 ARCHIP_PREFIX = 'archip_'
75 LOG_SUFFIX = '.log'
76 PID_SUFFIX = '.pid'
77 PIDFILE_PATH = "/var/run/archipelago"
78 VLMC_LOCK_FILE = 'vlmc.lock'
79 LOGS_PATH = "/var/log/archipelago"
80 LOCK_PATH = "/var/lock"
81 DEVICE_PREFIX = "/dev/xsegbd"
82 XSEGBD_SYSFS = "/sys/bus/xsegbd/"
83
84 CHARDEV_NAME = "/dev/segdev"
85 CHARDEV_MAJOR = 60
86 CHARDEV_MINOR = 0
87
88 REQS = 512
89
90 FILE_BLOCKER = 'archip-filed'
91 RADOS_BLOCKER = 'archip-sosd'
92 MAPPER = 'archip-mapperd'
93 VLMC = 'archip-vlmcd'
94
95 def is_power2(x):
96     return bool(x != 0 and (x & (x-1)) == 0)
97
98 #hack to test green waiting with python gevent.
99 class posixfd_signal_desc(Structure):
100     pass
101 posixfd_signal_desc._fields_ = [
102     ('signal_file', c_char * sizeof(c_void_p)),
103     ('fd', c_int),
104     ('flag', c_int),
105 ]
106
107 def xseg_wait_signal_green(ctx, sd, timeout):
108     posixfd_sd = cast(sd, POINTER(posixfd_signal_desc))
109     fd = posixfd_sd.contents.fd
110     select([fd], [], [], timeout/1000000.0)
111     while True:
112         try:
113             os.read(fd, 512)
114         except OSError as (e,msg):
115             if e == 11:
116                 break
117             else:
118                 raise OSError(e, msg)
119
120
121 class Peer(object):
122     cli_opts = None
123
124     def __init__(self, role=None, daemon=True, nr_ops=16,
125                  logfile=None, pidfile=None, portno_start=None,
126                  portno_end=None, log_level=0, spec=None, threshold=None):
127         if not role:
128             raise Error("Role was not provided")
129         self.role = role
130
131         self.nr_ops = nr_ops
132         if not self.nr_ops > 0:
133             raise Error("Invalid nr_ops for %s" % role)
134
135         if not is_power2(self.nr_ops):
136             raise Error("nr_ops of %s is not a power of 2" % role)
137
138         if not self.executable:
139             raise Error("Executable must be provided for %s" % role)
140
141         if portno_start is None:
142             raise Error("Portno_start must be provided for %s" % role)
143         self.portno_start = portno_start
144
145         if portno_end is None:
146             raise Error("Portno_end must be provided for %s" % role)
147         self.portno_end = portno_end
148
149         self.daemon = daemon
150         if not spec:
151             raise Error("Xseg spec was not provided for %s" % role)
152         self.spec = spec
153
154         if logfile:
155             self.logfile = logfile
156         else:
157             self.logfile = os.path.join(LOGS_PATH, role + LOG_SUFFIX)
158
159         if pidfile:
160             self.pidfile = pidfile
161         else:
162             self.pidfile = os.path.join(PIDFILE_PATH, role + PID_SUFFIX)
163
164         try:
165             if not os.path.isdir(os.path.dirname(self.logfile)):
166                 raise Error("Log path %s does not exist" % self.logfile)
167         except:
168             raise Error("Log path %s does not exist or is not a directory" %
169                         self.logfile)
170
171         try:
172             os.makedirs(os.path.dirname(self.pidfile))
173         except OSError as e:
174             if e.errno == errno.EEXIST:
175                 if os.path.isdir(os.path.dirname(self.pidfile)):
176                     pass
177                 else:
178                     raise Error("Pid path %s is not a directory" %
179                                 os.path.dirname(self.pidfile))
180             else:
181                 raise Error("Cannot create path %s" %
182                             os.path.dirname(self.pidfile))
183
184         self.log_level = log_level
185         self.threshold = threshold
186
187         if self.log_level < 0 or self.log_level > 3:
188             raise Error("%s: Invalid log level %d" %
189                         (self.role, self.log_level))
190
191         if self.cli_opts is None:
192             self.cli_opts = []
193         self.set_cli_options()
194
195     def start(self):
196         if self.get_pid():
197             raise Error("Peer has valid pidfile")
198         cmd = [os.path.join(BIN_DIR, self.executable)] + self.cli_opts
199         try:
200             check_call(cmd, shell=False)
201         except Exception as e:
202             raise Error("Cannot start %s: %s" % (self.role, str(e)))
203
204     def stop(self):
205         pid = self.get_pid()
206         if not pid:
207             raise Error("Peer %s not running" % self.role)
208
209         if self.__is_running(pid):
210             os.kill(pid, signal.SIGTERM)
211
212     def __is_running(self, pid):
213         name = self.executable
214         for p in psutil.process_iter():
215             if p.name[0:len(name)] == name and pid == p.pid:
216                 return True
217
218         return False
219
220     def is_running(self):
221         pid = self.get_pid()
222         if not pid:
223             return False
224
225         if not self.__is_running(pid):
226             raise Error("Peer %s has valid pidfile but is not running" %
227                         self.role)
228
229         return True
230
231     def get_pid(self):
232         if not self.pidfile:
233             return None
234
235         pf = None
236         try:
237             pf = open(self.pidfile, "r")
238             pid = int(pf.read())
239             pf.close()
240         except:
241             if pf:
242                 pf.close()
243             return None
244
245         return pid
246
247     def set_cli_options(self):
248         if self.daemon:
249             self.cli_opts.append("-d")
250         if self.nr_ops:
251             self.cli_opts.append("-n")
252             self.cli_opts.append(str(self.nr_ops))
253         if self.logfile:
254             self.cli_opts.append("-l")
255             self.cli_opts.append(self.logfile)
256         if self.pidfile:
257             self.cli_opts.append("--pidfile")
258             self.cli_opts.append(self.pidfile)
259         if self.portno_start is not None:
260             self.cli_opts.append("-sp")
261             self.cli_opts.append(str(self.portno_start))
262         if self.portno_end is not None:
263             self.cli_opts.append("-ep")
264             self.cli_opts.append(str(self.portno_end))
265         if self.log_level is not None:
266             self.cli_opts.append("-v")
267             self.cli_opts.append(str(self.log_level))
268         if self.spec:
269             self.cli_opts.append("-g")
270             self.cli_opts.append(self.spec)
271         if self.threshold:
272             self.cli_opts.append("--threshold")
273             self.cli_opts.append(str(self.threshold))
274
275
276 class MTpeer(Peer):
277     def __init__(self, nr_threads=1, **kwargs):
278         self.nr_threads = nr_threads
279         super(MTpeer, self).__init__(**kwargs)
280
281         if self.cli_opts is None:
282             self.cli_opts = []
283         self.set_mtcli_options()
284
285     def set_mtcli_options(self):
286         self.cli_opts.append("-t")
287         self.cli_opts.append(str(self.nr_threads))
288
289
290 class Sosd(MTpeer):
291     def __init__(self, pool=None, **kwargs):
292         self.executable = RADOS_BLOCKER
293         self.pool = pool
294         super(Sosd, self).__init__(**kwargs)
295
296         if self.cli_opts is None:
297             self.cli_opts = []
298         self.set_sosd_cli_options()
299
300     def set_sosd_cli_options(self):
301         if self.pool:
302             self.cli_opts.append("--pool")
303             self.cli_opts.append(self.pool)
304
305
306 class Filed(MTpeer):
307     def __init__(self, archip_dir=None, prefix=None, fdcache=None,
308                  unique_str=None, nr_threads=1, nr_ops=16, direct=True, **kwargs):
309         self.executable = FILE_BLOCKER
310         self.archip_dir = archip_dir
311         self.prefix = prefix
312         self.fdcache = fdcache
313         self.unique_str = unique_str
314         self.direct = direct
315         nr_threads = nr_ops
316         if self.fdcache and fdcache < 2*nr_threads:
317             raise Error("Fdcache should be greater than 2*nr_threads")
318
319         super(Filed, self).__init__(nr_threads=nr_threads, nr_ops=nr_ops, **kwargs)
320
321         if not self.archip_dir:
322             raise Error("%s: Archip dir must be set" % self.role)
323         if not os.path.isdir(self.archip_dir):
324             raise Error("%s: Archip dir invalid" % self.role)
325         if not self.fdcache:
326             self.fdcache = 2*self.nr_ops
327         if not self.unique_str:
328             self.unique_str = hostname + '_' + str(self.portno_start)
329
330         if self.cli_opts is None:
331             self.cli_opts = []
332         self.set_filed_cli_options()
333
334     def set_filed_cli_options(self):
335         if self.unique_str:
336             self.cli_opts.append("--uniquestr")
337             self.cli_opts.append(self.unique_str)
338         if self.fdcache:
339             self.cli_opts.append("--fdcache")
340             self.cli_opts.append(str(self.fdcache))
341         if self.archip_dir:
342             self.cli_opts.append("--archip")
343             self.cli_opts.append(self.archip_dir)
344         if self.prefix:
345             self.cli_opts.append("--prefix")
346             self.cli_opts.append(self.prefix)
347         if self.direct:
348             self.cli_opts.append("--directio")
349
350
351 class Mapperd(Peer):
352     def __init__(self, blockerm_port=None, blockerb_port=None, **kwargs):
353         self.executable = MAPPER
354         if blockerm_port is None:
355             raise Error("blockerm_port must be provied for %s" % role)
356         self.blockerm_port = blockerm_port
357
358         if blockerb_port is None:
359             raise Error("blockerb_port must be provied for %s" % role)
360         self.blockerb_port = blockerb_port
361         super(Mapperd, self).__init__(**kwargs)
362
363         if self.cli_opts is None:
364             self.cli_opts = []
365         self.set_mapperd_cli_options()
366
367     def set_mapperd_cli_options(self):
368         if self.blockerm_port is not None:
369             self.cli_opts.append("-mbp")
370             self.cli_opts.append(str(self.blockerm_port))
371         if self.blockerb_port is not None:
372             self.cli_opts.append("-bp")
373             self.cli_opts.append(str(self.blockerb_port))
374
375
376 class Vlmcd(Peer):
377     def __init__(self, blocker_port=None, mapper_port=None, **kwargs):
378         self.executable = VLMC
379         if blocker_port is None:
380             raise Error("blocker_port must be provied for %s" % role)
381         self.blocker_port = blocker_port
382
383         if mapper_port is None:
384             raise Error("mapper_port must be provied for %s" % role)
385         self.mapper_port = mapper_port
386         super(Vlmcd, self).__init__(**kwargs)
387
388         if self.cli_opts is None:
389             self.cli_opts = []
390         self.set_vlmcd_cli_opts()
391
392     def set_vlmcd_cli_opts(self):
393         if self.blocker_port is not None:
394             self.cli_opts.append("-bp")
395             self.cli_opts.append(str(self.blocker_port))
396         if self.mapper_port is not None:
397             self.cli_opts.append("-mp")
398             self.cli_opts.append(str(self.mapper_port))
399
400
401 config = {
402     'CEPH_CONF_FILE': '/etc/ceph/ceph.conf',
403 #    'SPEC': "segdev:xsegbd:1024:5120:12",
404     'SEGMENT_TYPE': 'segdev',
405     'SEGMENT_NAME': 'xsegbd',
406     'SEGMENT_DYNPORTS': 1024,
407     'SEGMENT_PORTS': 2048,
408     'SEGMENT_SIZE': 5120,
409     'SEGMENT_ALIGNMENT': 12,
410     'XSEGBD_START': 0,
411     'XSEGBD_END': 499,
412     'VTOOL_START': 1003,
413     'VTOOL_END': 1003,
414     #RESERVED 1023
415 }
416
417 FIRST_COLUMN_WIDTH = 23
418 SECOND_COLUMN_WIDTH = 23
419
420
421 def green(s):
422     return '\x1b[32m' + str(s) + '\x1b[0m'
423
424
425 def red(s):
426     return '\x1b[31m' + str(s) + '\x1b[0m'
427
428
429 def yellow(s):
430     return '\x1b[33m' + str(s) + '\x1b[0m'
431
432
433 def pretty_print(cid, status):
434     sys.stdout.write(cid.ljust(FIRST_COLUMN_WIDTH))
435     sys.stdout.write(status.ljust(SECOND_COLUMN_WIDTH))
436     sys.stdout.write('\n')
437     return
438
439
440 class Error(Exception):
441     def __init__(self, msg):
442         self.msg = msg
443
444     def __str__(self):
445         return self.msg
446
447 class Segment(object):
448     type = 'segdev'
449     name = 'xsegbd'
450     dyports = 1024
451     ports = 2048
452     size = 5120
453     alignment = 12
454
455     spec = None
456
457     def __init__(self, type, name, dynports, ports, size, align=12):
458         initialize_xseg()
459         self.type = type
460         self.name = name
461         self.dynports = dynports
462         self.ports = ports
463         self.size = size
464         self.alignment = align
465
466         if self.type not in valid_segment_types:
467             raise Error("Segment type not valid")
468         if self.alignment != 12:
469             raise Error("Wrong alignemt")
470         if self.dynports >= self.ports :
471             raise Error("Dynports >= max ports")
472
473         self.spec = self.get_spec()
474
475     def get_spec(self):
476         if not self.spec:
477             params = [self.type, self.name, str(self.dynports), str(self.ports),
478                       str(self.size), str(self.alignment)]
479             self.spec = ':'.join(params).encode()
480         return self.spec
481
482     def create(self):
483         #fixme blocking....
484         xconf = xseg_config()
485         c_spec = create_string_buffer(self.spec)
486         xseg_parse_spec(c_spec, xconf)
487         r = xseg_create(xconf)
488         if r < 0:
489             raise Error("Cannot create segment")
490
491     def destroy(self):
492         #fixme blocking....
493         try:
494             xseg = self.join()
495             xseg_leave(xseg)
496             xseg_destroy(xseg)
497         except Exception:
498             raise Error("Cannot destroy segment")
499
500     def join(self):
501         xconf = xseg_config()
502         spec_buf = create_string_buffer(self.spec)
503         xseg_parse_spec(spec_buf, xconf)
504         ctx = xseg_join(xconf.type, xconf.name, "posixfd",
505                         cast(0, cb_null_ptrtype))
506         if not ctx:
507             raise Error("Cannot join segment")
508
509         return ctx
510
511 def check_conf():
512     port_ranges = []
513
514     def isExec(file_path):
515         return os.path.isfile(file_path) and os.access(file_path, os.X_OK)
516
517     def validExec(program):
518         for path in os.environ["PATH"].split(os.pathsep):
519             exe_file = os.path.join(path, program)
520             if isExec(exe_file):
521                 return True
522         return False
523
524     def validatePort(portno, limit):
525         if portno >= limit:
526             raise Error("Portno %d out of range" % portno)
527
528     def validatePortRange(portno_start, portno_end, limit):
529         validatePort(portno_start, limit)
530         validatePort(portno_end, limit)
531         if portno_start > portno_end:
532             raise Error("Portno_start > Portno_end: %d > %d " %
533                         (portno_start, portno_end))
534         for start, end in port_ranges:
535             if not (portno_end < start or portno_start > end):
536                 raise Error("Port range conflict: (%d, %d) confilcts with (%d, %d)" %
537                             (portno_start, portno_end,  start, end))
538         port_ranges.append((portno_start, portno_end))
539
540     xseg_type = config['SEGMENT_TYPE']
541     xseg_name = config['SEGMENT_NAME']
542     xseg_dynports = config['SEGMENT_DYNPORTS']
543     xseg_ports = config['SEGMENT_PORTS']
544     xseg_size = config['SEGMENT_SIZE']
545     xseg_align = config['SEGMENT_ALIGNMENT']
546
547     global segment
548     segment = Segment(xseg_type, xseg_name, xseg_dynports, xseg_ports, xseg_size,
549                       xseg_align)
550
551
552     try:
553         if not config['roles']:
554             raise Error("Roles setup must be provided")
555     except KeyError:
556         raise Error("Roles setup must be provided")
557
558     for role, role_type in config['roles']:
559         if role_type not in valid_role_types:
560             raise Error("%s is not a valid role" % role_type)
561         try:
562             role_config = config[role]
563         except:
564             raise Error("No config found for %s" % role)
565
566         if role_type == 'file_blocker':
567             peers[role] = Filed(role=role, spec=segment.get_spec(),
568                                  prefix=ARCHIP_PREFIX, **role_config)
569         elif role_type == 'rados_blocker':
570             peers[role] = Sosd(role=role, spec=segment.get_spec(),
571                                **role_config)
572         elif role_type == 'mapperd':
573             peers[role] = Mapperd(role=role, spec=segment.get_spec(),
574                                   **role_config)
575         elif role_type == 'vlmcd':
576             peers[role] = Vlmcd(role=role, spec=segment.get_spec(),
577                                 **role_config)
578         else:
579             raise Error("No valid peer type: %s" % role_type)
580         validatePortRange(peers[role].portno_start, peers[role].portno_end,
581                           xseg_ports)
582
583     validatePortRange(config['VTOOL_START'], config['VTOOL_END'], xseg_ports)
584     validatePortRange(config['XSEGBD_START'], config['XSEGBD_END'],
585                       xseg_ports)
586
587     xsegbd_range = config['XSEGBD_END'] - config['XSEGBD_START']
588     vlmcd_range = peers['vlmcd'].portno_end - peers['vlmcd'].portno_start
589     if xsegbd_range > vlmcd_range:
590         raise Error("Xsegbd port range must be smaller that vlmcd port range")
591     return True
592
593 def get_segment():
594     return segment
595
596 def construct_peers():
597     return peers
598
599 vtool_port = None
600 def get_vtool_port():
601     global vtool_port
602     if vtool_port is None:
603         vtool_port = random.randint(config['VTOOL_START'], config['VTOOL_END'])
604     return vtool_port
605
606 acquired_locks = {}
607
608 def get_lock(lock_file):
609     while True:
610         try:
611             fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
612             break
613         except OSError, (err, reason):
614             print >> sys.stderr, lock_file, reason
615             if err == errno.EEXIST:
616                 time.sleep(0.2)
617             else:
618                 raise OSError(err, lock_file + ' ' + reason)
619     return fd
620
621 def exclusive(get_port=False):
622     def wrap(fn):
623         def lock(*args, **kwargs):
624             if not os.path.exists(LOCK_PATH):
625                 try:
626                     os.mkdir(LOCK_PATH)
627                 except OSError, (err, reason):
628                     print >> sys.stderr, reason
629
630             if not os.path.isdir(LOCK_PATH):
631                 raise Error("Locking error: %s is not a directory" % LOCK_PATH)
632
633             if get_port:
634                 vtool_port = get_vtool_port()
635                 lock_file = os.path.join(LOCK_PATH, VLMC_LOCK_FILE + '_' + str(vtool_port))
636             else:
637                 lock_file = os.path.join(LOCK_PATH, VLMC_LOCK_FILE)
638             try:
639                 depth = acquired_locks[lock_file]
640                 if depth == 0:
641                     fd = get_lock(lock_file)
642             except KeyError:
643                 acquired_locks[lock_file] = 0
644                 fd = get_lock(lock_file)
645
646             acquired_locks[lock_file] += 1
647             try:
648                 r = fn(*args, **kwargs)
649             finally:
650                 acquired_locks[lock_file] -= 1
651                 depth = acquired_locks[lock_file]
652                 if depth == 0:
653                     os.close(fd)
654                     os.unlink(lock_file)
655             return r
656
657         return lock
658     return wrap
659
660 def createBDict(cfg, section):
661         sec_dic = {}
662         sec_dic['portno_start'] = cfg.getint(section, 'portno_start')
663         sec_dic['portno_end'] = cfg.getint(section, 'portno_end')
664         sec_dic['log_level'] = cfg.getint(section, 'log_level')
665         sec_dic['nr_ops'] = cfg.getint(section, 'nr_ops')
666         try:
667                 sec_dic['nr_threads'] = cfg.getint(section, 'nr_threads')
668                 sec_dic['archip_dir'] = cfg.get(section, 'archip_dir')
669                 sec_dic['fdcache'] = cfg.getint(section, 'fdcache')
670         except:
671                 sec_dic['pool'] = cfg.get(section, 'pool')
672         return sec_dic
673
674 def createMDict(cfg, section):
675         sec_dic = {}
676         sec_dic['portno_start'] = cfg.getint(section, 'portno_start')
677         sec_dic['portno_end'] = cfg.getint(section, 'portno_end')
678         sec_dic['log_level'] = cfg.getint(section, 'log_level')
679         sec_dic['nr_ops'] = cfg.getint(section, 'nr_ops')
680         sec_dic['blockerb_port'] = cfg.getint(section, 'blockerb_port')
681         sec_dic['blockerm_port'] = cfg.getint(section, 'blockerm_port')
682         return sec_dic
683
684 def createVDict(cfg, section):
685         sec_dic = {}
686         sec_dic['portno_start'] = cfg.getint(section, 'portno_start')
687         sec_dic['portno_end'] = cfg.getint(section, 'portno_end')
688         sec_dic['log_level'] = cfg.getint(section, 'log_level')
689         sec_dic['nr_ops'] = cfg.getint(section, 'nr_ops')
690         sec_dic['blocker_port'] = cfg.getint(section, 'blocker_port')
691         sec_dic['mapper_port'] = cfg.getint(section, 'mapper_port')
692         return sec_dic
693
694
695 def loadrc(rc):
696     try:
697         if rc is None:
698             cfg_dir = os.path.expanduser(DEFAULTS)
699         else:
700             cfg_dir = rc
701         cfg_fd = open(cfg_dir)
702     except:
703         raise Error("Cannot read config file")
704
705     cfg = ConfigParser.ConfigParser()
706     cfg.readfp(cfg_fd)
707     config['SEGMENT_PORTS'] = cfg.getint('XSEG','SEGMENT_PORTS')
708     config['SEGMENT_SIZE'] = cfg.getint('XSEG','SEGMENT_SIZE')
709     config['XSEGBD_START'] = cfg.getint('XSEG','XSEGBD_START')
710     config['XSEGBD_END'] = cfg.getint('XSEG','XSEGBD_END')
711     config['VTOOL_START'] = cfg.getint('XSEG','VTOOL_START')
712     config['VTOOL_END'] = cfg.getint('XSEG','VTOOL_END')
713     config['roles'] = eval(cfg.get('ROLES','order'))
714     config['blockerb'] = createBDict(cfg, 'BLOCKERB')
715     config['blockerm'] = createBDict(cfg, 'BLOCKERM')
716     config['mapperd'] = createMDict(cfg, 'MAPPERD')
717     config['vlmcd'] = createVDict(cfg, 'VLMCD')
718
719     if not check_conf():
720         raise Error("Invalid conf file")
721
722
723 def loaded_modules():
724     lines = open("/proc/modules").read().split("\n")
725     modules = [f.split(" ")[0] for f in lines]
726     return modules
727
728
729 def loaded_module(name):
730     return name in loaded_modules()
731
732
733 def load_module(name, args):
734     s = "Loading %s " % name
735     sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH))
736     modules = loaded_modules()
737     if name in modules:
738         sys.stdout.write(yellow("Already loaded".ljust(SECOND_COLUMN_WIDTH)))
739         sys.stdout.write("\n")
740         return
741     cmd = ["modprobe", "%s" % name]
742     if args:
743         for arg in args:
744             cmd.extend(["%s=%s" % (arg)])
745     try:
746         check_call(cmd, shell=False)
747     except Exception:
748         sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
749         sys.stdout.write("\n")
750         raise Error("Cannot load module %s. Check system logs" % name)
751     sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
752     sys.stdout.write("\n")
753
754
755 def unload_module(name):
756     s = "Unloading %s " % name
757     sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH))
758     modules = loaded_modules()
759     if name not in modules:
760         sys.stdout.write(yellow("Not loaded".ljust(SECOND_COLUMN_WIDTH)))
761         sys.stdout.write("\n")
762         return
763     cmd = ["modprobe -r %s" % name]
764     try:
765         check_call(cmd, shell=True)
766     except Exception:
767         sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
768         sys.stdout.write("\n")
769         raise Error("Cannot unload module %s. Check system logs" % name)
770     sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
771     sys.stdout.write("\n")
772
773 xseg_initialized = False
774
775
776 def initialize_xseg():
777     global xseg_initialized
778     if not xseg_initialized:
779         xseg_initialize()
780         xseg_initialized = True
781
782
783 def check_running(name, pid=None):
784     for p in psutil.process_iter():
785         if p.name[0:len(name)] == name:
786             if pid:
787                 if pid == p.pid:
788                     return pid
789             else:
790                 return pid
791     return None
792
793
794 def check_pidfile(name):
795     pidfile = os.path.join(PIDFILE_PATH, name + PID_SUFFIX)
796     pf = None
797     try:
798         pf = open(pidfile, "r")
799         pid = int(pf.read())
800         pf.close()
801     except:
802         if pf:
803             pf.close()
804         return -1
805
806     return pid
807
808 class Xseg_ctx(object):
809     ctx = None
810     port = None
811     portno = None
812     signal_desc = None
813     dynalloc = False
814
815     def __init__(self, segment, portno=None):
816         ctx = segment.join()
817         if not ctx:
818             raise Error("Cannot join segment")
819         if portno == None:
820             port = xseg_bind_dynport(ctx)
821             portno = xseg_portno_nonstatic(ctx, port)
822             dynalloc = True
823         else:
824             port = xseg_bind_port(ctx, portno, c_void_p(0))
825             dynalloc = False
826
827         if not port:
828             raise Error("Cannot bind to port")
829
830         sd = xseg_get_signal_desc_nonstatic(ctx, port)
831         if not sd:
832             raise Error("Cannot get signal descriptor")
833
834         xseg_init_local_signal(ctx, portno)
835         self.ctx = ctx
836         self.port = port
837         self.portno = portno
838         self.dynalloc = dynalloc
839         self.signal_desc = sd
840
841     def __del__(self):
842         return
843
844     def __enter__(self):
845         if not self.ctx:
846             raise Error("No segment")
847         return self
848
849     def __exit__(self, type_, value, traceback):
850         self.shutdown()
851         return False
852
853     def shutdown(self):
854         if self.port is not None and self.dynalloc:
855                 xseg_leave_dynport(self.ctx, self.port)
856         if self.ctx:
857         #    xseg_quit_local_signal(self.ctx, self.portno)
858             xseg_leave(self.ctx)
859         self.ctx = None
860
861     def wait_request(self):
862         xseg_prepare_wait(self.ctx, self.portno)
863         while True:
864             received = xseg_receive(self.ctx, self.portno, 0)
865             if received:
866                 xseg_cancel_wait(self.ctx, self.portno)
867                 return received
868             else:
869                 xseg_wait_signal_green(self.ctx, self.signal_desc, 10000000)
870
871     def wait_requests(self, requests):
872         while True:
873             received = self.wait_request()
874             for req in requests:
875                 xseg_req = req.req
876                 if addressof(received.contents) == \
877                             addressof(xseg_req.contents):
878                     return req
879             p = xseg_respond(self.ctx, received, self.portno, X_ALLOC)
880             if p == NoPort:
881                 xseg_put_request(self.ctx, received, self.portno)
882             else:
883                 xseg_signal(self.ctx, p)
884
885
886 class Request(object):
887     xseg_ctx = None
888     req = None
889
890     def __init__(self, xseg_ctx, dst_portno, target, datalen=0, size=0, op=None,
891                  data=None, flags=0, offset=0):
892         if not target:
893             raise Error("No target")
894         targetlen = len(target)
895         if not datalen and data:
896             if isinstance(data, basestring):
897                 datalen = len(data)
898             else:
899                 datalen = sizeof(data)
900
901         ctx = xseg_ctx.ctx
902         if not ctx:
903             raise Error("No context")
904         req = xseg_get_request(ctx, xseg_ctx.portno, dst_portno, X_ALLOC)
905         if not req:
906             raise Error("Cannot get request")
907         r = xseg_prep_request(ctx, req, targetlen, datalen)
908         if r < 0:
909             xseg_put_request(ctx, req, xseg_ctx.portno)
910             raise Error("Cannot prepare request")
911         self.req = req
912         self.xseg_ctx = xseg_ctx
913  
914         if not self.set_target(target):
915             self.put()
916             raise Error("Cannot set target")
917
918         if (data):
919             if not self.set_data(data):
920                 self.put()
921                 raise Error("Cannot set data")
922
923         self.set_size(size)
924         self.set_op(op)
925         self.set_flags(flags)
926         self.set_offset(offset)
927
928         return
929
930     def __enter__(self):
931         if not self.req:
932             raise Error("xseg request not set")
933         return self
934
935     def __exit__(self, type_, value, traceback):
936         self.put()
937         self.req = None
938         return False
939
940     def put(self, force=False):
941         if not self.req:
942             return False;
943         if not force:
944             if xq_count(byref(self.req.contents.path)) > 0:
945                 return False
946         xseg_put_request(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno)
947         self.req = None
948         return True
949
950     def get_datalen(self):
951         return self.req.contents.datalen
952
953     def set_op(self, op):
954         self.req.contents.op = op
955
956     def get_op(self):
957         return self.req.contents.op
958
959     def set_offset(self, offset):
960         self.req.contents.offset = offset
961
962     def get_offset(self):
963         return self.req.contents.offset
964
965     def get_size(self):
966         return self.req.contents.size
967
968     def set_size(self, size):
969         self.req.contents.size = size
970
971     def get_serviced(self):
972         return self.req.contents.serviced
973
974     def set_serviced(self, serviced):
975         self.req.contents.serviced = serviced
976
977     def set_flags(self, flags):
978         self.req.contents.flags = flags
979
980     def get_flags(self):
981         return self.req.contents.flags
982
983     def set_target(self, target):
984         """Sets the target of the request, respecting request's targetlen"""
985         if len(target) != self.req.contents.targetlen:
986             return False
987         c_target = xseg_get_target_nonstatic(self.xseg_ctx.ctx, self.req)
988         p_target = create_string_buffer(target)
989 #        print hex(addressof(c_target.contents))
990         memmove(c_target, p_target, len(target))
991         return True
992
993     def get_target(self):
994         """Return a string to the target of the request"""
995         c_target = xseg_get_target_nonstatic(self.xseg_ctx.ctx, self.req)
996 #        print "target_addr " + str(addressof(c_target.contents))
997         return string_at(c_target, self.req.contents.targetlen)
998
999     def set_data(self, data):
1000         """Sets requests data. Data should be a xseg protocol structure"""
1001         if isinstance(data, basestring):
1002             if len(data) != self.req.contents.datalen:
1003                 return False
1004             p_data = create_string_buffer(data)
1005         else:
1006             if sizeof(data) != self.req.contents.datalen:
1007                 return False
1008             p_data = pointer(data)
1009         c_data = xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req)
1010         memmove(c_data, p_data, self.req.contents.datalen)
1011
1012         return True
1013
1014     def get_data(self, _type=None):
1015         """return a pointer to the data buffer of the request, casted to the
1016         selected type"""
1017 #        print "data addr " + str(addressof(xseg_get_data_nonstatic(\
1018 #            self.xseg_ctx.ctx, self.req).contents))
1019 #        ret = cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),
1020 #                   _type)
1021 #        print addressof(ret.contents)
1022 #        return ret
1023         if _type:
1024             return cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),
1025                         POINTER(_type))
1026         else:
1027             return cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),
1028                         c_void_p)
1029
1030     def submit(self):
1031         """Submit the associated xseg_request"""
1032         p = xseg_submit(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno,
1033                         X_ALLOC)
1034         if p == NoPort:
1035             raise Exception
1036         xseg_signal(self.xseg_ctx.ctx, p)
1037
1038     def wait(self):
1039         """Wait until the associated xseg_request is responded, discarding any
1040         other requests that may be received in the meantime"""
1041         self.xseg_ctx.wait_requests([self])
1042
1043     def success(self):
1044         if not bool(self.req.contents.state & XS_SERVED) and not \
1045             bool(self.req.contents.state & XS_FAILED):
1046             raise Error("Request not completed, nor Failed")
1047         return bool((self.req.contents.state & XS_SERVED) and not \
1048                    (self.req.contents.state & XS_FAILED))
1049
1050     @classmethod
1051     def get_write_request(cls, xseg, dst, target, data=None, offset=0,
1052             datalen=0, flags=0):
1053         if data is None:
1054             data = ""
1055         size = len(data)
1056         if not datalen:
1057             datalen = size
1058
1059         return cls(xseg, dst, target, op=X_WRITE, data=data, offset=offset,
1060                    size=size, datalen=datalen, flags=flags)
1061
1062     @classmethod
1063     def get_read_request(cls, xseg, dst, target, size=0, offset=0, datalen=0):
1064         if not datalen:
1065             datalen=size
1066         return cls(xseg, dst, target, op=X_READ, offset=offset, size=size,
1067                    datalen=datalen)
1068
1069     @classmethod
1070     def get_info_request(cls, xseg, dst, target):
1071         return cls(xseg, dst, target, op=X_INFO)
1072
1073     @classmethod
1074     def get_copy_request(cls, xseg, dst, target, copy_target=None, size=0, offset=0):
1075         datalen = sizeof(xseg_request_copy)
1076         xcopy = xseg_request_copy()
1077         xcopy.target = target
1078         xcopy.targetlen = len(target)
1079         return cls(xseg, dst, copy_target, op=X_COPY, data=xcopy, datalen=datalen,
1080                 size=size, offset=offset)
1081     @classmethod
1082     def get_acquire_request(cls, xseg, dst, target, wait=False):
1083         flags = 0
1084         if not wait:
1085             flags = XF_NOSYNC
1086         return cls(xseg, dst, target, op=X_ACQUIRE, flags=flags)
1087
1088     @classmethod
1089     def get_release_request(cls, xseg, dst, target, force=False):
1090         flags = 0
1091         if force:
1092             flags = XF_FORCE
1093         return cls(xseg, dst, target, op=X_RELEASE, flags=flags)
1094
1095     @classmethod
1096     def get_delete_request(cls, xseg, dst, target):
1097         return cls(xseg, dst, target, op=X_DELETE)
1098
1099     @classmethod
1100     def get_clone_request(cls, xseg, dst, target, clone=None, clone_size=0,
1101             cont_addr=False):
1102         datalen = sizeof(xseg_request_clone)
1103         xclone = xseg_request_clone()
1104         xclone.target = target
1105         xclone.targetlen= len(target)
1106         xclone.size = clone_size
1107
1108         flags = 0
1109         if cont_addr:
1110             flags = XF_CONTADDR
1111
1112         return cls(xseg, dst, clone, op=X_CLONE, data=xclone, datalen=datalen,
1113                 flags=flags)
1114
1115     @classmethod
1116     def get_open_request(cls, xseg, dst, target):
1117         return cls(xseg, dst, target, op=X_OPEN)
1118
1119     @classmethod
1120     def get_close_request(cls, xseg, dst, target):
1121         return cls(xseg, dst, target, op=X_CLOSE)
1122
1123     @classmethod
1124     def get_snapshot_request(cls, xseg, dst, target, snap=None):
1125         datalen = sizeof(xseg_request_snapshot)
1126         xsnapshot = xseg_request_snapshot()
1127         xsnapshot.target = snap
1128         xsnapshot.targetlen= len(snap)
1129
1130         return cls(xseg, dst, target, op=X_SNAPSHOT, data=xsnapshot,
1131                 datalen=datalen)
1132
1133     @classmethod
1134     def get_mapr_request(cls, xseg, dst, target, offset=0, size=0):
1135         return cls(xseg, dst, target, op=X_MAPR, offset=offset, size=size,
1136                 datalen=0)
1137
1138     @classmethod
1139     def get_mapw_request(cls, xseg, dst, target, offset=0, size=0):
1140         return cls(xseg, dst, target, op=X_MAPW, offset=offset, size=size,
1141                 datalen=0)
1142
1143     @classmethod
1144     def get_hash_request(cls, xseg, dst, target, size=0, offset=0):
1145         return cls(xseg, dst, target, op=X_HASH, size=size, offset=offset)