modules = ['xseg', 'segdev', 'xseg_posix', 'xseg_pthread', 'xseg_segdev']
xsegbd = 'xsegbd'
+BIN_DIR = '/usr/bin/'
DEFAULTS = '/etc/default/archipelago'
#system defaults
REQS = 512
-FILE_BLOCKER = 'archip-pfiled'
+FILE_BLOCKER = 'archip-filed'
RADOS_BLOCKER = 'archip-sosd'
MAPPER = 'archip-mapperd'
VLMC = 'archip-vlmcd'
if not self.executable:
raise Error("Executable must be provided for %s" % role)
- if not portno_start:
+ if portno_start is None:
raise Error("Portno_start must be provied for %s" % role)
self.portno_start = portno_start
- if not portno_end:
+ if portno_end is None:
raise Error("Portno_end must be provied for %s" % role)
self.portno_end = portno_end
def start(self):
if self.get_pid():
raise Error("Peer has valid pidfile")
- cmd = [self.executable] + self.cli_opts
+ cmd = [os.path.join(BIN_DIR, self.executable)] + self.cli_opts
try:
check_call(cmd, shell=False)
except Exception as e:
if self.pidfile:
self.cli_opts.append("--pidfile")
self.cli_opts.append(self.pidfile)
- if self.portno_start:
+ if self.portno_start is not None:
self.cli_opts.append("-sp")
self.cli_opts.append(str(self.portno_start))
- if self.portno_end:
+ if self.portno_end is not None:
self.cli_opts.append("-ep")
self.cli_opts.append(str(self.portno_end))
- if self.log_level:
+ if self.log_level is not None:
self.cli_opts.append("-v")
self.cli_opts.append(str(self.log_level))
if self.spec:
self.cli_opts.append(self.pool)
-class Pfiled(MTpeer):
- def __init__(self, pithos_dir=None, archip_dir=None, prefix=None):
+class Filed(MTpeer):
+ def __init__(self, archip_dir=None, prefix=None, fdcache=None,
+ unique_str=None, **kwargs):
self.executable = FILE_BLOCKER
- self.pithos_dir = pithos_dir
self.archip_dir = archip_dir
self.prefix = prefix
+ self.fdcache = fdcache
+ self.unique_str = unique_str
- super(Pfiled, self).__init__(**kwargs)
-
- if not self.pithos_dir:
- raise Error("%s: Pithos dir must be set" % self.role)
- if not os.path.isdir(self.pithos_dir):
- raise Error("%s: Pithos dir invalid" % self.role)
+ super(Filed, self).__init__(**kwargs)
if not self.archip_dir:
raise Error("%s: Archip dir must be set" % self.role)
if not os.path.isdir(self.archip_dir):
raise Error("%s: Archip dir invalid" % self.role)
+ if not self.fdcache:
+ self.fdcache = 2*self.nr_ops
+ if not self.unique_str:
+ self.unique_str = hostname + '_' + str(self.portno_start)
if self.cli_opts is None:
self.cli_opts = []
- self.set_pfiled_cli_options()
-
- def set_pfiled_cli_options(self):
- if self.pithos_dir:
- self.cli_opts.append("--pithos")
- self.cli_opts.append(self.pithos_dir)
+ self.set_filed_cli_options()
+
+ def set_filed_cli_options(self):
+ if self.unique_str:
+ self.cli_opts.append("--uniquestr")
+ self.cli_opts.append(self.unique_str)
+ if self.fdcache:
+ self.cli_opts.append("--fdcache")
+ self.cli_opts.append(str(self.fdcache))
if self.archip_dir:
self.cli_opts.append("--archip")
self.cli_opts.append(self.archip_dir)
class Mapperd(Peer):
def __init__(self, blockerm_port=None, blockerb_port=None, **kwargs):
self.executable = MAPPER
+ if blockerm_port is None:
+ raise Error("blockerm_port must be provied for %s" % role)
self.blockerm_port = blockerm_port
+
+ if blockerb_port is None:
+ raise Error("blockerb_port must be provied for %s" % role)
self.blockerb_port = blockerb_port
super(Mapperd, self).__init__(**kwargs)
self.set_mapperd_cli_options()
def set_mapperd_cli_options(self):
- if self.blockerm_port:
+ if self.blockerm_port is not None:
self.cli_opts.append("-mbp")
self.cli_opts.append(str(self.blockerm_port))
- if self.blockerb_port:
+ if self.blockerb_port is not None:
self.cli_opts.append("-bp")
self.cli_opts.append(str(self.blockerb_port))
class Vlmcd(Peer):
def __init__(self, blocker_port=None, mapper_port=None, **kwargs):
self.executable = VLMC
+ if blocker_port is None:
+ raise Error("blocker_port must be provied for %s" % role)
self.blocker_port = blocker_port
+
+ if mapper_port is None:
+ raise Error("mapper_port must be provied for %s" % role)
self.mapper_port = mapper_port
super(Vlmcd, self).__init__(**kwargs)
self.set_vlmcd_cli_opts()
def set_vlmcd_cli_opts(self):
- if self.blocker_port:
+ if self.blocker_port is not None:
self.cli_opts.append("-bp")
self.cli_opts.append(str(self.blocker_port))
- if self.mapper_port:
+ if self.mapper_port is not None:
self.cli_opts.append("-mp")
self.cli_opts.append(str(self.mapper_port))
raise Error("No config found for %s" % role)
if role_type == 'file_blocker':
- peers[role] = Pfiled(role=role, spec=config['SPEC'].encode(),
+ peers[role] = Filed(role=role, spec=config['SPEC'].encode(),
prefix=ARCHIP_PREFIX, **role_config)
elif role_type == 'rados_blocker':
peers[role] = Sosd(role=role, spec=config['SPEC'].encode(),
xseg_initialized = True
-def create_segment():
+def create_segment(spec):
#fixme blocking....
initialize_xseg()
xconf = xseg_config()
- xseg_parse_spec(str(config['SPEC']), xconf)
+ c_spec = create_string_buffer(spec)
+ xseg_parse_spec(c_spec, xconf)
r = xseg_create(xconf)
if r < 0:
raise Error("Cannot create segment")
-def destroy_segment():
+def destroy_segment(spec):
#fixme blocking....
try:
initialize_xseg()
xconf = xseg_config()
- xseg_parse_spec(str(config['SPEC']), xconf)
+ c_spec = create_string_buffer(spec)
+ xseg_parse_spec(c_spec, xconf)
xseg = xseg_join(xconf.type, xconf.name, "posix",
cast(0, cb_null_ptrtype))
if not xseg:
def __init__(self, spec, portno):
initialize_xseg()
xconf = xseg_config()
- xseg_parse_spec(create_string_buffer(spec), xconf)
+ spec_buf = create_string_buffer(spec)
+ xseg_parse_spec(spec_buf, xconf)
ctx = xseg_join(xconf.type, xconf.name, "posix",
cast(0, cb_null_ptrtype))
if not ctx:
if r < 0:
xseg_put_request(ctx, req, xseg_ctx.portno)
raise Error("Cannot prepare request")
-# print hex(addressof(req.contents))
+# print hex(addressof(req.contents))
self.req = req
self.xseg_ctx = xseg_ctx
return
- def __del__(self):
- if self.req:
- if xq_count(byref(self.req.contents.path)) == 0:
- xseg_put_request(self.xseg_ctx.ctx, self.req,
- self.xseg_ctx.portno)
- self.req = None
- return False
-
def __enter__(self):
if not self.req:
raise Error("xseg request not set")
return self
def __exit__(self, type_, value, traceback):
- if self.req:
- if xq_count(byref(self.req.contents.path)) == 0:
- xseg_put_request(self.xseg_ctx.ctx, self.req,
- self.xseg_ctx.portno)
+ self.put()
self.req = None
return False
+ def put(self, force=False):
+ if not self.req:
+ return False;
+ if not force:
+ if xq_count(byref(self.req.contents.path)) > 0:
+ return False
+ xseg_put_request(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno)
+ self.req = None
+ return True
+
+ def get_datalen(self):
+ return self.req.contents.datalen
+
def set_op(self, op):
self.req.contents.op = op
def set_size(self, size):
self.req.contents.size = size
+ def get_serviced(self):
+ return self.req.contents.serviced
+
+ def set_serviced(self, serviced):
+ self.req.contents.serviced = serviced
+
def set_flags(self, flags):
self.req.contents.flags = flags
def set_data(self, data):
"""Sets requests data. Data should be a xseg protocol structure"""
- if sizeof(data) != self.req.contents.datalen:
- return False
+ if isinstance(data, basestring):
+ if len(data) != self.req.contents.datalen:
+ return False
+ p_data = create_string_buffer(data)
+ else:
+ if sizeof(data) != self.req.contents.datalen:
+ return False
+ p_data = pointer(data)
c_data = xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req)
- p_data = pointer(data)
memmove(c_data, p_data, self.req.contents.datalen)
return True