Statistics
| Branch: | Tag: | Revision:

root / xseg / tools / archipelago / archipelago / common.py @ 8ade203a

History | View | Annotate | Download (34.5 kB)

1
#!/usr/bin/env python
2

    
3
# Copyright 2012 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35
#
36

    
37

    
38
from xseg.xseg_api import *
39
from xseg.xprotocol import *
40
from ctypes import CFUNCTYPE, cast, c_void_p, addressof, string_at, memmove, \
41
    create_string_buffer, pointer, sizeof, POINTER, byref, c_int, c_char, Structure
42
import ctypes
43
cb_null_ptrtype = CFUNCTYPE(None, uint32_t)
44

    
45
import os
46
import sys
47
import time
48
import psutil
49
import errno
50
import signal
51
from subprocess import check_call
52
from collections import namedtuple
53
import socket
54
import random
55
from select import select
56
import ConfigParser
57

    
58
random.seed()
59
hostname = socket.gethostname()
60

    
61
valid_role_types = ['file_blocker', 'rados_blocker', 'mapperd', 'vlmcd']
62
valid_segment_types = ['segdev', 'posix']
63

    
64
peers = dict()
65
xsegbd_args = []
66
segment = None
67
modules = ['xseg', 'segdev', 'xseg_posix', 'xseg_pthread', 'xseg_segdev']
68
xsegbd = 'xsegbd'
69

    
70
BIN_DIR = '/usr/bin/'
71
DEFAULTS = '/etc/archipelago/archipelago.conf'
72

    
73
#system defaults
74
ARCHIP_PREFIX = 'archip_'
75
LOG_SUFFIX = '.log'
76
PID_SUFFIX = '.pid'
77
PIDFILE_PATH = "/var/run/archipelago"
78
VLMC_LOCK_FILE = 'vlmc.lock'
79
LOGS_PATH = "/var/log/archipelago"
80
LOCK_PATH = "/var/lock"
81
DEVICE_PREFIX = "/dev/xsegbd"
82
XSEGBD_SYSFS = "/sys/bus/xsegbd/"
83

    
84
CHARDEV_NAME = "/dev/segdev"
85
CHARDEV_MAJOR = 60
86
CHARDEV_MINOR = 0
87

    
88
REQS = 512
89

    
90
FILE_BLOCKER = 'archip-filed'
91
RADOS_BLOCKER = 'archip-sosd'
92
MAPPER = 'archip-mapperd'
93
VLMC = 'archip-vlmcd'
94

    
95
def is_power2(x):
96
    return bool(x != 0 and (x & (x-1)) == 0)
97

    
98
#hack to test green waiting with python gevent.
99
class posixfd_signal_desc(Structure):
100
    pass
101
posixfd_signal_desc._fields_ = [
102
    ('signal_file', c_char * sizeof(c_void_p)),
103
    ('fd', c_int),
104
    ('flag', c_int),
105
]
106

    
107
def xseg_wait_signal_green(ctx, sd, timeout):
108
    posixfd_sd = cast(sd, POINTER(posixfd_signal_desc))
109
    fd = posixfd_sd.contents.fd
110
    select([fd], [], [], timeout/1000000.0)
111
    while True:
112
        try:
113
            os.read(fd, 512)
114
        except OSError as (e,msg):
115
            if e == 11:
116
                break
117
            else:
118
                raise OSError(e, msg)
119

    
120

    
121
class Peer(object):
122
    cli_opts = None
123

    
124
    def __init__(self, role=None, daemon=True, nr_ops=16,
125
                 logfile=None, pidfile=None, portno_start=None,
126
                 portno_end=None, log_level=0, spec=None):
127
        if not role:
128
            raise Error("Role was not provided")
129
        self.role = role
130

    
131
        self.nr_ops = nr_ops
132
        if not self.nr_ops > 0:
133
            raise Error("Invalid nr_ops for %s" % role)
134

    
135
        if not is_power2(self.nr_ops):
136
            raise Error("nr_ops of %s is not a power of 2" % role)
137

    
138
        if not self.executable:
139
            raise Error("Executable must be provided for %s" % role)
140

    
141
        if portno_start is None:
142
            raise Error("Portno_start must be provided for %s" % role)
143
        self.portno_start = portno_start
144

    
145
        if portno_end is None:
146
            raise Error("Portno_end must be provided for %s" % role)
147
        self.portno_end = portno_end
148

    
149
        self.daemon = daemon
150
        if not spec:
151
            raise Error("Xseg spec was not provided for %s" % role)
152
        self.spec = spec
153

    
154
        if logfile:
155
            self.logfile = logfile
156
        else:
157
            self.logfile = os.path.join(LOGS_PATH, role + LOG_SUFFIX)
158

    
159
        if pidfile:
160
            self.pidfile = pidfile
161
        else:
162
            self.pidfile = os.path.join(PIDFILE_PATH, role + PID_SUFFIX)
163

    
164
        try:
165
            if not os.path.isdir(os.path.dirname(self.logfile)):
166
                raise Error("Log path %s does not exist" % self.logfile)
167
        except:
168
            raise Error("Log path %s does not exist or is not a directory" %
169
                        self.logfile)
170

    
171
        try:
172
            os.makedirs(os.path.dirname(self.pidfile))
173
        except OSError as e:
174
            if e.errno == errno.EEXIST:
175
                if os.path.isdir(os.path.dirname(self.pidfile)):
176
                    pass
177
                else:
178
                    raise Error("Pid path %s is not a directory" %
179
                                os.path.dirname(self.pidfile))
180
            else:
181
                raise Error("Cannot create path %s" %
182
                            os.path.dirname(self.pidfile))
183

    
184
        self.log_level = log_level
185

    
186
        if self.log_level < 0 or self.log_level > 3:
187
            raise Error("%s: Invalid log level %d" %
188
                        (self.role, self.log_level))
189

    
190
        if self.cli_opts is None:
191
            self.cli_opts = []
192
        self.set_cli_options()
193

    
194
    def start(self):
195
        if self.get_pid():
196
            raise Error("Peer has valid pidfile")
197
        cmd = [os.path.join(BIN_DIR, self.executable)] + self.cli_opts
198
        try:
199
            check_call(cmd, shell=False)
200
        except Exception as e:
201
            raise Error("Cannot start %s: %s" % (self.role, str(e)))
202

    
203
    def stop(self):
204
        pid = self.get_pid()
205
        if not pid:
206
            raise Error("Peer %s not running" % self.role)
207

    
208
        if self.__is_running(pid):
209
            os.kill(pid, signal.SIGTERM)
210

    
211
    def __is_running(self, pid):
212
        name = self.executable
213
        for p in psutil.process_iter():
214
            if p.name[0:len(name)] == name and pid == p.pid:
215
                return True
216

    
217
        return False
218

    
219
    def is_running(self):
220
        pid = self.get_pid()
221
        if not pid:
222
            return False
223

    
224
        if not self.__is_running(pid):
225
            raise Error("Peer %s has valid pidfile but is not running" %
226
                        self.role)
227

    
228
        return True
229

    
230
    def get_pid(self):
231
        if not self.pidfile:
232
            return None
233

    
234
        pf = None
235
        try:
236
            pf = open(self.pidfile, "r")
237
            pid = int(pf.read())
238
            pf.close()
239
        except:
240
            if pf:
241
                pf.close()
242
            return None
243

    
244
        return pid
245

    
246
    def set_cli_options(self):
247
        if self.daemon:
248
            self.cli_opts.append("-d")
249
        if self.nr_ops:
250
            self.cli_opts.append("-n")
251
            self.cli_opts.append(str(self.nr_ops))
252
        if self.logfile:
253
            self.cli_opts.append("-l")
254
            self.cli_opts.append(self.logfile)
255
        if self.pidfile:
256
            self.cli_opts.append("--pidfile")
257
            self.cli_opts.append(self.pidfile)
258
        if self.portno_start is not None:
259
            self.cli_opts.append("-sp")
260
            self.cli_opts.append(str(self.portno_start))
261
        if self.portno_end is not None:
262
            self.cli_opts.append("-ep")
263
            self.cli_opts.append(str(self.portno_end))
264
        if self.log_level is not None:
265
            self.cli_opts.append("-v")
266
            self.cli_opts.append(str(self.log_level))
267
        if self.spec:
268
            self.cli_opts.append("-g")
269
            self.cli_opts.append(self.spec)
270

    
271

    
272
class MTpeer(Peer):
273
    def __init__(self, nr_threads=1, **kwargs):
274
        self.nr_threads = nr_threads
275
        super(MTpeer, self).__init__(**kwargs)
276

    
277
        if self.cli_opts is None:
278
            self.cli_opts = []
279
        self.set_mtcli_options()
280

    
281
    def set_mtcli_options(self):
282
        self.cli_opts.append("-t")
283
        self.cli_opts.append(str(self.nr_threads))
284

    
285

    
286
class Sosd(MTpeer):
287
    def __init__(self, pool=None, **kwargs):
288
        self.executable = RADOS_BLOCKER
289
        self.pool = pool
290
        super(Sosd, self).__init__(**kwargs)
291

    
292
        if self.cli_opts is None:
293
            self.cli_opts = []
294
        self.set_sosd_cli_options()
295

    
296
    def set_sosd_cli_options(self):
297
        if self.pool:
298
            self.cli_opts.append("--pool")
299
            self.cli_opts.append(self.pool)
300

    
301

    
302
class Filed(MTpeer):
303
    def __init__(self, archip_dir=None, prefix=None, fdcache=None,
304
                 unique_str=None, nr_threads=1, nr_ops=16, **kwargs):
305
        self.executable = FILE_BLOCKER
306
        self.archip_dir = archip_dir
307
        self.prefix = prefix
308
        self.fdcache = fdcache
309
        self.unique_str = unique_str
310
        nr_threads = nr_ops
311
        if self.fdcache and fdcache < 2*nr_threads:
312
            raise Error("Fdcache should be greater than 2*nr_threads")
313

    
314
        super(Filed, self).__init__(nr_threads=nr_threads, nr_ops=nr_ops, **kwargs)
315

    
316
        if not self.archip_dir:
317
            raise Error("%s: Archip dir must be set" % self.role)
318
        if not os.path.isdir(self.archip_dir):
319
            raise Error("%s: Archip dir invalid" % self.role)
320
        if not self.fdcache:
321
            self.fdcache = 2*self.nr_ops
322
        if not self.unique_str:
323
            self.unique_str = hostname + '_' + str(self.portno_start)
324

    
325
        if self.cli_opts is None:
326
            self.cli_opts = []
327
        self.set_filed_cli_options()
328

    
329
    def set_filed_cli_options(self):
330
        if self.unique_str:
331
            self.cli_opts.append("--uniquestr")
332
            self.cli_opts.append(self.unique_str)
333
        if self.fdcache:
334
            self.cli_opts.append("--fdcache")
335
            self.cli_opts.append(str(self.fdcache))
336
        if self.archip_dir:
337
            self.cli_opts.append("--archip")
338
            self.cli_opts.append(self.archip_dir)
339
        if self.prefix:
340
            self.cli_opts.append("--prefix")
341
            self.cli_opts.append(self.prefix)
342

    
343

    
344
class Mapperd(Peer):
345
    def __init__(self, blockerm_port=None, blockerb_port=None, **kwargs):
346
        self.executable = MAPPER
347
        if blockerm_port is None:
348
            raise Error("blockerm_port must be provied for %s" % role)
349
        self.blockerm_port = blockerm_port
350

    
351
        if blockerb_port is None:
352
            raise Error("blockerb_port must be provied for %s" % role)
353
        self.blockerb_port = blockerb_port
354
        super(Mapperd, self).__init__(**kwargs)
355

    
356
        if self.cli_opts is None:
357
            self.cli_opts = []
358
        self.set_mapperd_cli_options()
359

    
360
    def set_mapperd_cli_options(self):
361
        if self.blockerm_port is not None:
362
            self.cli_opts.append("-mbp")
363
            self.cli_opts.append(str(self.blockerm_port))
364
        if self.blockerb_port is not None:
365
            self.cli_opts.append("-bp")
366
            self.cli_opts.append(str(self.blockerb_port))
367

    
368

    
369
class Vlmcd(Peer):
370
    def __init__(self, blocker_port=None, mapper_port=None, **kwargs):
371
        self.executable = VLMC
372
        if blocker_port is None:
373
            raise Error("blocker_port must be provied for %s" % role)
374
        self.blocker_port = blocker_port
375

    
376
        if mapper_port is None:
377
            raise Error("mapper_port must be provied for %s" % role)
378
        self.mapper_port = mapper_port
379
        super(Vlmcd, self).__init__(**kwargs)
380

    
381
        if self.cli_opts is None:
382
            self.cli_opts = []
383
        self.set_vlmcd_cli_opts()
384

    
385
    def set_vlmcd_cli_opts(self):
386
        if self.blocker_port is not None:
387
            self.cli_opts.append("-bp")
388
            self.cli_opts.append(str(self.blocker_port))
389
        if self.mapper_port is not None:
390
            self.cli_opts.append("-mp")
391
            self.cli_opts.append(str(self.mapper_port))
392

    
393

    
394
config = {
395
    'CEPH_CONF_FILE': '/etc/ceph/ceph.conf',
396
#    'SPEC': "segdev:xsegbd:1024:5120:12",
397
    'SEGMENT_TYPE': 'segdev',
398
    'SEGMENT_NAME': 'xsegbd',
399
    'SEGMENT_PORTS': 1024,
400
    'SEGMENT_SIZE': 5120,
401
    'SEGMENT_ALIGNMENT': 12,
402
    'XSEGBD_START': 0,
403
    'XSEGBD_END': 499,
404
    'VTOOL_START': 1003,
405
    'VTOOL_END': 1003,
406
    #RESERVED 1023
407
}
408

    
409
FIRST_COLUMN_WIDTH = 23
410
SECOND_COLUMN_WIDTH = 23
411

    
412

    
413
def green(s):
414
    return '\x1b[32m' + str(s) + '\x1b[0m'
415

    
416

    
417
def red(s):
418
    return '\x1b[31m' + str(s) + '\x1b[0m'
419

    
420

    
421
def yellow(s):
422
    return '\x1b[33m' + str(s) + '\x1b[0m'
423

    
424

    
425
def pretty_print(cid, status):
426
    sys.stdout.write(cid.ljust(FIRST_COLUMN_WIDTH))
427
    sys.stdout.write(status.ljust(SECOND_COLUMN_WIDTH))
428
    sys.stdout.write('\n')
429
    return
430

    
431

    
432
class Error(Exception):
433
    def __init__(self, msg):
434
        self.msg = msg
435

    
436
    def __str__(self):
437
        return self.msg
438

    
439
class Segment(object):
440
    type = 'segdev'
441
    name = 'xsegbd'
442
    ports = 1024
443
    size = 5120
444
    alignment = 12
445

    
446
    spec = None
447

    
448
    def __init__(self, type, name, ports, size, align=12):
449
        initialize_xseg()
450
        self.type = type
451
        self.name = name
452
        self.ports = ports
453
        self.size = size
454
        self.alignment = align
455

    
456
        if self.type not in valid_segment_types:
457
            raise Error("Segment type not valid")
458
        if self.alignment != 12:
459
            raise Error("Wrong alignemt")
460

    
461
        self.spec = self.get_spec()
462

    
463
    def get_spec(self):
464
        if not self.spec:
465
            params = [self.type, self.name, str(self.ports), str(self.size),
466
                      str(self.alignment)]
467
            self.spec = ':'.join(params).encode()
468
        return self.spec
469

    
470
    def create(self):
471
        #fixme blocking....
472
        xconf = xseg_config()
473
        c_spec = create_string_buffer(self.spec)
474
        xseg_parse_spec(c_spec, xconf)
475
        r = xseg_create(xconf)
476
        if r < 0:
477
            raise Error("Cannot create segment")
478

    
479
    def destroy(self):
480
        #fixme blocking....
481
        try:
482
            xseg = self.join()
483
            xseg_leave(xseg)
484
            xseg_destroy(xseg)
485
        except Exception:
486
            raise Error("Cannot destroy segment")
487

    
488
    def join(self):
489
        xconf = xseg_config()
490
        spec_buf = create_string_buffer(self.spec)
491
        xseg_parse_spec(spec_buf, xconf)
492
        ctx = xseg_join(xconf.type, xconf.name, "posixfd",
493
                        cast(0, cb_null_ptrtype))
494
        if not ctx:
495
            raise Error("Cannot join segment")
496

    
497
        return ctx
498

    
499
def check_conf():
500
    port_ranges = []
501

    
502
    def isExec(file_path):
503
        return os.path.isfile(file_path) and os.access(file_path, os.X_OK)
504

    
505
    def validExec(program):
506
        for path in os.environ["PATH"].split(os.pathsep):
507
            exe_file = os.path.join(path, program)
508
            if isExec(exe_file):
509
                return True
510
        return False
511

    
512
    def validatePort(portno, limit):
513
        if portno >= limit:
514
            raise Error("Portno %d out of range" % portno)
515

    
516
    def validatePortRange(portno_start, portno_end, limit):
517
        validatePort(portno_start, limit)
518
        validatePort(portno_end, limit)
519
        if portno_start > portno_end:
520
            raise Error("Portno_start > Portno_end: %d > %d " %
521
                        (portno_start, portno_end))
522
        for start, end in port_ranges:
523
            if not (portno_end < start or portno_start > end):
524
                raise Error("Port range conflict: (%d, %d) confilcts with (%d, %d)" %
525
                            (portno_start, portno_end,  start, end))
526
        port_ranges.append((portno_start, portno_end))
527

    
528
    xseg_type = config['SEGMENT_TYPE']
529
    xseg_name = config['SEGMENT_NAME']
530
    xseg_ports = config['SEGMENT_PORTS']
531
    xseg_size = config['SEGMENT_SIZE']
532
    xseg_align = config['SEGMENT_ALIGNMENT']
533

    
534
    global segment
535
    segment = Segment(xseg_type, xseg_name, xseg_ports, xseg_size, xseg_align)
536

    
537

    
538
    try:
539
        if not config['roles']:
540
            raise Error("Roles setup must be provided")
541
    except KeyError:
542
        raise Error("Roles setup must be provided")
543

    
544
    for role, role_type in config['roles']:
545
        if role_type not in valid_role_types:
546
            raise Error("%s is not a valid role" % role_type)
547
        try:
548
            role_config = config[role]
549
        except:
550
            raise Error("No config found for %s" % role)
551

    
552
        if role_type == 'file_blocker':
553
            peers[role] = Filed(role=role, spec=segment.get_spec(),
554
                                 prefix=ARCHIP_PREFIX, **role_config)
555
        elif role_type == 'rados_blocker':
556
            peers[role] = Sosd(role=role, spec=segment.get_spec(),
557
                               **role_config)
558
        elif role_type == 'mapperd':
559
            peers[role] = Mapperd(role=role, spec=segment.get_spec(),
560
                                  **role_config)
561
        elif role_type == 'vlmcd':
562
            peers[role] = Vlmcd(role=role, spec=segment.get_spec(),
563
                                **role_config)
564
        else:
565
            raise Error("No valid peer type: %s" % role_type)
566
        validatePortRange(peers[role].portno_start, peers[role].portno_end,
567
                          xseg_ports)
568

    
569
    validatePortRange(config['VTOOL_START'], config['VTOOL_END'], xseg_ports)
570
    validatePortRange(config['XSEGBD_START'], config['XSEGBD_END'],
571
                      xseg_ports)
572

    
573
    xsegbd_range = config['XSEGBD_END'] - config['XSEGBD_START']
574
    vlmcd_range = peers['vlmcd'].portno_end - peers['vlmcd'].portno_start
575
    if xsegbd_range > vlmcd_range:
576
        raise Error("Xsegbd port range must be smaller that vlmcd port range")
577
    return True
578

    
579
def get_segment():
580
    return segment
581

    
582
def construct_peers():
583
    return peers
584

    
585
vtool_port = None
586
def get_vtool_port():
587
    global vtool_port
588
    if vtool_port is None:
589
        vtool_port = random.randint(config['VTOOL_START'], config['VTOOL_END'])
590
    return vtool_port
591

    
592
acquired_locks = {}
593

    
594
def get_lock(lock_file):
595
    while True:
596
        try:
597
            fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
598
            break
599
        except OSError, (err, reason):
600
            print >> sys.stderr, lock_file, reason
601
            if err == errno.EEXIST:
602
                time.sleep(0.2)
603
            else:
604
                raise OSError(err, lock_file + ' ' + reason)
605
    return fd
606

    
607
def exclusive(get_port=False):
608
    def wrap(fn):
609
        def lock(*args, **kwargs):
610
            if not os.path.exists(LOCK_PATH):
611
                try:
612
                    os.mkdir(LOCK_PATH)
613
                except OSError, (err, reason):
614
                    print >> sys.stderr, reason
615

    
616
            if not os.path.isdir(LOCK_PATH):
617
                raise Error("Locking error: %s is not a directory" % LOCK_PATH)
618

    
619
            if get_port:
620
                vtool_port = get_vtool_port()
621
                lock_file = os.path.join(LOCK_PATH, VLMC_LOCK_FILE + '_' + str(vtool_port))
622
            else:
623
                lock_file = os.path.join(LOCK_PATH, VLMC_LOCK_FILE)
624
            try:
625
                depth = acquired_locks[lock_file]
626
                if depth == 0:
627
                    fd = get_lock(lock_file)
628
            except KeyError:
629
                acquired_locks[lock_file] = 0
630
                fd = get_lock(lock_file)
631

    
632
            acquired_locks[lock_file] += 1
633
            try:
634
                r = fn(*args, **kwargs)
635
            finally:
636
                acquired_locks[lock_file] -= 1
637
                depth = acquired_locks[lock_file]
638
                if depth == 0:
639
                    os.close(fd)
640
                    os.unlink(lock_file)
641
            return r
642

    
643
        return lock
644
    return wrap
645

    
646
def createBDict(cfg, section):
647
        sec_dic = {}
648
        sec_dic['portno_start'] = cfg.getint(section, 'portno_start')
649
        sec_dic['portno_end'] = cfg.getint(section, 'portno_end')
650
        sec_dic['log_level'] = cfg.getint(section, 'log_level')
651
        sec_dic['nr_ops'] = cfg.getint(section, 'nr_ops')
652
        try:
653
                sec_dic['nr_threads'] = cfg.getint(section, 'nr_threads')
654
                sec_dic['archip_dir'] = cfg.get(section, 'archip_dir')
655
                sec_dic['fdcache'] = cfg.getint(section, 'fdcache')
656
        except:
657
                sec_dic['pool'] = cfg.get(section, 'pool')
658
        return sec_dic
659

    
660
def createMDict(cfg, section):
661
        sec_dic = {}
662
        sec_dic['portno_start'] = cfg.getint(section, 'portno_start')
663
        sec_dic['portno_end'] = cfg.getint(section, 'portno_end')
664
        sec_dic['log_level'] = cfg.getint(section, 'log_level')
665
        sec_dic['nr_ops'] = cfg.getint(section, 'nr_ops')
666
        sec_dic['blockerb_port'] = cfg.getint(section, 'blockerb_port')
667
        sec_dic['blockerm_port'] = cfg.getint(section, 'blockerm_port')
668
        return sec_dic
669

    
670
def createVDict(cfg, section):
671
        sec_dic = {}
672
        sec_dic['portno_start'] = cfg.getint(section, 'portno_start')
673
        sec_dic['portno_end'] = cfg.getint(section, 'portno_end')
674
        sec_dic['log_level'] = cfg.getint(section, 'log_level')
675
        sec_dic['nr_ops'] = cfg.getint(section, 'nr_ops')
676
        sec_dic['blocker_port'] = cfg.getint(section, 'blocker_port')
677
        sec_dic['mapper_port'] = cfg.getint(section, 'mapper_port')
678
        return sec_dic
679

    
680

    
681
def loadrc(rc):
682
    try:
683
        if rc is None:
684
            cfg_dir = os.path.expanduser(DEFAULTS)
685
        else:
686
            cfg_dir = rc
687
        cfg_fd = open(cfg_dir)
688
    except:
689
        raise Error("Cannot read config file")
690

    
691
    cfg = ConfigParser.ConfigParser()
692
    cfg.readfp(cfg_fd)
693
    config['SEGMENT_PORTS'] = cfg.getint('XSEG','SEGMENT_PORTS')
694
    config['SEGMENT_SIZE'] = cfg.getint('XSEG','SEGMENT_SIZE')
695
    config['XSEGBD_START'] = cfg.getint('XSEG','XSEGBD_START')
696
    config['XSEGBD_END'] = cfg.getint('XSEG','XSEGBD_END')
697
    config['VTOOL_START'] = cfg.getint('XSEG','VTOOL_START')
698
    config['VTOOL_END'] = cfg.getint('XSEG','VTOOL_END')
699
    config['roles'] = eval(cfg.get('ROLES','order'))
700
    config['blockerb'] = createBDict(cfg, 'BLOCKERB')
701
    config['blockerm'] = createBDict(cfg, 'BLOCKERM')
702
    config['mapperd'] = createMDict(cfg, 'MAPPERD')
703
    config['vlmcd'] = createVDict(cfg, 'VLMCD')
704

    
705
    if not check_conf():
706
        raise Error("Invalid conf file")
707

    
708

    
709
def loaded_modules():
710
    lines = open("/proc/modules").read().split("\n")
711
    modules = [f.split(" ")[0] for f in lines]
712
    return modules
713

    
714

    
715
def loaded_module(name):
716
    return name in loaded_modules()
717

    
718

    
719
def load_module(name, args):
720
    s = "Loading %s " % name
721
    sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH))
722
    modules = loaded_modules()
723
    if name in modules:
724
        sys.stdout.write(yellow("Already loaded".ljust(SECOND_COLUMN_WIDTH)))
725
        sys.stdout.write("\n")
726
        return
727
    cmd = ["modprobe", "%s" % name]
728
    if args:
729
        for arg in args:
730
            cmd.extend(["%s=%s" % (arg)])
731
    try:
732
        check_call(cmd, shell=False)
733
    except Exception:
734
        sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
735
        sys.stdout.write("\n")
736
        raise Error("Cannot load module %s. Check system logs" % name)
737
    sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
738
    sys.stdout.write("\n")
739

    
740

    
741
def unload_module(name):
742
    s = "Unloading %s " % name
743
    sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH))
744
    modules = loaded_modules()
745
    if name not in modules:
746
        sys.stdout.write(yellow("Not loaded".ljust(SECOND_COLUMN_WIDTH)))
747
        sys.stdout.write("\n")
748
        return
749
    cmd = ["modprobe -r %s" % name]
750
    try:
751
        check_call(cmd, shell=True)
752
    except Exception:
753
        sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
754
        sys.stdout.write("\n")
755
        raise Error("Cannot unload module %s. Check system logs" % name)
756
    sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
757
    sys.stdout.write("\n")
758

    
759
xseg_initialized = False
760

    
761

    
762
def initialize_xseg():
763
    global xseg_initialized
764
    if not xseg_initialized:
765
        xseg_initialize()
766
        xseg_initialized = True
767

    
768

    
769
def check_running(name, pid=None):
770
    for p in psutil.process_iter():
771
        if p.name[0:len(name)] == name:
772
            if pid:
773
                if pid == p.pid:
774
                    return pid
775
            else:
776
                return pid
777
    return None
778

    
779

    
780
def check_pidfile(name):
781
    pidfile = os.path.join(PIDFILE_PATH, name + PID_SUFFIX)
782
    pf = None
783
    try:
784
        pf = open(pidfile, "r")
785
        pid = int(pf.read())
786
        pf.close()
787
    except:
788
        if pf:
789
            pf.close()
790
        return -1
791

    
792
    return pid
793

    
794
class Xseg_ctx(object):
795
    ctx = None
796
    port = None
797
    portno = None
798
    signal_desc = None
799

    
800
    def __init__(self, segment, portno):
801
        ctx = segment.join()
802
        if not ctx:
803
            raise Error("Cannot join segment")
804
        port = xseg_bind_port(ctx, portno, c_void_p(0))
805
        if not port:
806
            raise Error("Cannot bind to port")
807
        sd = xseg_get_signal_desc_nonstatic(ctx, port)
808
        if not sd:
809
            raise Error("Cannot get signal descriptor")
810
        xseg_init_local_signal(ctx, portno)
811
        self.ctx = ctx
812
        self.port = port
813
        self.portno = portno
814
        self.signal_desc = sd
815

    
816
    def __del__(self):
817
        return
818

    
819
    def __enter__(self):
820
        if not self.ctx:
821
            raise Error("No segment")
822
        return self
823

    
824
    def __exit__(self, type_, value, traceback):
825
        self.shutdown()
826
        return False
827

    
828
    def shutdown(self):
829
        if self.ctx:
830
        #    xseg_quit_local_signal(self.ctx, self.portno)
831
            xseg_leave(self.ctx)
832
        self.ctx = None
833

    
834
    def wait_request(self):
835
        xseg_prepare_wait(self.ctx, self.portno)
836
        while True:
837
            received = xseg_receive(self.ctx, self.portno, 0)
838
            if received:
839
                xseg_cancel_wait(self.ctx, self.portno)
840
                return received
841
            else:
842
                xseg_wait_signal_green(self.ctx, self.signal_desc, 10000000)
843

    
844
    def wait_requests(self, requests):
845
        while True:
846
            received = self.wait_request()
847
            for req in requests:
848
                xseg_req = req.req
849
                if addressof(received.contents) == \
850
                            addressof(xseg_req.contents):
851
                    return req
852
            p = xseg_respond(self.ctx, received, self.portno, X_ALLOC)
853
            if p == NoPort:
854
                xseg_put_request(self.ctx, received, self.portno)
855
            else:
856
                xseg_signal(self.ctx, p)
857

    
858

    
859
class Request(object):
860
    xseg_ctx = None
861
    req = None
862

    
863
    def __init__(self, xseg_ctx, dst_portno, target, datalen=0, size=0, op=None,
864
                 data=None, flags=0, offset=0):
865
        if not target:
866
            raise Error("No target")
867
        targetlen = len(target)
868
        if not datalen and data:
869
            if isinstance(data, basestring):
870
                datalen = len(data)
871
            else:
872
                datalen = sizeof(data)
873

    
874
        ctx = xseg_ctx.ctx
875
        if not ctx:
876
            raise Error("No context")
877
        req = xseg_get_request(ctx, xseg_ctx.portno, dst_portno, X_ALLOC)
878
        if not req:
879
            raise Error("Cannot get request")
880
        r = xseg_prep_request(ctx, req, targetlen, datalen)
881
        if r < 0:
882
            xseg_put_request(ctx, req, xseg_ctx.portno)
883
            raise Error("Cannot prepare request")
884
        self.req = req
885
        self.xseg_ctx = xseg_ctx
886
 
887
        if not self.set_target(target):
888
            self.put()
889
            raise Error("Cannot set target")
890

    
891
        if (data):
892
            if not self.set_data(data):
893
                self.put()
894
                raise Error("Cannot set data")
895

    
896
        self.set_size(size)
897
        self.set_op(op)
898
        self.set_flags(flags)
899
        self.set_offset(offset)
900

    
901
        return
902

    
903
    def __enter__(self):
904
        if not self.req:
905
            raise Error("xseg request not set")
906
        return self
907

    
908
    def __exit__(self, type_, value, traceback):
909
        self.put()
910
        self.req = None
911
        return False
912

    
913
    def put(self, force=False):
914
        if not self.req:
915
            return False;
916
        if not force:
917
            if xq_count(byref(self.req.contents.path)) > 0:
918
                return False
919
        xseg_put_request(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno)
920
        self.req = None
921
        return True
922

    
923
    def get_datalen(self):
924
        return self.req.contents.datalen
925

    
926
    def set_op(self, op):
927
        self.req.contents.op = op
928

    
929
    def get_op(self):
930
        return self.req.contents.op
931

    
932
    def set_offset(self, offset):
933
        self.req.contents.offset = offset
934

    
935
    def get_offset(self):
936
        return self.req.contents.offset
937

    
938
    def get_size(self):
939
        return self.req.contents.size
940

    
941
    def set_size(self, size):
942
        self.req.contents.size = size
943

    
944
    def get_serviced(self):
945
        return self.req.contents.serviced
946

    
947
    def set_serviced(self, serviced):
948
        self.req.contents.serviced = serviced
949

    
950
    def set_flags(self, flags):
951
        self.req.contents.flags = flags
952

    
953
    def get_flags(self):
954
        return self.req.contents.flags
955

    
956
    def set_target(self, target):
957
        """Sets the target of the request, respecting request's targetlen"""
958
        if len(target) != self.req.contents.targetlen:
959
            return False
960
        c_target = xseg_get_target_nonstatic(self.xseg_ctx.ctx, self.req)
961
        p_target = create_string_buffer(target)
962
#        print hex(addressof(c_target.contents))
963
        memmove(c_target, p_target, len(target))
964
        return True
965

    
966
    def get_target(self):
967
        """Return a string to the target of the request"""
968
        c_target = xseg_get_target_nonstatic(self.xseg_ctx.ctx, self.req)
969
#        print "target_addr " + str(addressof(c_target.contents))
970
        return string_at(c_target, self.req.contents.targetlen)
971

    
972
    def set_data(self, data):
973
        """Sets requests data. Data should be a xseg protocol structure"""
974
        if isinstance(data, basestring):
975
            if len(data) != self.req.contents.datalen:
976
                return False
977
            p_data = create_string_buffer(data)
978
        else:
979
            if sizeof(data) != self.req.contents.datalen:
980
                return False
981
            p_data = pointer(data)
982
        c_data = xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req)
983
        memmove(c_data, p_data, self.req.contents.datalen)
984

    
985
        return True
986

    
987
    def get_data(self, _type=None):
988
        """return a pointer to the data buffer of the request, casted to the
989
        selected type"""
990
#        print "data addr " + str(addressof(xseg_get_data_nonstatic(\
991
#            self.xseg_ctx.ctx, self.req).contents))
992
#        ret = cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),
993
#                   _type)
994
#        print addressof(ret.contents)
995
#        return ret
996
        if _type:
997
            return cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),
998
                        POINTER(_type))
999
        else:
1000
            return cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),
1001
                        c_void_p)
1002

    
1003
    def submit(self):
1004
        """Submit the associated xseg_request"""
1005
        p = xseg_submit(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno,
1006
                        X_ALLOC)
1007
        if p == NoPort:
1008
            raise Exception
1009
        xseg_signal(self.xseg_ctx.ctx, p)
1010

    
1011
    def wait(self):
1012
        """Wait until the associated xseg_request is responded, discarding any
1013
        other requests that may be received in the meantime"""
1014
        self.xseg_ctx.wait_requests([self])
1015

    
1016
    def success(self):
1017
        if not bool(self.req.contents.state & XS_SERVED) and not \
1018
            bool(self.req.contents.state & XS_FAILED):
1019
            raise Error("Request not completed, nor Failed")
1020
        return bool((self.req.contents.state & XS_SERVED) and not \
1021
                   (self.req.contents.state & XS_FAILED))
1022

    
1023
    @classmethod
1024
    def get_write_request(cls, xseg, dst, target, data=None, offset=0,
1025
            datalen=0):
1026
        if data is None:
1027
            data = ""
1028
        size = len(data)
1029
        if not datalen:
1030
            datalen = size
1031

    
1032
        return cls(xseg, dst, target, op=X_WRITE, data=data, offset=offset,
1033
                   size=size, datalen=datalen)
1034

    
1035
    @classmethod
1036
    def get_read_request(cls, xseg, dst, target, size=0, offset=0, datalen=0):
1037
        if not datalen:
1038
            datalen=size
1039
        return cls(xseg, dst, target, op=X_READ, offset=offset, size=size,
1040
                   datalen=datalen)
1041

    
1042
    @classmethod
1043
    def get_info_request(cls, xseg, dst, target):
1044
        return cls(xseg, dst, target, op=X_INFO)
1045

    
1046
    @classmethod
1047
    def get_copy_request(cls, xseg, dst, target, copy_target=None, size=0, offset=0):
1048
        datalen = sizeof(xseg_request_copy)
1049
        xcopy = xseg_request_copy()
1050
        xcopy.target = target
1051
        xcopy.targetlen = len(target)
1052
        return cls(xseg, dst, copy_target, op=X_COPY, data=xcopy, datalen=datalen,
1053
                size=size, offset=offset)
1054
    @classmethod
1055
    def get_acquire_request(cls, xseg, dst, target, wait=False):
1056
        flags = 0
1057
        if not wait:
1058
            flags = XF_NOSYNC
1059
        return cls(xseg, dst, target, op=X_ACQUIRE, flags=flags)
1060

    
1061
    @classmethod
1062
    def get_release_request(cls, xseg, dst, target, force=False):
1063
        flags = 0
1064
        if force:
1065
            flags = XF_FORCE
1066
        return cls(xseg, dst, target, op=X_RELEASE, flags=flags)
1067

    
1068
    @classmethod
1069
    def get_delete_request(cls, xseg, dst, target):
1070
        return cls(xseg, dst, target, op=X_DELETE)
1071

    
1072
    @classmethod
1073
    def get_clone_request(cls, xseg, dst, target, clone=None, clone_size=0,
1074
            cont_addr=False):
1075
        datalen = sizeof(xseg_request_clone)
1076
        xclone = xseg_request_clone()
1077
        xclone.target = target
1078
        xclone.targetlen= len(target)
1079
        xclone.size = clone_size
1080

    
1081
        flags = 0
1082
        if cont_addr:
1083
            flags = XF_CONTADDR
1084

    
1085
        return cls(xseg, dst, clone, op=X_CLONE, data=xclone, datalen=datalen,
1086
                flags=flags)
1087

    
1088
    @classmethod
1089
    def get_open_request(cls, xseg, dst, target):
1090
        return cls(xseg, dst, target, op=X_OPEN)
1091

    
1092
    @classmethod
1093
    def get_close_request(cls, xseg, dst, target):
1094
        return cls(xseg, dst, target, op=X_CLOSE)
1095

    
1096
    @classmethod
1097
    def get_snapshot_request(cls, xseg, dst, target, snap=None):
1098
        datalen = sizeof(xseg_request_snapshot)
1099
        xsnapshot = xseg_request_snapshot()
1100
        xsnapshot.target = snap
1101
        xsnapshot.targetlen= len(snap)
1102

    
1103
        return cls(xseg, dst, target, op=X_SNAPSHOT, data=xsnapshot,
1104
                datalen=datalen)
1105

    
1106
    @classmethod
1107
    def get_mapr_request(cls, xseg, dst, target, offset=0, size=0):
1108
        return cls(xseg, dst, target, op=X_MAPR, offset=offset, size=size,
1109
                datalen=0)
1110

    
1111
    @classmethod
1112
    def get_mapw_request(cls, xseg, dst, target, offset=0, size=0):
1113
        return cls(xseg, dst, target, op=X_MAPW, offset=offset, size=size,
1114
                datalen=0)
1115

    
1116
    @classmethod
1117
    def get_hash_request(cls, xseg, dst, target, size=0, offset=0):
1118
        return cls(xseg, dst, target, op=X_HASH, size=size, offset=offset)