Fix a small typo in a constant
[ganeti-local] / lib / backend.py
index c00d964..b993f9d 100644 (file)
@@ -48,6 +48,40 @@ def _GetSshRunner():
   return ssh.SshRunner()
 
 
+def _CleanDirectory(path, exclude=[]):
+  """Removes all regular files in a directory.
+
+  @param exclude: List of files to be excluded.
+  @type exclude: list
+
+  """
+  if not os.path.isdir(path):
+    return
+
+  # Normalize excluded paths
+  exclude = [os.path.normpath(i) for i in exclude]
+
+  for rel_name in utils.ListVisibleFiles(path):
+    full_name = os.path.normpath(os.path.join(path, rel_name))
+    if full_name in exclude:
+      continue
+    if os.path.isfile(full_name) and not os.path.islink(full_name):
+      utils.RemoveFile(full_name)
+
+
+def _JobQueuePurge(keep_lock):
+  """Removes job queue files and archived jobs
+
+  """
+  if keep_lock:
+    exclude = [constants.JOB_QUEUE_LOCK_FILE]
+  else:
+    exclude = []
+
+  _CleanDirectory(constants.QUEUE_DIR, exclude=exclude)
+  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
+
+
 def _GetMasterInfo():
   """Return the master ip and netdev.
 
@@ -169,11 +203,10 @@ def LeaveCluster():
   """Cleans up the current node and prepares it to be removed from the cluster.
 
   """
-  if os.path.isdir(constants.DATA_DIR):
-    for rel_name in utils.ListVisibleFiles(constants.DATA_DIR):
-      full_name = os.path.join(constants.DATA_DIR, rel_name)
-      if os.path.isfile(full_name) and not os.path.islink(full_name):
-        utils.RemoveFile(full_name)
+  _CleanDirectory(constants.DATA_DIR)
+
+  # The lock can be removed because we're going to quit anyway.
+  _JobQueuePurge(keep_lock=False)
 
   try:
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
@@ -497,8 +530,9 @@ def AddOSToInstance(instance, os_disk, swap_disk):
                                 inst_os.path, create_script, instance.name,
                                 real_os_dev.dev_path, real_swap_dev.dev_path,
                                 logfile)
+  env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=env)
   if result.failed:
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", command, result.fail_reason, logfile,
@@ -1037,9 +1071,9 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
     constants.ETC_HOSTS,
     constants.SSH_KNOWN_HOSTS_FILE,
     constants.VNC_PASSWORD_FILE,
-    constants.JOB_QUEUE_SERIAL_FILE,
     ]
   allowed_files.extend(ssconf.SimpleStore().GetFileList())
+
   if file_name not in allowed_files:
     logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
@@ -1348,7 +1382,8 @@ def FinalizeExport(instance, snap_disks):
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
-    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, '%s' % nic.bridge)
+    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
+               '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
 
@@ -1452,8 +1487,9 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
                                logfile)
 
   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
+  env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=env)
 
   if result.failed:
     logging.error("os import command '%s' returned error: %s"
@@ -1647,6 +1683,54 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
   return result
 
 
+def _IsJobQueueFile(file_name):
+  """Checks whether the given filename is in the queue directory.
+
+  """
+  queue_dir = os.path.normpath(constants.QUEUE_DIR)
+  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
+
+  if not result:
+    logging.error("'%s' is not a file in the queue directory",
+                  file_name)
+
+  return result
+
+
+def JobQueueUpdate(file_name, content):
+  """Updates a file in the queue directory.
+
+  """
+  if not _IsJobQueueFile(file_name):
+    return False
+
+  # Write and replace the file atomically
+  utils.WriteFile(file_name, data=content)
+
+  return True
+
+
+def JobQueuePurge():
+  """Removes job queue files and archived jobs
+
+  """
+  # The lock must not be removed, otherwise another process could create
+  # it again.
+  return _JobQueuePurge(keep_lock=True)
+
+
+def JobQueueRename(old, new):
+  """Renames a job queue file.
+
+  """
+  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
+    return False
+
+  os.rename(old, new)
+
+  return True
+
+
 def CloseBlockDevices(disks):
   """Closes the given block devices.