Add utils.IsNormAbsPath function
[ganeti-local] / lib / cmdlib.py
index 6bdc263..27dfe5f 100644 (file)
@@ -25,7 +25,6 @@
 
 import os
 import os.path
-import sha
 import time
 import tempfile
 import re
@@ -782,12 +781,13 @@ class LUVerifyCluster(LogicalUnit):
       else:
         for minor, (iname, must_exist) in drbd_map.items():
           if minor not in used_minors and must_exist:
-            feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
-                        (minor, iname))
+            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
+                        " not active" % (minor, iname))
             bad = True
         for minor in used_minors:
           if minor not in drbd_map:
-            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
+            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
+                        minor)
             bad = True
 
     return bad
@@ -915,8 +915,12 @@ class LUVerifyCluster(LogicalUnit):
 
     """
     all_nodes = self.cfg.GetNodeList()
-    # TODO: populate the environment with useful information for verify hooks
-    env = {}
+    env = {
+      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
+      }
+    for node in self.cfg.GetAllNodesInfo().values():
+      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
+
     return env, [], all_nodes
 
   def Exec(self, feedback_fn):
@@ -1006,8 +1010,16 @@ class LUVerifyCluster(LogicalUnit):
 
       node_drbd = {}
       for minor, instance in all_drbd_map[node].items():
-        instance = instanceinfo[instance]
-        node_drbd[minor] = (instance.name, instance.admin_up)
+        if instance not in instanceinfo:
+          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
+                      instance)
+          # ghost instance should not be running, but otherwise we
+          # don't give double warnings (both ghost instance and
+          # unallocated minor in use)
+          node_drbd[minor] = (instance, False)
+        else:
+          instance = instanceinfo[instance]
+          node_drbd[minor] = (instance.name, instance.admin_up)
       result = self._VerifyNode(node_i, file_names, local_checksums,
                                 nresult, feedback_fn, master_files,
                                 node_drbd, vg_name)
@@ -1060,9 +1072,17 @@ class LUVerifyCluster(LogicalUnit):
         }
         # FIXME: devise a free space model for file based instances as well
         if vg_name is not None:
+          if (constants.NV_VGLIST not in nresult or
+              vg_name not in nresult[constants.NV_VGLIST]):
+            feedback_fn("  - ERROR: node %s didn't return data for the"
+                        " volume group '%s' - it is either missing or broken" %
+                        (node, vg_name))
+            bad = True
+            continue
           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
-      except ValueError:
-        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
+      except (ValueError, KeyError):
+        feedback_fn("  - ERROR: invalid nodeinfo value returned"
+                    " from node %s" % (node,))
         bad = True
         continue
 
@@ -1269,6 +1289,7 @@ class LUVerifyDisks(NoHooksLU):
       if isinstance(lvs, basestring):
         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
         res_nlvm[node] = lvs
+        continue
       elif not isinstance(lvs, dict):
         logging.warning("Connection to node %s failed or invalid data"
                         " returned", node)
@@ -1396,7 +1417,7 @@ class LUSetClusterParams(LogicalUnit):
   _OP_REQP = []
   REQ_BGL = False
 
-  def CheckParameters(self):
+  def CheckArguments(self):
     """Check parameters
 
     """
@@ -1405,7 +1426,7 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.candidate_pool_size is not None:
       try:
         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
-      except ValueError, err:
+      except (ValueError, TypeError), err:
         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                    str(err))
       if self.op.candidate_pool_size < 1:
@@ -1502,8 +1523,11 @@ class LUSetClusterParams(LogicalUnit):
 
     """
     if self.op.vg_name is not None:
-      if self.op.vg_name != self.cfg.GetVGName():
-        self.cfg.SetVGName(self.op.vg_name)
+      new_volume = self.op.vg_name
+      if not new_volume:
+        new_volume = None
+      if new_volume != self.cfg.GetVGName():
+        self.cfg.SetVGName(new_volume)
       else:
         feedback_fn("Cluster LVM configuration already in desired"
                     " state, not changing")
@@ -1524,6 +1548,45 @@ class LUSetClusterParams(LogicalUnit):
       _AdjustCandidatePool(self)
 
 
+def _RedistributeAncillaryFiles(lu, additional_nodes=None):
+  """Distribute additional files which are part of the cluster configuration.
+
+  ConfigWriter takes care of distributing the config and ssconf files, but
+  there are more files which should be distributed to all nodes. This function
+  makes sure those are copied.
+
+  @param lu: calling logical unit
+  @param additional_nodes: list of nodes not in the config to distribute to
+
+  """
+  # 1. Gather target nodes
+  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
+  dist_nodes = lu.cfg.GetNodeList()
+  if additional_nodes is not None:
+    dist_nodes.extend(additional_nodes)
+  if myself.name in dist_nodes:
+    dist_nodes.remove(myself.name)
+  # 2. Gather files to distribute
+  dist_files = set([constants.ETC_HOSTS,
+                    constants.SSH_KNOWN_HOSTS_FILE,
+                    constants.RAPI_CERT_FILE,
+                    constants.RAPI_USERS_FILE,
+                   ])
+
+  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
+  for hv_name in enabled_hypervisors:
+    hv_class = hypervisor.GetHypervisor(hv_name)
+    dist_files.update(hv_class.GetAncillaryFiles())
+
+  # 3. Perform the files upload
+  for fname in dist_files:
+    if os.path.exists(fname):
+      result = lu.rpc.call_upload_file(dist_nodes, fname)
+      for to_node, to_result in result.items():
+        if to_result.failed or not to_result.data:
+          logging.error("Copy of file %s to node %s failed", fname, to_node)
+
+
 class LURedistributeConfig(NoHooksLU):
   """Force the redistribution of cluster configuration.
 
@@ -1549,6 +1612,7 @@ class LURedistributeConfig(NoHooksLU):
 
     """
     self.cfg.Update(self.cfg.GetClusterInfo())
+    _RedistributeAncillaryFiles(self)
 
 
 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
@@ -1660,9 +1724,11 @@ class LUDiagnoseOS(NoHooksLU):
                        selected=self.op.output_fields)
 
     # Lock all nodes, in shared mode
+    # Temporary removal of locks, should be reverted later
+    # TODO: reintroduce locks when they are lighter-weight
     self.needed_locks = {}
-    self.share_locks[locking.LEVEL_NODE] = 1
-    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    #self.share_locks[locking.LEVEL_NODE] = 1
+    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -1686,6 +1752,11 @@ class LUDiagnoseOS(NoHooksLU):
 
     """
     all_os = {}
+    # we build here the list of nodes that didn't fail the RPC (at RPC
+    # level), so that nodes with a non-responding node daemon don't
+    # make all OSes invalid
+    good_nodes = [node_name for node_name in rlist
+                  if not rlist[node_name].failed]
     for node_name, nr in rlist.iteritems():
       if nr.failed or not nr.data:
         continue
@@ -1694,7 +1765,7 @@ class LUDiagnoseOS(NoHooksLU):
           # build a list of nodes for this os containing empty lists
           # for each node in node_list
           all_os[os_obj.name] = {}
-          for nname in node_list:
+          for nname in good_nodes:
             all_os[os_obj.name][nname] = []
         all_os[os_obj.name][node_name].append(os_obj)
     return all_os
@@ -1703,9 +1774,7 @@ class LUDiagnoseOS(NoHooksLU):
     """Compute the list of OSes.
 
     """
-    node_list = self.acquired_locks[locking.LEVEL_NODE]
-    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
-                   if node in node_list]
+    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
     node_data = self.rpc.call_os_diagnose(valid_nodes)
     if node_data == False:
       raise errors.OpExecError("Can't gather the list of OSes")
@@ -2219,35 +2288,11 @@ class LUAddNode(LogicalUnit):
                       (verifier, result[verifier].data['nodelist'][failed]))
         raise errors.OpExecError("ssh/hostname verification failed.")
 
-    # Distribute updated /etc/hosts and known_hosts to all nodes,
-    # including the node just added
-    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
-    dist_nodes = self.cfg.GetNodeList()
-    if not self.op.readd:
-      dist_nodes.append(node)
-    if myself.name in dist_nodes:
-      dist_nodes.remove(myself.name)
-
-    logging.debug("Copying hosts and known_hosts to all nodes")
-    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
-      result = self.rpc.call_upload_file(dist_nodes, fname)
-      for to_node, to_result in result.iteritems():
-        if to_result.failed or not to_result.data:
-          logging.error("Copy of file %s to node %s failed", fname, to_node)
-
-    to_copy = []
-    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
-    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
-      to_copy.append(constants.VNC_PASSWORD_FILE)
-
-    for fname in to_copy:
-      result = self.rpc.call_upload_file([node], fname)
-      if result[node].failed or not result[node]:
-        logging.error("Could not copy file %s to node %s", fname, node)
-
     if self.op.readd:
+      _RedistributeAncillaryFiles(self)
       self.context.ReaddNode(new_node)
     else:
+      _RedistributeAncillaryFiles(self, additional_nodes=node)
       self.context.AddNode(new_node)
 
 
@@ -2322,7 +2367,7 @@ class LUSetNodeParams(LogicalUnit):
         ((node.offline and not self.op.offline == False) or
          (node.drained and not self.op.drained == False))):
       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
-                                 " to master_candidate")
+                                 " to master_candidate" % node.name)
 
     return
 
@@ -2414,6 +2459,10 @@ class LUQueryClusterInfo(NoHooksLU):
                         for hypervisor in cluster.enabled_hypervisors]),
       "beparams": cluster.beparams,
       "candidate_pool_size": cluster.candidate_pool_size,
+      "default_bridge": cluster.default_bridge,
+      "master_netdev": cluster.master_netdev,
+      "volume_group_name": cluster.volume_group_name,
+      "file_storage_dir": cluster.file_storage_dir,
       }
 
     return result
@@ -2728,15 +2777,48 @@ class LUStartupInstance(LogicalUnit):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
+    # extra beparams
+    self.beparams = getattr(self.op, "beparams", {})
+    if self.beparams:
+      if not isinstance(self.beparams, dict):
+        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
+                                   " dict" % (type(self.beparams), ))
+      # fill the beparams dict
+      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
+      self.op.beparams = self.beparams
+
+    # extra hvparams
+    self.hvparams = getattr(self.op, "hvparams", {})
+    if self.hvparams:
+      if not isinstance(self.hvparams, dict):
+        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
+                                   " dict" % (type(self.hvparams), ))
+
+      # check hypervisor parameter syntax (locally)
+      cluster = self.cfg.GetClusterInfo()
+      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
+      filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
+                                    instance.hvparams)
+      filled_hvp.update(self.hvparams)
+      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
+      hv_type.CheckParameterSyntax(filled_hvp)
+      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
+      self.op.hvparams = self.hvparams
+
     _CheckNodeOnline(self, instance.primary_node)
 
     bep = self.cfg.GetClusterInfo().FillBE(instance)
     # check bridges existance
     _CheckInstanceBridgesExist(self, instance)
 
-    _CheckNodeFreeMemory(self, instance.primary_node,
-                         "starting instance %s" % instance.name,
-                         bep[constants.BE_MEMORY], instance.hypervisor)
+    remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                              instance.name,
+                                              instance.hypervisor)
+    remote_info.Raise()
+    if not remote_info.data:
+      _CheckNodeFreeMemory(self, instance.primary_node,
+                           "starting instance %s" % instance.name,
+                           bep[constants.BE_MEMORY], instance.hypervisor)
 
   def Exec(self, feedback_fn):
     """Start the instance.
@@ -2751,7 +2833,8 @@ class LUStartupInstance(LogicalUnit):
 
     _StartInstanceDisks(self, instance, force)
 
-    result = self.rpc.call_instance_start(node_current, instance)
+    result = self.rpc.call_instance_start(node_current, instance,
+                                          self.hvparams, self.beparams)
     msg = result.RemoteFailMsg()
     if msg:
       _ShutdownInstanceDisks(self, instance)
@@ -2833,7 +2916,7 @@ class LURebootInstance(LogicalUnit):
                                  " full reboot: %s" % msg)
       _ShutdownInstanceDisks(self, instance)
       _StartInstanceDisks(self, instance, ignore_secondaries)
-      result = self.rpc.call_instance_start(node_current, instance)
+      result = self.rpc.call_instance_start(node_current, instance, None, None)
       msg = result.RemoteFailMsg()
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -2933,7 +3016,8 @@ class LUReinstallInstance(LogicalUnit):
     remote_info = self.rpc.call_instance_info(instance.primary_node,
                                               instance.name,
                                               instance.hypervisor)
-    if remote_info.failed or remote_info.data:
+    remote_info.Raise()
+    if remote_info.data:
       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                  (self.op.instance_name,
                                   instance.primary_node))
@@ -2968,7 +3052,7 @@ class LUReinstallInstance(LogicalUnit):
     _StartInstanceDisks(self, inst, None)
     try:
       feedback_fn("Running the instance OS create scripts...")
-      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
+      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
       msg = result.RemoteFailMsg()
       if msg:
         raise errors.OpExecError("Could not install OS for instance %s"
@@ -3523,7 +3607,7 @@ class LUFailoverInstance(LogicalUnit):
         raise errors.OpExecError("Can't activate the instance's disks")
 
       feedback_fn("* starting the instance on the target node")
-      result = self.rpc.call_instance_start(target_node, instance)
+      result = self.rpc.call_instance_start(target_node, instance, None, None)
       msg = result.RemoteFailMsg()
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -4207,8 +4291,8 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams):
       continue
     msg = info.RemoteFailMsg()
     if msg:
-      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
-                                 " %s" % msg)
+      raise errors.OpPrereqError("Hypervisor parameter validation"
+                                 " failed on node %s: %s" % (node, msg))
 
 
 class LUCreateInstance(LogicalUnit):
@@ -4736,7 +4820,7 @@ class LUCreateInstance(LogicalUnit):
     if iobj.disk_template != constants.DT_DISKLESS:
       if self.op.mode == constants.INSTANCE_CREATE:
         feedback_fn("* running the instance OS create scripts...")
-        result = self.rpc.call_instance_os_add(pnode_name, iobj)
+        result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
         msg = result.RemoteFailMsg()
         if msg:
           raise errors.OpExecError("Could not add os for instance %s"
@@ -4767,7 +4851,7 @@ class LUCreateInstance(LogicalUnit):
       self.cfg.Update(iobj)
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
-      result = self.rpc.call_instance_start(pnode_name, iobj)
+      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
       msg = result.RemoteFailMsg()
       if msg:
         raise errors.OpExecError("Could not start instance: %s" % msg)
@@ -5292,7 +5376,7 @@ class LUReplaceDisks(LogicalUnit):
       try:
         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
                               _GetInstanceInfoText(instance), False)
-      except errors.BlockDeviceError:
+      except errors.GenericError:
         self.cfg.ReleaseDRBDMinors(instance.name)
         raise
 
@@ -5871,7 +5955,7 @@ class LUSetInstanceParams(LogicalUnit):
         self.warn.append("Can't get info from primary node %s" % pnode)
       else:
         if not instance_info.failed and instance_info.data:
-          current_mem = instance_info.data['memory']
+          current_mem = int(instance_info.data['memory'])
         else:
           # Assume instance not running
           # (there is a slight race condition here, but it's not very probable,
@@ -6215,7 +6299,7 @@ class LUExportInstance(LogicalUnit):
 
     finally:
       if self.op.shutdown and instance.admin_up:
-        result = self.rpc.call_instance_start(src_node, instance)
+        result = self.rpc.call_instance_start(src_node, instance, None, None)
         msg = result.RemoteFailMsg()
         if msg:
           _ShutdownInstanceDisks(self, instance)
@@ -6684,6 +6768,8 @@ class IAllocator(object):
         "disk_template": iinfo.disk_template,
         "hypervisor": iinfo.hypervisor,
         }
+      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
+                                                 pir["disks"])
       instance_data[iinfo.name] = pir
 
     data["instances"] = instance_data