Improve import/export timeout settings

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index d5731ac..5a2ca68 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -39,6 +39,7 @@ import OpenSSL
 import socket
 import tempfile
 import shutil
+import itertools
 
 from ganeti import ssh
 from ganeti import utils
@@ -593,17 +594,19 @@ def _CheckGlobalHvParams(params):
     raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
 
 
-def _CheckNodeOnline(lu, node):
+def _CheckNodeOnline(lu, node, msg=None):
   """Ensure that a given node is online.
 
   @param lu: the LU on behalf of which we make the check
   @param node: the node to check
+  @param msg: if passed, should be a message to replace the default one
   @raise errors.OpPrereqError: if the node is offline
 
   """
+  if msg is None:
+    msg = "Can't use offline node"
   if lu.cfg.GetNodeInfo(node).offline:
-    raise errors.OpPrereqError("Can't use offline node %s" % node,
-                               errors.ECODE_INVAL)
+    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
 
 
 def _CheckNodeNotDrained(lu, node):
@@ -616,7 +619,20 @@ def _CheckNodeNotDrained(lu, node):
   """
   if lu.cfg.GetNodeInfo(node).drained:
     raise errors.OpPrereqError("Can't use drained node %s" % node,
-                               errors.ECODE_INVAL)
+                               errors.ECODE_STATE)
+
+
+def _CheckNodeVmCapable(lu, node):
+  """Ensure that a given node is vm capable.
+
+  @param lu: the LU on behalf of which we make the check
+  @param node: the node to check
+  @raise errors.OpPrereqError: if the node is not vm capable
+
+  """
+  if not lu.cfg.GetNodeInfo(node).vm_capable:
+    raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
+                               errors.ECODE_STATE)
 
 
 def _CheckNodeHasOS(lu, node, os_name, force_variant):
@@ -637,6 +653,33 @@ def _CheckNodeHasOS(lu, node, os_name, force_variant):
     _CheckOSVariant(result.payload, os_name)
 
 
+def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
+  """Ensure that a node has the given secondary ip.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the LU on behalf of which we make the check
+  @type node: string
+  @param node: the node to check
+  @type secondary_ip: string
+  @param secondary_ip: the ip to check
+  @type prereq: boolean
+  @param prereq: whether to throw a prerequisite or an execute error
+  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
+  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
+
+  """
+  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
+  result.Raise("Failure checking secondary ip on node %s" % node,
+               prereq=prereq, ecode=errors.ECODE_ENVIRON)
+  if not result.payload:
+    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
+           " please fix and re-run this command" % secondary_ip)
+    if prereq:
+      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
+    else:
+      raise errors.OpExecError(msg)
+
+
 def _RequireFileStorage():
   """Checks that file storage is enabled.
 
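As an aside on the prereq argument of _CheckNodeHasSecondaryIP: prerequisite-time and execution-time failures are reported through different exception classes. A minimal standalone sketch of that pattern follows; the exception classes below are stand-ins, not the real ganeti errors module.

    class OpPrereqError(Exception):      # stand-in for errors.OpPrereqError
      pass

    class OpExecError(Exception):        # stand-in for errors.OpExecError
      pass

    def report_missing_ip(secondary_ip, prereq):
      # Same branching as _CheckNodeHasSecondaryIP: during CheckPrereq the job
      # can still be aborted cleanly, so a prerequisite error is raised; during
      # Exec the failure is reported as an execution error instead.
      msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
             " please fix and re-run this command" % secondary_ip)
      if prereq:
        raise OpPrereqError(msg)
      raise OpExecError(msg)

    try:
      report_missing_ip("192.0.2.2", prereq=True)
    except OpPrereqError, err:
      print "prereq failure:", err
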
@@ -1168,7 +1211,7 @@ class LUVerifyCluster(LogicalUnit):
   EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
   EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
   EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
-  EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
+  EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
   EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
   ENODEDRBD = (TNODE, "ENODEDRBD")
   ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
@@ -1219,9 +1262,11 @@ class LUVerifyCluster(LogicalUnit):
     @ivar os_fail: whether the RPC call didn't return valid OS data
     @type oslist: list
     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
+    @type vm_capable: boolean
+    @ivar vm_capable: whether the node can host instances
 
     """
-    def __init__(self, offline=False, name=None):
+    def __init__(self, offline=False, name=None, vm_capable=True):
       self.name = name
       self.volumes = {}
       self.instances = []
@@ -1231,6 +1276,7 @@ class LUVerifyCluster(LogicalUnit):
       self.mfree = 0
       self.dfree = 0
       self.offline = offline
+      self.vm_capable = vm_capable
       self.rpc_fail = False
       self.lvm_fail = False
       self.hyp_fail = False
@@ -1335,13 +1381,12 @@ class LUVerifyCluster(LogicalUnit):
                   code=self.ETYPE_WARNING)
 
     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
-    if isinstance(hyp_result, dict):
+    if ninfo.vm_capable and isinstance(hyp_result, dict):
       for hv_name, hv_result in hyp_result.iteritems():
         test = hv_result is not None
         _ErrorIf(test, self.ENODEHV, node,
                  "hypervisor %s verify failure: '%s'", hv_name, hv_result)
 
-
     test = nresult.get(constants.NV_NODESETUP,
                            ["Missing NODESETUP results"])
     _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
@@ -1460,8 +1505,8 @@ class LUVerifyCluster(LogicalUnit):
           msg = "cannot reach the master IP"
         _ErrorIf(True, self.ENODENET, node, msg)
 
-
-  def _VerifyInstance(self, instance, instanceconfig, node_image):
+  def _VerifyInstance(self, instance, instanceconfig, node_image,
+                      diskstatus):
     """Verify an instance.
 
     This function checks to see if the required block devices are
@@ -1497,6 +1542,20 @@ class LUVerifyCluster(LogicalUnit):
         _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                  "instance should not run on node %s", node)
 
+    diskdata = [(nname, success, status, idx)
+                for (nname, disks) in diskstatus.items()
+                for idx, (success, status) in enumerate(disks)]
+
+    for nname, success, bdev_status, idx in diskdata:
+      _ErrorIf(instanceconfig.admin_up and not success,
+               self.EINSTANCEFAULTYDISK, instance,
+               "couldn't retrieve status for disk/%s on %s: %s",
+               idx, nname, bdev_status)
+      _ErrorIf((instanceconfig.admin_up and success and
+                bdev_status.ldisk_status == constants.LDS_FAULTY),
+               self.EINSTANCEFAULTYDISK, instance,
+               "disk/%s on %s is faulty", idx, nname)
+
   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
     """Verify if there are any unknown volumes in the cluster.
 
@@ -1847,6 +1906,103 @@ class LUVerifyCluster(LogicalUnit):
           _ErrorIf(True, self.ENODERPC, node,
                    "node returned invalid LVM info, check LVM status")
 
+  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
+    """Gets per-disk status information for all instances.
+
+    @type nodelist: list of strings
+    @param nodelist: Node names
+    @type node_image: dict of (name, L{NodeImage})
+    @param node_image: Node image objects, one per node
+    @type instanceinfo: dict of (name, L{objects.Instance})
+    @param instanceinfo: Instance objects
+    @rtype: {instance: {node: [(success, payload)]}}
+    @return: a dictionary of per-instance dictionaries with nodes as
+        keys and disk information as values; the disk information is a
+        list of tuples (success, payload)
+
+    """
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    node_disks = {}
+    node_disks_devonly = {}
+    diskless_instances = set()
+    diskless = constants.DT_DISKLESS
+
+    for nname in nodelist:
+      node_instances = list(itertools.chain(node_image[nname].pinst,
+                                            node_image[nname].sinst))
+      diskless_instances.update(inst for inst in node_instances
+                                if instanceinfo[inst].disk_template == diskless)
+      disks = [(inst, disk)
+               for inst in node_instances
+               for disk in instanceinfo[inst].disks]
+
+      if not disks:
+        # No need to collect data
+        continue
+
+      node_disks[nname] = disks
+
+      # Creating copies as SetDiskID below will modify the objects and that can
+      # lead to incorrect data returned from nodes
+      devonly = [dev.Copy() for (_, dev) in disks]
+
+      for dev in devonly:
+        self.cfg.SetDiskID(dev, nname)
+
+      node_disks_devonly[nname] = devonly
+
+    assert len(node_disks) == len(node_disks_devonly)
+
+    # Collect data from all nodes with disks
+    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
+                                                          node_disks_devonly)
+
+    assert len(result) == len(node_disks)
+
+    instdisk = {}
+
+    for (nname, nres) in result.items():
+      disks = node_disks[nname]
+
+      if nres.offline:
+        # No data from this node
+        data = len(disks) * [(False, "node offline")]
+      else:
+        msg = nres.fail_msg
+        _ErrorIf(msg, self.ENODERPC, nname,
+                 "while getting disk information: %s", msg)
+        if msg:
+          # No data from this node
+          data = len(disks) * [(False, msg)]
+        else:
+          data = []
+          for idx, i in enumerate(nres.payload):
+            if isinstance(i, (tuple, list)) and len(i) == 2:
+              data.append(i)
+            else:
+              logging.warning("Invalid result from node %s, entry %d: %s",
+                              nname, idx, i)
+              data.append((False, "Invalid result from the remote node"))
+
+      for ((inst, _), status) in zip(disks, data):
+        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
+
+    # Add empty entries for diskless instances.
+    for inst in diskless_instances:
+      assert inst not in instdisk
+      instdisk[inst] = {}
+
+    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
+                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
+                      compat.all(isinstance(s, (tuple, list)) and
+                                 len(s) == 2 for s in statuses)
+                      for inst, nnames in instdisk.items()
+                      for nname, statuses in nnames.items())
+    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
+
+    return instdisk
+
   def BuildHooksEnv(self):
     """Build hooks env.
 
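For reference, a small standalone example of the structure _CollectDiskInfo returns and of the flattening done in _VerifyInstance; the instance names, node names and payload strings here are made up.

    # Shape of the per-instance disk status dictionary: instance name ->
    # node name -> list of (success, payload) pairs, one per disk.
    instdisk = {
      "inst1": {
        "node1": [(True, "status of disk/0"), (True, "status of disk/1")],
        "node2": [(True, "status of disk/0"), (False, "node offline")],
      },
      "inst2": {},   # diskless instances get an empty dictionary
    }

    # The flattening used by _VerifyInstance for a single instance:
    diskstatus = instdisk["inst1"]
    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]
    print diskdata
    # e.g. [('node1', True, 'status of disk/0', 0),
    #       ('node1', True, 'status of disk/1', 1), ...] (dict order is arbitrary)
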
@@ -1925,6 +2081,7 @@ class LUVerifyCluster(LogicalUnit):
       constants.NV_TIME: None,
       constants.NV_MASTERIP: (master_node, master_ip),
       constants.NV_OSLIST: None,
+      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
       }
 
     if vg_name is not None:
@@ -1938,7 +2095,8 @@ class LUVerifyCluster(LogicalUnit):
 
     # Build our expected cluster state
     node_image = dict((node.name, self.NodeImage(offline=node.offline,
-                                                 name=node.name))
+                                                 name=node.name,
+                                                 vm_capable=node.vm_capable))
                       for node in nodeinfo)
 
     for instance in instancelist:
@@ -1977,6 +2135,9 @@ class LUVerifyCluster(LogicalUnit):
 
     all_drbd_map = self.cfg.ComputeDRBDMap()
 
+    feedback_fn("* Gathering disk information (%s nodes)" % len(nodelist))
+    instdisk = self._CollectDiskInfo(nodelist, node_image, instanceinfo)
+
     feedback_fn("* Verifying node status")
 
     refos_img = None
@@ -2012,29 +2173,32 @@ class LUVerifyCluster(LogicalUnit):
       nresult = all_nvinfo[node].payload
 
       nimg.call_ok = self._VerifyNode(node_i, nresult)
+      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
       self._VerifyNodeNetwork(node_i, nresult)
-      self._VerifyNodeLVM(node_i, nresult, vg_name)
       self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
                             master_files)
-      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
-                           all_drbd_map)
-      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
 
-      self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
-      self._UpdateNodeInstances(node_i, nresult, nimg)
-      self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
-      self._UpdateNodeOS(node_i, nresult, nimg)
-      if not nimg.os_fail:
-        if refos_img is None:
-          refos_img = nimg
-        self._VerifyNodeOS(node_i, nimg, refos_img)
+      if nimg.vm_capable:
+        self._VerifyNodeLVM(node_i, nresult, vg_name)
+        self._VerifyNodeDrbd(node_i, nresult, instanceinfo, drbd_helper,
+                             all_drbd_map)
+
+        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
+        self._UpdateNodeInstances(node_i, nresult, nimg)
+        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
+        self._UpdateNodeOS(node_i, nresult, nimg)
+        if not nimg.os_fail:
+          if refos_img is None:
+            refos_img = nimg
+          self._VerifyNodeOS(node_i, nimg, refos_img)
 
     feedback_fn("* Verifying instance status")
     for instance in instancelist:
       if verbose:
         feedback_fn("* Verifying instance %s" % instance)
       inst_config = instanceinfo[instance]
-      self._VerifyInstance(instance, inst_config, node_image)
+      self._VerifyInstance(instance, inst_config, node_image,
+                           instdisk[instance])
       inst_nodes_offline = []
 
       pnode = inst_config.primary_node
@@ -2073,10 +2237,12 @@ class LUVerifyCluster(LogicalUnit):
       _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
                "instance lives on offline node(s) %s",
                utils.CommaJoin(inst_nodes_offline))
-      # ... or ghost nodes
+      # ... or ghost/non-vm_capable nodes
       for node in inst_config.all_nodes:
         _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
                  "instance lives on ghost node %s", node)
+        _ErrorIf(not node_image[node].vm_capable, self.EINSTANCEBADNODE,
+                 instance, "instance lives on non-vm_capable node %s", node)
 
     feedback_fn("* Verifying orphan volumes")
     reserved = utils.FieldSet(*cluster.reserved_lvs)
@@ -2411,15 +2577,7 @@ class LURenameCluster(LogicalUnit):
         node_list.remove(master)
       except ValueError:
         pass
-      result = self.rpc.call_upload_file(node_list,
-                                         constants.SSH_KNOWN_HOSTS_FILE)
-      for to_node, to_result in result.iteritems():
-        msg = to_result.fail_msg
-        if msg:
-          msg = ("Copy of file %s to node %s failed: %s" %
-                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
-          self.proc.LogWarning(msg)
-
+      _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE)
     finally:
       result = self.rpc.call_node_start_master(master, False, False)
       msg = result.fail_msg
@@ -2443,8 +2601,7 @@ class LUSetClusterParams(LogicalUnit):
             ht.TNone)),
     ("hvparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
                               ht.TNone)),
-    ("beparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
-                              ht.TNone)),
+    ("beparams", None, ht.TOr(ht.TDict, ht.TNone)),
     ("os_hvp", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
                             ht.TNone)),
     ("osparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
@@ -2751,14 +2908,14 @@ class LUSetClusterParams(LogicalUnit):
       for key, val in mods:
         if key == constants.DDM_ADD:
           if val in lst:
-            feedback_fn("OS %s already in %s, ignoring", val, desc)
+            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
           else:
             lst.append(val)
         elif key == constants.DDM_REMOVE:
           if val in lst:
             lst.remove(val)
           else:
-            feedback_fn("OS %s not found in %s, ignoring", val, desc)
+            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
         else:
           raise errors.ProgrammerError("Invalid modification '%s'" % key)
 
@@ -2771,7 +2928,21 @@ class LUSetClusterParams(LogicalUnit):
     self.cfg.Update(self.cluster, feedback_fn)
 
 
-def _RedistributeAncillaryFiles(lu, additional_nodes=None):
+def _UploadHelper(lu, nodes, fname):
+  """Helper for uploading a file and showing warnings.
+
+  """
+  if os.path.exists(fname):
+    result = lu.rpc.call_upload_file(nodes, fname)
+    for to_node, to_result in result.items():
+      msg = to_result.fail_msg
+      if msg:
+        msg = ("Copy of file %s to node %s failed: %s" %
+               (fname, to_node, msg))
+        lu.proc.LogWarning(msg)
+
+
+def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
   """Distribute additional files which are part of the cluster configuration.
 
   ConfigWriter takes care of distributing the config and ssconf files, but
@@ -2780,15 +2951,23 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None):
 
   @param lu: calling logical unit
   @param additional_nodes: list of nodes not in the config to distribute to
+  @type additional_vm: boolean
+  @param additional_vm: whether the additional nodes are vm-capable or not
 
   """
   # 1. Gather target nodes
   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
   dist_nodes = lu.cfg.GetOnlineNodeList()
+  nvm_nodes = lu.cfg.GetNonVmCapableNodeList()
+  vm_nodes = [name for name in dist_nodes if name not in nvm_nodes]
   if additional_nodes is not None:
     dist_nodes.extend(additional_nodes)
+    if additional_vm:
+      vm_nodes.extend(additional_nodes)
   if myself.name in dist_nodes:
     dist_nodes.remove(myself.name)
+  if myself.name in vm_nodes:
+    vm_nodes.remove(myself.name)
 
   # 2. Gather files to distribute
   dist_files = set([constants.ETC_HOSTS,
@@ -2799,21 +2978,17 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None):
                     constants.CLUSTER_DOMAIN_SECRET_FILE,
                    ])
 
+  vm_files = set()
   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
   for hv_name in enabled_hypervisors:
     hv_class = hypervisor.GetHypervisor(hv_name)
-    dist_files.update(hv_class.GetAncillaryFiles())
+    vm_files.update(hv_class.GetAncillaryFiles())
 
   # 3. Perform the files upload
   for fname in dist_files:
-    if os.path.exists(fname):
-      result = lu.rpc.call_upload_file(dist_nodes, fname)
-      for to_node, to_result in result.items():
-        msg = to_result.fail_msg
-        if msg:
-          msg = ("Copy of file %s to node %s failed: %s" %
-                 (fname, to_node, msg))
-          lu.proc.LogWarning(msg)
+    _UploadHelper(lu, dist_nodes, fname)
+  for fname in vm_files:
+    _UploadHelper(lu, vm_nodes, fname)
 
 
 class LURedistributeConfig(NoHooksLU):
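
The effect of the dist_files/vm_files split above, as a standalone sketch; the node names and the hypervisor file path are illustrative, not taken from a real cluster.

    online_nodes = ["node1", "node2", "node3"]
    non_vm_capable = ["node3"]                 # e.g. a node used only as master candidate

    dist_nodes = list(online_nodes)
    vm_nodes = [name for name in dist_nodes if name not in non_vm_capable]

    dist_files = ["/etc/hosts"]                # cluster-wide files go to every online node
    vm_files = ["/etc/xen/xend-config.sxp"]    # hypervisor ancillary files (illustrative path)

    for fname in dist_files:
      print "would upload %s to %s" % (fname, ", ".join(dist_nodes))
    for fname in vm_files:
      print "would upload %s to %s" % (fname, ", ".join(vm_nodes))
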
@@ -3197,7 +3372,8 @@ class LUQueryNodes(NoHooksLU):
   REQ_BGL = False
 
   _SIMPLE_FIELDS = ["name", "serial_no", "ctime", "mtime", "uuid",
-                    "master_candidate", "offline", "drained"]
+                    "master_candidate", "offline", "drained",
+                    "master_capable", "vm_capable"]
 
   _FIELDS_DYNAMIC = utils.FieldSet(
     "dtotal", "dfree",
@@ -3572,8 +3748,11 @@ class LUAddNode(LogicalUnit):
     ("primary_ip", None, ht.NoType),
     ("secondary_ip", None, ht.TMaybeString),
     ("readd", False, ht.TBool),
-    ("group", None, ht.TMaybeString)
+    ("group", None, ht.TMaybeString),
+    ("master_capable", None, ht.TMaybeBool),
+    ("vm_capable", None, ht.TMaybeBool),
     ]
+  _NFLAGS = ["master_capable", "vm_capable"]
 
   def CheckArguments(self):
     self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
@@ -3596,6 +3775,8 @@ class LUAddNode(LogicalUnit):
       "NODE_NAME": self.op.node_name,
       "NODE_PIP": self.op.primary_ip,
       "NODE_SIP": self.op.secondary_ip,
+      "MASTER_CAPABLE": str(self.op.master_capable),
+      "VM_CAPABLE": str(self.op.vm_capable),
       }
     nodes_0 = self.cfg.GetNodeList()
     nodes_1 = nodes_0 + [self.op.node_name, ]
@@ -3659,6 +3840,27 @@ class LUAddNode(LogicalUnit):
                                    " existing node %s" % existing_node.name,
                                    errors.ECODE_NOTUNIQUE)
 
+    # After this 'if' block, None is no longer a valid value for the
+    # _capable op attributes
+    if self.op.readd:
+      old_node = self.cfg.GetNodeInfo(node)
+      assert old_node is not None, "Can't retrieve locked node %s" % node
+      for attr in self._NFLAGS:
+        if getattr(self.op, attr) is None:
+          setattr(self.op, attr, getattr(old_node, attr))
+    else:
+      for attr in self._NFLAGS:
+        if getattr(self.op, attr) is None:
+          setattr(self.op, attr, True)
+
+    if self.op.readd and not self.op.vm_capable:
+      pri, sec = cfg.GetNodeInstances(node)
+      if pri or sec:
+        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
+                                   " flag set to false, but it already holds"
+                                   " instances" % node,
+                                   errors.ECODE_STATE)
+
     # check that the type of the node (single versus dual homed) is the
     # same as for the master
     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
@@ -3666,11 +3868,11 @@ class LUAddNode(LogicalUnit):
     newbie_singlehomed = secondary_ip == primary_ip
     if master_singlehomed != newbie_singlehomed:
       if master_singlehomed:
-        raise errors.OpPrereqError("The master has no private ip but the"
+        raise errors.OpPrereqError("The master has no secondary ip but the"
                                    " new node has one",
                                    errors.ECODE_INVAL)
       else:
-        raise errors.OpPrereqError("The master has a private ip but the"
+        raise errors.OpPrereqError("The master has a secondary ip but the"
                                    " new node doesn't have one",
                                    errors.ECODE_INVAL)
 
@@ -3684,7 +3886,7 @@ class LUAddNode(LogicalUnit):
       if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                            source=myself.secondary_ip):
         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
-                                   " based ping to noded port",
+                                   " based ping to node daemon port",
                                    errors.ECODE_ENVIRON)
 
     if self.op.readd:
@@ -3692,11 +3894,13 @@ class LUAddNode(LogicalUnit):
     else:
       exceptions = []
 
-    self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
+    if self.op.master_capable:
+      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
+    else:
+      self.master_candidate = False
 
     if self.op.readd:
-      self.new_node = self.cfg.GetNodeInfo(node)
-      assert self.new_node is not None, "Can't retrieve locked node %s" % node
+      self.new_node = old_node
     else:
       node_group = cfg.LookupNodeGroup(self.op.group)
       self.new_node = objects.Node(name=node,
@@ -3725,6 +3929,10 @@ class LUAddNode(LogicalUnit):
       if self.changed_primary_ip:
         new_node.primary_ip = self.op.primary_ip
 
+    # copy the master/vm_capable flags
+    for attr in self._NFLAGS:
+      setattr(new_node, attr, getattr(self.op, attr))
+
     # notify the user about any possible mc promotion
     if new_node.master_candidate:
       self.LogInfo("Node will be a master candidate")
@@ -3750,14 +3958,8 @@ class LUAddNode(LogicalUnit):
       result.Raise("Can't update hosts file with new host data")
 
     if new_node.secondary_ip != new_node.primary_ip:
-      result = self.rpc.call_node_has_ip_address(new_node.name,
-                                                 new_node.secondary_ip)
-      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
-                   prereq=True, ecode=errors.ECODE_ENVIRON)
-      if not result.payload:
-        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
-                                 " you gave (%s). Please fix and re-run this"
-                                 " command." % new_node.secondary_ip)
+      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
+                               False)
 
     node_verify_list = [self.cfg.GetMasterNode()]
     node_verify_param = {
@@ -3790,7 +3992,8 @@ class LUAddNode(LogicalUnit):
           self.LogWarning("Node failed to demote itself from master"
                           " candidate status: %s" % msg)
     else:
-      _RedistributeAncillaryFiles(self, additional_nodes=[node])
+      _RedistributeAncillaryFiles(self, additional_nodes=[node],
+                                  additional_vm=self.op.vm_capable)
       self.context.AddNode(new_node, self.proc.GetECId())
 
 
@@ -3811,6 +4014,9 @@ class LUSetNodeParams(LogicalUnit):
     ("offline", None, ht.TMaybeBool),
     ("drained", None, ht.TMaybeBool),
     ("auto_promote", False, ht.TBool),
+    ("master_capable", None, ht.TMaybeBool),
+    ("vm_capable", None, ht.TMaybeBool),
+    ("secondary_ip", None, ht.TMaybeString),
     _PForce,
     ]
   REQ_BGL = False
@@ -3826,7 +4032,9 @@ class LUSetNodeParams(LogicalUnit):
 
   def CheckArguments(self):
     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
-    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
+    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
+                self.op.master_capable, self.op.vm_capable,
+                self.op.secondary_ip]
     if all_mods.count(None) == len(all_mods):
       raise errors.OpPrereqError("Please pass at least one modification",
                                  errors.ECODE_INVAL)
@@ -3838,9 +4046,17 @@ class LUSetNodeParams(LogicalUnit):
     # Boolean value that tells us whether we might be demoting from MC
     self.might_demote = (self.op.master_candidate == False or
                          self.op.offline == True or
-                         self.op.drained == True)
+                         self.op.drained == True or
+                         self.op.master_capable == False)
+
+    if self.op.secondary_ip:
+      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
+        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
+                                   " address" % self.op.secondary_ip,
+                                   errors.ECODE_INVAL)
 
     self.lock_all = self.op.auto_promote and self.might_demote
+    self.lock_instances = self.op.secondary_ip is not None
 
   def ExpandNames(self):
     if self.lock_all:
@@ -3848,6 +4064,29 @@ class LUSetNodeParams(LogicalUnit):
     else:
       self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
 
+    if self.lock_instances:
+      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+
+  def DeclareLocks(self, level):
+    # If we have locked all instances, before waiting to lock nodes, release
+    # all the ones living on nodes unrelated to the current operation.
+    if level == locking.LEVEL_NODE and self.lock_instances:
+      instances_release = []
+      instances_keep = []
+      self.affected_instances = []
+      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
+        for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+          instance = self.context.cfg.GetInstanceInfo(instance_name)
+          i_mirrored = instance.disk_template in constants.DTS_NET_MIRROR
+          if i_mirrored and self.op.node_name in instance.all_nodes:
+            instances_keep.append(instance_name)
+            self.affected_instances.append(instance)
+          else:
+            instances_release.append(instance_name)
+        if instances_release:
+          self.context.glm.release(locking.LEVEL_INSTANCE, instances_release)
+          self.acquired_locks[locking.LEVEL_INSTANCE] = instances_keep
+
   def BuildHooksEnv(self):
     """Build hooks env.
 
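The lock narrowing in DeclareLocks is essentially a partition of the already-acquired instance locks; a standalone sketch with made-up instance data:

    acquired = ["inst1", "inst2", "inst3"]     # hypothetical acquired instance locks
    instances = {                              # name -> (disk template, nodes it lives on)
      "inst1": ("drbd", ["nodeA", "nodeB"]),
      "inst2": ("plain", ["nodeA"]),
      "inst3": ("drbd", ["nodeB", "nodeC"]),
    }
    target_node = "nodeA"                      # node whose secondary IP is being changed
    MIRRORED = frozenset(["drbd"])             # stand-in for constants.DTS_NET_MIRROR

    instances_keep = []
    instances_release = []
    for name in acquired:
      template, nodes = instances[name]
      if template in MIRRORED and target_node in nodes:
        instances_keep.append(name)            # affected by the change, keep the lock
      else:
        instances_release.append(name)
    print instances_keep, instances_release    # ['inst1'] ['inst2', 'inst3']
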
@@ -3859,6 +4098,8 @@ class LUSetNodeParams(LogicalUnit):
       "MASTER_CANDIDATE": str(self.op.master_candidate),
       "OFFLINE": str(self.op.offline),
       "DRAINED": str(self.op.drained),
+      "MASTER_CAPABLE": str(self.op.master_capable),
+      "VM_CAPABLE": str(self.op.vm_capable),
       }
     nl = [self.cfg.GetMasterNode(),
           self.op.node_name]
@@ -3881,6 +4122,17 @@ class LUSetNodeParams(LogicalUnit):
                                    " only via master-failover",
                                    errors.ECODE_INVAL)
 
+    if self.op.master_candidate and not node.master_capable:
+      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
+                                 " it a master candidate" % node.name,
+                                 errors.ECODE_STATE)
+
+    if self.op.vm_capable == False:
+      (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
+      if ipri or isec:
+        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
+                                   " the vm_capable flag" % node.name,
+                                   errors.ECODE_STATE)
 
     if node.master_candidate and self.might_demote and not self.lock_all:
       assert not self.op.auto_promote, "auto-promote set but lock_all not"
@@ -3896,7 +4148,7 @@ class LUSetNodeParams(LogicalUnit):
     self.old_flags = old_flags = (node.master_candidate,
                                   node.drained, node.offline)
     assert old_flags in self._F2R, "Un-handled old flags  %s" % str(old_flags)
-    self.old_role = self._F2R[old_flags]
+    self.old_role = old_role = self._F2R[old_flags]
 
     # Check for ineffective changes
     for attr in self._FLAGS:
@@ -3908,21 +4160,19 @@ class LUSetNodeParams(LogicalUnit):
     # away from the respective state, as only real changes are kept
 
     # If we're being deofflined/drained, we'll MC ourself if needed
-    if self.op.drained == False or self.op.offline == False:
+    if (self.op.drained == False or self.op.offline == False or
+        (self.op.master_capable and not node.master_capable)):
       if _DecideSelfPromotion(self):
         self.op.master_candidate = True
         self.LogInfo("Auto-promoting node to master candidate")
 
-  def Exec(self, feedback_fn):
-    """Modifies a node.
-
-    """
-    node = self.node
-    old_role = self.old_role
+    # If we're no longer master capable, we'll demote ourselves from MC
+    if self.op.master_capable == False and node.master_candidate:
+      self.LogInfo("Demoting from master candidate")
+      self.op.master_candidate = False
 
+    # Compute new role
     assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
-
-    # compute new flags
     if self.op.master_candidate:
       new_role = self._ROLE_CANDIDATE
     elif self.op.drained:
@@ -3936,31 +4186,93 @@ class LUSetNodeParams(LogicalUnit):
     else: # no new flags, nothing, keep old role
       new_role = old_role
 
+    self.new_role = new_role
+
+    if old_role == self._ROLE_OFFLINE and new_role != old_role:
+      # Trying to transition out of offline status
+      result = self.rpc.call_version([node.name])[node.name]
+      if result.fail_msg:
+        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
+                                   " to report its version: %s" %
+                                   (node.name, result.fail_msg),
+                                   errors.ECODE_STATE)
+      else:
+        self.LogWarning("Transitioning node from offline to online state"
+                        " without using re-add. Please make sure the node"
+                        " is healthy!")
+
+    if self.op.secondary_ip:
+      # Ok even without locking, because this can't be changed by any LU
+      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
+      master_singlehomed = master.secondary_ip == master.primary_ip
+      if master_singlehomed and self.op.secondary_ip:
+        raise errors.OpPrereqError("Cannot change the secondary ip on a single"
+                                   " homed cluster", errors.ECODE_INVAL)
+
+      if node.offline:
+        if self.affected_instances:
+          raise errors.OpPrereqError("Cannot change secondary ip: offline"
+                                     " node has instances (%s) configured"
+                                     " to use it" % self.affected_instances)
+      else:
+        # On online nodes, check that no instances are running, and that
+        # the node has the new ip and we can reach it.
+        for instance in self.affected_instances:
+          _CheckInstanceDown(self, instance, "cannot change secondary ip")
+
+        _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
+        if master.name != node.name:
+          # check reachability from master secondary ip to new secondary ip
+          if not netutils.TcpPing(self.op.secondary_ip,
+                                  constants.DEFAULT_NODED_PORT,
+                                  source=master.secondary_ip):
+            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
+                                       " based ping to node daemon port",
+                                       errors.ECODE_ENVIRON)
+
+  def Exec(self, feedback_fn):
+    """Modifies a node.
+
+    """
+    node = self.node
+    old_role = self.old_role
+    new_role = self.new_role
+
     result = []
-    changed_mc = [old_role, new_role].count(self._ROLE_CANDIDATE) == 1
 
-    # Tell the node to demote itself, if no longer MC and not offline
-    if (old_role == self._ROLE_CANDIDATE and
-        new_role != self._ROLE_OFFLINE and new_role != old_role):
-      msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
-      if msg:
-        self.LogWarning("Node failed to demote itself: %s", msg)
+    for attr in ["master_capable", "vm_capable"]:
+      val = getattr(self.op, attr)
+      if val is not None:
+        setattr(node, attr, val)
+        result.append((attr, str(val)))
 
-    new_flags = self._R2F[new_role]
-    for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
-      if of != nf:
-        result.append((desc, str(nf)))
-    (node.master_candidate, node.drained, node.offline) = new_flags
+    if new_role != old_role:
+      # Tell the node to demote itself, if no longer MC and not offline
+      if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
+        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
+        if msg:
+          self.LogWarning("Node failed to demote itself: %s", msg)
 
-    # we locked all nodes, we adjust the CP before updating this node
-    if self.lock_all:
-      _AdjustCandidatePool(self, [node.name])
+      new_flags = self._R2F[new_role]
+      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
+        if of != nf:
+          result.append((desc, str(nf)))
+      (node.master_candidate, node.drained, node.offline) = new_flags
+
+      # we locked all nodes, we adjust the CP before updating this node
+      if self.lock_all:
+        _AdjustCandidatePool(self, [node.name])
+
+    if self.op.secondary_ip:
+      node.secondary_ip = self.op.secondary_ip
+      result.append(("secondary_ip", self.op.secondary_ip))
 
     # this will trigger configuration file update, if needed
     self.cfg.Update(node, feedback_fn)
 
-    # this will trigger job queue propagation or cleanup
-    if changed_mc:
+    # this will trigger job queue propagation or cleanup if the mc
+    # flag changed
+    if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
       self.context.ReaddNode(node)
 
     return result
@@ -4708,7 +5020,11 @@ class LUReinstallInstance(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, instance.primary_node)
+    _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
+                     " offline, cannot reinstall")
+    for node in instance.secondary_nodes:
+      _CheckNodeOnline(self, node, "Instance secondary node offline,"
+                       " cannot reinstall")
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
@@ -5593,6 +5909,7 @@ class LUMoveInstance(LogicalUnit):
 
     _CheckNodeOnline(self, target_node)
     _CheckNodeNotDrained(self, target_node)
+    _CheckNodeVmCapable(self, target_node)
 
     if instance.admin_up:
       # check memory requirements on the secondary node
@@ -6543,6 +6860,8 @@ class LUCreateInstance(LogicalUnit):
     ("source_handshake", None, ht.TOr(ht.TList, ht.TNone)),
     ("source_x509_ca", None, ht.TMaybeString),
     ("source_instance_name", None, ht.TMaybeString),
+    ("source_shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT,
+     ht.TPositiveInt),
     ("src_node", None, ht.TMaybeString),
     ("src_path", None, ht.TMaybeString),
     ("pnode", None, ht.TMaybeString),
@@ -7181,6 +7500,9 @@ class LUCreateInstance(LogicalUnit):
     if pnode.drained:
       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                  pnode.name, errors.ECODE_STATE)
+    if not pnode.vm_capable:
+      raise errors.OpPrereqError("Cannot use non-vm_capable primary node"
+                                 " '%s'" % pnode.name, errors.ECODE_STATE)
 
     self.secondaries = []
 
@@ -7191,6 +7513,7 @@ class LUCreateInstance(LogicalUnit):
                                    " primary node.", errors.ECODE_INVAL)
       _CheckNodeOnline(self, self.op.snode)
       _CheckNodeNotDrained(self, self.op.snode)
+      _CheckNodeVmCapable(self, self.op.snode)
       self.secondaries.append(self.op.snode)
 
     nodenames = [pnode.name] + self.secondaries
@@ -7406,7 +7729,11 @@ class LUCreateInstance(LogicalUnit):
 
       elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
         feedback_fn("* preparing remote import...")
-        connect_timeout = constants.RIE_CONNECT_TIMEOUT
+        # The source cluster will stop the instance before attempting to make a
+        # connection. In some cases stopping an instance can take a long time,
+        # hence the shutdown timeout is added to the connection timeout.
+        connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
+                           self.op.source_shutdown_timeout)
         timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
 
         disk_results = masterd.instance.RemoteImport(self, feedback_fn, iobj,
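
A worked example of the new connection timeout for remote imports; the numeric values below are illustrative, the real ones come from ganeti's constants module and the opcode.

    RIE_CONNECT_TIMEOUT = 60           # illustrative: time allowed for the daemons to connect
    DEFAULT_SHUTDOWN_TIMEOUT = 120     # illustrative: default instance shutdown timeout

    source_shutdown_timeout = DEFAULT_SHUTDOWN_TIMEOUT   # opcode default, user-overridable

    # The importing cluster keeps listening while the source cluster is still
    # shutting the instance down, hence the sum used above.
    connect_timeout = RIE_CONNECT_TIMEOUT + source_shutdown_timeout
    print connect_timeout              # 180 with these example values
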
@@ -7772,6 +8099,7 @@ class TLReplaceDisks(Tasklet):
         check_nodes = [self.new_node, self.other_node]
 
         _CheckNodeNotDrained(self.lu, remote_node)
+        _CheckNodeVmCapable(self.lu, remote_node)
 
         old_node_info = self.cfg.GetNodeInfo(secondary_node)
         assert old_node_info is not None
@@ -9413,10 +9741,6 @@ class LUExportInstance(LogicalUnit):
     self.x509_key_name = self.op.x509_key_name
     self.dest_x509_ca_pem = self.op.destination_x509_ca
 
-    if self.op.remove_instance and not self.op.shutdown:
-      raise errors.OpPrereqError("Can not remove instance without shutting it"
-                                 " down before")
-
     if self.op.mode == constants.EXPORT_MODE_REMOTE:
       if not self.x509_key_name:
         raise errors.OpPrereqError("Missing X509 key name for encryption",
@@ -9482,6 +9806,11 @@ class LUExportInstance(LogicalUnit):
           "Cannot retrieve locked instance %s" % self.op.instance_name
     _CheckNodeOnline(self, self.instance.primary_node)
 
+    if (self.op.remove_instance and self.instance.admin_up and
+        not self.op.shutdown):
+      raise errors.OpPrereqError("Can not remove instance without shutting it"
+                                 " down before")
+
     if self.op.mode == constants.EXPORT_MODE_LOCAL:
       self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
       self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
@@ -10242,6 +10571,8 @@ class IAllocator(object):
         "drained": ninfo.drained,
         "master_candidate": ninfo.master_candidate,
         "group": ninfo.group,
+        "master_capable": ninfo.master_capable,
+        "vm_capable": ninfo.vm_capable,
         }
 
       if not (ninfo.offline or ninfo.drained):
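
Finally, a sketch of the per-node dictionary in the IAllocator input after this change; the values are made up and keys not visible in the hunk above are omitted.

    node_entry = {
      "drained": False,
      "master_candidate": True,
      "group": "default",
      "master_capable": True,    # new field
      "vm_capable": True,        # new field
      # plus the pre-existing keys not shown in this hunk
    }
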