X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/8fa42c7ca2a8786524d34c2cc52f49fa2cdfec7f..bd1e4562b3951edb0815823e2505121b8bcaae2b:/lib/backend.py diff --git a/lib/backend.py b/lib/backend.py index bb5bfe0..ca86245 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -26,13 +26,14 @@ import os import os.path import shutil import time -import tempfile import stat import errno import re import subprocess +import random +import logging +import tempfile -from ganeti import logger from ganeti import errors from ganeti import utils from ganeti import ssh @@ -43,38 +44,131 @@ from ganeti import objects from ganeti import ssconf -def StartMaster(): - """Activate local node as master node. +def _GetSshRunner(): + return ssh.SshRunner() + - There are two needed steps for this: - - run the master script - - register the cron script +def _CleanDirectory(path, exclude=[]): + """Removes all regular files in a directory. + + @param exclude: List of files to be excluded. + @type exclude: list """ - result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "start"]) + if not os.path.isdir(path): + return - if result.failed: - logger.Error("could not activate cluster interface with command %s," - " error: '%s'" % (result.cmd, result.output)) - return False + # Normalize excluded paths + exclude = [os.path.normpath(i) for i in exclude] - return True + for rel_name in utils.ListVisibleFiles(path): + full_name = os.path.normpath(os.path.join(path, rel_name)) + if full_name in exclude: + continue + if os.path.isfile(full_name) and not os.path.islink(full_name): + utils.RemoveFile(full_name) + + +def _JobQueuePurge(keep_lock): + """Removes job queue files and archived jobs + + """ + if keep_lock: + exclude = [constants.JOB_QUEUE_LOCK_FILE] + else: + exclude = [] + + _CleanDirectory(constants.QUEUE_DIR, exclude=exclude) + _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR) + + +def GetMasterInfo(): + """Returns master information. 
+ + This is an utility function to compute master information, either + for consumption here or from the node daemon. + + @rtype: tuple + @return: (master_netdev, master_ip, master_name) + + """ + try: + ss = ssconf.SimpleStore() + master_netdev = ss.GetMasterNetdev() + master_ip = ss.GetMasterIP() + master_node = ss.GetMasterNode() + except errors.ConfigurationError, err: + logging.exception("Cluster configuration incomplete") + return (None, None) + return (master_netdev, master_ip, master_node) + + +def StartMaster(start_daemons): + """Activate local node as master node. + + The function will always try activate the IP address of the master + (if someone else has it, then it won't). Then, if the start_daemons + parameter is True, it will also start the master daemons + (ganet-masterd and ganeti-rapi). + """ + ok = True + master_netdev, master_ip, _ = GetMasterInfo() + if not master_netdev: + return False -def StopMaster(): + if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT): + if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT, + source=constants.LOCALHOST_IP_ADDRESS): + # we already have the ip: + logging.debug("Already started") + else: + logging.error("Someone else has the master ip, not activating") + ok = False + else: + result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip, + "dev", master_netdev, "label", + "%s:0" % master_netdev]) + if result.failed: + logging.error("Can't activate master IP: %s", result.output) + ok = False + + result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, + "-s", master_ip, master_ip]) + # we'll ignore the exit code of arping + + # and now start the master and rapi daemons + if start_daemons: + for daemon in 'ganeti-masterd', 'ganeti-rapi': + result = utils.RunCmd([daemon]) + if result.failed: + logging.error("Can't start daemon %s: %s", daemon, result.output) + ok = False + return ok + + +def StopMaster(stop_daemons): """Deactivate this node as master. 
- This does two things: - - run the master stop script - - remove link to master cron script. + The function will always try to deactivate the IP address of the + master. Then, if the stop_daemons parameter is True, it will also + stop the master daemons (ganet-masterd and ganeti-rapi). """ - result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "stop"]) + master_netdev, master_ip, _ = GetMasterInfo() + if not master_netdev: + return False + result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip, + "dev", master_netdev]) if result.failed: - logger.Error("could not deactivate cluster interface with command %s," - " error: '%s'" % (result.cmd, result.output)) - return False + logging.error("Can't remove the master IP, error: %s", result.output) + # but otherwise ignore the failure + + if stop_daemons: + # stop/kill the rapi and the master daemon + for daemon in constants.RAPI_PID, constants.MASTERD_PID: + utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon))) return True @@ -99,7 +193,7 @@ def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub): priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS, mkdir=True) except errors.OpExecError, err: - logger.Error("Error while processing user ssh files: %s" % err) + logging.exception("Error while processing user ssh files") return False for name, content in [(priv_key, sshkey), (pub_key, sshpub)]: @@ -116,17 +210,15 @@ def LeaveCluster(): """Cleans up the current node and prepares it to be removed from the cluster. """ - if os.path.isdir(constants.DATA_DIR): - for rel_name in utils.ListVisibleFiles(constants.DATA_DIR): - full_name = os.path.join(constants.DATA_DIR, rel_name) - if os.path.isfile(full_name) and not os.path.islink(full_name): - utils.RemoveFile(full_name) + _CleanDirectory(constants.DATA_DIR) + # The lock can be removed because we're going to quit anyway. 
+ _JobQueuePurge(keep_lock=False) try: priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS) - except errors.OpExecError, err: - logger.Error("Error while processing ssh files: %s" % err) + except errors.OpExecError: + logging.exception("Error while processing ssh files") return f = open(pub_key, 'r') @@ -138,6 +230,9 @@ def LeaveCluster(): utils.RemoveFile(priv_key) utils.RemoveFile(pub_key) + # Return a reassuring string to the caller, and quit + raise errors.QuitGanetiException(False, 'Shutdown scheduled') + def GetNodeInfo(vgname): """Gives back a hash with different informations about the node. @@ -200,10 +295,38 @@ def VerifyNode(what): if 'nodelist' in what: result['nodelist'] = {} + random.shuffle(what['nodelist']) for node in what['nodelist']: - success, message = ssh.VerifyNodeHostname(node) + success, message = _GetSshRunner().VerifyNodeHostname(node) if not success: result['nodelist'][node] = message + if 'node-net-test' in what: + result['node-net-test'] = {} + my_name = utils.HostInfo().name + my_pip = my_sip = None + for name, pip, sip in what['node-net-test']: + if name == my_name: + my_pip = pip + my_sip = sip + break + if not my_pip: + result['node-net-test'][my_name] = ("Can't find my own" + " primary/secondary IP" + " in the node list") + else: + port = ssconf.SimpleStore().GetNodeDaemonPort() + for name, pip, sip in what['node-net-test']: + fail = [] + if not utils.TcpPing(pip, port, source=my_pip): + fail.append("primary") + if sip != pip: + if not utils.TcpPing(sip, port, source=my_sip): + fail.append("secondary") + if fail: + result['node-net-test'][name] = ("failure using the %s" + " interface(s)" % + " and ".join(fail)) + return result @@ -211,19 +334,34 @@ def GetVolumeList(vg_name): """Compute list of logical volumes and their size. 
Returns: - dictionary of all partions (key) with their size: - test1: 20.06MiB + dictionary of all partions (key) with their size (in MiB), inactive + and online status: + {'test1': ('20.06', True, True)} """ - result = utils.RunCmd(["lvs", "--noheadings", "--units=m", - "-oname,size", vg_name]) + lvs = {} + sep = '|' + result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix", + "--separator=%s" % sep, + "-olv_name,lv_size,lv_attr", vg_name]) if result.failed: - logger.Error("Failed to list logical volumes, lvs output: %s" % - result.output) - return {} + logging.error("Failed to list logical volumes, lvs output: %s", + result.output) + return result.output + + valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$") + for line in result.stdout.splitlines(): + line = line.strip() + match = valid_line_re.match(line) + if not match: + logging.error("Invalid line returned from lvs output: '%s'", line) + continue + name, size, attr = match.groups() + inactive = attr[4] == '-' + online = attr[5] == 'o' + lvs[name] = (size, inactive, online) - lvlist = [line.split() for line in result.output.splitlines()] - return dict(lvlist) + return lvs def ListVolumeGroups(): @@ -244,8 +382,8 @@ def NodeVolumes(): "--separator=|", "--options=lv_name,lv_size,devices,vg_name"]) if result.failed: - logger.Error("Failed to list logical volumes, lvs output: %s" % - result.output) + logging.error("Failed to list logical volumes, lvs output: %s", + result.output) return {} def parse_dev(dev): @@ -262,7 +400,8 @@ def NodeVolumes(): 'vg': line[3].strip(), } - return [map_line(line.split('|')) for line in result.output.splitlines()] + return [map_line(line.split('|')) for line in result.stdout.splitlines() + if line.count('|') >= 3] def BridgesExist(bridges_list): @@ -291,7 +430,7 @@ def GetInstanceList(): try: names = hypervisor.GetHypervisor().ListInstances() except errors.HypervisorError, err: - logger.Error("error enumerating instances: %s" % str(err)) + 
logging.exception("Error enumerating instances") raise return names @@ -369,12 +508,12 @@ def AddOSToInstance(instance, os_disk, swap_disk): os_device = instance.FindDisk(os_disk) if os_device is None: - logger.Error("Can't find this device-visible name '%s'" % os_disk) + logging.error("Can't find this device-visible name '%s'", os_disk) return False swap_device = instance.FindDisk(swap_disk) if swap_device is None: - logger.Error("Can't find this device-visible name '%s'" % swap_disk) + logging.error("Can't find this device-visible name '%s'", swap_disk) return False real_os_dev = _RecursiveFindBD(os_device) @@ -398,13 +537,13 @@ def AddOSToInstance(instance, os_disk, swap_disk): inst_os.path, create_script, instance.name, real_os_dev.dev_path, real_swap_dev.dev_path, logfile) + env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()} - result = utils.RunCmd(command) - + result = utils.RunCmd(command, env=env) if result.failed: - logger.Error("os create command '%s' returned error: %s" - " output: %s" % - (command, result.fail_reason, result.output)) + logging.error("os create command '%s' returned error: %s, logfile: %s," + " output: %s", command, result.fail_reason, logfile, + result.output) return False return True @@ -426,12 +565,12 @@ def RunRenameInstance(instance, old_name, os_disk, swap_disk): os_device = instance.FindDisk(os_disk) if os_device is None: - logger.Error("Can't find this device-visible name '%s'" % os_disk) + logging.error("Can't find this device-visible name '%s'", os_disk) return False swap_device = instance.FindDisk(swap_disk) if swap_device is None: - logger.Error("Can't find this device-visible name '%s'" % swap_disk) + logging.error("Can't find this device-visible name '%s'", swap_disk) return False real_os_dev = _RecursiveFindBD(os_device) @@ -460,9 +599,8 @@ def RunRenameInstance(instance, old_name, os_disk, swap_disk): result = utils.RunCmd(command) if result.failed: - logger.Error("os create command '%s' returned error: %s" - 
" output: %s" % - (command, result.fail_reason, result.output)) + logging.error("os create command '%s' returned error: %s output: %s", + command, result.fail_reason, result.output) return False return True @@ -481,20 +619,31 @@ def _GetVGInfo(vg_name): vg_free is the free size of the volume group in MiB pv_count are the number of physical disks in that vg + If an error occurs during gathering of data, we return the same dict + with keys all set to None. + """ + retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"]) + retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings", "--nosuffix", "--units=m", "--separator=:", vg_name]) if retval.failed: - errmsg = "volume group %s not present" % vg_name - logger.Error(errmsg) - raise errors.LVMError(errmsg) - valarr = retval.stdout.strip().split(':') - retdic = { - "vg_size": int(round(float(valarr[0]), 0)), - "vg_free": int(round(float(valarr[1]), 0)), - "pv_count": int(valarr[2]), - } + logging.error("volume group %s not present", vg_name) + return retdic + valarr = retval.stdout.strip().rstrip(':').split(':') + if len(valarr) == 3: + try: + retdic = { + "vg_size": int(round(float(valarr[0]), 0)), + "vg_free": int(round(float(valarr[1]), 0)), + "pv_count": int(valarr[2]), + } + except ValueError, err: + logging.exception("Fail to parse vgs output") + else: + logging.error("vgs output has the wrong number of fields (expected" + " three): %s", str(valarr)) return retdic @@ -534,7 +683,7 @@ def StartInstance(instance, extra_args): try: hyper.StartInstance(instance, block_devices, extra_args) except errors.HypervisorError, err: - logger.Error("Failed to start instance: %s" % err) + logging.exception("Failed to start instance") return False return True @@ -556,7 +705,7 @@ def ShutdownInstance(instance): try: hyper.StopInstance(instance) except errors.HypervisorError, err: - logger.Error("Failed to stop instance: %s" % err) + logging.error("Failed to stop instance") return False # test every 10secs for 
2min @@ -569,17 +718,18 @@ def ShutdownInstance(instance): time.sleep(10) else: # the shutdown did not succeed - logger.Error("shutdown of '%s' unsuccessful, using destroy" % instance) + logging.error("shutdown of '%s' unsuccessful, using destroy", instance) try: hyper.StopInstance(instance, force=True) except errors.HypervisorError, err: - logger.Error("Failed to stop instance: %s" % err) + logging.exception("Failed to stop instance") return False time.sleep(1) if instance.name in GetInstanceList(): - logger.Error("could not shutdown instance '%s' even by destroy") + logging.error("could not shutdown instance '%s' even by destroy", + instance.name) return False return True @@ -596,7 +746,7 @@ def RebootInstance(instance, reboot_type, extra_args): running_instances = GetInstanceList() if instance.name not in running_instances: - logger.Error("Cannot reboot instance that is not running") + logging.error("Cannot reboot instance that is not running") return False hyper = hypervisor.GetHypervisor() @@ -604,14 +754,14 @@ def RebootInstance(instance, reboot_type, extra_args): try: hyper.RebootInstance(instance) except errors.HypervisorError, err: - logger.Error("Failed to soft reboot instance: %s" % err) + logging.exception("Failed to soft reboot instance") return False elif reboot_type == constants.INSTANCE_REBOOT_HARD: try: ShutdownInstance(instance) StartInstance(instance, extra_args) except errors.HypervisorError, err: - logger.Error("Failed to hard reboot instance: %s" % err) + logging.exception("Failed to hard reboot instance") return False else: raise errors.ParameterError("reboot_type invalid") @@ -620,14 +770,30 @@ def RebootInstance(instance, reboot_type, extra_args): return True +def MigrateInstance(instance, target, live): + """Migrates an instance to another node. 
+ + """ + hyper = hypervisor.GetHypervisor() + + try: + hyper.MigrateInstance(instance, target, live) + except errors.HypervisorError, err: + msg = "Failed to migrate instance: %s" % str(err) + logging.error(msg) + return (False, msg) + return (True, "Migration successfull") + + def CreateBlockDevice(disk, size, owner, on_primary, info): """Creates a block device for an instance. Args: - bdev: a ganeti.objects.Disk object - size: the size of the physical underlying devices - do_open: if the device should be `Assemble()`-d and - `Open()`-ed after creation + disk: a ganeti.objects.Disk object + size: the size of the physical underlying device + owner: a string with the name of the instance + on_primary: a boolean indicating if it is the primary node or not + info: string that will be sent to the physical device creation Returns: the new unique_id of the device (this can sometime be @@ -643,13 +809,11 @@ def CreateBlockDevice(disk, size, owner, on_primary, info): # we need the children open in case the device itself has to # be assembled crdev.Open() - else: - crdev.Close() clist.append(crdev) try: device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist) if device is not None: - logger.Info("removing existing device %s" % disk) + logging.info("removing existing device %s", disk) device.Remove() except errors.BlockDeviceError, err: pass @@ -660,7 +824,11 @@ def CreateBlockDevice(disk, size, owner, on_primary, info): raise ValueError("Can't create child device for %s, %s" % (disk, size)) if on_primary or disk.AssembleOnSecondary(): - device.Assemble() + if not device.Assemble(): + errorstring = "Can't assemble device after creation" + logging.error(errorstring) + raise errors.BlockDeviceError("%s, very unusual event - check the node" + " daemon logs" % errorstring) device.SetSyncSpeed(constants.SYNC_SPEED) if on_primary or disk.OpenOnSecondary(): device.Open(force=True) @@ -685,7 +853,7 @@ def RemoveBlockDevice(disk): rdev = _RecursiveFindBD(disk, 
allow_partial=True) except errors.BlockDeviceError, err: # probably can't attach - logger.Info("Can't attach to device %s in remove" % disk) + logging.info("Can't attach to device %s in remove", disk) rdev = None if rdev is not None: r_path = rdev.dev_path @@ -719,8 +887,20 @@ def _RecursiveAssembleBD(disk, owner, as_primary): """ children = [] if disk.children: + mcn = disk.ChildrenNeeded() + if mcn == -1: + mcn = 0 # max number of Nones allowed + else: + mcn = len(disk.children) - mcn # max number of Nones for chld_disk in disk.children: - children.append(_RecursiveAssembleBD(chld_disk, owner, as_primary)) + try: + cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary) + except errors.BlockDeviceError, err: + if children.count(None) >= mcn: + raise + cdev = None + logging.debug("Error in child activation: %s", str(err)) + children.append(cdev) if as_primary or disk.AssembleOnSecondary(): r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children) @@ -728,8 +908,6 @@ def _RecursiveAssembleBD(disk, owner, as_primary): result = r_dev if as_primary or disk.OpenOnSecondary(): r_dev.Open() - else: - r_dev.Close() DevCacheManager.UpdateCache(r_dev.dev_path, owner, as_primary, disk.iv_name) @@ -785,12 +963,12 @@ def MirrorAddChildren(parent_cdev, new_cdevs): """ parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True) if parent_bdev is None: - logger.Error("Can't find parent device") + logging.error("Can't find parent device") return False new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs] if new_bdevs.count(None) > 0: - logger.Error("Can't find new device(s) to add: %s:%s" % - (new_bdevs, new_cdevs)) + logging.error("Can't find new device(s) to add: %s:%s", + new_bdevs, new_cdevs) return False parent_bdev.AddChildren(new_bdevs) return True @@ -802,14 +980,22 @@ def MirrorRemoveChildren(parent_cdev, new_cdevs): """ parent_bdev = _RecursiveFindBD(parent_cdev) if parent_bdev is None: - logger.Error("Can't find parent in remove children: %s" % 
parent_cdev) + logging.error("Can't find parent in remove children: %s", parent_cdev) return False - new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs] - if new_bdevs.count(None) > 0: - logger.Error("Can't find some devices while removing children: %s %s" % - (new_cdevs, new_bdevs)) - return False - parent_bdev.RemoveChildren(new_bdevs) + devs = [] + for disk in new_cdevs: + rpath = disk.StaticDevPath() + if rpath is None: + bd = _RecursiveFindBD(disk) + if bd is None: + logging.error("Can't find dynamic device %s while removing children", + disk) + return False + else: + devs.append(bd.dev_path) + else: + devs.append(rpath) + parent_bdev.RemoveChildren(devs) return True @@ -872,8 +1058,7 @@ def FindBlockDevice(disk): rbd = _RecursiveFindBD(disk) if rbd is None: return rbd - sync_p, est_t, is_degr = rbd.GetSyncStatus() - return rbd.dev_path, rbd.major, rbd.minor, sync_p, est_t, is_degr + return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus() def UploadFile(file_name, data, mode, uid, gid, atime, mtime): @@ -884,32 +1069,25 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime): """ if not os.path.isabs(file_name): - logger.Error("Filename passed to UploadFile is not absolute: '%s'" % - file_name) + logging.error("Filename passed to UploadFile is not absolute: '%s'", + file_name) return False - allowed_files = [constants.CLUSTER_CONF_FILE, "/etc/hosts", - constants.SSH_KNOWN_HOSTS_FILE] + allowed_files = [ + constants.CLUSTER_CONF_FILE, + constants.ETC_HOSTS, + constants.SSH_KNOWN_HOSTS_FILE, + constants.VNC_PASSWORD_FILE, + ] allowed_files.extend(ssconf.SimpleStore().GetFileList()) + if file_name not in allowed_files: - logger.Error("Filename passed to UploadFile not in allowed" - " upload targets: '%s'" % file_name) + logging.error("Filename passed to UploadFile not in allowed" + " upload targets: '%s'", file_name) return False - dir_name, small_name = os.path.split(file_name) - fd, new_name = tempfile.mkstemp('.new', small_name, 
dir_name) - # here we need to make sure we remove the temp file, if any error - # leaves it in place - try: - os.chown(new_name, uid, gid) - os.chmod(new_name, mode) - os.write(fd, data) - os.fsync(fd) - os.utime(new_name, (atime, mtime)) - os.rename(new_name, file_name) - finally: - os.close(fd) - utils.RemoveFile(new_name) + utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid, + atime=atime, mtime=mtime) return True @@ -928,28 +1106,6 @@ def _ErrnoOrStr(err): return detail -def _OSSearch(name, search_path=None): - """Search for OSes with the given name in the search_path. - - Args: - name: The name of the OS to look for - search_path: List of dirs to search (defaults to constants.OS_SEARCH_PATH) - - Returns: - The base_dir the OS resides in - - """ - if search_path is None: - search_path = constants.OS_SEARCH_PATH - - for dir in search_path: - t_os_dir = os.path.sep.join([dir, name]) - if os.path.isdir(t_os_dir): - return dir - - return None - - def _OSOndiskVersion(name, os_dir): """Compute and return the API version of a given OS. 
@@ -1006,16 +1162,16 @@ def DiagnoseOS(top_dirs=None): top_dirs = constants.OS_SEARCH_PATH result = [] - for dir in top_dirs: - if os.path.isdir(dir): + for dir_name in top_dirs: + if os.path.isdir(dir_name): try: - f_names = utils.ListVisibleFiles(dir) + f_names = utils.ListVisibleFiles(dir_name) except EnvironmentError, err: - logger.Error("Can't list the OS directory %s: %s" % (dir,str(err))) + logging.exception("Can't list the OS directory %s", dir_name) break for name in f_names: try: - os_inst = OSFromDisk(name, base_dir=dir) + os_inst = OSFromDisk(name, base_dir=dir_name) result.append(os_inst) except errors.InvalidOS, err: result.append(objects.OS.FromInvalidOS(err)) @@ -1038,12 +1194,12 @@ def OSFromDisk(name, base_dir=None): """ if base_dir is None: - base_dir = _OSSearch(name) - - if base_dir is None: - raise errors.InvalidOS(name, None, "OS dir not found in search path") + os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir) + if os_dir is None: + raise errors.InvalidOS(name, None, "OS dir not found in search path") + else: + os_dir = os.path.sep.join([base_dir, name]) - os_dir = os.path.sep.join([base_dir, name]) api_version = _OSOndiskVersion(name, os_dir) if api_version != constants.OS_API_VERSION: @@ -1080,6 +1236,32 @@ def OSFromDisk(name, base_dir=None): api_version=api_version) +def GrowBlockDevice(disk, amount): + """Grow a stack of block devices. + + This function is called recursively, with the childrens being the + first one resize. 
+ + Args: + disk: the disk to be grown + + Returns: a tuple of (status, result), with: + status: the result (true/false) of the operation + result: the error message if the operation failed, otherwise not used + + """ + r_dev = _RecursiveFindBD(disk) + if r_dev is None: + return False, "Cannot find block device %s" % (disk,) + + try: + r_dev.Grow(amount) + except errors.BlockDeviceError, err: + return False, str(err) + + return True, None + + def SnapshotBlockDevice(disk): """Create a snapshot copy of a block device. @@ -1112,7 +1294,7 @@ def SnapshotBlockDevice(disk): return None else: raise errors.ProgrammerError("Cannot snapshot non-lvm block device" - "'%s' of type '%s'" % + " '%s' of type '%s'" % (disk.unique_id, disk.dev_type)) @@ -1157,9 +1339,8 @@ def ExportSnapshot(disk, dest_node, instance): destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s", destdir, destdir, destfile) - remotecmd = ssh.BuildSSHCmd(dest_node, constants.GANETI_RUNAS, destcmd) - - + remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS, + destcmd) # all commands have been checked, so we're safe to combine them command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)]) @@ -1167,9 +1348,8 @@ def ExportSnapshot(disk, dest_node, instance): result = utils.RunCmd(command) if result.failed: - logger.Error("os snapshot export command '%s' returned error: %s" - " output: %s" % - (command, result.fail_reason, result.output)) + logging.error("os snapshot export command '%s' returned error: %s" + " output: %s", command, result.fail_reason, result.output) return False return True @@ -1203,13 +1383,18 @@ def FinalizeExport(instance, snap_disks): config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory) config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus) config.set(constants.INISECT_INS, 'disk_template', instance.disk_template) + + nic_count = 0 for nic_count, nic in enumerate(instance.nics): config.set(constants.INISECT_INS, 'nic%d_mac' % 
nic_count, '%s' % nic.mac) config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip) + config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count, + '%s' % nic.bridge) # TODO: redundant: on load can read nics until it doesn't exist config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count) + disk_count = 0 for disk_count, disk in enumerate(snap_disks): config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count, ('%s' % disk.iv_name)) @@ -1273,12 +1458,12 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image): os_device = instance.FindDisk(os_disk) if os_device is None: - logger.Error("Can't find this device-visible name '%s'" % os_disk) + logging.error("Can't find this device-visible name '%s'", os_disk) return False swap_device = instance.FindDisk(swap_disk) if swap_device is None: - logger.Error("Can't find this device-visible name '%s'" % swap_disk) + logging.error("Can't find this device-visible name '%s'", swap_disk) return False real_os_dev = _RecursiveFindBD(os_device) @@ -1299,7 +1484,8 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image): os.mkdir(constants.LOG_OS_DIR, 0750) destcmd = utils.BuildShellCmd('cat %s', src_image) - remotecmd = ssh.BuildSSHCmd(src_node, constants.GANETI_RUNAS, destcmd) + remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS, + destcmd) comprcmd = "gunzip" impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)", @@ -1308,13 +1494,13 @@ def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image): logfile) command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd]) + env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()} - result = utils.RunCmd(command) + result = utils.RunCmd(command, env=env) if result.failed: - logger.Error("os import command '%s' returned error: %s" - " output: %s" % - (command, result.fail_reason, result.output)) + logging.error("os import command '%s' returned error: %s" 
+ " output: %s", command, result.fail_reason, result.output) return False return True @@ -1375,12 +1561,208 @@ def RenameBlockDevices(devlist): # cache? for now, we only lose lvm data when we rename, which # is less critical than DRBD or MD except errors.BlockDeviceError, err: - logger.Error("Can't rename device '%s' to '%s': %s" % - (dev, unique_id, err)) + logging.exception("Can't rename device '%s' to '%s'", dev, unique_id) result = False return result +def _TransformFileStorageDir(file_storage_dir): + """Checks whether given file_storage_dir is valid. + + Checks wheter the given file_storage_dir is within the cluster-wide + default file_storage_dir stored in SimpleStore. Only paths under that + directory are allowed. + + Args: + file_storage_dir: string with path + + Returns: + normalized file_storage_dir (string) if valid, None otherwise + + """ + file_storage_dir = os.path.normpath(file_storage_dir) + base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir() + if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) == + base_file_storage_dir): + logging.error("file storage directory '%s' is not under base file" + " storage directory '%s'", + file_storage_dir, base_file_storage_dir) + return None + return file_storage_dir + + +def CreateFileStorageDir(file_storage_dir): + """Create file storage directory. 
+ + Args: + file_storage_dir: string containing the path + + Returns: + tuple with first element a boolean indicating whether dir + creation was successful or not + + """ + file_storage_dir = _TransformFileStorageDir(file_storage_dir) + result = True, + if not file_storage_dir: + result = False, + else: + if os.path.exists(file_storage_dir): + if not os.path.isdir(file_storage_dir): + logging.error("'%s' is not a directory", file_storage_dir) + result = False, + else: + try: + os.makedirs(file_storage_dir, 0750) + except OSError, err: + logging.error("Cannot create file storage directory '%s': %s", + file_storage_dir, err) + result = False, + return result + + +def RemoveFileStorageDir(file_storage_dir): + """Remove file storage directory. + + Remove it only if it's empty. If not log an error and return. + + Args: + file_storage_dir: string containing the path + + Returns: + tuple with first element a boolean indicating whether dir + removal was successful or not + + """ + file_storage_dir = _TransformFileStorageDir(file_storage_dir) + result = True, + if not file_storage_dir: + result = False, + else: + if os.path.exists(file_storage_dir): + if not os.path.isdir(file_storage_dir): + logging.error("'%s' is not a directory", file_storage_dir) + result = False, + # deletes dir only if empty, otherwise we want to return False + try: + os.rmdir(file_storage_dir) + except OSError, err: + logging.exception("Cannot remove file storage directory '%s'", + file_storage_dir) + result = False, + return result + + +def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir): + """Rename the file storage directory.
+ + Args: + old_file_storage_dir: string containing the old path + new_file_storage_dir: string containing the new path + + Returns: + tuple with first element a boolean indicating whether dir + rename was successful or not + + """ + old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir) + new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir) + result = True, + if not old_file_storage_dir or not new_file_storage_dir: + result = False, + else: + if not os.path.exists(new_file_storage_dir): + if os.path.isdir(old_file_storage_dir): + try: + os.rename(old_file_storage_dir, new_file_storage_dir) + except OSError, err: + logging.exception("Cannot rename '%s' to '%s'", + old_file_storage_dir, new_file_storage_dir) + result = False, + else: + logging.error("'%s' is not a directory", old_file_storage_dir) + result = False, + else: + if os.path.exists(old_file_storage_dir): + logging.error("Cannot rename '%s' to '%s'. Both locations exist.", + old_file_storage_dir, new_file_storage_dir) + result = False, + return result + + +def _IsJobQueueFile(file_name): + """Checks whether the given filename is in the queue directory. + + """ + queue_dir = os.path.normpath(constants.QUEUE_DIR) + result = os.path.normpath(file_name).startswith(queue_dir + os.sep) + + if not result: + logging.error("'%s' is not a file in the queue directory", + file_name) + + return result + + +def JobQueueUpdate(file_name, content): + """Updates a file in the queue directory. + + """ + if not _IsJobQueueFile(file_name): + return False + + # Write and replace the file atomically + utils.WriteFile(file_name, data=content) + + return True + + +def JobQueuePurge(): + """Removes job queue files and archived jobs + + """ + # The lock must not be removed, otherwise another process could create + # it again. + return _JobQueuePurge(keep_lock=True) + + +def JobQueueRename(old, new): + """Renames a job queue file.
+ + """ + if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)): + return False + + os.rename(old, new) + + return True + + +def CloseBlockDevices(disks): + """Closes the given block devices. + + This means they will be switched to secondary mode (in case of DRBD). + + """ + bdevs = [] + for cf in disks: + rd = _RecursiveFindBD(cf) + if rd is None: + return (False, "Can't find device %s" % cf) + bdevs.append(rd) + + msg = [] + for rd in bdevs: + try: + rd.Close() + except errors.BlockDeviceError, err: + msg.append(str(err)) + if msg: + return (False, "Can't make devices secondary: %s" % ",".join(msg)) + else: + return (True, "All devices secondary") + + class HooksRunner(object): """Hook runner. @@ -1396,9 +1778,6 @@ class HooksRunner(object): Args: - hooks_base_dir: if not None, this overrides the constants.HOOKS_BASE_DIR (useful for unittests) - - logs_base_dir: if not None, this overrides the - constants.LOG_HOOKS_DIR (useful for unittests) - - logging: enable or disable logging of script output """ if hooks_base_dir is None: @@ -1410,7 +1789,6 @@ class HooksRunner(object): """Exec one hook script. Args: - - phase: the phase - script: the full path to the script - env: the environment with which to exec the script @@ -1421,7 +1799,7 @@ class HooksRunner(object): fdstdin = open("/dev/null", "r") child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True, - shell=False, cwd="/",env=env) + shell=False, cwd="/", env=env) output = "" try: output = child.stdout.read(4096) @@ -1445,7 +1823,7 @@ class HooksRunner(object): fd.close() except EnvironmentError, err: # just log the error - #logger.Error("While closing fd %s: %s" % (fd, err)) + #logging.exception("Error while closing fd %s", fd) pass return result == 0, output @@ -1492,8 +1870,44 @@ class HooksRunner(object): return rr +class IAllocatorRunner(object): + """IAllocator runner.
+ + This class is instantiated on the node side (ganeti-noded) and not on + the master side. + + """ + def Run(self, name, idata): + """Run an iallocator script. + + Return value: tuple of: + - run status (one of the IARUN_ constants) + - stdout + - stderr + - fail reason (as from utils.RunResult) + + """ + alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH, + os.path.isfile) + if alloc_script is None: + return (constants.IARUN_NOTFOUND, None, None, None) + + fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.") + os.close(fd) + try: + utils.WriteFile(fin_name, data=idata) + result = utils.RunCmd([alloc_script, fin_name]) + if result.failed: + return (constants.IARUN_FAILURE, result.stdout, result.stderr, + result.fail_reason) + finally: + os.unlink(fin_name) + + return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None) + + class DevCacheManager(object): - """Simple class for managing a chache of block device information. + """Simple class for managing a cache of block device information. """ _DEV_PREFIX = "/dev/" @@ -1518,6 +1932,9 @@ class DevCacheManager(object): """Updates the cache information for a given device. """ + if dev_path is None: + logging.error("DevCacheManager.UpdateCache got a None dev_path") + return fpath = cls._ConvertPath(dev_path) if on_primary: state = "primary" @@ -1529,17 +1946,18 @@ class DevCacheManager(object): try: utils.WriteFile(fpath, data=fdata) except EnvironmentError, err: - logger.Error("Can't update bdev cache for %s, error %s" % - (dev_path, str(err))) + logging.exception("Can't update bdev cache for %s", dev_path) @classmethod def RemoveCache(cls, dev_path): """Remove data for a dev_path.
""" + if dev_path is None: + logging.error("DevCacheManager.RemoveCache got a None dev_path") + return fpath = cls._ConvertPath(dev_path) try: utils.RemoveFile(fpath) except EnvironmentError, err: - logger.Error("Can't update bdev cache for %s, error %s" % - (dev_path, str(err))) + logging.exception("Can't remove bdev cache for %s", dev_path)