X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/8a6dbce8e7257c6c3b2510f7473c44557b1cee6d..92158c35ec991cbf164f01f5ce8efbf897cd5c2e:/lib/bdev.py diff --git a/lib/bdev.py b/lib/bdev.py index 7f34699..7226f1f 100644 --- a/lib/bdev.py +++ b/lib/bdev.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc. +# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -29,6 +29,7 @@ import stat import pyparsing as pyp import os import logging +import math from ganeti import utils from ganeti import errors @@ -37,12 +38,20 @@ from ganeti import objects from ganeti import compat from ganeti import netutils from ganeti import pathutils +from ganeti import serializer # Size of reads in _CanReadDevice _DEVICE_READ_SIZE = 128 * 1024 +class RbdShowmappedJsonError(Exception): + """`rbd showmapped' JSON formatting error Exception class. + + """ + pass + + def _IgnoreError(fn, *args, **kwargs): """Executes the given function, ignoring BlockDeviceErrors. @@ -75,6 +84,17 @@ def _ThrowError(msg, *args): raise errors.BlockDeviceError(msg) +def _CheckResult(result): + """Throws an error if the given result is a failed one. + + @param result: result from RunCmd + + """ + if result.failed: + _ThrowError("Command: %s error: %s - %s", result.cmd, result.fail_reason, + result.output) + + def _CanReadDevice(path): """Check if we can read from the given device. @@ -89,6 +109,58 @@ def _CanReadDevice(path): return False +def _GetForbiddenFileStoragePaths(): + """Builds a list of path prefixes which shouldn't be used for file storage. 
+ + @rtype: frozenset + + """ + paths = set([ + "/boot", + "/dev", + "/etc", + "/home", + "/proc", + "/root", + "/sys", + ]) + + for prefix in ["", "/usr", "/usr/local"]: + paths.update(map(lambda s: "%s/%s" % (prefix, s), + ["bin", "lib", "lib32", "lib64", "sbin"])) + + return compat.UniqueFrozenset(map(os.path.normpath, paths)) + + +def _ComputeWrongFileStoragePaths(paths, + _forbidden=_GetForbiddenFileStoragePaths()): + """Cross-checks a list of paths for prefixes considered bad. + + Some paths, e.g. "/bin", should not be used for file storage. + + @type paths: list + @param paths: List of paths to be checked + @rtype: list + @return: Sorted list of paths for which the user should be warned + + """ + def _Check(path): + return (not os.path.isabs(path) or + path in _forbidden or + filter(lambda p: utils.IsBelowDir(p, path), _forbidden)) + + return utils.NiceSort(filter(_Check, map(os.path.normpath, paths))) + + +def ComputeWrongFileStoragePaths(_filename=pathutils.FILE_STORAGE_PATHS_FILE): + """Returns a list of file storage paths whose prefix is considered bad. + + See L{_ComputeWrongFileStoragePaths}. + + """ + return _ComputeWrongFileStoragePaths(_LoadAllowedFileStoragePaths(_filename)) + + def _CheckFileStoragePath(path, allowed): """Checks if a path is in a list of allowed paths for file storage. @@ -112,10 +184,12 @@ def _CheckFileStoragePath(path, allowed): break else: raise errors.FileStoragePathError("Path '%s' is not acceptable for file" - " storage" % path) + " storage. A possible fix might be to add" + " it to /etc/ganeti/file-storage-paths" + " on all nodes." % path) -def LoadAllowedFileStoragePaths(filename): +def _LoadAllowedFileStoragePaths(filename): """Loads file containing allowed file storage paths. 
@rtype: list @@ -138,7 +212,13 @@ def CheckFileStoragePath(path, _filename=pathutils.FILE_STORAGE_PATHS_FILE): @raise errors.FileStoragePathError: If the path is not allowed """ - _CheckFileStoragePath(path, LoadAllowedFileStoragePaths(_filename)) + allowed = _LoadAllowedFileStoragePaths(_filename) + + if _ComputeWrongFileStoragePaths([path]): + raise errors.FileStoragePathError("Path '%s' uses a forbidden prefix" % + path) + + _CheckFileStoragePath(path, allowed) class BlockDev(object): @@ -177,7 +257,7 @@ class BlockDev(object): an attached instance (lvcreate) - attaching of a python instance to an existing (real) device - The second point, the attachement to a device, is different + The second point, the attachment to a device, is different depending on whether the device is assembled or not. At init() time, we search for a device with the same unique_id as us. If found, good. It also means that the device is already assembled. If not, @@ -221,7 +301,7 @@ class BlockDev(object): raise NotImplementedError @classmethod - def Create(cls, unique_id, children, size, params): + def Create(cls, unique_id, children, size, params, excl_stor): """Create the device. If the device cannot be created, it will return None @@ -434,8 +514,8 @@ class LogicalVolume(BlockDev): """ _VALID_NAME_RE = re.compile("^[a-zA-Z0-9+_.-]*$") - _INVALID_NAMES = frozenset([".", "..", "snapshot", "pvmove"]) - _INVALID_SUBSTRINGS = frozenset(["_mlog", "_mimage"]) + _INVALID_NAMES = compat.UniqueFrozenset([".", "..", "snapshot", "pvmove"]) + _INVALID_SUBSTRINGS = compat.UniqueFrozenset(["_mlog", "_mimage"]) def __init__(self, unique_id, children, size, params): """Attaches to a LV device. @@ -454,8 +534,45 @@ class LogicalVolume(BlockDev): self.major = self.minor = self.pe_size = self.stripe_count = None self.Attach() + @staticmethod + def _GetStdPvSize(pvs_info): + """Return the standard PV size (used with exclusive storage). 
+ + @param pvs_info: list of objects.LvmPvInfo, cannot be empty + @rtype: float + @return: size in MiB + + """ + assert len(pvs_info) > 0 + smallest = min([pv.size for pv in pvs_info]) + return smallest / (1 + constants.PART_MARGIN + constants.PART_RESERVED) + + @staticmethod + def _ComputeNumPvs(size, pvs_info): + """Compute the number of PVs needed for an LV (with exclusive storage). + + @type size: float + @param size: LV size in MiB + @param pvs_info: list of objects.LvmPvInfo, cannot be empty + @rtype: integer + @return: number of PVs needed + """ + assert len(pvs_info) > 0 + pv_size = float(LogicalVolume._GetStdPvSize(pvs_info)) + return int(math.ceil(float(size) / pv_size)) + + @staticmethod + def _GetEmptyPvNames(pvs_info, max_pvs=None): + """Return a list of empty PVs, by name. + + """ + empty_pvs = filter(objects.LvmPvInfo.IsEmpty, pvs_info) + if max_pvs is not None: + empty_pvs = empty_pvs[:max_pvs] + return map((lambda pv: pv.name), empty_pvs) + @classmethod - def Create(cls, unique_id, children, size, params): + def Create(cls, unique_id, children, size, params, excl_stor): """Create a new logical volume. 
""" @@ -467,33 +584,55 @@ class LogicalVolume(BlockDev): cls._ValidateName(lv_name) pvs_info = cls.GetPVInfo([vg_name]) if not pvs_info: - _ThrowError("Can't compute PV info for vg %s", vg_name) - pvs_info.sort() - pvs_info.reverse() + if excl_stor: + msg = "No (empty) PVs found" + else: + msg = "Can't compute PV info for vg %s" % vg_name + _ThrowError(msg) + pvs_info.sort(key=(lambda pv: pv.free), reverse=True) - pvlist = [pv[1] for pv in pvs_info] + pvlist = [pv.name for pv in pvs_info] if compat.any(":" in v for v in pvlist): _ThrowError("Some of your PVs have the invalid character ':' in their" " name, this is not supported - please filter them out" " in lvm.conf using either 'filter' or 'preferred_names'") - free_size = sum([pv[0] for pv in pvs_info]) + current_pvs = len(pvlist) desired_stripes = params[constants.LDP_STRIPES] stripes = min(current_pvs, desired_stripes) - if stripes < desired_stripes: - logging.warning("Could not use %d stripes for VG %s, as only %d PVs are" - " available.", desired_stripes, vg_name, current_pvs) - # The size constraint should have been checked from the master before - # calling the create function. 
- if free_size < size: - _ThrowError("Not enough free space: required %s," - " available %s", size, free_size) - cmd = ["lvcreate", "-L%dm" % size, "-n%s" % lv_name] + if excl_stor: + (err_msgs, _) = utils.LvmExclusiveCheckNodePvs(pvs_info) + if err_msgs: + for m in err_msgs: + logging.warning(m) + req_pvs = cls._ComputeNumPvs(size, pvs_info) + pvlist = cls._GetEmptyPvNames(pvs_info, req_pvs) + current_pvs = len(pvlist) + if current_pvs < req_pvs: + _ThrowError("Not enough empty PVs to create a disk of %d MB:" + " %d available, %d needed", size, current_pvs, req_pvs) + assert current_pvs == len(pvlist) + if stripes > current_pvs: + # No warning issued for this, as it's no surprise + stripes = current_pvs + + else: + if stripes < desired_stripes: + logging.warning("Could not use %d stripes for VG %s, as only %d PVs are" + " available.", desired_stripes, vg_name, current_pvs) + free_size = sum([pv.free for pv in pvs_info]) + # The size constraint should have been checked from the master before + # calling the create function. 
+ if free_size < size: + _ThrowError("Not enough free space: required %s," + " available %s", size, free_size) + # If the free space is not well distributed, we won't be able to # create an optimally-striped volume; in that case, we want to try # with N, N-1, ..., 2, and finally 1 (non-stripped) number of # stripes + cmd = ["lvcreate", "-L%dm" % size, "-n%s" % lv_name] for stripes_arg in range(stripes, 0, -1): result = utils.RunCmd(cmd + ["-i%d" % stripes_arg] + [vg_name] + pvlist) if not result.failed: @@ -505,7 +644,7 @@ class LogicalVolume(BlockDev): @staticmethod def _GetVolumeInfo(lvm_cmd, fields): - """Returns LVM Volumen infos using lvm_cmd + """Returns LVM Volume infos using lvm_cmd @param lvm_cmd: Should be one of "pvs", "vgs" or "lvs" @param fields: Fields to return @@ -537,40 +676,84 @@ class LogicalVolume(BlockDev): return data @classmethod - def GetPVInfo(cls, vg_names, filter_allocatable=True): + def GetPVInfo(cls, vg_names, filter_allocatable=True, include_lvs=False): """Get the free space info for PVs in a volume group. @param vg_names: list of volume group names, if empty all will be returned @param filter_allocatable: whether to skip over unallocatable PVs + @param include_lvs: whether to include a list of LVs hosted on each PV @rtype: list - @return: list of tuples (free_space, name) with free_space in mebibytes + @return: list of objects.LvmPvInfo objects """ + # We request "lv_name" field only if we care about LVs, so we don't get + # a long list of entries with many duplicates unless we really have to. + # The duplicate "pv_name" field will be ignored. + if include_lvs: + lvfield = "lv_name" + else: + lvfield = "pv_name" try: info = cls._GetVolumeInfo("pvs", ["pv_name", "vg_name", "pv_free", - "pv_attr"]) + "pv_attr", "pv_size", lvfield]) except errors.GenericError, err: logging.error("Can't get PV information: %s", err) return None + # When asked for LVs, "pvs" may return multiple entries for the same PV-LV + # pair. 
We sort entries by PV name and then LV name, so it's easy to weed + # out duplicates. + if include_lvs: + info.sort(key=(lambda i: (i[0], i[5]))) data = [] - for pv_name, vg_name, pv_free, pv_attr in info: + lastpvi = None + for (pv_name, vg_name, pv_free, pv_attr, pv_size, lv_name) in info: # (possibly) skip over pvs which are not allocatable if filter_allocatable and pv_attr[0] != "a": continue # (possibly) skip over pvs which are not in the right volume group(s) if vg_names and vg_name not in vg_names: continue - data.append((float(pv_free), pv_name, vg_name)) + # Beware of duplicates (check before inserting) + if lastpvi and lastpvi.name == pv_name: + if include_lvs and lv_name: + if not lastpvi.lv_list or lastpvi.lv_list[-1] != lv_name: + lastpvi.lv_list.append(lv_name) + else: + if include_lvs and lv_name: + lvl = [lv_name] + else: + lvl = [] + lastpvi = objects.LvmPvInfo(name=pv_name, vg_name=vg_name, + size=float(pv_size), free=float(pv_free), + attributes=pv_attr, lv_list=lvl) + data.append(lastpvi) return data @classmethod - def GetVGInfo(cls, vg_names, filter_readonly=True): + def _GetExclusiveStorageVgFree(cls, vg_name): + """Return the free disk space in the given VG, in exclusive storage mode. + + @type vg_name: string + @param vg_name: VG name + @rtype: float + @return: free space in MiB + """ + pvs_info = cls.GetPVInfo([vg_name]) + if not pvs_info: + return 0.0 + pv_size = cls._GetStdPvSize(pvs_info) + num_pvs = len(cls._GetEmptyPvNames(pvs_info)) + return pv_size * num_pvs + + @classmethod + def GetVGInfo(cls, vg_names, excl_stor, filter_readonly=True): """Get the free space info for specific VGs. 
@param vg_names: list of volume group names, if empty all will be returned + @param excl_stor: whether exclusive_storage is enabled @param filter_readonly: whether to skip over readonly VGs @rtype: list @@ -593,6 +776,11 @@ class LogicalVolume(BlockDev): # (possibly) skip over vgs which are not in the right volume group(s) if vg_names and vg_name not in vg_names: continue + # Exclusive storage needs a different concept of free space + if excl_stor: + es_free = cls._GetExclusiveStorageVgFree(vg_name) + assert es_free <= vg_free + vg_free = es_free data.append((float(vg_free), float(vg_size), vg_name)) return data @@ -650,7 +838,7 @@ class LogicalVolume(BlockDev): """ self.attached = False result = utils.RunCmd(["lvs", "--noheadings", "--separator=,", - "--units=m", "--nosuffix", + "--units=k", "--nosuffix", "-olv_attr,lv_kernel_major,lv_kernel_minor," "vg_extent_size,stripes", self.dev_path]) if result.failed: @@ -789,7 +977,7 @@ class LogicalVolume(BlockDev): snap = LogicalVolume((self._vg_name, snap_name), None, size, self.params) _IgnoreError(snap.Remove) - vg_info = self.GetVGInfo([self._vg_name]) + vg_info = self.GetVGInfo([self._vg_name], False) if not vg_info: _ThrowError("Can't compute VG info for vg %s", self._vg_name) free_size, _, _ = vg_info[0] @@ -797,20 +985,33 @@ class LogicalVolume(BlockDev): _ThrowError("Not enough free space: required %s," " available %s", size, free_size) - result = utils.RunCmd(["lvcreate", "-L%dm" % size, "-s", - "-n%s" % snap_name, self.dev_path]) - if result.failed: - _ThrowError("command: %s error: %s - %s", - result.cmd, result.fail_reason, result.output) + _CheckResult(utils.RunCmd(["lvcreate", "-L%dm" % size, "-s", + "-n%s" % snap_name, self.dev_path])) return (self._vg_name, snap_name) + def _RemoveOldInfo(self): + """Try to remove old tags from the lv. 
+ + """ + result = utils.RunCmd(["lvs", "-o", "tags", "--noheadings", "--nosuffix", + self.dev_path]) + _CheckResult(result) + + raw_tags = result.stdout.strip() + if raw_tags: + for tag in raw_tags.split(","): + _CheckResult(utils.RunCmd(["lvchange", "--deltag", + tag.strip(), self.dev_path])) + def SetInfo(self, text): """Update metadata with info text. """ BlockDev.SetInfo(self, text) + self._RemoveOldInfo() + # Replace invalid characters text = re.sub("^[^A-Za-z0-9_+.]", "_", text) text = re.sub("[^-A-Za-z0-9_+.]", "_", text) @@ -818,11 +1019,7 @@ class LogicalVolume(BlockDev): # Only up to 128 characters are allowed text = text[:128] - result = utils.RunCmd(["lvchange", "--addtag", text, - self.dev_path]) - if result.failed: - _ThrowError("Command: %s error: %s - %s", result.cmd, result.fail_reason, - result.output) + _CheckResult(utils.RunCmd(["lvchange", "--addtag", text, self.dev_path])) def Grow(self, amount, dryrun, backingstore): """Grow the logical volume. @@ -834,10 +1031,12 @@ class LogicalVolume(BlockDev): if not self.Attach(): _ThrowError("Can't attach to LV during Grow()") full_stripe_size = self.pe_size * self.stripe_count + # pe_size is in KB + amount *= 1024 rest = amount % full_stripe_size if rest != 0: amount += full_stripe_size - rest - cmd = ["lvextend", "-L", "+%dm" % amount] + cmd = ["lvextend", "-L", "+%dk" % amount] if dryrun: cmd.append("--test") # we try multiple algorithms since the 'best' ones might not have @@ -851,7 +1050,7 @@ class LogicalVolume(BlockDev): _ThrowError("Can't grow LV %s: %s", self.dev_path, result.output) -class DRBD8Status(object): +class DRBD8Status(object): # pylint: disable=R0902 """A DRBD status representation class. Note that this doesn't support unconfigured devices (cs:Unconfigured). 
@@ -880,7 +1079,7 @@ class DRBD8Status(object): CS_SYNCTARGET = "SyncTarget" CS_PAUSEDSYNCS = "PausedSyncS" CS_PAUSEDSYNCT = "PausedSyncT" - CSET_SYNC = frozenset([ + CSET_SYNC = compat.UniqueFrozenset([ CS_WFREPORTPARAMS, CS_STARTINGSYNCS, CS_STARTINGSYNCT, @@ -936,6 +1135,7 @@ class DRBD8Status(object): self.is_diskless = self.ldisk == self.DS_DISKLESS self.is_disk_uptodate = self.ldisk == self.DS_UPTODATE + self.peer_disk_uptodate = self.rdisk == self.DS_UPTODATE self.is_in_resync = self.cstatus in self.CSET_SYNC self.is_in_use = self.cstatus != self.CS_UNCONFIGURED @@ -976,7 +1176,7 @@ class BaseDRBD(BlockDev): # pylint: disable=W0223 _ST_WFCONNECTION = "WFConnection" _ST_CONNECTED = "Connected" - _STATUS_FILE = "/proc/drbd" + _STATUS_FILE = constants.DRBD_STATUS_FILE _USERMODE_HELPER_FILE = "/sys/module/drbd/parameters/usermode_helper" @staticmethod @@ -2101,7 +2301,7 @@ class DRBD8(BaseDRBD): self.Shutdown() @classmethod - def Create(cls, unique_id, children, size, params): + def Create(cls, unique_id, children, size, params, excl_stor): """Create a new DRBD8 device. Since DRBD devices are not created per se, just assembled, this @@ -2110,6 +2310,9 @@ class DRBD8(BaseDRBD): """ if len(children) != 2: raise errors.ProgrammerError("Invalid setup for the drbd device") + if excl_stor: + raise errors.ProgrammerError("DRBD device requested with" + " exclusive_storage") # check that the minor is unused aminor = unique_id[4] proc_info = cls._MassageProcData(cls._GetProcData()) @@ -2151,7 +2354,7 @@ class DRBD8(BaseDRBD): class FileStorage(BlockDev): """File device. - This class represents the a file storage backend device. + This class represents a file storage backend device. The unique_id for the file device is a (file_driver, file_path) tuple. 
@@ -2167,6 +2370,9 @@ class FileStorage(BlockDev): raise ValueError("Invalid configuration data %s" % str(unique_id)) self.driver = unique_id[0] self.dev_path = unique_id[1] + + CheckFileStoragePath(self.dev_path) + self.Attach() def Assemble(self): @@ -2272,7 +2478,7 @@ class FileStorage(BlockDev): _ThrowError("Can't stat %s: %s", self.dev_path, err) @classmethod - def Create(cls, unique_id, children, size, params): + def Create(cls, unique_id, children, size, params, excl_stor): """Create a new file. @param size: the size of file in MiB @@ -2281,9 +2487,16 @@ class FileStorage(BlockDev): @return: an instance of FileStorage """ + if excl_stor: + raise errors.ProgrammerError("FileStorage device requested with" + " exclusive_storage") if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2: raise ValueError("Invalid configuration data %s" % str(unique_id)) + dev_path = unique_id[1] + + CheckFileStoragePath(dev_path) + try: fd = os.open(dev_path, os.O_RDWR | os.O_CREAT | os.O_EXCL) f = os.fdopen(fd, "w") @@ -2333,12 +2546,15 @@ class PersistentBlockDevice(BlockDev): self.Attach() @classmethod - def Create(cls, unique_id, children, size, params): + def Create(cls, unique_id, children, size, params, excl_stor): """Create a new device This is a noop, we only return a PersistentBlockDevice instance """ + if excl_stor: + raise errors.ProgrammerError("Persistent block device requested with" + " exclusive_storage") return PersistentBlockDevice(unique_id, children, 0, params) def Remove(self): @@ -2430,7 +2646,7 @@ class RADOSBlockDevice(BlockDev): self.Attach() @classmethod - def Create(cls, unique_id, children, size, params): + def Create(cls, unique_id, children, size, params, excl_stor): """Create a new rbd device. Provision a new rbd volume inside a RADOS pool. 
@@ -2439,6 +2655,9 @@ class RADOSBlockDevice(BlockDev): if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2: raise errors.ProgrammerError("Invalid configuration data %s" % str(unique_id)) + if excl_stor: + raise errors.ProgrammerError("RBD device requested with" + " exclusive_storage") rbd_pool = params[constants.LDP_POOL] rbd_name = unique_id[1] @@ -2520,14 +2739,7 @@ class RADOSBlockDevice(BlockDev): name = unique_id[1] # Check if the mapping already exists. - showmap_cmd = [constants.RBD_CMD, "showmapped", "-p", pool] - result = utils.RunCmd(showmap_cmd) - if result.failed: - _ThrowError("rbd showmapped failed (%s): %s", - result.fail_reason, result.output) - - rbd_dev = self._ParseRbdShowmappedOutput(result.output, name) - + rbd_dev = self._VolumeToBlockdev(pool, name) if rbd_dev: # The mapping exists. Return it. return rbd_dev @@ -2540,14 +2752,7 @@ class RADOSBlockDevice(BlockDev): result.fail_reason, result.output) # Find the corresponding rbd device. - showmap_cmd = [constants.RBD_CMD, "showmapped", "-p", pool] - result = utils.RunCmd(showmap_cmd) - if result.failed: - _ThrowError("rbd map succeeded, but showmapped failed (%s): %s", - result.fail_reason, result.output) - - rbd_dev = self._ParseRbdShowmappedOutput(result.output, name) - + rbd_dev = self._VolumeToBlockdev(pool, name) if not rbd_dev: _ThrowError("rbd map succeeded, but could not find the rbd block" " device in output of showmapped, for volume: %s", name) @@ -2555,16 +2760,93 @@ class RADOSBlockDevice(BlockDev): # The device was successfully mapped. Return it. return rbd_dev + @classmethod + def _VolumeToBlockdev(cls, pool, volume_name): + """Do the 'volume name'-to-'rbd block device' resolving. 
+ + @type pool: string + @param pool: RADOS pool to use + @type volume_name: string + @param volume_name: the name of the volume whose device we search for + @rtype: string or None + @return: block device path if the volume is mapped, else None + + """ + try: + # Newer versions of the rbd tool support json output formatting. Use it + # if available. + showmap_cmd = [ + constants.RBD_CMD, + "showmapped", + "-p", + pool, + "--format", + "json" + ] + result = utils.RunCmd(showmap_cmd) + if result.failed: + logging.error("rbd JSON output formatting returned error (%s): %s," + "falling back to plain output parsing", + result.fail_reason, result.output) + raise RbdShowmappedJsonError + + return cls._ParseRbdShowmappedJson(result.output, volume_name) + except RbdShowmappedJsonError: + # For older versions of rbd, we have to parse the plain / text output + # manually. + showmap_cmd = [constants.RBD_CMD, "showmapped", "-p", pool] + result = utils.RunCmd(showmap_cmd) + if result.failed: + _ThrowError("rbd showmapped failed (%s): %s", + result.fail_reason, result.output) + + return cls._ParseRbdShowmappedPlain(result.output, volume_name) + + @staticmethod + def _ParseRbdShowmappedJson(output, volume_name): + """Parse the json output of `rbd showmapped'. + + This method parses the json output of `rbd showmapped' and returns the rbd + block device path (e.g. /dev/rbd0) that matches the given rbd volume. 
+ + @type output: string + @param output: the json output of `rbd showmapped' + @type volume_name: string + @param volume_name: the name of the volume whose device we search for + @rtype: string or None + @return: block device path if the volume is mapped, else None + + """ + try: + devices = serializer.LoadJson(output) + except ValueError, err: + _ThrowError("Unable to parse JSON data: %s" % err) + + rbd_dev = None + for d in devices.values(): # pylint: disable=E1103 + try: + name = d["name"] + except KeyError: + _ThrowError("'name' key missing from json object %s", devices) + + if name == volume_name: + if rbd_dev is not None: + _ThrowError("rbd volume %s is mapped more than once", volume_name) + + rbd_dev = d["device"] + + return rbd_dev + @staticmethod - def _ParseRbdShowmappedOutput(output, volume_name): - """Parse the output of `rbd showmapped'. + def _ParseRbdShowmappedPlain(output, volume_name): + """Parse the (plain / text) output of `rbd showmapped'. This method parses the output of `rbd showmapped' and returns the rbd block device path (e.g. /dev/rbd0) that matches the given rbd volume. @type output: string - @param output: the whole output of `rbd showmapped' + @param output: the plain text output of `rbd showmapped' @type volume_name: string @param volume_name: the name of the volume whose device we search for @rtype: string or None @@ -2575,30 +2857,31 @@ class RADOSBlockDevice(BlockDev): volumefield = 2 devicefield = 4 - field_sep = "\t" - lines = output.splitlines() - splitted_lines = map(lambda l: l.split(field_sep), lines) - # Check empty output. + # Try parsing the new output format (ceph >= 0.55). + splitted_lines = map(lambda l: l.split(), lines) + + # Check for empty output. if not splitted_lines: - _ThrowError("rbd showmapped returned empty output") + return None - # Check showmapped header line, to determine number of fields. + # Check showmapped output, to determine number of fields. 
field_cnt = len(splitted_lines[0]) if field_cnt != allfields: - _ThrowError("Cannot parse rbd showmapped output because its format" - " seems to have changed; expected %s fields, found %s", - allfields, field_cnt) + # Parsing the new format failed. Fallback to parsing the old output + # format (< 0.55). + splitted_lines = map(lambda l: l.split("\t"), lines) + if field_cnt != allfields: + _ThrowError("Cannot parse rbd showmapped output expected %s fields," + " found %s", allfields, field_cnt) matched_lines = \ filter(lambda l: len(l) == allfields and l[volumefield] == volume_name, splitted_lines) if len(matched_lines) > 1: - _ThrowError("The rbd volume %s is mapped more than once." - " This shouldn't happen, try to unmap the extra" - " devices manually.", volume_name) + _ThrowError("rbd volume %s mapped more than once", volume_name) if matched_lines: # rbd block device found. Return it. @@ -2639,13 +2922,7 @@ class RADOSBlockDevice(BlockDev): name = unique_id[1] # Check if the mapping already exists. - showmap_cmd = [constants.RBD_CMD, "showmapped", "-p", pool] - result = utils.RunCmd(showmap_cmd) - if result.failed: - _ThrowError("rbd showmapped failed [during unmap](%s): %s", - result.fail_reason, result.output) - - rbd_dev = self._ParseRbdShowmappedOutput(result.output, name) + rbd_dev = self._VolumeToBlockdev(pool, name) if rbd_dev: # The mapping exists. Unmap the rbd device. @@ -2701,11 +2978,393 @@ class RADOSBlockDevice(BlockDev): result.fail_reason, result.output) +class ExtStorageDevice(BlockDev): + """A block device provided by an ExtStorage Provider. + + This class implements the External Storage Interface, which means + handling of the externally provided block devices. + + """ + def __init__(self, unique_id, children, size, params): + """Attaches to an extstorage block device. 
+ + """ + super(ExtStorageDevice, self).__init__(unique_id, children, size, params) + if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2: + raise ValueError("Invalid configuration data %s" % str(unique_id)) + + self.driver, self.vol_name = unique_id + self.ext_params = params + + self.major = self.minor = None + self.Attach() + + @classmethod + def Create(cls, unique_id, children, size, params, excl_stor): + """Create a new extstorage device. + + Provision a new volume using an extstorage provider, which will + then be mapped to a block device. + + """ + if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2: + raise errors.ProgrammerError("Invalid configuration data %s" % + str(unique_id)) + if excl_stor: + raise errors.ProgrammerError("extstorage device requested with" + " exclusive_storage") + + # Call the External Storage's create script, + # to provision a new Volume inside the External Storage + _ExtStorageAction(constants.ES_ACTION_CREATE, unique_id, + params, str(size)) + + return ExtStorageDevice(unique_id, children, size, params) + + def Remove(self): + """Remove the extstorage device. + + """ + if not self.minor and not self.Attach(): + # The extstorage device doesn't exist. + return + + # First shutdown the device (remove mappings). + self.Shutdown() + + # Call the External Storage's remove script, + # to remove the Volume from the External Storage + _ExtStorageAction(constants.ES_ACTION_REMOVE, self.unique_id, + self.ext_params) + + def Rename(self, new_id): + """Rename this device. + + """ + pass + + def Attach(self): + """Attach to an existing extstorage device. + + This method maps the extstorage volume that matches our name with + a corresponding block device and then attaches to this device. 
+ + """ + self.attached = False + + # Call the External Storage's attach script, + # to attach an existing Volume to a block device under /dev + self.dev_path = _ExtStorageAction(constants.ES_ACTION_ATTACH, + self.unique_id, self.ext_params) + + try: + st = os.stat(self.dev_path) + except OSError, err: + logging.error("Error stat()'ing %s: %s", self.dev_path, str(err)) + return False + + if not stat.S_ISBLK(st.st_mode): + logging.error("%s is not a block device", self.dev_path) + return False + + self.major = os.major(st.st_rdev) + self.minor = os.minor(st.st_rdev) + self.attached = True + + return True + + def Assemble(self): + """Assemble the device. + + """ + pass + + def Shutdown(self): + """Shutdown the device. + + """ + if not self.minor and not self.Attach(): + # The extstorage device doesn't exist. + return + + # Call the External Storage's detach script, + # to detach an existing Volume from its block device under /dev + _ExtStorageAction(constants.ES_ACTION_DETACH, self.unique_id, + self.ext_params) + + self.minor = None + self.dev_path = None + + def Open(self, force=False): + """Make the device ready for I/O. + + """ + pass + + def Close(self): + """Notifies that the device will no longer be used for I/O. + + """ + pass + + def Grow(self, amount, dryrun, backingstore): + """Grow the Volume. + + @type amount: integer + @param amount: the amount (in mebibytes) to grow with + @type dryrun: boolean + @param dryrun: whether to execute the operation in simulation mode + only, without actually increasing the size + + """ + if not backingstore: + return + if not self.Attach(): + _ThrowError("Can't attach to extstorage device during Grow()") + + if dryrun: + # we do not support dry runs of resize operations for now. 
+ return + + new_size = self.size + amount + + # Call the External Storage's grow script, + # to grow an existing Volume inside the External Storage + _ExtStorageAction(constants.ES_ACTION_GROW, self.unique_id, + self.ext_params, str(self.size), grow=str(new_size)) + + def SetInfo(self, text): + """Update metadata with info text. + + """ + # Replace invalid characters + text = re.sub("^[^A-Za-z0-9_+.]", "_", text) + text = re.sub("[^-A-Za-z0-9_+.]", "_", text) + + # Only up to 128 characters are allowed + text = text[:128] + + # Call the External Storage's setinfo script, + # to set metadata for an existing Volume inside the External Storage + _ExtStorageAction(constants.ES_ACTION_SETINFO, self.unique_id, + self.ext_params, metadata=text) + + +def _ExtStorageAction(action, unique_id, ext_params, + size=None, grow=None, metadata=None): + """Take an External Storage action. + + Take an External Storage action concerning or affecting + a specific Volume inside the External Storage. + + @type action: string + @param action: which action to perform. 
One of: + create / remove / grow / attach / detach + @type unique_id: tuple (driver, vol_name) + @param unique_id: a tuple containing the type of ExtStorage (driver) + and the Volume name + @type ext_params: dict + @param ext_params: ExtStorage parameters + @type size: integer + @param size: the size of the Volume in mebibytes + @type grow: integer + @param grow: the new size in mebibytes (after grow) + @type metadata: string + @param metadata: metadata info of the Volume, for use by the provider + @rtype: None or a block device path (during attach) + + """ + driver, vol_name = unique_id + + # Create an External Storage instance of type `driver' + status, inst_es = ExtStorageFromDisk(driver) + if not status: + _ThrowError("%s" % inst_es) + + # Create the basic environment for the driver's scripts + create_env = _ExtStorageEnvironment(unique_id, ext_params, size, + grow, metadata) + + # Do not use log file for action `attach' as we need + # to get the output from RunResult + # TODO: find a way to have a log file for attach too + logfile = None + if action is not constants.ES_ACTION_ATTACH: + logfile = _VolumeLogName(action, driver, vol_name) + + # Make sure the given action results in a valid script + if action not in constants.ES_SCRIPTS: + _ThrowError("Action '%s' doesn't result in a valid ExtStorage script" % + action) + + # Find out which external script to run according to the given action + script_name = action + "_script" + script = getattr(inst_es, script_name) + + # Run the external script + result = utils.RunCmd([script], env=create_env, + cwd=inst_es.path, output=logfile,) + if result.failed: + logging.error("External storage's %s command '%s' returned" + " error: %s, logfile: %s, output: %s", + action, result.cmd, result.fail_reason, + logfile, result.output) + + # If logfile is 'None' (during attach), it breaks TailFile + # TODO: have a log file for attach too + if action is not constants.ES_ACTION_ATTACH: + lines = [utils.SafeEncode(val) + for val in 
utils.TailFile(logfile, lines=20)] + else: + lines = result.output[-20:] + + _ThrowError("External storage's %s script failed (%s), last" + " lines of output:\n%s", + action, result.fail_reason, "\n".join(lines)) + + if action == constants.ES_ACTION_ATTACH: + return result.stdout + + +def ExtStorageFromDisk(name, base_dir=None): + """Create an ExtStorage instance from disk. + + This function will return an ExtStorage instance + if the given name is a valid ExtStorage name. + + @type base_dir: string + @keyword base_dir: Base directory containing ExtStorage installations. + Defaults to a search in all the ES_SEARCH_PATH dirs. + @rtype: tuple + @return: True and the ExtStorage instance if we find a valid one, or + False and the diagnose message on error + + """ + if base_dir is None: + es_base_dir = pathutils.ES_SEARCH_PATH + else: + es_base_dir = [base_dir] + + es_dir = utils.FindFile(name, es_base_dir, os.path.isdir) + + if es_dir is None: + return False, ("Directory for External Storage Provider %s not" + " found in search path" % name) + + # ES Files dictionary, we will populate it with the absolute path + # names; if the value is True, then it is a required file, otherwise + # an optional one + es_files = dict.fromkeys(constants.ES_SCRIPTS, True) + + es_files[constants.ES_PARAMETERS_FILE] = True + + for (filename, _) in es_files.items(): + es_files[filename] = utils.PathJoin(es_dir, filename) + + try: + st = os.stat(es_files[filename]) + except EnvironmentError, err: + return False, ("File '%s' under path '%s' is missing (%s)" % + (filename, es_dir, utils.ErrnoOrStr(err))) + + if not stat.S_ISREG(stat.S_IFMT(st.st_mode)): + return False, ("File '%s' under path '%s' is not a regular file" % + (filename, es_dir)) + + if filename in constants.ES_SCRIPTS: + if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR: + return False, ("File '%s' under path '%s' is not executable" % + (filename, es_dir)) + + parameters = [] + if constants.ES_PARAMETERS_FILE in 
es_files: + parameters_file = es_files[constants.ES_PARAMETERS_FILE] + try: + parameters = utils.ReadFile(parameters_file).splitlines() + except EnvironmentError, err: + return False, ("Error while reading the EXT parameters file at %s: %s" % + (parameters_file, utils.ErrnoOrStr(err))) + parameters = [v.split(None, 1) for v in parameters] + + es_obj = \ + objects.ExtStorage(name=name, path=es_dir, + create_script=es_files[constants.ES_SCRIPT_CREATE], + remove_script=es_files[constants.ES_SCRIPT_REMOVE], + grow_script=es_files[constants.ES_SCRIPT_GROW], + attach_script=es_files[constants.ES_SCRIPT_ATTACH], + detach_script=es_files[constants.ES_SCRIPT_DETACH], + setinfo_script=es_files[constants.ES_SCRIPT_SETINFO], + verify_script=es_files[constants.ES_SCRIPT_VERIFY], + supported_parameters=parameters) + return True, es_obj + + +def _ExtStorageEnvironment(unique_id, ext_params, + size=None, grow=None, metadata=None): + """Calculate the environment for an External Storage script. + + @type unique_id: tuple (driver, vol_name) + @param unique_id: ExtStorage pool and name of the Volume + @type ext_params: dict + @param ext_params: the EXT parameters + @type size: string + @param size: size of the Volume (in mebibytes) + @type grow: string + @param grow: new size of Volume after grow (in mebibytes) + @type metadata: string + @param metadata: metadata info of the Volume + @rtype: dict + @return: dict of environment variables + + """ + vol_name = unique_id[1] + + result = {} + result["VOL_NAME"] = vol_name + + # EXT params + for pname, pvalue in ext_params.items(): + result["EXTP_%s" % pname.upper()] = str(pvalue) + + if size is not None: + result["VOL_SIZE"] = size + + if grow is not None: + result["VOL_NEW_SIZE"] = grow + + if metadata is not None: + result["VOL_METADATA"] = metadata + + return result + + +def _VolumeLogName(kind, es_name, volume): + """Compute the ExtStorage log filename for a given Volume and operation. 
+ + @type kind: string + @param kind: the operation type (e.g. create, remove etc.) + @type es_name: string + @param es_name: the ExtStorage name + @type volume: string + @param volume: the name of the Volume inside the External Storage + + """ + # Check if the extstorage log dir is a valid dir + if not os.path.isdir(pathutils.LOG_ES_DIR): + _ThrowError("Cannot find log directory: %s", pathutils.LOG_ES_DIR) + + # TODO: Use tempfile.mkstemp to create unique filename + base = ("%s-%s-%s-%s.log" % + (kind, es_name, volume, utils.TimestampForFilename())) + return utils.PathJoin(pathutils.LOG_ES_DIR, base) + + DEV_MAP = { constants.LD_LV: LogicalVolume, constants.LD_DRBD8: DRBD8, constants.LD_BLOCKDEV: PersistentBlockDevice, constants.LD_RBD: RADOSBlockDevice, + constants.LD_EXT: ExtStorageDevice, } if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE: @@ -2769,7 +3428,7 @@ def Assemble(disk, children): return device -def Create(disk, children): +def Create(disk, children, excl_stor): """Create a device. @type disk: L{objects.Disk} @@ -2777,10 +3436,12 @@ def Create(disk, children): @type children: list of L{bdev.BlockDev} @param children: the list of block devices that are children of the device represented by the disk parameter + @type excl_stor: boolean + @param excl_stor: Whether exclusive_storage is active """ _VerifyDiskType(disk.dev_type) _VerifyDiskParams(disk) device = DEV_MAP[disk.dev_type].Create(disk.physical_id, children, disk.size, - disk.params) + disk.params, excl_stor) return device