Add OPMoveInstance and LUMoveInstance
[ganeti-local] / lib / cmdlib.py
index ceb8871..93f9e4f 100644 (file)
@@ -711,30 +711,32 @@ def _CheckInstanceBridgesExist(lu, instance, node=None):
   _CheckNicsBridgesExist(lu, instance.nics, node)
 
 
-def _GetNodePrimaryInstances(cfg, node_name):
-  """Returns primary instances on a node.
+def _GetNodeInstancesInner(cfg, fn):
+  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
+
+
+def _GetNodeInstances(cfg, node_name):
+  """Returns a list of all primary and secondary instances on a node.
 
   """
-  instances = []
 
-  for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
-    if node_name == inst.primary_node:
-      instances.append(inst)
+  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
 
-  return instances
+
+def _GetNodePrimaryInstances(cfg, node_name):
+  """Returns primary instances on a node.
+
+  """
+  return _GetNodeInstancesInner(cfg,
+                                lambda inst: node_name == inst.primary_node)
 
 
 def _GetNodeSecondaryInstances(cfg, node_name):
   """Returns secondary instances on a node.
 
   """
-  instances = []
-
-  for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
-    if node_name in inst.secondary_nodes:
-      instances.append(inst)
-
-  return instances
+  return _GetNodeInstancesInner(cfg,
+                                lambda inst: node_name in inst.secondary_nodes)
 
 
 def _GetStorageTypeArgs(cfg, storage_type):
@@ -2222,7 +2224,7 @@ class LUQueryNodes(NoHooksLU):
     "name", "pinst_cnt", "sinst_cnt",
     "pinst_list", "sinst_list",
     "pip", "sip", "tags",
-    "serial_no",
+    "serial_no", "ctime", "mtime",
     "master_candidate",
     "master",
     "offline",
@@ -2346,6 +2348,10 @@ class LUQueryNodes(NoHooksLU):
           val = list(node.GetTags())
         elif field == "serial_no":
           val = node.serial_no
+        elif field == "ctime":
+          val = node.ctime
+        elif field == "mtime":
+          val = node.mtime
         elif field == "master_candidate":
           val = node.master_candidate
         elif field == "master":
@@ -3040,6 +3046,8 @@ class LUQueryClusterInfo(NoHooksLU):
       "master_netdev": cluster.master_netdev,
       "volume_group_name": cluster.volume_group_name,
       "file_storage_dir": cluster.file_storage_dir,
+      "ctime": cluster.ctime,
+      "mtime": cluster.mtime,
       }
 
     return result
@@ -3917,7 +3925,9 @@ class LUQueryInstances(NoHooksLU):
                                     r"(nic)\.(bridge)/([0-9]+)",
                                     r"(nic)\.(macs|ips|modes|links|bridges)",
                                     r"(disk|nic)\.(count)",
-                                    "serial_no", "hypervisor", "hvparams",] +
+                                    "serial_no", "hypervisor", "hvparams",
+                                    "ctime", "mtime",
+                                    ] +
                                   ["hv/%s" % name
                                    for name in constants.HVS_PARAMETERS] +
                                   ["be/%s" % name
@@ -4102,6 +4112,10 @@ class LUQueryInstances(NoHooksLU):
           val = list(instance.GetTags())
         elif field == "serial_no":
           val = instance.serial_no
+        elif field == "ctime":
+          val = instance.ctime
+        elif field == "mtime":
+          val = instance.mtime
         elif field == "network_port":
           val = instance.network_port
         elif field == "hypervisor":
@@ -4355,6 +4369,181 @@ class LUMigrateInstance(LogicalUnit):
     return env, nl, nl
 
 
+class LUMoveInstance(LogicalUnit):
+  """Move an instance to another node by copying its disk data.
+
+  """
+  HPATH = "instance-move"
+  HTYPE = constants.HTYPE_INSTANCE
+  _OP_REQP = ["instance_name", "target_node"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    target_node = self.cfg.ExpandNodeName(self.op.target_node)
+    if target_node is None:
+      raise errors.OpPrereqError("Node '%s' not known" %
+                                 self.op.target_node)
+    self.op.target_node = target_node
+    self.needed_locks[locking.LEVEL_NODE] = [target_node]
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes(primary_only=True)
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on the master, the instance's primary node and the target node.
+
+    """
+    env = {
+      "TARGET_NODE": self.op.target_node,
+      }
+    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+    nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
+                                       self.op.target_node]
+    return env, nl, nl
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the instance exists and that it can run on the target.
+
+    """
+    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
+
+    node = self.cfg.GetNodeInfo(self.op.target_node)
+    assert node is not None, \
+      "Cannot retrieve locked node %s" % self.op.target_node
+
+    self.target_node = target_node = node.name
+
+    if target_node == instance.primary_node:
+      raise errors.OpPrereqError("Instance %s is already on the node %s" %
+                                 (instance.name, target_node))
+
+    bep = self.cfg.GetClusterInfo().FillBE(instance)
+
+    for idx, dsk in enumerate(instance.disks):
+      if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
+        raise errors.OpPrereqError("Instance disk %d has a complex layout,"
+                                   " cannot copy" % idx)
+
+    _CheckNodeOnline(self, target_node)
+    _CheckNodeNotDrained(self, target_node)
+
+    if instance.admin_up:
+      # check memory requirements on the target node
+      _CheckNodeFreeMemory(self, target_node, "moving instance %s" %
+                           instance.name, bep[constants.BE_MEMORY],
+                           instance.hypervisor)
+    else:
+      self.LogInfo("Not checking memory on the target node as"
+                   " instance will not be started")
+
+    # check bridge existence
+    _CheckInstanceBridgesExist(self, instance, node=target_node)
+
+  def Exec(self, feedback_fn):
+    """Move an instance.
+
+    The move is done by shutting it down on its present node, copying
+    the data over (slow) and starting it on the new node.
+
+    """
+    instance = self.instance
+
+    source_node = instance.primary_node
+    target_node = self.target_node
+
+    self.LogInfo("Shutting down instance %s on source node %s",
+                 instance.name, source_node)
+
+    result = self.rpc.call_instance_shutdown(source_node, instance)
+    msg = result.fail_msg
+    if msg:
+      if getattr(self.op, "ignore_consistency", False):
+        self.proc.LogWarning("Could not shutdown instance %s on node %s."
+                             " Proceeding anyway. Please make sure node"
+                             " %s is down. Error details: %s",
+                             instance.name, source_node, source_node, msg)
+      else:
+        raise errors.OpExecError("Could not shutdown instance %s on"
+                                 " node %s: %s" %
+                                 (instance.name, source_node, msg))
+
+    # create the target disks
+    try:
+      _CreateDisks(self, instance, target_node=target_node)
+    except errors.OpExecError:
+      self.LogWarning("Device creation failed, reverting...")
+      try:
+        _RemoveDisks(self, instance, target_node=target_node)
+      finally:
+        self.cfg.ReleaseDRBDMinors(instance.name)
+        raise
+
+    cluster_name = self.cfg.GetClusterInfo().cluster_name
+
+    errs = []
+    # activate, get path, copy the data over
+    for idx, disk in enumerate(instance.disks):
+      self.LogInfo("Copying data for disk %d", idx)
+      result = self.rpc.call_blockdev_assemble(target_node, disk,
+                                               instance.name, True)
+      if result.fail_msg:
+        self.LogWarning("Can't assemble newly created disk %d: %s",
+                        idx, result.fail_msg)
+        errs.append(result.fail_msg)
+        break
+      dev_path = result.payload
+      result = self.rpc.call_blockdev_export(source_node, disk,
+                                             target_node, dev_path,
+                                             cluster_name)
+      if result.fail_msg:
+        self.LogWarning("Can't copy data over for disk %d: %s",
+                        idx, result.fail_msg)
+        errs.append(result.fail_msg)
+        break
+
+    if errs:
+      self.LogWarning("Some disks failed to copy, aborting")
+      try:
+        _RemoveDisks(self, instance, target_node=target_node)
+      finally:
+        self.cfg.ReleaseDRBDMinors(instance.name)
+        raise errors.OpExecError("Errors during disk copy: %s" %
+                                 (",".join(errs),))
+
+    instance.primary_node = target_node
+    self.cfg.Update(instance)
+
+    self.LogInfo("Removing the disks on the original node")
+    _RemoveDisks(self, instance, target_node=source_node)
+
+    # Only start the instance if it's marked as up
+    if instance.admin_up:
+      self.LogInfo("Starting instance %s on node %s",
+                   instance.name, target_node)
+
+      disks_ok, _ = _AssembleInstanceDisks(self, instance,
+                                           ignore_secondaries=True)
+      if not disks_ok:
+        _ShutdownInstanceDisks(self, instance)
+        raise errors.OpExecError("Can't activate the instance's disks")
+
+      result = self.rpc.call_instance_start(target_node, instance, None, None)
+      msg = result.fail_msg
+      if msg:
+        _ShutdownInstanceDisks(self, instance)
+        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
+                                 (instance.name, target_node, msg))
+
+
 class LUMigrateNode(LogicalUnit):
   """Migrate all instances from a node.
 
@@ -4918,7 +5107,7 @@ def _GetInstanceInfoText(instance):
   return "originstname+%s" % instance.name
 
 
-def _CreateDisks(lu, instance, to_skip=None):
+def _CreateDisks(lu, instance, to_skip=None, target_node=None):
   """Create all disks for an instance.
 
   This abstracts away some work from AddInstance.
@@ -4929,12 +5118,19 @@ def _CreateDisks(lu, instance, to_skip=None):
   @param instance: the instance whose disks we should create
   @type to_skip: list
   @param to_skip: list of indices to skip
+  @type target_node: string
+  @param target_node: if passed, overrides the target node for creation
   @rtype: boolean
   @return: the success of the creation
 
   """
   info = _GetInstanceInfoText(instance)
-  pnode = instance.primary_node
+  if target_node is None:
+    pnode = instance.primary_node
+    all_nodes = instance.all_nodes
+  else:
+    pnode = target_node
+    all_nodes = [pnode]
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
@@ -4951,12 +5147,12 @@ def _CreateDisks(lu, instance, to_skip=None):
     logging.info("Creating volume %s for instance %s",
                  device.iv_name, instance.name)
     #HARDCODE
-    for node in instance.all_nodes:
+    for node in all_nodes:
       f_create = node == pnode
       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
 
 
-def _RemoveDisks(lu, instance):
+def _RemoveDisks(lu, instance, target_node=None):
   """Remove all disks for an instance.
 
   This abstracts away some work from `AddInstance()` and
@@ -4968,6 +5164,8 @@ def _RemoveDisks(lu, instance):
   @param lu: the logical unit on whose behalf we execute
   @type instance: L{objects.Instance}
   @param instance: the instance whose disks we should remove
+  @type target_node: string
+  @param target_node: used to override the node on which to remove the disks
   @rtype: boolean
   @return: the success of the removal
 
@@ -4976,7 +5174,11 @@ def _RemoveDisks(lu, instance):
 
   all_result = True
   for device in instance.disks:
-    for node, disk in device.ComputeNodeTree(instance.primary_node):
+    if target_node:
+      edata = [(target_node, device)]
+    else:
+      edata = device.ComputeNodeTree(instance.primary_node)
+    for node, disk in edata:
       lu.cfg.SetDiskID(disk, node)
       msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
       if msg:
@@ -4986,12 +5188,14 @@ def _RemoveDisks(lu, instance):
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
-    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
-                                                 file_storage_dir)
-    msg = result.fail_msg
-    if msg:
+    if target_node is node:
+      tgt = instance.primary_node
+    else:
+      tgt = instance.target_node
+    result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
+    if result.fail_msg:
       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
-                    file_storage_dir, instance.primary_node, msg)
+                    file_storage_dir, instance.primary_node, result.fail_msg)
       all_result = False
 
   return all_result
@@ -6439,6 +6643,62 @@ class TLReplaceDisks(Tasklet):
     self._RemoveOldStorage(self.target_node, iv_names)
 
 
+class LURepairNodeStorage(NoHooksLU):
+  """Repairs the volume group on a node.
+
+  """
+  _OP_REQP = ["node_name"]
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    node_name = self.cfg.ExpandNodeName(self.op.node_name)
+    if node_name is None:
+      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
+
+    self.op.node_name = node_name
+
+  def ExpandNames(self):
+    self.needed_locks = {
+      locking.LEVEL_NODE: [self.op.node_name],
+      }
+
+  def _CheckFaultyDisks(self, instance, node_name):
+    if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
+                                node_name, True):
+      raise errors.OpPrereqError("Instance '%s' has faulty disks on"
+                                 " node '%s'" % (instance.name, node_name))
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    storage_type = self.op.storage_type
+
+    if (constants.SO_FIX_CONSISTENCY not in
+        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
+      raise errors.OpPrereqError("Storage units of type '%s' can not be"
+                                 " repaired" % storage_type)
+
+    # Refuse if an instance using this node has faulty disks on its other nodes
+    for inst in _GetNodeInstances(self.cfg, self.op.node_name):
+      check_nodes = set(inst.all_nodes)
+      check_nodes.discard(self.op.node_name)
+      for inst_node_name in check_nodes:
+        self._CheckFaultyDisks(inst, inst_node_name)
+
+  def Exec(self, feedback_fn):
+    feedback_fn("Repairing storage unit '%s' on %s ..." %
+                (self.op.name, self.op.node_name))
+
+    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
+    result = self.rpc.call_storage_execute(self.op.node_name,
+                                           self.op.storage_type, st_args,
+                                           self.op.name,
+                                           constants.SO_FIX_CONSISTENCY)
+    result.Raise("Failed to repair storage unit '%s' on %s" %
+                 (self.op.name, self.op.node_name))
+
+
 class LUGrowDisk(LogicalUnit):
   """Grow a disk of an instance.
 
@@ -6579,7 +6839,7 @@ class LUQueryInstanceData(NoHooksLU):
     """Returns the status of a block device
 
     """
-    if self.op.static:
+    if self.op.static or not node:
       return None
 
     self.cfg.SetDiskID(dev, node)
@@ -6676,6 +6936,9 @@ class LUQueryInstanceData(NoHooksLU):
         "hv_actual": cluster.FillHV(instance),
         "be_instance": instance.beparams,
         "be_actual": cluster.FillBE(instance),
+        "serial_no": instance.serial_no,
+        "mtime": instance.mtime,
+        "ctime": instance.ctime,
         }
 
       result[instance.name] = idict