cmdlib: Change tasklet logging to debug level
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 3ffecb8..2cb9a7c 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -47,8 +47,8 @@ class LogicalUnit(object):
 
   Subclasses must follow these rules:
     - implement ExpandNames
-    - implement CheckPrereq
-    - implement Exec
+    - implement CheckPrereq (except when tasklets are used)
+    - implement Exec (except when tasklets are used)
     - implement BuildHooksEnv
     - redefine HPATH and HTYPE
     - optionally redefine their run requirements:
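As a sketch of what these rules mean once tasklets are in play, a hypothetical LU that implements neither CheckPrereq nor Exec itself (names are illustrative, assuming the usual cmdlib context) might look like::

    class LUHypotheticalOp(LogicalUnit):
      HPATH = "instance-hypothetical"
      HTYPE = constants.HTYPE_INSTANCE
      _OP_REQP = ["instance_name"]
      REQ_BGL = False

      def ExpandNames(self):
        self._ExpandAndLockInstance()
        # CheckPrereq/Exec are inherited; the tasklet does the actual work
        self.tasklets = [TLHypothetical(self, self.op.instance_name)]

      def BuildHooksEnv(self):
        env = {"INSTANCE_NAME": self.op.instance_name}
        nl = [self.cfg.GetMasterNode()]
        return (env, nl, nl)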
@@ -93,11 +93,15 @@ class LogicalUnit(object):
     # support for dry-run
     self.dry_run_result = None
 
+    # Tasklets
+    self.tasklets = None
+
     for attr_name in self._OP_REQP:
       attr_val = getattr(op, attr_name, None)
       if attr_val is None:
         raise errors.OpPrereqError("Required parameter '%s' missing" %
                                    attr_name)
+
     self.CheckArguments()
 
   def __GetSSH(self):
@@ -149,6 +153,10 @@ class LogicalUnit(object):
     level you can modify self.share_locks, setting a true value (usually 1) for
     that level. By default locks are not shared.
 
+    This function can also define a list of tasklets, which will then be
+    executed in order instead of the usual LU-level CheckPrereq and Exec
+    functions, if those are not defined by the LU.
+
     Examples::
 
       # Acquire all nodes and one instance
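Following the pattern of the examples in this docstring, an ExpandNames that opts into the new tasklet support might read (hypothetical tasklet classes)::

    # Acquire the instance lock, then delegate all work to two
    # tasklets, which run in list order
    def ExpandNames(self):
      self._ExpandAndLockInstance()
      self.tasklets = [TLFirstStep(self), TLSecondStep(self)]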
@@ -205,7 +213,13 @@ class LogicalUnit(object):
     their canonical form if it hasn't been done by ExpandNames before.
 
     """
-    raise NotImplementedError
+    if self.tasklets is not None:
+      for (idx, tl) in enumerate(self.tasklets):
+        logging.debug("Checking prerequisites for tasklet %s/%s",
+                      idx + 1, len(self.tasklets))
+        tl.CheckPrereq()
+    else:
+      raise NotImplementedError
 
   def Exec(self, feedback_fn):
     """Execute the LU.
@@ -215,7 +229,12 @@ class LogicalUnit(object):
     code, or expected.
 
     """
-    raise NotImplementedError
+    if self.tasklets is not None:
+      for (idx, tl) in enumerate(self.tasklets):
+        logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
+        tl.Exec(feedback_fn)
+    else:
+      raise NotImplementedError
 
   def BuildHooksEnv(self):
     """Build hooks environment for this LU.
@@ -339,6 +358,52 @@ class NoHooksLU(LogicalUnit):
   HTYPE = None
 
 
+class Tasklet:
+  """Tasklet base class.
+
+  Tasklets are subcomponents of LUs. LUs can consist entirely of tasklets or
+  they can mix legacy code with tasklets. Locking needs to be done in the LU;
+  tasklets know nothing about locks.
+
+  Subclasses must follow these rules:
+    - Implement CheckPrereq
+    - Implement Exec
+
+  """
+  def __init__(self, lu):
+    self.lu = lu
+
+    # Shortcuts
+    self.cfg = lu.cfg
+    self.rpc = lu.rpc
+
+  def CheckPrereq(self):
+    """Check prerequisites for this tasklets.
+
+    This method should check whether the prerequisites for the execution of
+    this tasklet are fulfilled. It can do internode communication, but it
+    should be idempotent - no cluster or system changes are allowed.
+
+    The method should raise errors.OpPrereqError in case something is not
+    fulfilled. Its return value is ignored.
+
+    This method should also update all parameters to their canonical form if it
+    hasn't been done before.
+
+    """
+    raise NotImplementedError
+
+  def Exec(self, feedback_fn):
+    """Execute the tasklet.
+
+    This method should implement the actual work. It should raise
+    errors.OpExecError for failures that are somewhat dealt with in code, or
+    expected.
+
+    """
+    raise NotImplementedError
+
+
 def _GetWantedNodes(lu, nodes):
   """Returns list of checked and expanded node names.
 
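A minimal tasklet honouring these rules (hypothetical, reusing the self.cfg shortcut set up in __init__) could look like::

    class TLHypothetical(Tasklet):
      def __init__(self, lu, instance_name):
        Tasklet.__init__(self, lu)
        self.instance_name = instance_name

        # Runtime data, filled in by CheckPrereq
        self.instance = None

      def CheckPrereq(self):
        instance = self.cfg.GetInstanceInfo(
          self.cfg.ExpandInstanceName(self.instance_name))
        if instance is None:
          raise errors.OpPrereqError("Instance '%s' not known" %
                                     self.instance_name)
        self.instance = instance

      def Exec(self, feedback_fn):
        feedback_fn("Would process instance %s" % self.instance.name)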
@@ -644,6 +709,32 @@ def _CheckInstanceBridgesExist(lu, instance, node=None):
   _CheckNicsBridgesExist(lu, instance.nics, node)
 
 
+def _GetNodePrimaryInstances(cfg, node_name):
+  """Returns primary instances on a node.
+
+  """
+  instances = []
+
+  for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
+    if node_name == inst.primary_node:
+      instances.append(inst)
+
+  return instances
+
+
+def _GetNodeSecondaryInstances(cfg, node_name):
+  """Returns secondary instances on a node.
+
+  """
+  instances = []
+
+  for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
+    if node_name in inst.secondary_nodes:
+      instances.append(inst)
+
+  return instances
+
+
 class LUDestroyCluster(NoHooksLU):
   """Logical unit for destroying the cluster.
 
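Both helpers filter the full instance list; keeping the behaviour identical, a more compact formulation would be (sketch)::

    def _GetNodePrimaryInstances(cfg, node_name):
      # Same result as the explicit loop above; the dict keys are unused
      return [inst for inst in cfg.GetAllInstancesInfo().itervalues()
              if inst.primary_node == node_name]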
@@ -3837,9 +3928,14 @@ class LUMigrateInstance(LogicalUnit):
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+
     self.needed_locks[locking.LEVEL_NODE] = []
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
+    self._migrater = TLMigrateInstance(self, self.op.instance_name,
+                                       self.op.live, self.op.cleanup)
+    self.tasklets = [self._migrater]
+
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
@@ -3850,12 +3946,80 @@ class LUMigrateInstance(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    instance = self._migrater.instance
+    env = _BuildInstanceHookEnvByObject(self, instance)
     env["MIGRATE_LIVE"] = self.op.live
     env["MIGRATE_CLEANUP"] = self.op.cleanup
-    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
+    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
     return env, nl, nl
 
+
+class LUMigrateNode(LogicalUnit):
+  """Migrate all instances from a node.
+
+  """
+  HPATH = "node-migrate"
+  HTYPE = constants.HTYPE_NODE
+  _OP_REQP = ["node_name", "live"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    node_name = self.cfg.ExpandNodeName(self.op.node_name)
+    if node_name is None:
+      # Report the name as given, not the failed (None) expansion result
+      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
+    self.op.node_name = node_name
+
+    self.needed_locks = {
+      locking.LEVEL_NODE: [self.op.node_name],
+      }
+
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+    # Create one migration tasklet per primary instance on this node
+    names = []
+    tasklets = []
+
+    for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
+      logging.debug("Migrating instance %s", inst.name)
+      names.append(inst.name)
+
+      tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
+
+    self.tasklets = tasklets
+
+    # Declare instance locks
+    self.needed_locks[locking.LEVEL_INSTANCE] = names
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on the master, the primary and all the secondaries.
+
+    """
+    env = {
+      "NODE_NAME": self.op.node_name,
+      }
+
+    nl = [self.cfg.GetMasterNode()]
+
+    return (env, nl, nl)
+
+
+class TLMigrateInstance(Tasklet):
+  def __init__(self, lu, instance_name, live, cleanup):
+    """Initializes this class.
+
+    """
+    Tasklet.__init__(self, lu)
+
+    # Parameters
+    self.instance_name = instance_name
+    self.live = live
+    self.cleanup = cleanup
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -3863,10 +4027,10 @@ class LUMigrateInstance(LogicalUnit):
 
     """
     instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
+      self.cfg.ExpandInstanceName(self.instance_name))
     if instance is None:
       raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+                                 self.instance_name)
 
     if instance.disk_template != constants.DT_DRBD8:
       raise errors.OpPrereqError("Instance's disk layout is not"
@@ -3888,7 +4052,7 @@ class LUMigrateInstance(LogicalUnit):
     # check bridge existance
     _CheckInstanceBridgesExist(self, instance, node=target_node)
 
-    if not self.op.cleanup:
+    if not self.cleanup:
       _CheckNodeNotDrained(self, target_node)
       result = self.rpc.call_instance_migratable(instance.primary_node,
                                                  instance)
@@ -4035,10 +4199,10 @@ class LUMigrateInstance(LogicalUnit):
       self._GoReconnect(False)
       self._WaitUntilSync()
     except errors.OpExecError, err:
-      self.LogWarning("Migration failed and I can't reconnect the"
-                      " drives: error '%s'\n"
-                      "Please look and recover the instance status" %
-                      str(err))
+      self.lu.LogWarning("Migration failed and I can't reconnect the"
+                         " drives: error '%s'\n"
+                         "Please look and recover the instance status" %
+                         str(err))
 
   def _AbortMigration(self):
     """Call the hypervisor code to abort a started migration.
@@ -4118,7 +4282,7 @@ class LUMigrateInstance(LogicalUnit):
     time.sleep(10)
     result = self.rpc.call_instance_migrate(source_node, instance,
                                             self.nodes_ip[target_node],
-                                            self.op.live)
+                                            self.live)
     msg = result.fail_msg
     if msg:
       logging.error("Instance migration failed, trying to revert"
@@ -4156,6 +4320,8 @@ class LUMigrateInstance(LogicalUnit):
     """Perform the migration.
 
     """
+    feedback_fn("Migrating instance %s" % self.instance.name)
+
     self.feedback_fn = feedback_fn
 
     self.source_node = self.instance.primary_node
@@ -4165,7 +4331,8 @@ class LUMigrateInstance(LogicalUnit):
       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
       }
-    if self.op.cleanup:
+
+    if self.cleanup:
       return self._ExecCleanup()
     else:
       return self._ExecMigration()
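As this dispatch shows, a single tasklet covers both a live migration and the cleanup of a previously failed one. A hedged usage sketch, given an initialized LU ``lu``, a feedback callback and a hypothetical instance name (CheckPrereq must run first so self.instance is set)::

    # Normal live migration
    tl = TLMigrateInstance(lu, "inst1.example.com", True, False)
    tl.CheckPrereq()
    tl.Exec(feedback_fn)

    # Recover from an earlier failed migration instead
    tl = TLMigrateInstance(lu, "inst1.example.com", False, True)
    tl.CheckPrereq()
    tl.Exec(feedback_fn)  # dispatches to _ExecCleanup()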
@@ -5122,8 +5289,8 @@ class LUReplaceDisks(LogicalUnit):
     if not hasattr(self.op, "iallocator"):
       self.op.iallocator = None
 
-    _DiskReplacer.CheckArguments(self.op.mode, self.op.remote_node,
-                                 self.op.iallocator)
+    TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
+                                  self.op.iallocator)
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
@@ -5150,9 +5317,11 @@ class LUReplaceDisks(LogicalUnit):
       self.needed_locks[locking.LEVEL_NODE] = []
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
-    self.replacer = _DiskReplacer(self, self.op.instance_name, self.op.mode,
-                                  self.op.iallocator, self.op.remote_node,
-                                  self.op.disks)
+    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
+                                   self.op.iallocator, self.op.remote_node,
+                                   self.op.disks)
+
+    self.tasklets = [self.replacer]
 
   def DeclareLocks(self, level):
     # If we're not already locking all nodes in the set we have to declare the
@@ -5182,24 +5351,101 @@ class LUReplaceDisks(LogicalUnit):
       nl.append(self.op.remote_node)
     return env, nl, nl
 
-  def CheckPrereq(self):
-    """Check prerequisites.
 
-    This checks that the instance is in the cluster.
+class LUEvacuateNode(LogicalUnit):
+  """Relocate the secondary instances from a node.
 
-    """
-    self.replacer.CheckPrereq()
+  """
+  HPATH = "node-evacuate"
+  HTYPE = constants.HTYPE_NODE
+  _OP_REQP = ["node_name"]
+  REQ_BGL = False
 
-  def Exec(self, feedback_fn):
-    """Execute disk replacement.
+  def CheckArguments(self):
+    if not hasattr(self.op, "remote_node"):
+      self.op.remote_node = None
+    if not hasattr(self.op, "iallocator"):
+      self.op.iallocator = None
 
-    This dispatches the disk replacement to the appropriate handler.
+    TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
+                                  self.op.remote_node,
+                                  self.op.iallocator)
+
+  def ExpandNames(self):
+    node_name = self.cfg.ExpandNodeName(self.op.node_name)
+    if node_name is None:
+      # Report the name as given, not the failed (None) expansion result
+      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
+    self.op.node_name = node_name
+
+    self.needed_locks = {}
+
+    # Declare node locks
+    if self.op.iallocator is not None:
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+    elif self.op.remote_node is not None:
+      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
+      if remote_node is None:
+        raise errors.OpPrereqError("Node '%s' not known" %
+                                   self.op.remote_node)
+
+      self.op.remote_node = remote_node
+
+      # Warning: do not remove the locking of the new secondary here
+      # unless DRBD8.AddChildren is changed to work in parallel;
+      # currently it doesn't since parallel invocations of
+      # FindUnusedMinor will conflict
+      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+    else:
+      raise errors.OpPrereqError("Invalid parameters")
+
+    # Create tasklets for replacing disks for all secondary instances on this
+    # node
+    names = []
+    tasklets = []
+
+    for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
+      logging.debug("Replacing disks for instance %s", inst.name)
+      names.append(inst.name)
+
+      replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
+                                self.op.iallocator, self.op.remote_node, [])
+      tasklets.append(replacer)
+
+    self.tasklets = tasklets
+    self.instance_names = names
+
+    # Declare instance locks
+    self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
+
+  def DeclareLocks(self, level):
+    # If we're not already locking all nodes in the set we have to declare the
+    # instance's primary/secondary nodes.
+    if (level == locking.LEVEL_NODE and
+        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
+      self._LockInstancesNodes()
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on the master, the primary and all the secondaries.
 
     """
-    self.replacer.Exec()
+    env = {
+      "NODE_NAME": self.op.node_name,
+      }
+
+    nl = [self.cfg.GetMasterNode()]
+
+    if self.op.remote_node is not None:
+      env["NEW_SECONDARY"] = self.op.remote_node
+      nl.append(self.op.remote_node)
 
+    return (env, nl, nl)
 
-class _DiskReplacer:
+
+class TLReplaceDisks(Tasklet):
   """Replaces disks for an instance.
 
   Note: Locking is not within the scope of this class.
@@ -5210,18 +5456,15 @@ class _DiskReplacer:
     """Initializes this class.
 
     """
+    Tasklet.__init__(self, lu)
+
     # Parameters
-    self.lu = lu
     self.instance_name = instance_name
     self.mode = mode
     self.iallocator_name = iallocator_name
     self.remote_node = remote_node
     self.disks = disks
 
-    # Shortcuts
-    self.cfg = lu.cfg
-    self.rpc = lu.rpc
-
     # Runtime data
     self.instance = None
     self.new_node = None
@@ -5232,6 +5475,9 @@ class _DiskReplacer:
 
   @staticmethod
   def CheckArguments(mode, remote_node, iallocator):
+    """Helper function for users of this class.
+
+    """
     # check for valid parameter combination
     cnt = [remote_node, iallocator].count(None)
     if mode == constants.REPLACE_DISK_CHG:
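The hunk ends before the actual checks; the intent, assuming the usual Ganeti rule that changing the secondary needs exactly one source for the new node, can be summarised as a usage sketch::

    # REPLACE_DISK_CHG: exactly one of remote_node/iallocator may be set
    TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG, "node3", None)  # ok
    TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG, None, "hail")   # ok
    TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG, None, None)     # raises OpPrereqError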
@@ -5360,12 +5606,14 @@ class _DiskReplacer:
 
     self.node_secondary_ip = node_2nd_ip
 
-  def Exec(self):
+  def Exec(self, feedback_fn):
     """Execute disk replacement.
 
     This dispatches the disk replacement to the appropriate handler.
 
     """
+    feedback_fn("Replacing disks for %s" % self.instance.name)
+
     activate_disks = (not self.instance.admin_up)
 
     # Activate the instance disks if we're replacing them on a down instance
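With Exec now taking feedback_fn, TLReplaceDisks matches the Tasklet interface and callers must supply a message callback. A minimal stand-in, assuming direct use outside the job queue (which normally provides this callback), with a hypothetical instance name::

    def feedback_fn(msg):
      # Stand-in for the job queue's feedback callback
      logging.info("%s", msg)

    replacer = TLReplaceDisks(lu, "inst1.example.com",
                              constants.REPLACE_DISK_CHG, "hail", None, [])
    replacer.CheckPrereq()
    replacer.Exec(feedback_fn)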