X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/778b75bbcd3ee0906bd7b1a7af1fd984f1c4608b..263ab7cf73bdbfe5becdf3c5cf6335ff80c7becc:/lib/backend.py diff --git a/lib/backend.py b/lib/backend.py index b9e8a33..7e25c68 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -30,8 +30,10 @@ import stat import errno import re import subprocess +import random +import logging +import tempfile -from ganeti import logger from ganeti import errors from ganeti import utils from ganeti import ssh @@ -42,40 +44,129 @@ from ganeti import objects from ganeti import ssconf -def _GetSshRunner(): - return ssh.SshRunner() +def _GetConfig(): + return ssconf.SimpleConfigReader() -def StartMaster(): - """Activate local node as master node. +def _GetSshRunner(cluster_name): + return ssh.SshRunner(cluster_name) + - There are two needed steps for this: - - run the master script - - register the cron script +def _CleanDirectory(path, exclude=[]): + """Removes all regular files in a directory. + + @param exclude: List of files to be excluded. + @type exclude: list """ - result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "start"]) + if not os.path.isdir(path): + return - if result.failed: - logger.Error("could not activate cluster interface with command %s," - " error: '%s'" % (result.cmd, result.output)) - return False + # Normalize excluded paths + exclude = [os.path.normpath(i) for i in exclude] - return True + for rel_name in utils.ListVisibleFiles(path): + full_name = os.path.normpath(os.path.join(path, rel_name)) + if full_name in exclude: + continue + if os.path.isfile(full_name) and not os.path.islink(full_name): + utils.RemoveFile(full_name) + + +def JobQueuePurge(): + """Removes job queue files and archived jobs. + + """ + _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE]) + _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR) + + +def GetMasterInfo(): + """Returns master information. + This is a utility function to compute master information, either + for consumption here or from the node daemon. -def StopMaster(): + @rtype: tuple + @return: (master_netdev, master_ip, master_name) + + """ + try: + cfg = _GetConfig() + master_netdev = cfg.GetMasterNetdev() + master_ip = cfg.GetMasterIP() + master_node = cfg.GetMasterNode() + except errors.ConfigurationError, err: + logging.exception("Cluster configuration incomplete") + return (None, None, None) + return (master_netdev, master_ip, master_node) + + +def StartMaster(start_daemons): + """Activate local node as master node. + + The function will always try to activate the IP address of the master + (if someone else has it, then it won't). Then, if the start_daemons + parameter is True, it will also start the master daemons + (ganeti-masterd and ganeti-rapi). 
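The docstring above describes an IP-takeover step followed by a daemon start. As a rough standalone illustration of the takeover half (not part of the patch, and simplified: the real code distinguishes "we already own the IP" from "someone else owns it" via utils.OwnIpAddress, and uses utils.TcpPing/RunCmd), a sketch using only the standard library:

# Illustrative sketch, not part of the patch: check whether the master IP
# already answers, and claim it with "ip address add" otherwise.
import socket
import subprocess

def port_answers(ip, port, timeout=2.0):
  """Return True if something accepts TCP connections on ip:port."""
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.settimeout(timeout)
  try:
    return sock.connect_ex((ip, port)) == 0
  finally:
    sock.close()

def claim_master_ip(master_ip, master_netdev, noded_port):
  """Add master_ip to master_netdev unless some node already answers on it."""
  if port_answers(master_ip, noded_port):
    # Either this node already owns the address or another one does; in
    # both cases nothing should be added here.
    return False
  cmd = ["ip", "address", "add", "%s/32" % master_ip,
         "dev", master_netdev, "label", "%s:0" % master_netdev]
  return subprocess.call(cmd) == 0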
+ + """ + ok = True + master_netdev, master_ip, _ = GetMasterInfo() + if not master_netdev: + return False + + if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT): + if utils.OwnIpAddress(master_ip): + # we already have the ip: + logging.debug("Already started") + else: + logging.error("Someone else has the master ip, not activating") + ok = False + else: + result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip, + "dev", master_netdev, "label", + "%s:0" % master_netdev]) + if result.failed: + logging.error("Can't activate master IP: %s", result.output) + ok = False + + result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, + "-s", master_ip, master_ip]) + # we'll ignore the exit code of arping + + # and now start the master and rapi daemons + if start_daemons: + for daemon in 'ganeti-masterd', 'ganeti-rapi': + result = utils.RunCmd([daemon]) + if result.failed: + logging.error("Can't start daemon %s: %s", daemon, result.output) + ok = False + return ok + + +def StopMaster(stop_daemons): """Deactivate this node as master. - This runs the master stop script. + The function will always try to deactivate the IP address of the + master. Then, if the stop_daemons parameter is True, it will also + stop the master daemons (ganeti-masterd and ganeti-rapi). """ - result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "stop"]) + master_netdev, master_ip, _ = GetMasterInfo() + if not master_netdev: + return False + result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip, + "dev", master_netdev]) if result.failed: - logger.Error("could not deactivate cluster interface with command %s," - " error: '%s'" % (result.cmd, result.output)) - return False + logging.error("Can't remove the master IP, error: %s", result.output) + # but otherwise ignore the failure + + if stop_daemons: + # stop/kill the rapi and the master daemon + for daemon in constants.RAPI_PID, constants.MASTERD_PID: + utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon))) return True @@ -100,7 +191,7 @@ def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub): priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS, mkdir=True) except errors.OpExecError, err: - logger.Error("Error while processing user ssh files: %s" % err) + logging.exception("Error while processing user ssh files") return False for name, content in [(priv_key, sshkey), (pub_key, sshpub)]: @@ -117,16 +208,13 @@ def LeaveCluster(): """Cleans up the current node and prepares it to be removed from the cluster. """ - if os.path.isdir(constants.DATA_DIR): - for rel_name in utils.ListVisibleFiles(constants.DATA_DIR): - full_name = os.path.join(constants.DATA_DIR, rel_name) - if os.path.isfile(full_name) and not os.path.islink(full_name): - utils.RemoveFile(full_name) + _CleanDirectory(constants.DATA_DIR) + JobQueuePurge() try: priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS) - except errors.OpExecError, err: - logger.Error("Error while processing ssh files: %s" % err) + except errors.OpExecError: + logging.exception("Error while processing ssh files") return f = open(pub_key, 'r') @@ -138,19 +226,25 @@ def LeaveCluster(): utils.RemoveFile(priv_key) utils.RemoveFile(pub_key) + # Return a reassuring string to the caller, and quit + raise errors.QuitGanetiException(False, 'Shutdown scheduled') -def GetNodeInfo(vgname): + +def GetNodeInfo(vgname, hypervisor_type): """Gives back a hash with different information about the node. 
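For illustration of the shape of the dictionary this function deals in, here is a small sketch that is not part of the patch: it takes the volume-group numbers as arguments and reads the memory figures straight from /proc/meminfo (a Linux-only assumption), whereas the real code asks the LVM tools and the hypervisor.

# Illustrative sketch, not part of the patch: build a node-info style dict,
# reading memory figures from /proc/meminfo (Linux only).
def _read_meminfo():
  """Return the /proc/meminfo fields, converted from kB to MiB."""
  values = {}
  for line in open("/proc/meminfo"):
    parts = line.split()
    if len(parts) >= 2 and parts[0].endswith(":"):
      values[parts[0][:-1]] = int(parts[1]) // 1024
  return values

def node_info_sketch(vg_size, vg_free):
  meminfo = _read_meminfo()
  return {
    "vg_size": vg_size,        # MiB, as reported by the LVM tools
    "vg_free": vg_free,        # MiB, as reported by the LVM tools
    "memory_total": meminfo.get("MemTotal", 0),
    "memory_free": meminfo.get("MemFree", 0),
    # memory_dom0 would come from the hypervisor-specific GetNodeInfo()
  }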
- Returns: - { 'vg_size' : xxx, 'vg_free' : xxx, 'memory_domain0': xxx, - 'memory_free' : xxx, 'memory_total' : xxx } - where - vg_size is the size of the configured volume group in MiB - vg_free is the free size of the volume group in MiB - memory_dom0 is the memory allocated for domain0 in MiB - memory_free is the currently available (free) ram in MiB - memory_total is the total number of ram in MiB + @type vgname: C{string} + @param vgname: the name of the volume group to ask for disk space information + @type hypervisor_type: C{str} + @param hypervisor_type: the name of the hypervisor to ask for + memory information + @rtype: C{dict} + @return: dictionary with the following keys: + - vg_size is the size of the configured volume group in MiB + - vg_free is the free size of the volume group in MiB + - memory_dom0 is the memory allocated for domain0 in MiB + - memory_free is the currently available (free) ram in MiB + - memory_total is the total number of ram in MiB """ outputarray = {} @@ -158,7 +252,7 @@ def GetNodeInfo(vgname): outputarray['vg_size'] = vginfo['vg_size'] outputarray['vg_free'] = vginfo['vg_free'] - hyper = hypervisor.GetHypervisor() + hyper = hypervisor.GetHypervisor(hypervisor_type) hyp_info = hyper.GetNodeInfo() if hyp_info is not None: outputarray.update(hyp_info) @@ -172,38 +266,76 @@ def GetNodeInfo(vgname): return outputarray -def VerifyNode(what): +def VerifyNode(what, cluster_name): """Verify the status of the local node. - Args: - what - a dictionary of things to check: - 'filelist' : list of files for which to compute checksums - 'nodelist' : list of nodes we should check communication with - 'hypervisor': run the hypervisor-specific verify + Based on the input L{what} parameter, various checks are done on the + local node. + + If the I{filelist} key is present, this list of + files is checksummed and the file/checksum pairs are returned. + + If the I{nodelist} key is present, we check that we have + connectivity via ssh with the target nodes (and check the hostname + report). - Requested files on local node are checksummed and the result returned. + If the I{node-net-test} key is present, we check that we have + connectivity to the given nodes via both primary IP and, if + applicable, secondary IPs. - The nodelist is traversed, with the following checks being made - for each node: - - known_hosts key correct - - correct resolving of node name (target node returns its own hostname - by ssh-execution of 'hostname', result compared against name in list. 
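The 'filelist' check mentioned in both the old and the new docstring boils down to hashing a set of files. A standalone sketch of such a fingerprint helper, not part of the patch (the real code uses utils.FingerprintFiles):

# Illustrative sketch, not part of the patch: hash each existing file and
# return a path -> hex digest mapping, as a 'filelist' check would use.
import hashlib
import os

def fingerprint_files(paths):
  digests = {}
  for path in paths:
    if not os.path.isfile(path):
      continue                 # missing files are simply not reported
    h = hashlib.sha1()
    f = open(path, "rb")
    try:
      while True:
        chunk = f.read(65536)
        if not chunk:
          break
        h.update(chunk)
    finally:
      f.close()
    digests[path] = h.hexdigest()
  return digests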
+ @type what: C{dict} + @param what: a dictionary of things to check: + - filelist: list of files for which to compute checksums + - nodelist: list of nodes we should check ssh communication with + - node-net-test: list of nodes we should check node daemon port + connectivity with + - hypervisor: list with hypervisors to run the verify for """ result = {} if 'hypervisor' in what: - result['hypervisor'] = hypervisor.GetHypervisor().Verify() + result['hypervisor'] = my_dict = {} + for hv_name in what['hypervisor']: + my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify() if 'filelist' in what: result['filelist'] = utils.FingerprintFiles(what['filelist']) if 'nodelist' in what: result['nodelist'] = {} + random.shuffle(what['nodelist']) for node in what['nodelist']: - success, message = _GetSshRunner().VerifyNodeHostname(node) + success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node) if not success: result['nodelist'][node] = message + if 'node-net-test' in what: + result['node-net-test'] = {} + my_name = utils.HostInfo().name + my_pip = my_sip = None + for name, pip, sip in what['node-net-test']: + if name == my_name: + my_pip = pip + my_sip = sip + break + if not my_pip: + result['node-net-test'][my_name] = ("Can't find my own" + " primary/secondary IP" + " in the node list") + else: + port = utils.GetNodeDaemonPort() + for name, pip, sip in what['node-net-test']: + fail = [] + if not utils.TcpPing(pip, port, source=my_pip): + fail.append("primary") + if sip != pip: + if not utils.TcpPing(sip, port, source=my_sip): + fail.append("secondary") + if fail: + result['node-net-test'][name] = ("failure using the %s" + " interface(s)" % + " and ".join(fail)) + return result @@ -222,15 +354,18 @@ def GetVolumeList(vg_name): "--separator=%s" % sep, "-olv_name,lv_size,lv_attr", vg_name]) if result.failed: - logger.Error("Failed to list logical volumes, lvs output: %s" % - result.output) + logging.error("Failed to list logical volumes, lvs output: %s", + result.output) return result.output + valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$") for line in result.stdout.splitlines(): - line = line.strip().rstrip(sep) - name, size, attr = line.split(sep) - if len(attr) != 6: - attr = '------' + line = line.strip() + match = valid_line_re.match(line) + if not match: + logging.error("Invalid line returned from lvs output: '%s'", line) + continue + name, size, attr = match.groups() inactive = attr[4] == '-' online = attr[5] == 'o' lvs[name] = (size, inactive, online) @@ -256,8 +391,8 @@ def NodeVolumes(): "--separator=|", "--options=lv_name,lv_size,devices,vg_name"]) if result.failed: - logger.Error("Failed to list logical volumes, lvs output: %s" % - result.output) + logging.error("Failed to list logical volumes, lvs output: %s", + result.output) return {} def parse_dev(dev): @@ -274,7 +409,8 @@ def NodeVolumes(): 'vg': line[3].strip(), } - return [map_line(line.split('|')) for line in result.stdout.splitlines()] + return [map_line(line.split('|')) for line in result.stdout.splitlines() + if line.count('|') >= 3] def BridgesExist(bridges_list): @@ -291,41 +427,49 @@ def BridgesExist(bridges_list): return True -def GetInstanceList(): +def GetInstanceList(hypervisor_list): """Provides a list of instances. 
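One of the hunks above makes GetVolumeList() parse the lvs output with a regular expression instead of naive splitting. A standalone sketch of that parsing, not part of the patch, assuming lvs was run with --noheadings --nosuffix --units=m --separator='|' -olv_name,lv_size,lv_attr as in the patched command:

# Illustrative sketch, not part of the patch: parse 'lvs' output lines of the
# form ' name|size|attrs' into {name: (size, inactive, online)}.
import re

_LVS_LINE = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")

def parse_lvs_output(text):
  lvs = {}
  for line in text.splitlines():
    match = _LVS_LINE.match(line.strip())
    if not match:
      continue                 # skip malformed lines instead of guessing
    name, size, attr = match.groups()
    inactive = attr[4] == "-"  # fifth attribute character: state
    online = attr[5] == "o"    # sixth attribute character: device open
    lvs[name] = (size, inactive, online)
  return lvs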
- Returns: - A list of all running instances on the current node - - instance1.example.com - - instance2.example.com + @type hypervisor_list: list + @param hypervisor_list: the list of hypervisors to query for instance information + + @rtype: list + @return: a list of all running instances on the current node + - instance1.example.com + - instance2.example.com """ - try: - names = hypervisor.GetHypervisor().ListInstances() - except errors.HypervisorError, err: - logger.Error("error enumerating instances: %s" % str(err)) - raise + results = [] + for hname in hypervisor_list: + try: + names = hypervisor.GetHypervisor(hname).ListInstances() + results.extend(names) + except errors.HypervisorError, err: + logging.exception("Error enumerating instances for hypervisor %s", hname) + # FIXME: should we somehow not propagate this to the master? + raise - return names + return results -def GetInstanceInfo(instance): +def GetInstanceInfo(instance, hname): """Gives back information about an instance as a dictionary. - Args: - instance: name of the instance (ex. instance1.example.com) + @type instance: string + @param instance: the instance name + @type hname: string + @param hname: the hypervisor type of the instance - Returns: - { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, } - where - memory: memory size of instance (int) - state: xen state of instance (string) - time: cpu time of instance (float) + @rtype: dict + @return: dictionary with the following keys: + - memory: memory size of instance (int) + - state: xen state of instance (string) + - time: cpu time of instance (float) """ output = {} - iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance) + iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance) if iinfo is not None: output['memory'] = iinfo[2] output['state'] = iinfo[4] @@ -334,128 +478,88 @@ def GetInstanceInfo(instance): return output -def GetAllInstancesInfo(): +def GetAllInstancesInfo(hypervisor_list): """Gather data about all instances. This is the equivalent of `GetInstanceInfo()`, except that it computes data for all instances at once, thus being faster if one needs data about more than one instance. 
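As a standalone illustration of the "gather from several hypervisors and refuse conflicting duplicates" behaviour that the new code below implements (not part of the patch; ValueError stands in for errors.HypervisorError):

# Illustrative sketch, not part of the patch: merge per-hypervisor instance
# data and reject instances reported twice with different parameters.
def merge_instance_info(per_hypervisor_info):
  """per_hypervisor_info: iterable of {instance_name: info_dict} mappings."""
  merged = {}
  for info in per_hypervisor_info:
    for name, value in info.items():
      if name in merged and merged[name] != value:
        raise ValueError("instance %s reported twice with different data" %
                         name)
      merged[name] = value
  return merged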
- Returns: a dictionary of dictionaries, keys being the instance name, - and with values: - { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, } - where - memory: memory size of instance (int) - state: xen state of instance (string) - time: cpu time of instance (float) - vcpus: the number of cpus + @type hypervisor_list: list + @param hypervisor_list: list of hypervisors to query for instance data + + @rtype: dict of dicts + @return: dictionary of instance: data, with data having the following keys: + - memory: memory size of instance (int) + - state: xen state of instance (string) + - time: cpu time of instance (float) + - vcpus: the number of vcpus """ output = {} - iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo() - if iinfo: - for name, inst_id, memory, vcpus, state, times in iinfo: - output[name] = { - 'memory': memory, - 'vcpus': vcpus, - 'state': state, - 'time': times, - } + for hname in hypervisor_list: + iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo() + if iinfo: + for name, inst_id, memory, vcpus, state, times in iinfo: + value = { + 'memory': memory, + 'vcpus': vcpus, + 'state': state, + 'time': times, + } + if name in output and output[name] != value: + raise errors.HypervisorError("Instance %s running duplicate" + " with different parameters" % name) + output[name] = value return output -def AddOSToInstance(instance, os_disk, swap_disk): +def AddOSToInstance(instance): """Add an OS to an instance. - Args: - instance: the instance object - os_disk: the instance-visible name of the os device - swap_disk: the instance-visible name of the swap device + @type instance: L{objects.Instance} + @param instance: Instance whose OS is to be installed """ inst_os = OSFromDisk(instance.os) create_script = inst_os.create_script - - os_device = instance.FindDisk(os_disk) - if os_device is None: - logger.Error("Can't find this device-visible name '%s'" % os_disk) - return False - - swap_device = instance.FindDisk(swap_disk) - if swap_device is None: - logger.Error("Can't find this device-visible name '%s'" % swap_disk) - return False - - real_os_dev = _RecursiveFindBD(os_device) - if real_os_dev is None: - raise errors.BlockDeviceError("Block device '%s' is not set up" % - str(os_device)) - real_os_dev.Open() - - real_swap_dev = _RecursiveFindBD(swap_device) - if real_swap_dev is None: - raise errors.BlockDeviceError("Block device '%s' is not set up" % - str(swap_device)) - real_swap_dev.Open() + create_env = OSEnvironment(instance) logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os, instance.name, int(time.time())) if not os.path.exists(constants.LOG_OS_DIR): os.mkdir(constants.LOG_OS_DIR, 0750) - command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s", - inst_os.path, create_script, instance.name, - real_os_dev.dev_path, real_swap_dev.dev_path, - logfile) + command = utils.BuildShellCmd("cd %s && %s &>%s", + inst_os.path, create_script, logfile) - result = utils.RunCmd(command) + result = utils.RunCmd(command, env=create_env) if result.failed: - logger.Error("os create command '%s' returned error: %s, logfile: %s," - " output: %s" % - (command, result.fail_reason, logfile, result.output)) + logging.error("os create command '%s' returned error: %s, logfile: %s," + " output: %s", command, result.fail_reason, logfile, + result.output) return False return True -def RunRenameInstance(instance, old_name, os_disk, swap_disk): +def RunRenameInstance(instance, old_name): """Run the OS rename script for an instance. 
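The new code in this area stops passing block device paths on the command line and instead hands the OS scripts an environment built by OSEnvironment(). A standalone sketch of that "run a script with a prepared environment, log its output" pattern (not part of the patch; the real code goes through utils.BuildShellCmd/RunCmd):

# Illustrative sketch, not part of the patch: run an OS script with extra
# environment variables, folding stdout and stderr into a log file.
# 'script_path' is assumed to be an absolute path.
import os
import subprocess

def run_os_script(script_path, env_overrides, logfile):
  env = os.environ.copy()
  env.update(env_overrides)    # e.g. INSTANCE_NAME, DISK_0_PATH, NIC_0_MAC
  log = open(logfile, "ab")
  try:
    ret = subprocess.call([script_path], cwd=os.path.dirname(script_path),
                          env=env, stdout=log, stderr=subprocess.STDOUT)
  finally:
    log.close()
  return ret == 0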
- Args: - instance: the instance object - old_name: the old name of the instance - os_disk: the instance-visible name of the os device - swap_disk: the instance-visible name of the swap device + @type instance: objects.Instance + @param instance: the instance being renamed + @type old_name: string + @param old_name: previous instance name """ inst_os = OSFromDisk(instance.os) script = inst_os.rename_script - - os_device = instance.FindDisk(os_disk) - if os_device is None: - logger.Error("Can't find this device-visible name '%s'" % os_disk) - return False - - swap_device = instance.FindDisk(swap_disk) - if swap_device is None: - logger.Error("Can't find this device-visible name '%s'" % swap_disk) - return False - - real_os_dev = _RecursiveFindBD(os_device) - if real_os_dev is None: - raise errors.BlockDeviceError("Block device '%s' is not set up" % - str(os_device)) - real_os_dev.Open() - - real_swap_dev = _RecursiveFindBD(swap_device) - if real_swap_dev is None: - raise errors.BlockDeviceError("Block device '%s' is not set up" % - str(swap_device)) - real_swap_dev.Open() + rename_env = OSEnvironment(instance) + rename_env['OLD_INSTANCE_NAME'] = old_name logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os, old_name, @@ -463,17 +567,14 @@ def RunRenameInstance(instance, old_name, os_disk, swap_disk): if not os.path.exists(constants.LOG_OS_DIR): os.mkdir(constants.LOG_OS_DIR, 0750) - command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s", - inst_os.path, script, old_name, instance.name, - real_os_dev.dev_path, real_swap_dev.dev_path, - logfile) + command = utils.BuildShellCmd("cd %s && %s &>%s", + inst_os.path, script, logfile) - result = utils.RunCmd(command) + result = utils.RunCmd(command, env=rename_env) if result.failed: - logger.Error("os create command '%s' returned error: %s" - " output: %s" % - (command, result.fail_reason, result.output)) + logging.error("os rename command '%s' returned error: %s output: %s", + command, result.fail_reason, result.output) return False return True @@ -502,8 +603,7 @@ def _GetVGInfo(vg_name): "--nosuffix", "--units=m", "--separator=:", vg_name]) if retval.failed: - errmsg = "volume group %s not present" % vg_name - logger.Error(errmsg) + logging.error("volume group %s not present", vg_name) return retdic valarr = retval.stdout.strip().rstrip(':').split(':') if len(valarr) == 3: @@ -514,10 +614,10 @@ def _GetVGInfo(vg_name): "pv_count": int(valarr[2]), } except ValueError, err: - logger.Error("Fail to parse vgs output: %s" % str(err)) + logging.exception("Failed to parse vgs output") else: - logger.Error("vgs output has the wrong number of fields (expected" - " three): %s" % str(valarr)) + logging.error("vgs output has the wrong number of fields (expected" + " three): %s", str(valarr)) return retdic @@ -542,22 +642,24 @@ def _GatherBlockDevs(instance): def StartInstance(instance, extra_args): """Start an instance. - Args: - instance - name of instance to start. 
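StartInstance(), whose docstring ends above, treats "already running" as success before talking to the hypervisor at all. A tiny standalone sketch of that guard (not part of the patch; 'backend' is a hypothetical object with ListInstances() and StartInstance() methods):

# Illustrative sketch, not part of the patch: start an instance only if it is
# not already in the running list, mirroring the guard in StartInstance().
def ensure_started(backend, instance_name, start_args):
  if instance_name in backend.ListInstances():
    return True                # already running: nothing to do
  try:
    backend.StartInstance(instance_name, start_args)
  except RuntimeError:         # stands in for errors.HypervisorError
    return False
  return True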
+ @type instance: instance object + @param instance: the instance object + @rtype: boolean + @return: whether the startup was successful or not """ - running_instances = GetInstanceList() + running_instances = GetInstanceList([instance.hypervisor]) if instance.name in running_instances: return True block_devices = _GatherBlockDevs(instance) - hyper = hypervisor.GetHypervisor() + hyper = hypervisor.GetHypervisor(instance.hypervisor) try: hyper.StartInstance(instance, block_devices, extra_args) except errors.HypervisorError, err: - logger.Error("Failed to start instance: %s" % err) + logging.exception("Failed to start instance") return False return True @@ -566,20 +668,23 @@ def StartInstance(instance, extra_args): def ShutdownInstance(instance): """Shut an instance down. - Args: - instance - name of instance to shutdown. + @type instance: instance object + @param instance: the instance object + @rtype: boolean + @return: whether the shutdown was successful or not """ - running_instances = GetInstanceList() + hv_name = instance.hypervisor + running_instances = GetInstanceList([hv_name]) if instance.name not in running_instances: return True - hyper = hypervisor.GetHypervisor() + hyper = hypervisor.GetHypervisor(hv_name) try: hyper.StopInstance(instance) except errors.HypervisorError, err: - logger.Error("Failed to stop instance: %s" % err) + logging.exception("Failed to stop instance") return False # test every 10secs for 2min @@ -587,22 +692,23 @@ def ShutdownInstance(instance): time.sleep(1) for dummy in range(11): - if instance.name not in GetInstanceList(): + if instance.name not in GetInstanceList([hv_name]): break time.sleep(10) else: # the shutdown did not succeed - logger.Error("shutdown of '%s' unsuccessful, using destroy" % instance) + logging.error("shutdown of '%s' unsuccessful, using destroy", instance) try: hyper.StopInstance(instance, force=True) except errors.HypervisorError, err: - logger.Error("Failed to stop instance: %s" % err) + logging.exception("Failed to stop instance") return False time.sleep(1) - if instance.name in GetInstanceList(): - logger.Error("could not shutdown instance '%s' even by destroy") + if instance.name in GetInstanceList([hv_name]): + logging.error("could not shutdown instance '%s' even by destroy", + instance.name) return False return True @@ -616,33 +722,59 @@ def RebootInstance(instance, reboot_type, extra_args): reboot_type - how to reboot [soft,hard,full] """ - running_instances = GetInstanceList() + running_instances = GetInstanceList([instance.hypervisor]) if instance.name not in running_instances: - logger.Error("Cannot reboot instance that is not running") + logging.error("Cannot reboot instance that is not running") return False - hyper = hypervisor.GetHypervisor() + hyper = hypervisor.GetHypervisor(instance.hypervisor) if reboot_type == constants.INSTANCE_REBOOT_SOFT: try: hyper.RebootInstance(instance) except errors.HypervisorError, err: - logger.Error("Failed to soft reboot instance: %s" % err) + logging.exception("Failed to soft reboot instance") return False elif reboot_type == constants.INSTANCE_REBOOT_HARD: try: ShutdownInstance(instance) StartInstance(instance, extra_args) except errors.HypervisorError, err: - logger.Error("Failed to hard reboot instance: %s" % err) + logging.exception("Failed to hard reboot instance") return False else: raise errors.ParameterError("reboot_type invalid") - return True +def MigrateInstance(instance, target, live): + """Migrates an instance to another node. 
+ + @type instance: C{objects.Instance} + @param instance: the instance definition + @type target: string + @param target: the target node name + @type live: boolean + @param live: whether the migration should be done live or not (the + interpretation of this parameter is left to the hypervisor) + @rtype: tuple + @return: a tuple of (success, msg) where: + - success is a boolean denoting the success/failure of the operation + - msg is a string with details in case of failure + + """ + hyper = hypervisor.GetHypervisor(instance.hypervisor) + + try: + hyper.MigrateInstance(instance.name, target, live) + except errors.HypervisorError, err: + msg = "Failed to migrate instance: %s" % str(err) + logging.error(msg) + return (False, msg) + return (True, "Migration successful") + + def CreateBlockDevice(disk, size, owner, on_primary, info): """Creates a block device for an instance. @@ -671,7 +803,7 @@ def CreateBlockDevice(disk, size, owner, on_primary, info): try: device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist) if device is not None: - logger.Info("removing existing device %s" % disk) + logging.info("removing existing device %s", disk) device.Remove() except errors.BlockDeviceError, err: pass @@ -684,7 +816,7 @@ def CreateBlockDevice(disk, size, owner, on_primary, info): if on_primary or disk.AssembleOnSecondary(): if not device.Assemble(): errorstring = "Can't assemble device after creation" - logger.Error(errorstring) + logging.error(errorstring) raise errors.BlockDeviceError("%s, very unusual event - check the node" " daemon logs" % errorstring) device.SetSyncSpeed(constants.SYNC_SPEED) @@ -711,7 +843,7 @@ def RemoveBlockDevice(disk): rdev = _RecursiveFindBD(disk, allow_partial=True) except errors.BlockDeviceError, err: # probably can't attach - logger.Info("Can't attach to device %s in remove" % disk) + logging.info("Can't attach to device %s in remove", disk) rdev = None if rdev is not None: r_path = rdev.dev_path @@ -757,7 +889,7 @@ def _RecursiveAssembleBD(disk, owner, as_primary): if children.count(None) >= mcn: raise cdev = None - logger.Debug("Error in child activation: %s" % str(err)) + logging.debug("Error in child activation: %s", str(err)) children.append(cdev) if as_primary or disk.AssembleOnSecondary(): @@ -821,12 +953,12 @@ def MirrorAddChildren(parent_cdev, new_cdevs): """ parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True) if parent_bdev is None: - logger.Error("Can't find parent device") + logging.error("Can't find parent device") return False new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs] if new_bdevs.count(None) > 0: - logger.Error("Can't find new device(s) to add: %s:%s" % - (new_bdevs, new_cdevs)) + logging.error("Can't find new device(s) to add: %s:%s", + new_bdevs, new_cdevs) return False parent_bdev.AddChildren(new_bdevs) return True @@ -838,7 +970,7 @@ def MirrorRemoveChildren(parent_cdev, new_cdevs): """ parent_bdev = _RecursiveFindBD(parent_cdev) if parent_bdev is None: - logger.Error("Can't find parent in remove children: %s" % parent_cdev) + logging.error("Can't find parent in remove children: %s", parent_cdev) return False devs = [] for disk in new_cdevs: @@ -846,8 +978,8 @@ def MirrorRemoveChildren(parent_cdev, new_cdevs): if rpath is None: bd = _RecursiveFindBD(disk) if bd is None: - logger.Error("Can't find dynamic device %s while removing children" % - disk) + logging.error("Can't find dynamic device %s while removing children", + disk) return False else: devs.append(bd.dev_path) @@ -927,19 +1059,20 @@ def 
UploadFile(file_name, data, mode, uid, gid, atime, mtime): """ if not os.path.isabs(file_name): - logger.Error("Filename passed to UploadFile is not absolute: '%s'" % - file_name) + logging.error("Filename passed to UploadFile is not absolute: '%s'", + file_name) return False allowed_files = [ constants.CLUSTER_CONF_FILE, constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE, + constants.VNC_PASSWORD_FILE, ] - allowed_files.extend(ssconf.SimpleStore().GetFileList()) + if file_name not in allowed_files: - logger.Error("Filename passed to UploadFile not in allowed" - " upload targets: '%s'" % file_name) + logging.error("Filename passed to UploadFile not in allowed" + " upload targets: '%s'", file_name) return False utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid, @@ -962,28 +1095,6 @@ def _ErrnoOrStr(err): return detail -def _OSSearch(name, search_path=None): - """Search for OSes with the given name in the search_path. - - Args: - name: The name of the OS to look for - search_path: List of dirs to search (defaults to constants.OS_SEARCH_PATH) - - Returns: - The base_dir the OS resides in - - """ - if search_path is None: - search_path = constants.OS_SEARCH_PATH - - for dir_name in search_path: - t_os_dir = os.path.sep.join([dir_name, name]) - if os.path.isdir(t_os_dir): - return dir_name - - return None - - def _OSOndiskVersion(name, os_dir): """Compute and return the API version of a given OS. @@ -1009,21 +1120,21 @@ def _OSOndiskVersion(name, os_dir): try: f = open(api_file) try: - api_version = f.read(256) + api_versions = f.readlines() finally: f.close() except EnvironmentError, err: raise errors.InvalidOS(name, os_dir, "error while reading the" " API version (%s)" % _ErrnoOrStr(err)) - api_version = api_version.strip() + api_versions = [version.strip() for version in api_versions] try: - api_version = int(api_version) + api_versions = [int(version) for version in api_versions] except (TypeError, ValueError), err: raise errors.InvalidOS(name, os_dir, "API version is not integer (%s)" % str(err)) - return api_version + return api_versions def DiagnoseOS(top_dirs=None): @@ -1045,8 +1156,7 @@ def DiagnoseOS(top_dirs=None): try: f_names = utils.ListVisibleFiles(dir_name) except EnvironmentError, err: - logger.Error("Can't list the OS directory %s: %s" % - (dir_name, str(err))) + logging.exception("Can't list the OS directory %s", dir_name) break for name in f_names: try: @@ -1066,28 +1176,28 @@ def OSFromDisk(name, base_dir=None): `errors.InvalidOS` exception, detailing why this is not a valid OS. - Args: - os_dir: Directory containing the OS scripts. Defaults to a search - in all the OS_SEARCH_PATH directories. + @type base_dir: string + @keyword base_dir: Base directory containing OS installations. + Defaults to a search in all the OS_SEARCH_PATH dirs. 
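The search described above (take a name, walk OS_SEARCH_PATH, return the first directory that contains it) is what the new code below delegates to a FindFile()-style helper. A standalone sketch of such a lookup, not part of the patch:

# Illustrative sketch, not part of the patch: return the first entry named
# 'name' under one of the search directories that satisfies 'test'.
import os

def find_in_path(name, search_path, test=os.path.isdir):
  for dir_name in search_path:
    candidate = os.path.join(dir_name, name)
    if test(candidate):
      return candidate
  return None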
""" if base_dir is None: - base_dir = _OSSearch(name) - - if base_dir is None: - raise errors.InvalidOS(name, None, "OS dir not found in search path") + os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir) + if os_dir is None: + raise errors.InvalidOS(name, None, "OS dir not found in search path") + else: + os_dir = os.path.sep.join([base_dir, name]) - os_dir = os.path.sep.join([base_dir, name]) - api_version = _OSOndiskVersion(name, os_dir) + api_versions = _OSOndiskVersion(name, os_dir) - if api_version != constants.OS_API_VERSION: + if constants.OS_API_VERSION not in api_versions: raise errors.InvalidOS(name, os_dir, "API version mismatch" " (found %s want %s)" - % (api_version, constants.OS_API_VERSION)) + % (api_versions, constants.OS_API_VERSION)) # OS Scripts dictionary, we will populate it with the actual script names - os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''} + os_scripts = dict.fromkeys(constants.OS_SCRIPTS) for script in os_scripts: os_scripts[script] = os.path.sep.join([os_dir, script]) @@ -1108,11 +1218,82 @@ def OSFromDisk(name, base_dir=None): return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS, - create_script=os_scripts['create'], - export_script=os_scripts['export'], - import_script=os_scripts['import'], - rename_script=os_scripts['rename'], - api_version=api_version) + create_script=os_scripts[constants.OS_SCRIPT_CREATE], + export_script=os_scripts[constants.OS_SCRIPT_EXPORT], + import_script=os_scripts[constants.OS_SCRIPT_IMPORT], + rename_script=os_scripts[constants.OS_SCRIPT_RENAME], + api_versions=api_versions) + +def OSEnvironment(instance, debug=0): + """Calculate the environment for an os script. + + @type instance: instance object + @param instance: target instance for the os script run + @type debug: integer + @param debug: debug level (0 or 1, for os api 10) + @rtype: dict + @return: dict of environment variables + + """ + result = {} + result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION + result['INSTANCE_NAME'] = instance.name + result['HYPERVISOR'] = instance.hypervisor + result['DISK_COUNT'] = '%d' % len(instance.disks) + result['NIC_COUNT'] = '%d' % len(instance.nics) + result['DEBUG_LEVEL'] = '%d' % debug + for idx, disk in enumerate(instance.disks): + real_disk = _RecursiveFindBD(disk) + if real_disk is None: + raise errors.BlockDeviceError("Block device '%s' is not set up" % + str(disk)) + real_disk.Open() + result['DISK_%d_PATH' % idx] = real_disk.dev_path + # FIXME: When disks will have read-only mode, populate this + result['DISK_%d_ACCESS' % idx] = 'W' + if constants.HV_DISK_TYPE in instance.hvparams: + result['DISK_%d_FRONTEND_TYPE' % idx] = \ + instance.hvparams[constants.HV_DISK_TYPE] + if disk.dev_type in constants.LDS_BLOCK: + result['DISK_%d_BACKEND_TYPE' % idx] = 'block' + elif disk.dev_type == constants.LD_FILE: + result['DISK_%d_BACKEND_TYPE' % idx] = \ + 'file:%s' % disk.physical_id[0] + for idx, nic in enumerate(instance.nics): + result['NIC_%d_MAC' % idx] = nic.mac + if nic.ip: + result['NIC_%d_IP' % idx] = nic.ip + result['NIC_%d_BRIDGE' % idx] = nic.bridge + if constants.HV_NIC_TYPE in instance.hvparams: + result['NIC_%d_FRONTEND_TYPE' % idx] = \ + instance.hvparams[constants.HV_NIC_TYPE] + + return result + +def GrowBlockDevice(disk, amount): + """Grow a stack of block devices. + + This function is called recursively, with the childrens being the + first one resize. 
+ + Args: + disk: the disk to be grown + + Returns: a tuple of (status, result), with: + status: the result (true/false) of the operation + result: the error message if the operation failed, otherwise not used + + """ + r_dev = _RecursiveFindBD(disk) + if r_dev is None: + return False, "Cannot find block device %s" % (disk,) + + try: + r_dev.Grow(amount) + except errors.BlockDeviceError, err: + return False, str(err) + + return True, None def SnapshotBlockDevice(disk): @@ -1151,7 +1332,7 @@ def SnapshotBlockDevice(disk): (disk.unique_id, disk.dev_type)) -def ExportSnapshot(disk, dest_node, instance): +def ExportSnapshot(disk, dest_node, instance, cluster_name): """Export a block device snapshot to a remote node. Args: @@ -1163,6 +1344,10 @@ def ExportSnapshot(disk, dest_node, instance): True if successful, False otherwise. """ + # TODO(ultrotter): Import/Export still to be converted to OS API 10 + logging.error("Import/Export still to be converted to OS API 10") + return False + inst_os = OSFromDisk(instance.os) export_script = inst_os.export_script @@ -1192,8 +1377,9 @@ def ExportSnapshot(disk, dest_node, instance): destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s", destdir, destdir, destfile) - remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS, - destcmd) + remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node, + constants.GANETI_RUNAS, + destcmd) # all commands have been checked, so we're safe to combine them command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)]) @@ -1201,9 +1387,8 @@ def ExportSnapshot(disk, dest_node, instance): result = utils.RunCmd(command) if result.failed: - logger.Error("os snapshot export command '%s' returned error: %s" - " output: %s" % - (command, result.fail_reason, result.output)) + logging.error("os snapshot export command '%s' returned error: %s" + " output: %s", command, result.fail_reason, result.output) return False return True @@ -1234,17 +1419,23 @@ def FinalizeExport(instance, snap_disks): config.add_section(constants.INISECT_INS) config.set(constants.INISECT_INS, 'name', instance.name) - config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory) - config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus) + config.set(constants.INISECT_INS, 'memory', '%d' % + instance.beparams[constants.BE_MEMORY]) + config.set(constants.INISECT_INS, 'vcpus', '%d' % + instance.beparams[constants.BE_VCPUS]) config.set(constants.INISECT_INS, 'disk_template', instance.disk_template) + + nic_count = 0 for nic_count, nic in enumerate(instance.nics): config.set(constants.INISECT_INS, 'nic%d_mac' % nic_count, '%s' % nic.mac) config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip) - config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, '%s' % nic.bridge) + config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, + '%s' % nic.bridge) # TODO: redundant: on load can read nics until it doesn't exist config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count) + disk_count = 0 for disk_count, disk in enumerate(snap_disks): config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count, ('%s' % disk.iv_name)) @@ -1289,7 +1480,8 @@ def ExportInfo(dest): return config -def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image): +def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image, + cluster_name): """Import an os image into an instance. 
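ImportOSIntoInstance() below, like ExportSnapshot() above, builds a shell pipeline of the form "remote cat | gunzip | import script". A standalone sketch of composing such a pipeline string (not part of the patch; pipes.quote is the Python 2 quoting helper, and the '&>' redirection assumes a bash-compatible shell, as the original commands do):

# Illustrative sketch, not part of the patch: build a "remote cat | gunzip |
# import" pipeline string with each piece quoted for the shell.
import pipes

def build_import_pipeline(ssh_argv, import_dir, import_script, logfile):
  # ssh_argv: the full argv of the remote 'cat <image>' command, e.g. as
  # returned by a BuildCmd()-style helper.
  remote = " ".join([pipes.quote(arg) for arg in ssh_argv])
  impcmd = "(cd %s; %s &>%s)" % (pipes.quote(import_dir),
                                 pipes.quote(import_script),
                                 pipes.quote(logfile))
  return " | ".join([remote, "gunzip", impcmd])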
Args: @@ -1303,17 +1495,21 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image): False in case of error, True otherwise. """ + # TODO(ultrotter): Import/Export still to be converted to OS API 10 + logging.error("Import/Export still to be converted to OS API 10") + return False + inst_os = OSFromDisk(instance.os) import_script = inst_os.import_script os_device = instance.FindDisk(os_disk) if os_device is None: - logger.Error("Can't find this device-visible name '%s'" % os_disk) + logging.error("Can't find this device-visible name '%s'", os_disk) return False swap_device = instance.FindDisk(swap_disk) if swap_device is None: - logger.Error("Can't find this device-visible name '%s'" % swap_disk) + logging.error("Can't find this device-visible name '%s'", swap_disk) return False real_os_dev = _RecursiveFindBD(os_device) @@ -1334,8 +1530,9 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image): os.mkdir(constants.LOG_OS_DIR, 0750) destcmd = utils.BuildShellCmd('cat %s', src_image) - remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS, - destcmd) + remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node, + constants.GANETI_RUNAS, + destcmd) comprcmd = "gunzip" impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)", @@ -1344,13 +1541,13 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image): logfile) command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd]) + env = {'HYPERVISOR': instance.hypervisor} - result = utils.RunCmd(command) + result = utils.RunCmd(command, env=env) if result.failed: - logger.Error("os import command '%s' returned error: %s" - " output: %s" % - (command, result.fail_reason, result.output)) + logging.error("os import command '%s' returned error: %s" + " output: %s", command, result.fail_reason, result.output) return False return True @@ -1411,8 +1608,7 @@ def RenameBlockDevices(devlist): # cache? 
for now, we only lose lvm data when we rename, which # is less critical than DRBD or MD except errors.BlockDeviceError, err: - logger.Error("Can't rename device '%s' to '%s': %s" % - (dev, unique_id, err)) + logging.exception("Can't rename device '%s' to '%s'", dev, unique_id) result = False return result @@ -1426,18 +1622,19 @@ def _TransformFileStorageDir(file_storage_dir): Args: file_storage_dir: string with path - + Returns: normalized file_storage_dir (string) if valid, None otherwise """ + cfg = _GetConfig() file_storage_dir = os.path.normpath(file_storage_dir) - base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir() + base_file_storage_dir = cfg.GetFileStorageDir() if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) == base_file_storage_dir): - logger.Error("file storage directory '%s' is not under base file" - " storage directory '%s'" % - (file_storage_dir, base_file_storage_dir)) + logging.error("file storage directory '%s' is not under base file" + " storage directory '%s'", + file_storage_dir, base_file_storage_dir) return None return file_storage_dir @@ -1460,14 +1657,14 @@ def CreateFileStorageDir(file_storage_dir): else: if os.path.exists(file_storage_dir): if not os.path.isdir(file_storage_dir): - logger.Error("'%s' is not a directory" % file_storage_dir) + logging.error("'%s' is not a directory", file_storage_dir) result = False, else: try: os.makedirs(file_storage_dir, 0750) except OSError, err: - logger.Error("Cannot create file storage directory '%s': %s" % - (file_storage_dir, err)) + logging.error("Cannot create file storage directory '%s': %s", + file_storage_dir, err) result = False, return result @@ -1492,14 +1689,14 @@ def RemoveFileStorageDir(file_storage_dir): else: if os.path.exists(file_storage_dir): if not os.path.isdir(file_storage_dir): - logger.Error("'%s' is not a directory" % file_storage_dir) + logging.error("'%s' is not a directory", file_storage_dir) result = False, # deletes dir only if empty, otherwise we want to return False try: os.rmdir(file_storage_dir) except OSError, err: - logger.Error("Cannot remove file storage directory '%s': %s" % - (file_storage_dir, err)) + logging.exception("Cannot remove file storage directory '%s'", + file_storage_dir) result = False, return result @@ -1527,20 +1724,120 @@ def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir): try: os.rename(old_file_storage_dir, new_file_storage_dir) except OSError, err: - logger.Error("Cannot rename '%s' to '%s': %s" - % (old_file_storage_dir, new_file_storage_dir, err)) + logging.exception("Cannot rename '%s' to '%s'", + old_file_storage_dir, new_file_storage_dir) result = False, else: - logger.Error("'%s' is not a directory" % old_file_storage_dir) + logging.error("'%s' is not a directory", old_file_storage_dir) result = False, else: if os.path.exists(old_file_storage_dir): - logger.Error("Cannot rename '%s' to '%s'. Both locations exist." % - old_file_storage_dir, new_file_storage_dir) + logging.error("Cannot rename '%s' to '%s'. Both locations exist.", + old_file_storage_dir, new_file_storage_dir) result = False, return result +def _IsJobQueueFile(file_name): + """Checks whether the given filename is in the queue directory. 
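The containment test described above uses os.path.commonprefix(), which is fine for the fixed QUEUE_DIR constant but compares characters rather than path components (so '/a/queue-old/x' shares the prefix '/a/queue'). A slightly stricter standalone variant, for illustration only and not part of the patch:

# Illustrative sketch, not part of the patch: component-wise check that
# file_name lives under directory (both assumed to be absolute paths).
import os

def is_under_directory(directory, file_name):
  directory = os.path.normpath(directory)
  file_name = os.path.normpath(file_name)
  return (file_name == directory or
          file_name.startswith(directory + os.sep))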
+ + """ + queue_dir = os.path.normpath(constants.QUEUE_DIR) + result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir) + + if not result: + logging.error("'%s' is not a file in the queue directory", + file_name) + + return result + + +def JobQueueUpdate(file_name, content): + """Updates a file in the queue directory. + + """ + if not _IsJobQueueFile(file_name): + return False + + # Write and replace the file atomically + utils.WriteFile(file_name, data=content) + + return True + + +def JobQueueRename(old, new): + """Renames a job queue file. + + """ + if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)): + return False + + os.rename(old, new) + + return True + + +def JobQueueSetDrainFlag(drain_flag): + """Set the drain flag for the queue. + + This will set or unset the queue drain flag. + + @type drain_flag: bool + @param drain_flag: if True, will set the drain flag, otherwise reset it. + + """ + if drain_flag: + utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True) + else: + utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE) + + return True + + +def CloseBlockDevices(disks): + """Closes the given block devices. + + This means they will be switched to secondary mode (in case of DRBD). + + """ + bdevs = [] + for cf in disks: + rd = _RecursiveFindBD(cf) + if rd is None: + return (False, "Can't find device %s" % cf) + bdevs.append(rd) + + msg = [] + for rd in bdevs: + try: + rd.Close() + except errors.BlockDeviceError, err: + msg.append(str(err)) + if msg: + return (False, "Can't make devices secondary: %s" % ",".join(msg)) + else: + return (True, "All devices secondary") + + +def ValidateHVParams(hvname, hvparams): + """Validates the given hypervisor parameters. + + @type hvname: string + @param hvname: the hypervisor name + @type hvparams: dict + @param hvparams: the hypervisor parameters to be validated + @rtype: tuple (bool, str) + @return: tuple of (success, message) + + """ + try: + hv_type = hypervisor.GetHypervisor(hvname) + hv_type.ValidateParameters(hvparams) + return (True, "Validation passed") + except errors.HypervisorError, err: + return (False, str(err)) + + class HooksRunner(object): """Hook runner. @@ -1556,9 +1853,6 @@ class HooksRunner(object): Args: - hooks_base_dir: if not None, this overrides the constants.HOOKS_BASE_DIR (useful for unittests) - - logs_base_dir: if not None, this overrides the - constants.LOG_HOOKS_DIR (useful for unittests) - - logging: enable or disable logging of script output """ if hooks_base_dir is None: @@ -1570,7 +1864,6 @@ class HooksRunner(object): """Exec one hook script. Args: - - phase: the phase - script: the full path to the script - env: the environment with which to exec the script @@ -1605,7 +1898,7 @@ class HooksRunner(object): fd.close() except EnvironmentError, err: # just log the error - #logger.Error("While closing fd %s: %s" % (fd, err)) + #logging.exception("Error while closing fd %s", fd) pass return result == 0, output @@ -1652,6 +1945,42 @@ class HooksRunner(object): return rr +class IAllocatorRunner(object): + """IAllocator runner. + + This class is instantiated on the node side (ganeti-noded) and not on + the master side. + + """ + def Run(self, name, idata): + """Run an iallocator script. 
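IAllocatorRunner.Run() below follows a "write the request to a temporary file, run the external script on it, always clean up" pattern. A standalone sketch of the same pattern (not part of the patch; subprocess replaces the utils.RunCmd wrapper):

# Illustrative sketch, not part of the patch: feed request data to an external
# allocator script via a temporary file and collect its output.
import os
import subprocess
import tempfile

def run_allocator(alloc_script, request_data):
  fd, req_name = tempfile.mkstemp(prefix="iallocator-sketch.")
  try:
    os.write(fd, request_data)
    os.close(fd)
    proc = subprocess.Popen([alloc_script, req_name],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = proc.communicate()
    return (proc.returncode == 0, stdout, stderr)
  finally:
    os.unlink(req_name)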
+ + Return value: tuple of: + - run status (one of the IARUN_ constants) + - stdout + - stderr + - fail reason (as from utils.RunResult) + + """ + alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH, + os.path.isfile) + if alloc_script is None: + return (constants.IARUN_NOTFOUND, None, None, None) + + fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.") + try: + os.write(fd, idata) + os.close(fd) + result = utils.RunCmd([alloc_script, fin_name]) + if result.failed: + return (constants.IARUN_FAILURE, result.stdout, result.stderr, + result.fail_reason) + finally: + os.unlink(fin_name) + + return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None) + + class DevCacheManager(object): """Simple class for managing a cache of block device information. @@ -1679,7 +2008,7 @@ class DevCacheManager(object): """ if dev_path is None: - logger.Error("DevCacheManager.UpdateCache got a None dev_path") + logging.error("DevCacheManager.UpdateCache got a None dev_path") return fpath = cls._ConvertPath(dev_path) if on_primary: @@ -1692,8 +2021,7 @@ class DevCacheManager(object): try: utils.WriteFile(fpath, data=fdata) except EnvironmentError, err: - logger.Error("Can't update bdev cache for %s, error %s" % - (dev_path, str(err))) + logging.exception("Can't update bdev cache for %s", dev_path) @classmethod def RemoveCache(cls, dev_path): @@ -1701,11 +2029,10 @@ class DevCacheManager(object): """ if dev_path is None: - logger.Error("DevCacheManager.RemoveCache got a None dev_path") + logging.error("DevCacheManager.RemoveCache got a None dev_path") return fpath = cls._ConvertPath(dev_path) try: utils.RemoveFile(fpath) except EnvironmentError, err: - logger.Error("Can't update bdev cache for %s, error %s" % - (dev_path, str(err))) + logging.exception("Can't update bdev cache for %s", dev_path)
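As a closing illustration of the kind of tiny per-device state files DevCacheManager maintains, here is a standalone sketch, not part of the patch, that writes such a file through a temporary name so readers never see a partial file; the patched class itself delegates this to utils.WriteFile and utils.RemoveFile.

# Illustrative sketch, not part of the patch: write and remove a small state
# file; 'path' is assumed to be an absolute filename in an existing directory.
import os
import tempfile

def write_state_file(path, data):
  fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(path))
  try:
    os.write(fd, data)
  finally:
    os.close(fd)
  os.rename(tmp_name, path)    # atomic on POSIX within one filesystem

def remove_state_file(path):
  try:
    os.unlink(path)
  except OSError:
    pass                       # already gone: nothing to do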