#
#
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
from ganeti import utils
from ganeti import errors
from ganeti import constants
+from ganeti import objects
+from ganeti import compat
+from ganeti import netutils
+
+
+# Size of reads in _CanReadDevice
+_DEVICE_READ_SIZE = 128 * 1024
def _IgnoreError(fn, *args, **kwargs):
fn(*args, **kwargs)
return True
except errors.BlockDeviceError, err:
- logging.warning("Caught BlockDeviceError but ignoring: %s" % str(err))
+ logging.warning("Caught BlockDeviceError but ignoring: %s", str(err))
return False
raise errors.BlockDeviceError(msg)
+def _CanReadDevice(path):
+ """Check if we can read from the given device.
+
+ This tries to read the first 128k of the device.
+
+ """
+ try:
+ utils.ReadFile(path, size=_DEVICE_READ_SIZE)
+ return True
+ except EnvironmentError:
+ logging.warning("Can't read from device %s", path, exc_info=True)
+ return False
+
+
class BlockDev(object):
"""Block device abstract class.
after assembly we'll have our correct major/minor.
"""
- def __init__(self, unique_id, children):
+ def __init__(self, unique_id, children, size):
self._children = children
self.dev_path = None
self.unique_id = unique_id
self.major = None
self.minor = None
self.attached = False
+ self.size = size
def Assemble(self):
"""Assemble the device from its components.
"""Remove this device.
This makes sense only for some of the device types: LV and file
- storeage. Also note that if the device can't attach, the removal
+ storage. Also note that if the device can't attach, the removal
can't be completed.
"""
result = result and child.SetSyncSpeed(speed)
return result
+ def PauseResumeSync(self, pause):
+ """Pause/Resume the sync of the mirror.
+
+ In case this is not a mirroring device, this is a no-op.
+
+ @param pause: Whether to pause or resume
+
+ """
+ result = True
+ if self._children:
+ for child in self._children:
+ result = result and child.PauseResumeSync(pause)
+ return result
+
def GetSyncStatus(self):
"""Returns the sync status of the device.
data. This is only valid for some devices, the rest will always
return False (not degraded).
- @rtype: tuple
- @return: (sync_percent, estimated_time, is_degraded, ldisk)
+ @rtype: objects.BlockDevStatus
"""
- return None, None, False, False
-
+ return objects.BlockDevStatus(dev_path=self.dev_path,
+ major=self.major,
+ minor=self.minor,
+ sync_percent=None,
+ estimated_time=None,
+ is_degraded=False,
+ ldisk_status=constants.LDS_OKAY)
def CombinedSyncStatus(self):
"""Calculate the mirror status recursively for our children.
minimum percent and maximum time are calculated across our
children.
+ @rtype: objects.BlockDevStatus
+
"""
- min_percent, max_time, is_degraded, ldisk = self.GetSyncStatus()
+ status = self.GetSyncStatus()
+
+ min_percent = status.sync_percent
+ max_time = status.estimated_time
+ is_degraded = status.is_degraded
+ ldisk_status = status.ldisk_status
+
if self._children:
for child in self._children:
- c_percent, c_time, c_degraded, c_ldisk = child.GetSyncStatus()
+ child_status = child.GetSyncStatus()
+
if min_percent is None:
- min_percent = c_percent
- elif c_percent is not None:
- min_percent = min(min_percent, c_percent)
+ min_percent = child_status.sync_percent
+ elif child_status.sync_percent is not None:
+ min_percent = min(min_percent, child_status.sync_percent)
+
if max_time is None:
- max_time = c_time
- elif c_time is not None:
- max_time = max(max_time, c_time)
- is_degraded = is_degraded or c_degraded
- ldisk = ldisk or c_ldisk
- return min_percent, max_time, is_degraded, ldisk
+ max_time = child_status.estimated_time
+ elif child_status.estimated_time is not None:
+ max_time = max(max_time, child_status.estimated_time)
+
+ is_degraded = is_degraded or child_status.is_degraded
+
+ if ldisk_status is None:
+ ldisk_status = child_status.ldisk_status
+ elif child_status.ldisk_status is not None:
+ ldisk_status = max(ldisk_status, child_status.ldisk_status)
+
+ return objects.BlockDevStatus(dev_path=self.dev_path,
+ major=self.major,
+ minor=self.minor,
+ sync_percent=min_percent,
+ estimated_time=max_time,
+ is_degraded=is_degraded,
+ ldisk_status=ldisk_status)
def SetInfo(self, text):
"""
raise NotImplementedError
+ def GetActualSize(self):
+ """Return the actual disk size.
+
+ @note: the device needs to be active when this is called
+
+ """
+ assert self.attached, "BlockDevice not attached in GetActualSize()"
+ result = utils.RunCmd(["blockdev", "--getsize64", self.dev_path])
+ if result.failed:
+ _ThrowError("blockdev failed (%s): %s",
+ result.fail_reason, result.output)
+ try:
+ sz = int(result.output.strip())
+ except (ValueError, TypeError), err:
+ _ThrowError("Failed to parse blockdev output: %s", str(err))
+ return sz
+
def __repr__(self):
return ("<%s: unique_id: %s, children: %s, %s:%s, %s>" %
(self.__class__, self.unique_id, self._children,
"""Logical Volume block device.
"""
- def __init__(self, unique_id, children):
+ _VALID_NAME_RE = re.compile("^[a-zA-Z0-9+_.-]*$")
+ _INVALID_NAMES = frozenset([".", "..", "snapshot", "pvmove"])
+ _INVALID_SUBSTRINGS = frozenset(["_mlog", "_mimage"])
+
+ def __init__(self, unique_id, children, size):
"""Attaches to a LV device.
The unique_id is a tuple (vg_name, lv_name)
"""
- super(LogicalVolume, self).__init__(unique_id, children)
+ super(LogicalVolume, self).__init__(unique_id, children, size)
if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
raise ValueError("Invalid configuration data %s" % str(unique_id))
self._vg_name, self._lv_name = unique_id
- self.dev_path = "/dev/%s/%s" % (self._vg_name, self._lv_name)
+ self._ValidateName(self._vg_name)
+ self._ValidateName(self._lv_name)
+ self.dev_path = utils.PathJoin("/dev", self._vg_name, self._lv_name)
self._degraded = True
- self.major = self.minor = None
+ self.major = self.minor = self.pe_size = self.stripe_count = None
self.Attach()
@classmethod
raise errors.ProgrammerError("Invalid configuration data %s" %
str(unique_id))
vg_name, lv_name = unique_id
- pvs_info = cls.GetPVInfo(vg_name)
+ cls._ValidateName(vg_name)
+ cls._ValidateName(lv_name)
+ pvs_info = cls.GetPVInfo([vg_name])
if not pvs_info:
_ThrowError("Can't compute PV info for vg %s", vg_name)
pvs_info.sort()
pvs_info.reverse()
pvlist = [ pv[1] for pv in pvs_info ]
+ if compat.any(":" in v for v in pvlist):
+ _ThrowError("Some of your PVs have the invalid character ':' in their"
+ " name, this is not supported - please filter them out"
+ " in lvm.conf using either 'filter' or 'preferred_names'")
free_size = sum([ pv[0] for pv in pvs_info ])
+ current_pvs = len(pvlist)
+ stripes = min(current_pvs, constants.LVM_STRIPECOUNT)
# The size constraint should have been checked from the master before
# calling the create function.
if free_size < size:
_ThrowError("Not enough free space: required %s,"
" available %s", size, free_size)
- result = utils.RunCmd(["lvcreate", "-L%dm" % size, "-n%s" % lv_name,
- vg_name] + pvlist)
+ cmd = ["lvcreate", "-L%dm" % size, "-n%s" % lv_name]
+ # If the free space is not well distributed, we won't be able to
+ # create an optimally-striped volume; in that case, we want to try
+ # with N, N-1, ..., 2, and finally 1 (non-striped) number of
+ # stripes
+ for stripes_arg in range(stripes, 0, -1):
+ result = utils.RunCmd(cmd + ["-i%d" % stripes_arg] + [vg_name] + pvlist)
+ if not result.failed:
+ break
if result.failed:
_ThrowError("LV create failed (%s): %s",
result.fail_reason, result.output)
- return LogicalVolume(unique_id, children)
+ return LogicalVolume(unique_id, children, size)
@staticmethod
- def GetPVInfo(vg_name):
+ def _GetVolumeInfo(lvm_cmd, fields):
+ """Returns LVM volume info using lvm_cmd
+
+ @param lvm_cmd: Should be one of "pvs", "vgs" or "lvs"
+ @param fields: Fields to return
+ @return: A list of dicts each with the parsed fields
+
+ """
+ if not fields:
+ raise errors.ProgrammerError("No fields specified")
+
+ sep = "|"
+ cmd = [lvm_cmd, "--noheadings", "--nosuffix", "--units=m", "--unbuffered",
+ "--separator=%s" % sep, "-o%s" % ",".join(fields)]
+
+ result = utils.RunCmd(cmd)
+ if result.failed:
+ raise errors.CommandError("Can't get the volume information: %s - %s" %
+ (result.fail_reason, result.output))
+
+ data = []
+ for line in result.stdout.splitlines():
+ splitted_fields = line.strip().split(sep)
+
+ if len(fields) != len(splitted_fields):
+ raise errors.CommandError("Can't parse %s output: line '%s'" %
+ (lvm_cmd, line))
+
+ data.append(splitted_fields)
+
+ return data
+
+ @classmethod
+ def GetPVInfo(cls, vg_names, filter_allocatable=True):
"""Get the free space info for PVs in a volume group.
- @param vg_name: the volume group name
+ @param vg_names: list of volume group names, if empty all will be returned
+ @param filter_allocatable: whether to skip over unallocatable PVs
@rtype: list
@return: list of tuples (free_space, name) with free_space in mebibytes
"""
- command = ["pvs", "--noheadings", "--nosuffix", "--units=m",
- "-opv_name,vg_name,pv_free,pv_attr", "--unbuffered",
- "--separator=:"]
- result = utils.RunCmd(command)
- if result.failed:
- logging.error("Can't get the PV information: %s - %s",
- result.fail_reason, result.output)
+ try:
+ info = cls._GetVolumeInfo("pvs", ["pv_name", "vg_name", "pv_free",
+ "pv_attr"])
+ except errors.GenericError, err:
+ logging.error("Can't get PV information: %s", err)
return None
+
data = []
- for line in result.stdout.splitlines():
- fields = line.strip().split(':')
- if len(fields) != 4:
- logging.error("Can't parse pvs output: line '%s'", line)
- return None
- # skip over pvs from another vg or ones which are not allocatable
- if fields[1] != vg_name or fields[3][0] != 'a':
+ for pv_name, vg_name, pv_free, pv_attr in info:
+ # (possibly) skip over pvs which are not allocatable
+ if filter_allocatable and pv_attr[0] != "a":
+ continue
+ # (possibly) skip over pvs which are not in the right volume group(s)
+ if vg_names and vg_name not in vg_names:
+ continue
+ data.append((float(pv_free), pv_name, vg_name))
+
+ return data
+
+ @classmethod
+ def GetVGInfo(cls, vg_names, filter_readonly=True):
+ """Get the free space info for specific VGs.
+
+ @param vg_names: list of volume group names, if empty all will be returned
+ @param filter_readonly: whether to skip over readonly VGs
+
+ @rtype: list
+ @return: list of tuples (free_space, total_size, name) with free_space in
+ MiB
+
+ """
+ try:
+ info = cls._GetVolumeInfo("vgs", ["vg_name", "vg_free", "vg_attr",
+ "vg_size"])
+ except errors.GenericError, err:
+ logging.error("Can't get VG information: %s", err)
+ return None
+
+ data = []
+ for vg_name, vg_free, vg_attr, vg_size in info:
+ # (possibly) skip over vgs which are not writable
+ if filter_readonly and vg_attr[0] == "r":
continue
- data.append((float(fields[2]), fields[0]))
+ # (possibly) skip over vgs which are not in the right volume group(s)
+ if vg_names and vg_name not in vg_names:
+ continue
+ data.append((float(vg_free), float(vg_size), vg_name))
return data
+ @classmethod
+ def _ValidateName(cls, name):
+ """Validates that a given name is valid as VG or LV name.
+
+ The list of valid characters and restricted names is taken out of
+ the lvm(8) manpage, with the simplification that we enforce both
+ VG and LV restrictions on the names.
+
+ """
+ if (not cls._VALID_NAME_RE.match(name) or
+ name in cls._INVALID_NAMES or
+ compat.any(substring in name for substring in cls._INVALID_SUBSTRINGS)):
+ _ThrowError("Invalid LVM name '%s'", name)
+
def Remove(self):
"""Remove this logical volume.
if result.failed:
_ThrowError("Failed to rename the logical volume: %s", result.output)
self._lv_name = new_name
- self.dev_path = "/dev/%s/%s" % (self._vg_name, self._lv_name)
+ self.dev_path = utils.PathJoin("/dev", self._vg_name, self._lv_name)
def Attach(self):
"""Attach to an existing LV.
"""
self.attached = False
result = utils.RunCmd(["lvs", "--noheadings", "--separator=,",
- "-olv_attr,lv_kernel_major,lv_kernel_minor",
- self.dev_path])
+ "--units=m", "--nosuffix",
+ "-olv_attr,lv_kernel_major,lv_kernel_minor,"
+ "vg_extent_size,stripes", self.dev_path])
if result.failed:
logging.error("Can't find LV %s: %s, %s",
self.dev_path, result.fail_reason, result.output)
return False
- out = result.stdout.strip().rstrip(',')
+ # the output can (and will) have multiple lines for multi-segment
+ # LVs, as the 'stripes' parameter is a segment one, so we take
+ # only the last entry, which is the one we're interested in; note
+ # that with LVM2 anyway the 'stripes' value must be constant
+ # across segments, so this is a no-op actually
+ out = result.stdout.splitlines()
+ if not out: # totally empty result? splitlines() returns at least
+ # one line for any non-empty string
+ logging.error("Can't parse LVS output, no lines? Got '%s'", str(out))
+ return False
+ out = out[-1].strip().rstrip(',')
out = out.split(",")
- if len(out) != 3:
- logging.error("Can't parse LVS output, len(%s) != 3", str(out))
+ if len(out) != 5:
+ logging.error("Can't parse LVS output, len(%s) != 5", str(out))
return False
- status, major, minor = out[:3]
+ status, major, minor, pe_size, stripes = out
if len(status) != 6:
logging.error("lvs lv_attr is not 6 characters (%s)", status)
return False
try:
major = int(major)
minor = int(minor)
- except ValueError, err:
+ except (TypeError, ValueError), err:
logging.error("lvs major/minor cannot be parsed: %s", str(err))
+ try:
+ pe_size = int(float(pe_size))
+ except (TypeError, ValueError), err:
+ logging.error("Can't parse vg extent size: %s", err)
+ return False
+
+ try:
+ stripes = int(stripes)
+ except (TypeError, ValueError), err:
+ logging.error("Can't parse the number of stripes: %s", err)
+ return False
+
self.major = major
self.minor = minor
+ self.pe_size = pe_size
+ self.stripe_count = stripes
self._degraded = status[0] == 'v' # virtual volume, i.e. doesn't backing
# storage
self.attached = True
def Assemble(self):
"""Assemble the device.
- We alway run `lvchange -ay` on the LV to ensure it's active before
+ We always run `lvchange -ay` on the LV to ensure it's active before
use, as there were cases when xenvg was not active after boot
(also possibly after disk issues).
The status was already read in Attach, so we just return it.
- @rtype: tuple
- @return: (sync_percent, estimated_time, is_degraded, ldisk)
+ @rtype: objects.BlockDevStatus
"""
- return None, None, self._degraded, self._degraded
+ if self._degraded:
+ ldisk_status = constants.LDS_FAULTY
+ else:
+ ldisk_status = constants.LDS_OKAY
+
+ return objects.BlockDevStatus(dev_path=self.dev_path,
+ major=self.major,
+ minor=self.minor,
+ sync_percent=None,
+ estimated_time=None,
+ is_degraded=self._degraded,
+ ldisk_status=ldisk_status)
def Open(self, force=False):
"""Make the device ready for I/O.
def Snapshot(self, size):
"""Create a snapshot copy of an lvm block device.
+ @returns: tuple (vg, lv)
+
"""
snap_name = self._lv_name + ".snap"
# remove existing snapshot if found
- snap = LogicalVolume((self._vg_name, snap_name), None)
+ snap = LogicalVolume((self._vg_name, snap_name), None, size)
_IgnoreError(snap.Remove)
- pvs_info = self.GetPVInfo(self._vg_name)
- if not pvs_info:
- _ThrowError("Can't compute PV info for vg %s", self._vg_name)
- pvs_info.sort()
- pvs_info.reverse()
- free_size, pv_name = pvs_info[0]
+ vg_info = self.GetVGInfo([self._vg_name])
+ if not vg_info:
+ _ThrowError("Can't compute VG info for vg %s", self._vg_name)
+ free_size, _, _ = vg_info[0]
if free_size < size:
_ThrowError("Not enough free space: required %s,"
" available %s", size, free_size)
_ThrowError("command: %s error: %s - %s",
result.cmd, result.fail_reason, result.output)
- return snap_name
+ return (self._vg_name, snap_name)
def SetInfo(self, text):
"""Update metadata with info text.
"""Grow the logical volume.
"""
+ if self.pe_size is None or self.stripe_count is None:
+ if not self.Attach():
+ _ThrowError("Can't attach to LV during Grow()")
+ full_stripe_size = self.pe_size * self.stripe_count
+ rest = amount % full_stripe_size
+ if rest != 0:
+ amount += full_stripe_size - rest
# we try multiple algorithms since the 'best' ones might not have
# space available in the right place, but later ones might (since
# they have less constraints); also note that only recent LVM
"""
UNCONF_RE = re.compile(r"\s*[0-9]+:\s*cs:Unconfigured$")
- LINE_RE = re.compile(r"\s*[0-9]+:\s*cs:(\S+)\s+st:([^/]+)/(\S+)"
+ LINE_RE = re.compile(r"\s*[0-9]+:\s*cs:(\S+)\s+(?:st|ro):([^/]+)/(\S+)"
"\s+ds:([^/]+)/(\S+)\s+.*$")
SYNC_RE = re.compile(r"^.*\ssync'ed:\s*([0-9.]+)%.*"
"\sfinish: ([0-9]+):([0-9]+):([0-9]+)\s.*$")
+ CS_UNCONFIGURED = "Unconfigured"
+ CS_STANDALONE = "StandAlone"
+ CS_WFCONNECTION = "WFConnection"
+ CS_WFREPORTPARAMS = "WFReportParams"
+ CS_CONNECTED = "Connected"
+ CS_STARTINGSYNCS = "StartingSyncS"
+ CS_STARTINGSYNCT = "StartingSyncT"
+ CS_WFBITMAPS = "WFBitMapS"
+ CS_WFBITMAPT = "WFBitMapT"
+ CS_WFSYNCUUID = "WFSyncUUID"
+ CS_SYNCSOURCE = "SyncSource"
+ CS_SYNCTARGET = "SyncTarget"
+ CS_PAUSEDSYNCS = "PausedSyncS"
+ CS_PAUSEDSYNCT = "PausedSyncT"
+ CSET_SYNC = frozenset([
+ CS_WFREPORTPARAMS,
+ CS_STARTINGSYNCS,
+ CS_STARTINGSYNCT,
+ CS_WFBITMAPS,
+ CS_WFBITMAPT,
+ CS_WFSYNCUUID,
+ CS_SYNCSOURCE,
+ CS_SYNCTARGET,
+ CS_PAUSEDSYNCS,
+ CS_PAUSEDSYNCT,
+ ])
+
+ DS_DISKLESS = "Diskless"
+ DS_ATTACHING = "Attaching" # transient state
+ DS_FAILED = "Failed" # transient state, next: diskless
+ DS_NEGOTIATING = "Negotiating" # transient state
+ DS_INCONSISTENT = "Inconsistent" # while syncing or after creation
+ DS_OUTDATED = "Outdated"
+ DS_DUNKNOWN = "DUnknown" # shown for peer disk when not connected
+ DS_CONSISTENT = "Consistent"
+ DS_UPTODATE = "UpToDate" # normal state
+
+ RO_PRIMARY = "Primary"
+ RO_SECONDARY = "Secondary"
+ RO_UNKNOWN = "Unknown"
+
def __init__(self, procline):
u = self.UNCONF_RE.match(procline)
if u:
- self.cstatus = "Unconfigured"
+ self.cstatus = self.CS_UNCONFIGURED
self.lrole = self.rrole = self.ldisk = self.rdisk = None
else:
m = self.LINE_RE.match(procline)
# end reading of data from the LINE_RE or UNCONF_RE
- self.is_standalone = self.cstatus == "StandAlone"
- self.is_wfconn = self.cstatus == "WFConnection"
- self.is_connected = self.cstatus == "Connected"
- self.is_primary = self.lrole == "Primary"
- self.is_secondary = self.lrole == "Secondary"
- self.peer_primary = self.rrole == "Primary"
- self.peer_secondary = self.rrole == "Secondary"
+ self.is_standalone = self.cstatus == self.CS_STANDALONE
+ self.is_wfconn = self.cstatus == self.CS_WFCONNECTION
+ self.is_connected = self.cstatus == self.CS_CONNECTED
+ self.is_primary = self.lrole == self.RO_PRIMARY
+ self.is_secondary = self.lrole == self.RO_SECONDARY
+ self.peer_primary = self.rrole == self.RO_PRIMARY
+ self.peer_secondary = self.rrole == self.RO_SECONDARY
self.both_primary = self.is_primary and self.peer_primary
self.both_secondary = self.is_secondary and self.peer_secondary
- self.is_diskless = self.ldisk == "Diskless"
- self.is_disk_uptodate = self.ldisk == "UpToDate"
+ self.is_diskless = self.ldisk == self.DS_DISKLESS
+ self.is_disk_uptodate = self.ldisk == self.DS_UPTODATE
- self.is_in_resync = self.cstatus in ("SyncSource", "SyncTarget")
- self.is_in_use = self.cstatus != "Unconfigured"
+ self.is_in_resync = self.cstatus in self.CSET_SYNC
+ self.is_in_use = self.cstatus != self.CS_UNCONFIGURED
m = self.SYNC_RE.match(procline)
if m:
seconds = int(m.group(4))
self.est_time = hours * 3600 + minutes * 60 + seconds
else:
- self.sync_percent = None
+ # we have (in this if branch) no percent information, but if
+ # we're resyncing we need to 'fake' a sync percent information,
+ # as this is how cmdlib determines if it makes sense to wait for
+ # resyncing or not
+ if self.is_in_resync:
+ self.sync_percent = 0
+ else:
+ self.sync_percent = None
self.est_time = None
- self.is_sync_target = self.peer_sync_source = self.cstatus == "SyncTarget"
- self.peer_sync_target = self.is_sync_source = self.cstatus == "SyncSource"
- self.is_resync = self.is_sync_target or self.is_sync_source
-
-class BaseDRBD(BlockDev):
+class BaseDRBD(BlockDev): # pylint: disable-msg=W0223
"""Base DRBD class.
This class contains a few bits of common functionality between the
0.7 and 8.x versions of DRBD.
"""
- _VERSION_RE = re.compile(r"^version: (\d+)\.(\d+)\.(\d+)"
+ _VERSION_RE = re.compile(r"^version: (\d+)\.(\d+)\.(\d+)(?:\.\d+)?"
r" \(api:(\d+)/proto:(\d+)(?:-(\d+))?\)")
+ _VALID_LINE_RE = re.compile("^ *([0-9]+): cs:([^ ]+).*$")
+ _UNUSED_LINE_RE = re.compile("^ *([0-9]+): cs:Unconfigured$")
_DRBD_MAJOR = 147
_ST_UNCONFIGURED = "Unconfigured"
_ST_CONNECTED = "Connected"
_STATUS_FILE = "/proc/drbd"
+ _USERMODE_HELPER_FILE = "/sys/module/drbd/parameters/usermode_helper"
@staticmethod
def _GetProcData(filename=_STATUS_FILE):
"""Return data from /proc/drbd.
"""
- stat = open(filename, "r")
try:
- data = stat.read().splitlines()
- finally:
- stat.close()
+ data = utils.ReadFile(filename).splitlines()
+ except EnvironmentError, err:
+ if err.errno == errno.ENOENT:
+ _ThrowError("The file %s cannot be opened, check if the module"
+ " is loaded (%s)", filename, str(err))
+ else:
+ _ThrowError("Can't read the DRBD proc file %s: %s", filename, str(err))
if not data:
_ThrowError("Can't read any data from %s", filename)
return data
- @staticmethod
- def _MassageProcData(data):
+ @classmethod
+ def _MassageProcData(cls, data):
"""Transform the output of _GetProdData into a nicer form.
@return: a dictionary of minor: joined lines from /proc/drbd
for that minor
"""
- lmatch = re.compile("^ *([0-9]+):.*$")
results = {}
old_minor = old_line = None
for line in data:
- lresult = lmatch.match(line)
+ if not line: # completely empty lines, as can be returned by drbd8.0+
+ continue
+ lresult = cls._VALID_LINE_RE.match(line)
if lresult is not None:
if old_minor is not None:
results[old_minor] = old_line
return results
@classmethod
- def _GetVersion(cls):
+ def _GetVersion(cls, proc_data):
"""Return the DRBD version.
This will return a dict with keys:
- proto2 (only on drbd > 8.2.X)
"""
- proc_data = cls._GetProcData()
first_line = proc_data[0].strip()
version = cls._VERSION_RE.match(first_line)
if not version:
return retval
@staticmethod
+ def GetUsermodeHelper(filename=_USERMODE_HELPER_FILE):
+ """Returns DRBD usermode_helper currently set.
+
+ """
+ try:
+ helper = utils.ReadFile(filename).splitlines()[0]
+ except EnvironmentError, err:
+ if err.errno == errno.ENOENT:
+ _ThrowError("The file %s cannot be opened, check if the module"
+ " is loaded (%s)", filename, str(err))
+ else:
+ _ThrowError("Can't read DRBD helper file %s: %s", filename, str(err))
+ if not helper:
+ _ThrowError("Can't read any data from %s", filename)
+ return helper
+
+ @staticmethod
def _DevPath(minor):
"""Return the path to a drbd device for a given minor.
data = cls._GetProcData()
used_devs = {}
- valid_line = re.compile("^ *([0-9]+): cs:([^ ]+).*$")
for line in data:
- match = valid_line.match(line)
+ match = cls._VALID_LINE_RE.match(line)
if not match:
continue
minor = int(match.group(1))
result.fail_reason, result.output)
try:
sectors = int(result.stdout)
- except ValueError:
+ except (TypeError, ValueError):
_ThrowError("Invalid output from blockdev: '%s'", result.stdout)
- bytes = sectors * 512
- if bytes < 128 * 1024 * 1024: # less than 128MiB
- _ThrowError("Meta device too small (%.2fMib)", (bytes / 1024 / 1024))
- if bytes > (128 + 32) * 1024 * 1024: # account for an extra (big) PE on LVM
- _ThrowError("Meta device too big (%.2fMiB)", (bytes / 1024 / 1024))
+ num_bytes = sectors * 512
+ if num_bytes < 128 * 1024 * 1024: # less than 128MiB
+ _ThrowError("Meta device too small (%.2fMib)", (num_bytes / 1024 / 1024))
+ # the maximum *valid* size of the meta device when living on top
+ # of LVM is hard to compute: it depends on the number of stripes
+ # and the PE size; e.g. a 2-stripe, 64MB PE will result in a 128MB
+ # (normal size), but an eight-stripe 128MB PE will result in a 1GB
+ # size meta device; as such, we restrict it to 1GB (a little bit
+ # too generous, but making assumptions about PE size is hard)
+ if num_bytes > 1024 * 1024 * 1024:
+ _ThrowError("Meta device too big (%.2fMiB)", (num_bytes / 1024 / 1024))
def Rename(self, new_id):
"""Rename a device.
# timeout constants
_NET_RECONFIG_TIMEOUT = 60
- def __init__(self, unique_id, children):
+ def __init__(self, unique_id, children, size):
if children and children.count(None) > 0:
children = []
- super(DRBD8, self).__init__(unique_id, children)
- self.major = self._DRBD_MAJOR
- version = self._GetVersion()
- if version['k_major'] != 8 :
- _ThrowError("Mismatch in DRBD kernel version and requested ganeti"
- " usage: kernel is %s.%s, ganeti wants 8.x",
- version['k_major'], version['k_minor'])
-
if len(children) not in (0, 2):
raise ValueError("Invalid configuration data %s" % str(children))
if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 6:
(self._lhost, self._lport,
self._rhost, self._rport,
self._aminor, self._secret) = unique_id
+ if children:
+ if not _CanReadDevice(children[1].dev_path):
+ logging.info("drbd%s: Ignoring unreadable meta device", self._aminor)
+ children = []
+ super(DRBD8, self).__init__(unique_id, children, size)
+ self.major = self._DRBD_MAJOR
+ version = self._GetVersion(self._GetProcData())
+ if version['k_major'] != 8 :
+ _ThrowError("Mismatch in DRBD kernel version and requested ganeti"
+ " usage: kernel is %s.%s, ganeti wants 8.x",
+ version['k_major'], version['k_minor'])
+
if (self._lhost is not None and self._lhost == self._rhost and
self._lport == self._rport):
raise ValueError("Invalid configuration data, same local/remote %s" %
"""
data = cls._GetProcData()
- unused_line = re.compile("^ *([0-9]+): cs:Unconfigured$")
- used_line = re.compile("^ *([0-9]+): cs:")
highest = None
for line in data:
- match = unused_line.match(line)
+ match = cls._UNUSED_LINE_RE.match(line)
if match:
return int(match.group(1))
- match = used_line.match(line)
+ match = cls._VALID_LINE_RE.match(line)
if match:
minor = int(match.group(1))
highest = max(highest, minor)
# pyparsing setup
lbrace = pyp.Literal("{").suppress()
rbrace = pyp.Literal("}").suppress()
+ lbracket = pyp.Literal("[").suppress()
+ rbracket = pyp.Literal("]").suppress()
semi = pyp.Literal(";").suppress()
+ colon = pyp.Literal(":").suppress()
# this also converts the value to an int
number = pyp.Word(pyp.nums).setParseAction(lambda s, l, t: int(t[0]))
# value types
value = pyp.Word(pyp.alphanums + '_-/.:')
quoted = dbl_quote + pyp.CharsNotIn('"') + dbl_quote
- addr_port = (pyp.Word(pyp.nums + '.') + pyp.Literal(':').suppress() +
- number)
+ ipv4_addr = (pyp.Optional(pyp.Literal("ipv4")).suppress() +
+ pyp.Word(pyp.nums + ".") + colon + number)
+ ipv6_addr = (pyp.Optional(pyp.Literal("ipv6")).suppress() +
+ pyp.Optional(lbracket) + pyp.Word(pyp.hexnums + ":") +
+ pyp.Optional(rbracket) + colon + number)
# meta device, extended syntax
- meta_value = ((value ^ quoted) + pyp.Literal('[').suppress() +
- number + pyp.Word(']').suppress())
+ meta_value = ((value ^ quoted) + lbracket + number + rbracket)
+ # device name, extended syntax
+ device_value = pyp.Literal("minor").suppress() + number
# a statement
stmt = (~rbrace + keyword + ~lbrace +
- pyp.Optional(addr_port ^ value ^ quoted ^ meta_value) +
+ pyp.Optional(ipv4_addr ^ ipv6_addr ^ value ^ quoted ^ meta_value ^
+ device_value) +
pyp.Optional(defa) + semi +
pyp.Optional(pyp.restOfLine).suppress())
return retval
@classmethod
- def _AssembleLocal(cls, minor, backend, meta):
+ def _AssembleLocal(cls, minor, backend, meta, size):
"""Configure the local part of a DRBD device.
"""
args = ["drbdsetup", cls._DevPath(minor), "disk",
- backend, meta, "0", "-e", "detach", "--create-device"]
+ backend, meta, "0",
+ "-e", "detach",
+ "--create-device"]
+ if size:
+ args.extend(["-d", "%sm" % size])
+ if not constants.DRBD_BARRIERS: # disable barriers, if configured so
+ version = cls._GetVersion(cls._GetProcData())
+ # various DRBD versions support different disk barrier options;
+ # what we aim here is to revert back to the 'drain' method of
+ # disk flushes and to disable metadata barriers, in effect going
+ # back to pre-8.0.7 behaviour
+ vmaj = version['k_major']
+ vmin = version['k_minor']
+ vrel = version['k_point']
+ assert vmaj == 8
+ if vmin == 0: # 8.0.x
+ if vrel >= 12:
+ args.extend(['-i', '-m'])
+ elif vmin == 2: # 8.2.x
+ if vrel >= 7:
+ args.extend(['-i', '-m'])
+ elif vmin >= 3: # 8.3.x or newer
+ args.extend(['-i', '-a', 'm'])
result = utils.RunCmd(args)
if result.failed:
_ThrowError("drbd%d: can't attach local disk: %s", minor, result.output)
# about its peer.
cls._SetMinorSyncSpeed(minor, constants.SYNC_SPEED)
+ if netutils.IP6Address.IsValid(lhost):
+ if not netutils.IP6Address.IsValid(rhost):
+ _ThrowError("drbd%d: can't connect ip %s to ip %s" %
+ (minor, lhost, rhost))
+ family = "ipv6"
+ elif netutils.IP4Address.IsValid(lhost):
+ if not netutils.IP4Address.IsValid(rhost):
+ _ThrowError("drbd%d: can't connect ip %s to ip %s" %
+ (minor, lhost, rhost))
+ family = "ipv4"
+ else:
+ _ThrowError("drbd%d: Invalid ip %s" % (minor, lhost))
+
args = ["drbdsetup", cls._DevPath(minor), "net",
- "%s:%s" % (lhost, lport), "%s:%s" % (rhost, rport), protocol,
+ "%s:%s:%s" % (family, lhost, lport),
+ "%s:%s:%s" % (family, rhost, rport), protocol,
"-A", "discard-zero-changes",
"-B", "consensus",
"--create-device",
_ThrowError("drbd%d: can't setup network: %s - %s",
minor, result.fail_reason, result.output)
- timeout = time.time() + 10
- ok = False
- while time.time() < timeout:
+ def _CheckNetworkConfig():
info = cls._GetDevInfo(cls._GetShowData(minor))
if not "local_addr" in info or not "remote_addr" in info:
- time.sleep(1)
- continue
+ raise utils.RetryAgain()
+
if (info["local_addr"] != (lhost, lport) or
info["remote_addr"] != (rhost, rport)):
- time.sleep(1)
- continue
- ok = True
- break
- if not ok:
+ raise utils.RetryAgain()
+
+ try:
+ utils.Retry(_CheckNetworkConfig, 1.0, 10.0)
+ except utils.RetryTimeout:
_ThrowError("drbd%d: timeout while configuring network", minor)
def AddChildren(self, devices):
self._CheckMetaSize(meta.dev_path)
self._InitMeta(self._FindUnusedMinor(), meta.dev_path)
- self._AssembleLocal(self.minor, backend.dev_path, meta.dev_path)
+ self._AssembleLocal(self.minor, backend.dev_path, meta.dev_path, self.size)
self._children = devices
def RemoveChildren(self, devices):
children_result = super(DRBD8, self).SetSyncSpeed(kbytes)
return self._SetMinorSyncSpeed(self.minor, kbytes) and children_result
+ def PauseResumeSync(self, pause):
+ """Pauses or resumes the sync of a DRBD device.
+
+ @param pause: Whether to pause or resume
+ @return: the success of the operation
+
+ """
+ if self.minor is None:
+ logging.info("Not attached during PauseSync")
+ return False
+
+ children_result = super(DRBD8, self).PauseResumeSync(pause)
+
+ if pause:
+ cmd = "pause-sync"
+ else:
+ cmd = "resume-sync"
+
+ result = utils.RunCmd(["drbdsetup", self.dev_path, cmd])
+ if result.failed:
+ logging.error("Can't %s: %s - %s", cmd,
+ result.fail_reason, result.output)
+ return not result.failed and children_result
+
def GetProcStatus(self):
"""Return device data from /proc.
If sync_percent is None, it means all is ok
- If estimated_time is None, it means we can't esimate
+ If estimated_time is None, it means we can't estimate
the time needed, otherwise it's the time left in seconds.
We set the is_degraded parameter to True on two conditions:
network not connected or local disk missing.
- We compute the ldisk parameter based on wheter we have a local
+ We compute the ldisk parameter based on whether we have a local
disk or not.
- @rtype: tuple
- @return: (sync_percent, estimated_time, is_degraded, ldisk)
+ @rtype: objects.BlockDevStatus
"""
if self.minor is None and not self.Attach():
_ThrowError("drbd%d: can't Attach() in GetSyncStatus", self._aminor)
+
stats = self.GetProcStatus()
- ldisk = not stats.is_disk_uptodate
- is_degraded = not stats.is_connected
- return stats.sync_percent, stats.est_time, is_degraded or ldisk, ldisk
+ is_degraded = not stats.is_connected or not stats.is_disk_uptodate
+
+ if stats.is_disk_uptodate:
+ ldisk_status = constants.LDS_OKAY
+ elif stats.is_diskless:
+ ldisk_status = constants.LDS_FAULTY
+ else:
+ ldisk_status = constants.LDS_UNKNOWN
+
+ return objects.BlockDevStatus(dev_path=self.dev_path,
+ major=self.major,
+ minor=self.minor,
+ sync_percent=stats.sync_percent,
+ estimated_time=stats.est_time,
+ is_degraded=is_degraded,
+ ldisk_status=ldisk_status)
def Open(self, force=False):
"""Make the local state primary.
_ThrowError("drbd%d: DRBD disk missing network info in"
" DisconnectNet()", self.minor)
- ever_disconnected = _IgnoreError(self._ShutdownNet, self.minor)
- timeout_limit = time.time() + self._NET_RECONFIG_TIMEOUT
- sleep_time = 0.100 # we start the retry time at 100 miliseconds
- while time.time() < timeout_limit:
- status = self.GetProcStatus()
- if status.is_standalone:
- break
- # retry the disconnect, it seems possible that due to a
- # well-time disconnect on the peer, my disconnect command might
- # be ingored and forgotten
- ever_disconnected = _IgnoreError(self._ShutdownNet, self.minor) or \
- ever_disconnected
- time.sleep(sleep_time)
- sleep_time = min(2, sleep_time * 1.5)
+ class _DisconnectStatus:
+ def __init__(self, ever_disconnected):
+ self.ever_disconnected = ever_disconnected
- if not status.is_standalone:
- if ever_disconnected:
+ dstatus = _DisconnectStatus(_IgnoreError(self._ShutdownNet, self.minor))
+
+ def _WaitForDisconnect():
+ if self.GetProcStatus().is_standalone:
+ return
+
+ # retry the disconnect, it seems possible that due to a well-timed
+ # disconnect on the peer, my disconnect command might be ignored and
+ # forgotten
+ dstatus.ever_disconnected = \
+ _IgnoreError(self._ShutdownNet, self.minor) or dstatus.ever_disconnected
+
+ raise utils.RetryAgain()
+
+ # Keep start time
+ start_time = time.time()
+
+ try:
+ # Start delay at 100 milliseconds and grow up to 2 seconds
+ utils.Retry(_WaitForDisconnect, (0.1, 1.5, 2.0),
+ self._NET_RECONFIG_TIMEOUT)
+ except utils.RetryTimeout:
+ if dstatus.ever_disconnected:
msg = ("drbd%d: device did not react to the"
" 'disconnect' command in a timely manner")
else:
msg = "drbd%d: can't shutdown network, even after multiple retries"
+
_ThrowError(msg, self.minor)
- reconfig_time = time.time() - timeout_limit + self._NET_RECONFIG_TIMEOUT
- if reconfig_time > 15: # hardcoded alert limit
+ reconfig_time = time.time() - start_time
+ if reconfig_time > (self._NET_RECONFIG_TIMEOUT * 0.25):
logging.info("drbd%d: DisconnectNet: detach took %.3f seconds",
self.minor, reconfig_time)
the attach if can return success.
"""
+ # TODO: Rewrite to not use a for loop just because there is 'break'
+ # pylint: disable-msg=W0631
net_data = (self._lhost, self._lport, self._rhost, self._rport)
for minor in (self._aminor,):
info = self._GetDevInfo(self._GetShowData(minor))
if match_r and "local_dev" not in info:
# no local disk, but network attached and it matches
self._AssembleLocal(minor, self._children[0].dev_path,
- self._children[1].dev_path)
+ self._children[1].dev_path, self.size)
if self._MatchesNet(self._GetDevInfo(self._GetShowData(minor))):
break
else:
minor = self._aminor
if self._children and self._children[0] and self._children[1]:
self._AssembleLocal(minor, self._children[0].dev_path,
- self._children[1].dev_path)
+ self._children[1].dev_path, self.size)
if self._lhost and self._lport and self._rhost and self._rport:
self._AssembleNet(minor,
(self._lhost, self._lport, self._rhost, self._rport),
aminor, meta)
cls._CheckMetaSize(meta.dev_path)
cls._InitMeta(aminor, meta.dev_path)
- return cls(unique_id, children)
+ return cls(unique_id, children, size)
def Grow(self, amount):
"""Resize the DRBD device and its backing storage.
if len(self._children) != 2 or None in self._children:
_ThrowError("drbd%d: cannot grow diskless device", self.minor)
self._children[0].Grow(amount)
- result = utils.RunCmd(["drbdsetup", self.dev_path, "resize"])
+ result = utils.RunCmd(["drbdsetup", self.dev_path, "resize", "-s",
+ "%dm" % (self.size + amount)])
if result.failed:
_ThrowError("drbd%d: resize failed: %s", self.minor, result.output)
The unique_id for the file device is a (file_driver, file_path) tuple.
"""
- def __init__(self, unique_id, children):
+ def __init__(self, unique_id, children, size):
"""Initalizes a file device backend.
"""
if children:
raise errors.BlockDeviceError("Invalid setup for file device")
- super(FileStorage, self).__init__(unique_id, children)
+ super(FileStorage, self).__init__(unique_id, children, size)
if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
raise ValueError("Invalid configuration data %s" % str(unique_id))
self.driver = unique_id[0]
def Shutdown(self):
"""Shutdown the device.
- This is a no-op for the file type, as we don't deacivate
+ This is a no-op for the file type, as we don't deactivate
the file on shutdown.
"""
if err.errno != errno.ENOENT:
_ThrowError("Can't remove file '%s': %s", self.dev_path, err)
+ def Rename(self, new_id):
+ """Renames the file.
+
+ """
+ # TODO: implement rename for file-based storage
+ _ThrowError("Rename is not supported for file-based storage")
+
+ def Grow(self, amount):
+ """Grow the file
+
+ @param amount: the amount (in mebibytes) to grow with
+
+ """
+ # Check that the file exists
+ self.Assemble()
+ current_size = self.GetActualSize()
+ new_size = current_size + amount * 1024 * 1024
+ assert new_size > current_size, "Cannot Grow with a negative amount"
+ try:
+ f = open(self.dev_path, "a+")
+ f.truncate(new_size)
+ f.close()
+ except EnvironmentError, err:
+ _ThrowError("Error in file growth: %s", str(err))
+
def Attach(self):
"""Attach to an existing file.
self.attached = os.path.exists(self.dev_path)
return self.attached
+ def GetActualSize(self):
+ """Return the actual disk size.
+
+ @note: the device needs to be active when this is called
+
+ """
+ assert self.attached, "BlockDevice not attached in GetActualSize()"
+ try:
+ st = os.stat(self.dev_path)
+ return st.st_size
+ except OSError, err:
+ _ThrowError("Can't stat %s: %s", self.dev_path, err)
+
@classmethod
def Create(cls, unique_id, children, size):
"""Create a new file.
@return: an instance of FileStorage
"""
- # TODO: decide whether we should check for existing files and
- # abort or not
if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
raise ValueError("Invalid configuration data %s" % str(unique_id))
dev_path = unique_id[1]
try:
- f = open(dev_path, 'w')
+ fd = os.open(dev_path, os.O_RDWR | os.O_CREAT | os.O_EXCL)
+ f = os.fdopen(fd, "w")
f.truncate(size * 1024 * 1024)
f.close()
- except IOError, err:
+ except EnvironmentError, err:
+ if err.errno == errno.EEXIST:
+ _ThrowError("File already existing: %s", dev_path)
_ThrowError("Error in file creation: %", str(err))
- return FileStorage(unique_id, children)
+ return FileStorage(unique_id, children, size)
DEV_MAP = {
constants.LD_LV: LogicalVolume,
constants.LD_DRBD8: DRBD8,
- constants.LD_FILE: FileStorage,
}
+if constants.ENABLE_FILE_STORAGE:
+ DEV_MAP[constants.LD_FILE] = FileStorage
+
-def FindDevice(dev_type, unique_id, children):
+def FindDevice(dev_type, unique_id, children, size):
"""Search for an existing, assembled device.
This will succeed only if the device exists and is assembled, but it
"""
if dev_type not in DEV_MAP:
raise errors.ProgrammerError("Invalid block device type '%s'" % dev_type)
- device = DEV_MAP[dev_type](unique_id, children)
+ device = DEV_MAP[dev_type](unique_id, children, size)
if not device.attached:
return None
return device
-def Assemble(dev_type, unique_id, children):
+def Assemble(dev_type, unique_id, children, size):
"""Try to attach or assemble an existing device.
This will attach to assemble the device, as needed, to bring it
"""
if dev_type not in DEV_MAP:
raise errors.ProgrammerError("Invalid block device type '%s'" % dev_type)
- device = DEV_MAP[dev_type](unique_id, children)
+ device = DEV_MAP[dev_type](unique_id, children, size)
device.Assemble()
return device