Update scripts and qa config for changed hypervisor names.
[ganeti-local] / lib / backend.py
index b78a5ec..0df7a00 100644 (file)
@@ -32,6 +32,7 @@ import re
 import subprocess
 import random
 import logging
+import tempfile
 
 from ganeti import errors
 from ganeti import utils
@@ -43,8 +44,62 @@ from ganeti import objects
 from ganeti import ssconf
 
 
-def _GetSshRunner():
-  return ssh.SshRunner()
+def _GetConfig():
+  return ssconf.SimpleConfigReader()
+
+
+def _GetSshRunner(cluster_name):
+  return ssh.SshRunner(cluster_name)
+
+
+def _CleanDirectory(path, exclude=[]):
+  """Removes all regular files in a directory.
+
+  @param exclude: List of files to be excluded.
+  @type exclude: list
+
+  """
+  if not os.path.isdir(path):
+    return
+
+  # Normalize excluded paths
+  exclude = [os.path.normpath(i) for i in exclude]
+
+  for rel_name in utils.ListVisibleFiles(path):
+    full_name = os.path.normpath(os.path.join(path, rel_name))
+    if full_name in exclude:
+      continue
+    if os.path.isfile(full_name) and not os.path.islink(full_name):
+      utils.RemoveFile(full_name)
+
+
+def JobQueuePurge():
+  """Removes job queue files and archived jobs
+
+  """
+  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
+  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
+
+
+def GetMasterInfo():
+  """Returns master information.
+
+  This is an utility function to compute master information, either
+  for consumption here or from the node daemon.
+
+  @rtype: tuple
+  @return: (master_netdev, master_ip, master_name)
+
+  """
+  try:
+    cfg = _GetConfig()
+    master_netdev = cfg.GetMasterNetdev()
+    master_ip = cfg.GetMasterIP()
+    master_node = cfg.GetMasterNode()
+  except errors.ConfigurationError, err:
+    logging.exception("Cluster configuration incomplete")
+    return (None, None)
+  return (master_netdev, master_ip, master_node)
 
 
 def StartMaster(start_daemons):
@@ -56,14 +111,39 @@ def StartMaster(start_daemons):
   (ganet-masterd and ganeti-rapi).
 
   """
-  result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "start"])
-
-  if result.failed:
-    logging.error("could not activate cluster interface with command %s,"
-                  " error: '%s'", result.cmd, result.output)
+  ok = True
+  master_netdev, master_ip, _ = GetMasterInfo()
+  if not master_netdev:
     return False
 
-  return True
+  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
+    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
+                     source=constants.LOCALHOST_IP_ADDRESS):
+      # we already have the ip:
+      logging.debug("Already started")
+    else:
+      logging.error("Someone else has the master ip, not activating")
+      ok = False
+  else:
+    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
+                           "dev", master_netdev, "label",
+                           "%s:0" % master_netdev])
+    if result.failed:
+      logging.error("Can't activate master IP: %s", result.output)
+      ok = False
+
+    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
+                           "-s", master_ip, master_ip])
+    # we'll ignore the exit code of arping
+
+  # and now start the master and rapi daemons
+  if start_daemons:
+    for daemon in 'ganeti-masterd', 'ganeti-rapi':
+      result = utils.RunCmd([daemon])
+      if result.failed:
+        logging.error("Can't start daemon %s: %s", daemon, result.output)
+        ok = False
+  return ok
 
 
 def StopMaster(stop_daemons):
@@ -74,12 +154,20 @@ def StopMaster(stop_daemons):
   stop the master daemons (ganet-masterd and ganeti-rapi).
 
   """
-  result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "stop"])
+  master_netdev, master_ip, _ = GetMasterInfo()
+  if not master_netdev:
+    return False
 
+  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
+                         "dev", master_netdev])
   if result.failed:
-    logging.error("could not deactivate cluster interface with command %s,"
-                  " error: '%s'", result.cmd, result.output)
-    return False
+    logging.error("Can't remove the master IP, error: %s", result.output)
+    # but otherwise ignore the failure
+
+  if stop_daemons:
+    # stop/kill the rapi and the master daemon
+    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
+      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
 
   return True
 
@@ -121,11 +209,8 @@ def LeaveCluster():
   """Cleans up the current node and prepares it to be removed from the cluster.
 
   """
-  if os.path.isdir(constants.DATA_DIR):
-    for rel_name in utils.ListVisibleFiles(constants.DATA_DIR):
-      full_name = os.path.join(constants.DATA_DIR, rel_name)
-      if os.path.isfile(full_name) and not os.path.islink(full_name):
-        utils.RemoveFile(full_name)
+  _CleanDirectory(constants.DATA_DIR)
+  JobQueuePurge()
 
   try:
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
@@ -146,18 +231,21 @@ def LeaveCluster():
   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
 
 
-def GetNodeInfo(vgname):
+def GetNodeInfo(vgname, hypervisor_type):
   """Gives back a hash with different informations about the node.
 
-  Returns:
-    { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
-      'memory_free' : xxx, 'memory_total' : xxx }
-    where
-    vg_size is the size of the configured volume group in MiB
-    vg_free is the free size of the volume group in MiB
-    memory_dom0 is the memory allocated for domain0 in MiB
-    memory_free is the currently available (free) ram in MiB
-    memory_total is the total number of ram in MiB
+  @type vgname: C{string}
+  @param vgname: the name of the volume group to ask for disk space information
+  @type hypervisor_type: C{str}
+  @param hypervisor_type: the name of the hypervisor to ask for
+      memory information
+  @rtype: C{dict}
+  @return: dictionary with the following keys:
+      - vg_size is the size of the configured volume group in MiB
+      - vg_free is the free size of the volume group in MiB
+      - memory_dom0 is the memory allocated for domain0 in MiB
+      - memory_free is the currently available (free) ram in MiB
+      - memory_total is the total number of ram in MiB
 
   """
   outputarray = {}
@@ -165,7 +253,7 @@ def GetNodeInfo(vgname):
   outputarray['vg_size'] = vginfo['vg_size']
   outputarray['vg_free'] = vginfo['vg_free']
 
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(hypervisor_type)
   hyp_info = hyper.GetNodeInfo()
   if hyp_info is not None:
     outputarray.update(hyp_info)
@@ -179,28 +267,39 @@ def GetNodeInfo(vgname):
   return outputarray
 
 
-def VerifyNode(what):
+def VerifyNode(what, cluster_name):
   """Verify the status of the local node.
 
-  Args:
-    what - a dictionary of things to check:
-      'filelist' : list of files for which to compute checksums
-      'nodelist' : list of nodes we should check communication with
-      'hypervisor': run the hypervisor-specific verify
+  Based on the input L{what} parameter, various checks are done on the
+  local node.
+
+  If the I{filelist} key is present, this list of
+  files is checksummed and the file/checksum pairs are returned.
 
-  Requested files on local node are checksummed and the result returned.
+  If the I{nodelist} key is present, we check that we have
+  connectivity via ssh with the target nodes (and check the hostname
+  report).
+
+  If the I{node-net-test} key is present, we check that we have
+  connectivity to the given nodes via both primary IP and, if
+  applicable, secondary IPs.
+
+  @type what: C{dict}
+  @param what: a dictionary of things to check:
+      - filelist: list of files for which to compute checksums
+      - nodelist: list of nodes we should check ssh communication with
+      - node-net-test: list of nodes we should check node daemon port
+        connectivity with
+      - hypervisor: list with hypervisors to run the verify for
 
-  The nodelist is traversed, with the following checks being made
-  for each node:
-  - known_hosts key correct
-  - correct resolving of node name (target node returns its own hostname
-    by ssh-execution of 'hostname', result compared against name in list.
 
   """
   result = {}
 
   if 'hypervisor' in what:
-    result['hypervisor'] = hypervisor.GetHypervisor().Verify()
+    result['hypervisor'] = my_dict = {}
+    for hv_name in what['hypervisor']:
+      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
 
   if 'filelist' in what:
     result['filelist'] = utils.FingerprintFiles(what['filelist'])
@@ -209,7 +308,7 @@ def VerifyNode(what):
     result['nodelist'] = {}
     random.shuffle(what['nodelist'])
     for node in what['nodelist']:
-      success, message = _GetSshRunner().VerifyNodeHostname(node)
+      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
       if not success:
         result['nodelist'][node] = message
   if 'node-net-test' in what:
@@ -226,7 +325,7 @@ def VerifyNode(what):
                                           " primary/secondary IP"
                                           " in the node list")
     else:
-      port = ssconf.SimpleStore().GetNodeDaemonPort()
+      port = utils.GetNodeDaemonPort()
       for name, pip, sip in what['node-net-test']:
         fail = []
         if not utils.TcpPing(pip, port, source=my_pip):
@@ -330,41 +429,49 @@ def BridgesExist(bridges_list):
   return True
 
 
-def GetInstanceList():
+def GetInstanceList(hypervisor_list):
   """Provides a list of instances.
 
-  Returns:
-    A list of all running instances on the current node
-    - instance1.example.com
-    - instance2.example.com
+  @type hypervisor_list: list
+  @param hypervisor_list: the list of hypervisors to query information
+
+  @rtype: list
+  @return: a list of all running instances on the current node
+             - instance1.example.com
+             - instance2.example.com
 
   """
-  try:
-    names = hypervisor.GetHypervisor().ListInstances()
-  except errors.HypervisorError, err:
-    logging.exception("Error enumerating instances")
-    raise
+  results = []
+  for hname in hypervisor_list:
+    try:
+      names = hypervisor.GetHypervisor(hname).ListInstances()
+      results.extend(names)
+    except errors.HypervisorError, err:
+      logging.exception("Error enumerating instances for hypevisor %s", hname)
+      # FIXME: should we somehow not propagate this to the master?
+      raise
 
-  return names
+  return results
 
 
-def GetInstanceInfo(instance):
+def GetInstanceInfo(instance, hname):
   """Gives back the informations about an instance as a dictionary.
 
-  Args:
-    instance: name of the instance (ex. instance1.example.com)
+  @type instance: string
+  @param instance: the instance name
+  @type hname: string
+  @param hname: the hypervisor type of the instance
 
-  Returns:
-    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
-    where
-    memory: memory size of instance (int)
-    state: xen state of instance (string)
-    time: cpu time of instance (float)
+  @rtype: dict
+  @return: dictionary with the following keys:
+      - memory: memory size of instance (int)
+      - state: xen state of instance (string)
+      - time: cpu time of instance (float)
 
   """
   output = {}
 
-  iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
+  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
   if iinfo is not None:
     output['memory'] = iinfo[2]
     output['state'] = iinfo[4]
@@ -373,34 +480,38 @@ def GetInstanceInfo(instance):
   return output
 
 
-def GetAllInstancesInfo():
+def GetAllInstancesInfo(hypervisor_list):
   """Gather data about all instances.
 
   This is the equivalent of `GetInstanceInfo()`, except that it
   computes data for all instances at once, thus being faster if one
   needs data about more than one instance.
 
-  Returns: a dictionary of dictionaries, keys being the instance name,
-    and with values:
-    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
-    where
-    memory: memory size of instance (int)
-    state: xen state of instance (string)
-    time: cpu time of instance (float)
-    vcpus: the number of cpus
+  @type hypervisor_list: list
+  @param hypervisor_list: list of hypervisors to query for instance data
+
+  @rtype: dict of dicts
+  @return: dictionary of instance: data, with data having the following keys:
+      - memory: memory size of instance (int)
+      - state: xen state of instance (string)
+      - time: cpu time of instance (float)
+      - vcpuus: the number of vcpus
 
   """
   output = {}
 
-  iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
-  if iinfo:
-    for name, inst_id, memory, vcpus, state, times in iinfo:
-      output[name] = {
-        'memory': memory,
-        'vcpus': vcpus,
-        'state': state,
-        'time': times,
-        }
+  for hname in hypervisor_list:
+    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
+    if iinfo:
+      for name, inst_id, memory, vcpus, state, times in iinfo:
+        if name in output:
+          raise errors.HypervisorError("Instance %s running duplicate" % name)
+        output[name] = {
+          'memory': memory,
+          'vcpus': vcpus,
+          'state': state,
+          'time': times,
+          }
 
   return output
 
@@ -449,8 +560,9 @@ def AddOSToInstance(instance, os_disk, swap_disk):
                                 inst_os.path, create_script, instance.name,
                                 real_os_dev.dev_path, real_swap_dev.dev_path,
                                 logfile)
+  env = {'HYPERVISOR': instance.hypervisor}
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=env)
   if result.failed:
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", command, result.fail_reason, logfile,
@@ -579,17 +691,19 @@ def _GatherBlockDevs(instance):
 def StartInstance(instance, extra_args):
   """Start an instance.
 
-  Args:
-    instance - name of instance to start.
+  @type instance: instance object
+  @param instance: the instance object
+  @rtype: boolean
+  @return: whether the startup was successful or not
 
   """
-  running_instances = GetInstanceList()
+  running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name in running_instances:
     return True
 
   block_devices = _GatherBlockDevs(instance)
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
 
   try:
     hyper.StartInstance(instance, block_devices, extra_args)
@@ -603,16 +717,19 @@ def StartInstance(instance, extra_args):
 def ShutdownInstance(instance):
   """Shut an instance down.
 
-  Args:
-    instance - name of instance to shutdown.
+  @type instance: instance object
+  @param instance: the instance object
+  @rtype: boolean
+  @return: whether the startup was successful or not
 
   """
-  running_instances = GetInstanceList()
+  hv_name = instance.hypervisor
+  running_instances = GetInstanceList([hv_name])
 
   if instance.name not in running_instances:
     return True
 
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(hv_name)
   try:
     hyper.StopInstance(instance)
   except errors.HypervisorError, err:
@@ -624,7 +741,7 @@ def ShutdownInstance(instance):
 
   time.sleep(1)
   for dummy in range(11):
-    if instance.name not in GetInstanceList():
+    if instance.name not in GetInstanceList([hv_name]):
       break
     time.sleep(10)
   else:
@@ -638,7 +755,7 @@ def ShutdownInstance(instance):
       return False
 
     time.sleep(1)
-    if instance.name in GetInstanceList():
+    if instance.name in GetInstanceList([hv_name]):
       logging.error("could not shutdown instance '%s' even by destroy",
                     instance.name)
       return False
@@ -654,13 +771,13 @@ def RebootInstance(instance, reboot_type, extra_args):
     reboot_type - how to reboot [soft,hard,full]
 
   """
-  running_instances = GetInstanceList()
+  running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name not in running_instances:
     logging.error("Cannot reboot instance that is not running")
     return False
 
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
     try:
       hyper.RebootInstance(instance)
@@ -677,18 +794,29 @@ def RebootInstance(instance, reboot_type, extra_args):
   else:
     raise errors.ParameterError("reboot_type invalid")
 
-
   return True
 
 
 def MigrateInstance(instance, target, live):
   """Migrates an instance to another node.
 
+  @type instance: C{objects.Instance}
+  @param instance: the instance definition
+  @type target: string
+  @param target: the target node name
+  @type live: boolean
+  @param live: whether the migration should be done live or not (the
+      interpretation of this parameter is left to the hypervisor)
+  @rtype: tuple
+  @return: a tuple of (success, msg) where:
+      - succes is a boolean denoting the success/failure of the operation
+      - msg is a string with details in case of failure
+
   """
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
 
   try:
-    hyper.MigrateInstance(instance, target, live)
+    hyper.MigrateInstance(instance.name, target, live)
   except errors.HypervisorError, err:
     msg = "Failed to migrate instance: %s" % str(err)
     logging.error(msg)
@@ -989,9 +1117,8 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
     constants.ETC_HOSTS,
     constants.SSH_KNOWN_HOSTS_FILE,
     constants.VNC_PASSWORD_FILE,
-    constants.JOB_QUEUE_SERIAL_FILE,
     ]
-  allowed_files.extend(ssconf.SimpleStore().GetFileList())
+
   if file_name not in allowed_files:
     logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
@@ -1209,7 +1336,7 @@ def SnapshotBlockDevice(disk):
                                  (disk.unique_id, disk.dev_type))
 
 
-def ExportSnapshot(disk, dest_node, instance):
+def ExportSnapshot(disk, dest_node, instance, cluster_name):
   """Export a block device snapshot to a remote node.
 
   Args:
@@ -1250,8 +1377,9 @@ def ExportSnapshot(disk, dest_node, instance):
 
   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                 destdir, destdir, destfile)
-  remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
-                                       destcmd)
+  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
+                                                   constants.GANETI_RUNAS,
+                                                   destcmd)
 
   # all commands have been checked, so we're safe to combine them
   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
@@ -1300,7 +1428,8 @@ def FinalizeExport(instance, snap_disks):
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
-    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, '%s' % nic.bridge)
+    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
+               '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
 
@@ -1349,7 +1478,8 @@ def ExportInfo(dest):
   return config
 
 
-def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
+def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
+                         cluster_name):
   """Import an os image into an instance.
 
   Args:
@@ -1394,8 +1524,9 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
     os.mkdir(constants.LOG_OS_DIR, 0750)
 
   destcmd = utils.BuildShellCmd('cat %s', src_image)
-  remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
-                                       destcmd)
+  remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
+                                                   constants.GANETI_RUNAS,
+                                                   destcmd)
 
   comprcmd = "gunzip"
   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
@@ -1404,8 +1535,9 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
                                logfile)
 
   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
+  env = {'HYPERVISOR': instance.hypervisor}
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=env)
 
   if result.failed:
     logging.error("os import command '%s' returned error: %s"
@@ -1489,8 +1621,9 @@ def _TransformFileStorageDir(file_storage_dir):
     normalized file_storage_dir (string) if valid, None otherwise
 
   """
+  cfg = _GetConfig()
   file_storage_dir = os.path.normpath(file_storage_dir)
-  base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
+  base_file_storage_dir = cfg.GetFileStorageDir()
   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
       base_file_storage_dir):
     logging.error("file storage directory '%s' is not under base file"
@@ -1599,6 +1732,45 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
   return result
 
 
+def _IsJobQueueFile(file_name):
+  """Checks whether the given filename is in the queue directory.
+
+  """
+  queue_dir = os.path.normpath(constants.QUEUE_DIR)
+  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
+
+  if not result:
+    logging.error("'%s' is not a file in the queue directory",
+                  file_name)
+
+  return result
+
+
+def JobQueueUpdate(file_name, content):
+  """Updates a file in the queue directory.
+
+  """
+  if not _IsJobQueueFile(file_name):
+    return False
+
+  # Write and replace the file atomically
+  utils.WriteFile(file_name, data=content)
+
+  return True
+
+
+def JobQueueRename(old, new):
+  """Renames a job queue file.
+
+  """
+  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
+    return False
+
+  os.rename(old, new)
+
+  return True
+
+
 def CloseBlockDevices(disks):
   """Closes the given block devices.