Change backend._GetMasterInfo to return more data
[ganeti-local] / lib / backend.py
index 68b1653..ca86245 100644 (file)
@@ -48,27 +48,59 @@ def _GetSshRunner():
   return ssh.SshRunner()
 
 
   return ssh.SshRunner()
 
 
-def _CleanDirectory(path):
+def _CleanDirectory(path, exclude=[]):
+  """Removes all regular files in a directory.
+
+  @param exclude: List of files to be excluded.
+  @type exclude: list
+
+  """
   if not os.path.isdir(path):
     return
   if not os.path.isdir(path):
     return
+
+  # Normalize excluded paths
+  exclude = [os.path.normpath(i) for i in exclude]
+
   for rel_name in utils.ListVisibleFiles(path):
   for rel_name in utils.ListVisibleFiles(path):
-    full_name = os.path.join(path, rel_name)
+    full_name = os.path.normpath(os.path.join(path, rel_name))
+    if full_name in exclude:
+      continue
     if os.path.isfile(full_name) and not os.path.islink(full_name):
       utils.RemoveFile(full_name)
 
 
     if os.path.isfile(full_name) and not os.path.islink(full_name):
       utils.RemoveFile(full_name)
 
 
-def _GetMasterInfo():
-  """Return the master ip and netdev.
+def _JobQueuePurge(keep_lock):
+  """Removes job queue files and archived jobs
+
+  """
+  if keep_lock:
+    exclude = [constants.JOB_QUEUE_LOCK_FILE]
+  else:
+    exclude = []
+
+  _CleanDirectory(constants.QUEUE_DIR, exclude=exclude)
+  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
+
+
+def GetMasterInfo():
+  """Returns master information.
+
+  This is a utility function to compute master information, either
+  for consumption here or from the node daemon.
+
+  @rtype: tuple
+  @return: (master_netdev, master_ip, master_name)
 
   """
   try:
     ss = ssconf.SimpleStore()
     master_netdev = ss.GetMasterNetdev()
     master_ip = ss.GetMasterIP()
 
   """
   try:
     ss = ssconf.SimpleStore()
     master_netdev = ss.GetMasterNetdev()
     master_ip = ss.GetMasterIP()
+    master_node = ss.GetMasterNode()
   except errors.ConfigurationError, err:
     logging.exception("Cluster configuration incomplete")
     return (None, None)
   except errors.ConfigurationError, err:
     logging.exception("Cluster configuration incomplete")
     return (None, None)
-  return (master_netdev, master_ip)
+  return (master_netdev, master_ip, master_node)
 
 
 def StartMaster(start_daemons):
 
 
 def StartMaster(start_daemons):
@@ -81,7 +113,7 @@ def StartMaster(start_daemons):
 
   """
   ok = True
 
   """
   ok = True
-  master_netdev, master_ip = _GetMasterInfo()
+  master_netdev, master_ip, _ = GetMasterInfo()
   if not master_netdev:
     return False
 
   if not master_netdev:
     return False
 
@@ -123,7 +155,7 @@ def StopMaster(stop_daemons):
   stop the master daemons (ganeti-masterd and ganeti-rapi).
 
   """
   stop the master daemons (ganeti-masterd and ganeti-rapi).
 
   """
-  master_netdev, master_ip = _GetMasterInfo()
+  master_netdev, master_ip, _ = GetMasterInfo()
   if not master_netdev:
     return False
 
   if not master_netdev:
     return False
 
@@ -180,7 +212,8 @@ def LeaveCluster():
   """
   _CleanDirectory(constants.DATA_DIR)
 
   """
   _CleanDirectory(constants.DATA_DIR)
 
-  JobQueuePurge()
+  # The lock can be removed because we're going to quit anyway.
+  _JobQueuePurge(keep_lock=False)
 
   try:
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
 
   try:
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
@@ -504,8 +537,9 @@ def AddOSToInstance(instance, os_disk, swap_disk):
                                 inst_os.path, create_script, instance.name,
                                 real_os_dev.dev_path, real_swap_dev.dev_path,
                                 logfile)
                                 inst_os.path, create_script, instance.name,
                                 real_os_dev.dev_path, real_swap_dev.dev_path,
                                 logfile)
+  env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}
 
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=env)
   if result.failed:
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", command, result.fail_reason, logfile,
   if result.failed:
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", command, result.fail_reason, logfile,
@@ -1460,8 +1494,9 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
                                logfile)
 
   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
                                logfile)
 
   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
+  env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}
 
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=env)
 
   if result.failed:
     logging.error("os import command '%s' returned error: %s"
 
   if result.failed:
     logging.error("os import command '%s' returned error: %s"
@@ -1655,14 +1690,25 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
   return result
 
 
   return result
 
 
-def JobQueueUpdate(file_name, content):
-  """Updates a file in the queue directory.
+def _IsJobQueueFile(file_name):
+  """Checks whether the given filename is in the queue directory.
 
   """
   queue_dir = os.path.normpath(constants.QUEUE_DIR)
 
   """
   queue_dir = os.path.normpath(constants.QUEUE_DIR)
-  if os.path.commonprefix([queue_dir, file_name]) != queue_dir:
+  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
+
+  if not result:
     logging.error("'%s' is not a file in the queue directory",
                   file_name)
     logging.error("'%s' is not a file in the queue directory",
                   file_name)
+
+  return result
+
+
+def JobQueueUpdate(file_name, content):
+  """Updates a file in the queue directory.
+
+  """
+  if not _IsJobQueueFile(file_name):
     return False
 
   # Write and replace the file atomically
     return False
 
   # Write and replace the file atomically
@@ -1675,8 +1721,21 @@ def JobQueuePurge():
   """Removes job queue files and archived jobs
 
   """
   """Removes job queue files and archived jobs
 
   """
-  _CleanDirectory(constants.QUEUE_DIR)
-  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
+  # The lock must not be removed, otherwise another process could create
+  # it again.
+  return _JobQueuePurge(keep_lock=True)
+
+
+def JobQueueRename(old, new):
+  """Renames a job queue file.
+
+  """
+  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
+    return False
+
+  os.rename(old, new)
+
+  return True
 
 
 def CloseBlockDevices(disks):
 
 
 def CloseBlockDevices(disks):