projects
/
ganeti-local
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
opcodes: document OP_DSC_FIELD in OpCode and OpCode.Summary()
[ganeti-local]
/
lib
/
mcpu.py
diff --git
a/lib/mcpu.py
b/lib/mcpu.py
index
9f3eacb
..
a724eb9
100644
(file)
--- a/
lib/mcpu.py
+++ b/
lib/mcpu.py
@@
-38,6
+38,7
@@
from ganeti import errors
from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
+from ganeti import utils
class LockAcquireTimeout(Exception):
class LockAcquireTimeout(Exception):
@@
-71,60
+72,40
@@
def _CalculateLockAttemptTimeouts():
return result
return result
-class _LockAttemptTimeoutStrategy(object):
+class LockAttemptTimeoutStrategy(object):
"""Class with lock acquire timeout strategy.
"""
__slots__ = [
"""Class with lock acquire timeout strategy.
"""
__slots__ = [
- "_attempt",
+ "_timeouts",
"_random_fn",
"_random_fn",
- "_start_time",
"_time_fn",
"_time_fn",
- "_running_timeout",
]
_TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
]
_TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
- def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
+ def __init__(self, _time_fn=time.time, _random_fn=random.random):
"""Initializes this class.
"""Initializes this class.
- @type attempt: int
- @param attempt: Current attempt number
@param _time_fn: Time function for unittests
@param _random_fn: Random number generator for unittests
"""
object.__init__(self)
@param _time_fn: Time function for unittests
@param _random_fn: Random number generator for unittests
"""
object.__init__(self)
- if attempt < 0:
- raise ValueError("Attempt must be zero or positive")
-
- self._attempt = attempt
+ self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
self._time_fn = _time_fn
self._random_fn = _random_fn
self._time_fn = _time_fn
self._random_fn = _random_fn
- try:
- timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
- except IndexError:
- # No more timeouts, do blocking acquire
- timeout = None
-
- self._running_timeout = locking.RunningTimeout(timeout, False,
- _time_fn=_time_fn)
-
def NextAttempt(self):
def NextAttempt(self):
- """Returns the strategy for the next attempt.
-
- """
- return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
- _time_fn=self._time_fn,
- _random_fn=self._random_fn)
-
- def CalcRemainingTimeout(self):
- """Returns the remaining timeout.
+ """Returns the timeout for the next attempt.
"""
"""
- timeout = self._running_timeout.Remaining()
+ try:
+ timeout = self._timeouts.next()
+ except StopIteration:
+ # No more timeouts, do blocking acquire
+ timeout = None
if timeout is not None:
# Add a small variation (-/+ 5%) to timeout. This helps in situations
if timeout is not None:
# Add a small variation (-/+ 5%) to timeout. This helps in situations
@@
-173,6
+154,8
@@
class Processor(object):
opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
+ opcodes.OpQuery: cmdlib.LUQuery,
+ opcodes.OpQueryFields: cmdlib.LUQueryFields,
# node lu
opcodes.OpAddNode: cmdlib.LUAddNode,
opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
# node lu
opcodes.OpAddNode: cmdlib.LUAddNode,
opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
@@
-205,6
+188,8
@@
class Processor(object):
opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
+ # node group lu
+ opcodes.OpQueryGroups: cmdlib.LUQueryGroups,
# os lu
opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
# exports lu
# os lu
opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
# exports lu
@@
-238,7
+223,7
@@
class Processor(object):
self.rpc = rpc.RpcRunner(context.cfg)
self.hmclass = HooksMaster
self.rpc = rpc.RpcRunner(context.cfg)
self.hmclass = HooksMaster
- def _AcquireLocks(self, level, names, shared, timeout):
+ def _AcquireLocks(self, level, names, shared, timeout, priority):
"""Acquires locks via the Ganeti lock manager.
@type level: int
"""Acquires locks via the Ganeti lock manager.
@type level: int
@@
-249,13
+234,18
@@
class Processor(object):
@param shared: Whether the locks should be acquired in shared mode
@type timeout: None or float
@param timeout: Timeout for acquiring the locks
@param shared: Whether the locks should be acquired in shared mode
@type timeout: None or float
@param timeout: Timeout for acquiring the locks
+ @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
+ amount of time
"""
if self._cbs:
self._cbs.CheckCancel()
acquired = self.context.glm.acquire(level, names, shared=shared,
"""
if self._cbs:
self._cbs.CheckCancel()
acquired = self.context.glm.acquire(level, names, shared=shared,
- timeout=timeout)
+ timeout=timeout, priority=priority)
+
+ if acquired is None:
+ raise LockAcquireTimeout()
return acquired
return acquired
@@
-290,7
+280,7
@@
class Processor(object):
return result
return result
- def _LockAndExecLU(self, lu, level, calc_timeout):
+ def _LockAndExecLU(self, lu, level, calc_timeout, priority):
"""Execute a Logical Unit, with the needed locks.
This is a recursive function that starts locking the given level, and
"""Execute a Logical Unit, with the needed locks.
This is a recursive function that starts locking the given level, and
@@
-325,11
+315,7
@@
class Processor(object):
needed_locks = lu.needed_locks[level]
acquired = self._AcquireLocks(level, needed_locks, share,
needed_locks = lu.needed_locks[level]
acquired = self._AcquireLocks(level, needed_locks, share,
- calc_timeout())
-
- if acquired is None:
- raise LockAcquireTimeout()
-
+ calc_timeout(), priority)
else:
# Adding locks
add_locks = lu.add_locks[level]
else:
# Adding locks
add_locks = lu.add_locks[level]
@@
-348,7
+334,7
@@
class Processor(object):
try:
lu.acquired_locks[level] = acquired
try:
lu.acquired_locks[level] = acquired
- result = self._LockAndExecLU(lu, level + 1, calc_timeout)
+ result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
finally:
if level in lu.remove_locks:
self.context.glm.remove(level, lu.remove_locks[level])
finally:
if level in lu.remove_locks:
self.context.glm.remove(level, lu.remove_locks[level])
@@
-357,11
+343,11
@@
class Processor(object):
self.context.glm.release(level)
else:
self.context.glm.release(level)
else:
- result = self._LockAndExecLU(lu, level + 1, calc_timeout)
+ result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
return result
return result
- def ExecOpCode(self, op, cbs, timeout=None):
+ def ExecOpCode(self, op, cbs, timeout=None, priority=None):
"""Execute an opcode.
@type op: an OpCode instance
"""Execute an opcode.
@type op: an OpCode instance
@@
-370,6
+356,8
@@
class Processor(object):
@param cbs: Runtime callbacks
@type timeout: float or None
@param timeout: Maximum time to acquire all locks, None for no timeout
@param cbs: Runtime callbacks
@type timeout: float or None
@param timeout: Maximum time to acquire all locks, None for no timeout
+ @type priority: number or None
+ @param priority: Priority for acquiring lock(s)
@raise LockAcquireTimeout: In case locks couldn't be acquired in specified
amount of time
@raise LockAcquireTimeout: In case locks couldn't be acquired in specified
amount of time
@@
-385,24
+373,24
@@
class Processor(object):
if timeout is None:
calc_timeout = lambda: None
else:
if timeout is None:
calc_timeout = lambda: None
else:
- calc_timeout = locking.RunningTimeout(timeout, False).Remaining
+ calc_timeout = utils.RunningTimeout(timeout, False).Remaining
self._cbs = cbs
try:
# Acquire the Big Ganeti Lock exclusively if this LU requires it,
# and in a shared fashion otherwise (to prevent concurrent run with
# an exclusive LU.
self._cbs = cbs
try:
# Acquire the Big Ganeti Lock exclusively if this LU requires it,
# and in a shared fashion otherwise (to prevent concurrent run with
# an exclusive LU.
- if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
- not lu_class.REQ_BGL, calc_timeout()) is None:
- raise LockAcquireTimeout()
-
+ self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+ not lu_class.REQ_BGL, calc_timeout(),
+ priority)
try:
lu = lu_class(self, op, self.context, self.rpc)
lu.ExpandNames()
assert lu.needed_locks is not None, "needed_locks not set by LU"
try:
try:
lu = lu_class(self, op, self.context, self.rpc)
lu.ExpandNames()
assert lu.needed_locks is not None, "needed_locks not set by LU"
try:
- return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
+ return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
+ priority)
finally:
if self._ec_id:
self.context.cfg.DropECReservations(self._ec_id)
finally:
if self._ec_id:
self.context.cfg.DropECReservations(self._ec_id)