"""Calculate timeouts for lock attempts.
"""
- result = [1.0]
+ result = [constants.LOCK_ATTEMPTS_MINWAIT]
+ running_sum = result[0]
- # Wait for a total of at least 150s before doing a blocking acquire
- while sum(result) < 150.0:
+ # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
+ # blocking acquire
+ while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
timeout = (result[-1] * 1.05) ** 1.25
- # Cap timeout at 10 seconds. This gives other jobs a chance to run
- # even if we're still trying to get our locks, before finally moving
- # to a blocking acquire.
- if timeout > 10.0:
- timeout = 10.0
-
- elif timeout < 0.1:
- # Lower boundary for safety
- timeout = 0.1
+ # Cap max timeout. This gives other jobs a chance to run even if
+ # we're still trying to get our locks, before finally moving to a
+ # blocking acquire.
+ timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
+ # And also cap the lower boundary for safety
+ timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
result.append(timeout)
+ running_sum += timeout
return result
"""
+ def SubmitManyJobs(self, jobs):
+ """Submits jobs for processing.
+
+ See L{jqueue.JobQueue.SubmitManyJobs}.
+
+ @param jobs: jobs to submit; the expected format is not visible here,
+ see the referenced method
+ @raise NotImplementedError: always; this implementation does nothing
+ else (presumably it is meant to be overridden -- TODO confirm against
+ the enclosing class)
+
+ """
+ raise NotImplementedError
+
def _LUNameForOpName(opname):
"""Computes the LU name for a given OpCode name.
return acquired
+ def _ProcessResult(self, result):
+ """Post-processes a value returned by an LU's C{Exec} method.
+
+ If C{result} is a L{cmdlib.ResultWithJobs} instance, its C{jobs} are
+ passed to C{self._cbs.SubmitManyJobs} and the value returned by that
+ call is stored under C{constants.JOB_IDS_KEY} in the instance's
+ C{other} value, which then replaces C{result}. Any other value is
+ returned unchanged.
+
+ @param result: value to post-process
+ @return: C{result.other} with the job submission value added if
+ C{result} was a L{cmdlib.ResultWithJobs}, otherwise C{result} itself
+
+ """
+ if isinstance(result, cmdlib.ResultWithJobs):
+ # Submit jobs
+ job_submission = self._cbs.SubmitManyJobs(result.jobs)
+
+ # Build dictionary
+ # (from here on "result" refers to the additional return values only)
+ result = result.other
+
+ # The key is reserved for the job submission value stored below, so it
+ # must not already be present among the additional return values
+ assert constants.JOB_IDS_KEY not in result, \
+ "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+
+ result[constants.JOB_IDS_KEY] = job_submission
+
+ return result
+
def _ExecLU(self, lu):
"""Logical Unit execution sequence.
return lu.dry_run_result
try:
- result = lu.Exec(self.Log)
+ result = self._ProcessResult(lu.Exec(self.Log))
h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
self.Log, result)
self.callfn = callfn
self.lu = lu
self.op = lu.op
- self.pre_env = None
- self.post_nodes = None
+ self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
+
+ if self.lu.HPATH is None:
+ nodes = (None, None)
+ else:
+ nodes = map(frozenset, self.lu.BuildHooksNodes())
+
+ (self.pre_nodes, self.post_nodes) = nodes
def _BuildEnv(self, phase):
"""Compute the environment and the target nodes.
env = {}
if self.lu.HPATH is not None:
- (lu_env, lu_nodes_pre, lu_nodes_post) = self.lu.BuildHooksEnv()
+ lu_env = self.lu.BuildHooksEnv()
if lu_env:
- assert not compat.any(key.upper().startswith(prefix)
- for key in lu_env)
+ assert not compat.any(key.upper().startswith(prefix) for key in lu_env)
env.update(("%s%s" % (prefix, key), value)
for (key, value) in lu_env.items())
- else:
- lu_nodes_pre = lu_nodes_post = []
if phase == constants.HOOKS_PHASE_PRE:
assert compat.all((key.startswith("GANETI_") and
not key.startswith("GANETI_POST_"))
for key in env)
- # Record environment for any post-phase hooks
- self.pre_env = env
-
elif phase == constants.HOOKS_PHASE_POST:
assert compat.all(key.startswith("GANETI_POST_") for key in env)
+ assert isinstance(self.pre_env, dict)
- if self.pre_env:
- assert not compat.any(key.startswith("GANETI_POST_")
- for key in self.pre_env)
- env.update(self.pre_env)
+ # Merge with pre-phase environment
+ assert not compat.any(key.startswith("GANETI_POST_")
+ for key in self.pre_env)
+ env.update(self.pre_env)
else:
raise AssertionError("Unknown phase '%s'" % phase)
- return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
+ return env
def _RunWrapper(self, node_list, hpath, phase, phase_env):
"""Simple wrapper over self.callfn.
"PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
"GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
"GANETI_OP_CODE": self.op.OP_ID,
- "GANETI_OBJECT_TYPE": self.lu.HTYPE,
"GANETI_DATA_DIR": constants.DATA_DIR,
"GANETI_HOOKS_PHASE": phase,
"GANETI_HOOKS_PATH": hpath,
}
+ if self.lu.HTYPE:
+ env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE
+
if cfg is not None:
env["GANETI_CLUSTER"] = cfg.GetClusterName()
env["GANETI_MASTER"] = cfg.GetMasterNode()
@raise errors.HooksAbort: on failure of one of the hooks
"""
- (env, node_list_pre, node_list_post) = self._BuildEnv(phase)
- if nodes is None:
- if phase == constants.HOOKS_PHASE_PRE:
- nodes = node_list_pre
- self.post_nodes = node_list_post
- elif self.post_nodes is None:
- raise AssertionError("Pre-phase must be run before post-phase")
- elif phase == constants.HOOKS_PHASE_POST:
+ if phase == constants.HOOKS_PHASE_PRE:
+ if nodes is None:
+ nodes = self.pre_nodes
+ env = self.pre_env
+ elif phase == constants.HOOKS_PHASE_POST:
+ if nodes is None:
nodes = self.post_nodes
- else:
- raise AssertionError("Unknown phase '%s'" % phase)
+ env = self._BuildEnv(phase)
+ else:
+ raise AssertionError("Unknown phase '%s'" % phase)
if not nodes:
# empty node list, we should not attempt to run this as either
top-level LI if the configuration has been updated.
"""
- if self.pre_env is None:
- raise AssertionError("Pre-phase must be run before configuration update")
-
phase = constants.HOOKS_PHASE_POST
hpath = constants.HOOKS_NAME_CFGUPDATE
nodes = [self.lu.cfg.GetMasterNode()]