Add OPMoveInstance and LUMoveInstance
[ganeti-local] / lib / cmdlib.py
index 255bfe0..93f9e4f 100644 (file)
@@ -711,30 +711,32 @@ def _CheckInstanceBridgesExist(lu, instance, node=None):
   _CheckNicsBridgesExist(lu, instance.nics, node)
 
 
-def _GetNodePrimaryInstances(cfg, node_name):
-  """Returns primary instances on a node.
+def _GetNodeInstancesInner(cfg, fn):
+  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
+
+
+def _GetNodeInstances(cfg, node_name):
+  """Returns a list of all primary and secondary instances on a node.
 
   """
-  instances = []
 
-  for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
-    if node_name == inst.primary_node:
-      instances.append(inst)
+  return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
+
 
-  return instances
+def _GetNodePrimaryInstances(cfg, node_name):
+  """Returns primary instances on a node.
+
+  """
+  return _GetNodeInstancesInner(cfg,
+                                lambda inst: node_name == inst.primary_node)
 
 
 def _GetNodeSecondaryInstances(cfg, node_name):
   """Returns secondary instances on a node.
 
   """
-  instances = []
-
-  for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
-    if node_name in inst.secondary_nodes:
-      instances.append(inst)
-
-  return instances
+  return _GetNodeInstancesInner(cfg,
+                                lambda inst: node_name in inst.secondary_nodes)
 
 
 def _GetStorageTypeArgs(cfg, storage_type):
@@ -749,6 +751,52 @@ def _GetStorageTypeArgs(cfg, storage_type):
   return []
 
 
+def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
+  faulty = []
+
+  for dev in instance.disks:
+    cfg.SetDiskID(dev, node_name)
+
+  result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks)
+  result.Raise("Failed to get disk status from node %s" % node_name,
+               prereq=prereq)
+
+  for idx, bdev_status in enumerate(result.payload):
+    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
+      faulty.append(idx)
+
+  return faulty
+
+
+class LUPostInitCluster(LogicalUnit):
+  """Logical unit for running hooks after cluster initialization.
+
+  """
+  HPATH = "cluster-init"
+  HTYPE = constants.HTYPE_CLUSTER
+  _OP_REQP = []
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    """
+    env = {"OP_TARGET": self.cfg.GetClusterName()}
+    mn = self.cfg.GetMasterNode()
+    return env, [], [mn]
+
+  def CheckPrereq(self):
+    """No prerequisites to check.
+
+    """
+    return True
+
+  def Exec(self, feedback_fn):
+    """Nothing to do.
+
+    """
+    return True
+
+
 class LUDestroyCluster(NoHooksLU):
   """Logical unit for destroying the cluster.
 
@@ -1471,6 +1519,100 @@ class LUVerifyDisks(NoHooksLU):
     return result
 
 
+class LURepairDiskSizes(NoHooksLU):
+  """Verifies the cluster disks sizes.
+
+  """
+  _OP_REQP = ["instances"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+
+    if not isinstance(self.op.instances, list):
+      raise errors.OpPrereqError("Invalid argument type 'instances'")
+
+    if self.op.instances:
+      self.wanted_names = []
+      for name in self.op.instances:
+        full_name = self.cfg.ExpandInstanceName(name)
+        if full_name is None:
+          raise errors.OpPrereqError("Instance '%s' not known" % name)
+        self.wanted_names.append(full_name)
+      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
+      self.needed_locks = {
+        locking.LEVEL_NODE: [],
+        locking.LEVEL_INSTANCE: self.wanted_names,
+        }
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    else:
+      self.wanted_names = None
+      self.needed_locks = {
+        locking.LEVEL_NODE: locking.ALL_SET,
+        locking.LEVEL_INSTANCE: locking.ALL_SET,
+        }
+    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE and self.wanted_names is not None:
+      self._LockInstancesNodes(primary_only=True)
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This only checks the optional instance list against the existing names.
+
+    """
+    if self.wanted_names is None:
+      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+
+    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
+                             in self.wanted_names]
+
+  def Exec(self, feedback_fn):
+    """Verify the size of cluster disks.
+
+    """
+    # TODO: check child disks too
+    # TODO: check differences in size between primary/secondary nodes
+    per_node_disks = {}
+    for instance in self.wanted_instances:
+      pnode = instance.primary_node
+      if pnode not in per_node_disks:
+        per_node_disks[pnode] = []
+      for idx, disk in enumerate(instance.disks):
+        per_node_disks[pnode].append((instance, idx, disk))
+
+    changed = []
+    for node, dskl in per_node_disks.items():
+      result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
+      if result.failed:
+        self.LogWarning("Failure in blockdev_getsizes call to node"
+                        " %s, ignoring", node)
+        continue
+      if len(result.data) != len(dskl):
+        self.LogWarning("Invalid result from node %s, ignoring node results",
+                        node)
+        continue
+      for ((instance, idx, disk), size) in zip(dskl, result.data):
+        if size is None:
+          self.LogWarning("Disk %d of instance %s did not return size"
+                          " information, ignoring", idx, instance.name)
+          continue
+        if not isinstance(size, (int, long)):
+          self.LogWarning("Disk %d of instance %s did not return valid"
+                          " size information, ignoring", idx, instance.name)
+          continue
+        size = size >> 20
+        if size != disk.size:
+          self.LogInfo("Disk %d of instance %s has mismatched size,"
+                       " correcting: recorded %d, actual %d", idx,
+                       instance.name, disk.size, size)
+          disk.size = size
+          self.cfg.Update(instance)
+          changed.append((instance.name, idx, size))
+    return changed
+
+
 class LURenameCluster(LogicalUnit):
   """Rename the cluster.
 
@@ -1888,7 +2030,7 @@ def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
       result = False
     else:
       if ldisk:
-        result = result and not rstats.payload.ldisk_degraded
+        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
       else:
         result = result and not rstats.payload.is_degraded
 
@@ -2082,7 +2224,7 @@ class LUQueryNodes(NoHooksLU):
     "name", "pinst_cnt", "sinst_cnt",
     "pinst_list", "sinst_list",
     "pip", "sip", "tags",
-    "serial_no",
+    "serial_no", "ctime", "mtime",
     "master_candidate",
     "master",
     "offline",
@@ -2206,6 +2348,10 @@ class LUQueryNodes(NoHooksLU):
           val = list(node.GetTags())
         elif field == "serial_no":
           val = node.serial_no
+        elif field == "ctime":
+          val = node.ctime
+        elif field == "mtime":
+          val = node.mtime
         elif field == "master_candidate":
           val = node.master_candidate
         elif field == "master":
@@ -2900,6 +3046,8 @@ class LUQueryClusterInfo(NoHooksLU):
       "master_netdev": cluster.master_netdev,
       "volume_group_name": cluster.volume_group_name,
       "file_storage_dir": cluster.file_storage_dir,
+      "ctime": cluster.ctime,
+      "mtime": cluster.mtime,
       }
 
     return result
@@ -2971,19 +3119,24 @@ class LUActivateInstanceDisks(NoHooksLU):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
     _CheckNodeOnline(self, self.instance.primary_node)
+    if not hasattr(self.op, "ignore_size"):
+      self.op.ignore_size = False
 
   def Exec(self, feedback_fn):
     """Activate the disks.
 
     """
-    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
+    disks_ok, disks_info = \
+              _AssembleInstanceDisks(self, self.instance,
+                                     ignore_size=self.op.ignore_size)
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block devices")
 
     return disks_info
 
 
-def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
+def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
+                           ignore_size=False):
   """Prepare the block devices for an instance.
 
   This sets up the block devices on all nodes.
@@ -2995,6 +3148,10 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
   @type ignore_secondaries: boolean
   @param ignore_secondaries: if true, errors on secondary nodes
       won't result in an error return from the function
+  @type ignore_size: boolean
+  @param ignore_size: if true, the current known size of the disk
+      will not be used during the disk activation, useful for cases
+      when the size is wrong
   @return: False if the operation failed, otherwise a list of
       (host, instance_visible_name, node_visible_name)
       with the mapping from node devices to instance devices
@@ -3015,6 +3172,9 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
   # 1st pass, assemble on all nodes in secondary mode
   for inst_disk in instance.disks:
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
+      if ignore_size:
+        node_disk = node_disk.Copy()
+        node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
       msg = result.fail_msg
@@ -3032,6 +3192,9 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
       if node != instance.primary_node:
         continue
+      if ignore_size:
+        node_disk = node_disk.Copy()
+        node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
       msg = result.fail_msg
@@ -3488,6 +3651,89 @@ class LUReinstallInstance(LogicalUnit):
       _ShutdownInstanceDisks(self, inst)
 
 
+class LURecreateInstanceDisks(LogicalUnit):
+  """Recreate an instance's missing disks.
+
+  """
+  HPATH = "instance-recreate-disks"
+  HTYPE = constants.HTYPE_INSTANCE
+  _OP_REQP = ["instance_name", "disks"]
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    """Check the arguments.
+
+    """
+    if not isinstance(self.op.disks, list):
+      raise errors.OpPrereqError("Invalid disks parameter")
+    for item in self.op.disks:
+      if (not isinstance(item, int) or
+          item < 0):
+        raise errors.OpPrereqError("Invalid disk specification '%s'" %
+                                   str(item))
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on master, primary and secondary nodes of the instance.
+
+    """
+    env = _BuildInstanceHookEnvByObject(self, self.instance)
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
+    return env, nl, nl
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the instance is in the cluster and is not running.
+
+    """
+    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, instance.primary_node)
+
+    if instance.disk_template == constants.DT_DISKLESS:
+      raise errors.OpPrereqError("Instance '%s' has no disks" %
+                                 self.op.instance_name)
+    if instance.admin_up:
+      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
+                                 self.op.instance_name)
+    remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                              instance.name,
+                                              instance.hypervisor)
+    remote_info.Raise("Error checking node %s" % instance.primary_node,
+                      prereq=True)
+    if remote_info.payload:
+      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
+                                 (self.op.instance_name,
+                                  instance.primary_node))
+
+    if not self.op.disks:
+      self.op.disks = range(len(instance.disks))
+    else:
+      for idx in self.op.disks:
+        if idx >= len(instance.disks):
+          raise errors.OpPrereqError("Invalid disk index passed '%s'" % idx)
+
+    self.instance = instance
+
+  def Exec(self, feedback_fn):
+    """Recreate the disks.
+
+    """
+    to_skip = []
+    for idx, disk in enumerate(self.instance.disks):
+      if idx not in self.op.disks: # disk idx has not been passed in
+        to_skip.append(idx)
+        continue
+
+    _CreateDisks(self, self.instance, to_skip=to_skip)
+
+
 class LURenameInstance(LogicalUnit):
   """Rename an instance.
 
@@ -3679,7 +3925,9 @@ class LUQueryInstances(NoHooksLU):
                                     r"(nic)\.(bridge)/([0-9]+)",
                                     r"(nic)\.(macs|ips|modes|links|bridges)",
                                     r"(disk|nic)\.(count)",
-                                    "serial_no", "hypervisor", "hvparams",] +
+                                    "serial_no", "hypervisor", "hvparams",
+                                    "ctime", "mtime",
+                                    ] +
                                   ["hv/%s" % name
                                    for name in constants.HVS_PARAMETERS] +
                                   ["be/%s" % name
@@ -3864,6 +4112,10 @@ class LUQueryInstances(NoHooksLU):
           val = list(instance.GetTags())
         elif field == "serial_no":
           val = instance.serial_no
+        elif field == "ctime":
+          val = instance.ctime
+        elif field == "mtime":
+          val = instance.mtime
         elif field == "network_port":
           val = instance.network_port
         elif field == "hypervisor":
@@ -4117,6 +4369,181 @@ class LUMigrateInstance(LogicalUnit):
     return env, nl, nl
 
 
+class LUMoveInstance(LogicalUnit):
+  """Move an instance by data-copying.
+
+  """
+  HPATH = "instance-move"
+  HTYPE = constants.HTYPE_INSTANCE
+  _OP_REQP = ["instance_name", "target_node"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    target_node = self.cfg.ExpandNodeName(self.op.target_node)
+    if target_node is None:
+      raise errors.OpPrereqError("Node '%s' not known" %
+                                  self.op.target_node)
+    self.op.target_node = target_node
+    self.needed_locks[locking.LEVEL_NODE] = [target_node]
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes(primary_only=True)
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on master, primary and secondary nodes of the instance.
+
+    """
+    env = {
+      "TARGET_NODE": self.op.target_node,
+      }
+    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+    nl = [self.cfg.GetMasterNode()] + [self.instance.primary_node,
+                                       self.op.target_node]
+    return env, nl, nl
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the instance is in the cluster.
+
+    """
+    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
+
+    node = self.cfg.GetNodeInfo(self.op.target_node)
+    assert node is not None, \
+      "Cannot retrieve locked node %s" % self.op.target_node
+
+    self.target_node = target_node = node.name
+
+    if target_node == instance.primary_node:
+      raise errors.OpPrereqError("Instance %s is already on the node %s" %
+                                 (instance.name, target_node))
+
+    bep = self.cfg.GetClusterInfo().FillBE(instance)
+
+    for idx, dsk in enumerate(instance.disks):
+      if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
+        raise errors.OpPrereqError("Instance disk %d has a complex layout,"
+                                   " cannot copy")
+
+    _CheckNodeOnline(self, target_node)
+    _CheckNodeNotDrained(self, target_node)
+
+    if instance.admin_up:
+      # check memory requirements on the secondary node
+      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
+                           instance.name, bep[constants.BE_MEMORY],
+                           instance.hypervisor)
+    else:
+      self.LogInfo("Not checking memory on the secondary node as"
+                   " instance will not be started")
+
+    # check bridge existance
+    _CheckInstanceBridgesExist(self, instance, node=target_node)
+
+  def Exec(self, feedback_fn):
+    """Move an instance.
+
+    The move is done by shutting it down on its present node, copying
+    the data over (slow) and starting it on the new node.
+
+    """
+    instance = self.instance
+
+    source_node = instance.primary_node
+    target_node = self.target_node
+
+    self.LogInfo("Shutting down instance %s on source node %s",
+                 instance.name, source_node)
+
+    result = self.rpc.call_instance_shutdown(source_node, instance)
+    msg = result.fail_msg
+    if msg:
+      if self.op.ignore_consistency:
+        self.proc.LogWarning("Could not shutdown instance %s on node %s."
+                             " Proceeding anyway. Please make sure node"
+                             " %s is down. Error details: %s",
+                             instance.name, source_node, source_node, msg)
+      else:
+        raise errors.OpExecError("Could not shutdown instance %s on"
+                                 " node %s: %s" %
+                                 (instance.name, source_node, msg))
+
+    # create the target disks
+    try:
+      _CreateDisks(self, instance, target_node=target_node)
+    except errors.OpExecError:
+      self.LogWarning("Device creation failed, reverting...")
+      try:
+        _RemoveDisks(self, instance, target_node=target_node)
+      finally:
+        self.cfg.ReleaseDRBDMinors(instance.name)
+        raise
+
+    cluster_name = self.cfg.GetClusterInfo().cluster_name
+
+    errs = []
+    # activate, get path, copy the data over
+    for idx, disk in enumerate(instance.disks):
+      self.LogInfo("Copying data for disk %d", idx)
+      result = self.rpc.call_blockdev_assemble(target_node, disk,
+                                               instance.name, True)
+      if result.fail_msg:
+        self.LogWarning("Can't assemble newly created disk %d: %s",
+                        idx, result.fail_msg)
+        errs.append(result.fail_msg)
+        break
+      dev_path = result.payload
+      result = self.rpc.call_blockdev_export(source_node, disk,
+                                             target_node, dev_path,
+                                             cluster_name)
+      if result.fail_msg:
+        self.LogWarning("Can't copy data over for disk %d: %s",
+                        idx, result.fail_msg)
+        errs.append(result.fail_msg)
+        break
+
+    if errs:
+      self.LogWarning("Some disks failed to copy, aborting")
+      try:
+        _RemoveDisks(self, instance, target_node=target_node)
+      finally:
+        self.cfg.ReleaseDRBDMinors(instance.name)
+        raise errors.OpExecError("Errors during disk copy: %s" %
+                                 (",".join(errs),))
+
+    instance.primary_node = target_node
+    self.cfg.Update(instance)
+
+    self.LogInfo("Removing the disks on the original node")
+    _RemoveDisks(self, instance, target_node=source_node)
+
+    # Only start the instance if it's marked as up
+    if instance.admin_up:
+      self.LogInfo("Starting instance %s on node %s",
+                   instance.name, target_node)
+
+      disks_ok, _ = _AssembleInstanceDisks(self, instance,
+                                           ignore_secondaries=True)
+      if not disks_ok:
+        _ShutdownInstanceDisks(self, instance)
+        raise errors.OpExecError("Can't activate the instance's disks")
+
+      result = self.rpc.call_instance_start(target_node, instance, None, None)
+      msg = result.fail_msg
+      if msg:
+        _ShutdownInstanceDisks(self, instance)
+        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
+                                 (instance.name, target_node, msg))
+
+
 class LUMigrateNode(LogicalUnit):
   """Migrate all instances from a node.
 
@@ -4680,7 +5107,7 @@ def _GetInstanceInfoText(instance):
   return "originstname+%s" % instance.name
 
 
-def _CreateDisks(lu, instance):
+def _CreateDisks(lu, instance, to_skip=None, target_node=None):
   """Create all disks for an instance.
 
   This abstracts away some work from AddInstance.
@@ -4689,12 +5116,21 @@ def _CreateDisks(lu, instance):
   @param lu: the logical unit on whose behalf we execute
   @type instance: L{objects.Instance}
   @param instance: the instance whose disks we should create
+  @type to_skip: list
+  @param to_skip: list of indices to skip
+  @type target_node: string
+  @param target_node: if passed, overrides the target node for creation
   @rtype: boolean
   @return: the success of the creation
 
   """
   info = _GetInstanceInfoText(instance)
-  pnode = instance.primary_node
+  if target_node is None:
+    pnode = instance.primary_node
+    all_nodes = instance.all_nodes
+  else:
+    pnode = target_node
+    all_nodes = [pnode]
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
@@ -4705,16 +5141,18 @@ def _CreateDisks(lu, instance):
 
   # Note: this needs to be kept in sync with adding of disks in
   # LUSetInstanceParams
-  for device in instance.disks:
+  for idx, device in enumerate(instance.disks):
+    if to_skip and idx in to_skip:
+      continue
     logging.info("Creating volume %s for instance %s",
                  device.iv_name, instance.name)
     #HARDCODE
-    for node in instance.all_nodes:
+    for node in all_nodes:
       f_create = node == pnode
       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
 
 
-def _RemoveDisks(lu, instance):
+def _RemoveDisks(lu, instance, target_node=None):
   """Remove all disks for an instance.
 
   This abstracts away some work from `AddInstance()` and
@@ -4726,6 +5164,8 @@ def _RemoveDisks(lu, instance):
   @param lu: the logical unit on whose behalf we execute
   @type instance: L{objects.Instance}
   @param instance: the instance whose disks we should remove
+  @type target_node: string
+  @param target_node: used to override the node on which to remove the disks
   @rtype: boolean
   @return: the success of the removal
 
@@ -4734,7 +5174,11 @@ def _RemoveDisks(lu, instance):
 
   all_result = True
   for device in instance.disks:
-    for node, disk in device.ComputeNodeTree(instance.primary_node):
+    if target_node:
+      edata = [(target_node, device)]
+    else:
+      edata = device.ComputeNodeTree(instance.primary_node)
+    for node, disk in edata:
       lu.cfg.SetDiskID(disk, node)
       msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
       if msg:
@@ -4744,12 +5188,14 @@ def _RemoveDisks(lu, instance):
 
   if instance.disk_template == constants.DT_FILE:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
-    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
-                                                 file_storage_dir)
-    msg = result.fail_msg
-    if msg:
+    if target_node is node:
+      tgt = instance.primary_node
+    else:
+      tgt = instance.target_node
+    result = lu.rpc.call_file_storage_dir_remove(tgt, file_storage_dir)
+    if result.fail_msg:
       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
-                    file_storage_dir, instance.primary_node, msg)
+                    file_storage_dir, instance.primary_node, result.fail_msg)
       all_result = False
 
   return all_result
@@ -5642,20 +6088,21 @@ class TLReplaceDisks(Tasklet):
 
     """
     # check for valid parameter combination
-    cnt = [remote_node, iallocator].count(None)
     if mode == constants.REPLACE_DISK_CHG:
-      if cnt == 2:
+      if remote_node is None and iallocator is None:
         raise errors.OpPrereqError("When changing the secondary either an"
                                    " iallocator script must be used or the"
                                    " new node given")
-      elif cnt == 0:
+
+      if remote_node is not None and iallocator is not None:
         raise errors.OpPrereqError("Give either the iallocator or the new"
                                    " secondary, not both")
-    else: # not replacing the secondary
-      if cnt != 2:
-        raise errors.OpPrereqError("The iallocator and new node options can"
-                                   " be used only when changing the"
-                                   " secondary node")
+
+    elif remote_node is not None or iallocator is not None:
+      # Not replacing the secondary
+      raise errors.OpPrereqError("The iallocator and new node options can"
+                                 " only be used when changing the"
+                                 " secondary node")
 
   @staticmethod
   def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
@@ -5685,6 +6132,10 @@ class TLReplaceDisks(Tasklet):
 
     return remote_node_name
 
+  def _FindFaultyDisks(self, node_name):
+    return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
+                                    node_name, True)
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -5727,35 +6178,64 @@ class TLReplaceDisks(Tasklet):
       raise errors.OpPrereqError("The specified node is already the"
                                  " secondary node of the instance.")
 
-    if self.mode == constants.REPLACE_DISK_PRI:
-      self.target_node = self.instance.primary_node
-      self.other_node = secondary_node
-      check_nodes = [self.target_node, self.other_node]
+    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
+                                    constants.REPLACE_DISK_CHG):
+      raise errors.OpPrereqError("Cannot specify disks to be replaced")
+
+    if self.mode == constants.REPLACE_DISK_AUTO:
+      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
+      faulty_secondary = self._FindFaultyDisks(secondary_node)
+
+      if faulty_primary and faulty_secondary:
+        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
+                                   " one node and can not be repaired"
+                                   " automatically" % self.instance_name)
+
+      if faulty_primary:
+        self.disks = faulty_primary
+        self.target_node = self.instance.primary_node
+        self.other_node = secondary_node
+        check_nodes = [self.target_node, self.other_node]
+      elif faulty_secondary:
+        self.disks = faulty_secondary
+        self.target_node = secondary_node
+        self.other_node = self.instance.primary_node
+        check_nodes = [self.target_node, self.other_node]
+      else:
+        self.disks = []
+        check_nodes = []
+
+    else:
+      # Non-automatic modes
+      if self.mode == constants.REPLACE_DISK_PRI:
+        self.target_node = self.instance.primary_node
+        self.other_node = secondary_node
+        check_nodes = [self.target_node, self.other_node]
 
-    elif self.mode == constants.REPLACE_DISK_SEC:
-      self.target_node = secondary_node
-      self.other_node = self.instance.primary_node
-      check_nodes = [self.target_node, self.other_node]
+      elif self.mode == constants.REPLACE_DISK_SEC:
+        self.target_node = secondary_node
+        self.other_node = self.instance.primary_node
+        check_nodes = [self.target_node, self.other_node]
 
-    elif self.mode == constants.REPLACE_DISK_CHG:
-      self.new_node = remote_node
-      self.other_node = self.instance.primary_node
-      self.target_node = secondary_node
-      check_nodes = [self.new_node, self.other_node]
+      elif self.mode == constants.REPLACE_DISK_CHG:
+        self.new_node = remote_node
+        self.other_node = self.instance.primary_node
+        self.target_node = secondary_node
+        check_nodes = [self.new_node, self.other_node]
 
-      _CheckNodeNotDrained(self.lu, remote_node)
+        _CheckNodeNotDrained(self.lu, remote_node)
 
-    else:
-      raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
-                                   self.mode)
+      else:
+        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
+                                     self.mode)
+
+      # If not specified all disks should be replaced
+      if not self.disks:
+        self.disks = range(len(self.instance.disks))
 
     for node in check_nodes:
       _CheckNodeOnline(self.lu, node)
 
-    # If not specified all disks should be replaced
-    if not self.disks:
-      self.disks = range(len(self.instance.disks))
-
     # Check whether disks are valid
     for disk_idx in self.disks:
       self.instance.FindDisk(disk_idx)
@@ -5775,7 +6255,12 @@ class TLReplaceDisks(Tasklet):
     This dispatches the disk replacement to the appropriate handler.
 
     """
-    feedback_fn("Replacing disks for %s" % self.instance.name)
+    if not self.disks:
+      feedback_fn("No disks need replacement")
+      return
+
+    feedback_fn("Replacing disk(s) %s for %s" %
+                (", ".join([str(i) for i in self.disks]), self.instance.name))
 
     activate_disks = (not self.instance.admin_up)
 
@@ -5784,7 +6269,8 @@ class TLReplaceDisks(Tasklet):
       _StartInstanceDisks(self.lu, self.instance, True)
 
     try:
-      if self.mode == constants.REPLACE_DISK_CHG:
+      # Should we replace the secondary node?
+      if self.new_node is not None:
         return self._ExecDrbd8Secondary()
       else:
         return self._ExecDrbd8DiskOnly()
@@ -6157,6 +6643,62 @@ class TLReplaceDisks(Tasklet):
     self._RemoveOldStorage(self.target_node, iv_names)
 
 
+class LURepairNodeStorage(NoHooksLU):
+  """Repairs the volume group on a node.
+
+  """
+  _OP_REQP = ["node_name"]
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    node_name = self.cfg.ExpandNodeName(self.op.node_name)
+    if node_name is None:
+      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
+
+    self.op.node_name = node_name
+
+  def ExpandNames(self):
+    self.needed_locks = {
+      locking.LEVEL_NODE: [self.op.node_name],
+      }
+
+  def _CheckFaultyDisks(self, instance, node_name):
+    if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
+                                node_name, True):
+      raise errors.OpPrereqError("Instance '%s' has faulty disks on"
+                                 " node '%s'" % (inst.name, node_name))
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    storage_type = self.op.storage_type
+
+    if (constants.SO_FIX_CONSISTENCY not in
+        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
+      raise errors.OpPrereqError("Storage units of type '%s' can not be"
+                                 " repaired" % storage_type)
+
+    # Check whether any instance on this node has faulty disks
+    for inst in _GetNodeInstances(self.cfg, self.op.node_name):
+      check_nodes = set(inst.all_nodes)
+      check_nodes.discard(self.op.node_name)
+      for inst_node_name in check_nodes:
+        self._CheckFaultyDisks(inst, inst_node_name)
+
+  def Exec(self, feedback_fn):
+    feedback_fn("Repairing storage unit '%s' on %s ..." %
+                (self.op.name, self.op.node_name))
+
+    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
+    result = self.rpc.call_storage_execute(self.op.node_name,
+                                           self.op.storage_type, st_args,
+                                           self.op.name,
+                                           constants.SO_FIX_CONSISTENCY)
+    result.Raise("Failed to repair storage unit '%s' on %s" %
+                 (self.op.name, self.op.node_name))
+
+
 class LUGrowDisk(LogicalUnit):
   """Grow a disk of an instance.
 
@@ -6297,7 +6839,7 @@ class LUQueryInstanceData(NoHooksLU):
     """Returns the status of a block device
 
     """
-    if self.op.static:
+    if self.op.static or not node:
       return None
 
     self.cfg.SetDiskID(dev, node)
@@ -6314,7 +6856,7 @@ class LUQueryInstanceData(NoHooksLU):
 
     return (status.dev_path, status.major, status.minor,
             status.sync_percent, status.estimated_time,
-            status.is_degraded, status.ldisk_degraded)
+            status.is_degraded, status.ldisk_status)
 
   def _ComputeDiskStatus(self, instance, snode, dev):
     """Compute block device status.
@@ -6394,6 +6936,9 @@ class LUQueryInstanceData(NoHooksLU):
         "hv_actual": cluster.FillHV(instance),
         "be_instance": instance.beparams,
         "be_actual": cluster.FillBE(instance),
+        "serial_no": instance.serial_no,
+        "mtime": instance.mtime,
+        "ctime": instance.ctime,
         }
 
       result[instance.name] = idict
@@ -7041,6 +7586,8 @@ class LUExportInstance(LogicalUnit):
     for disk in instance.disks:
       self.cfg.SetDiskID(disk, src_node)
 
+    # per-disk results
+    dresults = []
     try:
       for idx, disk in enumerate(instance.disks):
         # result.payload will be a snapshot of an lvm leaf of the one we passed
@@ -7076,16 +7623,23 @@ class LUExportInstance(LogicalUnit):
         if msg:
           self.LogWarning("Could not export disk/%s from node %s to"
                           " node %s: %s", idx, src_node, dst_node.name, msg)
+          dresults.append(False)
+        else:
+          dresults.append(True)
         msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
         if msg:
           self.LogWarning("Could not remove snapshot for disk/%d from node"
                           " %s: %s", idx, src_node, msg)
+      else:
+        dresults.append(False)
 
     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
+    fin_resu = True
     msg = result.fail_msg
     if msg:
       self.LogWarning("Could not finalize export for instance %s"
                       " on node %s: %s", instance.name, dst_node.name, msg)
+      fin_resu = False
 
     nodelist = self.cfg.GetNodeList()
     nodelist.remove(dst_node.name)
@@ -7104,6 +7658,7 @@ class LUExportInstance(LogicalUnit):
           if msg:
             self.LogWarning("Could not remove older export for instance %s"
                             " on node %s: %s", iname, node, msg)
+    return fin_resu, dresults
 
 
 class LURemoveExport(NoHooksLU):
@@ -7467,11 +8022,12 @@ class IAllocator(object):
         "master_candidate": ninfo.master_candidate,
         }
 
-      if not ninfo.offline:
+      if not (ninfo.offline or ninfo.drained):
         nresult.Raise("Can't get data for node %s" % nname)
         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                 nname)
         remote_info = nresult.payload
+
         for attr in ['memory_total', 'memory_free', 'memory_dom0',
                      'vg_size', 'vg_free', 'cpu_total']:
           if attr not in remote_info: