#
#
-# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import re
import time
import errno
+import shlex
import stat
import pyparsing as pyp
import os
import logging
+import math
from ganeti import utils
from ganeti import errors
from ganeti import objects
from ganeti import compat
from ganeti import netutils
+from ganeti import pathutils
+from ganeti import serializer
# Size of reads in _CanReadDevice
_DEVICE_READ_SIZE = 128 * 1024
+class RbdShowmappedJsonError(Exception):
+ """Exception class for errors in the JSON output of `rbd showmapped'.
+
+ """
+ pass
+
+
def _IgnoreError(fn, *args, **kwargs):
"""Executes the given function, ignoring BlockDeviceErrors.
raise errors.BlockDeviceError(msg)
+def _CheckResult(result):
+ """Throws an error if the given result is a failed one.
+
+ @param result: result from RunCmd
+
+ """
+ if result.failed:
+ _ThrowError("Command: %s error: %s - %s", result.cmd, result.fail_reason,
+ result.output)
+
+
def _CanReadDevice(path):
"""Check if we can read from the given device.
return False
+def _GetForbiddenFileStoragePaths():
+ """Builds a set of path prefixes which shouldn't be used for file storage.
+
+ @rtype: frozenset
+
+ """
+ paths = set([
+ "/boot",
+ "/dev",
+ "/etc",
+ "/home",
+ "/proc",
+ "/root",
+ "/sys",
+ ])
+
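+ # Also forbid the usual binary and library directories under /, /usr
+ # and /usr/local (e.g. "/usr/local/sbin").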
+ for prefix in ["", "/usr", "/usr/local"]:
+ paths.update(map(lambda s: "%s/%s" % (prefix, s),
+ ["bin", "lib", "lib32", "lib64", "sbin"]))
+
+ return compat.UniqueFrozenset(map(os.path.normpath, paths))
+
+
+def _ComputeWrongFileStoragePaths(paths,
+ _forbidden=_GetForbiddenFileStoragePaths()):
+ """Cross-checks a list of paths for prefixes considered bad.
+
+ Some paths, e.g. "/bin", should not be used for file storage.
+
+ @type paths: list
+ @param paths: List of paths to be checked
+ @rtype: list
+ @return: Sorted list of paths for which the user should be warned
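+
+ Example: with the default forbidden set, ["/bin/foo", "/srv/storage"]
+ yields ["/bin/foo"], since "/bin/foo" lies below the forbidden "/bin".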
+
+ """
+ def _Check(path):
+ return (not os.path.isabs(path) or
+ path in _forbidden or
+ filter(lambda p: utils.IsBelowDir(p, path), _forbidden))
+
+ return utils.NiceSort(filter(_Check, map(os.path.normpath, paths)))
+
+
+def ComputeWrongFileStoragePaths(_filename=pathutils.FILE_STORAGE_PATHS_FILE):
+ """Returns a list of file storage paths whose prefix is considered bad.
+
+ See L{_ComputeWrongFileStoragePaths}.
+
+ """
+ return _ComputeWrongFileStoragePaths(_LoadAllowedFileStoragePaths(_filename))
+
+
+def _CheckFileStoragePath(path, allowed):
+ """Checks if a path is in a list of allowed paths for file storage.
+
+ @type path: string
+ @param path: Path to check
+ @type allowed: list
+ @param allowed: List of allowed paths
+ @raise errors.FileStoragePathError: If the path is not allowed
+
+ """
+ if not os.path.isabs(path):
+ raise errors.FileStoragePathError("File storage path must be absolute,"
+ " got '%s'" % path)
+
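+ # for/else: the first allowed directory containing the path breaks out
+ # of the loop; the else clause runs only if no match was found.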
+ for i in allowed:
+ if not os.path.isabs(i):
+ logging.info("Ignoring relative path '%s' for file storage", i)
+ continue
+
+ if utils.IsBelowDir(i, path):
+ break
+ else:
+ raise errors.FileStoragePathError("Path '%s' is not acceptable for file"
+ " storage. A possible fix might be to add"
+ " it to /etc/ganeti/file-storage-paths"
+ " on all nodes." % path)
+
+
+def _LoadAllowedFileStoragePaths(filename):
+ """Loads file containing allowed file storage paths.
+
+ @rtype: list
+ @return: List of allowed paths (can be an empty list)
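+
+ The file is expected to contain one path per line; empty lines and
+ comment lines are filtered out, and a missing or unreadable file simply
+ results in an empty list.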
+
+ """
+ try:
+ contents = utils.ReadFile(filename)
+ except EnvironmentError:
+ return []
+ else:
+ return utils.FilterEmptyLinesAndComments(contents)
+
+
+def CheckFileStoragePath(path, _filename=pathutils.FILE_STORAGE_PATHS_FILE):
+ """Checks if a path is allowed for file storage.
+
+ @type path: string
+ @param path: Path to check
+ @raise errors.FileStoragePathError: If the path is not allowed
+
+ """
+ allowed = _LoadAllowedFileStoragePaths(_filename)
+
+ if _ComputeWrongFileStoragePaths([path]):
+ raise errors.FileStoragePathError("Path '%s' uses a forbidden prefix" %
+ path)
+
+ _CheckFileStoragePath(path, allowed)
+
+
class BlockDev(object):
"""Block device abstract class.
an attached instance (lvcreate)
- attaching of a python instance to an existing (real) device
- The second point, the attachement to a device, is different
+ The second point, the attachment to a device, is different
depending on whether the device is assembled or not. At init() time,
we search for a device with the same unique_id as us. If found,
good. It also means that the device is already assembled. If not,
raise NotImplementedError
@classmethod
- def Create(cls, unique_id, children, size, params):
+ def Create(cls, unique_id, children, size, params, excl_stor):
"""Create the device.
If the device cannot be created, it will return None
"""
raise NotImplementedError
- def SetSyncSpeed(self, speed):
- """Adjust the sync speed of the mirror.
+ def SetSyncParams(self, params):
+ """Adjust the synchronization parameters of the mirror.
In case this is not a mirroring device, this is no-op.
+ @param params: dictionary of LD level disk parameters related to the
+ synchronization.
+ @rtype: list
+ @return: a list of error messages, emitted both by the current node and by
+ children. An empty list means no errors.
+
"""
- result = True
+ result = []
if self._children:
for child in self._children:
- result = result and child.SetSyncSpeed(speed)
+ result.extend(child.SetSyncParams(params))
return result
def PauseResumeSync(self, pause):
In case this is not a mirroring device, this is no-op.
- @param pause: Wheater to pause or resume
+ @param pause: Whether to pause or resume
"""
result = True
for child in self._children:
child.SetInfo(text)
- def Grow(self, amount, dryrun):
+ def Grow(self, amount, dryrun, backingstore):
"""Grow the block device.
@type amount: integer
@type dryrun: boolean
@param dryrun: whether to execute the operation in simulation mode
only, without actually increasing the size
+ @param backingstore: whether to execute the operation on backing storage
+ only, or on "logical" storage only; e.g. DRBD is logical storage,
+ whereas LVM, file, RBD are backing storage
"""
raise NotImplementedError
"""
_VALID_NAME_RE = re.compile("^[a-zA-Z0-9+_.-]*$")
- _INVALID_NAMES = frozenset([".", "..", "snapshot", "pvmove"])
- _INVALID_SUBSTRINGS = frozenset(["_mlog", "_mimage"])
+ _INVALID_NAMES = compat.UniqueFrozenset([".", "..", "snapshot", "pvmove"])
+ _INVALID_SUBSTRINGS = compat.UniqueFrozenset(["_mlog", "_mimage"])
def __init__(self, unique_id, children, size, params):
"""Attaches to a LV device.
self.major = self.minor = self.pe_size = self.stripe_count = None
self.Attach()
+ @staticmethod
+ """Return the standard PV size (used with exclusive storage).
+ """Return the the standard PV size (used with exclusive storage).
+
+ @param pvs_info: list of objects.LvmPvInfo, cannot be empty
+ @rtype: float
+ @return: size in MiB
+
+ """
+ assert len(pvs_info) > 0
+ smallest = min([pv.size for pv in pvs_info])
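+ # Divide by (1 + margin + reserved) so that an LV of this "standard"
+ # size, plus partitioning margin and reserved space, still fits on the
+ # smallest PV.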
+ return smallest / (1 + constants.PART_MARGIN + constants.PART_RESERVED)
+
+ @staticmethod
+ def _ComputeNumPvs(size, pvs_info):
+ """Compute the number of PVs needed for an LV (with exclusive storage).
+
+ @type size: float
+ @param size: LV size in MiB
+ @param pvs_info: list of objects.LvmPvInfo, cannot be empty
+ @rtype: integer
+ @return: number of PVs needed
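+
+ Example: with a standard PV size of 1000 MiB (see L{_GetStdPvSize}), a
+ 2500 MiB LV needs ceil(2500.0 / 1000) = 3 PVs.
+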
+ """
+ assert len(pvs_info) > 0
+ pv_size = float(LogicalVolume._GetStdPvSize(pvs_info))
+ return int(math.ceil(float(size) / pv_size))
+
+ @staticmethod
+ def _GetEmptyPvNames(pvs_info, max_pvs=None):
+ """Return a list of empty PVs, by name.
+
+ """
+ empty_pvs = filter(objects.LvmPvInfo.IsEmpty, pvs_info)
+ if max_pvs is not None:
+ empty_pvs = empty_pvs[:max_pvs]
+ return map((lambda pv: pv.name), empty_pvs)
+
@classmethod
- def Create(cls, unique_id, children, size, params):
+ def Create(cls, unique_id, children, size, params, excl_stor):
"""Create a new logical volume.
"""
cls._ValidateName(lv_name)
pvs_info = cls.GetPVInfo([vg_name])
if not pvs_info:
- _ThrowError("Can't compute PV info for vg %s", vg_name)
- pvs_info.sort()
- pvs_info.reverse()
+ if excl_stor:
+ msg = "No (empty) PVs found"
+ else:
+ msg = "Can't compute PV info for vg %s" % vg_name
+ _ThrowError(msg)
+ pvs_info.sort(key=(lambda pv: pv.free), reverse=True)
- pvlist = [pv[1] for pv in pvs_info]
+ pvlist = [pv.name for pv in pvs_info]
if compat.any(":" in v for v in pvlist):
_ThrowError("Some of your PVs have the invalid character ':' in their"
" name, this is not supported - please filter them out"
" in lvm.conf using either 'filter' or 'preferred_names'")
- free_size = sum([pv[0] for pv in pvs_info])
+
current_pvs = len(pvlist)
- stripes = min(current_pvs, constants.LVM_STRIPECOUNT)
+ desired_stripes = params[constants.LDP_STRIPES]
+ stripes = min(current_pvs, desired_stripes)
+
+ if excl_stor:
+ (err_msgs, _) = utils.LvmExclusiveCheckNodePvs(pvs_info)
+ if err_msgs:
+ for m in err_msgs:
+ logging.warning(m)
+ req_pvs = cls._ComputeNumPvs(size, pvs_info)
+ pvlist = cls._GetEmptyPvNames(pvs_info, req_pvs)
+ current_pvs = len(pvlist)
+ if current_pvs < req_pvs:
+ _ThrowError("Not enough empty PVs to create a disk of %d MB:"
+ " %d available, %d needed", size, current_pvs, req_pvs)
+ assert current_pvs == len(pvlist)
+ if stripes > current_pvs:
+ # No warning issued for this, as it's no surprise
+ stripes = current_pvs
+
+ else:
+ if stripes < desired_stripes:
+ logging.warning("Could not use %d stripes for VG %s, as only %d PVs are"
+ " available.", desired_stripes, vg_name, current_pvs)
+ free_size = sum([pv.free for pv in pvs_info])
+ # The size constraint should have been checked from the master before
+ # calling the create function.
+ if free_size < size:
+ _ThrowError("Not enough free space: required %s,"
+ " available %s", size, free_size)
- # The size constraint should have been checked from the master before
- # calling the create function.
- if free_size < size:
- _ThrowError("Not enough free space: required %s,"
- " available %s", size, free_size)
- cmd = ["lvcreate", "-L%dm" % size, "-n%s" % lv_name]
# If the free space is not well distributed, we won't be able to
# create an optimally-striped volume; in that case, we want to try
# with N, N-1, ..., 2, and finally 1 (non-stripped) number of
# stripes
+ cmd = ["lvcreate", "-L%dm" % size, "-n%s" % lv_name]
for stripes_arg in range(stripes, 0, -1):
result = utils.RunCmd(cmd + ["-i%d" % stripes_arg] + [vg_name] + pvlist)
if not result.failed:
@staticmethod
def _GetVolumeInfo(lvm_cmd, fields):
- """Returns LVM Volumen infos using lvm_cmd
+ """Returns LVM volume info using the given lvm_cmd
@param lvm_cmd: Should be one of "pvs", "vgs" or "lvs"
@param fields: Fields to return
return data
@classmethod
- def GetPVInfo(cls, vg_names, filter_allocatable=True):
+ def GetPVInfo(cls, vg_names, filter_allocatable=True, include_lvs=False):
"""Get the free space info for PVs in a volume group.
@param vg_names: list of volume group names, if empty all will be returned
@param filter_allocatable: whether to skip over unallocatable PVs
+ @param include_lvs: whether to include a list of LVs hosted on each PV
@rtype: list
- @return: list of tuples (free_space, name) with free_space in mebibytes
+ @return: list of objects.LvmPvInfo objects
"""
+ # We request "lv_name" field only if we care about LVs, so we don't get
+ # a long list of entries with many duplicates unless we really have to.
+ # The duplicate "pv_name" field will be ignored.
+ if include_lvs:
+ lvfield = "lv_name"
+ else:
+ lvfield = "pv_name"
try:
info = cls._GetVolumeInfo("pvs", ["pv_name", "vg_name", "pv_free",
- "pv_attr"])
+ "pv_attr", "pv_size", lvfield])
except errors.GenericError, err:
logging.error("Can't get PV information: %s", err)
return None
+ # When asked for LVs, "pvs" may return multiple entries for the same PV-LV
+ # pair. We sort entries by PV name and then LV name, so it's easy to weed
+ # out duplicates.
+ if include_lvs:
+ info.sort(key=(lambda i: (i[0], i[5])))
data = []
- for pv_name, vg_name, pv_free, pv_attr in info:
+ lastpvi = None
+ for (pv_name, vg_name, pv_free, pv_attr, pv_size, lv_name) in info:
# (possibly) skip over pvs which are not allocatable
if filter_allocatable and pv_attr[0] != "a":
continue
# (possibly) skip over pvs which are not in the right volume group(s)
if vg_names and vg_name not in vg_names:
continue
- data.append((float(pv_free), pv_name, vg_name))
+ # Beware of duplicates (check before inserting)
+ if lastpvi and lastpvi.name == pv_name:
+ if include_lvs and lv_name:
+ if not lastpvi.lv_list or lastpvi.lv_list[-1] != lv_name:
+ lastpvi.lv_list.append(lv_name)
+ else:
+ if include_lvs and lv_name:
+ lvl = [lv_name]
+ else:
+ lvl = []
+ lastpvi = objects.LvmPvInfo(name=pv_name, vg_name=vg_name,
+ size=float(pv_size), free=float(pv_free),
+ attributes=pv_attr, lv_list=lvl)
+ data.append(lastpvi)
return data
@classmethod
- def GetVGInfo(cls, vg_names, filter_readonly=True):
+ def _GetExclusiveStorageVgFree(cls, vg_name):
+ """Return the free disk space in the given VG, in exclusive storage mode.
+
+ @type vg_name: string
+ @param vg_name: VG name
+ @rtype: float
+ @return: free space in MiB
+
+ """
+ pvs_info = cls.GetPVInfo([vg_name])
+ if not pvs_info:
+ return 0.0
+ pv_size = cls._GetStdPvSize(pvs_info)
+ num_pvs = len(cls._GetEmptyPvNames(pvs_info))
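+ # With exclusive storage an LV is allocated on whole, empty PVs only, so
+ # the usable free space is the standard PV size times the empty PV count.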
+ return pv_size * num_pvs
+
+ @classmethod
+ def GetVGInfo(cls, vg_names, excl_stor, filter_readonly=True):
"""Get the free space info for specific VGs.
@param vg_names: list of volume group names, if empty all will be returned
+ @param excl_stor: whether exclusive_storage is enabled
@param filter_readonly: whether to skip over readonly VGs
@rtype: list
# (possibly) skip over vgs which are not in the right volume group(s)
if vg_names and vg_name not in vg_names:
continue
+ # Exclusive storage needs a different concept of free space
+ if excl_stor:
+ es_free = cls._GetExclusiveStorageVgFree(vg_name)
+ assert es_free <= vg_free
+ vg_free = es_free
data.append((float(vg_free), float(vg_size), vg_name))
return data
"""
self.attached = False
result = utils.RunCmd(["lvs", "--noheadings", "--separator=,",
- "--units=m", "--nosuffix",
+ "--units=k", "--nosuffix",
"-olv_attr,lv_kernel_major,lv_kernel_minor,"
"vg_extent_size,stripes", self.dev_path])
if result.failed:
return False
status, major, minor, pe_size, stripes = out
- if len(status) != 6:
- logging.error("lvs lv_attr is not 6 characters (%s)", status)
+ if len(status) < 6:
+ logging.error("lvs lv_attr has fewer than 6 characters (%s)", status)
return False
try:
snap = LogicalVolume((self._vg_name, snap_name), None, size, self.params)
_IgnoreError(snap.Remove)
- vg_info = self.GetVGInfo([self._vg_name])
+ vg_info = self.GetVGInfo([self._vg_name], False)
if not vg_info:
_ThrowError("Can't compute VG info for vg %s", self._vg_name)
free_size, _, _ = vg_info[0]
_ThrowError("Not enough free space: required %s,"
" available %s", size, free_size)
- result = utils.RunCmd(["lvcreate", "-L%dm" % size, "-s",
- "-n%s" % snap_name, self.dev_path])
- if result.failed:
- _ThrowError("command: %s error: %s - %s",
- result.cmd, result.fail_reason, result.output)
+ _CheckResult(utils.RunCmd(["lvcreate", "-L%dm" % size, "-s",
+ "-n%s" % snap_name, self.dev_path]))
return (self._vg_name, snap_name)
+ def _RemoveOldInfo(self):
+ """Try to remove old tags from the lv.
+
+ """
+ result = utils.RunCmd(["lvs", "-o", "tags", "--noheadings", "--nosuffix",
+ self.dev_path])
+ _CheckResult(result)
+
+ raw_tags = result.stdout.strip()
+ if raw_tags:
+ for tag in raw_tags.split(","):
+ _CheckResult(utils.RunCmd(["lvchange", "--deltag",
+ tag.strip(), self.dev_path]))
+
def SetInfo(self, text):
"""Update metadata with info text.
"""
BlockDev.SetInfo(self, text)
+ self._RemoveOldInfo()
+
# Replace invalid characters
text = re.sub("^[^A-Za-z0-9_+.]", "_", text)
text = re.sub("[^-A-Za-z0-9_+.]", "_", text)
# Only up to 128 characters are allowed
text = text[:128]
- result = utils.RunCmd(["lvchange", "--addtag", text,
- self.dev_path])
- if result.failed:
- _ThrowError("Command: %s error: %s - %s", result.cmd, result.fail_reason,
- result.output)
+ _CheckResult(utils.RunCmd(["lvchange", "--addtag", text, self.dev_path]))
- def Grow(self, amount, dryrun):
+ def Grow(self, amount, dryrun, backingstore):
"""Grow the logical volume.
"""
+ if not backingstore:
+ return
if self.pe_size is None or self.stripe_count is None:
if not self.Attach():
_ThrowError("Can't attach to LV during Grow()")
full_stripe_size = self.pe_size * self.stripe_count
+ # pe_size is in KB
+ amount *= 1024
rest = amount % full_stripe_size
if rest != 0:
amount += full_stripe_size - rest
- cmd = ["lvextend", "-L", "+%dm" % amount]
+ cmd = ["lvextend", "-L", "+%dk" % amount]
if dryrun:
cmd.append("--test")
# we try multiple algorithms since the 'best' ones might not have
CS_SYNCTARGET = "SyncTarget"
CS_PAUSEDSYNCS = "PausedSyncS"
CS_PAUSEDSYNCT = "PausedSyncT"
- CSET_SYNC = frozenset([
+ CSET_SYNC = compat.UniqueFrozenset([
CS_WFREPORTPARAMS,
CS_STARTINGSYNCS,
CS_STARTINGSYNCT,
_ST_WFCONNECTION = "WFConnection"
_ST_CONNECTED = "Connected"
- _STATUS_FILE = "/proc/drbd"
+ _STATUS_FILE = constants.DRBD_STATUS_FILE
_USERMODE_HELPER_FILE = "/sys/module/drbd/parameters/usermode_helper"
@staticmethod
first_line)
values = version.groups()
- retval = {"k_major": int(values[0]),
- "k_minor": int(values[1]),
- "k_point": int(values[2]),
- "api": int(values[3]),
- "proto": int(values[4]),
- }
+ retval = {
+ "k_major": int(values[0]),
+ "k_minor": int(values[1]),
+ "k_point": int(values[2]),
+ "api": int(values[3]),
+ "proto": int(values[4]),
+ }
if values[5] is not None:
retval["proto2"] = values[5]
def _CheckMetaSize(meta_device):
"""Check if the given meta device looks like a valid one.
- This currently only check the size, which must be around
+ This currently only checks the size, which must be around
128MiB.
"""
doesn't do anything to the supposed peer. If you need a fully
connected DRBD pair, you need to use this class on both hosts.
- The unique_id for the drbd device is the (local_ip, local_port,
- remote_ip, remote_port) tuple, and it must have two children: the
- data device and the meta_device. The meta device is checked for
- valid size and is zeroed on create.
+ The unique_id for the drbd device is a (local_ip, local_port,
+ remote_ip, remote_port, local_minor, secret) tuple, and it must have
+ two children: the data device and the meta_device. The meta device
+ is checked for valid size and is zeroed on create.
"""
_MAX_MINORS = 255
# timeout constants
_NET_RECONFIG_TIMEOUT = 60
+ # command line options for barriers
+ _DISABLE_DISK_OPTION = "--no-disk-barrier" # -a
+ _DISABLE_DRAIN_OPTION = "--no-disk-drain" # -D
+ _DISABLE_FLUSH_OPTION = "--no-disk-flushes" # -i
+ _DISABLE_META_FLUSH_OPTION = "--no-md-flushes" # -m
+
def __init__(self, unique_id, children, size, params):
if children and children.count(None) > 0:
children = []
def _GetShowParser(cls):
"""Return a parser for `drbd show` output.
- This will either create or return an already-create parser for the
+ This will either create or return an already-created parser for the
output of the command `drbd show`.
"""
defa = pyp.Literal("_is_default").suppress()
dbl_quote = pyp.Literal('"').suppress()
- keyword = pyp.Word(pyp.alphanums + '-')
+ keyword = pyp.Word(pyp.alphanums + "-")
# value types
- value = pyp.Word(pyp.alphanums + '_-/.:')
+ value = pyp.Word(pyp.alphanums + "_-/.:")
quoted = dbl_quote + pyp.CharsNotIn('"') + dbl_quote
ipv4_addr = (pyp.Optional(pyp.Literal("ipv4")).suppress() +
pyp.Word(pyp.nums + ".") + colon + number)
info["remote_addr"] == (self._rhost, self._rport))
return retval
- @classmethod
- def _AssembleLocal(cls, minor, backend, meta, size):
+ def _AssembleLocal(self, minor, backend, meta, size):
"""Configure the local part of a DRBD device.
"""
- args = ["drbdsetup", cls._DevPath(minor), "disk",
+ args = ["drbdsetup", self._DevPath(minor), "disk",
backend, meta, "0",
"-e", "detach",
"--create-device"]
if size:
args.extend(["-d", "%sm" % size])
- if not constants.DRBD_BARRIERS: # disable barriers, if configured so
- version = cls._GetVersion(cls._GetProcData())
- # various DRBD versions support different disk barrier options;
- # what we aim here is to revert back to the 'drain' method of
- # disk flushes and to disable metadata barriers, in effect going
- # back to pre-8.0.7 behaviour
- vmaj = version["k_major"]
- vmin = version["k_minor"]
- vrel = version["k_point"]
- assert vmaj == 8
- if vmin == 0: # 8.0.x
- if vrel >= 12:
- args.extend(["-i", "-m"])
- elif vmin == 2: # 8.2.x
- if vrel >= 7:
- args.extend(["-i", "-m"])
- elif vmaj >= 3: # 8.3.x or newer
- args.extend(["-i", "-a", "m"])
+
+ version = self._GetVersion(self._GetProcData())
+ vmaj = version["k_major"]
+ vmin = version["k_minor"]
+ vrel = version["k_point"]
+
+ barrier_args = \
+ self._ComputeDiskBarrierArgs(vmaj, vmin, vrel,
+ self.params[constants.LDP_BARRIERS],
+ self.params[constants.LDP_NO_META_FLUSH])
+ args.extend(barrier_args)
+
+ if self.params[constants.LDP_DISK_CUSTOM]:
+ args.extend(shlex.split(self.params[constants.LDP_DISK_CUSTOM]))
+
result = utils.RunCmd(args)
if result.failed:
_ThrowError("drbd%d: can't attach local disk: %s", minor, result.output)
+ @classmethod
+ def _ComputeDiskBarrierArgs(cls, vmaj, vmin, vrel, disabled_barriers,
+ disable_meta_flush):
+ """Compute the DRBD command line parameters for disk barriers.
+
+ Returns a list of the disk barrier parameters as requested via the
+ disabled_barriers and disable_meta_flush arguments, and according to the
+ supported ones in the DRBD version vmaj.vmin.vrel
+
+ If the desired option is unsupported, raises errors.BlockDeviceError.
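+
+ For example, on DRBD 8.3.x, disabling disk drain and meta flushes would
+ yield ["--no-disk-drain", "--no-md-flushes"].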
+
+ """
+ disabled_barriers_set = frozenset(disabled_barriers)
+ if disabled_barriers_set not in constants.DRBD_VALID_BARRIER_OPT:
+ raise errors.BlockDeviceError("%s is not a valid option set for DRBD"
+ " barriers" % disabled_barriers)
+
+ args = []
+
+ # The following code assumes DRBD 8.x, with x < 4 and x != 1 (DRBD 8.1.x
+ # does not exist)
+ if not (vmaj == 8 and vmin in (0, 2, 3)):
+ raise errors.BlockDeviceError("Unsupported DRBD version: %d.%d.%d" %
+ (vmaj, vmin, vrel))
+
+ def _AppendOrRaise(option, min_version):
+ """Helper for DRBD options"""
+ if min_version is not None and vrel >= min_version:
+ args.append(option)
+ else:
+ raise errors.BlockDeviceError("Could not use the option %s as the"
+ " DRBD version %d.%d.%d does not support"
+ " it." % (option, vmaj, vmin, vrel))
+
+ # the minimum version for each feature is encoded via pairs of (minor
+ # version -> x), where x is the version in which support for the option
+ # was introduced.
+ meta_flush_supported = disk_flush_supported = {
+ 0: 12,
+ 2: 7,
+ 3: 0,
+ }
+
+ disk_drain_supported = {
+ 2: 7,
+ 3: 0,
+ }
+
+ disk_barriers_supported = {
+ 3: 0,
+ }
+
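+ # A feature missing from its table for the running minor version is not
+ # supported at all; .get(vmin, None) then makes _AppendOrRaise raise.
+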
+ # meta flushes
+ if disable_meta_flush:
+ _AppendOrRaise(cls._DISABLE_META_FLUSH_OPTION,
+ meta_flush_supported.get(vmin, None))
+
+ # disk flushes
+ if constants.DRBD_B_DISK_FLUSH in disabled_barriers_set:
+ _AppendOrRaise(cls._DISABLE_FLUSH_OPTION,
+ disk_flush_supported.get(vmin, None))
+
+ # disk drain
+ if constants.DRBD_B_DISK_DRAIN in disabled_barriers_set:
+ _AppendOrRaise(cls._DISABLE_DRAIN_OPTION,
+ disk_drain_supported.get(vmin, None))
+
+ # disk barriers
+ if constants.DRBD_B_DISK_BARRIERS in disabled_barriers_set:
+ _AppendOrRaise(cls._DISABLE_DISK_OPTION,
+ disk_barriers_supported.get(vmin, None))
+
+ return args
+
def _AssembleNet(self, minor, net_info, protocol,
dual_pri=False, hmac=None, secret=None):
"""Configure the network part of the device.
# sync speed only after setting up both sides can race with DRBD
# connecting, hence we set it here before telling DRBD anything
# about its peer.
- sync_speed = self.params.get(constants.RESYNC_RATE)
- self._SetMinorSyncSpeed(minor, sync_speed)
+ sync_errors = self._SetMinorSyncParams(minor, self.params)
+ if sync_errors:
+ _ThrowError("drbd%d: can't set the synchronization parameters: %s" %
+ (minor, utils.CommaJoin(sync_errors)))
if netutils.IP6Address.IsValid(lhost):
if not netutils.IP6Address.IsValid(rhost):
args.append("-m")
if hmac and secret:
args.extend(["-a", hmac, "-x", secret])
+
+ if self.params[constants.LDP_NET_CUSTOM]:
+ args.extend(shlex.split(self.params[constants.LDP_NET_CUSTOM]))
+
result = utils.RunCmd(args)
if result.failed:
_ThrowError("drbd%d: can't setup network: %s - %s",
self._children = []
@classmethod
- def _SetMinorSyncSpeed(cls, minor, kbytes):
- """Set the speed of the DRBD syncer.
+ def _SetMinorSyncParams(cls, minor, params):
+ """Set the parameters of the DRBD syncer.
This is the low-level implementation.
@type minor: int
@param minor: the drbd minor whose settings we change
- @type kbytes: int
- @param kbytes: the speed in kbytes/second
- @rtype: boolean
- @return: the success of the operation
+ @type params: dict
+ @param params: LD level disk parameters related to the synchronization
+ @rtype: list
+ @return: a list of error messages
"""
- result = utils.RunCmd(["drbdsetup", cls._DevPath(minor), "syncer",
- "-r", "%d" % kbytes, "--create-device"])
+
+ args = ["drbdsetup", cls._DevPath(minor), "syncer"]
+ if params[constants.LDP_DYNAMIC_RESYNC]:
+ version = cls._GetVersion(cls._GetProcData())
+ vmin = version["k_minor"]
+ vrel = version["k_point"]
+
+ # By definition we are using 8.x, so just check the rest of the version
+ # number
+ if vmin != 3 or vrel < 9:
+ msg = ("The current DRBD version (8.%d.%d) does not support the "
+ "dynamic resync speed controller" % (vmin, vrel))
+ logging.error(msg)
+ return [msg]
+
+ if params[constants.LDP_PLAN_AHEAD] == 0:
+ msg = ("A value of 0 for c-plan-ahead disables the dynamic sync speed"
+ " controller at DRBD level. If you want to disable it, please"
+ " set the dynamic-resync disk parameter to False.")
+ logging.error(msg)
+ return [msg]
+
+ # add the c-* parameters to args
+ # (the c-* values are integers; RunCmd requires string arguments)
+ args.extend(["--c-plan-ahead", str(params[constants.LDP_PLAN_AHEAD]),
+ "--c-fill-target", str(params[constants.LDP_FILL_TARGET]),
+ "--c-delay-target", str(params[constants.LDP_DELAY_TARGET]),
+ "--c-max-rate", str(params[constants.LDP_MAX_RATE]),
+ "--c-min-rate", str(params[constants.LDP_MIN_RATE]),
+ ])
+
+ else:
+ args.extend(["-r", "%d" % params[constants.LDP_RESYNC_RATE]])
+
+ args.append("--create-device")
+ result = utils.RunCmd(args)
if result.failed:
- logging.error("Can't change syncer rate: %s - %s",
- result.fail_reason, result.output)
- return not result.failed
+ msg = ("Can't change syncer rate: %s - %s" %
+ (result.fail_reason, result.output))
+ logging.error(msg)
+ return [msg]
- def SetSyncSpeed(self, kbytes):
- """Set the speed of the DRBD syncer.
+ return []
- @type kbytes: int
- @param kbytes: the speed in kbytes/second
- @rtype: boolean
- @return: the success of the operation
+ def SetSyncParams(self, params):
+ """Set the synchronization parameters of the DRBD syncer.
+
+ @type params: dict
+ @param params: LD level disk parameters related to the synchronization
+ @rtype: list
+ @return: a list of error messages, emitted both by the current node and by
+ children. An empty list means no errors
"""
if self.minor is None:
- logging.info("Not attached during SetSyncSpeed")
- return False
- children_result = super(DRBD8, self).SetSyncSpeed(kbytes)
- return self._SetMinorSyncSpeed(self.minor, kbytes) and children_result
+ err = "Not attached during SetSyncParams"
+ logging.info(err)
+ return [err]
+
+ children_result = super(DRBD8, self).SetSyncParams(params)
+ children_result.extend(self._SetMinorSyncParams(self.minor, params))
+ return children_result
def PauseResumeSync(self, pause):
"""Pauses or resumes the sync of a DRBD device.
# the device
self._SlowAssemble()
- sync_speed = self.params.get(constants.RESYNC_RATE)
- self.SetSyncSpeed(sync_speed)
+ sync_errors = self.SetSyncParams(self.params)
+ if sync_errors:
+ _ThrowError("drbd%d: can't set the synchronization parameters: %s" %
+ (self.minor, utils.CommaJoin(sync_errors)))
def _SlowAssemble(self):
"""Assembles the DRBD device from a (partially) configured device.
self.Shutdown()
@classmethod
- def Create(cls, unique_id, children, size, params):
+ def Create(cls, unique_id, children, size, params, excl_stor):
"""Create a new DRBD8 device.
Since DRBD devices are not created per se, just assembled, this
"""
if len(children) != 2:
raise errors.ProgrammerError("Invalid setup for the drbd device")
+ if excl_stor:
+ raise errors.ProgrammerError("DRBD device requested with"
+ " exclusive_storage")
# check that the minor is unused
aminor = unique_id[4]
proc_info = cls._MassageProcData(cls._GetProcData())
cls._InitMeta(aminor, meta.dev_path)
return cls(unique_id, children, size, params)
- def Grow(self, amount, dryrun):
+ def Grow(self, amount, dryrun, backingstore):
"""Resize the DRBD device and its backing storage.
"""
_ThrowError("drbd%d: Grow called while not attached", self._aminor)
if len(self._children) != 2 or None in self._children:
_ThrowError("drbd%d: cannot grow diskless device", self.minor)
- self._children[0].Grow(amount, dryrun)
- if dryrun:
- # DRBD does not support dry-run mode, so we'll return here
+ self._children[0].Grow(amount, dryrun, backingstore)
+ if dryrun or backingstore:
+ # DRBD does not support dry-run mode and is not backing storage,
+ # so we'll return here
return
result = utils.RunCmd(["drbdsetup", self.dev_path, "resize", "-s",
"%dm" % (self.size + amount)])
raise ValueError("Invalid configuration data %s" % str(unique_id))
self.driver = unique_id[0]
self.dev_path = unique_id[1]
+
+ CheckFileStoragePath(self.dev_path)
+
self.Attach()
def Assemble(self):
# TODO: implement rename for file-based storage
_ThrowError("Rename is not supported for file-based storage")
- def Grow(self, amount, dryrun):
+ def Grow(self, amount, dryrun, backingstore):
"""Grow the file
@param amount: the amount (in mebibytes) to grow with
"""
+ if not backingstore:
+ return
# Check that the file exists
self.Assemble()
current_size = self.GetActualSize()
_ThrowError("Can't stat %s: %s", self.dev_path, err)
@classmethod
- def Create(cls, unique_id, children, size, params):
+ def Create(cls, unique_id, children, size, params, excl_stor):
"""Create a new file.
@param size: the size of file in MiB
@return: an instance of FileStorage
"""
+ if excl_stor:
+ raise errors.ProgrammerError("FileStorage device requested with"
+ " exclusive_storage")
if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
raise ValueError("Invalid configuration data %s" % str(unique_id))
+
dev_path = unique_id[1]
+
+ CheckFileStoragePath(dev_path)
+
try:
fd = os.open(dev_path, os.O_RDWR | os.O_CREAT | os.O_EXCL)
f = os.fdopen(fd, "w")
self.Attach()
@classmethod
- def Create(cls, unique_id, children, size, params):
+ def Create(cls, unique_id, children, size, params, excl_stor):
"""Create a new device
This is a noop, we only return a PersistentBlockDevice instance
"""
+ if excl_stor:
+ raise errors.ProgrammerError("Persistent block device requested with"
+ " exclusive_storage")
return PersistentBlockDevice(unique_id, children, 0, params)
def Remove(self):
"""
pass
- def Grow(self, amount, dryrun):
+ def Grow(self, amount, dryrun, backingstore):
"""Grow the logical volume.
"""
_ThrowError("Grow is not supported for PersistentBlockDev storage")
+class RADOSBlockDevice(BlockDev):
+ """A RADOS Block Device (rbd).
+
+ This class implements the RADOS Block Device for the backend. You need
+ the rbd kernel driver, the RADOS Tools and a working RADOS cluster for
+ this to be functional.
+
+ """
+ def __init__(self, unique_id, children, size, params):
+ """Attaches to an rbd device.
+
+ """
+ super(RADOSBlockDevice, self).__init__(unique_id, children, size, params)
+ if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
+ raise ValueError("Invalid configuration data %s" % str(unique_id))
+
+ self.driver, self.rbd_name = unique_id
+
+ self.major = self.minor = None
+ self.Attach()
+
+ @classmethod
+ def Create(cls, unique_id, children, size, params, excl_stor):
+ """Create a new rbd device.
+
+ Provision a new rbd volume inside a RADOS pool.
+
+ """
+ if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
+ raise errors.ProgrammerError("Invalid configuration data %s" %
+ str(unique_id))
+ if excl_stor:
+ raise errors.ProgrammerError("RBD device requested with"
+ " exclusive_storage")
+ rbd_pool = params[constants.LDP_POOL]
+ rbd_name = unique_id[1]
+
+ # Provision a new rbd volume (Image) inside the RADOS cluster.
+ cmd = [constants.RBD_CMD, "create", "-p", rbd_pool,
+ rbd_name, "--size", "%s" % size]
+ result = utils.RunCmd(cmd)
+ if result.failed:
+ _ThrowError("rbd creation failed (%s): %s",
+ result.fail_reason, result.output)
+
+ return RADOSBlockDevice(unique_id, children, size, params)
+
+ def Remove(self):
+ """Remove the rbd device.
+
+ """
+ rbd_pool = self.params[constants.LDP_POOL]
+ rbd_name = self.unique_id[1]
+
+ if not self.minor and not self.Attach():
+ # The rbd device doesn't exist.
+ return
+
+ # First shutdown the device (remove mappings).
+ self.Shutdown()
+
+ # Remove the actual Volume (Image) from the RADOS cluster.
+ cmd = [constants.RBD_CMD, "rm", "-p", rbd_pool, rbd_name]
+ result = utils.RunCmd(cmd)
+ if result.failed:
+ _ThrowError("Can't remove Volume from cluster with rbd rm: %s - %s",
+ result.fail_reason, result.output)
+
+ def Rename(self, new_id):
+ """Rename this device.
+
+ """
+ pass
+
+ def Attach(self):
+ """Attach to an existing rbd device.
+
+ This method maps the rbd volume that matches our name to an rbd
+ device and then attaches to this device.
+
+ """
+ self.attached = False
+
+ # Map the rbd volume to a block device under /dev
+ self.dev_path = self._MapVolumeToBlockdev(self.unique_id)
+
+ try:
+ st = os.stat(self.dev_path)
+ except OSError, err:
+ logging.error("Error stat()'ing %s: %s", self.dev_path, str(err))
+ return False
+
+ if not stat.S_ISBLK(st.st_mode):
+ logging.error("%s is not a block device", self.dev_path)
+ return False
+
+ self.major = os.major(st.st_rdev)
+ self.minor = os.minor(st.st_rdev)
+ self.attached = True
+
+ return True
+
+ def _MapVolumeToBlockdev(self, unique_id):
+ """Maps existing rbd volumes to block devices.
+
+ This method should be idempotent if the mapping already exists.
+
+ @rtype: string
+ @return: the block device path that corresponds to the volume
+
+ """
+ pool = self.params[constants.LDP_POOL]
+ name = unique_id[1]
+
+ # Check if the mapping already exists.
+ rbd_dev = self._VolumeToBlockdev(pool, name)
+ if rbd_dev:
+ # The mapping exists. Return it.
+ return rbd_dev
+
+ # The mapping doesn't exist. Create it.
+ map_cmd = [constants.RBD_CMD, "map", "-p", pool, name]
+ result = utils.RunCmd(map_cmd)
+ if result.failed:
+ _ThrowError("rbd map failed (%s): %s",
+ result.fail_reason, result.output)
+
+ # Find the corresponding rbd device.
+ rbd_dev = self._VolumeToBlockdev(pool, name)
+ if not rbd_dev:
+ _ThrowError("rbd map succeeded, but could not find the rbd block"
+ " device in output of showmapped, for volume: %s", name)
+
+ # The device was successfully mapped. Return it.
+ return rbd_dev
+
+ @classmethod
+ def _VolumeToBlockdev(cls, pool, volume_name):
+ """Do the 'volume name'-to-'rbd block device' resolving.
+
+ @type pool: string
+ @param pool: RADOS pool to use
+ @type volume_name: string
+ @param volume_name: the name of the volume whose device we search for
+ @rtype: string or None
+ @return: block device path if the volume is mapped, else None
+
+ """
+ try:
+ # Newer versions of the rbd tool support json output formatting. Use it
+ # if available.
+ showmap_cmd = [
+ constants.RBD_CMD,
+ "showmapped",
+ "-p",
+ pool,
+ "--format",
+ "json"
+ ]
+ result = utils.RunCmd(showmap_cmd)
+ if result.failed:
+ logging.error("rbd JSON output formatting returned error (%s): %s,"
+ " falling back to plain output parsing",
+ result.fail_reason, result.output)
+ raise RbdShowmappedJsonError
+
+ return cls._ParseRbdShowmappedJson(result.output, volume_name)
+ except RbdShowmappedJsonError:
+ # For older versions of rbd, we have to parse the plain / text output
+ # manually.
+ showmap_cmd = [constants.RBD_CMD, "showmapped", "-p", pool]
+ result = utils.RunCmd(showmap_cmd)
+ if result.failed:
+ _ThrowError("rbd showmapped failed (%s): %s",
+ result.fail_reason, result.output)
+
+ return cls._ParseRbdShowmappedPlain(result.output, volume_name)
+
+ @staticmethod
+ def _ParseRbdShowmappedJson(output, volume_name):
+ """Parse the json output of `rbd showmapped'.
+
+ This method parses the json output of `rbd showmapped' and returns the rbd
+ block device path (e.g. /dev/rbd0) that matches the given rbd volume.
+
+ @type output: string
+ @param output: the json output of `rbd showmapped'
+ @type volume_name: string
+ @param volume_name: the name of the volume whose device we search for
+ @rtype: string or None
+ @return: block device path if the volume is mapped, else None
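+
+ A typical mapping entry looks like {"0": {"pool": "rbd", "name": "vol1",
+ "device": "/dev/rbd0", ...}}; the top-level keys are mapping ids and are
+ not used here.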
+
+ """
+ try:
+ devices = serializer.LoadJson(output)
+ except ValueError, err:
+ _ThrowError("Unable to parse JSON data: %s" % err)
+
+ rbd_dev = None
+ for d in devices.values(): # pylint: disable=E1103
+ try:
+ name = d["name"]
+ except KeyError:
+ _ThrowError("'name' key missing from json object %s", devices)
+
+ if name == volume_name:
+ if rbd_dev is not None:
+ _ThrowError("rbd volume %s is mapped more than once", volume_name)
+
+ rbd_dev = d["device"]
+
+ return rbd_dev
+
+ @staticmethod
+ def _ParseRbdShowmappedPlain(output, volume_name):
+ """Parse the (plain / text) output of `rbd showmapped'.
+
+ This method parses the output of `rbd showmapped' and returns
+ the rbd block device path (e.g. /dev/rbd0) that matches the
+ given rbd volume.
+
+ @type output: string
+ @param output: the plain text output of `rbd showmapped'
+ @type volume_name: string
+ @param volume_name: the name of the volume whose device we search for
+ @rtype: string or None
+ @return: block device path if the volume is mapped, else None
+
+ """
+ allfields = 5
+ volumefield = 2
+ devicefield = 4
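+ # The plain output has five whitespace-separated (or, for older
+ # versions, tab-separated) columns: id, pool, image (the volume name),
+ # snap and device.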
+
+ lines = output.splitlines()
+
+ # Try parsing the new output format (ceph >= 0.55).
+ splitted_lines = map(lambda l: l.split(), lines)
+
+ # Check for empty output.
+ if not splitted_lines:
+ return None
+
+ # Check showmapped output, to determine number of fields.
+ field_cnt = len(splitted_lines[0])
+ if field_cnt != allfields:
+ # Parsing the new format failed. Fall back to parsing the old output
+ # format (< 0.55).
+ splitted_lines = map(lambda l: l.split("\t"), lines)
+ field_cnt = len(splitted_lines[0])
+ if field_cnt != allfields:
+ _ThrowError("Cannot parse rbd showmapped output: expected %s fields,"
+ " found %s", allfields, field_cnt)
+
+ matched_lines = \
+ filter(lambda l: len(l) == allfields and l[volumefield] == volume_name,
+ splitted_lines)
+
+ if len(matched_lines) > 1:
+ _ThrowError("rbd volume %s mapped more than once", volume_name)
+
+ if matched_lines:
+ # rbd block device found. Return it.
+ rbd_dev = matched_lines[0][devicefield]
+ return rbd_dev
+
+ # The given volume is not mapped.
+ return None
+
+ def Assemble(self):
+ """Assemble the device.
+
+ """
+ pass
+
+ def Shutdown(self):
+ """Shutdown the device.
+
+ """
+ if not self.minor and not self.Attach():
+ # The rbd device doesn't exist.
+ return
+
+ # Unmap the block device from the Volume.
+ self._UnmapVolumeFromBlockdev(self.unique_id)
+
+ self.minor = None
+ self.dev_path = None
+
+ def _UnmapVolumeFromBlockdev(self, unique_id):
+ """Unmaps the rbd device from the Volume it is mapped to.
+
+ Unmaps the rbd device from the Volume it was previously mapped to.
+ This method should be idempotent if the Volume isn't mapped.
+
+ """
+ pool = self.params[constants.LDP_POOL]
+ name = unique_id[1]
+
+ # Check if the mapping already exists.
+ rbd_dev = self._VolumeToBlockdev(pool, name)
+
+ if rbd_dev:
+ # The mapping exists. Unmap the rbd device.
+ unmap_cmd = [constants.RBD_CMD, "unmap", "%s" % rbd_dev]
+ result = utils.RunCmd(unmap_cmd)
+ if result.failed:
+ _ThrowError("rbd unmap failed (%s): %s",
+ result.fail_reason, result.output)
+
+ def Open(self, force=False):
+ """Make the device ready for I/O.
+
+ """
+ pass
+
+ def Close(self):
+ """Notifies that the device will no longer be used for I/O.
+
+ """
+ pass
+
+ def Grow(self, amount, dryrun, backingstore):
+ """Grow the Volume.
+
+ @type amount: integer
+ @param amount: the amount (in mebibytes) to grow with
+ @type dryrun: boolean
+ @param dryrun: whether to execute the operation in simulation mode
+ only, without actually increasing the size
+
+ """
+ if not backingstore:
+ return
+ if not self.Attach():
+ _ThrowError("Can't attach to rbd device during Grow()")
+
+ if dryrun:
+ # the rbd tool does not support dry runs of resize operations.
+ # Since rbd volumes are thinly provisioned, we assume
+ # there is always enough free space for the operation.
+ return
+
+ rbd_pool = self.params[constants.LDP_POOL]
+ rbd_name = self.unique_id[1]
+ new_size = self.size + amount
+
+ # Resize the rbd volume (Image) inside the RADOS cluster.
+ cmd = [constants.RBD_CMD, "resize", "-p", rbd_pool,
+ rbd_name, "--size", "%s" % new_size]
+ result = utils.RunCmd(cmd)
+ if result.failed:
+ _ThrowError("rbd resize failed (%s): %s",
+ result.fail_reason, result.output)
+
+
+class ExtStorageDevice(BlockDev):
+ """A block device provided by an ExtStorage Provider.
+
+ This class implements the External Storage Interface, which means
+ handling of the externally provided block devices.
+
+ """
+ def __init__(self, unique_id, children, size, params):
+ """Attaches to an extstorage block device.
+
+ """
+ super(ExtStorageDevice, self).__init__(unique_id, children, size, params)
+ if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
+ raise ValueError("Invalid configuration data %s" % str(unique_id))
+
+ self.driver, self.vol_name = unique_id
+ self.ext_params = params
+
+ self.major = self.minor = None
+ self.Attach()
+
+ @classmethod
+ def Create(cls, unique_id, children, size, params, excl_stor):
+ """Create a new extstorage device.
+
+ Provision a new volume using an extstorage provider, which will
+ then be mapped to a block device.
+
+ """
+ if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
+ raise errors.ProgrammerError("Invalid configuration data %s" %
+ str(unique_id))
+ if excl_stor:
+ raise errors.ProgrammerError("extstorage device requested with"
+ " exclusive_storage")
+
+ # Call the External Storage's create script,
+ # to provision a new Volume inside the External Storage
+ _ExtStorageAction(constants.ES_ACTION_CREATE, unique_id,
+ params, str(size))
+
+ return ExtStorageDevice(unique_id, children, size, params)
+
+ def Remove(self):
+ """Remove the extstorage device.
+
+ """
+ if not self.minor and not self.Attach():
+ # The extstorage device doesn't exist.
+ return
+
+ # First shutdown the device (remove mappings).
+ self.Shutdown()
+
+ # Call the External Storage's remove script,
+ # to remove the Volume from the External Storage
+ _ExtStorageAction(constants.ES_ACTION_REMOVE, self.unique_id,
+ self.ext_params)
+
+ def Rename(self, new_id):
+ """Rename this device.
+
+ """
+ pass
+
+ def Attach(self):
+ """Attach to an existing extstorage device.
+
+ This method maps the extstorage volume that matches our name to a
+ corresponding block device and then attaches to this device.
+
+ """
+ self.attached = False
+
+ # Call the External Storage's attach script,
+ # to attach an existing Volume to a block device under /dev
+ self.dev_path = _ExtStorageAction(constants.ES_ACTION_ATTACH,
+ self.unique_id, self.ext_params)
+
+ try:
+ st = os.stat(self.dev_path)
+ except OSError, err:
+ logging.error("Error stat()'ing %s: %s", self.dev_path, str(err))
+ return False
+
+ if not stat.S_ISBLK(st.st_mode):
+ logging.error("%s is not a block device", self.dev_path)
+ return False
+
+ self.major = os.major(st.st_rdev)
+ self.minor = os.minor(st.st_rdev)
+ self.attached = True
+
+ return True
+
+ def Assemble(self):
+ """Assemble the device.
+
+ """
+ pass
+
+ def Shutdown(self):
+ """Shutdown the device.
+
+ """
+ if not self.minor and not self.Attach():
+ # The extstorage device doesn't exist.
+ return
+
+ # Call the External Storage's detach script,
+ # to detach an existing Volume from it's block device under /dev
+ _ExtStorageAction(constants.ES_ACTION_DETACH, self.unique_id,
+ self.ext_params)
+
+ self.minor = None
+ self.dev_path = None
+
+ def Open(self, force=False):
+ """Make the device ready for I/O.
+
+ """
+ pass
+
+ def Close(self):
+ """Notifies that the device will no longer be used for I/O.
+
+ """
+ pass
+
+ def Grow(self, amount, dryrun, backingstore):
+ """Grow the Volume.
+
+ @type amount: integer
+ @param amount: the amount (in mebibytes) to grow with
+ @type dryrun: boolean
+ @param dryrun: whether to execute the operation in simulation mode
+ only, without actually increasing the size
+
+ """
+ if not backingstore:
+ return
+ if not self.Attach():
+ _ThrowError("Can't attach to extstorage device during Grow()")
+
+ if dryrun:
+ # we do not support dry runs of resize operations for now.
+ return
+
+ new_size = self.size + amount
+
+ # Call the External Storage's grow script,
+ # to grow an existing Volume inside the External Storage
+ _ExtStorageAction(constants.ES_ACTION_GROW, self.unique_id,
+ self.ext_params, str(self.size), grow=str(new_size))
+
+ def SetInfo(self, text):
+ """Update metadata with info text.
+
+ """
+ # Replace invalid characters
+ text = re.sub("^[^A-Za-z0-9_+.]", "_", text)
+ text = re.sub("[^-A-Za-z0-9_+.]", "_", text)
+
+ # Only up to 128 characters are allowed
+ text = text[:128]
+
+ # Call the External Storage's setinfo script,
+ # to set metadata for an existing Volume inside the External Storage
+ _ExtStorageAction(constants.ES_ACTION_SETINFO, self.unique_id,
+ self.ext_params, metadata=text)
+
+
+def _ExtStorageAction(action, unique_id, ext_params,
+ size=None, grow=None, metadata=None):
+ """Take an External Storage action.
+
+ Take an External Storage action concerning or affecting
+ a specific Volume inside the External Storage.
+
+ @type action: string
+ @param action: which action to perform. One of:
+ create / remove / grow / attach / detach / setinfo
+ @type unique_id: tuple (driver, vol_name)
+ @param unique_id: a tuple containing the type of ExtStorage (driver)
+ and the Volume name
+ @type ext_params: dict
+ @param ext_params: ExtStorage parameters
+ @type size: integer
+ @param size: the size of the Volume in mebibytes
+ @type grow: integer
+ @param grow: the new size in mebibytes (after grow)
+ @type metadata: string
+ @param metadata: metadata info of the Volume, for use by the provider
+ @rtype: None or a block device path (during attach)
+
+ """
+ driver, vol_name = unique_id
+
+ # Create an External Storage instance of type `driver'
+ status, inst_es = ExtStorageFromDisk(driver)
+ if not status:
+ _ThrowError("%s" % inst_es)
+
+ # Create the basic environment for the driver's scripts
+ create_env = _ExtStorageEnvironment(unique_id, ext_params, size,
+ grow, metadata)
+
+ # Do not use log file for action `attach' as we need
+ # to get the output from RunResult
+ # TODO: find a way to have a log file for attach too
+ logfile = None
+ if action != constants.ES_ACTION_ATTACH:
+ logfile = _VolumeLogName(action, driver, vol_name)
+
+ # Make sure the given action results in a valid script
+ if action not in constants.ES_SCRIPTS:
+ _ThrowError("Action '%s' doesn't result in a valid ExtStorage script" %
+ action)
+
+ # Find out which external script to run according the given action
+ script_name = action + "_script"
+ script = getattr(inst_es, script_name)
+
+ # Run the external script
+ result = utils.RunCmd([script], env=create_env,
+ cwd=inst_es.path, output=logfile,)
+ if result.failed:
+ logging.error("External storage's %s command '%s' returned"
+ " error: %s, logfile: %s, output: %s",
+ action, result.cmd, result.fail_reason,
+ logfile, result.output)
+
+ # If logfile is 'None' (during attach), it breaks TailFile
+ # TODO: have a log file for attach too
+ if action != constants.ES_ACTION_ATTACH:
+ lines = [utils.SafeEncode(val)
+ for val in utils.TailFile(logfile, lines=20)]
+ else:
+ lines = result.output[-20:]
+
+ _ThrowError("External storage's %s script failed (%s), last"
+ " lines of output:\n%s",
+ action, result.fail_reason, "\n".join(lines))
+
+ if action == constants.ES_ACTION_ATTACH:
+ return result.stdout
+
+
+def ExtStorageFromDisk(name, base_dir=None):
+ """Create an ExtStorage instance from disk.
+
+ This function will return an ExtStorage instance
+ if the given name is a valid ExtStorage name.
+
+ @type base_dir: string
+ @keyword base_dir: Base directory containing ExtStorage installations.
+ Defaults to a search in all the ES_SEARCH_PATH dirs.
+ @rtype: tuple
+ @return: True and the ExtStorage instance if we find a valid one, or
+ False and the diagnostic message on error
+
+ """
+ if base_dir is None:
+ es_base_dir = pathutils.ES_SEARCH_PATH
+ else:
+ es_base_dir = [base_dir]
+
+ es_dir = utils.FindFile(name, es_base_dir, os.path.isdir)
+
+ if es_dir is None:
+ return False, ("Directory for External Storage Provider %s not"
+ " found in search path" % name)
+
+ # ES Files dictionary: we will populate it with the absolute path
+ # names; if the value is True, then it is a required file, otherwise
+ # an optional one
+ es_files = dict.fromkeys(constants.ES_SCRIPTS, True)
+
+ es_files[constants.ES_PARAMETERS_FILE] = True
+
+ for (filename, _) in es_files.items():
+ es_files[filename] = utils.PathJoin(es_dir, filename)
+
+ try:
+ st = os.stat(es_files[filename])
+ except EnvironmentError, err:
+ return False, ("File '%s' under path '%s' is missing (%s)" %
+ (filename, es_dir, utils.ErrnoOrStr(err)))
+
+ if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
+ return False, ("File '%s' under path '%s' is not a regular file" %
+ (filename, es_dir))
+
+ if filename in constants.ES_SCRIPTS:
+ if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
+ return False, ("File '%s' under path '%s' is not executable" %
+ (filename, es_dir))
+
+ parameters = []
+ if constants.ES_PARAMETERS_FILE in es_files:
+ parameters_file = es_files[constants.ES_PARAMETERS_FILE]
+ try:
+ parameters = utils.ReadFile(parameters_file).splitlines()
+ except EnvironmentError, err:
+ return False, ("Error while reading the EXT parameters file at %s: %s" %
+ (parameters_file, utils.ErrnoOrStr(err)))
+ parameters = [v.split(None, 1) for v in parameters]
+
+ es_obj = \
+ objects.ExtStorage(name=name, path=es_dir,
+ create_script=es_files[constants.ES_SCRIPT_CREATE],
+ remove_script=es_files[constants.ES_SCRIPT_REMOVE],
+ grow_script=es_files[constants.ES_SCRIPT_GROW],
+ attach_script=es_files[constants.ES_SCRIPT_ATTACH],
+ detach_script=es_files[constants.ES_SCRIPT_DETACH],
+ setinfo_script=es_files[constants.ES_SCRIPT_SETINFO],
+ verify_script=es_files[constants.ES_SCRIPT_VERIFY],
+ supported_parameters=parameters)
+ return True, es_obj
+
+
+def _ExtStorageEnvironment(unique_id, ext_params,
+ size=None, grow=None, metadata=None):
+ """Calculate the environment for an External Storage script.
+
+ @type unique_id: tuple (driver, vol_name)
+ @param unique_id: ExtStorage pool and name of the Volume
+ @type ext_params: dict
+ @param ext_params: the EXT parameters
+ @type size: string
+ @param size: size of the Volume (in mebibytes)
+ @type grow: string
+ @param grow: new size of Volume after grow (in mebibytes)
+ @type metadata: string
+ @param metadata: metadata info of the Volume
+ @rtype: dict
+ @return: dict of environment variables
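+
+ For example, a volume "vol1" with ext_params {"redundancy": "2"} yields
+ at least {"VOL_NAME": "vol1", "EXTP_REDUNDANCY": "2"}; VOL_SIZE,
+ VOL_NEW_SIZE and VOL_METADATA are added only when the corresponding
+ arguments are not None.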
+
+ """
+ vol_name = unique_id[1]
+
+ result = {}
+ result["VOL_NAME"] = vol_name
+
+ # EXT params
+ for pname, pvalue in ext_params.items():
+ result["EXTP_%s" % pname.upper()] = str(pvalue)
+
+ if size is not None:
+ result["VOL_SIZE"] = size
+
+ if grow is not None:
+ result["VOL_NEW_SIZE"] = grow
+
+ if metadata is not None:
+ result["VOL_METADATA"] = metadata
+
+ return result
+
+
+def _VolumeLogName(kind, es_name, volume):
+ """Compute the ExtStorage log filename for a given Volume and operation.
+
+ @type kind: string
+ @param kind: the operation type (e.g. create, remove etc.)
+ @type es_name: string
+ @param es_name: the ExtStorage name
+ @type volume: string
+ @param volume: the name of the Volume inside the External Storage
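+
+ Example result: "<LOG_ES_DIR>/create-myprovider-vol1-<timestamp>.log",
+ where "myprovider" and "vol1" stand in for actual provider and volume
+ names.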
+
+ """
+ # Check if the extstorage log dir is a valid dir
+ if not os.path.isdir(pathutils.LOG_ES_DIR):
+ _ThrowError("Cannot find log directory: %s", pathutils.LOG_ES_DIR)
+
+ # TODO: Use tempfile.mkstemp to create unique filename
+ base = ("%s-%s-%s-%s.log" %
+ (kind, es_name, volume, utils.TimestampForFilename()))
+ return utils.PathJoin(pathutils.LOG_ES_DIR, base)
+
+
DEV_MAP = {
constants.LD_LV: LogicalVolume,
constants.LD_DRBD8: DRBD8,
constants.LD_BLOCKDEV: PersistentBlockDevice,
+ constants.LD_RBD: RADOSBlockDevice,
+ constants.LD_EXT: ExtStorageDevice,
}
if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
raise errors.ProgrammerError("Invalid block device type '%s'" % dev_type)
+def _VerifyDiskParams(disk):
+ """Verifies if all disk parameters are set.
+
+ """
+ missing = set(constants.DISK_LD_DEFAULTS[disk.dev_type]) - set(disk.params)
+ if missing:
+ raise errors.ProgrammerError("Block device is missing disk parameters: %s" %
+ missing)
+
+
def FindDevice(disk, children):
"""Search for an existing, assembled device.
"""
_VerifyDiskType(disk.dev_type)
- dev_params = objects.FillDict(constants.DISK_LD_DEFAULTS[disk.dev_type],
- disk.params)
device = DEV_MAP[disk.dev_type](disk.physical_id, children, disk.size,
- dev_params)
+ disk.params)
if not device.attached:
return None
return device
"""
_VerifyDiskType(disk.dev_type)
- dev_params = objects.FillDict(constants.DISK_LD_DEFAULTS[disk.dev_type],
- disk.params)
+ _VerifyDiskParams(disk)
device = DEV_MAP[disk.dev_type](disk.physical_id, children, disk.size,
- dev_params)
+ disk.params)
device.Assemble()
return device
-def Create(disk, children):
+def Create(disk, children, excl_stor):
"""Create a device.
@type disk: L{objects.Disk}
@type children: list of L{bdev.BlockDev}
@param children: the list of block devices that are children of the device
represented by the disk parameter
+ @type excl_stor: boolean
+ @param excl_stor: Whether exclusive_storage is active
"""
_VerifyDiskType(disk.dev_type)
- dev_params = objects.FillDict(constants.DISK_LD_DEFAULTS[disk.dev_type],
- disk.params)
+ _VerifyDiskParams(disk)
device = DEV_MAP[disk.dev_type].Create(disk.physical_id, children, disk.size,
- dev_params)
+ disk.params, excl_stor)
return device