cli.JobExecutor: Handle empty name, allow adding job IDs
[ganeti-local] / lib / mcpu.py
index 7ab7c18..7788959 100644 (file)
@@ -56,23 +56,23 @@ def _CalculateLockAttemptTimeouts():
   """Calculate timeouts for lock attempts.
 
   """
-  result = [1.0]
+  result = [constants.LOCK_ATTEMPTS_MINWAIT]
+  running_sum = result[0]
 
-  # Wait for a total of at least 150s before doing a blocking acquire
-  while sum(result) < 150.0:
+  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
+  # blocking acquire
+  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
     timeout = (result[-1] * 1.05) ** 1.25
 
-    # Cap timeout at 10 seconds. This gives other jobs a chance to run
-    # even if we're still trying to get our locks, before finally moving
-    # to a blocking acquire.
-    if timeout > 10.0:
-      timeout = 10.0
-
-    elif timeout < 0.1:
-      # Lower boundary for safety
-      timeout = 0.1
+    # Cap max timeout. This gives other jobs a chance to run even if
+    # we're still trying to get our locks, before finally moving to a
+    # blocking acquire.
+    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
+    # And also cap the lower boundary for safety
+    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
 
     result.append(timeout)
+    running_sum += timeout
 
   return result
 
@@ -144,6 +144,14 @@ class OpExecCbBase: # pylint: disable-msg=W0232
 
     """
 
+  def SubmitManyJobs(self, jobs):
+    """Submits jobs for processing.
+
+    See L{jqueue.JobQueue.SubmitManyJobs}.
+
+    """
+    raise NotImplementedError
+
 
 def _LUNameForOpName(opname):
   """Computes the LU name for a given OpCode name.
@@ -209,6 +217,26 @@ class Processor(object):
 
     return acquired
 
+  def _ProcessResult(self, result):
+    """Examines opcode result.
+
+    If necessary, additional processing on the result is done.
+
+    """
+    if isinstance(result, cmdlib.ResultWithJobs):
+      # Submit jobs
+      job_submission = self._cbs.SubmitManyJobs(result.jobs)
+
+      # Build dictionary
+      result = result.other
+
+      assert constants.JOB_IDS_KEY not in result, \
+        "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+
+      result[constants.JOB_IDS_KEY] = job_submission
+
+    return result
+
   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
 
@@ -229,7 +257,7 @@ class Processor(object):
       return lu.dry_run_result
 
     try:
-      result = lu.Exec(self.Log)
+      result = self._ProcessResult(lu.Exec(self.Log))
       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                 self.Log, result)
@@ -274,8 +302,8 @@ class Processor(object):
           # Acquiring locks
           needed_locks = lu.needed_locks[level]
 
-          acquired = self._AcquireLocks(level, needed_locks, share,
-                                        calc_timeout(), priority)
+          self._AcquireLocks(level, needed_locks, share,
+                             calc_timeout(), priority)
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
@@ -289,11 +317,7 @@ class Processor(object):
               " with another job, who added them first" % add_locks,
               errors.ECODE_FAULT)
 
-          acquired = add_locks
-
         try:
-          lu.acquired_locks[level] = acquired
-
           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
         finally:
           if level in lu.remove_locks:
@@ -427,8 +451,14 @@ class HooksMaster(object):
     self.callfn = callfn
     self.lu = lu
     self.op = lu.op
-    self.pre_env = None
-    self.pre_nodes = None
+    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
+
+    if self.lu.HPATH is None:
+      nodes = (None, None)
+    else:
+      nodes = map(frozenset, self.lu.BuildHooksNodes())
+
+    (self.pre_nodes, self.post_nodes) = nodes
 
   def _BuildEnv(self, phase):
     """Compute the environment and the target nodes.
@@ -447,34 +477,29 @@ class HooksMaster(object):
     env = {}
 
     if self.lu.HPATH is not None:
-      (lu_env, lu_nodes_pre, lu_nodes_post) = self.lu.BuildHooksEnv()
+      lu_env = self.lu.BuildHooksEnv()
       if lu_env:
-        assert not compat.any(key.upper().startswith(prefix)
-                              for key in lu_env)
+        assert not compat.any(key.upper().startswith(prefix) for key in lu_env)
         env.update(("%s%s" % (prefix, key), value)
                    for (key, value) in lu_env.items())
-    else:
-      lu_nodes_pre = lu_nodes_post = []
 
     if phase == constants.HOOKS_PHASE_PRE:
       assert compat.all((key.startswith("GANETI_") and
                          not key.startswith("GANETI_POST_"))
                         for key in env)
 
-      # Record environment for any post-phase hooks
-      self.pre_env = env
-
     elif phase == constants.HOOKS_PHASE_POST:
       assert compat.all(key.startswith("GANETI_POST_") for key in env)
+      assert isinstance(self.pre_env, dict)
 
-      if self.pre_env:
-        assert not compat.any(key.startswith("GANETI_POST_")
-                              for key in self.pre_env)
-        env.update(self.pre_env)
+      # Merge with pre-phase environment
+      assert not compat.any(key.startswith("GANETI_POST_")
+                            for key in self.pre_env)
+      env.update(self.pre_env)
     else:
       raise AssertionError("Unknown phase '%s'" % phase)
 
-    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
+    return env
 
   def _RunWrapper(self, node_list, hpath, phase, phase_env):
     """Simple wrapper over self.callfn.
@@ -488,12 +513,14 @@ class HooksMaster(object):
       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
       "GANETI_OP_CODE": self.op.OP_ID,
-      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
       "GANETI_DATA_DIR": constants.DATA_DIR,
       "GANETI_HOOKS_PHASE": phase,
       "GANETI_HOOKS_PATH": hpath,
       }
 
+    if self.lu.HTYPE:
+      env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE
+
     if cfg is not None:
       env["GANETI_CLUSTER"] = cfg.GetClusterName()
       env["GANETI_MASTER"] = cfg.GetMasterNode()
@@ -523,19 +550,16 @@ class HooksMaster(object):
     @raise errors.HooksAbort: on failure of one of the hooks
 
     """
-    (env, node_list_pre, node_list_post) = self._BuildEnv(phase)
-    if nodes is None:
-      if phase == constants.HOOKS_PHASE_PRE:
-        self.pre_nodes = (node_list_pre, node_list_post)
-        nodes = node_list_pre
-      elif phase == constants.HOOKS_PHASE_POST:
-        post_nodes = (node_list_pre, node_list_post)
-        assert self.pre_nodes == post_nodes, \
-               ("Node lists returned for post-phase hook don't match pre-phase"
-                " lists (pre %s, post %s)" % (self.pre_nodes, post_nodes))
-        nodes = node_list_post
-      else:
-        raise AssertionError("Unknown phase '%s'" % phase)
+    if phase == constants.HOOKS_PHASE_PRE:
+      if nodes is None:
+        nodes = self.pre_nodes
+      env = self.pre_env
+    elif phase == constants.HOOKS_PHASE_POST:
+      if nodes is None:
+        nodes = self.post_nodes
+      env = self._BuildEnv(phase)
+    else:
+      raise AssertionError("Unknown phase '%s'" % phase)
 
     if not nodes:
       # empty node list, we should not attempt to run this as either
@@ -586,9 +610,6 @@ class HooksMaster(object):
     top-level LI if the configuration has been updated.
 
     """
-    if self.pre_env is None:
-      raise AssertionError("Pre-phase must be run before configuration update")
-
     phase = constants.HOOKS_PHASE_POST
     hpath = constants.HOOKS_NAME_CFGUPDATE
     nodes = [self.lu.cfg.GetMasterNode()]