X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/dd7f6776235601c60ce4257c710a2bc0be80a206..8d99a8bf5852330464f87b40b96d6fe80e7cdbf5:/lib/mcpu.py?ds=sidebyside diff --git a/lib/mcpu.py b/lib/mcpu.py index 7ab7c18..7788959 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -56,23 +56,23 @@ def _CalculateLockAttemptTimeouts(): """Calculate timeouts for lock attempts. """ - result = [1.0] + result = [constants.LOCK_ATTEMPTS_MINWAIT] + running_sum = result[0] - # Wait for a total of at least 150s before doing a blocking acquire - while sum(result) < 150.0: + # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a + # blocking acquire + while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT: timeout = (result[-1] * 1.05) ** 1.25 - # Cap timeout at 10 seconds. This gives other jobs a chance to run - # even if we're still trying to get our locks, before finally moving - # to a blocking acquire. - if timeout > 10.0: - timeout = 10.0 - - elif timeout < 0.1: - # Lower boundary for safety - timeout = 0.1 + # Cap max timeout. This gives other jobs a chance to run even if + # we're still trying to get our locks, before finally moving to a + # blocking acquire. + timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT) + # And also cap the lower boundary for safety + timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT) result.append(timeout) + running_sum += timeout return result @@ -144,6 +144,14 @@ class OpExecCbBase: # pylint: disable-msg=W0232 """ + def SubmitManyJobs(self, jobs): + """Submits jobs for processing. + + See L{jqueue.JobQueue.SubmitManyJobs}. + + """ + raise NotImplementedError + def _LUNameForOpName(opname): """Computes the LU name for a given OpCode name. @@ -209,6 +217,26 @@ class Processor(object): return acquired + def _ProcessResult(self, result): + """Examines opcode result. + + If necessary, additional processing on the result is done. + + """ + if isinstance(result, cmdlib.ResultWithJobs): + # Submit jobs + job_submission = self._cbs.SubmitManyJobs(result.jobs) + + # Build dictionary + result = result.other + + assert constants.JOB_IDS_KEY not in result, \ + "Key '%s' found in additional return values" % constants.JOB_IDS_KEY + + result[constants.JOB_IDS_KEY] = job_submission + + return result + def _ExecLU(self, lu): """Logical Unit execution sequence. @@ -229,7 +257,7 @@ class Processor(object): return lu.dry_run_result try: - result = lu.Exec(self.Log) + result = self._ProcessResult(lu.Exec(self.Log)) h_results = hm.RunPhase(constants.HOOKS_PHASE_POST) result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results, self.Log, result) @@ -274,8 +302,8 @@ class Processor(object): # Acquiring locks needed_locks = lu.needed_locks[level] - acquired = self._AcquireLocks(level, needed_locks, share, - calc_timeout(), priority) + self._AcquireLocks(level, needed_locks, share, + calc_timeout(), priority) else: # Adding locks add_locks = lu.add_locks[level] @@ -289,11 +317,7 @@ class Processor(object): " with another job, who added them first" % add_locks, errors.ECODE_FAULT) - acquired = add_locks - try: - lu.acquired_locks[level] = acquired - result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority) finally: if level in lu.remove_locks: @@ -427,8 +451,14 @@ class HooksMaster(object): self.callfn = callfn self.lu = lu self.op = lu.op - self.pre_env = None - self.pre_nodes = None + self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE) + + if self.lu.HPATH is None: + nodes = (None, None) + else: + nodes = map(frozenset, self.lu.BuildHooksNodes()) + + (self.pre_nodes, self.post_nodes) = nodes def _BuildEnv(self, phase): """Compute the environment and the target nodes. @@ -447,34 +477,29 @@ class HooksMaster(object): env = {} if self.lu.HPATH is not None: - (lu_env, lu_nodes_pre, lu_nodes_post) = self.lu.BuildHooksEnv() + lu_env = self.lu.BuildHooksEnv() if lu_env: - assert not compat.any(key.upper().startswith(prefix) - for key in lu_env) + assert not compat.any(key.upper().startswith(prefix) for key in lu_env) env.update(("%s%s" % (prefix, key), value) for (key, value) in lu_env.items()) - else: - lu_nodes_pre = lu_nodes_post = [] if phase == constants.HOOKS_PHASE_PRE: assert compat.all((key.startswith("GANETI_") and not key.startswith("GANETI_POST_")) for key in env) - # Record environment for any post-phase hooks - self.pre_env = env - elif phase == constants.HOOKS_PHASE_POST: assert compat.all(key.startswith("GANETI_POST_") for key in env) + assert isinstance(self.pre_env, dict) - if self.pre_env: - assert not compat.any(key.startswith("GANETI_POST_") - for key in self.pre_env) - env.update(self.pre_env) + # Merge with pre-phase environment + assert not compat.any(key.startswith("GANETI_POST_") + for key in self.pre_env) + env.update(self.pre_env) else: raise AssertionError("Unknown phase '%s'" % phase) - return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post) + return env def _RunWrapper(self, node_list, hpath, phase, phase_env): """Simple wrapper over self.callfn. @@ -488,12 +513,14 @@ class HooksMaster(object): "PATH": "/sbin:/bin:/usr/sbin:/usr/bin", "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION, "GANETI_OP_CODE": self.op.OP_ID, - "GANETI_OBJECT_TYPE": self.lu.HTYPE, "GANETI_DATA_DIR": constants.DATA_DIR, "GANETI_HOOKS_PHASE": phase, "GANETI_HOOKS_PATH": hpath, } + if self.lu.HTYPE: + env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE + if cfg is not None: env["GANETI_CLUSTER"] = cfg.GetClusterName() env["GANETI_MASTER"] = cfg.GetMasterNode() @@ -523,19 +550,16 @@ class HooksMaster(object): @raise errors.HooksAbort: on failure of one of the hooks """ - (env, node_list_pre, node_list_post) = self._BuildEnv(phase) - if nodes is None: - if phase == constants.HOOKS_PHASE_PRE: - self.pre_nodes = (node_list_pre, node_list_post) - nodes = node_list_pre - elif phase == constants.HOOKS_PHASE_POST: - post_nodes = (node_list_pre, node_list_post) - assert self.pre_nodes == post_nodes, \ - ("Node lists returned for post-phase hook don't match pre-phase" - " lists (pre %s, post %s)" % (self.pre_nodes, post_nodes)) - nodes = node_list_post - else: - raise AssertionError("Unknown phase '%s'" % phase) + if phase == constants.HOOKS_PHASE_PRE: + if nodes is None: + nodes = self.pre_nodes + env = self.pre_env + elif phase == constants.HOOKS_PHASE_POST: + if nodes is None: + nodes = self.post_nodes + env = self._BuildEnv(phase) + else: + raise AssertionError("Unknown phase '%s'" % phase) if not nodes: # empty node list, we should not attempt to run this as either @@ -586,9 +610,6 @@ class HooksMaster(object): top-level LI if the configuration has been updated. """ - if self.pre_env is None: - raise AssertionError("Pre-phase must be run before configuration update") - phase = constants.HOOKS_PHASE_POST hpath = constants.HOOKS_NAME_CFGUPDATE nodes = [self.lu.cfg.GetMasterNode()]