3 # Copyright 2012 GRNET S.A. All rights reserved.
5 # Redistribution and use in source and binary forms, with or
6 # without modification, are permitted provided that the following
9 # 1. Redistributions of source code must retain the above
10 # copyright notice, this list of conditions and the following
13 # 2. Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials
16 # provided with the distribution.
18 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 # POSSIBILITY OF SUCH DAMAGE.
31 # The views and conclusions contained in the software and
32 # documentation are those of the authors and should not be
33 # interpreted as representing official policies, either expressed
34 # or implied, of GRNET S.A.
38 from xseg.xseg_api import *
39 from xseg.xprotocol import *
40 from ctypes import CFUNCTYPE, cast, c_void_p, addressof, string_at, memmove, \
41 create_string_buffer, pointer, sizeof, POINTER, byref, c_int, c_char, Structure
43 cb_null_ptrtype = CFUNCTYPE(None, uint32_t)
51 from subprocess import check_call
52 from collections import namedtuple
55 from select import select
59 hostname = socket.gethostname()
61 valid_role_types = ['file_blocker', 'rados_blocker', 'mapperd', 'vlmcd']
62 valid_segment_types = ['segdev', 'posix']
67 modules = ['xseg', 'segdev', 'xseg_posix', 'xseg_pthread', 'xseg_segdev']
71 DEFAULTS = '/etc/archipelago/archipelago.conf'
74 ARCHIP_PREFIX = 'archip_'
77 PIDFILE_PATH = "/var/run/archipelago"
78 VLMC_LOCK_FILE = 'vlmc.lock'
79 LOGS_PATH = "/var/log/archipelago"
80 LOCK_PATH = "/var/lock"
81 DEVICE_PREFIX = "/dev/xsegbd"
82 XSEGBD_SYSFS = "/sys/bus/xsegbd/"
84 CHARDEV_NAME = "/dev/segdev"
90 FILE_BLOCKER = 'archip-filed'
91 RADOS_BLOCKER = 'archip-sosd'
92 MAPPER = 'archip-mapperd'
96 return bool(x != 0 and (x & (x-1)) == 0)
98 #hack to test green waiting with python gevent.
99 class posixfd_signal_desc(Structure):
101 posixfd_signal_desc._fields_ = [
102 ('signal_file', c_char * sizeof(c_void_p)),
107 def xseg_wait_signal_green(ctx, sd, timeout):
108 posixfd_sd = cast(sd, POINTER(posixfd_signal_desc))
109 fd = posixfd_sd.contents.fd
110 select([fd], [], [], timeout/1000000.0)
114 except OSError as (e,msg):
118 raise OSError(e, msg)
124 def __init__(self, role=None, daemon=True, nr_ops=16,
125 logfile=None, pidfile=None, portno_start=None,
126 portno_end=None, log_level=0, spec=None, threshold=None):
128 raise Error("Role was not provided")
132 if not self.nr_ops > 0:
133 raise Error("Invalid nr_ops for %s" % role)
135 if not is_power2(self.nr_ops):
136 raise Error("nr_ops of %s is not a power of 2" % role)
138 if not self.executable:
139 raise Error("Executable must be provided for %s" % role)
141 if portno_start is None:
142 raise Error("Portno_start must be provided for %s" % role)
143 self.portno_start = portno_start
145 if portno_end is None:
146 raise Error("Portno_end must be provided for %s" % role)
147 self.portno_end = portno_end
151 raise Error("Xseg spec was not provided for %s" % role)
155 self.logfile = logfile
157 self.logfile = os.path.join(LOGS_PATH, role + LOG_SUFFIX)
160 self.pidfile = pidfile
162 self.pidfile = os.path.join(PIDFILE_PATH, role + PID_SUFFIX)
165 if not os.path.isdir(os.path.dirname(self.logfile)):
166 raise Error("Log path %s does not exist" % self.logfile)
168 raise Error("Log path %s does not exist or is not a directory" %
172 os.makedirs(os.path.dirname(self.pidfile))
174 if e.errno == errno.EEXIST:
175 if os.path.isdir(os.path.dirname(self.pidfile)):
178 raise Error("Pid path %s is not a directory" %
179 os.path.dirname(self.pidfile))
181 raise Error("Cannot create path %s" %
182 os.path.dirname(self.pidfile))
184 self.log_level = log_level
185 self.threshold = threshold
187 if self.log_level < 0 or self.log_level > 3:
188 raise Error("%s: Invalid log level %d" %
189 (self.role, self.log_level))
191 if self.cli_opts is None:
193 self.set_cli_options()
197 raise Error("Peer has valid pidfile")
198 cmd = [os.path.join(BIN_DIR, self.executable)] + self.cli_opts
200 check_call(cmd, shell=False)
201 except Exception as e:
202 raise Error("Cannot start %s: %s" % (self.role, str(e)))
207 raise Error("Peer %s not running" % self.role)
209 if self.__is_running(pid):
210 os.kill(pid, signal.SIGTERM)
212 def __is_running(self, pid):
213 name = self.executable
214 for p in psutil.process_iter():
215 if p.name[0:len(name)] == name and pid == p.pid:
220 def is_running(self):
225 if not self.__is_running(pid):
226 raise Error("Peer %s has valid pidfile but is not running" %
237 pf = open(self.pidfile, "r")
247 def set_cli_options(self):
249 self.cli_opts.append("-d")
251 self.cli_opts.append("-n")
252 self.cli_opts.append(str(self.nr_ops))
254 self.cli_opts.append("-l")
255 self.cli_opts.append(self.logfile)
257 self.cli_opts.append("--pidfile")
258 self.cli_opts.append(self.pidfile)
259 if self.portno_start is not None:
260 self.cli_opts.append("-sp")
261 self.cli_opts.append(str(self.portno_start))
262 if self.portno_end is not None:
263 self.cli_opts.append("-ep")
264 self.cli_opts.append(str(self.portno_end))
265 if self.log_level is not None:
266 self.cli_opts.append("-v")
267 self.cli_opts.append(str(self.log_level))
269 self.cli_opts.append("-g")
270 self.cli_opts.append(self.spec)
272 self.cli_opts.append("--threshold")
273 self.cli_opts.append(str(self.threshold))
277 def __init__(self, nr_threads=1, **kwargs):
278 self.nr_threads = nr_threads
279 super(MTpeer, self).__init__(**kwargs)
281 if self.cli_opts is None:
283 self.set_mtcli_options()
285 def set_mtcli_options(self):
286 self.cli_opts.append("-t")
287 self.cli_opts.append(str(self.nr_threads))
291 def __init__(self, pool=None, **kwargs):
292 self.executable = RADOS_BLOCKER
294 super(Sosd, self).__init__(**kwargs)
296 if self.cli_opts is None:
298 self.set_sosd_cli_options()
300 def set_sosd_cli_options(self):
302 self.cli_opts.append("--pool")
303 self.cli_opts.append(self.pool)
307 def __init__(self, archip_dir=None, prefix=None, fdcache=None,
308 unique_str=None, nr_threads=1, nr_ops=16, direct=True, **kwargs):
309 self.executable = FILE_BLOCKER
310 self.archip_dir = archip_dir
312 self.fdcache = fdcache
313 self.unique_str = unique_str
316 if self.fdcache and fdcache < 2*nr_threads:
317 raise Error("Fdcache should be greater than 2*nr_threads")
319 super(Filed, self).__init__(nr_threads=nr_threads, nr_ops=nr_ops, **kwargs)
321 if not self.archip_dir:
322 raise Error("%s: Archip dir must be set" % self.role)
323 if not os.path.isdir(self.archip_dir):
324 raise Error("%s: Archip dir invalid" % self.role)
326 self.fdcache = 2*self.nr_ops
327 if not self.unique_str:
328 self.unique_str = hostname + '_' + str(self.portno_start)
330 if self.cli_opts is None:
332 self.set_filed_cli_options()
334 def set_filed_cli_options(self):
336 self.cli_opts.append("--uniquestr")
337 self.cli_opts.append(self.unique_str)
339 self.cli_opts.append("--fdcache")
340 self.cli_opts.append(str(self.fdcache))
342 self.cli_opts.append("--archip")
343 self.cli_opts.append(self.archip_dir)
345 self.cli_opts.append("--prefix")
346 self.cli_opts.append(self.prefix)
348 self.cli_opts.append("--directio")
352 def __init__(self, blockerm_port=None, blockerb_port=None, **kwargs):
353 self.executable = MAPPER
354 if blockerm_port is None:
355 raise Error("blockerm_port must be provied for %s" % role)
356 self.blockerm_port = blockerm_port
358 if blockerb_port is None:
359 raise Error("blockerb_port must be provied for %s" % role)
360 self.blockerb_port = blockerb_port
361 super(Mapperd, self).__init__(**kwargs)
363 if self.cli_opts is None:
365 self.set_mapperd_cli_options()
367 def set_mapperd_cli_options(self):
368 if self.blockerm_port is not None:
369 self.cli_opts.append("-mbp")
370 self.cli_opts.append(str(self.blockerm_port))
371 if self.blockerb_port is not None:
372 self.cli_opts.append("-bp")
373 self.cli_opts.append(str(self.blockerb_port))
377 def __init__(self, blocker_port=None, mapper_port=None, **kwargs):
378 self.executable = VLMC
379 if blocker_port is None:
380 raise Error("blocker_port must be provied for %s" % role)
381 self.blocker_port = blocker_port
383 if mapper_port is None:
384 raise Error("mapper_port must be provied for %s" % role)
385 self.mapper_port = mapper_port
386 super(Vlmcd, self).__init__(**kwargs)
388 if self.cli_opts is None:
390 self.set_vlmcd_cli_opts()
392 def set_vlmcd_cli_opts(self):
393 if self.blocker_port is not None:
394 self.cli_opts.append("-bp")
395 self.cli_opts.append(str(self.blocker_port))
396 if self.mapper_port is not None:
397 self.cli_opts.append("-mp")
398 self.cli_opts.append(str(self.mapper_port))
402 'CEPH_CONF_FILE': '/etc/ceph/ceph.conf',
403 # 'SPEC': "segdev:xsegbd:1024:5120:12",
404 'SEGMENT_TYPE': 'segdev',
405 'SEGMENT_NAME': 'xsegbd',
406 'SEGMENT_DYNPORTS': 1024,
407 'SEGMENT_PORTS': 2048,
408 'SEGMENT_SIZE': 5120,
409 'SEGMENT_ALIGNMENT': 12,
417 FIRST_COLUMN_WIDTH = 23
418 SECOND_COLUMN_WIDTH = 23
422 return '\x1b[32m' + str(s) + '\x1b[0m'
426 return '\x1b[31m' + str(s) + '\x1b[0m'
430 return '\x1b[33m' + str(s) + '\x1b[0m'
433 def pretty_print(cid, status):
434 sys.stdout.write(cid.ljust(FIRST_COLUMN_WIDTH))
435 sys.stdout.write(status.ljust(SECOND_COLUMN_WIDTH))
436 sys.stdout.write('\n')
440 class Error(Exception):
441 def __init__(self, msg):
447 class Segment(object):
457 def __init__(self, type, name, dynports, ports, size, align=12):
461 self.dynports = dynports
464 self.alignment = align
466 if self.type not in valid_segment_types:
467 raise Error("Segment type not valid")
468 if self.alignment != 12:
469 raise Error("Wrong alignemt")
470 if self.dynports >= self.ports :
471 raise Error("Dynports >= max ports")
473 self.spec = self.get_spec()
477 params = [self.type, self.name, str(self.dynports), str(self.ports),
478 str(self.size), str(self.alignment)]
479 self.spec = ':'.join(params).encode()
484 xconf = xseg_config()
485 c_spec = create_string_buffer(self.spec)
486 xseg_parse_spec(c_spec, xconf)
487 r = xseg_create(xconf)
489 raise Error("Cannot create segment")
498 raise Error("Cannot destroy segment")
501 xconf = xseg_config()
502 spec_buf = create_string_buffer(self.spec)
503 xseg_parse_spec(spec_buf, xconf)
504 ctx = xseg_join(xconf.type, xconf.name, "posixfd",
505 cast(0, cb_null_ptrtype))
507 raise Error("Cannot join segment")
514 def isExec(file_path):
515 return os.path.isfile(file_path) and os.access(file_path, os.X_OK)
517 def validExec(program):
518 for path in os.environ["PATH"].split(os.pathsep):
519 exe_file = os.path.join(path, program)
524 def validatePort(portno, limit):
526 raise Error("Portno %d out of range" % portno)
528 def validatePortRange(portno_start, portno_end, limit):
529 validatePort(portno_start, limit)
530 validatePort(portno_end, limit)
531 if portno_start > portno_end:
532 raise Error("Portno_start > Portno_end: %d > %d " %
533 (portno_start, portno_end))
534 for start, end in port_ranges:
535 if not (portno_end < start or portno_start > end):
536 raise Error("Port range conflict: (%d, %d) confilcts with (%d, %d)" %
537 (portno_start, portno_end, start, end))
538 port_ranges.append((portno_start, portno_end))
540 xseg_type = config['SEGMENT_TYPE']
541 xseg_name = config['SEGMENT_NAME']
542 xseg_dynports = config['SEGMENT_DYNPORTS']
543 xseg_ports = config['SEGMENT_PORTS']
544 xseg_size = config['SEGMENT_SIZE']
545 xseg_align = config['SEGMENT_ALIGNMENT']
548 segment = Segment(xseg_type, xseg_name, xseg_dynports, xseg_ports, xseg_size,
553 if not config['roles']:
554 raise Error("Roles setup must be provided")
556 raise Error("Roles setup must be provided")
558 for role, role_type in config['roles']:
559 if role_type not in valid_role_types:
560 raise Error("%s is not a valid role" % role_type)
562 role_config = config[role]
564 raise Error("No config found for %s" % role)
566 if role_type == 'file_blocker':
567 peers[role] = Filed(role=role, spec=segment.get_spec(),
568 prefix=ARCHIP_PREFIX, **role_config)
569 elif role_type == 'rados_blocker':
570 peers[role] = Sosd(role=role, spec=segment.get_spec(),
572 elif role_type == 'mapperd':
573 peers[role] = Mapperd(role=role, spec=segment.get_spec(),
575 elif role_type == 'vlmcd':
576 peers[role] = Vlmcd(role=role, spec=segment.get_spec(),
579 raise Error("No valid peer type: %s" % role_type)
580 validatePortRange(peers[role].portno_start, peers[role].portno_end,
583 validatePortRange(config['VTOOL_START'], config['VTOOL_END'], xseg_ports)
584 validatePortRange(config['XSEGBD_START'], config['XSEGBD_END'],
587 xsegbd_range = config['XSEGBD_END'] - config['XSEGBD_START']
588 vlmcd_range = peers['vlmcd'].portno_end - peers['vlmcd'].portno_start
589 if xsegbd_range > vlmcd_range:
590 raise Error("Xsegbd port range must be smaller that vlmcd port range")
596 def construct_peers():
600 def get_vtool_port():
602 if vtool_port is None:
603 vtool_port = random.randint(config['VTOOL_START'], config['VTOOL_END'])
608 def get_lock(lock_file):
611 fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
613 except OSError, (err, reason):
614 print >> sys.stderr, lock_file, reason
615 if err == errno.EEXIST:
618 raise OSError(err, lock_file + ' ' + reason)
621 def exclusive(get_port=False):
623 def lock(*args, **kwargs):
624 if not os.path.exists(LOCK_PATH):
627 except OSError, (err, reason):
628 print >> sys.stderr, reason
630 if not os.path.isdir(LOCK_PATH):
631 raise Error("Locking error: %s is not a directory" % LOCK_PATH)
634 vtool_port = get_vtool_port()
635 lock_file = os.path.join(LOCK_PATH, VLMC_LOCK_FILE + '_' + str(vtool_port))
637 lock_file = os.path.join(LOCK_PATH, VLMC_LOCK_FILE)
639 depth = acquired_locks[lock_file]
641 fd = get_lock(lock_file)
643 acquired_locks[lock_file] = 0
644 fd = get_lock(lock_file)
646 acquired_locks[lock_file] += 1
648 r = fn(*args, **kwargs)
650 acquired_locks[lock_file] -= 1
651 depth = acquired_locks[lock_file]
660 def createBDict(cfg, section):
662 sec_dic['portno_start'] = cfg.getint(section, 'portno_start')
663 sec_dic['portno_end'] = cfg.getint(section, 'portno_end')
664 sec_dic['log_level'] = cfg.getint(section, 'log_level')
665 sec_dic['nr_ops'] = cfg.getint(section, 'nr_ops')
667 sec_dic['nr_threads'] = cfg.getint(section, 'nr_threads')
668 sec_dic['archip_dir'] = cfg.get(section, 'archip_dir')
669 sec_dic['fdcache'] = cfg.getint(section, 'fdcache')
671 sec_dic['pool'] = cfg.get(section, 'pool')
674 def createMDict(cfg, section):
676 sec_dic['portno_start'] = cfg.getint(section, 'portno_start')
677 sec_dic['portno_end'] = cfg.getint(section, 'portno_end')
678 sec_dic['log_level'] = cfg.getint(section, 'log_level')
679 sec_dic['nr_ops'] = cfg.getint(section, 'nr_ops')
680 sec_dic['blockerb_port'] = cfg.getint(section, 'blockerb_port')
681 sec_dic['blockerm_port'] = cfg.getint(section, 'blockerm_port')
684 def createVDict(cfg, section):
686 sec_dic['portno_start'] = cfg.getint(section, 'portno_start')
687 sec_dic['portno_end'] = cfg.getint(section, 'portno_end')
688 sec_dic['log_level'] = cfg.getint(section, 'log_level')
689 sec_dic['nr_ops'] = cfg.getint(section, 'nr_ops')
690 sec_dic['blocker_port'] = cfg.getint(section, 'blocker_port')
691 sec_dic['mapper_port'] = cfg.getint(section, 'mapper_port')
698 cfg_dir = os.path.expanduser(DEFAULTS)
701 cfg_fd = open(cfg_dir)
703 raise Error("Cannot read config file")
705 cfg = ConfigParser.ConfigParser()
707 config['SEGMENT_PORTS'] = cfg.getint('XSEG','SEGMENT_PORTS')
708 config['SEGMENT_SIZE'] = cfg.getint('XSEG','SEGMENT_SIZE')
709 config['XSEGBD_START'] = cfg.getint('XSEG','XSEGBD_START')
710 config['XSEGBD_END'] = cfg.getint('XSEG','XSEGBD_END')
711 config['VTOOL_START'] = cfg.getint('XSEG','VTOOL_START')
712 config['VTOOL_END'] = cfg.getint('XSEG','VTOOL_END')
713 config['roles'] = eval(cfg.get('ROLES','order'))
714 config['blockerb'] = createBDict(cfg, 'BLOCKERB')
715 config['blockerm'] = createBDict(cfg, 'BLOCKERM')
716 config['mapperd'] = createMDict(cfg, 'MAPPERD')
717 config['vlmcd'] = createVDict(cfg, 'VLMCD')
720 raise Error("Invalid conf file")
723 def loaded_modules():
724 lines = open("/proc/modules").read().split("\n")
725 modules = [f.split(" ")[0] for f in lines]
729 def loaded_module(name):
730 return name in loaded_modules()
733 def load_module(name, args):
734 s = "Loading %s " % name
735 sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH))
736 modules = loaded_modules()
738 sys.stdout.write(yellow("Already loaded".ljust(SECOND_COLUMN_WIDTH)))
739 sys.stdout.write("\n")
741 cmd = ["modprobe", "%s" % name]
744 cmd.extend(["%s=%s" % (arg)])
746 check_call(cmd, shell=False)
748 sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
749 sys.stdout.write("\n")
750 raise Error("Cannot load module %s. Check system logs" % name)
751 sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
752 sys.stdout.write("\n")
755 def unload_module(name):
756 s = "Unloading %s " % name
757 sys.stdout.write(s.ljust(FIRST_COLUMN_WIDTH))
758 modules = loaded_modules()
759 if name not in modules:
760 sys.stdout.write(yellow("Not loaded".ljust(SECOND_COLUMN_WIDTH)))
761 sys.stdout.write("\n")
763 cmd = ["modprobe -r %s" % name]
765 check_call(cmd, shell=True)
767 sys.stdout.write(red("FAILED".ljust(SECOND_COLUMN_WIDTH)))
768 sys.stdout.write("\n")
769 raise Error("Cannot unload module %s. Check system logs" % name)
770 sys.stdout.write(green("OK".ljust(SECOND_COLUMN_WIDTH)))
771 sys.stdout.write("\n")
773 xseg_initialized = False
776 def initialize_xseg():
777 global xseg_initialized
778 if not xseg_initialized:
780 xseg_initialized = True
783 def check_running(name, pid=None):
784 for p in psutil.process_iter():
785 if p.name[0:len(name)] == name:
794 def check_pidfile(name):
795 pidfile = os.path.join(PIDFILE_PATH, name + PID_SUFFIX)
798 pf = open(pidfile, "r")
808 class Xseg_ctx(object):
815 def __init__(self, segment, portno=None):
818 raise Error("Cannot join segment")
820 port = xseg_bind_dynport(ctx)
821 portno = xseg_portno_nonstatic(ctx, port)
824 port = xseg_bind_port(ctx, portno, c_void_p(0))
828 raise Error("Cannot bind to port")
830 sd = xseg_get_signal_desc_nonstatic(ctx, port)
832 raise Error("Cannot get signal descriptor")
834 xseg_init_local_signal(ctx, portno)
838 self.dynalloc = dynalloc
839 self.signal_desc = sd
846 raise Error("No segment")
849 def __exit__(self, type_, value, traceback):
854 if self.port is not None and self.dynalloc:
855 xseg_leave_dynport(self.ctx, self.port)
857 # xseg_quit_local_signal(self.ctx, self.portno)
861 def wait_request(self):
862 xseg_prepare_wait(self.ctx, self.portno)
864 received = xseg_receive(self.ctx, self.portno, 0)
866 xseg_cancel_wait(self.ctx, self.portno)
869 xseg_wait_signal_green(self.ctx, self.signal_desc, 10000000)
871 def wait_requests(self, requests):
873 received = self.wait_request()
876 if addressof(received.contents) == \
877 addressof(xseg_req.contents):
879 p = xseg_respond(self.ctx, received, self.portno, X_ALLOC)
881 xseg_put_request(self.ctx, received, self.portno)
883 xseg_signal(self.ctx, p)
886 class Request(object):
890 def __init__(self, xseg_ctx, dst_portno, target, datalen=0, size=0, op=None,
891 data=None, flags=0, offset=0):
893 raise Error("No target")
894 targetlen = len(target)
895 if not datalen and data:
896 if isinstance(data, basestring):
899 datalen = sizeof(data)
903 raise Error("No context")
904 req = xseg_get_request(ctx, xseg_ctx.portno, dst_portno, X_ALLOC)
906 raise Error("Cannot get request")
907 r = xseg_prep_request(ctx, req, targetlen, datalen)
909 xseg_put_request(ctx, req, xseg_ctx.portno)
910 raise Error("Cannot prepare request")
912 self.xseg_ctx = xseg_ctx
914 if not self.set_target(target):
916 raise Error("Cannot set target")
919 if not self.set_data(data):
921 raise Error("Cannot set data")
925 self.set_flags(flags)
926 self.set_offset(offset)
932 raise Error("xseg request not set")
935 def __exit__(self, type_, value, traceback):
940 def put(self, force=False):
944 if xq_count(byref(self.req.contents.path)) > 0:
946 xseg_put_request(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno)
950 def get_datalen(self):
951 return self.req.contents.datalen
953 def set_op(self, op):
954 self.req.contents.op = op
957 return self.req.contents.op
959 def set_offset(self, offset):
960 self.req.contents.offset = offset
962 def get_offset(self):
963 return self.req.contents.offset
966 return self.req.contents.size
968 def set_size(self, size):
969 self.req.contents.size = size
971 def get_serviced(self):
972 return self.req.contents.serviced
974 def set_serviced(self, serviced):
975 self.req.contents.serviced = serviced
977 def set_flags(self, flags):
978 self.req.contents.flags = flags
981 return self.req.contents.flags
983 def set_target(self, target):
984 """Sets the target of the request, respecting request's targetlen"""
985 if len(target) != self.req.contents.targetlen:
987 c_target = xseg_get_target_nonstatic(self.xseg_ctx.ctx, self.req)
988 p_target = create_string_buffer(target)
989 # print hex(addressof(c_target.contents))
990 memmove(c_target, p_target, len(target))
993 def get_target(self):
994 """Return a string to the target of the request"""
995 c_target = xseg_get_target_nonstatic(self.xseg_ctx.ctx, self.req)
996 # print "target_addr " + str(addressof(c_target.contents))
997 return string_at(c_target, self.req.contents.targetlen)
999 def set_data(self, data):
1000 """Sets requests data. Data should be a xseg protocol structure"""
1001 if isinstance(data, basestring):
1002 if len(data) != self.req.contents.datalen:
1004 p_data = create_string_buffer(data)
1006 if sizeof(data) != self.req.contents.datalen:
1008 p_data = pointer(data)
1009 c_data = xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req)
1010 memmove(c_data, p_data, self.req.contents.datalen)
1014 def get_data(self, _type=None):
1015 """return a pointer to the data buffer of the request, casted to the
1017 # print "data addr " + str(addressof(xseg_get_data_nonstatic(\
1018 # self.xseg_ctx.ctx, self.req).contents))
1019 # ret = cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),
1021 # print addressof(ret.contents)
1024 return cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),
1027 return cast(xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req),
1031 """Submit the associated xseg_request"""
1032 p = xseg_submit(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno,
1036 xseg_signal(self.xseg_ctx.ctx, p)
1039 """Wait until the associated xseg_request is responded, discarding any
1040 other requests that may be received in the meantime"""
1041 self.xseg_ctx.wait_requests([self])
1044 if not bool(self.req.contents.state & XS_SERVED) and not \
1045 bool(self.req.contents.state & XS_FAILED):
1046 raise Error("Request not completed, nor Failed")
1047 return bool((self.req.contents.state & XS_SERVED) and not \
1048 (self.req.contents.state & XS_FAILED))
1051 def get_write_request(cls, xseg, dst, target, data=None, offset=0,
1052 datalen=0, flags=0):
1059 return cls(xseg, dst, target, op=X_WRITE, data=data, offset=offset,
1060 size=size, datalen=datalen, flags=flags)
1063 def get_read_request(cls, xseg, dst, target, size=0, offset=0, datalen=0):
1066 return cls(xseg, dst, target, op=X_READ, offset=offset, size=size,
1070 def get_info_request(cls, xseg, dst, target):
1071 return cls(xseg, dst, target, op=X_INFO)
1074 def get_copy_request(cls, xseg, dst, target, copy_target=None, size=0, offset=0):
1075 datalen = sizeof(xseg_request_copy)
1076 xcopy = xseg_request_copy()
1077 xcopy.target = target
1078 xcopy.targetlen = len(target)
1079 return cls(xseg, dst, copy_target, op=X_COPY, data=xcopy, datalen=datalen,
1080 size=size, offset=offset)
1082 def get_acquire_request(cls, xseg, dst, target, wait=False):
1086 return cls(xseg, dst, target, op=X_ACQUIRE, flags=flags)
1089 def get_release_request(cls, xseg, dst, target, force=False):
1093 return cls(xseg, dst, target, op=X_RELEASE, flags=flags)
1096 def get_delete_request(cls, xseg, dst, target):
1097 return cls(xseg, dst, target, op=X_DELETE)
1100 def get_clone_request(cls, xseg, dst, target, clone=None, clone_size=0,
1102 datalen = sizeof(xseg_request_clone)
1103 xclone = xseg_request_clone()
1104 xclone.target = target
1105 xclone.targetlen= len(target)
1106 xclone.size = clone_size
1112 return cls(xseg, dst, clone, op=X_CLONE, data=xclone, datalen=datalen,
1116 def get_open_request(cls, xseg, dst, target):
1117 return cls(xseg, dst, target, op=X_OPEN)
1120 def get_close_request(cls, xseg, dst, target):
1121 return cls(xseg, dst, target, op=X_CLOSE)
1124 def get_snapshot_request(cls, xseg, dst, target, snap=None):
1125 datalen = sizeof(xseg_request_snapshot)
1126 xsnapshot = xseg_request_snapshot()
1127 xsnapshot.target = snap
1128 xsnapshot.targetlen= len(snap)
1130 return cls(xseg, dst, target, op=X_SNAPSHOT, data=xsnapshot,
1134 def get_mapr_request(cls, xseg, dst, target, offset=0, size=0):
1135 return cls(xseg, dst, target, op=X_MAPR, offset=offset, size=size,
1139 def get_mapw_request(cls, xseg, dst, target, offset=0, size=0):
1140 return cls(xseg, dst, target, op=X_MAPW, offset=offset, size=size,
1144 def get_hash_request(cls, xseg, dst, target, size=0, offset=0):
1145 return cls(xseg, dst, target, op=X_HASH, size=size, offset=offset)