Improve import/export timeout settings
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index f089b16..5a2ca68 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -39,6 +39,7 @@ import OpenSSL
 import socket
 import tempfile
 import shutil
+import itertools
 
 from ganeti import ssh
 from ganeti import utils
@@ -593,17 +594,19 @@ def _CheckGlobalHvParams(params):
     raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
 
 
-def _CheckNodeOnline(lu, node):
+def _CheckNodeOnline(lu, node, msg=None):
   """Ensure that a given node is online.
 
   @param lu: the LU on behalf of which we make the check
   @param node: the node to check
+  @param msg: if passed, should be a message to replace the default one
   @raise errors.OpPrereqError: if the node is offline
 
   """
+  if msg is None:
+    msg = "Can't use offline node"
   if lu.cfg.GetNodeInfo(node).offline:
-    raise errors.OpPrereqError("Can't use offline node %s" % node,
-                               errors.ECODE_STATE)
+    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
 
 
 def _CheckNodeNotDrained(lu, node):
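
A note on the new optional msg argument: it lets callers replace the generic
"Can't use offline node" text with a reason that explains why the node has to be
online (see the LUReinstallInstance hunk further down). A minimal, hedged sketch of
the same default-message pattern; check_node_online and its arguments are
hypothetical stand-ins for the LU/config machinery:

    def check_node_online(node, node_is_offline, msg=None):
        # 'node_is_offline' stands in for lu.cfg.GetNodeInfo(node).offline
        if msg is None:
            msg = "Can't use offline node"
        if node_is_offline:
            raise RuntimeError("%s: %s" % (msg, node))

    # Callers can now say why the node must be online, e.g.:
    # check_node_online("node3", True, "Instance primary node offline, cannot reinstall")
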
@@ -650,6 +653,33 @@ def _CheckNodeHasOS(lu, node, os_name, force_variant):
     _CheckOSVariant(result.payload, os_name)
 
 
+def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
+  """Ensure that a node has the given secondary ip.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the LU on behalf of which we make the check
+  @type node: string
+  @param node: the node to check
+  @type secondary_ip: string
+  @param secondary_ip: the ip to check
+  @type prereq: boolean
+  @param prereq: whether to raise an OpPrereqError or an OpExecError
+  @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
+  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
+
+  """
+  result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
+  result.Raise("Failure checking secondary ip on node %s" % node,
+               prereq=prereq, ecode=errors.ECODE_ENVIRON)
+  if not result.payload:
+    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
+           " please fix and re-run this command" % secondary_ip)
+    if prereq:
+      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
+    else:
+      raise errors.OpExecError(msg)
+
+
 def _RequireFileStorage():
   """Checks that file storage is enabled.
 
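The prereq flag of the new helper only chooses which error class is raised for the
same failure: OpPrereqError when the check runs before any changes are made, and
OpExecError when it runs mid-operation. A standalone, hedged sketch of that split;
PrereqError, ExecError and has_ip are stand-ins for the ganeti errors module and the
node_has_ip_address RPC payload:

    class PrereqError(Exception):
        pass

    class ExecError(Exception):
        pass

    def check_secondary_ip(node, secondary_ip, has_ip, prereq):
        if not has_ip:
            msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
                   " please fix and re-run this command" % secondary_ip)
            if prereq:
                raise PrereqError(msg)   # nothing has been changed yet
            raise ExecError(msg)         # failure during execution (node add)

    # In this patch, LUSetNodeParams calls the helper with prereq=True and
    # LUAddNode with prereq=False.
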
@@ -1512,15 +1542,17 @@ class LUVerifyCluster(LogicalUnit):
         _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                  "instance should not run on node %s", node)
 
-    diskdata = [(nname, disk, idx)
+    diskdata = [(nname, success, status, idx)
                 for (nname, disks) in diskstatus.items()
-                for idx, disk in enumerate(disks)]
+                for idx, (success, status) in enumerate(disks)]
 
-    for nname, bdev_status, idx in diskdata:
-      _ErrorIf(not bdev_status,
+    for nname, success, bdev_status, idx in diskdata:
+      _ErrorIf(instanceconfig.admin_up and not success,
                self.EINSTANCEFAULTYDISK, instance,
-               "couldn't retrieve status for disk/%s on %s", idx, nname)
-      _ErrorIf(bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY,
+               "couldn't retrieve status for disk/%s on %s: %s",
+               idx, nname, bdev_status)
+      _ErrorIf((instanceconfig.admin_up and success and
+                bdev_status.ldisk_status == constants.LDS_FAULTY),
                self.EINSTANCEFAULTYDISK, instance,
                "disk/%s on %s is faulty", idx, nname)
 
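Each entry in diskstatus is now a (success, status) pair per disk, and failures are
only reported for instances marked admin_up. A standalone, hedged sketch of the
flattening done by the diskdata comprehension; the sample data is invented:

    diskstatus = {
        "node1": [(True, "ok"), (False, "can't compute disk status")],
        "node2": [(True, "ok"), (True, "ok")],
    }

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, status, idx in diskdata:
        if not success:
            print("couldn't retrieve status for disk/%s on %s: %s"
                  % (idx, nname, status))
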
@@ -1883,18 +1915,26 @@ class LUVerifyCluster(LogicalUnit):
     @param node_image: Node objects
     @type instanceinfo: dict of (name, L{objects.Instance})
     @param instanceinfo: Instance objects
+    @rtype: {instance: {node: [(success, payload)]}}
+    @return: a dictionary of per-instance dictionaries with nodes as
+        keys and disk information as values; the disk information is a
+        list of tuples (success, payload)
 
     """
     _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
 
     node_disks = {}
     node_disks_devonly = {}
+    diskless_instances = set()
+    diskless = constants.DT_DISKLESS
 
     for nname in nodelist:
+      node_instances = list(itertools.chain(node_image[nname].pinst,
+                                            node_image[nname].sinst))
+      diskless_instances.update(inst for inst in node_instances
+                                if instanceinfo[inst].disk_template == diskless)
       disks = [(inst, disk)
-               for instlist in [node_image[nname].pinst,
-                                node_image[nname].sinst]
-               for inst in instlist
+               for inst in node_instances
                for disk in instanceinfo[inst].disks]
 
       if not disks:
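
The per-node instance list is now built by chaining primary and secondary instances
(hence the new itertools import), and diskless instances are collected separately so
they can still be represented in the result. A standalone, hedged sketch with
invented node and instance data:

    import itertools

    node_image = {"node1": {"pinst": ["web1"], "sinst": ["db1", "scratch1"]}}
    disk_templates = {"web1": "drbd8", "db1": "drbd8", "scratch1": "diskless"}

    diskless_instances = set()
    node_instances = list(itertools.chain(node_image["node1"]["pinst"],
                                          node_image["node1"]["sinst"]))
    diskless_instances.update(inst for inst in node_instances
                              if disk_templates[inst] == "diskless")

    print(node_instances)        # ['web1', 'db1', 'scratch1']
    print(diskless_instances)    # contains only 'scratch1'
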
@@ -1923,28 +1963,43 @@ class LUVerifyCluster(LogicalUnit):
     instdisk = {}
 
     for (nname, nres) in result.items():
-      if nres.offline:
-        # Ignore offline node
-        continue
-
       disks = node_disks[nname]
 
-      msg = nres.fail_msg
-      _ErrorIf(msg, self.ENODERPC, nname,
-               "while getting disk information: %s", nres.fail_msg)
-      if msg:
+      if nres.offline:
         # No data from this node
-        data = len(disks) * [None]
+        data = len(disks) * [(False, "node offline")]
       else:
-        data = nres.payload
+        msg = nres.fail_msg
+        _ErrorIf(msg, self.ENODERPC, nname,
+                 "while getting disk information: %s", msg)
+        if msg:
+          # No data from this node
+          data = len(disks) * [(False, msg)]
+        else:
+          data = []
+          for idx, i in enumerate(nres.payload):
+            if isinstance(i, (tuple, list)) and len(i) == 2:
+              data.append(i)
+            else:
+              logging.warning("Invalid result from node %s, entry %d: %s",
+                              nname, idx, i)
+              data.append((False, "Invalid result from the remote node"))
 
       for ((inst, _), status) in zip(disks, data):
         instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
 
+    # Add empty entries for diskless instances.
+    for inst in diskless_instances:
+      assert inst not in instdisk
+      instdisk[inst] = {}
+
     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
-                      len(nnames) <= len(instanceinfo[inst].all_nodes)
+                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
+                      compat.all(isinstance(s, (tuple, list)) and
+                                 len(s) == 2 for s in statuses)
                       for inst, nnames in instdisk.items()
                       for nname, statuses in nnames.items())
+    assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
 
     return instdisk
 
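The @rtype documented earlier now holds for every instance: diskless instances get an
empty per-node dict, and every per-node entry is a (success, payload) pair, with
synthetic failures such as (False, "node offline") filled in when no real data could
be collected. A hedged illustration of the resulting structure; the names are
invented and the payload is shown as a plain string rather than a block device
status object:

    instdisk = {
        "web1": {
            "node1": [(True, "ok"), (True, "ok")],      # one pair per disk
            "node2": [(False, "node offline"),          # synthetic failure entries
                      (False, "node offline")],
        },
        "scratch1": {},                                 # diskless: no per-node data
    }

    # disk/1 of web1 as seen from node2:
    success, payload = instdisk["web1"]["node2"][1]
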
@@ -2546,8 +2601,7 @@ class LUSetClusterParams(LogicalUnit):
             ht.TNone)),
     ("hvparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
                               ht.TNone)),
-    ("beparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
-                              ht.TNone)),
+    ("beparams", None, ht.TOr(ht.TDict, ht.TNone)),
     ("os_hvp", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
                             ht.TNone)),
     ("osparams", None, ht.TOr(ht.TDictOf(ht.TNonEmptyString, ht.TDict),
@@ -2854,14 +2908,14 @@ class LUSetClusterParams(LogicalUnit):
       for key, val in mods:
         if key == constants.DDM_ADD:
           if val in lst:
-            feedback_fn("OS %s already in %s, ignoring", val, desc)
+            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
           else:
             lst.append(val)
         elif key == constants.DDM_REMOVE:
           if val in lst:
             lst.remove(val)
           else:
-            feedback_fn("OS %s not found in %s, ignoring", val, desc)
+            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
         else:
           raise errors.ProgrammerError("Invalid modification '%s'" % key)
 
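Unlike the _ErrorIf helper, feedback_fn is used elsewhere in this file with a single
pre-formatted string (e.g. feedback_fn("* preparing remote import...") further down),
so the arguments are now interpolated with % before the call. A tiny, hedged sketch
of the difference; my_feedback is a hypothetical stand-in for the real callback:

    def my_feedback(msg):
        print(msg)

    val, desc = "debootstrap", "allowed OS list"
    my_feedback("OS %s already in %s, ignoring" % (val, desc))   # correct
    # my_feedback("OS %s already in %s, ignoring", val, desc)    # TypeError: 3 args
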
@@ -3814,11 +3868,11 @@ class LUAddNode(LogicalUnit):
     newbie_singlehomed = secondary_ip == primary_ip
     if master_singlehomed != newbie_singlehomed:
       if master_singlehomed:
-        raise errors.OpPrereqError("The master has no private ip but the"
+        raise errors.OpPrereqError("The master has no secondary ip but the"
                                    " new node has one",
                                    errors.ECODE_INVAL)
       else:
-        raise errors.OpPrereqError("The master has a private ip but the"
+        raise errors.OpPrereqError("The master has a secondary ip but the"
                                    " new node doesn't have one",
                                    errors.ECODE_INVAL)
 
@@ -3832,7 +3886,7 @@ class LUAddNode(LogicalUnit):
       if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                            source=myself.secondary_ip):
         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
-                                   " based ping to noded port",
+                                   " based ping to node daemon port",
                                    errors.ECODE_ENVIRON)
 
     if self.op.readd:
@@ -3904,14 +3958,8 @@ class LUAddNode(LogicalUnit):
       result.Raise("Can't update hosts file with new host data")
 
     if new_node.secondary_ip != new_node.primary_ip:
-      result = self.rpc.call_node_has_ip_address(new_node.name,
-                                                 new_node.secondary_ip)
-      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
-                   prereq=True, ecode=errors.ECODE_ENVIRON)
-      if not result.payload:
-        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
-                                 " you gave (%s). Please fix and re-run this"
-                                 " command." % new_node.secondary_ip)
+      _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
+                               False)
 
     node_verify_list = [self.cfg.GetMasterNode()]
     node_verify_param = {
@@ -3968,6 +4016,7 @@ class LUSetNodeParams(LogicalUnit):
     ("auto_promote", False, ht.TBool),
     ("master_capable", None, ht.TMaybeBool),
     ("vm_capable", None, ht.TMaybeBool),
+    ("secondary_ip", None, ht.TMaybeString),
     _PForce,
     ]
   REQ_BGL = False
@@ -3984,7 +4033,8 @@ class LUSetNodeParams(LogicalUnit):
   def CheckArguments(self):
     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
-                self.op.master_capable, self.op.vm_capable]
+                self.op.master_capable, self.op.vm_capable,
+                self.op.secondary_ip]
     if all_mods.count(None) == len(all_mods):
       raise errors.OpPrereqError("Please pass at least one modification",
                                  errors.ECODE_INVAL)
@@ -3999,7 +4049,14 @@ class LUSetNodeParams(LogicalUnit):
                          self.op.drained == True or
                          self.op.master_capable == False)
 
+    if self.op.secondary_ip:
+      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
+        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
+                                   " address" % self.op.secondary_ip,
+                                   errors.ECODE_INVAL)
+
     self.lock_all = self.op.auto_promote and self.might_demote
+    self.lock_instances = self.op.secondary_ip is not None
 
   def ExpandNames(self):
     if self.lock_all:
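
The new secondary_ip argument is validated up front as a dotted-quad IPv4 address,
and its presence makes the LU take instance locks (lock_instances) so DeclareLocks
below can work out which instances depend on this node. A rough, hedged approximation
of such a dotted-quad check, not ganeti's actual netutils.IP4Address.IsValid
implementation:

    def is_valid_ipv4(address):
        parts = address.split(".")
        if len(parts) != 4:
            return False
        for part in parts:
            if not part.isdigit() or not 0 <= int(part) <= 255:
                return False
        return True

    assert is_valid_ipv4("192.0.2.10")
    assert not is_valid_ipv4("192.0.2.256")
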
@@ -4007,6 +4064,29 @@ class LUSetNodeParams(LogicalUnit):
     else:
       self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
 
+    if self.lock_instances:
+      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+
+  def DeclareLocks(self, level):
+    # If we have locked all instances, before waiting to lock nodes, release
+    # all the ones living on nodes unrelated to the current operation.
+    if level == locking.LEVEL_NODE and self.lock_instances:
+      instances_release = []
+      instances_keep = []
+      self.affected_instances = []
+      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
+        for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+          instance = self.context.cfg.GetInstanceInfo(instance_name)
+          i_mirrored = instance.disk_template in constants.DTS_NET_MIRROR
+          if i_mirrored and self.op.node_name in instance.all_nodes:
+            instances_keep.append(instance_name)
+            self.affected_instances.append(instance)
+          else:
+            instances_release.append(instance_name)
+        if instances_release:
+          self.context.glm.release(locking.LEVEL_INSTANCE, instances_release)
+          self.acquired_locks[locking.LEVEL_INSTANCE] = instances_keep
+
   def BuildHooksEnv(self):
     """Build hooks env.
 
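As the comment in DeclareLocks says, all instance locks are taken first and then,
once the node is known, only mirrored instances that actually use that node keep
their locks while the rest are released. A standalone, hedged sketch of that
keep/release partition; the instance data and the "drbd8" stand-in for
constants.DTS_NET_MIRROR are invented:

    MIRRORED_TEMPLATES = frozenset(["drbd8"])

    instances = {
        "web1": {"disk_template": "drbd8", "nodes": ["node1", "node2"]},
        "db1":  {"disk_template": "drbd8", "nodes": ["node3", "node4"]},
        "tmp1": {"disk_template": "plain", "nodes": ["node1"]},
    }

    def partition_locks(instances, node_name):
        keep, release = [], []
        for name, inst in instances.items():
            mirrored = inst["disk_template"] in MIRRORED_TEMPLATES
            if mirrored and node_name in inst["nodes"]:
                keep.append(name)        # still affected by changing node_name
            else:
                release.append(name)     # unrelated, drop its lock early
        return keep, release

    keep, release = partition_locks(instances, "node1")
    print(keep)    # ['web1']: the only mirrored instance using node1
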
@@ -4121,6 +4201,35 @@ class LUSetNodeParams(LogicalUnit):
                         " without using re-add. Please make sure the node"
                         " is healthy!")
 
+    if self.op.secondary_ip:
+      # Ok even without locking, because this can't be changed by any LU
+      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
+      master_singlehomed = master.secondary_ip == master.primary_ip
+      if master_singlehomed and self.op.secondary_ip:
+        raise errors.OpPrereqError("Cannot change the secondary ip on a single"
+                                   " homed cluster", errors.ECODE_INVAL)
+
+      if node.offline:
+        if self.affected_instances:
+          raise errors.OpPrereqError("Cannot change secondary ip: offline"
+                                     " node has instances (%s) configured"
+                                     " to use it" % self.affected_instances)
+      else:
+        # On online nodes, check that no instances are running, and that
+        # the node has the new ip and we can reach it.
+        for instance in self.affected_instances:
+          _CheckInstanceDown(self, instance, "cannot change secondary ip")
+
+        _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
+        if master.name != node.name:
+          # check reachability from master secondary ip to new secondary ip
+          if not netutils.TcpPing(self.op.secondary_ip,
+                                  constants.DEFAULT_NODED_PORT,
+                                  source=master.secondary_ip):
+            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
+                                       " based ping to node daemon port",
+                                       errors.ECODE_ENVIRON)
+
   def Exec(self, feedback_fn):
     """Modifies a node.
 
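The prerequisite checks above boil down to: refuse the change on single-homed
clusters, require affected mirrored instances to be shut down (or absent, for offline
nodes), verify via _CheckNodeHasSecondaryIP that the node really answers on the new
address, and confirm from the master that the node daemon port is reachable over the
secondary network. A rough, hedged sketch of such a reachability probe using a plain
TCP connect, approximating what netutils.TcpPing is used for here; the port and
addresses are examples only:

    import socket

    def tcp_ping(target, port, source=None, timeout=10.0):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        try:
            if source:
                sock.bind((source, 0))   # force the probe out of the secondary ip
            sock.connect((target, port))
            return True
        except socket.error:
            return False
        finally:
            sock.close()

    # e.g. tcp_ping("192.0.2.20", 1811, source="192.0.2.1")
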
@@ -4154,6 +4263,10 @@ class LUSetNodeParams(LogicalUnit):
       if self.lock_all:
         _AdjustCandidatePool(self, [node.name])
 
+    if self.op.secondary_ip:
+      node.secondary_ip = self.op.secondary_ip
+      result.append(("secondary_ip", self.op.secondary_ip))
+
     # this will trigger configuration file update, if needed
     self.cfg.Update(node, feedback_fn)
 
@@ -4907,7 +5020,11 @@ class LUReinstallInstance(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, instance.primary_node)
+    _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
+                     " offline, cannot reinstall")
+    for node in instance.secondary_nodes:
+      _CheckNodeOnline(self, node, "Instance secondary node offline,"
+                       " cannot reinstall")
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
@@ -6743,6 +6860,8 @@ class LUCreateInstance(LogicalUnit):
     ("source_handshake", None, ht.TOr(ht.TList, ht.TNone)),
     ("source_x509_ca", None, ht.TMaybeString),
     ("source_instance_name", None, ht.TMaybeString),
+    ("source_shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT,
+     ht.TPositiveInt),
     ("src_node", None, ht.TMaybeString),
     ("src_path", None, ht.TMaybeString),
     ("pnode", None, ht.TMaybeString),
@@ -7610,7 +7729,11 @@ class LUCreateInstance(LogicalUnit):
 
       elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
         feedback_fn("* preparing remote import...")
-        connect_timeout = constants.RIE_CONNECT_TIMEOUT
+        # The source cluster will stop the instance before attempting to make a
+        # connection. In some cases stopping an instance can take a long time,
+        # hence the shutdown timeout is added to the connection timeout.
+        connect_timeout = (constants.RIE_CONNECT_TIMEOUT +
+                           self.op.source_shutdown_timeout)
         timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
 
         disk_results = masterd.instance.RemoteImport(self, feedback_fn, iobj,
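
This last hunk is the change behind the commit subject: a remote import only gets a
connection after the source cluster has shut the instance down, so the shutdown time
has to be budgeted into the connect timeout. A hedged, worked example of the
arithmetic; the two values are illustrative, not necessarily ganeti's actual
constants:

    RIE_CONNECT_TIMEOUT = 60          # seconds allowed for the remote end to connect
    source_shutdown_timeout = 120     # op parameter, defaulting to DEFAULT_SHUTDOWN_TIMEOUT

    connect_timeout = RIE_CONNECT_TIMEOUT + source_shutdown_timeout
    print(connect_timeout)            # 180: the import listener also waits out the shutdown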