Wait for a while in failed resyncs
[ganeti-local] / lib / cmdlib.py
index 0e2e184..87f1452 100644 (file)
@@ -25,7 +25,6 @@
 
 import os
 import os.path
-import sha
 import time
 import tempfile
 import re
@@ -454,7 +453,8 @@ def _CheckNodeNotDrained(lu, node):
 
 
 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
-                          memory, vcpus, nics, disk_template, disks):
+                          memory, vcpus, nics, disk_template, disks,
+                          bep, hvp, hypervisor):
   """Builds instance related env variables for hooks
 
   This builds the hook environment from individual variables.
@@ -480,6 +480,12 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
   @param disk_template: the distk template of the instance
   @type disks: list
   @param disks: the list of (size, mode) pairs
+  @type bep: dict
+  @param bep: the backend parameters for the instance
+  @type hvp: dict
+  @param hvp: the hypervisor parameters for the instance
+  @type hypervisor: string
+  @param hypervisor: the hypervisor for the instance
   @rtype: dict
   @return: the hook environment for this instance
 
@@ -498,6 +504,7 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
     "INSTANCE_MEMORY": memory,
     "INSTANCE_VCPUS": vcpus,
     "INSTANCE_DISK_TEMPLATE": disk_template,
+    "INSTANCE_HYPERVISOR": hypervisor,
   }
 
   if nics:
@@ -523,6 +530,10 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
 
   env["INSTANCE_DISK_COUNT"] = disk_count
 
+  for source, kind in [(bep, "BE"), (hvp, "HV")]:
+    for key, value in source.items():
+      env["INSTANCE_%s_%s" % (kind, key)] = value
+
   return env
 
 
@@ -541,7 +552,9 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None):
   @return: the hook environment dictionary
 
   """
-  bep = lu.cfg.GetClusterInfo().FillBE(instance)
+  cluster = lu.cfg.GetClusterInfo()
+  bep = cluster.FillBE(instance)
+  hvp = cluster.FillHV(instance)
   args = {
     'name': instance.name,
     'primary_node': instance.primary_node,
@@ -553,6 +566,9 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None):
     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
     'disk_template': instance.disk_template,
     'disks': [(disk.size, disk.mode) for disk in instance.disks],
+    'bep': bep,
+    'hvp': hvp,
+    'hypervisor': instance.hypervisor,
   }
   if override:
     args.update(override)
@@ -1418,7 +1434,7 @@ class LUSetClusterParams(LogicalUnit):
   _OP_REQP = []
   REQ_BGL = False
 
-  def CheckParameters(self):
+  def CheckArguments(self):
     """Check parameters
 
     """
@@ -1427,7 +1443,7 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.candidate_pool_size is not None:
       try:
         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
-      except ValueError, err:
+      except (ValueError, TypeError), err:
         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                    str(err))
       if self.op.candidate_pool_size < 1:
@@ -1524,8 +1540,11 @@ class LUSetClusterParams(LogicalUnit):
 
     """
     if self.op.vg_name is not None:
-      if self.op.vg_name != self.cfg.GetVGName():
-        self.cfg.SetVGName(self.op.vg_name)
+      new_volume = self.op.vg_name
+      if not new_volume:
+        new_volume = None
+      if new_volume != self.cfg.GetVGName():
+        self.cfg.SetVGName(new_volume)
       else:
         feedback_fn("Cluster LVM configuration already in desired"
                     " state, not changing")
@@ -1589,6 +1608,7 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False):
     lu.cfg.SetDiskID(dev, node)
 
   retries = 0
+  degr_retries = 10 # in seconds, as we sleep 1 second each time
   while True:
     max_time = 0
     done = True
@@ -1621,6 +1641,16 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False):
           rem_time = "no time estimate"
         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                         (instance.disks[i].iv_name, perc_done, rem_time))
+
+    # if we're done but degraded, let's do a few small retries, to
+    # make sure we see a stable and not transient situation; therefore
+    # we force restart of the loop
+    if (done or oneshot) and cumul_degraded and degr_retries > 0:
+      logging.info("Degraded disks found, %d retries left", degr_retries)
+      degr_retries -= 1
+      time.sleep(1)
+      continue
+
     if done or oneshot:
       break
 
@@ -2441,6 +2471,10 @@ class LUQueryClusterInfo(NoHooksLU):
                         for hypervisor in cluster.enabled_hypervisors]),
       "beparams": cluster.beparams,
       "candidate_pool_size": cluster.candidate_pool_size,
+      "default_bridge": cluster.default_bridge,
+      "master_netdev": cluster.master_netdev,
+      "volume_group_name": cluster.volume_group_name,
+      "file_storage_dir": cluster.file_storage_dir,
       }
 
     return result
@@ -2755,15 +2789,48 @@ class LUStartupInstance(LogicalUnit):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
+    # extra beparams
+    self.beparams = getattr(self.op, "beparams", {})
+    if self.beparams:
+      if not isinstance(self.beparams, dict):
+        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
+                                   " dict" % (type(self.beparams), ))
+      # fill the beparams dict
+      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
+      self.op.beparams = self.beparams
+
+    # extra hvparams
+    self.hvparams = getattr(self.op, "hvparams", {})
+    if self.hvparams:
+      if not isinstance(self.hvparams, dict):
+        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
+                                   " dict" % (type(self.hvparams), ))
+
+      # check hypervisor parameter syntax (locally)
+      cluster = self.cfg.GetClusterInfo()
+      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
+      filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
+                                    instance.hvparams)
+      filled_hvp.update(self.hvparams)
+      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
+      hv_type.CheckParameterSyntax(filled_hvp)
+      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
+      self.op.hvparams = self.hvparams
+
     _CheckNodeOnline(self, instance.primary_node)
 
     bep = self.cfg.GetClusterInfo().FillBE(instance)
     # check bridges existance
     _CheckInstanceBridgesExist(self, instance)
 
-    _CheckNodeFreeMemory(self, instance.primary_node,
-                         "starting instance %s" % instance.name,
-                         bep[constants.BE_MEMORY], instance.hypervisor)
+    remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                              instance.name,
+                                              instance.hypervisor)
+    remote_info.Raise()
+    if not remote_info.data:
+      _CheckNodeFreeMemory(self, instance.primary_node,
+                           "starting instance %s" % instance.name,
+                           bep[constants.BE_MEMORY], instance.hypervisor)
 
   def Exec(self, feedback_fn):
     """Start the instance.
@@ -2778,7 +2845,8 @@ class LUStartupInstance(LogicalUnit):
 
     _StartInstanceDisks(self, instance, force)
 
-    result = self.rpc.call_instance_start(node_current, instance)
+    result = self.rpc.call_instance_start(node_current, instance,
+                                          self.hvparams, self.beparams)
     msg = result.RemoteFailMsg()
     if msg:
       _ShutdownInstanceDisks(self, instance)
@@ -2860,7 +2928,7 @@ class LURebootInstance(LogicalUnit):
                                  " full reboot: %s" % msg)
       _ShutdownInstanceDisks(self, instance)
       _StartInstanceDisks(self, instance, ignore_secondaries)
-      result = self.rpc.call_instance_start(node_current, instance)
+      result = self.rpc.call_instance_start(node_current, instance, None, None)
       msg = result.RemoteFailMsg()
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -2960,7 +3028,8 @@ class LUReinstallInstance(LogicalUnit):
     remote_info = self.rpc.call_instance_info(instance.primary_node,
                                               instance.name,
                                               instance.hypervisor)
-    if remote_info.failed or remote_info.data:
+    remote_info.Raise()
+    if remote_info.data:
       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                  (self.op.instance_name,
                                   instance.primary_node))
@@ -3478,10 +3547,15 @@ class LUFailoverInstance(LogicalUnit):
     target_node = secondary_nodes[0]
     _CheckNodeOnline(self, target_node)
     _CheckNodeNotDrained(self, target_node)
-    # check memory requirements on the secondary node
-    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
-                         instance.name, bep[constants.BE_MEMORY],
-                         instance.hypervisor)
+
+    if instance.admin_up:
+      # check memory requirements on the secondary node
+      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
+                           instance.name, bep[constants.BE_MEMORY],
+                           instance.hypervisor)
+    else:
+      self.LogInfo("Not checking memory on the secondary node as"
+                   " instance will not be started")
 
     # check bridge existance
     brlist = [nic.bridge for nic in instance.nics]
@@ -3550,7 +3624,7 @@ class LUFailoverInstance(LogicalUnit):
         raise errors.OpExecError("Can't activate the instance's disks")
 
       feedback_fn("* starting the instance on the target node")
-      result = self.rpc.call_instance_start(target_node, instance)
+      result = self.rpc.call_instance_start(target_node, instance, None, None)
       msg = result.RemoteFailMsg()
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -4234,8 +4308,8 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams):
       continue
     msg = info.RemoteFailMsg()
     if msg:
-      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
-                                 " %s" % msg)
+      raise errors.OpPrereqError("Hypervisor parameter validation"
+                                 " failed on node %s: %s" % (node, msg))
 
 
 class LUCreateInstance(LogicalUnit):
@@ -4300,6 +4374,7 @@ class LUCreateInstance(LogicalUnit):
                                   self.op.hvparams)
     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
     hv_type.CheckParameterSyntax(filled_hvp)
+    self.hv_full = filled_hvp
 
     # fill and remember the beparams dict
     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
@@ -4477,6 +4552,9 @@ class LUCreateInstance(LogicalUnit):
       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
       disk_template=self.op.disk_template,
       disks=[(d["size"], d["mode"]) for d in self.disks],
+      bep=self.be_full,
+      hvp=self.hv_full,
+      hypervisor=self.op.hypervisor,
     ))
 
     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
@@ -4794,7 +4872,7 @@ class LUCreateInstance(LogicalUnit):
       self.cfg.Update(iobj)
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
-      result = self.rpc.call_instance_start(pnode_name, iobj)
+      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
       msg = result.RemoteFailMsg()
       if msg:
         raise errors.OpExecError("Could not start instance: %s" % msg)
@@ -5898,7 +5976,7 @@ class LUSetInstanceParams(LogicalUnit):
         self.warn.append("Can't get info from primary node %s" % pnode)
       else:
         if not instance_info.failed and instance_info.data:
-          current_mem = instance_info.data['memory']
+          current_mem = int(instance_info.data['memory'])
         else:
           # Assume instance not running
           # (there is a slight race condition here, but it's not very probable,
@@ -6226,12 +6304,12 @@ class LUExportInstance(LogicalUnit):
       self.cfg.SetDiskID(disk, src_node)
 
     try:
-      for disk in instance.disks:
+      for idx, disk in enumerate(instance.disks):
         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
         if new_dev_name.failed or not new_dev_name.data:
-          self.LogWarning("Could not snapshot block device %s on node %s",
-                          disk.logical_id[1], src_node)
+          self.LogWarning("Could not snapshot disk/%d on node %s",
+                          idx, src_node)
           snap_disks.append(False)
         else:
           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
@@ -6242,7 +6320,7 @@ class LUExportInstance(LogicalUnit):
 
     finally:
       if self.op.shutdown and instance.admin_up:
-        result = self.rpc.call_instance_start(src_node, instance)
+        result = self.rpc.call_instance_start(src_node, instance, None, None)
         msg = result.RemoteFailMsg()
         if msg:
           _ShutdownInstanceDisks(self, instance)
@@ -6256,13 +6334,12 @@ class LUExportInstance(LogicalUnit):
         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                                instance, cluster_name, idx)
         if result.failed or not result.data:
-          self.LogWarning("Could not export block device %s from node %s to"
-                          " node %s", dev.logical_id[1], src_node,
-                          dst_node.name)
+          self.LogWarning("Could not export disk/%d from node %s to"
+                          " node %s", idx, src_node, dst_node.name)
         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
         if msg:
-          self.LogWarning("Could not remove snapshot block device %s from node"
-                          " %s: %s", dev.logical_id[1], src_node, msg)
+          self.LogWarning("Could not remove snapshot for disk/%d from node"
+                          " %s: %s", idx, src_node, msg)
 
     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
     if result.failed or not result.data:
@@ -6711,6 +6788,8 @@ class IAllocator(object):
         "disk_template": iinfo.disk_template,
         "hypervisor": iinfo.hypervisor,
         }
+      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
+                                                 pir["disks"])
       instance_data[iinfo.name] = pir
 
     data["instances"] = instance_data