from xseg.xprotocol import *
from ctypes import CFUNCTYPE, cast, c_void_p, addressof, string_at, memmove, \
create_string_buffer, pointer, sizeof, POINTER, byref
-
+import ctypes
cb_null_ptrtype = CFUNCTYPE(None, uint32_t)
import os
def shutdown(self):
+        """Call xseg_leave on the held context and reset self.ctx to None.
+
+        Does nothing when self.ctx is already unset.
+        """
if self.ctx:
- xseg_quit_local_signal(self.ctx, self.portno)
+ # xseg_quit_local_signal(self.ctx, self.portno)
+        # NOTE(review): the quit_local_signal call above is commented out
+        # rather than removed; the reason is not visible here -- confirm.
xseg_leave(self.ctx)
self.ctx = None
+    def wait_request(self):
+        """Block until a request is received on this port and return it.
+
+        Polls xseg_receive on (self.ctx, self.portno).  While nothing is
+        pending, the port is armed with xseg_prepare_wait, the caller sleeps
+        in xseg_wait_signal and the wait is cancelled again before the next
+        poll.
+        """
+        while True:
+            incoming = xseg_receive(self.ctx, self.portno, 0)
+            if incoming:
+                return incoming
+            # Nothing queued: sleep until signalled or until the 10000000
+            # timeout elapses (unit presumably microseconds -- confirm).
+            xseg_prepare_wait(self.ctx, self.portno)
+            xseg_wait_signal(self.ctx, 10000000)
+            xseg_cancel_wait(self.ctx, self.portno)
+
+    def wait_requests(self, requests):
+        """Wait until one of `requests` comes back and return that wrapper.
+
+        Each element of `requests` must expose the underlying xseg request
+        pointer as `.req`.  A received request is matched by comparing the
+        address of its contents with the address of each candidate's
+        contents.  A received request that matches none of them is handed
+        back with xseg_respond; when that yields NoPort it is released with
+        xseg_put_request, otherwise the returned port is signalled.
+        """
+        while True:
+            received = self.wait_request()
+            received_addr = addressof(received.contents)
+            for candidate in requests:
+                if received_addr == addressof(candidate.req.contents):
+                    return candidate
+            # Not one of ours: pass it on, or drop it if nobody can take it.
+            port = xseg_respond(self.ctx, received, self.portno, X_ALLOC)
+            if p_is_noport(port) if False else port == NoPort:
+                xseg_put_request(self.ctx, received, self.portno)
+            else:
+                xseg_signal(self.ctx, port)
+
class Request(object):
xseg_ctx = None
req = None
- def __init__(self, xseg_ctx, dst_portno, targetlen, datalen):
+ #def __init__(self, xseg_ctx, dst_portno, targetlen, datalen):
+ #ctx = xseg_ctx.ctx
+ #if not ctx:
+ #raise Error("No context")
+ #req = xseg_get_request(ctx, xseg_ctx.portno, dst_portno, X_ALLOC)
+ #if not req:
+ #raise Error("Cannot get request")
+ #r = xseg_prep_request(ctx, req, targetlen, datalen)
+ #if r < 0:
+ #xseg_put_request(ctx, req, xseg_ctx.portno)
+ #raise Error("Cannot prepare request")
+## print hex(addressof(req.contents))
+ #self.req = req
+ #self.xseg_ctx = xseg_ctx
+ #return
+
+ def __init__(self, xseg_ctx, dst_portno, target, datalen=0, size=0, op=None,
+ data=None, flags=0, offset=0):
+        """Build a request for `target` and fill in its fields.
+
+        When `datalen` is not given but `data` is, the length is taken from
+        len(data) for strings and from ctypes sizeof(data) otherwise.  Once
+        the request has been prepared, the target, the optional data, and
+        then size, op, flags and offset are set on it.
+
+        Raises Error when `target` is empty, when the context is missing,
+        when preparing the request fails, or when setting the target or the
+        data fails; in the last three cases the request is put back first.
+
+        NOTE(review): the statements that assign `req` and `r` are not
+        visible in this excerpt -- presumably the xseg_get_request and
+        xseg_prep_request calls; confirm against the full file.
+        """
+ if not target:
+ raise Error("No target")
+ targetlen = len(target)
+ if not datalen and data:
+ if isinstance(data, basestring):
+ datalen = len(data)
+ else:
+ datalen = sizeof(data)
+
ctx = xseg_ctx.ctx
if not ctx:
raise Error("No context")
if r < 0:
xseg_put_request(ctx, req, xseg_ctx.portno)
raise Error("Cannot prepare request")
-# print hex(addressof(req.contents))
self.req = req
self.xseg_ctx = xseg_ctx
+
+ if not self.set_target(target):
+ self.put()
+ raise Error("Cannot set target")
+
+ if (data):
+ if not self.set_data(data):
+ self.put()
+ raise Error("Cannot set data")
+
+ self.set_size(size)
+ self.set_op(op)
+ self.set_flags(flags)
+ self.set_offset(offset)
+
return
def __enter__(self):
return True
- def get_data(self, _type):
+ def get_data(self, _type=None):
"""return a pointer to the data buffer of the request, casted to the
selected type"""
# print "data addr " + str(addressof(xseg_get_data_nonstatic(\
def wait(self):
"""Wait until the associated xseg_request is responded, discarding any
other requests that may be received in the meantime"""
- while True:
- received = xseg_receive(self.xseg_ctx.ctx, self.xseg_ctx.portno, 0)
- if received:
-# print addressof(cast(self.req, c_void_p))
-# print addressof(cast(received, c_void_p))
-# print addressof(self.req.contents)
-# print addressof(received.contents)
- if addressof(received.contents) == \
- addressof(self.req.contents):
-# if addressof(cast(received, c_void_p)) == \
-# addressof(cast(self.req, c_void_p)):
- break
- else:
- p = xseg_respond(self.xseg_ctx.ctx, received,
- self.xseg_ctx.portno, X_ALLOC)
- if p == NoPort:
- xseg_put_request(self.xseg_ctx.ctx, received,
- self.xseg_ctx.portno)
- else:
- xseg_signal(self.xseg_ctx.ctx, p)
- else:
- xseg_prepare_wait(self.xseg_ctx.ctx, self.xseg_ctx.portno)
- xseg_wait_signal(self.xseg_ctx.ctx, 10000000)
- xseg_cancel_wait(self.xseg_ctx.ctx, self.xseg_ctx.portno)
- return True
+        # The receive/match loop now lives on the context object; wait for
+        # this single request through it.
+        self.xseg_ctx.wait_requests([self])
+        # The loop this replaces always ended with `return True`; keep that
+        # return value so existing callers that test it still work.
+        return True
def success(self):
+        """Return True when the request state has XS_SERVED set and
+        XS_FAILED clear, False otherwise.
+
+        Raises Error when the state has neither XS_SERVED nor XS_FAILED
+        set, i.e. when it is called before the request has completed.
+        """
- return bool((self.req.contents.state & XS_SERVED) and not
+ if not bool(self.req.contents.state & XS_SERVED) and not \
+ bool(self.req.contents.state & XS_FAILED):
+ raise Error("Request not completed, nor Failed")
+ return bool((self.req.contents.state & XS_SERVED) and not \
(self.req.contents.state & XS_FAILED))
+
+    @classmethod
+    def get_write_request(cls, xseg, dst, target, data=None, offset=0,
+                          datalen=0):
+        """Create an X_WRITE request carrying `data` for `target`.
+
+        A missing `data` is treated as the empty string.  The request size
+        is len(data); `datalen` falls back to that size when it is not
+        given.
+        """
+        payload = "" if data is None else data
+        size = len(payload)
+        return cls(xseg, dst, target, op=X_WRITE, data=payload,
+                   offset=offset, size=size, datalen=datalen or size)
+
+    @classmethod
+    def get_read_request(cls, xseg, dst, target, size=0, offset=0, datalen=0):
+        """Create an X_READ request of `size` bytes at `offset` of `target`.
+
+        `datalen` falls back to `size` when it is not given.
+        """
+        return cls(xseg, dst, target, op=X_READ, offset=offset, size=size,
+                   datalen=datalen or size)
+
+    @classmethod
+    def get_info_request(cls, xseg, dst, target):
+        """Create an X_INFO request for `target`; all other fields keep the
+        constructor defaults."""
+        info_request = cls(xseg, dst, target, op=X_INFO)
+        return info_request
+
+    @classmethod
+    def get_copy_request(cls, xseg, dst, target, copy_target=None, size=0, offset=0):
+        """Create an X_COPY request addressed to `copy_target`.
+
+        `target` and its length travel in an xseg_request_copy structure
+        that is attached as the request data.
+        """
+        payload = xseg_request_copy()
+        payload.target = target
+        payload.targetlen = len(target)
+        return cls(xseg, dst, copy_target, op=X_COPY, data=payload,
+                   datalen=sizeof(xseg_request_copy), size=size,
+                   offset=offset)
+    @classmethod
+    def get_acquire_request(cls, xseg, dst, target, wait=False):
+        """Create an X_ACQUIRE request for `target`.
+
+        XF_NOSYNC is set unless `wait` is true, in which case no flags are
+        set.
+        """
+        acquire_flags = 0 if wait else XF_NOSYNC
+        return cls(xseg, dst, target, op=X_ACQUIRE, flags=acquire_flags)
+
+    @classmethod
+    def get_release_request(cls, xseg, dst, target, force=False):
+        """Create an X_RELEASE request for `target`.
+
+        XF_NOSYNC is always set and XF_FORCE is added when `force` is true.
+        This matches the flags of the hand-built release request that this
+        factory replaces (XF_NOSYNC, or XF_NOSYNC | XF_FORCE when forced).
+        """
+        flags = XF_NOSYNC
+        if force:
+            flags |= XF_FORCE
+        return cls(xseg, dst, target, op=X_RELEASE, flags=flags)
+
+    @classmethod
+    def get_delete_request(cls, xseg, dst, target):
+        """Create an X_DELETE request for `target`; all other fields keep
+        the constructor defaults."""
+        delete_request = cls(xseg, dst, target, op=X_DELETE)
+        return delete_request
+
+    @classmethod
+    def get_clone_request(cls, xseg, dst, target, clone=None, clone_size=0):
+        """Create an X_CLONE request addressed to `clone`.
+
+        `target`, its length and `clone_size` travel in an
+        xseg_request_clone structure that is attached as the request data.
+        """
+        payload = xseg_request_clone()
+        payload.target = target
+        payload.targetlen = len(target)
+        payload.size = clone_size
+        return cls(xseg, dst, clone, op=X_CLONE, data=payload,
+                   datalen=sizeof(xseg_request_clone))
+
+    @classmethod
+    def get_open_request(cls, xseg, dst, target):
+        """Create an X_OPEN request for `target`; all other fields keep the
+        constructor defaults."""
+        open_request = cls(xseg, dst, target, op=X_OPEN)
+        return open_request
+
+    @classmethod
+    def get_close_request(cls, xseg, dst, target):
+        """Create an X_CLOSE request for `target`; all other fields keep the
+        constructor defaults."""
+        close_request = cls(xseg, dst, target, op=X_CLOSE)
+        return close_request
+
+    @classmethod
+    def get_snapshot_request(cls, xseg, dst, target, snap=None):
+        """Create an X_SNAPSHOT request for `target`.
+
+        The snapshot name and its length travel in an
+        xseg_request_snapshot structure that is attached as the request
+        data.  When `snap` is omitted an empty name with length 0 is sent,
+        which is what the hand-built request this factory replaces used to
+        send; without this guard the default `snap=None` fails in
+        len(None).
+        """
+        snap_name = "" if snap is None else snap
+        xsnapshot = xseg_request_snapshot()
+        xsnapshot.target = snap_name
+        xsnapshot.targetlen = len(snap_name)
+
+        return cls(xseg, dst, target, op=X_SNAPSHOT, data=xsnapshot,
+                   datalen=sizeof(xseg_request_snapshot))
+
+    @classmethod
+    def get_mapr_request(cls, xseg, dst, target, offset=0, size=0):
+        """Create an X_MAPR request covering `size` bytes at `offset` of
+        `target`, with no data buffer (datalen 0)."""
+        mapr_request = cls(xseg, dst, target, op=X_MAPR, offset=offset,
+                           size=size, datalen=0)
+        return mapr_request
+
+    @classmethod
+    def get_mapw_request(cls, xseg, dst, target, offset=0, size=0):
+        """Create an X_MAPW request covering `size` bytes at `offset` of
+        `target`, with no data buffer (datalen 0)."""
+        mapw_request = cls(xseg, dst, target, op=X_MAPW, offset=offset,
+                           size=size, datalen=0)
+        return mapw_request
if size is None and snap is None:
raise Error("At least one of the size/snap args must be provided")
+ if not snap:
+ snap = ""
+ if not size:
+ size = 0
+ else:
+ size = size << 20
+
ret = False
xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
mport = peers['mapperd'].portno_start
- datasize = sizeof(xseg_request_clone)
- with Request(xseg_ctx, mport, len(name), datasize) as req:
- req.set_op(X_CLONE)
- req.set_size(sizeof(xseg_request_clone))
- req.set_offset(0)
- req.set_target(name)
-
- xclone = xseg_request_clone()
- if snap:
- xclone.target = snap
- xclone.targetlen = len(snap)
- else:
- xclone.target = ""
- xclone.targetlen = 0
- if size:
- xclone.size = size << 20
- else:
- xclone.size = 0
-
- req.set_data(xclone)
- req.submit()
- req.wait()
- ret = req.success()
+ req = Request.get_clone_request(xseg_ctx, mport, snap, clone=name,
+ clone_size=size)
+ req.submit()
+ req.wait()
+ ret = req.success()
+ req.put()
xseg_ctx.shutdown()
if not ret:
raise Error("vlmc creation failed")
@exclusive
-def snapshot(name, cli=False, **kwargs):
+def snapshot(name, snap_name=None, cli=False, **kwargs):
+    """Ask the vlmcd peer to snapshot volume `name` as `snap_name`.
+
+    Raises Error when `name` is shorter than 6 characters or when the
+    request does not succeed.  With `cli` set, the snapshot name is also
+    written to stdout.  Returns `snap_name`: the function used to return
+    the snapshot name, so callers that consume the return value keep
+    getting one.
+    """
if len(name) < 6:
raise Error("Name should have at least len 6")
- ret = False
xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
vport = peers['vlmcd'].portno_start
- datasize = sizeof(xseg_request_snapshot)
- with Request(xseg_ctx, vport, len(name), datasize) as req:
- req.set_op(X_SNAPSHOT)
- req.set_size(sizeof(xseg_request_snapshot))
- req.set_offset(0)
- req.set_target(name)
-
- xsnapshot = xseg_request_snapshot()
- xsnapshot.target = ""
- xsnapshot.targetlen = 0
- req.set_data(xsnapshot)
- req.submit()
- req.wait()
- ret = req.success()
- if ret:
- reply = string_at(req.get_data(xseg_reply_snapshot).
- contents.target, 64)
+    req = Request.get_snapshot_request(xseg_ctx, vport, name, snap=snap_name)
+    req.submit()
+    req.wait()
+    ret = req.success()
+    req.put()
xseg_ctx.shutdown()
+
if not ret:
raise Error("vlmc snapshot failed")
if cli:
- sys.stdout.write("Snapshot name: %s\n" % reply)
- return reply
-
+        sys.stdout.write("Snapshot name: %s\n" % snap_name)
+    return snap_name
def list_volumes(**kwargs):
if isinstance(peers['blockerm'], Sosd):
ret = False
xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
mport = peers['mapperd'].portno_start
- with Request(xseg_ctx, mport, len(name), 0) as req:
- req.set_op(X_DELETE)
- req.set_size(0)
- req.set_offset(0)
- req.set_target(name)
- req.submit()
- req.wait()
- ret = req.success()
+    req = Request.get_delete_request(xseg_ctx, mport, name)
+    req.submit()
+    req.wait()
+    ret = req.success()
+    # The `with` block that used to scope the request is gone, so hand the
+    # request back explicitly, as the create and snapshot paths do.
+    req.put()
xseg_ctx.shutdown()
if not ret:
raise Error("vlmc removal failed")
name = ARCHIP_PREFIX + name
- ret = False
xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
mbport = peers['blockerm'].portno_start
- with Request(xseg_ctx, mbport, len(name), 0) as req:
- req.set_op(X_ACQUIRE)
- req.set_size(0)
- req.set_offset(0)
- req.set_flags(XF_NOSYNC)
- req.set_target(name)
- req.submit()
- req.wait()
- ret = req.success()
+    # The request must be bound to `req`: the following calls use that
+    # name, and nothing else defines it now that the `with` block is gone.
+    req = Request.get_acquire_request(xseg_ctx, mbport, name)
+    req.submit()
+    req.wait()
+    ret = req.success()
+    # Hand the request back explicitly, as the create and snapshot paths do.
+    req.put()
xseg_ctx.shutdown()
if not ret:
raise Error("vlmc lock failed")
name = ARCHIP_PREFIX + name
- ret = False
xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
mbport = peers['blockerm'].portno_start
- with Request(xseg_ctx, mbport, len(name), 0) as req:
- req.set_op(X_RELEASE)
- req.set_size(0)
- req.set_offset(0)
- req.set_target(name)
- if force:
- req.set_flags(XF_NOSYNC | XF_FORCE)
- else:
- req.set_flags(XF_NOSYNC)
- req.submit()
- req.wait()
- ret = req.success()
+    req = Request.get_release_request(xseg_ctx, mbport, name, force=force)
+    req.submit()
+    req.wait()
+    ret = req.success()
+    # The `with` block that used to scope the request is gone, so hand the
+    # request back explicitly, as the create and snapshot paths do.
+    req.put()
xseg_ctx.shutdown()
if not ret:
raise Error("vlmc unlock failed")
ret = False
xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
vport = peers['vlmcd'].portno_start
- with Request(xseg_ctx, vport, len(name), 0) as req:
- req.set_op(X_OPEN)
- req.set_size(0)
- req.set_offset(0)
- req.set_target(name)
- req.submit()
- req.wait()
- ret = req.success()
+    # The request must be bound to `req`: the following calls use that
+    # name, and nothing else defines it now that the `with` block is gone.
+    req = Request.get_open_request(xseg_ctx, vport, name)
+    req.submit()
+    req.wait()
+    ret = req.success()
+    # Hand the request back explicitly, as the create and snapshot paths do.
+    req.put()
xseg_ctx.shutdown()
if not ret:
raise Error("vlmc open failed")
ret = False
xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
vport = peers['vlmcd'].portno_start
- with Request(xseg_ctx, vport, len(name), 0) as req:
- req.set_op(X_CLOSE)
- req.set_size(0)
- req.set_offset(0)
- req.set_target(name)
- req.submit()
- req.wait()
- ret = req.success()
+    # The request must be bound to `req`: the following calls use that
+    # name, and nothing else defines it now that the `with` block is gone.
+    req = Request.get_close_request(xseg_ctx, vport, name)
+    req.submit()
+    req.wait()
+    ret = req.success()
+    # Hand the request back explicitly, as the create and snapshot paths do.
+    req.put()
xseg_ctx.shutdown()
if not ret:
raise Error("vlmc close failed")
ret = False
xseg_ctx = Xseg_ctx(config['SPEC'], config['VTOOL'])
mport = peers['mapperd'].portno_start
- with Request(xseg_ctx, mport, len(name), 0) as req:
- req.set_op(X_INFO)
- req.set_size(0)
- req.set_offset(0)
- req.set_target(name)
- req.submit()
- req.wait()
- ret = req.success()
- if ret:
- size = req.get_data(xseg_reply_info).contents.size
+    # The request must be bound to `req`: the following calls use that
+    # name, and nothing else defines it now that the `with` block is gone.
+    req = Request.get_info_request(xseg_ctx, mport, name)
+    req.submit()
+    req.wait()
+    ret = req.success()
+    if ret:
+        # Read the reply out of the request buffer before it is released.
+        size = req.get_data(xseg_reply_info).contents.size
+    # Hand the request back explicitly, as the create and snapshot paths do.
+    req.put()
xseg_ctx.shutdown()
if not ret:
raise Error("vlmc info failed")