Add first implementation of generic storage unit framework
lib/cmdlib.py
index b19efa6..ec361c8 100644
@@ -367,6 +367,13 @@ class Tasklet:
     - Implement Exec
 
   """
+  def __init__(self, lu):
+    self.lu = lu
+
+    # Shortcuts
+    self.cfg = lu.cfg
+    self.rpc = lu.rpc
+
   def CheckPrereq(self):
     """Check prerequisites for this tasklets.
 
@@ -699,6 +706,19 @@ def _CheckInstanceBridgesExist(lu, instance, node=None):
   _CheckNicsBridgesExist(lu, instance.nics, node)
 
 
+def _GetNodeSecondaryInstances(cfg, node_name):
+  """Returns secondary instances on a node.
+
+  """
+  instances = []
+
+  for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
+    if node_name in inst.secondary_nodes:
+      instances.append(inst)
+
+  return instances
+
+
 class LUDestroyCluster(NoHooksLU):
   """Logical unit for destroying the cluster.
 
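
A quick, hypothetical use of the helper just added (the node name is made up);
LUEvacuateNode further down relies on it to find every instance that keeps its
DRBD secondary on the node being emptied:

  # Hypothetical illustration only; "node2.example.com" is a made-up name
  for inst in _GetNodeSecondaryInstances(self.cfg, "node2.example.com"):
    logging.debug("Instance %s has its secondary on that node", inst.name)
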
@@ -3892,9 +3912,14 @@ class LUMigrateInstance(LogicalUnit):
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+
     self.needed_locks[locking.LEVEL_NODE] = []
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
+    self._migrater = TLMigrateInstance(self, self.op.instance_name,
+                                       self.op.live, self.op.cleanup)
+    self.tasklets.append(self._migrater)
+
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
@@ -3905,12 +3930,26 @@ class LUMigrateInstance(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    instance = self._migrater.instance
+    env = _BuildInstanceHookEnvByObject(self, instance)
     env["MIGRATE_LIVE"] = self.op.live
     env["MIGRATE_CLEANUP"] = self.op.cleanup
-    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
+    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
     return env, nl, nl
 
+
+class TLMigrateInstance(Tasklet):
+  def __init__(self, lu, instance_name, live, cleanup):
+    """Initializes this class.
+
+    """
+    Tasklet.__init__(self, lu)
+
+    # Parameters
+    self.instance_name = instance_name
+    self.live = live
+    self.cleanup = cleanup
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -3918,10 +3957,10 @@ class LUMigrateInstance(LogicalUnit):
 
     """
     instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
+      self.cfg.ExpandInstanceName(self.instance_name))
     if instance is None:
       raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+                                 self.instance_name)
 
     if instance.disk_template != constants.DT_DRBD8:
       raise errors.OpPrereqError("Instance's disk layout is not"
@@ -3943,7 +3982,7 @@ class LUMigrateInstance(LogicalUnit):
     # check bridge existance
     _CheckInstanceBridgesExist(self, instance, node=target_node)
 
-    if not self.op.cleanup:
+    if not self.cleanup:
       _CheckNodeNotDrained(self, target_node)
       result = self.rpc.call_instance_migratable(instance.primary_node,
                                                  instance)
@@ -4090,10 +4129,10 @@ class LUMigrateInstance(LogicalUnit):
       self._GoReconnect(False)
       self._WaitUntilSync()
     except errors.OpExecError, err:
-      self.LogWarning("Migration failed and I can't reconnect the"
-                      " drives: error '%s'\n"
-                      "Please look and recover the instance status" %
-                      str(err))
+      self.lu.LogWarning("Migration failed and I can't reconnect the"
+                         " drives: error '%s'\n"
+                         "Please look and recover the instance status" %
+                         str(err))
 
   def _AbortMigration(self):
     """Call the hypervisor code to abort a started migration.
@@ -4173,7 +4212,7 @@ class LUMigrateInstance(LogicalUnit):
     time.sleep(10)
     result = self.rpc.call_instance_migrate(source_node, instance,
                                             self.nodes_ip[target_node],
-                                            self.op.live)
+                                            self.live)
     msg = result.fail_msg
     if msg:
       logging.error("Instance migration failed, trying to revert"
@@ -4220,7 +4259,8 @@ class LUMigrateInstance(LogicalUnit):
       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
       }
-    if self.op.cleanup:
+
+    if self.cleanup:
       return self._ExecCleanup()
     else:
       return self._ExecMigration()
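
How the registered tasklets are actually driven is outside this hunk; once
self.tasklets is populated, the logical unit (or the processor behind it) can
be expected to run something along these lines. This is only a sketch of the
idea, not the real dispatch code:

  # Assumed dispatch loop (sketch, not taken from this patch)
  for tl in lu.tasklets:
    tl.CheckPrereq()         # validate all prerequisites up front ...

  for tl in lu.tasklets:
    tl.Exec(feedback_fn)     # ... then execute the tasklets in order
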
@@ -5177,8 +5217,8 @@ class LUReplaceDisks(LogicalUnit):
     if not hasattr(self.op, "iallocator"):
       self.op.iallocator = None
 
-    _DiskReplacer.CheckArguments(self.op.mode, self.op.remote_node,
-                                 self.op.iallocator)
+    TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
+                                  self.op.iallocator)
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
@@ -5205,9 +5245,11 @@ class LUReplaceDisks(LogicalUnit):
       self.needed_locks[locking.LEVEL_NODE] = []
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
 
-    self.replacer = _DiskReplacer(self, self.op.instance_name, self.op.mode,
-                                  self.op.iallocator, self.op.remote_node,
-                                  self.op.disks)
+    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
+                                   self.op.iallocator, self.op.remote_node,
+                                   self.op.disks)
+
+    self.tasklets.append(self.replacer)
 
   def DeclareLocks(self, level):
     # If we're not already locking all nodes in the set we have to declare the
@@ -5237,24 +5279,99 @@ class LUReplaceDisks(LogicalUnit):
       nl.append(self.op.remote_node)
     return env, nl, nl
 
-  def CheckPrereq(self):
-    """Check prerequisites.
 
-    This checks that the instance is in the cluster.
+class LUEvacuateNode(LogicalUnit):
+  """Relocate the secondary instances from a node.
 
-    """
-    self.replacer.CheckPrereq()
+  """
+  HPATH = "node-evacuate"
+  HTYPE = constants.HTYPE_NODE
+  _OP_REQP = ["node_name"]
+  REQ_BGL = False
 
-  def Exec(self, feedback_fn):
-    """Execute disk replacement.
+  def CheckArguments(self):
+    if not hasattr(self.op, "remote_node"):
+      self.op.remote_node = None
+    if not hasattr(self.op, "iallocator"):
+      self.op.iallocator = None
 
-    This dispatches the disk replacement to the appropriate handler.
+    TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
+                                  self.op.remote_node,
+                                  self.op.iallocator)
+
+  def ExpandNames(self):
+    node_name = self.cfg.ExpandNodeName(self.op.node_name)
+    if node_name is None:
+      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
+    self.op.node_name = node_name
+
+    self.needed_locks = {}
+
+    # Declare node locks
+    if self.op.iallocator is not None:
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+    elif self.op.remote_node is not None:
+      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
+      if remote_node is None:
+        raise errors.OpPrereqError("Node '%s' not known" %
+                                   self.op.remote_node)
+
+      self.op.remote_node = remote_node
+
+      # Warning: do not remove the locking of the new secondary here
+      # unless DRBD8.AddChildren is changed to work in parallel;
+      # currently it doesn't since parallel invocations of
+      # FindUnusedMinor will conflict
+      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+    else:
+      raise errors.OpPrereqError("Invalid parameters")
+
+    # Create tasklets for replacing disks for all secondary instances on this
+    # node
+    names = []
+
+    for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
+      logging.debug("Replacing disks for instance %s", inst.name)
+      names.append(inst.name)
+
+      replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
+                                self.op.iallocator, self.op.remote_node, [])
+      self.tasklets.append(replacer)
+
+    self.instance_names = names
+
+    # Declare instance locks
+    self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
+
+  def DeclareLocks(self, level):
+    # If we're not already locking all nodes in the set we have to declare the
+    # instance's primary/secondary nodes.
+    if (level == locking.LEVEL_NODE and
+        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
+      self._LockInstancesNodes()
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on the master node and, if given, on the new secondary node.
 
     """
-    self.replacer.Exec()
+    env = {
+      "NODE_NAME": self.op.node_name,
+      }
+
+    nl = [self.cfg.GetMasterNode()]
+
+    if self.op.remote_node is not None:
+      env["NEW_SECONDARY"] = self.op.remote_node
+      nl.append(self.op.remote_node)
 
+    return (env, nl, nl)
 
-class _DiskReplacer:
+
+class TLReplaceDisks(Tasklet):
   """Replaces disks for an instance.
 
   Note: Locking is not within the scope of this class.
@@ -5265,18 +5382,15 @@ class _DiskReplacer:
     """Initializes this class.
 
     """
+    Tasklet.__init__(self, lu)
+
     # Parameters
-    self.lu = lu
     self.instance_name = instance_name
     self.mode = mode
     self.iallocator_name = iallocator_name
     self.remote_node = remote_node
     self.disks = disks
 
-    # Shortcuts
-    self.cfg = lu.cfg
-    self.rpc = lu.rpc
-
     # Runtime data
     self.instance = None
     self.new_node = None
@@ -5287,6 +5401,9 @@ class _DiskReplacer:
 
   @staticmethod
   def CheckArguments(mode, remote_node, iallocator):
+    """Helper function for users of this class.
+
+    """
     # check for valid parameter combination
     cnt = [remote_node, iallocator].count(None)
     if mode == constants.REPLACE_DISK_CHG:
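
Both LUReplaceDisks and LUEvacuateNode above route their opcode parameters
through this static helper before taking any locks. A hypothetical usage
sketch (the iallocator name is made up): a valid combination passes silently,
an invalid one is expected to raise OpPrereqError:

  # Exactly one of remote_node/iallocator given: accepted
  TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
                                None, "my-allocator")
  # Neither given: presumably rejected with OpPrereqError
  TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG, None, None)
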
@@ -5415,12 +5532,14 @@ class _DiskReplacer:
 
     self.node_secondary_ip = node_2nd_ip
 
-  def Exec(self):
+  def Exec(self, feedback_fn):
     """Execute disk replacement.
 
     This dispatches the disk replacement to the appropriate handler.
 
     """
+    feedback_fn("Replacing disks for %s" % self.instance.name)
+
     activate_disks = (not self.instance.admin_up)
 
     # Activate the instance disks if we're replacing them on a down instance
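
Since Exec now takes a feedback_fn (unlike the old _DiskReplacer.Exec),
whatever drives the tasklet has to supply a progress callback. A minimal
hypothetical sketch with made-up instance and node names, ignoring the locking
that the logical units above take care of:

  # Hypothetical driver code, not part of this patch
  def _Feedback(msg):
    logging.info("%s", msg)

  replacer = TLReplaceDisks(lu, "instance1.example.com",
                            constants.REPLACE_DISK_CHG,
                            None, "node3.example.com", [])
  replacer.CheckPrereq()
  replacer.Exec(_Feedback)  # first reports "Replacing disks for instance1..."
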