X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/18682bca5711b7b492aeb3cd3a550087f1413e0b..6abe9194834b97e375fdbb71bd5573de4b529334:/lib/backend.py diff --git a/lib/backend.py b/lib/backend.py index 1054108..b993f9d 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -32,6 +32,7 @@ import re import subprocess import random import logging +import tempfile from ganeti import errors from ganeti import utils @@ -47,36 +48,120 @@ def _GetSshRunner(): return ssh.SshRunner() -def StartMaster(): - """Activate local node as master node. +def _CleanDirectory(path, exclude=[]): + """Removes all regular files in a directory. - There are two needed steps for this: - - run the master script - - register the cron script + @param exclude: List of files to be excluded. + @type exclude: list """ - result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "start"]) + if not os.path.isdir(path): + return - if result.failed: - logging.error("could not activate cluster interface with command %s," - " error: '%s'", result.cmd, result.output) + # Normalize excluded paths + exclude = [os.path.normpath(i) for i in exclude] + + for rel_name in utils.ListVisibleFiles(path): + full_name = os.path.normpath(os.path.join(path, rel_name)) + if full_name in exclude: + continue + if os.path.isfile(full_name) and not os.path.islink(full_name): + utils.RemoveFile(full_name) + + +def _JobQueuePurge(keep_lock): + """Removes job queue files and archived jobs + + """ + if keep_lock: + exclude = [constants.JOB_QUEUE_LOCK_FILE] + else: + exclude = [] + + _CleanDirectory(constants.QUEUE_DIR, exclude=exclude) + _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR) + + +def _GetMasterInfo(): + """Return the master ip and netdev. + + """ + try: + ss = ssconf.SimpleStore() + master_netdev = ss.GetMasterNetdev() + master_ip = ss.GetMasterIP() + except errors.ConfigurationError, err: + logging.exception("Cluster configuration incomplete") + return (None, None) + return (master_netdev, master_ip) + + +def StartMaster(start_daemons): + """Activate local node as master node. + + The function will always try activate the IP address of the master + (if someone else has it, then it won't). Then, if the start_daemons + parameter is True, it will also start the master daemons + (ganet-masterd and ganeti-rapi). + + """ + ok = True + master_netdev, master_ip = _GetMasterInfo() + if not master_netdev: return False - return True + if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT): + if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT, + source=constants.LOCALHOST_IP_ADDRESS): + # we already have the ip: + logging.debug("Already started") + else: + logging.error("Someone else has the master ip, not activating") + ok = False + else: + result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip, + "dev", master_netdev, "label", + "%s:0" % master_netdev]) + if result.failed: + logging.error("Can't activate master IP: %s", result.output) + ok = False + + result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, + "-s", master_ip, master_ip]) + # we'll ignore the exit code of arping + + # and now start the master and rapi daemons + if start_daemons: + for daemon in 'ganeti-masterd', 'ganeti-rapi': + result = utils.RunCmd([daemon]) + if result.failed: + logging.error("Can't start daemon %s: %s", daemon, result.output) + ok = False + return ok -def StopMaster(): +def StopMaster(stop_daemons): """Deactivate this node as master. - This runs the master stop script. + The function will always try to deactivate the IP address of the + master. Then, if the stop_daemons parameter is True, it will also + stop the master daemons (ganet-masterd and ganeti-rapi). """ - result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "stop"]) + master_netdev, master_ip = _GetMasterInfo() + if not master_netdev: + return False + result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip, + "dev", master_netdev]) if result.failed: - logging.error("could not deactivate cluster interface with command %s," - " error: '%s'", result.cmd, result.output) - return False + logging.error("Can't remove the master IP, error: %s", result.output) + # but otherwise ignore the failure + + if stop_daemons: + # stop/kill the rapi and the master daemon + for daemon in constants.RAPI_PID, constants.MASTERD_PID: + utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon))) return True @@ -118,11 +203,10 @@ def LeaveCluster(): """Cleans up the current node and prepares it to be removed from the cluster. """ - if os.path.isdir(constants.DATA_DIR): - for rel_name in utils.ListVisibleFiles(constants.DATA_DIR): - full_name = os.path.join(constants.DATA_DIR, rel_name) - if os.path.isfile(full_name) and not os.path.islink(full_name): - utils.RemoveFile(full_name) + _CleanDirectory(constants.DATA_DIR) + + # The lock can be removed because we're going to quit anyway. + _JobQueuePurge(keep_lock=False) try: priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS) @@ -446,8 +530,9 @@ def AddOSToInstance(instance, os_disk, swap_disk): inst_os.path, create_script, instance.name, real_os_dev.dev_path, real_swap_dev.dev_path, logfile) + env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()} - result = utils.RunCmd(command) + result = utils.RunCmd(command, env=env) if result.failed: logging.error("os create command '%s' returned error: %s, logfile: %s," " output: %s", command, result.fail_reason, logfile, @@ -988,6 +1073,7 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime): constants.VNC_PASSWORD_FILE, ] allowed_files.extend(ssconf.SimpleStore().GetFileList()) + if file_name not in allowed_files: logging.error("Filename passed to UploadFile not in allowed" " upload targets: '%s'", file_name) @@ -1296,7 +1382,8 @@ def FinalizeExport(instance, snap_disks): config.set(constants.INISECT_INS, 'nic%d_mac' % nic_count, '%s' % nic.mac) config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip) - config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, '%s' % nic.bridge) + config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, + '%s' % nic.bridge) # TODO: redundant: on load can read nics until it doesn't exist config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count) @@ -1400,8 +1487,9 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image): logfile) command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd]) + env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()} - result = utils.RunCmd(command) + result = utils.RunCmd(command, env=env) if result.failed: logging.error("os import command '%s' returned error: %s" @@ -1595,6 +1683,54 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir): return result +def _IsJobQueueFile(file_name): + """Checks whether the given filename is in the queue directory. + + """ + queue_dir = os.path.normpath(constants.QUEUE_DIR) + result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir) + + if not result: + logging.error("'%s' is not a file in the queue directory", + file_name) + + return result + + +def JobQueueUpdate(file_name, content): + """Updates a file in the queue directory. + + """ + if not _IsJobQueueFile(file_name): + return False + + # Write and replace the file atomically + utils.WriteFile(file_name, data=content) + + return True + + +def JobQueuePurge(): + """Removes job queue files and archived jobs + + """ + # The lock must not be removed, otherwise another process could create + # it again. + return _JobQueuePurge(keep_lock=True) + + +def JobQueueRename(old, new): + """Renames a job queue file. + + """ + if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)): + return False + + os.rename(old, new) + + return True + + def CloseBlockDevices(disks): """Closes the given block devices.