X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/1c65840bad209afd799ac7cac771921abbb948e2..e49099a4a9ea70ce0c48dc5b8eb322da97d910fc:/lib/backend.py

diff --git a/lib/backend.py b/lib/backend.py
index b78a5ec..0df7a00 100644
--- a/lib/backend.py
+++ b/lib/backend.py
@@ -32,6 +32,7 @@ import re
 import subprocess
 import random
 import logging
+import tempfile
 
 from ganeti import errors
 from ganeti import utils
@@ -43,8 +44,62 @@ from ganeti import objects
 from ganeti import ssconf
 
 
-def _GetSshRunner():
-  return ssh.SshRunner()
+def _GetConfig():
+  return ssconf.SimpleConfigReader()
+
+
+def _GetSshRunner(cluster_name):
+  return ssh.SshRunner(cluster_name)
+
+
+def _CleanDirectory(path, exclude=[]):
+  """Removes all regular files in a directory.
+
+  @type exclude: list
+  @param exclude: list of files to be excluded
+
+  """
+  if not os.path.isdir(path):
+    return
+
+  # Normalize excluded paths
+  exclude = [os.path.normpath(i) for i in exclude]
+
+  for rel_name in utils.ListVisibleFiles(path):
+    full_name = os.path.normpath(os.path.join(path, rel_name))
+    if full_name in exclude:
+      continue
+    if os.path.isfile(full_name) and not os.path.islink(full_name):
+      utils.RemoveFile(full_name)
+
+
+def JobQueuePurge():
+  """Removes job queue files and archived jobs.
+
+  """
+  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
+  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
+
+
+def GetMasterInfo():
+  """Returns master information.
+
+  This is a utility function to compute master information, either
+  for consumption here or from the node daemon.
+
+  @rtype: tuple
+  @return: (master_netdev, master_ip, master_node)
+
+  """
+  try:
+    cfg = _GetConfig()
+    master_netdev = cfg.GetMasterNetdev()
+    master_ip = cfg.GetMasterIP()
+    master_node = cfg.GetMasterNode()
+  except errors.ConfigurationError, err:
+    logging.exception("Cluster configuration incomplete")
+    return (None, None, None)
+  return (master_netdev, master_ip, master_node)
 
 
 def StartMaster(start_daemons):
@@ -56,14 +111,39 @@ def StartMaster(start_daemons):
   (ganeti-masterd and ganeti-rapi).
 
   """
-  result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "start"])
-
-  if result.failed:
-    logging.error("could not activate cluster interface with command %s,"
-                  " error: '%s'", result.cmd, result.output)
+  ok = True
+  master_netdev, master_ip, _ = GetMasterInfo()
+  if not master_netdev:
     return False
 
-  return True
+  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
+    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
+                     source=constants.LOCALHOST_IP_ADDRESS):
+      # we already have the ip:
+      logging.debug("Already started")
+    else:
+      logging.error("Someone else has the master ip, not activating")
+      ok = False
+  else:
+    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
+                           "dev", master_netdev, "label",
+                           "%s:0" % master_netdev])
+    if result.failed:
+      logging.error("Can't activate master IP: %s", result.output)
+      ok = False
+
+    result = utils.RunCmd(["arping", "-q", "-U", "-c", "3",
+                           "-I", master_netdev,
+                           "-s", master_ip, master_ip])
+    # we'll ignore the exit code of arping
+
+  # and now start the master and rapi daemons
+  if start_daemons:
+    for daemon in 'ganeti-masterd', 'ganeti-rapi':
+      result = utils.RunCmd([daemon])
+      if result.failed:
+        logging.error("Can't start daemon %s: %s", daemon, result.output)
+        ok = False
+  return ok
 
 
 def StopMaster(stop_daemons):
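The rewritten StartMaster replaces the old call to MASTER_SCRIPT with an explicit ownership check on the master IP before adding it as a /32 alias on the master netdev. The following is a minimal standalone sketch of just that decision logic; tcp_ping() is a socket-based stand-in for utils.TcpPing, and every name here is illustrative rather than part of the Ganeti API:

    import socket
    import subprocess

    def tcp_ping(target_ip, port, timeout=2.0, source=None):
        """Crude stand-in for utils.TcpPing: try to open a TCP connection."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        try:
            if source is not None:
                sock.bind((source, 0))  # force the source address of the probe
            sock.connect((target_ip, port))
            return True
        except socket.error:
            return False
        finally:
            sock.close()

    def activate_master_ip(master_ip, master_netdev, port):
        if tcp_ping(master_ip, port):
            # The IP already answers; that is only fine if *we* are the one
            # answering, which is what probing again from localhost checks.
            return tcp_ping(master_ip, port, source="127.0.0.1")
        # Nobody holds the IP yet: add it as an alias on the master device.
        cmd = ["ip", "address", "add", "%s/32" % master_ip,
               "dev", master_netdev, "label", "%s:0" % master_netdev]
        return subprocess.call(cmd) == 0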
""" - result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "stop"]) + master_netdev, master_ip, _ = GetMasterInfo() + if not master_netdev: + return False + result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip, + "dev", master_netdev]) if result.failed: - logging.error("could not deactivate cluster interface with command %s," - " error: '%s'", result.cmd, result.output) - return False + logging.error("Can't remove the master IP, error: %s", result.output) + # but otherwise ignore the failure + + if stop_daemons: + # stop/kill the rapi and the master daemon + for daemon in constants.RAPI_PID, constants.MASTERD_PID: + utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon))) return True @@ -121,11 +209,8 @@ def LeaveCluster(): """Cleans up the current node and prepares it to be removed from the cluster. """ - if os.path.isdir(constants.DATA_DIR): - for rel_name in utils.ListVisibleFiles(constants.DATA_DIR): - full_name = os.path.join(constants.DATA_DIR, rel_name) - if os.path.isfile(full_name) and not os.path.islink(full_name): - utils.RemoveFile(full_name) + _CleanDirectory(constants.DATA_DIR) + JobQueuePurge() try: priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS) @@ -146,18 +231,21 @@ def LeaveCluster(): raise errors.QuitGanetiException(False, 'Shutdown scheduled') -def GetNodeInfo(vgname): +def GetNodeInfo(vgname, hypervisor_type): """Gives back a hash with different informations about the node. - Returns: - { 'vg_size' : xxx, 'vg_free' : xxx, 'memory_domain0': xxx, - 'memory_free' : xxx, 'memory_total' : xxx } - where - vg_size is the size of the configured volume group in MiB - vg_free is the free size of the volume group in MiB - memory_dom0 is the memory allocated for domain0 in MiB - memory_free is the currently available (free) ram in MiB - memory_total is the total number of ram in MiB + @type vgname: C{string} + @param vgname: the name of the volume group to ask for disk space information + @type hypervisor_type: C{str} + @param hypervisor_type: the name of the hypervisor to ask for + memory information + @rtype: C{dict} + @return: dictionary with the following keys: + - vg_size is the size of the configured volume group in MiB + - vg_free is the free size of the volume group in MiB + - memory_dom0 is the memory allocated for domain0 in MiB + - memory_free is the currently available (free) ram in MiB + - memory_total is the total number of ram in MiB """ outputarray = {} @@ -165,7 +253,7 @@ def GetNodeInfo(vgname): outputarray['vg_size'] = vginfo['vg_size'] outputarray['vg_free'] = vginfo['vg_free'] - hyper = hypervisor.GetHypervisor() + hyper = hypervisor.GetHypervisor(hypervisor_type) hyp_info = hyper.GetNodeInfo() if hyp_info is not None: outputarray.update(hyp_info) @@ -179,28 +267,39 @@ def GetNodeInfo(vgname): return outputarray -def VerifyNode(what): +def VerifyNode(what, cluster_name): """Verify the status of the local node. - Args: - what - a dictionary of things to check: - 'filelist' : list of files for which to compute checksums - 'nodelist' : list of nodes we should check communication with - 'hypervisor': run the hypervisor-specific verify + Based on the input L{what} parameter, various checks are done on the + local node. + + If the I{filelist} key is present, this list of + files is checksummed and the file/checksum pairs are returned. - Requested files on local node are checksummed and the result returned. 
@@ -179,28 +267,39 @@ def GetNodeInfo(vgname, hypervisor_type):
   return outputarray
 
 
-def VerifyNode(what):
+def VerifyNode(what, cluster_name):
   """Verify the status of the local node.
 
-  Args:
-    what - a dictionary of things to check:
-      'filelist' : list of files for which to compute checksums
-      'nodelist' : list of nodes we should check communication with
-      'hypervisor': run the hypervisor-specific verify
+  Based on the input L{what} parameter, various checks are done on the
+  local node.
+
+  If the I{filelist} key is present, this list of
+  files is checksummed and the file/checksum pairs are returned.
 
-  Requested files on local node are checksummed and the result returned.
+  If the I{nodelist} key is present, we check that we have
+  connectivity via ssh with the target nodes (and check the hostname
+  report).
+
+  If the I{node-net-test} key is present, we check that we have
+  connectivity to the given nodes via both primary IP and, if
+  applicable, secondary IPs.
+
+  @type what: C{dict}
+  @param what: a dictionary of things to check:
+      - filelist: list of files for which to compute checksums
+      - nodelist: list of nodes we should check ssh communication with
+      - node-net-test: list of nodes we should check node daemon port
+        connectivity with
+      - hypervisor: list with hypervisors to run the verify for
 
-  The nodelist is traversed, with the following checks being made
-  for each node:
-  - known_hosts key correct
-  - correct resolving of node name (target node returns its own hostname
-    by ssh-execution of 'hostname', result compared against name in list.
 
   """
   result = {}
 
   if 'hypervisor' in what:
-    result['hypervisor'] = hypervisor.GetHypervisor().Verify()
+    result['hypervisor'] = my_dict = {}
+    for hv_name in what['hypervisor']:
+      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
 
   if 'filelist' in what:
     result['filelist'] = utils.FingerprintFiles(what['filelist'])
@@ -209,7 +308,7 @@ def VerifyNode(what, cluster_name):
     result['nodelist'] = {}
     random.shuffle(what['nodelist'])
     for node in what['nodelist']:
-      success, message = _GetSshRunner().VerifyNodeHostname(node)
+      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
       if not success:
         result['nodelist'][node] = message
   if 'node-net-test' in what:
@@ -226,7 +325,7 @@ def VerifyNode(what, cluster_name):
                        " primary/secondary IP"
                        " in the node list")
     else:
-      port = ssconf.SimpleStore().GetNodeDaemonPort()
+      port = utils.GetNodeDaemonPort()
       for name, pip, sip in what['node-net-test']:
         fail = []
         if not utils.TcpPing(pip, port, source=my_pip):
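The node-net-test handling is only partially visible above; the hunk's context ends inside the loop. Its plausible overall shape, reusing the tcp_ping() stand-in from the earlier sketch, is: probe each node's daemon port on the primary IP from our own primary IP, probe the secondary IP when it differs, and record which probes failed. This is a hedged reconstruction, not the verbatim Ganeti code:

    def node_net_test(triples, port, my_pip):
        """Probe every (name, pip, sip) node on its primary/secondary IPs."""
        result = {}
        for name, pip, sip in triples:
            fail = []
            if not tcp_ping(pip, port, source=my_pip):
                fail.append("primary")
            if sip != pip and not tcp_ping(sip, port):
                fail.append("secondary")
            if fail:
                result[name] = ("failure using the %s interface(s)" %
                                " and ".join(fail))
        return result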
@@ -330,41 +429,49 @@ def BridgesExist(bridges_list):
   return True
 
 
-def GetInstanceList():
+def GetInstanceList(hypervisor_list):
   """Provides a list of instances.
 
-  Returns:
-    A list of all running instances on the current node
-    - instance1.example.com
-    - instance2.example.com
+  @type hypervisor_list: list
+  @param hypervisor_list: the list of hypervisors to query for information
+
+  @rtype: list
+  @return: a list of all running instances on the current node
+      - instance1.example.com
+      - instance2.example.com
 
   """
-  try:
-    names = hypervisor.GetHypervisor().ListInstances()
-  except errors.HypervisorError, err:
-    logging.exception("Error enumerating instances")
-    raise
+  results = []
+  for hname in hypervisor_list:
+    try:
+      names = hypervisor.GetHypervisor(hname).ListInstances()
+      results.extend(names)
+    except errors.HypervisorError, err:
+      logging.exception("Error enumerating instances for hypervisor %s",
+                        hname)
+      # FIXME: should we somehow not propagate this to the master?
+      raise
 
-  return names
+  return results
 
 
-def GetInstanceInfo(instance):
+def GetInstanceInfo(instance, hname):
   """Gives back information about an instance as a dictionary.
 
-  Args:
-    instance: name of the instance (ex. instance1.example.com)
+  @type instance: string
+  @param instance: the instance name
+  @type hname: string
+  @param hname: the hypervisor type of the instance
 
-  Returns:
-    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
-    where
-    memory: memory size of instance (int)
-    state: xen state of instance (string)
-    time: cpu time of instance (float)
+  @rtype: dict
+  @return: dictionary with the following keys:
+      - memory: memory size of instance (int)
+      - state: xen state of instance (string)
+      - time: cpu time of instance (float)
 
   """
   output = {}
 
-  iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
+  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
   if iinfo is not None:
     output['memory'] = iinfo[2]
     output['state'] = iinfo[4]
@@ -373,34 +480,38 @@ def GetInstanceInfo(instance, hname):
   return output
 
 
-def GetAllInstancesInfo():
+def GetAllInstancesInfo(hypervisor_list):
   """Gather data about all instances.
 
   This is the equivalent of `GetInstanceInfo()`, except that it
   computes data for all instances at once, thus being faster if one
   needs data about more than one instance.
 
-  Returns: a dictionary of dictionaries, keys being the instance name,
-    and with values:
-    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
-    where
-    memory: memory size of instance (int)
-    state: xen state of instance (string)
-    time: cpu time of instance (float)
-    vcpus: the number of cpus
+  @type hypervisor_list: list
+  @param hypervisor_list: list of hypervisors to query for instance data
+
+  @rtype: dict of dicts
+  @return: dictionary of instance: data, with data having the following keys:
+      - memory: memory size of instance (int)
+      - state: xen state of instance (string)
+      - time: cpu time of instance (float)
+      - vcpus: the number of vcpus
 
   """
   output = {}
 
-  iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
-  if iinfo:
-    for name, inst_id, memory, vcpus, state, times in iinfo:
-      output[name] = {
-        'memory': memory,
-        'vcpus': vcpus,
-        'state': state,
-        'time': times,
-        }
+  for hname in hypervisor_list:
+    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
+    if iinfo:
+      for name, inst_id, memory, vcpus, state, times in iinfo:
+        if name in output:
+          raise errors.HypervisorError("Instance %s is running more than"
+                                       " once" % name)
+        output[name] = {
+          'memory': memory,
+          'vcpus': vcpus,
+          'state': state,
+          'time': times,
+          }
 
   return output
@@ -449,8 +560,9 @@ def AddOSToInstance(instance, os_disk, swap_disk):
                                 inst_os.path, create_script, instance.name,
                                 real_os_dev.dev_path, real_swap_dev.dev_path,
                                 logfile)
+  env = {'HYPERVISOR': instance.hypervisor}
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=env)
   if result.failed:
     logging.error("os create command '%s' returned error: %s, logfile: %s,"
                   " output: %s", command, result.fail_reason, logfile,
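GetAllInstancesInfo now aggregates per-hypervisor results and turns an instance name reported by two hypervisors into a hard error instead of silently overwriting one entry. The merge logic in isolation, with fabricated sample data (the hypervisor labels are just dictionary keys here):

    def merge_instance_info(per_hv_info):
        """Merge {hypervisor: {instance: data}} into a flat {instance: data}."""
        output = {}
        for hv_name in per_hv_info:
            for name, data in per_hv_info[hv_name].items():
                if name in output:
                    raise ValueError("Instance %s is running more than once"
                                     % name)
                output[name] = data
        return output

    merged = merge_instance_info({
        'xen-pvm': {'inst1': {'memory': 512, 'vcpus': 1,
                              'state': '-b---', 'time': 3188.8}},
        'xen-hvm': {'inst2': {'memory': 1024, 'vcpus': 2,
                              'state': 'r----', 'time': 12.5}},
    })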
@@ -579,17 +691,19 @@ def _GatherBlockDevs(instance):
 def StartInstance(instance, extra_args):
   """Start an instance.
 
-  Args:
-    instance - name of instance to start.
+  @type instance: instance object
+  @param instance: the instance object
+  @rtype: boolean
+  @return: whether the startup was successful or not
 
   """
-  running_instances = GetInstanceList()
+  running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name in running_instances:
     return True
 
   block_devices = _GatherBlockDevs(instance)
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
 
   try:
     hyper.StartInstance(instance, block_devices, extra_args)
@@ -603,16 +717,19 @@ def StartInstance(instance, extra_args):
 def ShutdownInstance(instance):
   """Shut an instance down.
 
-  Args:
-    instance - name of instance to shutdown.
+  @type instance: instance object
+  @param instance: the instance object
+  @rtype: boolean
+  @return: whether the shutdown was successful or not
 
   """
-  running_instances = GetInstanceList()
+  hv_name = instance.hypervisor
+  running_instances = GetInstanceList([hv_name])
 
   if instance.name not in running_instances:
     return True
 
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(hv_name)
   try:
     hyper.StopInstance(instance)
   except errors.HypervisorError, err:
@@ -624,7 +741,7 @@ def ShutdownInstance(instance):
 
   time.sleep(1)
   for dummy in range(11):
-    if instance.name not in GetInstanceList():
+    if instance.name not in GetInstanceList([hv_name]):
       break
     time.sleep(10)
   else:
@@ -638,7 +755,7 @@ def ShutdownInstance(instance):
       return False
     time.sleep(1)
 
-  if instance.name in GetInstanceList():
+  if instance.name in GetInstanceList([hv_name]):
     logging.error("could not shutdown instance '%s' even by destroy",
                   instance.name)
     return False
@@ -654,13 +771,13 @@ def RebootInstance(instance, reboot_type, extra_args):
     reboot_type - how to reboot [soft,hard,full]
 
   """
-  running_instances = GetInstanceList()
+  running_instances = GetInstanceList([instance.hypervisor])
 
   if instance.name not in running_instances:
     logging.error("Cannot reboot instance that is not running")
     return False
 
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
     try:
       hyper.RebootInstance(instance)
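ShutdownInstance keeps its two-stage escalation, now parameterized by the instance's own hypervisor: request a clean stop, poll the per-hypervisor instance list for roughly two minutes, and only then destroy the instance. The control flow in isolation, with the hypervisor operations abstracted into plain callables (names are illustrative):

    import time

    def shutdown_with_fallback(name, stop, destroy, list_instances):
        """Polite stop first; destroy only if the instance will not go away."""
        stop(name)                       # clean, ACPI-style shutdown request
        time.sleep(1)
        for _ in range(11):              # grace period: 11 polls, 10s apart
            if name not in list_instances():
                break
            time.sleep(10)
        else:
            destroy(name)                # last resort: hard stop
            time.sleep(1)
        return name not in list_instances()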
@@ -677,18 +794,29 @@ def RebootInstance(instance, reboot_type, extra_args):
   else:
     raise errors.ParameterError("reboot_type invalid")
 
-
   return True
 
 
 def MigrateInstance(instance, target, live):
   """Migrates an instance to another node.
 
+  @type instance: C{objects.Instance}
+  @param instance: the instance definition
+  @type target: string
+  @param target: the target node name
+  @type live: boolean
+  @param live: whether the migration should be done live or not (the
+      interpretation of this parameter is left to the hypervisor)
+  @rtype: tuple
+  @return: a tuple of (success, msg) where:
+      - success is a boolean denoting the success/failure of the operation
+      - msg is a string with details in case of failure
+
   """
-  hyper = hypervisor.GetHypervisor()
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
 
   try:
-    hyper.MigrateInstance(instance, target, live)
+    hyper.MigrateInstance(instance.name, target, live)
   except errors.HypervisorError, err:
     msg = "Failed to migrate instance: %s" % str(err)
     logging.error(msg)
@@ -989,9 +1117,8 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
                    constants.ETC_HOSTS,
                    constants.SSH_KNOWN_HOSTS_FILE,
                    constants.VNC_PASSWORD_FILE,
-                   constants.JOB_QUEUE_SERIAL_FILE,
                   ]
-  allowed_files.extend(ssconf.SimpleStore().GetFileList())
+
   if file_name not in allowed_files:
     logging.error("Filename passed to UploadFile not in allowed"
                   " upload targets: '%s'", file_name)
@@ -1209,7 +1336,7 @@ def SnapshotBlockDevice(disk):
                                  (disk.unique_id, disk.dev_type))
 
 
-def ExportSnapshot(disk, dest_node, instance):
+def ExportSnapshot(disk, dest_node, instance, cluster_name):
   """Export a block device snapshot to a remote node.
 
   Args:
@@ -1250,8 +1377,9 @@ def ExportSnapshot(disk, dest_node, instance, cluster_name):
 
   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                 destdir, destdir, destfile)
-  remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
-                                       destcmd)
+  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
+                                                   constants.GANETI_RUNAS,
+                                                   destcmd)
 
   # all commands have been checked, so we're safe to combine them
   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
@@ -1300,7 +1428,8 @@ def FinalizeExport(instance, snap_disks):
     config.set(constants.INISECT_INS, 'nic%d_mac' % nic_count, '%s' % nic.mac)
     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
-    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, '%s' % nic.bridge)
+    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
+               '%s' % nic.bridge)
   # TODO: redundant: on load can read nics until it doesn't exist
   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
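ExportSnapshot assembles a three-stage shell pipeline: dump the snapshot, compress it, and feed it over ssh into a "mkdir -p && cat > file" on the destination. A rough standalone rendition of that command assembly; pipes.quote stands in for utils.BuildShellCmd/ShellQuoteArgs, and the plain ssh invocation with a root@ login is a simplification of what SshRunner.BuildCmd produces:

    import pipes

    def build_export_pipeline(expcmd, dest_node, destdir, destfile):
        """Return a 'dump | gzip | ssh node "mkdir && cat"' command string."""
        destcmd = "mkdir -p %s && cat > %s/%s" % (
            pipes.quote(destdir), pipes.quote(destdir), pipes.quote(destfile))
        remotecmd = ["ssh", "root@%s" % dest_node, destcmd]
        return "|".join([expcmd, "gzip",
                         " ".join(pipes.quote(arg) for arg in remotecmd)])

    print(build_export_pipeline("dd if=/dev/vg0/snap bs=1M",
                                "node2.example.com",
                                "/srv/ganeti/export", "disk0.img.gz"))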
@@ -1349,7 +1478,8 @@ def ExportInfo(dest):
   return config
 
 
-def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
+def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
+                         cluster_name):
   """Import an os image into an instance.
 
   Args:
@@ -1394,8 +1524,9 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
     os.mkdir(constants.LOG_OS_DIR, 0750)
 
   destcmd = utils.BuildShellCmd('cat %s', src_image)
-  remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
-                                       destcmd)
+  remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
+                                                   constants.GANETI_RUNAS,
+                                                   destcmd)
 
   comprcmd = "gunzip"
   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
@@ -1404,8 +1535,9 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
                                logfile)
 
   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
+  env = {'HYPERVISOR': instance.hypervisor}
 
-  result = utils.RunCmd(command)
+  result = utils.RunCmd(command, env=env)
 
   if result.failed:
     logging.error("os import command '%s' returned error: %s"
@@ -1489,8 +1621,9 @@ def _TransformFileStorageDir(file_storage_dir):
     normalized file_storage_dir (string) if valid, None otherwise
 
   """
+  cfg = _GetConfig()
   file_storage_dir = os.path.normpath(file_storage_dir)
-  base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
+  base_file_storage_dir = cfg.GetFileStorageDir()
   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
       base_file_storage_dir):
     logging.error("file storage directory '%s' is not under base file"
@@ -1599,6 +1732,45 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
   return result
 
 
+def _IsJobQueueFile(file_name):
+  """Checks whether the given filename is in the queue directory.
+
+  """
+  queue_dir = os.path.normpath(constants.QUEUE_DIR)
+  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
+
+  if not result:
+    logging.error("'%s' is not a file in the queue directory",
+                  file_name)
+
+  return result
+
+
+def JobQueueUpdate(file_name, content):
+  """Updates a file in the queue directory.
+
+  """
+  if not _IsJobQueueFile(file_name):
+    return False
+
+  # Write and replace the file atomically
+  utils.WriteFile(file_name, data=content)
+
+  return True
+
+
+def JobQueueRename(old, new):
+  """Renames a job queue file.
+
+  """
+  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
+    return False
+
+  os.rename(old, new)
+
+  return True
+
+
 def CloseBlockDevices(disks):
   """Closes the given block devices.
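The new _IsJobQueueFile gate relies on os.path.commonprefix, which compares strings rather than path components, so a sibling directory such as .../queue-old would also pass; the callers here only ever hand it paths inside the queue directory. The check in isolation, with an illustrative queue location (not taken from the diff):

    import os.path

    QUEUE_DIR = "/var/lib/ganeti/queue"  # illustrative value

    def is_job_queue_file(file_name):
        queue_dir = os.path.normpath(QUEUE_DIR)
        return os.path.commonprefix([queue_dir, file_name]) == queue_dir

    assert is_job_queue_file("/var/lib/ganeti/queue/job-42")
    assert not is_job_queue_file("/var/lib/ganeti/archive/job-42")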