python xseg: Enrich request/xseg python api.
author Filippos Giannakos <philipgian@grnet.gr>
Mon, 15 Jul 2013 13:26:22 +0000 (16:26 +0300)
committer Filippos Giannakos <philipgian@grnet.gr>
Mon, 15 Jul 2013 13:26:22 +0000 (16:26 +0300)
Add more constructors for the Request class.
Add xseg_wait_requests to wait on multiple requests.
Modify vlmc script to use the new constructors.

xseg/tools/archipelago/archipelago/cli.py
xseg/tools/archipelago/archipelago/common.py
xseg/tools/archipelago/archipelago/vlmc.py

index 35d8cb1..9f3f6c3 100755 (executable)
@@ -98,7 +98,8 @@ def vlmc_parser():
     #group = snapshot_parser.add_mutually_exclusive_group(required=True)
     snapshot_parser.add_argument('-p', '--pool', type=str, nargs='?',
                                  help='for backwards compatiblity with rbd')
-    snapshot_parser.add_argument('name', type=str,  help='volume/device name')
+    snapshot_parser.add_argument('name', type=str,  help='volume name')
+    snapshot_parser.add_argument('snap_name', type=str,  help='Snapshot name')
     snapshot_parser.set_defaults(func=vlmc.snapshot)
 
     ls_parser = subparsers.add_parser('ls', help='List volumes')
index ee013bd..82ffb30 100755 (executable)
@@ -39,7 +39,7 @@ from xseg.xseg_api import *
 from xseg.xprotocol import *
 from ctypes import CFUNCTYPE, cast, c_void_p, addressof, string_at, memmove, \
     create_string_buffer, pointer, sizeof, POINTER, byref
-
+import ctypes
 cb_null_ptrtype = CFUNCTYPE(None, uint32_t)
 
 import os
@@ -676,16 +676,66 @@ class Xseg_ctx(object):
 
     def shutdown(self):
         if self.ctx:
-            xseg_quit_local_signal(self.ctx, self.portno)
+        #    xseg_quit_local_signal(self.ctx, self.portno)
             xseg_leave(self.ctx)
         self.ctx = None
 
+    def wait_request(self):
+        while True:
+            received = xseg_receive(self.ctx, self.portno, 0)
+            if received:
+                return received
+            else:
+                xseg_prepare_wait(self.ctx, self.portno)
+                xseg_wait_signal(self.ctx, 10000000)
+                xseg_cancel_wait(self.ctx, self.portno)
+
+    def wait_requests(self, requests):
+        while True:
+            received = self.wait_request()
+            for req in requests:
+                xseg_req = req.req
+                if addressof(received.contents) == \
+                            addressof(xseg_req.contents):
+                    return req
+            p = xseg_respond(self.ctx, received, self.portno, X_ALLOC)
+            if p == NoPort:
+                xseg_put_request(self.ctx, received, self.portno)
+            else:
+                xseg_signal(self.ctx, p)
+
 
 class Request(object):
     xseg_ctx = None
     req = None
 
-    def __init__(self, xseg_ctx, dst_portno, targetlen, datalen):
+    #def __init__(self, xseg_ctx, dst_portno, targetlen, datalen):
+        #ctx = xseg_ctx.ctx
+        #if not ctx:
+            #raise Error("No context")
+        #req = xseg_get_request(ctx, xseg_ctx.portno, dst_portno, X_ALLOC)
+        #if not req:
+            #raise Error("Cannot get request")
+        #r = xseg_prep_request(ctx, req, targetlen, datalen)
+        #if r < 0:
+            #xseg_put_request(ctx, req, xseg_ctx.portno)
+            #raise Error("Cannot prepare request")
+##       print hex(addressof(req.contents))
+        #self.req = req
+        #self.xseg_ctx = xseg_ctx
+        #return
+
+    def __init__(self, xseg_ctx, dst_portno, target, datalen=0, size=0, op=None,
+                 data=None, flags=0, offset=0):
+        if not target:
+            raise Error("No target")
+        targetlen = len(target)
+        if not datalen and data:
+            if isinstance(data, basestring):
+                datalen = len(data)
+            else:
+                datalen = sizeof(data)
+
         ctx = xseg_ctx.ctx
         if not ctx:
             raise Error("No context")
@@ -696,9 +746,23 @@ class Request(object):
         if r < 0:
             xseg_put_request(ctx, req, xseg_ctx.portno)
             raise Error("Cannot prepare request")
-#       print hex(addressof(req.contents))
         self.req = req
         self.xseg_ctx = xseg_ctx
+        if not self.set_target(target):
+            self.put()
+            raise Error("Cannot set target")
+
+        if (data):
+            if not self.set_data(data):
+                self.put()
+                raise Error("Cannot set data")
+
+        self.set_size(size)
+        self.set_op(op)
+        self.set_flags(flags)
+        self.set_offset(offset)
+
         return
 
     def __enter__(self):
@@ -785,7 +849,7 @@ class Request(object):
 
         return True
 
-    def get_data(self, _type):
+    def get_data(self, _type=None):
         """return a pointer to the data buffer of the request, casted to the
         selected type"""
 #        print "data addr " + str(addressof(xseg_get_data_nonstatic(\
@@ -812,32 +876,98 @@ class Request(object):
     def wait(self):
         """Wait until the associated xseg_request is responded, discarding any
         other requests that may be received in the meantime"""
-        while True:
-            received = xseg_receive(self.xseg_ctx.ctx, self.xseg_ctx.portno, 0)
-            if received:
-#                print addressof(cast(self.req, c_void_p))
-#                print addressof(cast(received, c_void_p))
-#                print addressof(self.req.contents)
-#                print addressof(received.contents)
-                if addressof(received.contents) == \
-                        addressof(self.req.contents):
-#                if addressof(cast(received, c_void_p)) == \
-#                        addressof(cast(self.req, c_void_p)):
-                    break
-                else:
-                    p = xseg_respond(self.xseg_ctx.ctx, received,
-                                     self.xseg_ctx.portno, X_ALLOC)
-                    if p == NoPort:
-                        xseg_put_request(self.xseg_ctx.ctx, received,
-                                         self.xseg_ctx.portno)
-                    else:
-                        xseg_signal(self.xseg_ctx.ctx, p)
-            else:
-                xseg_prepare_wait(self.xseg_ctx.ctx, self.xseg_ctx.portno)
-                xseg_wait_signal(self.xseg_ctx.ctx, 10000000)
-                xseg_cancel_wait(self.xseg_ctx.ctx, self.xseg_ctx.portno)
-        return True
+        self.xseg_ctx.wait_requests([self])
 
     def success(self):
-        return bool((self.req.contents.state & XS_SERVED) and not
+        if not bool(self.req.contents.state & XS_SERVED) and not \
+            bool(self.req.contents.state & XS_FAILED):
+            raise Error("Request not completed, nor Failed")
+        return bool((self.req.contents.state & XS_SERVED) and not \
                    (self.req.contents.state & XS_FAILED))
+
+    @classmethod
+    def get_write_request(cls, xseg, dst, target, data=None, offset=0,
+            datalen=0):
+        if data is None:
+            data = ""
+        size = len(data)
+        if not datalen:
+            datalen = size
+
+        return cls(xseg, dst, target, op=X_WRITE, data=data, offset=offset,
+                   size=size, datalen=datalen)
+
+    @classmethod
+    def get_read_request(cls, xseg, dst, target, size=0, offset=0, datalen=0):
+        if not datalen:
+            datalen=size
+        return cls(xseg, dst, target, op=X_READ, offset=offset, size=size,
+                   datalen=datalen)
+
+    @classmethod
+    def get_info_request(cls, xseg, dst, target):
+        return cls(xseg, dst, target, op=X_INFO)
+
+    @classmethod
+    def get_copy_request(cls, xseg, dst, target, copy_target=None, size=0, offset=0):
+        datalen = sizeof(xseg_request_copy)
+        xcopy = xseg_request_copy()
+        xcopy.target = target
+        xcopy.targetlen = len(target)
+        return cls(xseg, dst, copy_target, op=X_COPY, data=xcopy, datalen=datalen,
+                size=size, offset=offset)
+    @classmethod
+    def get_acquire_request(cls, xseg, dst, target, wait=False):
+        flags = 0
+        if not wait:
+            flags = XF_NOSYNC
+        return cls(xseg, dst, target, op=X_ACQUIRE, flags=flags)
+
+    @classmethod
+    def get_release_request(cls, xseg, dst, target, force=False):
+        flags = 0
+        if force:
+            flags = XF_FORCE
+        return cls(xseg, dst, target, op=X_RELEASE, flags=flags)
+
+    @classmethod
+    def get_delete_request(cls, xseg, dst, target):
+        return cls(xseg, dst, target, op=X_DELETE)
+
+    @classmethod
+    def get_clone_request(cls, xseg, dst, target, clone=None, clone_size=0):
+        datalen = sizeof(xseg_request_clone)
+        xclone = xseg_request_clone()
+        xclone.target = target
+        xclone.targetlen= len(target)
+        xclone.size = clone_size
+
+        return cls(xseg, dst, clone, op=X_CLONE, data=xclone, datalen=datalen)
+
+    @classmethod
+    def get_open_request(cls, xseg, dst, target):
+        return cls(xseg, dst, target, op=X_OPEN)
+
+    @classmethod
+    def get_close_request(cls, xseg, dst, target):
+        return cls(xseg, dst, target, op=X_CLOSE)
+
+    @classmethod
+    def get_snapshot_request(cls, xseg, dst, target, snap=None):
+        datalen = sizeof(xseg_request_snapshot)
+        xsnapshot = xseg_request_snapshot()
+        xsnapshot.target = snap
+        xsnapshot.targetlen= len(snap)
+
+        return cls(xseg, dst, target, op=X_SNAPSHOT, data=xsnapshot,
+                datalen=datalen)
+
+    @classmethod
+    def get_mapr_request(cls, xseg, dst, target, offset=0, size=0):
+        return cls(xseg, dst, target, op=X_MAPR, offset=offset, size=size,
+                datalen=0)
+
+    @classmethod
+    def get_mapw_request(cls, xseg, dst, target, offset=0, size=0):
+        return cls(xseg, dst, target, op=X_MAPW, offset=offset, size=size,
+                datalen=0)
index 1422322..2659cdc 100755 (executable)
@@ -106,69 +106,45 @@ def create(name, size=None, snap=None, **kwargs):
     if size is None and snap is None:
         raise Error("At least one of the size/snap args must be provided")
 
+    if not snap:
+        snap = ""
+    if not size:
+        size = 0
+    else:
+        size = size << 20
+
     ret = False
     xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
     mport = peers['mapperd'].portno_start
-    datasize = sizeof(xseg_request_clone)
-    with Request(xseg_ctx, mport, len(name), datasize) as req:
-        req.set_op(X_CLONE)
-        req.set_size(sizeof(xseg_request_clone))
-        req.set_offset(0)
-        req.set_target(name)
-
-        xclone = xseg_request_clone()
-        if snap:
-            xclone.target = snap
-            xclone.targetlen = len(snap)
-        else:
-            xclone.target = ""
-            xclone.targetlen = 0
-        if size:
-            xclone.size = size << 20
-        else:
-            xclone.size = 0
-
-        req.set_data(xclone)
-        req.submit()
-        req.wait()
-        ret = req.success()
+    req = Request.get_clone_request(xseg_ctx, mport, snap, clone=name,
+            clone_size=size)
+    req.submit()
+    req.wait()
+    ret = req.success()
+    req.put()
     xseg_ctx.shutdown()
     if not ret:
         raise Error("vlmc creation failed")
 
 
 @exclusive
-def snapshot(name, cli=False, **kwargs):
+def snapshot(name, snap_name=None, cli=False, **kwargs):
     if len(name) < 6:
         raise Error("Name should have at least len 6")
 
-    ret = False
     xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
     vport = peers['vlmcd'].portno_start
-    datasize = sizeof(xseg_request_snapshot)
-    with Request(xseg_ctx, vport, len(name), datasize) as req:
-        req.set_op(X_SNAPSHOT)
-        req.set_size(sizeof(xseg_request_snapshot))
-        req.set_offset(0)
-        req.set_target(name)
-
-        xsnapshot = xseg_request_snapshot()
-        xsnapshot.target = ""
-        xsnapshot.targetlen = 0
-        req.set_data(xsnapshot)
-        req.submit()
-        req.wait()
-        ret = req.success()
-        if ret:
-            reply = string_at(req.get_data(xseg_reply_snapshot).
-                              contents.target, 64)
+    req = Request.get_snapshot_request(xseg_ctx, vport, name, snap=snap_name)
+    req.submit()
+    req.wait()
+    ret = req.success()
+    req.put()
     xseg_ctx.shutdown()
+
     if not ret:
         raise Error("vlmc snapshot failed")
     if cli:
-        sys.stdout.write("Snapshot name: %s\n" % reply)
-    return reply
-
+        sys.stdout.write("Snapshot name: %s\n" % snap_name)
 
 def list_volumes(**kwargs):
     if isinstance(peers['blockerm'], Sosd):
@@ -205,14 +181,10 @@ def remove(name, **kwargs):
     ret = False
     xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
     mport = peers['mapperd'].portno_start
-    with Request(xseg_ctx, mport, len(name), 0) as req:
-        req.set_op(X_DELETE)
-        req.set_size(0)
-        req.set_offset(0)
-        req.set_target(name)
-        req.submit()
-        req.wait()
-        ret = req.success()
+    req = Request.get_delete_request(xseg_ctx, mport, name)
+    req.submit()
+    req.wait()
+    ret = req.success()
     xseg_ctx.shutdown()
     if not ret:
         raise Error("vlmc removal failed")
@@ -300,18 +272,12 @@ def lock(name, cli=False, **kwargs):
 
     name = ARCHIP_PREFIX + name
 
-    ret = False
     xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
     mbport = peers['blockerm'].portno_start
-    with Request(xseg_ctx, mbport, len(name), 0) as req:
-        req.set_op(X_ACQUIRE)
-        req.set_size(0)
-        req.set_offset(0)
-        req.set_flags(XF_NOSYNC)
-        req.set_target(name)
-        req.submit()
-        req.wait()
-        ret = req.success()
+    ret = Request.get_acquire_request(xseg_ctx, mbport, name)
+    req.submit()
+    req.wait()
+    ret = req.success()
     xseg_ctx.shutdown()
     if not ret:
         raise Error("vlmc lock failed")
@@ -326,21 +292,12 @@ def unlock(name, force=False, cli=False, **kwargs):
 
     name = ARCHIP_PREFIX + name
 
-    ret = False
     xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
     mbport = peers['blockerm'].portno_start
-    with Request(xseg_ctx, mbport, len(name), 0) as req:
-        req.set_op(X_RELEASE)
-        req.set_size(0)
-        req.set_offset(0)
-        req.set_target(name)
-        if force:
-            req.set_flags(XF_NOSYNC | XF_FORCE)
-        else:
-            req.set_flags(XF_NOSYNC)
-        req.submit()
-        req.wait()
-        ret = req.success()
+    req = Request.get_release_request(xseg_ctx, mbport, name, force=force)
+    req.submit()
+    req.wait()
+    ret = req.success()
     xseg_ctx.shutdown()
     if not ret:
         raise Error("vlmc unlock failed")
@@ -356,14 +313,10 @@ def open_volume(name, cli=False, **kwargs):
     ret = False
     xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
     vport = peers['vlmcd'].portno_start
-    with Request(xseg_ctx, vport, len(name), 0) as req:
-        req.set_op(X_OPEN)
-        req.set_size(0)
-        req.set_offset(0)
-        req.set_target(name)
-        req.submit()
-        req.wait()
-        ret = req.success()
+    ret = Request.get_open_request(xseg_ctx, vport, name)
+    req.submit()
+    req.wait()
+    ret = req.success()
     xseg_ctx.shutdown()
     if not ret:
         raise Error("vlmc open failed")
@@ -379,14 +332,10 @@ def close_volume(name, cli=False, **kwargs):
     ret = False
     xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
     vport = peers['vlmcd'].portno_start
-    with Request(xseg_ctx, vport, len(name), 0) as req:
-        req.set_op(X_CLOSE)
-        req.set_size(0)
-        req.set_offset(0)
-        req.set_target(name)
-        req.submit()
-        req.wait()
-        ret = req.success()
+    ret = Request.get_close_request(xseg_ctx, vport, name)
+    req.submit()
+    req.wait()
+    ret = req.success()
     xseg_ctx.shutdown()
     if not ret:
         raise Error("vlmc close failed")
@@ -402,16 +351,12 @@ def info(name, cli=False, **kwargs):
     ret = False
     xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
     mport = peers['mapperd'].portno_start
-    with Request(xseg_ctx, mport, len(name), 0) as req:
-        req.set_op(X_INFO)
-        req.set_size(0)
-        req.set_offset(0)
-        req.set_target(name)
-        req.submit()
-        req.wait()
-        ret = req.success()
-        if ret:
-            size = req.get_data(xseg_reply_info).contents.size
+    ret = Request.get_info_request(xseg_ctx, mport, name)
+    req.submit()
+    req.wait()
+    ret = req.success()
+    if ret:
+        size = req.get_data(xseg_reply_info).contents.size
     xseg_ctx.shutdown()
     if not ret:
         raise Error("vlmc info failed")