Fix a small typo in a constant
[ganeti-local] / lib / backend.py
index 1054108..b993f9d 100644 (file)
@@ -32,6 +32,7 @@ import re
 import subprocess
 import random
 import logging
+import tempfile
 
 from ganeti import errors
 from ganeti import utils
@@ -47,36 +48,120 @@ def _GetSshRunner():
   return ssh.SshRunner()
 
 
-def StartMaster():
-  """Activate local node as master node.
+def _CleanDirectory(path, exclude=[]):
+  """Removes all regular files in a directory.
 
-  There are two needed steps for this:
-    - run the master script
-    - register the cron script
+  @param exclude: List of files to be excluded.
+  @type exclude: list
 
   """
-  result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "start"])
+  if not os.path.isdir(path):
+    return
 
-  if result.failed:
-    logging.error("could not activate cluster interface with command %s,"
-                  " error: '%s'", result.cmd, result.output)
+  # Normalize excluded paths
+  exclude = [os.path.normpath(i) for i in exclude]
+
+  for rel_name in utils.ListVisibleFiles(path):
+    full_name = os.path.normpath(os.path.join(path, rel_name))
+    if full_name in exclude:
+      continue
+    if os.path.isfile(full_name) and not os.path.islink(full_name):
+      utils.RemoveFile(full_name)
+
+
+def _JobQueuePurge(keep_lock):
+  """Removes job queue files and archived jobs
+
+  """
+  if keep_lock:
+    exclude = [constants.JOB_QUEUE_LOCK_FILE]
+  else:
+    exclude = []
+
+  _CleanDirectory(constants.QUEUE_DIR, exclude=exclude)
+  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
+
+
+def _GetMasterInfo():
+  """Return the master ip and netdev.
+
+  """
+  try:
+    ss = ssconf.SimpleStore()
+    master_netdev = ss.GetMasterNetdev()
+    master_ip = ss.GetMasterIP()
+  except errors.ConfigurationError, err:
+    logging.exception("Cluster configuration incomplete")
+    return (None, None)
+  return (master_netdev, master_ip)
+
+
+def StartMaster(start_daemons):
+  """Activate local node as master node.
+
+  The function will always try activate the IP address of the master
+  (if someone else has it, then it won't). Then, if the start_daemons
+  parameter is True, it will also start the master daemons
+  (ganet-masterd and ganeti-rapi).
+
+  """
+  ok = True
+  master_netdev, master_ip = _GetMasterInfo()
+  if not master_netdev:
     return False
 
-  return True
+  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
+    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
+                     source=constants.LOCALHOST_IP_ADDRESS):
+      # we already have the ip:
+      logging.debug("Already started")
+    else:
+      logging.error("Someone else has the master ip, not activating")
+      ok = False
+  else:
+    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
+                           "dev", master_netdev, "label",
+                           "%s:0" % master_netdev])
+    if result.failed:
+      logging.error("Can't activate master IP: %s", result.output)
+      ok = False
+
+    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
+                           "-s", master_ip, master_ip])
+    # we'll ignore the exit code of arping
+
+  # and now start the master and rapi daemons
+  if start_daemons:
+    for daemon in 'ganeti-masterd', 'ganeti-rapi':
+      result = utils.RunCmd([daemon])
+      if result.failed:
+        logging.error("Can't start daemon %s: %s", daemon, result.output)
+        ok = False
+  return ok
 
 
-def StopMaster():
+def StopMaster(stop_daemons):
   """Deactivate this node as master.
 
-  This runs the master stop script.
+  The function will always try to deactivate the IP address of the
+  master. Then, if the stop_daemons parameter is True, it will also
+  stop the master daemons (ganet-masterd and ganeti-rapi).
 
   """
-  result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "stop"])
+  master_netdev, master_ip = _GetMasterInfo()
+  if not master_netdev:
+    return False
 
+  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
+                         "dev", master_netdev])
   if result.failed:
-    logging.error("could not deactivate cluster interface with command %s,"
-                  " error: '%s'", result.cmd, result.output)
-    return False
+    logging.error("Can't remove the master IP, error: %s", result.output)
+    # but otherwise ignore the failure
+
+  if stop_daemons:
+    # stop/kill the rapi and the master daemon
+    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
+      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
 
   return True
 
@@ -118,11 +203,10 @@ def LeaveCluster():
   """Cleans up the current node and prepares it to be removed from the cluster.
 
   """
-  if os.path.isdir(constants.DATA_DIR):
-    for rel_name in utils.ListVisibleFiles(constants.DATA_DIR):
-      full_name = os.path.join(constants.DATA_DIR, rel_name)
-      if os.path.isfile(full_name) and not os.path.islink(full_name):
-        utils.RemoveFile(full_name)
+  _CleanDirectory(constants.DATA_DIR)
+
+  # The lock can be removed because we're going to quit anyway.
+  _JobQueuePurge(keep_lock=False)
 
   try:
     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
@@ -446,8 +530,9 @@ def AddOSToInstance(instance, os_disk, swap_disk):
                                 inst_os.path, create_script, instance.name,
                                 real_os_dev.dev_path, real_swap_dev.dev_path,
                                 logfile)
+  env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=env)
   if result.failed:
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", command, result.fail_reason, logfile,
@@ -988,6 +1073,7 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
     constants.VNC_PASSWORD_FILE,
     ]
   allowed_files.extend(ssconf.SimpleStore().GetFileList())
+
   if file_name not in allowed_files:
     logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
@@ -1296,7 +1382,8 @@ def FinalizeExport(instance, snap_disks):
     config.set(constants.INISECT_INS, 'nic%d_mac' %
                nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
-    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, '%s' % nic.bridge)
+    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
+               '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
 
@@ -1400,8 +1487,9 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
                                logfile)
 
   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
+  env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=env)
 
   if result.failed:
     logging.error("os import command '%s' returned error: %s"
@@ -1595,6 +1683,54 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
   return result
 
 
+def _IsJobQueueFile(file_name):
+  """Checks whether the given filename is in the queue directory.
+
+  """
+  queue_dir = os.path.normpath(constants.QUEUE_DIR)
+  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
+
+  if not result:
+    logging.error("'%s' is not a file in the queue directory",
+                  file_name)
+
+  return result
+
+
+def JobQueueUpdate(file_name, content):
+  """Updates a file in the queue directory.
+
+  """
+  if not _IsJobQueueFile(file_name):
+    return False
+
+  # Write and replace the file atomically
+  utils.WriteFile(file_name, data=content)
+
+  return True
+
+
+def JobQueuePurge():
+  """Removes job queue files and archived jobs
+
+  """
+  # The lock must not be removed, otherwise another process could create
+  # it again.
+  return _JobQueuePurge(keep_lock=True)
+
+
+def JobQueueRename(old, new):
+  """Renames a job queue file.
+
+  """
+  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
+    return False
+
+  os.rename(old, new)
+
+  return True
+
+
 def CloseBlockDevices(disks):
   """Closes the given block devices.