Improve python archipelago common infrastructure.
authorFilippos Giannakos <philipgian@grnet.gr>
Thu, 11 Jul 2013 15:12:20 +0000 (18:12 +0300)
committerFilippos Giannakos <philipgian@grnet.gr>
Thu, 11 Jul 2013 15:12:20 +0000 (18:12 +0300)
The changes include:
 * Convert pfiled to filed and add the extra arguments
 * Check if arguments are not None instead of simply checking for arguments
 * Move request to seperate method and remove __del__ method
 * Add get_serviced and get_datalen to request
 * Add support to set string add data request
 * Make all peer executables relative to a BIN_DIR
 * Create/destroy segment takes spec as arguments
 * Parsing spec on Xsegctx __init__ does not modify spec parameter

xseg/tools/archipelago/archipelago/common.py

index 1bdcd20..ee013bd 100755 (executable)
@@ -61,6 +61,7 @@ xsegbd_args = []
 modules = ['xseg', 'segdev', 'xseg_posix', 'xseg_pthread', 'xseg_segdev']
 xsegbd = 'xsegbd'
 
+BIN_DIR = '/usr/bin/'
 DEFAULTS = '/etc/default/archipelago'
 
 #system defaults
@@ -80,7 +81,7 @@ CHARDEV_MINOR = 0
 
 REQS = 512
 
-FILE_BLOCKER = 'archip-pfiled'
+FILE_BLOCKER = 'archip-filed'
 RADOS_BLOCKER = 'archip-sosd'
 MAPPER = 'archip-mapperd'
 VLMC = 'archip-vlmcd'
@@ -99,11 +100,11 @@ class Peer(object):
         if not self.executable:
             raise Error("Executable must be provided for %s" % role)
 
-        if not portno_start:
+        if portno_start is None:
             raise Error("Portno_start must be provied for %s" % role)
         self.portno_start = portno_start
 
-        if not portno_end:
+        if portno_end is None:
             raise Error("Portno_end must be provied for %s" % role)
         self.portno_end = portno_end
 
@@ -156,7 +157,7 @@ class Peer(object):
     def start(self):
         if self.get_pid():
             raise Error("Peer has valid pidfile")
-        cmd = [self.executable] + self.cli_opts
+        cmd = [os.path.join(BIN_DIR, self.executable)] + self.cli_opts
         try:
             check_call(cmd, shell=False)
         except Exception as e:
@@ -217,13 +218,13 @@ class Peer(object):
         if self.pidfile:
             self.cli_opts.append("--pidfile")
             self.cli_opts.append(self.pidfile)
-        if self.portno_start:
+        if self.portno_start is not None:
             self.cli_opts.append("-sp")
             self.cli_opts.append(str(self.portno_start))
-        if self.portno_end:
+        if self.portno_end is not None:
             self.cli_opts.append("-ep")
             self.cli_opts.append(str(self.portno_end))
-        if self.log_level:
+        if self.log_level is not None:
             self.cli_opts.append("-v")
             self.cli_opts.append(str(self.log_level))
         if self.spec:
@@ -261,33 +262,37 @@ class Sosd(MTpeer):
             self.cli_opts.append(self.pool)
 
 
-class Pfiled(MTpeer):
-    def __init__(self, pithos_dir=None, archip_dir=None, prefix=None):
+class Filed(MTpeer):
+    def __init__(self, archip_dir=None, prefix=None, fdcache=None,
+                 unique_str=None, **kwargs):
         self.executable = FILE_BLOCKER
-        self.pithos_dir = pithos_dir
         self.archip_dir = archip_dir
         self.prefix = prefix
+        self.fdcache = fdcache
+        self.unique_str = unique_str
 
-        super(Pfiled, self).__init__(**kwargs)
-
-        if not self.pithos_dir:
-            raise Error("%s: Pithos dir must be set" % self.role)
-        if not os.path.isdir(self.pithos_dir):
-            raise Error("%s: Pithos dir invalid" % self.role)
+        super(Filed, self).__init__(**kwargs)
 
         if not self.archip_dir:
             raise Error("%s: Archip dir must be set" % self.role)
         if not os.path.isdir(self.archip_dir):
             raise Error("%s: Archip dir invalid" % self.role)
+        if not self.fdcache:
+            self.fdcache = 2*self.nr_ops
+        if not self.unique_str:
+            self.unique_str = hostname + '_' + str(self.portno_start)
 
         if self.cli_opts is None:
             self.cli_opts = []
-        self.set_pfiled_cli_options()
-
-    def set_pfiled_cli_options(self):
-        if self.pithos_dir:
-            self.cli_opts.append("--pithos")
-            self.cli_opts.append(self.pithos_dir)
+        self.set_filed_cli_options()
+
+    def set_filed_cli_options(self):
+        if self.unique_str:
+            self.cli_opts.append("--uniquestr")
+            self.cli_opts.append(self.unique_str)
+        if self.fdcache:
+            self.cli_opts.append("--fdcache")
+            self.cli_opts.append(str(self.fdcache))
         if self.archip_dir:
             self.cli_opts.append("--archip")
             self.cli_opts.append(self.archip_dir)
@@ -299,7 +304,12 @@ class Pfiled(MTpeer):
 class Mapperd(Peer):
     def __init__(self, blockerm_port=None, blockerb_port=None, **kwargs):
         self.executable = MAPPER
+        if blockerm_port is None:
+            raise Error("blockerm_port must be provied for %s" % role)
         self.blockerm_port = blockerm_port
+
+        if blockerb_port is None:
+            raise Error("blockerb_port must be provied for %s" % role)
         self.blockerb_port = blockerb_port
         super(Mapperd, self).__init__(**kwargs)
 
@@ -308,10 +318,10 @@ class Mapperd(Peer):
         self.set_mapperd_cli_options()
 
     def set_mapperd_cli_options(self):
-        if self.blockerm_port:
+        if self.blockerm_port is not None:
             self.cli_opts.append("-mbp")
             self.cli_opts.append(str(self.blockerm_port))
-        if self.blockerb_port:
+        if self.blockerb_port is not None:
             self.cli_opts.append("-bp")
             self.cli_opts.append(str(self.blockerb_port))
 
@@ -319,7 +329,12 @@ class Mapperd(Peer):
 class Vlmcd(Peer):
     def __init__(self, blocker_port=None, mapper_port=None, **kwargs):
         self.executable = VLMC
+        if blocker_port is None:
+            raise Error("blocker_port must be provied for %s" % role)
         self.blocker_port = blocker_port
+
+        if mapper_port is None:
+            raise Error("mapper_port must be provied for %s" % role)
         self.mapper_port = mapper_port
         super(Vlmcd, self).__init__(**kwargs)
 
@@ -328,10 +343,10 @@ class Vlmcd(Peer):
         self.set_vlmcd_cli_opts()
 
     def set_vlmcd_cli_opts(self):
-        if self.blocker_port:
+        if self.blocker_port is not None:
             self.cli_opts.append("-bp")
             self.cli_opts.append(str(self.blocker_port))
-        if self.mapper_port:
+        if self.mapper_port is not None:
             self.cli_opts.append("-mp")
             self.cli_opts.append(str(self.mapper_port))
 
@@ -440,7 +455,7 @@ def check_conf():
             raise Error("No config found for %s" % role)
 
         if role_type == 'file_blocker':
-            peers[role] = Pfiled(role=role, spec=config['SPEC'].encode(),
+            peers[role] = Filed(role=role, spec=config['SPEC'].encode(),
                                  prefix=ARCHIP_PREFIX, **role_config)
         elif role_type == 'rados_blocker':
             peers[role] = Sosd(role=role, spec=config['SPEC'].encode(),
@@ -571,22 +586,24 @@ def initialize_xseg():
         xseg_initialized = True
 
 
-def create_segment():
+def create_segment(spec):
     #fixme blocking....
     initialize_xseg()
     xconf = xseg_config()
-    xseg_parse_spec(str(config['SPEC']), xconf)
+    c_spec = create_string_buffer(spec)
+    xseg_parse_spec(c_spec, xconf)
     r = xseg_create(xconf)
     if r < 0:
         raise Error("Cannot create segment")
 
 
-def destroy_segment():
+def destroy_segment(spec):
     #fixme blocking....
     try:
         initialize_xseg()
         xconf = xseg_config()
-        xseg_parse_spec(str(config['SPEC']), xconf)
+        c_spec = create_string_buffer(spec)
+        xseg_parse_spec(c_spec, xconf)
         xseg = xseg_join(xconf.type, xconf.name, "posix",
                          cast(0, cb_null_ptrtype))
         if not xseg:
@@ -631,7 +648,8 @@ class Xseg_ctx(object):
     def __init__(self, spec, portno):
         initialize_xseg()
         xconf = xseg_config()
-        xseg_parse_spec(create_string_buffer(spec), xconf)
+        spec_buf = create_string_buffer(spec)
+        xseg_parse_spec(spec_buf, xconf)
         ctx = xseg_join(xconf.type, xconf.name, "posix",
                         cast(0, cb_null_ptrtype))
         if not ctx:
@@ -678,32 +696,34 @@ class Request(object):
         if r < 0:
             xseg_put_request(ctx, req, xseg_ctx.portno)
             raise Error("Cannot prepare request")
-#        print hex(addressof(req.contents))
+#       print hex(addressof(req.contents))
         self.req = req
         self.xseg_ctx = xseg_ctx
         return
 
-    def __del__(self):
-        if self.req:
-            if xq_count(byref(self.req.contents.path)) == 0:
-                xseg_put_request(self.xseg_ctx.ctx, self.req,
-                                 self.xseg_ctx.portno)
-        self.req = None
-        return False
-
     def __enter__(self):
         if not self.req:
             raise Error("xseg request not set")
         return self
 
     def __exit__(self, type_, value, traceback):
-        if self.req:
-            if xq_count(byref(self.req.contents.path)) == 0:
-                xseg_put_request(self.xseg_ctx.ctx, self.req,
-                                 self.xseg_ctx.portno)
+        self.put()
         self.req = None
         return False
 
+    def put(self, force=False):
+        if not self.req:
+            return False;
+        if not force:
+            if xq_count(byref(self.req.contents.path)) > 0:
+                return False
+        xseg_put_request(self.xseg_ctx.ctx, self.req, self.xseg_ctx.portno)
+        self.req = None
+        return True
+
+    def get_datalen(self):
+        return self.req.contents.datalen
+
     def set_op(self, op):
         self.req.contents.op = op
 
@@ -722,6 +742,12 @@ class Request(object):
     def set_size(self, size):
         self.req.contents.size = size
 
+    def get_serviced(self):
+        return self.req.contents.serviced
+
+    def set_serviced(self, serviced):
+        self.req.contents.serviced = serviced
+
     def set_flags(self, flags):
         self.req.contents.flags = flags
 
@@ -746,10 +772,15 @@ class Request(object):
 
     def set_data(self, data):
         """Sets requests data. Data should be a xseg protocol structure"""
-        if sizeof(data) != self.req.contents.datalen:
-            return False
+        if isinstance(data, basestring):
+            if len(data) != self.req.contents.datalen:
+                return False
+            p_data = create_string_buffer(data)
+        else:
+            if sizeof(data) != self.req.contents.datalen:
+                return False
+            p_data = pointer(data)
         c_data = xseg_get_data_nonstatic(self.xseg_ctx.ctx, self.req)
-        p_data = pointer(data)
         memmove(c_data, p_data, self.req.contents.datalen)
 
         return True