X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/3536c79204aa16d87846d262520882526b9d3f54..cfda0e48eb3238187dbb74bb56d54ca87747684c:/lib/backend.py

diff --git a/lib/backend.py b/lib/backend.py
index 45b4b46..0c05299 100644
--- a/lib/backend.py
+++ b/lib/backend.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -506,6 +506,15 @@ def VerifyNode(what, cluster_name):
         val = "Error while checking hypervisor: %s" % str(err)
       tmp[hv_name] = val
 
+  if constants.NV_HVPARAMS in what and vm_capable:
+    result[constants.NV_HVPARAMS] = tmp = []
+    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
+      try:
+        logging.info("Validating hv %s, %s", hv_name, hvparms)
+        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
+      except errors.HypervisorError, err:
+        tmp.append((source, hv_name, str(err)))
+
   if constants.NV_FILELIST in what:
     result[constants.NV_FILELIST] = utils.FingerprintFiles(
       what[constants.NV_FILELIST])
@@ -552,6 +561,22 @@ def VerifyNode(what, cluster_name):
     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                      source=source)
 
+  if constants.NV_OOB_PATHS in what:
+    result[constants.NV_OOB_PATHS] = tmp = []
+    for path in what[constants.NV_OOB_PATHS]:
+      try:
+        st = os.stat(path)
+      except OSError, err:
+        tmp.append("error stating out of band helper: %s" % err)
+      else:
+        if stat.S_ISREG(st.st_mode):
+          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
+            tmp.append(None)
+          else:
+            tmp.append("out of band helper %s is not executable" % path)
+        else:
+          tmp.append("out of band helper %s is not a file" % path)
+
   if constants.NV_LVLIST in what and vm_capable:
     try:
       val = GetVolumeList(utils.ListVolumeGroups().keys())
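The stat()-based classification in the NV_OOB_PATHS hunk above can be exercised on its own. The following sketch (Python 2, matching the module's idiom) mirrors the hunk's decision tree in a standalone helper; the function name and the sample paths are illustrative only, not part of the patch:

    import os
    import stat

    def ClassifyOobHelper(path):
      # Same checks as the NV_OOB_PATHS block: None means the helper is
      # usable, anything else is the error string that would be reported
      try:
        st = os.stat(path)
      except OSError, err:
        return "error stating out of band helper: %s" % err
      if not stat.S_ISREG(st.st_mode):
        return "out of band helper %s is not a file" % path
      if not (stat.S_IMODE(st.st_mode) & stat.S_IXUSR):
        return "out of band helper %s is not executable" % path
      return None

    print ClassifyOobHelper("/bin/sh")      # None: regular and executable
    print ClassifyOobHelper("/etc/passwd")  # "... is not executable"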
@@ -622,11 +647,50 @@ def VerifyNode(what, cluster_name):
   return result
 
 
+def GetBlockDevSizes(devices):
+  """Return the size of the given block devices
+
+  @type devices: list
+  @param devices: list of block device nodes to query
+  @rtype: dict
+  @return:
+    dictionary of all block devices under /dev (key). The value is their
+    size in MiB.
+
+    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
+
+  """
+  DEV_PREFIX = "/dev/"
+  blockdevs = {}
+
+  for devpath in devices:
+    if os.path.commonprefix([DEV_PREFIX, devpath]) != DEV_PREFIX:
+      continue
+
+    try:
+      st = os.stat(devpath)
+    except EnvironmentError, err:
+      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
+      continue
+
+    if stat.S_ISBLK(st.st_mode):
+      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
+      if result.failed:
+        # We don't want to fail, just do not list this device as available
+        logging.warning("Cannot get size for block device %s", devpath)
+        continue
+
+      size = int(result.stdout) / (1024 * 1024)
+      blockdevs[devpath] = size
+  return blockdevs
+
+
 def GetVolumeList(vg_names):
   """Compute list of logical volumes and their size.
 
   @type vg_names: list
-  @param vg_names: the volume groups whose LVs we should list
+  @param vg_names: the volume groups whose LVs we should list, or
+      empty for all volume groups
   @rtype: dict
   @return:
       dictionary of all partions (key) with value being a tuple of
@@ -640,6 +704,8 @@ def GetVolumeList(vg_names):
   """
   lvs = {}
   sep = '|'
+  if not vg_names:
+    vg_names = []
   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                          "--separator=%s" % sep,
                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
@@ -1178,10 +1244,21 @@ def AcceptInstance(instance, info, target):
   @param target: target host (usually ip), on this node
 
   """
+  # TODO: why is this required only for DTS_EXT_MIRROR?
+  if instance.disk_template in constants.DTS_EXT_MIRROR:
+    # Create the symlinks, as the disks are not active
+    # in any way
+    try:
+      _GatherAndLinkBlockDevs(instance)
+    except errors.BlockDeviceError, err:
+      _Fail("Block device error: %s", err, exc=True)
+
   hyper = hypervisor.GetHypervisor(instance.hypervisor)
   try:
     hyper.AcceptInstance(instance, info, target)
   except errors.HypervisorError, err:
+    if instance.disk_template in constants.DTS_EXT_MIRROR:
+      _RemoveBlockDevLinks(instance.name, instance.disks)
     _Fail("Failed to accept instance: %s", err, exc=True)
 
 
@@ -1337,6 +1414,41 @@ def BlockdevWipe(disk, offset, size):
   _WipeDevice(rdev.dev_path, offset, size)
 
 
+def BlockdevPauseResumeSync(disks, pause):
+  """Pause or resume the sync of the block device.
+
+  @type disks: list of L{objects.Disk}
+  @param disks: the disk objects we want to pause/resume
+  @type pause: bool
+  @param pause: Whether to pause or resume
+
+  """
+  success = []
+  for disk in disks:
+    try:
+      rdev = _RecursiveFindBD(disk)
+    except errors.BlockDeviceError:
+      rdev = None
+
+    if not rdev:
+      success.append((False, ("Cannot change sync for device %s:"
+                              " device not found" % disk.iv_name)))
+      continue
+
+    result = rdev.PauseResumeSync(pause)
+
+    if result:
+      success.append((result, None))
+    else:
+      if pause:
+        msg = "Pause"
+      else:
+        msg = "Resume"
+      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
+
+  return success
+
+
 def BlockdevRemove(disk):
   """Remove a block device.
 
@@ -1429,7 +1541,7 @@ def _RecursiveAssembleBD(disk, owner, as_primary):
   return result
 
 
-def BlockdevAssemble(disk, owner, as_primary):
+def BlockdevAssemble(disk, owner, as_primary, idx):
   """Activate a block device for an instance.
 
   This is a wrapper over _RecursiveAssembleBD.
@@ -1444,8 +1556,12 @@ def BlockdevAssemble(disk, owner, as_primary):
     if isinstance(result, bdev.BlockDev):
       # pylint: disable-msg=E1103
       result = result.dev_path
+      if as_primary:
+        _SymlinkBlockDev(owner, result, idx)
   except errors.BlockDeviceError, err:
     _Fail("Error while assembling disk: %s", err, exc=True)
+  except OSError, err:
+    _Fail("Error while symlinking disk: %s", err, exc=True)
 
   return result
 
@@ -2187,7 +2303,7 @@ def FinalizeExport(instance, snap_disks):
   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
   config.set(constants.INISECT_EXP, 'os', instance.os)
-  config.set(constants.INISECT_EXP, 'compression', 'gzip')
+  config.set(constants.INISECT_EXP, "compression", "none")
 
   config.add_section(constants.INISECT_INS)
   config.set(constants.INISECT_INS, 'name', instance.name)
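The new BlockdevPauseResumeSync above deliberately returns a list of (success, error) pairs, one per disk, rather than failing on the first problem. A caller can fold that list into a single verdict; the sketch below (Python 2) is hypothetical and not part of the patch, including the helper name and the sample failure message:

    def CombinePauseResults(pairs):
      # pairs: [(bool, error-string-or-None), ...] as produced by
      # BlockdevPauseResumeSync; collect the per-disk failure messages
      msgs = [msg for (ok, msg) in pairs if not ok]
      return (not msgs, "; ".join(msgs))

    ok, msg = CombinePauseResults([(True, None),
                                   (False, "Pause for device disk/1 failed")])
    assert not ok and "disk/1" in msg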
@@ -2337,15 +2453,15 @@ def BlockdevRename(devlist):
     _Fail("; ".join(msgs))
 
 
-def _TransformFileStorageDir(file_storage_dir):
+def _TransformFileStorageDir(fs_dir):
   """Checks whether given file_storage_dir is valid.
 
-  Checks whether the given file_storage_dir is within the cluster-wide
-  default file_storage_dir stored in SimpleStore. Only paths under that
-  directory are allowed.
+  Checks whether the given fs_dir is within the cluster-wide default
+  file_storage_dir or the shared_file_storage_dir, which are stored in
+  SimpleStore. Only paths under those directories are allowed.
 
-  @type file_storage_dir: str
-  @param file_storage_dir: the path to check
+  @type fs_dir: str
+  @param fs_dir: the path to check
 
   @return: the normalized path if valid, None otherwise
 
@@ -2353,13 +2469,15 @@ def _TransformFileStorageDir(file_storage_dir):
   if not constants.ENABLE_FILE_STORAGE:
     _Fail("File storage disabled at configure time")
   cfg = _GetConfig()
-  file_storage_dir = os.path.normpath(file_storage_dir)
-  base_file_storage_dir = cfg.GetFileStorageDir()
-  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
-      base_file_storage_dir):
+  fs_dir = os.path.normpath(fs_dir)
+  base_fstore = cfg.GetFileStorageDir()
+  base_shared = cfg.GetSharedFileStorageDir()
+  if ((os.path.commonprefix([fs_dir, base_fstore]) != base_fstore) and
+      (os.path.commonprefix([fs_dir, base_shared]) != base_shared)):
     _Fail("File storage directory '%s' is not under base file"
-          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
-  return file_storage_dir
+          " storage directory '%s' or shared storage directory '%s'",
+          fs_dir, base_fstore, base_shared)
+  return fs_dir
 
 
 def CreateFileStorageDir(file_storage_dir):
@@ -2461,7 +2579,7 @@ def _EnsureJobQueueFile(file_name):
 def JobQueueUpdate(file_name, content):
   """Updates a file in the queue directory.
 
-  This is just a wrapper over L{utils.WriteFile}, with proper
+  This is just a wrapper over L{utils.io.WriteFile}, with proper
   checking.
 
   @type file_name: str
@@ -2919,6 +3037,15 @@ def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
   if cmd_suffix:
     cmd.append("--cmd-suffix=%s" % cmd_suffix)
 
+  if mode == constants.IEM_EXPORT:
+    # Retry connection a few times when connecting to remote peer
+    cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
+    cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
+  elif opts.connect_timeout is not None:
+    assert mode == constants.IEM_IMPORT
+    # Overall timeout for establishing connection while listening
+    cmd.append("--connect-timeout=%s" % opts.connect_timeout)
+
   logfile = _InstanceLogName(prefix, instance.os, instance.name)
 
   # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
@@ -3348,7 +3475,7 @@ class DevCacheManager(object):
   def RemoveCache(cls, dev_path):
     """Remove data for a dev_path.
 
-    This is just a wrapper over L{utils.RemoveFile} with a converted
+    This is just a wrapper over L{utils.io.RemoveFile} with a converted
     path name and logging.
 
     @type dev_path: str
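_TransformFileStorageDir now accepts paths under either the per-cluster or the shared base directory; the containment test itself is a plain os.path.commonprefix string comparison, shown standalone below. The helper name and all paths in this sketch are hypothetical (Python 2):

    import os.path

    def UnderAnyBase(path, bases):
      # Mirrors the patched check: normalize, then require one of the
      # base directories to be a string prefix of the candidate path
      path = os.path.normpath(path)
      for base in bases:
        if os.path.commonprefix([path, base]) == base:
          return True
      return False

    BASES = ["/srv/ganeti/file-storage", "/srv/ganeti/shared-file-storage"]
    print UnderAnyBase("/srv/ganeti/shared-file-storage/inst1.disk0", BASES)
    # True
    print UnderAnyBase("/tmp/somewhere/else", BASES)
    # False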