import subprocess
import random
import logging
+import tempfile
from ganeti import errors
from ganeti import utils
return ssh.SshRunner()
-def StartMaster():
- """Activate local node as master node.
+def _CleanDirectory(path, exclude=[]):
+ """Removes all regular files in a directory.
- There are two needed steps for this:
- - run the master script
- - register the cron script
+ @param exclude: List of files to be excluded.
+ @type exclude: list
"""
- result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "start"])
+ if not os.path.isdir(path):
+ return
- if result.failed:
- logging.error("could not activate cluster interface with command %s,"
- " error: '%s'", result.cmd, result.output)
+ # Normalize excluded paths
+ exclude = [os.path.normpath(i) for i in exclude]
+
+ for rel_name in utils.ListVisibleFiles(path):
+ full_name = os.path.normpath(os.path.join(path, rel_name))
+ if full_name in exclude:
+ continue
+ if os.path.isfile(full_name) and not os.path.islink(full_name):
+ utils.RemoveFile(full_name)
+
+
+def _JobQueuePurge(keep_lock):
+  """Removes job queue files and archived jobs
+
+  Cleans both the live queue directory and the archive directory.
+
+  @type keep_lock: bool
+  @param keep_lock: if True, the job queue lock file is excluded from
+      removal (the caller still holds/uses the lock)
+
+  """
+  if keep_lock:
+    exclude = [constants.JOB_QUEUE_LOCK_FILE]
+  else:
+    exclude = []
+
+  _CleanDirectory(constants.QUEUE_DIR, exclude=exclude)
+  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
+
+
+def _GetMasterInfo():
+  """Return the master ip and netdev.
+
+  Both values are read from the cluster's simple store configuration.
+
+  @return: tuple (master_netdev, master_ip); on an incomplete cluster
+      configuration the error is logged and (None, None) is returned
+
+  """
+  try:
+    ss = ssconf.SimpleStore()
+    master_netdev = ss.GetMasterNetdev()
+    master_ip = ss.GetMasterIP()
+  except errors.ConfigurationError, err:
+    logging.exception("Cluster configuration incomplete")
+    return (None, None)
+  return (master_netdev, master_ip)
+
+
+def StartMaster(start_daemons):
+ """Activate local node as master node.
+
+ The function will always try activate the IP address of the master
+ (if someone else has it, then it won't). Then, if the start_daemons
+ parameter is True, it will also start the master daemons
+  (ganeti-masterd and ganeti-rapi).
+
+ """
+ ok = True
+ master_netdev, master_ip = _GetMasterInfo()
+ if not master_netdev:
return False
- return True
+ if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
+ if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
+ source=constants.LOCALHOST_IP_ADDRESS):
+ # we already have the ip:
+ logging.debug("Already started")
+ else:
+ logging.error("Someone else has the master ip, not activating")
+ ok = False
+ else:
+ result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
+ "dev", master_netdev, "label",
+ "%s:0" % master_netdev])
+ if result.failed:
+ logging.error("Can't activate master IP: %s", result.output)
+ ok = False
+
+ result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
+ "-s", master_ip, master_ip])
+ # we'll ignore the exit code of arping
+
+ # and now start the master and rapi daemons
+ if start_daemons:
+ for daemon in 'ganeti-masterd', 'ganeti-rapi':
+ result = utils.RunCmd([daemon])
+ if result.failed:
+ logging.error("Can't start daemon %s: %s", daemon, result.output)
+ ok = False
+ return ok
-def StopMaster():
+def StopMaster(stop_daemons):
"""Deactivate this node as master.
- This runs the master stop script.
+ The function will always try to deactivate the IP address of the
+ master. Then, if the stop_daemons parameter is True, it will also
+  stop the master daemons (ganeti-masterd and ganeti-rapi).
"""
- result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "stop"])
+ master_netdev, master_ip = _GetMasterInfo()
+ if not master_netdev:
+ return False
+ result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
+ "dev", master_netdev])
if result.failed:
- logging.error("could not deactivate cluster interface with command %s,"
- " error: '%s'", result.cmd, result.output)
- return False
+ logging.error("Can't remove the master IP, error: %s", result.output)
+ # but otherwise ignore the failure
+
+ if stop_daemons:
+ # stop/kill the rapi and the master daemon
+ for daemon in constants.RAPI_PID, constants.MASTERD_PID:
+ utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
return True
"""Cleans up the current node and prepares it to be removed from the cluster.
"""
- if os.path.isdir(constants.DATA_DIR):
- for rel_name in utils.ListVisibleFiles(constants.DATA_DIR):
- full_name = os.path.join(constants.DATA_DIR, rel_name)
- if os.path.isfile(full_name) and not os.path.islink(full_name):
- utils.RemoveFile(full_name)
+ _CleanDirectory(constants.DATA_DIR)
+
+ # The lock can be removed because we're going to quit anyway.
+ _JobQueuePurge(keep_lock=False)
try:
priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
inst_os.path, create_script, instance.name,
real_os_dev.dev_path, real_swap_dev.dev_path,
logfile)
+ env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}
- result = utils.RunCmd(command)
+ result = utils.RunCmd(command, env=env)
if result.failed:
logging.error("os create command '%s' returned error: %s, logfile: %s,"
" output: %s", command, result.fail_reason, logfile,
constants.VNC_PASSWORD_FILE,
]
allowed_files.extend(ssconf.SimpleStore().GetFileList())
+
if file_name not in allowed_files:
logging.error("Filename passed to UploadFile not in allowed"
" upload targets: '%s'", file_name)
config.set(constants.INISECT_INS, 'nic%d_mac' %
nic_count, '%s' % nic.mac)
config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
- config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, '%s' % nic.bridge)
+ config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
+ '%s' % nic.bridge)
# TODO: redundant: on load can read nics until it doesn't exist
config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
logfile)
command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
+ env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}
- result = utils.RunCmd(command)
+ result = utils.RunCmd(command, env=env)
if result.failed:
logging.error("os import command '%s' returned error: %s"
return result
+def _IsJobQueueFile(file_name):
+  """Checks whether the given filename is in the queue directory.
+
+  @type file_name: str
+  @param file_name: path to check
+  @rtype: bool
+  @return: True if file_name lies under constants.QUEUE_DIR; otherwise
+      an error is logged and False is returned
+
+  """
+  queue_dir = os.path.normpath(constants.QUEUE_DIR)
+  # NOTE(review): os.path.commonprefix works character-by-character, so a
+  # sibling path sharing a name prefix (e.g. ".../queue-old/x" next to
+  # ".../queue") would also pass this check -- confirm callers only pass
+  # paths built from QUEUE_DIR
+  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
+
+  if not result:
+    logging.error("'%s' is not a file in the queue directory",
+                  file_name)
+
+  return result
+
+
+def JobQueueUpdate(file_name, content):
+  """Updates a file in the queue directory.
+
+  @type file_name: str
+  @param file_name: absolute path of the file to write; must lie inside
+      the queue directory
+  @type content: str
+  @param content: new full contents of the file
+  @rtype: bool
+  @return: False if file_name is not a queue file, True otherwise
+
+  """
+  if not _IsJobQueueFile(file_name):
+    return False
+
+  # Write and replace the file atomically
+  utils.WriteFile(file_name, data=content)
+
+  return True
+
+
+def JobQueuePurge():
+  """Removes job queue files and archived jobs
+
+  Unlike the node-leave cleanup path, the queue lock file is kept in
+  place here.
+
+  """
+  # The lock must not be removed, otherwise another process could create
+  # it again.
+  # NOTE(review): _JobQueuePurge contains no return statement, so this
+  # propagates None rather than a success boolean -- confirm callers do
+  # not test the result for truth
+  return _JobQueuePurge(keep_lock=True)
+
+
+def JobQueueRename(old, new):
+  """Renames a job queue file.
+
+  @type old: str
+  @param old: current path; must lie inside the queue directory
+  @type new: str
+  @param new: target path; must lie inside the queue directory
+  @rtype: bool
+  @return: False if either path is not a queue file, True otherwise
+
+  """
+  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
+    return False
+
+  os.rename(old, new)
+
+  return True
+
+
def CloseBlockDevices(disks):
"""Closes the given block devices.