Add and remove instance/node locks
[ganeti-local] / lib / cmdlib.py
index b550e01..c339933 100644 (file)
@@ -54,16 +54,22 @@ class LogicalUnit(object):
     - implement Exec
     - implement BuildHooksEnv
     - redefine HPATH and HTYPE
-    - optionally redefine their run requirements (REQ_MASTER); note that all
-      commands require root permissions
+    - optionally redefine their run requirements:
+        REQ_MASTER: the LU needs to run on the master node
+        REQ_WSSTORE: the LU needs a writable SimpleStore
+        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
+
+  Note that all commands require root permissions.
 
   """
   HPATH = None
   HTYPE = None
   _OP_REQP = []
   REQ_MASTER = True
+  REQ_WSSTORE = False
+  REQ_BGL = True
 
-  def __init__(self, processor, op, cfg, sstore):
+  def __init__(self, processor, op, context, sstore):
     """Constructor for LogicalUnit.
 
     This needs to be overriden in derived classes in order to check op
@@ -72,8 +78,9 @@ class LogicalUnit(object):
     """
     self.proc = processor
     self.op = op
-    self.cfg = cfg
+    self.cfg = context.cfg
     self.sstore = sstore
+    self.context = context
     self.__ssh = None
 
     for attr_name in self._OP_REQP:
@@ -849,6 +856,7 @@ class LURenameCluster(LogicalUnit):
   HPATH = "cluster-rename"
   HTYPE = constants.HTYPE_CLUSTER
   _OP_REQP = ["name"]
+  REQ_WSSTORE = True
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -875,8 +883,7 @@ class LURenameCluster(LogicalUnit):
       raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                  " cluster has changed")
     if new_ip != old_ip:
-      result = utils.RunCmd(["fping", "-q", new_ip])
-      if not result.failed:
+      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
         raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                    " reachable on the network. Aborting." %
                                    new_ip)
@@ -1182,7 +1189,7 @@ class LURemoveNode(LogicalUnit):
     """Build hooks env.
 
     This doesn't run on the target node in the pre phase as a failed
-    node would not allows itself to run.
+    node would then be impossible to remove.
 
     """
     env = {
@@ -1236,11 +1243,11 @@ class LURemoveNode(LogicalUnit):
 
     rpc.call_node_leave_cluster(node.name)
 
-    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
-
     logger.Info("Removing node %s from config" % node.name)
 
     self.cfg.RemoveNode(node.name)
+    # Remove the node from the Ganeti Lock Manager
+    self.context.glm.remove(locking.LEVEL_NODE, node.name)
 
     utils.RemoveHostFromEtcHosts(node.name)
 
@@ -1266,7 +1273,7 @@ class LUQueryNodes(NoHooksLU):
 
     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                                "pinst_list", "sinst_list",
-                               "pip", "sip"],
+                               "pip", "sip", "tags"],
                        dynamic=self.dynamic_fields,
                        selected=self.op.output_fields)
 
@@ -1337,6 +1344,8 @@ class LUQueryNodes(NoHooksLU):
           val = node.primary_ip
         elif field == "sip":
           val = node.secondary_ip
+        elif field == "tags":
+          val = list(node.GetTags())
         elif field in self.dynamic_fields:
           val = live_data[node.name].get(field, None)
         else:
@@ -1518,11 +1527,6 @@ class LUAddNode(LogicalUnit):
                                  primary_ip=primary_ip,
                                  secondary_ip=secondary_ip)
 
-    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
-      if not os.path.exists(constants.VNC_PASSWORD_FILE):
-        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
-                                   constants.VNC_PASSWORD_FILE)
-
   def Exec(self, feedback_fn):
     """Adds the new node to the cluster.
 
@@ -1530,46 +1534,7 @@ class LUAddNode(LogicalUnit):
     new_node = self.new_node
     node = new_node.name
 
-    # set up inter-node password and certificate and restarts the node daemon
-    gntpass = self.sstore.GetNodeDaemonPassword()
-    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
-      raise errors.OpExecError("ganeti password corruption detected")
-    f = open(constants.SSL_CERT_FILE)
-    try:
-      gntpem = f.read(8192)
-    finally:
-      f.close()
-    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
-    # so we use this to detect an invalid certificate; as long as the
-    # cert doesn't contain this, the here-document will be correctly
-    # parsed by the shell sequence below
-    if re.search('^!EOF\.', gntpem, re.MULTILINE):
-      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
-    if not gntpem.endswith("\n"):
-      raise errors.OpExecError("PEM must end with newline")
-    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
-
-    # and then connect with ssh to set password and start ganeti-noded
-    # note that all the below variables are sanitized at this point,
-    # either by being constants or by the checks above
-    ss = self.sstore
-    mycommand = ("umask 077 && "
-                 "echo '%s' > '%s' && "
-                 "cat > '%s' << '!EOF.' && \n"
-                 "%s!EOF.\n%s restart" %
-                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
-                  constants.SSL_CERT_FILE, gntpem,
-                  constants.NODE_INITD_SCRIPT))
-
-    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
-    if result.failed:
-      raise errors.OpExecError("Remote command on node %s, error: %s,"
-                               " output: %s" %
-                               (node, result.fail_reason, result.output))
-
     # check connectivity
-    time.sleep(4)
-
     result = rpc.call_version([node])[node]
     if result:
       if constants.PROTOCOL_VERSION == result:
@@ -1616,12 +1581,22 @@ class LUAddNode(LogicalUnit):
                                  " you gave (%s). Please fix and re-run this"
                                  " command." % new_node.secondary_ip)
 
-    success, msg = self.ssh.VerifyNodeHostname(node)
-    if not success:
-      raise errors.OpExecError("Node '%s' claims it has a different hostname"
-                               " than the one the resolver gives: %s."
-                               " Please fix and re-run this command." %
-                               (node, msg))
+    node_verify_list = [self.sstore.GetMasterNode()]
+    node_verify_param = {
+      'nodelist': [node],
+      # TODO: do a node-net-test as well?
+    }
+
+    result = rpc.call_node_verify(node_verify_list, node_verify_param)
+    for verifier in node_verify_list:
+      if not result[verifier]:
+        raise errors.OpExecError("Cannot communicate with %s's node daemon"
+                                 " for remote verification" % verifier)
+      if result[verifier]['nodelist']:
+        for failed in result[verifier]['nodelist']:
+          feedback_fn("ssh/hostname verification failed %s -> %s" %
+                      (verifier, result[verifier]['nodelist'][failed]))
+        raise errors.OpExecError("ssh/hostname verification failed.")
 
     # Distribute updated /etc/hosts and known_hosts to all nodes,
     # including the node just added
@@ -1640,16 +1615,19 @@ class LUAddNode(LogicalUnit):
           logger.Error("copy of file %s to node %s failed" %
                        (fname, to_node))
 
-    to_copy = ss.GetFileList()
+    to_copy = self.sstore.GetFileList()
     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
       to_copy.append(constants.VNC_PASSWORD_FILE)
     for fname in to_copy:
-      if not self.ssh.CopyFileToNode(node, fname):
+      result = rpc.call_upload_file([node], fname)
+      if not result[node]:
         logger.Error("could not copy file %s to node %s" % (fname, node))
 
     if not self.op.readd:
       logger.Info("adding node %s to cluster.conf" % node)
       self.cfg.AddNode(new_node)
+      # Add the new node to the Ganeti Lock Manager
+      self.context.glm.add(locking.LEVEL_NODE, node)
 
 
 class LUMasterFailover(LogicalUnit):
@@ -1661,6 +1639,7 @@ class LUMasterFailover(LogicalUnit):
   HPATH = "master-failover"
   HTYPE = constants.HTYPE_CLUSTER
   REQ_MASTER = False
+  REQ_WSSTORE = True
   _OP_REQP = []
 
   def BuildHooksEnv(self):
@@ -1755,45 +1734,6 @@ class LUQueryClusterInfo(NoHooksLU):
     return result
 
 
-class LUClusterCopyFile(NoHooksLU):
-  """Copy file to cluster.
-
-  """
-  _OP_REQP = ["nodes", "filename"]
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    It should check that the named file exists and that the given list
-    of nodes is valid.
-
-    """
-    if not os.path.exists(self.op.filename):
-      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
-
-    self.nodes = _GetWantedNodes(self, self.op.nodes)
-
-  def Exec(self, feedback_fn):
-    """Copy a file from master to some nodes.
-
-    Args:
-      opts - class with options as members
-      args - list containing a single element, the file name
-    Opts used:
-      nodes - list containing the name of target nodes; if empty, all nodes
-
-    """
-    filename = self.op.filename
-
-    myname = utils.HostInfo().name
-
-    for node in self.nodes:
-      if node == myname:
-        continue
-      if not self.ssh.CopyFileToNode(node, filename):
-        logger.Error("Copy of file %s to node %s failed" % (filename, node))
-
-
 class LUDumpClusterConfig(NoHooksLU):
   """Return a text-representation of the cluster-config.
 
@@ -1813,38 +1753,6 @@ class LUDumpClusterConfig(NoHooksLU):
     return self.cfg.DumpConfig()
 
 
-class LURunClusterCommand(NoHooksLU):
-  """Run a command on some nodes.
-
-  """
-  _OP_REQP = ["command", "nodes"]
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    It checks that the given list of nodes is valid.
-
-    """
-    self.nodes = _GetWantedNodes(self, self.op.nodes)
-
-  def Exec(self, feedback_fn):
-    """Run a command on some nodes.
-
-    """
-    # put the master at the end of the nodes list
-    master_node = self.sstore.GetMasterNode()
-    if master_node in self.nodes:
-      self.nodes.remove(master_node)
-      self.nodes.append(master_node)
-
-    data = []
-    for node in self.nodes:
-      result = self.ssh.Run(node, "root", self.op.command)
-      data.append((node, result.output, result.exit_code))
-
-    return data
-
-
 class LUActivateInstanceDisks(NoHooksLU):
   """Bring up an instance's disks.
 
@@ -2354,9 +2262,7 @@ class LURenameInstance(LogicalUnit):
                                  new_name)
 
     if not getattr(self.op, "ignore_ip", False):
-      command = ["fping", "-q", name_info.ip]
-      result = utils.RunCmd(command)
-      if not result.failed:
+      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
         raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                    (name_info.ip, new_name))
 
@@ -2464,6 +2370,8 @@ class LURemoveInstance(LogicalUnit):
     logger.Info("removing instance %s out of cluster config" % instance.name)
 
     self.cfg.RemoveInstance(instance.name)
+    # Remove the new instance from the Ganeti Lock Manager
+    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
 
 
 class LUQueryInstances(NoHooksLU):
@@ -2482,7 +2390,7 @@ class LUQueryInstances(NoHooksLU):
     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                                "admin_state", "admin_ram",
                                "disk_template", "ip", "mac", "bridge",
-                               "sda_size", "sdb_size", "vcpus"],
+                               "sda_size", "sdb_size", "vcpus", "tags"],
                        dynamic=self.dynamic_fields,
                        selected=self.op.output_fields)
 
@@ -2575,6 +2483,8 @@ class LUQueryInstances(NoHooksLU):
             val = disk.size
         elif field == "vcpus":
           val = instance.vcpus
+        elif field == "tags":
+          val = list(instance.GetTags())
         else:
           raise errors.ParameterError(field)
         iout.append(val)
@@ -2678,7 +2588,7 @@ class LUFailoverInstance(LogicalUnit):
 
     instance.primary_node = target_node
     # distribute new instance config to the other nodes
-    self.cfg.AddInstance(instance)
+    self.cfg.Update(instance)
 
     # Only start the instance if it's marked as up
     if instance.status == "up":
@@ -2762,22 +2672,6 @@ def _GenerateUniqueNames(cfg, exts):
   return results
 
 
-def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
-  """Generate a drbd device complete with its children.
-
-  """
-  port = cfg.AllocatePort()
-  vgname = cfg.GetVGName()
-  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
-                          logical_id=(vgname, names[0]))
-  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
-                          logical_id=(vgname, names[1]))
-  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
-                          logical_id = (primary, secondary, port),
-                          children = [dev_data, dev_meta])
-  return drbd_dev
-
-
 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
   """Generate a drbd8 device complete with its children.
 
@@ -3336,6 +3230,8 @@ class LUCreateInstance(LogicalUnit):
     feedback_fn("adding instance %s to cluster config" % instance)
 
     self.cfg.AddInstance(iobj)
+    # Add the new instance to the Ganeti Lock Manager
+    self.context.glm.add(locking.LEVEL_INSTANCE, instance)
 
     if self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
@@ -3350,6 +3246,8 @@ class LUCreateInstance(LogicalUnit):
     if disk_abort:
       _RemoveDisks(iobj, self.cfg)
       self.cfg.RemoveInstance(iobj.name)
+      # Remove the new instance from the Ganeti Lock Manager
+      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance")
 
@@ -3907,6 +3805,12 @@ class LUReplaceDisks(LogicalUnit):
 
     """
     instance = self.instance
+
+    # Activate the instance disks if we're replacing them on a down instance
+    if instance.status == "down":
+      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
+      self.proc.ChainOpCode(op)
+
     if instance.disk_template == constants.DT_DRBD8:
       if self.op.remote_node is None:
         fn = self._ExecD8DiskOnly
@@ -3914,7 +3818,97 @@ class LUReplaceDisks(LogicalUnit):
         fn = self._ExecD8Secondary
     else:
       raise errors.ProgrammerError("Unhandled disk replacement case")
-    return fn(feedback_fn)
+
+    ret = fn(feedback_fn)
+
+    # Deactivate the instance disks if we're replacing them on a down instance
+    if instance.status == "down":
+      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
+      self.proc.ChainOpCode(op)
+
+    return ret
+
+
+class LUGrowDisk(LogicalUnit):
+  """Grow a disk of an instance.
+
+  """
+  HPATH = "disk-grow"
+  HTYPE = constants.HTYPE_INSTANCE
+  _OP_REQP = ["instance_name", "disk", "amount"]
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on the master, the primary and all the secondaries.
+
+    """
+    env = {
+      "DISK": self.op.disk,
+      "AMOUNT": self.op.amount,
+      }
+    env.update(_BuildInstanceHookEnvByObject(self.instance))
+    nl = [
+      self.sstore.GetMasterNode(),
+      self.instance.primary_node,
+      ]
+    return env, nl, nl
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the instance is in the cluster.
+
+    """
+    instance = self.cfg.GetInstanceInfo(
+      self.cfg.ExpandInstanceName(self.op.instance_name))
+    if instance is None:
+      raise errors.OpPrereqError("Instance '%s' not known" %
+                                 self.op.instance_name)
+    self.instance = instance
+    self.op.instance_name = instance.name
+
+    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
+      raise errors.OpPrereqError("Instance's disk layout does not support"
+                                 " growing.")
+
+    if instance.FindDisk(self.op.disk) is None:
+      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
+                                 (self.op.disk, instance.name))
+
+    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
+    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
+    for node in nodenames:
+      info = nodeinfo.get(node, None)
+      if not info:
+        raise errors.OpPrereqError("Cannot get current information"
+                                   " from node '%s'" % node)
+      vg_free = info.get('vg_free', None)
+      if not isinstance(vg_free, int):
+        raise errors.OpPrereqError("Can't compute free disk space on"
+                                   " node %s" % node)
+      if self.op.amount > info['vg_free']:
+        raise errors.OpPrereqError("Not enough disk space on target node %s:"
+                                   " %d MiB available, %d MiB required" %
+                                   (node, info['vg_free'], self.op.amount))
+
+  def Exec(self, feedback_fn):
+    """Execute disk grow.
+
+    """
+    instance = self.instance
+    disk = instance.FindDisk(self.op.disk)
+    for node in (instance.secondary_nodes + (instance.primary_node,)):
+      self.cfg.SetDiskID(disk, node)
+      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
+      if not result or not isinstance(result, tuple) or len(result) != 2:
+        raise errors.OpExecError("grow request failed to node %s" % node)
+      elif not result[0]:
+        raise errors.OpExecError("grow request failed to node %s: %s" %
+                                 (node, result[1]))
+    disk.RecordGrow(self.op.amount)
+    self.cfg.Update(instance)
+    return
 
 
 class LUQueryInstanceData(NoHooksLU):
@@ -4567,7 +4561,7 @@ class LUDelTags(TagsLU):
 class LUTestDelay(NoHooksLU):
   """Sleep for a specified amount of time.
 
-  This LU sleeps on the master and/or nodes for a specified amoutn of
+  This LU sleeps on the master and/or nodes for a specified amount of
   time.
 
   """