root / xseg / tools / archipelago / archipelago / common.py @ 8ade203a
History | View | Annotate | Download (34.5 kB)
1 |
#!/usr/bin/env python
|
---|---|
2 |
|
3 |
# Copyright 2012 GRNET S.A. All rights reserved.
|
4 |
#
|
5 |
# Redistribution and use in source and binary forms, with or
|
6 |
# without modification, are permitted provided that the following
|
7 |
# conditions are met:
|
8 |
#
|
9 |
# 1. Redistributions of source code must retain the above
|
10 |
# copyright notice, this list of conditions and the following
|
11 |
# disclaimer.
|
12 |
#
|
13 |
# 2. Redistributions in binary form must reproduce the above
|
14 |
# copyright notice, this list of conditions and the following
|
15 |
# disclaimer in the documentation and/or other materials
|
16 |
# provided with the distribution.
|
17 |
#
|
18 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
19 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
20 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
21 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
22 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
23 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
24 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
25 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
26 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
27 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
28 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
29 |
# POSSIBILITY OF SUCH DAMAGE.
|
30 |
#
|
31 |
# The views and conclusions contained in the software and
|
32 |
# documentation are those of the authors and should not be
|
33 |
# interpreted as representing official policies, either expressed
|
34 |
# or implied, of GRNET S.A.
|
35 |
#
|
36 |
|
37 |
|
38 |
from xseg.xseg_api import * |
39 |
from xseg.xprotocol import * |
40 |
from ctypes import CFUNCTYPE, cast, c_void_p, addressof, string_at, memmove, \ |
41 |
create_string_buffer, pointer, sizeof, POINTER, byref, c_int, c_char, Structure |
42 |
import ctypes |
43 |
cb_null_ptrtype = CFUNCTYPE(None, uint32_t)
|
44 |
|
45 |
import os |
46 |
import sys |
47 |
import time |
48 |
import psutil |
49 |
import errno |
50 |
import signal |
51 |
from subprocess import check_call |
52 |
from collections import namedtuple |
53 |
import socket |
54 |
import random |
55 |
from select import select |
56 |
import ConfigParser |
57 |
|
58 |
random.seed() |
59 |
hostname = socket.gethostname() |
60 |
|
61 |
valid_role_types = ['file_blocker', 'rados_blocker', 'mapperd', 'vlmcd'] |
62 |
valid_segment_types = ['segdev', 'posix'] |
63 |
|
64 |
peers = dict()
|
65 |
xsegbd_args = [] |
66 |
segment = None
|
67 |
modules = ['xseg', 'segdev', 'xseg_posix', 'xseg_pthread', 'xseg_segdev'] |
68 |
xsegbd = 'xsegbd'
|
69 |
|
70 |
BIN_DIR = '/usr/bin/'
|
71 |
DEFAULTS = '/etc/archipelago/archipelago.conf'
|
72 |
|
73 |
#system defaults
|
74 |
ARCHIP_PREFIX = 'archip_'
|
75 |
LOG_SUFFIX = '.log'
|
76 |
PID_SUFFIX = '.pid'
|
77 |
PIDFILE_PATH = "/var/run/archipelago"
|
78 |
VLMC_LOCK_FILE = 'vlmc.lock'
|
79 |
LOGS_PATH = "/var/log/archipelago"
|
80 |
LOCK_PATH = "/var/lock"
|
81 |
DEVICE_PREFIX = "/dev/xsegbd"
|
82 |
XSEGBD_SYSFS = "/sys/bus/xsegbd/"
|
83 |
|
84 |
CHARDEV_NAME = "/dev/segdev"
|
85 |
CHARDEV_MAJOR = 60
|
86 |
CHARDEV_MINOR = 0
|
87 |
|
88 |
REQS = 512
|
89 |
|
90 |
FILE_BLOCKER = 'archip-filed'
|
91 |
RADOS_BLOCKER = 'archip-sosd'
|
92 |
MAPPER = 'archip-mapperd'
|
93 |
VLMC = 'archip-vlmcd'
|
94 |
|
95 |
def is_power2(x): |
96 |
return bool(x != 0 and (x & (x-1)) == 0) |
97 |
|
98 |
#hack to test green waiting with python gevent.
|
99 |
class posixfd_signal_desc(Structure): |
100 |
pass
|
101 |
posixfd_signal_desc._fields_ = [ |
102 |
('signal_file', c_char * sizeof(c_void_p)),
|
103 |
('fd', c_int),
|
104 |
('flag', c_int),
|
105 |
] |
106 |
|
107 |
def xseg_wait_signal_green(ctx, sd, timeout): |
108 |
posixfd_sd = cast(sd, POINTER(posixfd_signal_desc)) |
109 |
fd = posixfd_sd.contents.fd |
110 |
select([fd], [], [], timeout/1000000.0)
|
111 |
while True: |
112 |
try:
|
113 |
os.read(fd, 512)
|
114 |
except OSError as (e,msg): |
115 |
if e == 11: |
116 |
break
|
117 |
else:
|
118 |
raise OSError(e, msg) |
119 |
|
120 |
|
121 |
class Peer(object): |
122 |
cli_opts = None
|
123 |
|
124 |
def __init__(self, role=None, daemon=True, nr_ops=16, |
125 |
logfile=None, pidfile=None, portno_start=None, |
126 |
portno_end=None, log_level=0, spec=None): |
127 |
if not role: |
128 |
raise Error("Role was not provided") |
129 |
self.role = role
|
130 |
|
131 |
self.nr_ops = nr_ops
|
132 |
if not self.nr_ops > 0: |
133 |
raise Error("Invalid nr_ops for %s" % role) |
134 |
|
135 |
if not is_power2(self.nr_ops): |
136 |
raise Error("nr_ops of %s is not a power of 2" % role) |
137 |
|
138 |
if not self.executable: |
139 |
raise Error("Executable must be provided for %s" % role) |
140 |
|
141 |
if portno_start is None: |
142 |
raise Error("Portno_start must be provided for %s" % role) |
143 |
self.portno_start = portno_start
|
144 |
|
145 |
if portno_end is None: |
146 |
raise Error("Portno_end must be provided for %s" % role) |
147 |
self.portno_end = portno_end
|
148 |
|
149 |
self.daemon = daemon
|
150 |
if not spec: |
151 |
raise Error("Xseg spec was not provided for %s" % role) |
152 |
self.spec = spec
|
153 |
|
154 |
if logfile:
|
155 |
self.logfile = logfile
|
156 |
else:
|
157 |
self.logfile = os.path.join(LOGS_PATH, role + LOG_SUFFIX)
|
158 |
|
159 |
if pidfile:
|
160 |
self.pidfile = pidfile
|
161 |
else:
|
162 |
self.pidfile = os.path.join(PIDFILE_PATH, role + PID_SUFFIX)
|
163 |
|
164 |
try:
|
165 |
if not os.path.isdir(os.path.dirname(self.logfile)): |
166 |
raise Error("Log path %s does not exist" % self.logfile) |
167 |
except:
|
168 |
raise Error("Log path %s does not exist or is not a directory" % |
169 |
self.logfile)
|
170 |
|
171 |
try:
|
172 |
os.makedirs(os.path.dirname(self.pidfile))
|
173 |
except OSError as e: |
174 |
if e.errno == errno.EEXIST:
|
175 |
if os.path.isdir(os.path.dirname(self.pidfile)): |
176 |
pass
|
177 |
else:
|
178 |
raise Error("Pid path %s is not a directory" % |
179 |
os.path.dirname(self.pidfile))
|
180 |
else:
|
181 |
raise Error("Cannot create path %s" % |
182 |
os.path.dirname(self.pidfile))
|
183 |
|
184 |
self.log_level = log_level
|
185 |
|
186 |
if self.log_level < 0 or self.log_level > 3: |
187 |
raise Error("%s: Invalid log level %d" % |
188 |
(self.role, self.log_level)) |
189 |
|
190 |
if self.cli_opts is None: |
191 |
self.cli_opts = []
|
192 |
self.set_cli_options()
|
193 |
|
194 |
def start(self): |
195 |
if self.get_pid(): |
196 |
raise Error("Peer has valid pidfile") |
197 |
cmd = [os.path.join(BIN_DIR, self.executable)] + self.cli_opts |
198 |
try:
|
199 |
check_call(cmd, shell=False)
|
200 |
except Exception as e: |
201 |
raise Error("Cannot start %s: %s" % (self.role, str(e))) |
202 |
|
203 |
def stop(self): |
204 |
pid = self.get_pid()
|
205 |
if not pid: |
206 |
raise Error("Peer %s not running" % self.role) |
207 |
|
208 |
if self.__is_running(pid): |
209 |
os.kill(pid, signal.SIGTERM) |
210 |
|
211 |
def __is_running(self, pid): |
212 |
name = self.executable
|
213 |
for p in psutil.process_iter(): |
214 |
if p.name[0:len(name)] == name and pid == p.pid: |
215 |
return True |
216 |
|
217 |
return False |
218 |
|
219 |
def is_running(self): |
220 |
pid = self.get_pid()
|
221 |
if not pid: |
222 |
return False |
223 |
|
224 |
if not self.__is_running(pid): |
225 |
raise Error("Peer %s has valid pidfile but is not running" % |
226 |
self.role)
|
227 |
|
228 |
return True |
229 |
|
230 |
def get_pid(self): |
231 |
if not self.pidfile: |
232 |
return None |
233 |
|
234 |
pf = None
|
235 |
try:
|
236 |
pf = open(self.pidfile, "r") |
237 |
pid = int(pf.read())
|
238 |
pf.close() |
239 |
except:
|
240 |
if pf:
|
241 |
pf.close() |
242 |
return None |
243 |
|
244 |
return pid
|
245 |
|
246 |
def set_cli_options(self): |
247 |
if self.daemon: |
248 |
self.cli_opts.append("-d") |
249 |
if self.nr_ops: |
250 |
self.cli_opts.append("-n") |
251 |
self.cli_opts.append(str(self.nr_ops)) |
252 |
if self.logfile: |
253 |
self.cli_opts.append("-l") |
254 |
self.cli_opts.append(self.logfile) |
255 |
if self.pidfile: |
256 |
self.cli_opts.append("--pidfile") |
257 |
self.cli_opts.append(self.pidfile) |
258 |
if self.portno_start is not None: |
259 |
self.cli_opts.append("-sp") |
260 |
self.cli_opts.append(str(self.portno_start)) |
261 |
if self.portno_end is not None: |
262 |
self.cli_opts.append("-ep") |
263 |
self.cli_opts.append(str(self.portno_end)) |
264 |
if self.log_level is not None: |
265 |
self.cli_opts.append("-v") |
266 |
self.cli_opts.append(str(self.log_level)) |
267 |
if self.spec: |
268 |
self.cli_opts.append("-g") |
269 |
self.cli_opts.append(self.spec) |
270 |
|
271 |
|
272 |
class MTpeer(Peer): |
273 |
def __init__(self, nr_threads=1, **kwargs): |
274 |
self.nr_threads = nr_threads
|
275 |
super(MTpeer, self).__init__(**kwargs) |
276 |
|
277 |
if self.cli_opts is None: |
278 |
self.cli_opts = []
|
279 |
self.set_mtcli_options()
|
280 |
|
281 |
def set_mtcli_options(self): |
282 |
self.cli_opts.append("-t") |
283 |
self.cli_opts.append(str(self.nr_threads)) |
284 |
|
285 |
|
286 |
class Sosd(MTpeer): |
287 |
def __init__(self, pool=None, **kwargs): |
288 |
self.executable = RADOS_BLOCKER
|
289 |
self.pool = pool
|
290 |
super(Sosd, self).__init__(**kwargs) |
291 |
|
292 |
if self.cli_opts is None: |
293 |
self.cli_opts = []
|
294 |
self.set_sosd_cli_options()
|
295 |
|
296 |
def set_sosd_cli_options(self): |
297 |
if self.pool: |
298 |
self.cli_opts.append("--pool") |
299 |
self.cli_opts.append(self.pool) |
300 |
|
301 |
|
302 |
class Filed(MTpeer): |
303 |
def __init__(self, archip_dir=None, prefix=None, fdcache=None, |
304 |
unique_str=None, nr_threads=1, nr_ops=16, **kwargs): |
305 |
self.executable = FILE_BLOCKER
|
306 |
self.archip_dir = archip_dir
|
307 |
self.prefix = prefix
|
308 |
self.fdcache = fdcache
|
309 |
self.unique_str = unique_str
|
310 |
nr_threads = nr_ops |
311 |
if self.fdcache and fdcache < 2*nr_threads: |
312 |
raise Error("Fdcache should be greater than 2*nr_threads") |
313 |
|
314 |
super(Filed, self).__init__(nr_threads=nr_threads, nr_ops=nr_ops, **kwargs) |
315 |
|
316 |
if not self.archip_dir: |
317 |
raise Error("%s: Archip dir must be set" % self.role) |
318 |
if not os.path.isdir(self.archip_dir): |
319 |
raise Error("%s: Archip dir invalid" % self.role) |
320 |
if not self.fdcache: |
321 |
self.fdcache = 2*self.nr_ops |
322 |
if not self.unique_str: |
323 |
self.unique_str = hostname + '_' + str(self.portno_start) |
324 |
|
325 |
if self.cli_opts is None: |
326 |
self.cli_opts = []
|
327 |
self.set_filed_cli_options()
|
328 |
|
329 |
def set_filed_cli_options(self): |
330 |
if self.unique_str: |
331 |
self.cli_opts.append("--uniquestr") |
332 |
self.cli_opts.append(self.unique_str) |
333 |
if self.fdcache: |
334 |
self.cli_opts.append("--fdcache") |
335 |
self.cli_opts.append(str(self.fdcache)) |
336 |
if self.archip_dir: |
337 |
self.cli_opts.append("--archip") |
338 |
self.cli_opts.append(self.archip_dir) |
339 |
if self.prefix: |
340 |
self.cli_opts.append("--prefix") |
341 |
self.cli_opts.append(self.prefix) |
342 |
|
343 |
|
344 |
class Mapperd(Peer): |
345 |
def __init__(self, blockerm_port=None, blockerb_port=None, **kwargs): |
346 |
self.executable = MAPPER
|
347 |
if blockerm_port is None: |
348 |
raise Error("blockerm_port must be provied for %s" % role) |
349 |
self.blockerm_port = blockerm_port
|
350 |
|
351 |
if blockerb_port is None: |
352 |
raise Error("blockerb_port must be provied for %s" % role) |
353 |
self.blockerb_port = blockerb_port
|
354 |
super(Mapperd, self).__init__(**kwargs) |
355 |
|
356 |
if self.cli_opts is None: |
357 |
self.cli_opts = []
|
358 |
self.set_mapperd_cli_options()
|
359 |
|
360 |
def set_mapperd_cli_options(self): |
361 |
if self.blockerm_port is not None: |
362 |
self.cli_opts.append("-mbp") |
363 |
self.cli_opts.append(str(self.blockerm_port)) |
364 |
if self.blockerb_port is not None: |
365 |
self.cli_opts.append("-bp") |
366 |
self.cli_opts.append(str(self.blockerb_port)) |
367 |
|
368 |
|
369 |
class Vlmcd(Peer): |
370 |
def __init__(self, blocker_port=None, mapper_port=None, **kwargs): |
371 |
self.executable = VLMC
|
372 |
if blocker_port is None: |
373 |
raise Error("blocker_port must be provied for %s" % role) |
374 |
self.blocker_port = blocker_port
|
375 |
|
376 |
if mapper_port is None: |
377 |
raise Error("mapper_port must be provied for %s" % role) |
378 |
self.mapper_port = mapper_port
|
379 |
super(Vlmcd, self).__init__(**kwargs) |
380 |
|
381 |
if self.cli_opts is None: |
382 |
self.cli_opts = []
|
383 |
self.set_vlmcd_cli_opts()
|
384 |
|
385 |
def set_vlmcd_cli_opts(self): |
386 |
if self.blocker_port is not None: |
387 |
self.cli_opts.append("-bp") |
388 |
self.cli_opts.append(str(self.blocker_port)) |
389 |
if self.mapper_port is not None: |
390 |
self.cli_opts.append("-mp") |
391 |
self.cli_opts.append(str(self.mapper_port)) |
392 |
|
393 |
|
394 |
config = { |
395 |
'CEPH_CONF_FILE': '/etc/ceph/ceph.conf', |
396 |
# 'SPEC': "segdev:xsegbd:1024:5120:12",
|
397 |
'SEGMENT_TYPE': 'segdev', |
398 |
'SEGMENT_NAME': 'xsegbd', |
399 |
'SEGMENT_PORTS': 1024, |
400 |
'SEGMENT_SIZE': 5120, |
401 |
'SEGMENT_ALIGNMENT': 12, |
402 |
'XSEGBD_START': 0, |
403 |
'XSEGBD_END': 499, |
404 |
'VTOOL_START': 1003, |
405 |
'VTOOL_END': 1003, |
406 |
#RESERVED 1023
|
407 |
} |
408 |
|
409 |
FIRST_COLUMN_WIDTH = 23
|
410 |
SECOND_COLUMN_WIDTH = 23
|
411 |
|
412 |
|
413 |
def green(s): |
414 |
return '\x1b[32m' + str(s) + '\x1b[0m' |
415 |
|
416 |
|
417 |
def red(s): |
418 |
return '\x1b[31m' + str(s) + '\x1b[0m' |
419 |
|
420 |
|
421 |
def yellow(s): |
422 |
return '\x1b[33m' + str(s) + '\x1b[0m' |
423 |
|
424 |
|
425 |
def pretty_print(cid, status): |
426 |
sys.stdout.write(cid.ljust(FIRST_COLUMN_WIDTH)) |
427 |
sys.stdout.write(status.ljust(SECOND_COLUMN_WIDTH)) |
428 |
sys.stdout.write('\n')
|
429 |
return
|
430 |
|
431 |
|
432 |
class Error(Exception): |
433 |
def __init__(self, msg): |
434 |
self.msg = msg
|
435 |
|
436 |
def __str__(self): |
437 |
return self.msg |
438 |
|
439 |
class Segment(object): |
440 |
type = 'segdev'
|
441 |
name = 'xsegbd'
|
442 |
ports = 1024
|
443 |
size = 5120
|
444 |
alignment = 12
|
445 |
|
446 |
spec = None
|
447 |
|
448 |
def __init__(self, type, name, ports, size, align=12): |
449 |
initialize_xseg() |
450 |
self.type = type |
451 |
self.name = name
|
452 |
self.ports = ports
|
453 |
self.size = size
|
454 |
self.alignment = align
|
455 |
|
456 |
if self.type not in valid_segment_types: |
457 |
raise Error("Segment type not valid") |
458 |
if self.alignment != 12: |
459 |
raise Error("Wrong alignemt") |
460 |
|
461 |
self.spec = self.get_spec() |
462 |
|
463 |
def get_spec(self): |
464 |
if not self.spec: |
465 |
params = [self.type, self.name, str(self.ports), str(self.size), |
466 |
str(self.alignment)] |
467 |
self.spec = ':'.join(params).encode() |
468 |
return self.spec |
469 |
|
470 |
def create(self): |
471 |
#fixme blocking....
|
472 |
xconf = xseg_config() |
473 |
c_spec = create_string_buffer(self.spec)
|
474 |
xseg_parse_spec(c_spec, xconf) |
475 |
r = xseg_create(xconf) |
476 |
if r < 0: |
477 |
raise Error("Cannot create segment") |
478 |
|
479 |
def destroy(self): |
480 |
#fixme blocking....
|
481 |
try:
|
482 |
xseg = self.join()
|
483 |
xseg_leave(xseg) |
484 |
xseg_destroy(xseg) |
485 |
except Exception: |
486 |
raise Error("Cannot destroy segment") |
487 |
|
488 |
def join(self): |
489 |
xconf = xseg_config() |
490 |
spec_buf = create_string_buffer(self.spec)
|
491 |
xseg_parse_spec(spec_buf, xconf) |
492 |
ctx = xseg_join(xconf.type, xconf.name, "posixfd",
|
493 |
cast(0, cb_null_ptrtype))
|
494 |
if not ctx: |
495 |
raise Error("Cannot join segment") |
496 |
|
497 |
return ctx
|
498 |
|
499 |
def check_conf(): |
500 |
port_ranges = [] |
501 |
|
502 |
def isExec(file_path): |
503 |
return os.path.isfile(file_path) and os.access(file_path, os.X_OK) |
504 |
|
505 |
def validExec(program): |
506 |
for path in os.environ["PATH"].split(os.pathsep): |
507 |
exe_file = os.path.join(path, program) |
508 |
if isExec(exe_file):
|
509 |
return True |
510 |
return False |
511 |
|
512 |
def validatePort(portno, limit): |
513 |
if portno >= limit:
|
514 |
raise Error("Portno %d out of range" % portno) |
515 |
|
516 |
def validatePortRange(portno_start, portno_end, limit): |
517 |
validatePort(portno_start, limit) |
518 |
validatePort(portno_end, limit) |
519 |
if portno_start > portno_end:
|
520 |
raise Error("Portno_start > Portno_end: %d > %d " % |
521 |
(portno_start, portno_end)) |
522 |
for start, end in port_ranges: |
523 |
if not (portno_end < start or portno_start > end): |
524 |
raise Error("Port range conflict: (%d, %d) confilcts with (%d, %d)" % |
525 |
(portno_start, portno_end, start, end)) |
526 |
port_ranges.append((portno_start, portno_end)) |
527 |
|
528 |
xseg_type = config['SEGMENT_TYPE']
|
529 |
xseg_name = config['SEGMENT_NAME']
|
530 |
xseg_ports = config['SEGMENT_PORTS']
|
531 |
xseg_size = config['SEGMENT_SIZE']
|
532 |
xseg_align = config['SEGMENT_ALIGNMENT']
|
533 |
|
534 |
global segment
|
535 |
segment = Segment(xseg_type, xseg_name, xseg_ports, xseg_size, xseg_align) |
536 |
|
537 |
|
538 |
try:
|
539 |
if not config['roles']: |
540 |
raise Error("Roles setup must be provided") |
541 |
except KeyError: |
542 |
raise Error("Roles setup must be provided") |
543 |
|
544 |
for role, role_type in config['roles']: |
545 |
if role_type not in valid_role_types: |
546 |
raise Error("%s is not a valid role" % role_type) |
547 |
try:
|
548 |
role_config = config[role] |
549 |
except:
|
550 |
raise Error("No config found for %s" % role) |
551 |
|
552 |
if role_type == 'file_blocker': |
553 |
peers[role] = Filed(role=role, spec=segment.get_spec(), |
554 |
prefix=ARCHIP_PREFIX, **role_config) |
555 |
elif role_type == 'rados_blocker': |
556 |
peers[role] = Sosd(role=role, spec=segment.get_spec(), |
557 |
**role_config) |
558 |
elif role_type == 'mapperd': |
559 |
peers[role] = Mapperd(role=role, spec=segment.get_spec(), |
560 |
**role_config) |
561 |
elif role_type == 'vlmcd': |
562 |
peers[role] = Vlmcd(role=role, spec=segment.get_spec(), |
563 |
**role_config) |
564 |
else:
|
565 |
raise Error("No valid peer type: %s" % role_type) |
566 |
validatePortRange(peers[role].portno_start, peers[role].portno_end, |
567 |
xseg_ports) |
568 |
|
569 |
validatePortRange(config['VTOOL_START'], config['VTOOL_END'], xseg_ports) |
570 |
validatePortRange(config['XSEGBD_START'], config['XSEGBD_END'], |
571 |
xseg_ports) |
572 |
|
573 |
xsegbd_range = config['XSEGBD_END'] - config['XSEGBD_START'] |
574 |
vlmcd_range = peers['vlmcd'].portno_end - peers['vlmcd'].portno_start |
575 |
if xsegbd_range > vlmcd_range:
|
576 |
raise Error("Xsegbd port range must be smaller that vlmcd port range") |
577 |
return True |
578 |
|
579 |
def get_segment(): |
580 |
return segment
|
581 |
|
582 |
def construct_peers(): |
583 |
return peers
|
584 |
|
585 |
vtool_port = None
|
586 |
def get_vtool_port(): |
587 |
global vtool_port
|
588 |
if vtool_port is None: |
589 |
vtool_port = random.randint(config['VTOOL_START'], config['VTOOL_END']) |
590 |
return vtool_port
|
591 |
|
592 |
acquired_locks = {} |
593 |
|
594 |
def get_lock(lock_file): |
595 |
while True: |
596 |
try:
|
597 |
fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY) |
598 |
break
|
599 |
except OSError, (err, reason): |
600 |
print >> sys.stderr, lock_file, reason
|
601 |
if err == errno.EEXIST:
|
602 |
time.sleep(0.2)
|
603 |
else:
|
604 |
raise OSError(err, lock_file + ' ' + reason) |
605 |
return fd
|
606 |
|
607 |
def exclusive(get_port=False): |
608 |
def wrap(fn): |
609 |
def lock(*args, **kwargs): |
610 |
if not os.path.exists(LOCK_PATH): |
611 |
try:
|
612 |
os.mkdir(LOCK_PATH) |
613 |
except OSError, (err, reason): |
614 |
print >> sys.stderr, reason
|
615 |
|
616 |
if not os.path.isdir(LOCK_PATH): |
617 |
raise Error("Locking error: %s is not a directory" % LOCK_PATH) |
618 |
|
619 |
if get_port:
|
620 |
vtool_port = get_vtool_port() |
621 |
lock_file = os.path.join(LOCK_PATH, VLMC_LOCK_FILE + '_' + str(vtool_port)) |
622 |
else:
|
623 |
lock_file = os.path.join(LOCK_PATH, VLMC_LOCK_FILE) |
624 |
try:
|
625 |
depth = acquired_locks[lock_file] |
626 |
if depth == 0: |
627 |
fd = get_lock(lock_file) |
628 |
except KeyError: |
629 |
acquired_locks[lock_file] = 0
|
630 |
fd = get_lock(lock_file) |
631 |
|
632 |
acquired_locks[lock_file] += 1
|
633 |
try:
|
634 |
r = fn(*args, **kwargs) |
635 |
finally:
|
636 |
acquired_locks[lock_file] -= 1
|
637 |
depth = acquired_locks[lock_file] |
638 |
if depth == 0: |
639 |
os.close(fd) |
640 |
os.unlink(lock_file) |
641 |
return r
|
642 |
|
643 |
return lock
|
644 |
return wrap
|
645 |
|
646 |
def createBDict(cfg, section): |
647 |
sec_dic = {} |
648 |
sec_dic['portno_start'] = cfg.getint(section, 'portno_start') |
649 |
sec_dic['portno_end'] = cfg.getint(section, 'portno_end') |
650 |
sec_dic['log_level'] = cfg.getint(section, 'log_level') |
651 |
sec_dic['nr_ops'] = cfg.getint(section, 'nr_ops') |
652 |
try:
|
653 |
sec_dic['nr_threads'] = cfg.getint(section, 'nr_threads') |
654 |
sec_dic['archip_dir'] = cfg.get(section, 'archip_dir') |
655 |
sec_dic['fdcache'] = cfg.getint(section, 'fdcache') |
656 |
except:
|
657 |
sec_dic['pool'] = cfg.get(section, 'pool') |
658 |
return sec_dic
|
659 |
|
660 |
def createMDict(cfg, section): |
661 |
sec_dic = {} |
662 |
sec_dic['portno_start'] = cfg.getint(section, 'portno_start') |
663 |
sec_dic['portno_end'] = cfg.getint(section, 'portno_end') |
664 |
sec_dic['log_level'] = cfg.getint(section, 'log_level') |
665 |
sec_dic['nr_ops'] = cfg.getint(section, 'nr_ops') |
666 |
sec_dic['blockerb_port'] = cfg.getint(section, 'blockerb_port') |
667 |
sec_dic['blockerm_port'] = cfg.getint(section, 'blockerm_port') |
668 |
return sec_dic
|
669 |
|
670 |
def createVDict(cfg, section): |
671 |
sec_dic = {} |
672 |
sec_dic['portno_start'] = cfg.getint(section, 'portno_start') |
673 |
sec_dic['portno_end'] = cfg.getint(section, 'portno_end') |
674 |
sec_dic['log_level'] = cfg.getint(section, 'log_level') |
675 |
sec_dic['nr_ops'] = cfg.getint(section, 'nr_ops') |
676 |
sec_dic['blocker_port'] = cfg.getint(section, 'blocker_port') |
677 |
sec_dic['mapper_port'] = cfg.getint(section, 'mapper_port') |
678 |
return sec_dic
|
679 |
|
680 |
|
681 |
def loadrc(rc): |
682 |
try:
|
683 |
if rc is None: |
684 |
cfg_dir = os.path.expanduser(DEFAULTS) |
685 |
else:
|
686 |
cfg_dir = rc |
687 |
cfg_fd = open(cfg_dir)
|
688 |
except:
|
689 |
raise Error("Cannot read config file") |
690 |
|
691 |
cfg = ConfigParser.ConfigParser() |
692 |
cfg.readfp(cfg_fd) |
693 |
config['SEGMENT_PORTS'] = cfg.getint('XSEG','SEGMENT_PORTS') |
694 |
config['SEGMENT_SIZE'] = cfg.getint('XSEG','SEGMENT_SIZE') |
695 |
config['XSEGBD_START'] = cfg.getint('XSEG','XSEGBD_START') |
696 |
config['XSEGBD_END'] = cfg.getint('XSEG','XSEGBD_END') |
697 |
config['VTOOL_START'] = cfg.getint('XSEG','VTOOL_START') |
698 |
config['VTOOL_END'] = cfg.getint('XSEG','VTOOL_END') |
699 |
config['roles'] = eval(cfg.get('ROLES','order')) |
700 |
config['blockerb'] = createBDict(cfg, 'BLOCKERB') |
701 |
config['blockerm'] = createBDict(cfg, 'BLOCKERM') |
702 |
config['mapperd'] = createMDict(cfg, 'MAPPERD') |
703 |
config['vlmcd'] = createVDict(cfg, 'VLMCD') |
704 |
|
705 |
if not check_conf(): |
706 |
raise Error("Invalid conf file") |
707 |
|
708 |
|
709 |
def loaded_modules(): |
710 |
lines = open("/proc/modules").read().split("\n") |
711 |
modules = [f.split(" ")[0] for f in lines] |
712 |
return modules
|
713 |
|
714 |
|
715 |
def loaded_module(name): |
716 |
return name in loaded_modules() |
717 |
|
718 |
|
719 |
def load_module(name, args): |
720 |
s = "Loading %s " % name
|
721 |
sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH)) |
722 |
modules = loaded_modules() |
723 |
if name in modules: |
724 |
sys.stdout.write(yellow("Already loaded".ljust(SECOND_COLUMN_WIDTH)))
|
725 |
sys.stdout.write("\n")
|
726 |
return
|
727 |
cmd = ["modprobe", "%s" % name] |
728 |
if args:
|
729 |
for arg in args: |
730 |
cmd.extend(["%s=%s" % (arg)])
|
731 |
try:
|
732 |
check_call(cmd, shell=False)
|
733 |
except Exception: |
734 |
sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
|
735 |
sys.stdout.write("\n")
|
736 |
raise Error("Cannot load module %s. Check system logs" % name) |
737 |
sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
|
738 |
sys.stdout.write("\n")
|
739 |
|
740 |
|
741 |
def unload_module(name): |
742 |
s = "Unloading %s " % name
|
743 |
sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH)) |
744 |
modules = loaded_modules() |
745 |
if name not in modules: |
746 |
sys.stdout.write(yellow("Not loaded".ljust(SECOND_COLUMN_WIDTH)))
|
747 |
sys.stdout.write("\n")
|
748 |
return
|
749 |
cmd = ["modprobe -r %s" % name]
|
750 |
try:
|
751 |
check_call(cmd, shell=True)
|
752 |
except Exception: |
753 |
sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
|
754 |
sys.stdout.write("\n")
|
755 |
raise Error("Cannot unload module %s. Check system logs" % name) |
756 |
sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
|
757 |
sys.stdout.write("\n")
|
758 |
|
759 |
xseg_initialized = False
|
760 |
|
761 |
|
762 |
def initialize_xseg(): |
763 |
global xseg_initialized
|
764 |
if not xseg_initialized: |
765 |
xseg_initialize() |
766 |
xseg_initialized = True
|
767 |
|
768 |
|
769 |
def check_running(name, pid=None): |
770 |
for p in psutil.process_iter(): |
771 |
if p.name[0:len(name)] == name: |
772 |
if pid:
|
773 |
if pid == p.pid:
|
774 |
return pid
|
775 |
else:
|
776 |
return pid
|
777 |
return None |
778 |
|
779 |
|
780 |
def check_pidfile(name): |
781 |
pidfile = os.path.join(PIDFILE_PATH, name + PID_SUFFIX) |
782 |
pf = None
|
783 |
try:
|
784 |
pf = open(pidfile, "r") |
785 |
pid = int(pf.read())
|
786 |
pf.close() |
787 |
except:
|
788 |
if pf:
|
789 |
pf.close() |
790 |
return -1 |
791 |
|
792 |
return pid
|
793 |
|
794 |
class Xseg_ctx(object): |
795 |
ctx = None
|
796 |
port = None
|
797 |
portno = None
|
798 |
signal_desc = None
|
799 |
|
800 |
def __init__(self, segment, portno): |
801 |
ctx = segment.join() |
802 |
if not ctx: |
803 |
raise Error("Cannot join segment") |
804 |
port = xseg_bind_port(ctx, portno, c_void_p(0))
|
805 |
if not port: |
806 |
raise Error("Cannot bind to port") |
807 |
sd = xseg_get_signal_desc_nonstatic(ctx, port) |
808 |
if not sd: |
809 |
raise Error("Cannot get signal descriptor") |
810 |
xseg_init_local_signal(ctx, portno) |
811 |
self.ctx = ctx
|
812 |
self.port = port
|
813 |
self.portno = portno
|
814 |
self.signal_desc = sd
|
815 |
|
816 |
def __del__(self): |
817 |
return
|
818 |
|
819 |
def __enter__(self): |
820 |
if not self.ctx: |
821 |
raise Error("No segment") |
822 |
return self |
823 |
|
824 |
def __exit__(self, type_, value, traceback): |
825 |
self.shutdown()
|
826 |
return False |
827 |
|
828 |
def shutdown(self): |
829 |
if self.ctx: |
830 |
# xseg_quit_local_signal(self.ctx, self.portno)
|
831 |
xseg_leave(self.ctx)
|
832 |
self.ctx = None |
833 |
|
834 |
def wait_request(self): |
835 |
xseg_prepare_wait(self.ctx, self.portno) |
836 |
while True: |
837 |
received = xseg_receive(self.ctx, self.portno, 0) |
838 |
if received:
|
839 |
xseg_cancel_wait(self.ctx, self.portno) |
840 |
return received
|
841 |
else:
|
842 |
xseg_wait_signal_green(self.ctx, self.signal_desc, 10000000) |
843 |
|
844 |
def wait_requests(self, requests): |
845 |
while True: |
846 |
received = self.wait_request()
|
847 |
for req in requests: |
848 |
xseg_req = req.req |
849 |
if addressof(received.contents) == \
|
850 |
addressof(xseg_req.contents): |
851 |
return req
|
852 |
p = xseg_respond(self.ctx, received, self.portno, X_ALLOC) |
853 |
if p == NoPort:
|
854 |
xseg_put_request(self.ctx, received, self.portno) |
855 |
else:
|
856 |
xseg_signal(self.ctx, p)
|
857 |
|
858 |
|
859 |
class Request(object): |
860 |
xseg_ctx = None
|
861 |
req = None
|
862 |
|
863 |
def __init__(self, xseg_ctx, dst_portno, target, datalen=0, size=0, op=None, |
864 |
data=None, flags=0, offset=0): |
865 |
if not target: |
866 |
raise Error("No target") |
867 |
targetlen = len(target)
|
868 |
if not datalen and data: |
869 |
if isinstance(data, basestring): |
870 |
datalen = len(data)
|
871 |
else:
|
872 |
datalen = sizeof(data) |
873 |
|
874 |
ctx = xseg_ctx.ctx |
875 |
if not ctx: |
876 |
raise Error("No context") |
877 |
req = xseg_get_request(ctx, xseg_ctx.portno, dst_portno, X_ALLOC) |
878 |
if not req: |
879 |
raise Error("Cannot get request") |
880 |
r = xseg_prep_request(ctx, req, targetlen, datalen) |
881 |
if r < 0: |
882 |
xseg_put_request(ctx, req, xseg_ctx.portno) |
883 |
raise Error("Cannot prepare request") |
884 |
self.req = req
|
885 |
self.xseg_ctx = xseg_ctx
|
886 |
|
887 |
if not self.set_target(target): |
888 |
self.put()
|
889 |
raise Error("Cannot set target") |
890 |
|
891 |
if (data):
|
892 |
if not self.set_data(data): |
893 |
self.put()
|
894 |
raise Error("Cannot set data") |
895 |
|
896 |
self.set_size(size)
|
897 |
self.set_op(op)
|
898 |
self.set_flags(flags)
|
899 |
self.set_offset(offset)
|
900 |
|
901 |
return
|
902 |
|
903 |
def __enter__(self): |
904 |
if not self.req: |
905 |
raise Error("xseg request not set") |
906 |
return self |
907 |
|
908 |
def __exit__(self, type_, value, traceback): |
909 |
self.put()
|
910 |
self.req = None |
911 |
return False |
912 |
|
913 |
def put(self, force=False): |
914 |
if not self.req: |
915 |
return False; |
916 |
if not force: |
917 |
if xq_count(byref(self.req.contents.path)) > 0: |
918 |
return False |
919 |
xseg_put_request(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno) |
920 |
self.req = None |
921 |
return True |
922 |
|
923 |
def get_datalen(self): |
924 |
return self.req.contents.datalen |
925 |
|
926 |
def set_op(self, op): |
927 |
self.req.contents.op = op
|
928 |
|
929 |
def get_op(self): |
930 |
return self.req.contents.op |
931 |
|
932 |
def set_offset(self, offset): |
933 |
self.req.contents.offset = offset
|
934 |
|
935 |
def get_offset(self): |
936 |
return self.req.contents.offset |
937 |
|
938 |
def get_size(self): |
939 |
return self.req.contents.size |
940 |
|
941 |
def set_size(self, size): |
942 |
self.req.contents.size = size
|
943 |
|
944 |
def get_serviced(self): |
945 |
return self.req.contents.serviced |
946 |
|
947 |
def set_serviced(self, serviced): |
948 |
self.req.contents.serviced = serviced
|
949 |
|
950 |
def set_flags(self, flags): |
951 |
self.req.contents.flags = flags
|
952 |
|
953 |
def get_flags(self): |
954 |
return self.req.contents.flags |
955 |
|
956 |
def set_target(self, target): |
957 |
"""Sets the target of the request, respecting request's targetlen"""
|
958 |
if len(target) != self.req.contents.targetlen: |
959 |
return False |
960 |
c_target = xseg_get_target_nonstatic(self.xseg_ctx.ctx, self.req) |
961 |
p_target = create_string_buffer(target) |
962 |
# print hex(addressof(c_target.contents))
|
963 |
memmove(c_target, p_target, len(target))
|
964 |
return True |
965 |
|
966 |
def get_target(self): |
967 |
"""Return a string to the target of the request"""
|
968 |
c_target = xseg_get_target_nonstatic(self.xseg_ctx.ctx, self.req) |
969 |
# print "target_addr " + str(addressof(c_target.contents))
|
970 |
return string_at(c_target, self.req.contents.targetlen) |
971 |
|
972 |
def set_data(self, data): |
973 |
"""Sets requests data. Data should be a xseg protocol structure"""
|
974 |
if isinstance(data, basestring): |
975 |
if len(data) != self.req.contents.datalen: |
976 |
return False |
977 |
p_data = create_string_buffer(data) |
978 |
else:
|
979 |
if sizeof(data) != self.req.contents.datalen: |
980 |
return False |
981 |
p_data = pointer(data) |
982 |
c_data = xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req) |
983 |
memmove(c_data, p_data, self.req.contents.datalen)
|
984 |
|
985 |
return True |
986 |
|
987 |
def get_data(self, _type=None): |
988 |
"""return a pointer to the data buffer of the request, casted to the
|
989 |
selected type"""
|
990 |
# print "data addr " + str(addressof(xseg_get_data_nonstatic(\
|
991 |
# self.xseg_ctx.ctx, self.req).contents))
|
992 |
# ret = cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),
|
993 |
# _type)
|
994 |
# print addressof(ret.contents)
|
995 |
# return ret
|
996 |
if _type:
|
997 |
return cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req), |
998 |
POINTER(_type)) |
999 |
else:
|
1000 |
return cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req), |
1001 |
c_void_p) |
1002 |
|
1003 |
def submit(self): |
1004 |
"""Submit the associated xseg_request"""
|
1005 |
p = xseg_submit(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno, |
1006 |
X_ALLOC) |
1007 |
if p == NoPort:
|
1008 |
raise Exception |
1009 |
xseg_signal(self.xseg_ctx.ctx, p)
|
1010 |
|
1011 |
def wait(self): |
1012 |
"""Wait until the associated xseg_request is responded, discarding any
|
1013 |
other requests that may be received in the meantime"""
|
1014 |
self.xseg_ctx.wait_requests([self]) |
1015 |
|
1016 |
def success(self): |
1017 |
if not bool(self.req.contents.state & XS_SERVED) and not \ |
1018 |
bool(self.req.contents.state & XS_FAILED): |
1019 |
raise Error("Request not completed, nor Failed") |
1020 |
return bool((self.req.contents.state & XS_SERVED) and not \ |
1021 |
(self.req.contents.state & XS_FAILED))
|
1022 |
|
1023 |
@classmethod
|
1024 |
def get_write_request(cls, xseg, dst, target, data=None, offset=0, |
1025 |
datalen=0):
|
1026 |
if data is None: |
1027 |
data = ""
|
1028 |
size = len(data)
|
1029 |
if not datalen: |
1030 |
datalen = size |
1031 |
|
1032 |
return cls(xseg, dst, target, op=X_WRITE, data=data, offset=offset,
|
1033 |
size=size, datalen=datalen) |
1034 |
|
1035 |
@classmethod
|
1036 |
def get_read_request(cls, xseg, dst, target, size=0, offset=0, datalen=0): |
1037 |
if not datalen: |
1038 |
datalen=size |
1039 |
return cls(xseg, dst, target, op=X_READ, offset=offset, size=size,
|
1040 |
datalen=datalen) |
1041 |
|
1042 |
@classmethod
|
1043 |
def get_info_request(cls, xseg, dst, target): |
1044 |
return cls(xseg, dst, target, op=X_INFO)
|
1045 |
|
1046 |
@classmethod
|
1047 |
def get_copy_request(cls, xseg, dst, target, copy_target=None, size=0, offset=0): |
1048 |
datalen = sizeof(xseg_request_copy) |
1049 |
xcopy = xseg_request_copy() |
1050 |
xcopy.target = target |
1051 |
xcopy.targetlen = len(target)
|
1052 |
return cls(xseg, dst, copy_target, op=X_COPY, data=xcopy, datalen=datalen,
|
1053 |
size=size, offset=offset) |
1054 |
@classmethod
|
1055 |
def get_acquire_request(cls, xseg, dst, target, wait=False): |
1056 |
flags = 0
|
1057 |
if not wait: |
1058 |
flags = XF_NOSYNC |
1059 |
return cls(xseg, dst, target, op=X_ACQUIRE, flags=flags)
|
1060 |
|
1061 |
@classmethod
|
1062 |
def get_release_request(cls, xseg, dst, target, force=False): |
1063 |
flags = 0
|
1064 |
if force:
|
1065 |
flags = XF_FORCE |
1066 |
return cls(xseg, dst, target, op=X_RELEASE, flags=flags)
|
1067 |
|
1068 |
@classmethod
|
1069 |
def get_delete_request(cls, xseg, dst, target): |
1070 |
return cls(xseg, dst, target, op=X_DELETE)
|
1071 |
|
1072 |
@classmethod
|
1073 |
def get_clone_request(cls, xseg, dst, target, clone=None, clone_size=0, |
1074 |
cont_addr=False):
|
1075 |
datalen = sizeof(xseg_request_clone) |
1076 |
xclone = xseg_request_clone() |
1077 |
xclone.target = target |
1078 |
xclone.targetlen= len(target)
|
1079 |
xclone.size = clone_size |
1080 |
|
1081 |
flags = 0
|
1082 |
if cont_addr:
|
1083 |
flags = XF_CONTADDR |
1084 |
|
1085 |
return cls(xseg, dst, clone, op=X_CLONE, data=xclone, datalen=datalen,
|
1086 |
flags=flags) |
1087 |
|
1088 |
@classmethod
|
1089 |
def get_open_request(cls, xseg, dst, target): |
1090 |
return cls(xseg, dst, target, op=X_OPEN)
|
1091 |
|
1092 |
@classmethod
|
1093 |
def get_close_request(cls, xseg, dst, target): |
1094 |
return cls(xseg, dst, target, op=X_CLOSE)
|
1095 |
|
1096 |
@classmethod
|
1097 |
def get_snapshot_request(cls, xseg, dst, target, snap=None): |
1098 |
datalen = sizeof(xseg_request_snapshot) |
1099 |
xsnapshot = xseg_request_snapshot() |
1100 |
xsnapshot.target = snap |
1101 |
xsnapshot.targetlen= len(snap)
|
1102 |
|
1103 |
return cls(xseg, dst, target, op=X_SNAPSHOT, data=xsnapshot,
|
1104 |
datalen=datalen) |
1105 |
|
1106 |
@classmethod
|
1107 |
def get_mapr_request(cls, xseg, dst, target, offset=0, size=0): |
1108 |
return cls(xseg, dst, target, op=X_MAPR, offset=offset, size=size,
|
1109 |
datalen=0)
|
1110 |
|
1111 |
@classmethod
|
1112 |
def get_mapw_request(cls, xseg, dst, target, offset=0, size=0): |
1113 |
return cls(xseg, dst, target, op=X_MAPW, offset=offset, size=size,
|
1114 |
datalen=0)
|
1115 |
|
1116 |
@classmethod
|
1117 |
def get_hash_request(cls, xseg, dst, target, size=0, offset=0): |
1118 |
return cls(xseg, dst, target, op=X_HASH, size=size, offset=offset)
|