Fix execution group of NodeD
[ganeti-local] / lib / bdev.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Block device abstraction"""
23
24 import re
25 import time
26 import errno
27 import shlex
28 import stat
29 import pyparsing as pyp
30 import os
31 import logging
32 import math
33
34 from ganeti import utils
35 from ganeti import errors
36 from ganeti import constants
37 from ganeti import objects
38 from ganeti import compat
39 from ganeti import netutils
40 from ganeti import pathutils
41 from ganeti import serializer
42
43
44 # Size of reads in _CanReadDevice
45 _DEVICE_READ_SIZE = 128 * 1024
46
47
48 class RbdShowmappedJsonError(Exception):
49   """`rbd showmmapped' JSON formatting error Exception class.
50
51   """
52   pass
53
54
55 def _IgnoreError(fn, *args, **kwargs):
56   """Executes the given function, ignoring BlockDeviceErrors.
57
58   This is used in order to simplify the execution of cleanup or
59   rollback functions.
60
61   @rtype: boolean
62   @return: True when fn didn't raise an exception, False otherwise
63
64   """
65   try:
66     fn(*args, **kwargs)
67     return True
68   except errors.BlockDeviceError, err:
69     logging.warning("Caught BlockDeviceError but ignoring: %s", str(err))
70     return False
71
72
73 def _ThrowError(msg, *args):
74   """Log an error to the node daemon and the raise an exception.
75
76   @type msg: string
77   @param msg: the text of the exception
78   @raise errors.BlockDeviceError
79
80   """
81   if args:
82     msg = msg % args
83   logging.error(msg)
84   raise errors.BlockDeviceError(msg)
85
86
87 def _CheckResult(result):
88   """Throws an error if the given result is a failed one.
89
90   @param result: result from RunCmd
91
92   """
93   if result.failed:
94     _ThrowError("Command: %s error: %s - %s", result.cmd, result.fail_reason,
95                 result.output)
96
97
98 def _CanReadDevice(path):
99   """Check if we can read from the given device.
100
101   This tries to read the first 128k of the device.
102
103   """
104   try:
105     utils.ReadFile(path, size=_DEVICE_READ_SIZE)
106     return True
107   except EnvironmentError:
108     logging.warning("Can't read from device %s", path, exc_info=True)
109     return False
110
111
112 def _GetForbiddenFileStoragePaths():
113   """Builds a list of path prefixes which shouldn't be used for file storage.
114
115   @rtype: frozenset
116
117   """
118   paths = set([
119     "/boot",
120     "/dev",
121     "/etc",
122     "/home",
123     "/proc",
124     "/root",
125     "/sys",
126     ])
127
128   for prefix in ["", "/usr", "/usr/local"]:
129     paths.update(map(lambda s: "%s/%s" % (prefix, s),
130                      ["bin", "lib", "lib32", "lib64", "sbin"]))
131
132   return compat.UniqueFrozenset(map(os.path.normpath, paths))
133
134
135 def _ComputeWrongFileStoragePaths(paths,
136                                   _forbidden=_GetForbiddenFileStoragePaths()):
137   """Cross-checks a list of paths for prefixes considered bad.
138
139   Some paths, e.g. "/bin", should not be used for file storage.
140
141   @type paths: list
142   @param paths: List of paths to be checked
143   @rtype: list
144   @return: Sorted list of paths for which the user should be warned
145
146   """
147   def _Check(path):
148     return (not os.path.isabs(path) or
149             path in _forbidden or
150             filter(lambda p: utils.IsBelowDir(p, path), _forbidden))
151
152   return utils.NiceSort(filter(_Check, map(os.path.normpath, paths)))
153
154
155 def ComputeWrongFileStoragePaths(_filename=pathutils.FILE_STORAGE_PATHS_FILE):
156   """Returns a list of file storage paths whose prefix is considered bad.
157
158   See L{_ComputeWrongFileStoragePaths}.
159
160   """
161   return _ComputeWrongFileStoragePaths(_LoadAllowedFileStoragePaths(_filename))
162
163
164 def _CheckFileStoragePath(path, allowed):
165   """Checks if a path is in a list of allowed paths for file storage.
166
167   @type path: string
168   @param path: Path to check
169   @type allowed: list
170   @param allowed: List of allowed paths
171   @raise errors.FileStoragePathError: If the path is not allowed
172
173   """
174   if not os.path.isabs(path):
175     raise errors.FileStoragePathError("File storage path must be absolute,"
176                                       " got '%s'" % path)
177
178   for i in allowed:
179     if not os.path.isabs(i):
180       logging.info("Ignoring relative path '%s' for file storage", i)
181       continue
182
183     if utils.IsBelowDir(i, path):
184       break
185   else:
186     raise errors.FileStoragePathError("Path '%s' is not acceptable for file"
187                                       " storage. A possible fix might be to add"
188                                       " it to /etc/ganeti/file-storage-paths"
189                                       " on all nodes." % path)
190
191
192 def _LoadAllowedFileStoragePaths(filename):
193   """Loads file containing allowed file storage paths.
194
195   @rtype: list
196   @return: List of allowed paths (can be an empty list)
197
198   """
199   try:
200     contents = utils.ReadFile(filename)
201   except EnvironmentError:
202     return []
203   else:
204     return utils.FilterEmptyLinesAndComments(contents)
205
206
207 def CheckFileStoragePath(path, _filename=pathutils.FILE_STORAGE_PATHS_FILE):
208   """Checks if a path is allowed for file storage.
209
210   @type path: string
211   @param path: Path to check
212   @raise errors.FileStoragePathError: If the path is not allowed
213
214   """
215   allowed = _LoadAllowedFileStoragePaths(_filename)
216
217   if _ComputeWrongFileStoragePaths([path]):
218     raise errors.FileStoragePathError("Path '%s' uses a forbidden prefix" %
219                                       path)
220
221   _CheckFileStoragePath(path, allowed)
222
223
224 class BlockDev(object):
225   """Block device abstract class.
226
227   A block device can be in the following states:
228     - not existing on the system, and by `Create()` it goes into:
229     - existing but not setup/not active, and by `Assemble()` goes into:
230     - active read-write and by `Open()` it goes into
231     - online (=used, or ready for use)
232
233   A device can also be online but read-only, however we are not using
234   the readonly state (LV has it, if needed in the future) and we are
235   usually looking at this like at a stack, so it's easier to
236   conceptualise the transition from not-existing to online and back
237   like a linear one.
238
239   The many different states of the device are due to the fact that we
240   need to cover many device types:
241     - logical volumes are created, lvchange -a y $lv, and used
242     - drbd devices are attached to a local disk/remote peer and made primary
243
244   A block device is identified by three items:
245     - the /dev path of the device (dynamic)
246     - a unique ID of the device (static)
247     - it's major/minor pair (dynamic)
248
249   Not all devices implement both the first two as distinct items. LVM
250   logical volumes have their unique ID (the pair volume group, logical
251   volume name) in a 1-to-1 relation to the dev path. For DRBD devices,
252   the /dev path is again dynamic and the unique id is the pair (host1,
253   dev1), (host2, dev2).
254
255   You can get to a device in two ways:
256     - creating the (real) device, which returns you
257       an attached instance (lvcreate)
258     - attaching of a python instance to an existing (real) device
259
260   The second point, the attachment to a device, is different
261   depending on whether the device is assembled or not. At init() time,
262   we search for a device with the same unique_id as us. If found,
263   good. It also means that the device is already assembled. If not,
264   after assembly we'll have our correct major/minor.
265
266   """
267   def __init__(self, unique_id, children, size, params):
268     self._children = children
269     self.dev_path = None
270     self.unique_id = unique_id
271     self.major = None
272     self.minor = None
273     self.attached = False
274     self.size = size
275     self.params = params
276
277   def Assemble(self):
278     """Assemble the device from its components.
279
280     Implementations of this method by child classes must ensure that:
281       - after the device has been assembled, it knows its major/minor
282         numbers; this allows other devices (usually parents) to probe
283         correctly for their children
284       - calling this method on an existing, in-use device is safe
285       - if the device is already configured (and in an OK state),
286         this method is idempotent
287
288     """
289     pass
290
291   def Attach(self):
292     """Find a device which matches our config and attach to it.
293
294     """
295     raise NotImplementedError
296
297   def Close(self):
298     """Notifies that the device will no longer be used for I/O.
299
300     """
301     raise NotImplementedError
302
303   @classmethod
304   def Create(cls, unique_id, children, size, params, excl_stor):
305     """Create the device.
306
307     If the device cannot be created, it will return None
308     instead. Error messages go to the logging system.
309
310     Note that for some devices, the unique_id is used, and for other,
311     the children. The idea is that these two, taken together, are
312     enough for both creation and assembly (later).
313
314     """
315     raise NotImplementedError
316
317   def Remove(self):
318     """Remove this device.
319
320     This makes sense only for some of the device types: LV and file
321     storage. Also note that if the device can't attach, the removal
322     can't be completed.
323
324     """
325     raise NotImplementedError
326
327   def Rename(self, new_id):
328     """Rename this device.
329
330     This may or may not make sense for a given device type.
331
332     """
333     raise NotImplementedError
334
335   def Open(self, force=False):
336     """Make the device ready for use.
337
338     This makes the device ready for I/O. For now, just the DRBD
339     devices need this.
340
341     The force parameter signifies that if the device has any kind of
342     --force thing, it should be used, we know what we are doing.
343
344     """
345     raise NotImplementedError
346
347   def Shutdown(self):
348     """Shut down the device, freeing its children.
349
350     This undoes the `Assemble()` work, except for the child
351     assembling; as such, the children on the device are still
352     assembled after this call.
353
354     """
355     raise NotImplementedError
356
357   def SetSyncParams(self, params):
358     """Adjust the synchronization parameters of the mirror.
359
360     In case this is not a mirroring device, this is no-op.
361
362     @param params: dictionary of LD level disk parameters related to the
363     synchronization.
364     @rtype: list
365     @return: a list of error messages, emitted both by the current node and by
366     children. An empty list means no errors.
367
368     """
369     result = []
370     if self._children:
371       for child in self._children:
372         result.extend(child.SetSyncParams(params))
373     return result
374
375   def PauseResumeSync(self, pause):
376     """Pause/Resume the sync of the mirror.
377
378     In case this is not a mirroring device, this is no-op.
379
380     @param pause: Whether to pause or resume
381
382     """
383     result = True
384     if self._children:
385       for child in self._children:
386         result = result and child.PauseResumeSync(pause)
387     return result
388
389   def GetSyncStatus(self):
390     """Returns the sync status of the device.
391
392     If this device is a mirroring device, this function returns the
393     status of the mirror.
394
395     If sync_percent is None, it means the device is not syncing.
396
397     If estimated_time is None, it means we can't estimate
398     the time needed, otherwise it's the time left in seconds.
399
400     If is_degraded is True, it means the device is missing
401     redundancy. This is usually a sign that something went wrong in
402     the device setup, if sync_percent is None.
403
404     The ldisk parameter represents the degradation of the local
405     data. This is only valid for some devices, the rest will always
406     return False (not degraded).
407
408     @rtype: objects.BlockDevStatus
409
410     """
411     return objects.BlockDevStatus(dev_path=self.dev_path,
412                                   major=self.major,
413                                   minor=self.minor,
414                                   sync_percent=None,
415                                   estimated_time=None,
416                                   is_degraded=False,
417                                   ldisk_status=constants.LDS_OKAY)
418
419   def CombinedSyncStatus(self):
420     """Calculate the mirror status recursively for our children.
421
422     The return value is the same as for `GetSyncStatus()` except the
423     minimum percent and maximum time are calculated across our
424     children.
425
426     @rtype: objects.BlockDevStatus
427
428     """
429     status = self.GetSyncStatus()
430
431     min_percent = status.sync_percent
432     max_time = status.estimated_time
433     is_degraded = status.is_degraded
434     ldisk_status = status.ldisk_status
435
436     if self._children:
437       for child in self._children:
438         child_status = child.GetSyncStatus()
439
440         if min_percent is None:
441           min_percent = child_status.sync_percent
442         elif child_status.sync_percent is not None:
443           min_percent = min(min_percent, child_status.sync_percent)
444
445         if max_time is None:
446           max_time = child_status.estimated_time
447         elif child_status.estimated_time is not None:
448           max_time = max(max_time, child_status.estimated_time)
449
450         is_degraded = is_degraded or child_status.is_degraded
451
452         if ldisk_status is None:
453           ldisk_status = child_status.ldisk_status
454         elif child_status.ldisk_status is not None:
455           ldisk_status = max(ldisk_status, child_status.ldisk_status)
456
457     return objects.BlockDevStatus(dev_path=self.dev_path,
458                                   major=self.major,
459                                   minor=self.minor,
460                                   sync_percent=min_percent,
461                                   estimated_time=max_time,
462                                   is_degraded=is_degraded,
463                                   ldisk_status=ldisk_status)
464
465   def SetInfo(self, text):
466     """Update metadata with info text.
467
468     Only supported for some device types.
469
470     """
471     for child in self._children:
472       child.SetInfo(text)
473
474   def Grow(self, amount, dryrun, backingstore):
475     """Grow the block device.
476
477     @type amount: integer
478     @param amount: the amount (in mebibytes) to grow with
479     @type dryrun: boolean
480     @param dryrun: whether to execute the operation in simulation mode
481         only, without actually increasing the size
482     @param backingstore: whether to execute the operation on backing storage
483         only, or on "logical" storage only; e.g. DRBD is logical storage,
484         whereas LVM, file, RBD are backing storage
485
486     """
487     raise NotImplementedError
488
489   def GetActualSize(self):
490     """Return the actual disk size.
491
492     @note: the device needs to be active when this is called
493
494     """
495     assert self.attached, "BlockDevice not attached in GetActualSize()"
496     result = utils.RunCmd(["blockdev", "--getsize64", self.dev_path])
497     if result.failed:
498       _ThrowError("blockdev failed (%s): %s",
499                   result.fail_reason, result.output)
500     try:
501       sz = int(result.output.strip())
502     except (ValueError, TypeError), err:
503       _ThrowError("Failed to parse blockdev output: %s", str(err))
504     return sz
505
506   def __repr__(self):
507     return ("<%s: unique_id: %s, children: %s, %s:%s, %s>" %
508             (self.__class__, self.unique_id, self._children,
509              self.major, self.minor, self.dev_path))
510
511
512 class LogicalVolume(BlockDev):
513   """Logical Volume block device.
514
515   """
516   _VALID_NAME_RE = re.compile("^[a-zA-Z0-9+_.-]*$")
517   _INVALID_NAMES = compat.UniqueFrozenset([".", "..", "snapshot", "pvmove"])
518   _INVALID_SUBSTRINGS = compat.UniqueFrozenset(["_mlog", "_mimage"])
519
520   def __init__(self, unique_id, children, size, params):
521     """Attaches to a LV device.
522
523     The unique_id is a tuple (vg_name, lv_name)
524
525     """
526     super(LogicalVolume, self).__init__(unique_id, children, size, params)
527     if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
528       raise ValueError("Invalid configuration data %s" % str(unique_id))
529     self._vg_name, self._lv_name = unique_id
530     self._ValidateName(self._vg_name)
531     self._ValidateName(self._lv_name)
532     self.dev_path = utils.PathJoin("/dev", self._vg_name, self._lv_name)
533     self._degraded = True
534     self.major = self.minor = self.pe_size = self.stripe_count = None
535     self.Attach()
536
537   @staticmethod
538   def _GetStdPvSize(pvs_info):
539     """Return the the standard PV size (used with exclusive storage).
540
541     @param pvs_info: list of objects.LvmPvInfo, cannot be empty
542     @rtype: float
543     @return: size in MiB
544
545     """
546     assert len(pvs_info) > 0
547     smallest = min([pv.size for pv in pvs_info])
548     return smallest / (1 + constants.PART_MARGIN + constants.PART_RESERVED)
549
550   @staticmethod
551   def _ComputeNumPvs(size, pvs_info):
552     """Compute the number of PVs needed for an LV (with exclusive storage).
553
554     @type size: float
555     @param size: LV size in MiB
556     @param pvs_info: list of objects.LvmPvInfo, cannot be empty
557     @rtype: integer
558     @return: number of PVs needed
559     """
560     assert len(pvs_info) > 0
561     pv_size = float(LogicalVolume._GetStdPvSize(pvs_info))
562     return int(math.ceil(float(size) / pv_size))
563
564   @staticmethod
565   def _GetEmptyPvNames(pvs_info, max_pvs=None):
566     """Return a list of empty PVs, by name.
567
568     """
569     empty_pvs = filter(objects.LvmPvInfo.IsEmpty, pvs_info)
570     if max_pvs is not None:
571       empty_pvs = empty_pvs[:max_pvs]
572     return map((lambda pv: pv.name), empty_pvs)
573
574   @classmethod
575   def Create(cls, unique_id, children, size, params, excl_stor):
576     """Create a new logical volume.
577
578     """
579     if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
580       raise errors.ProgrammerError("Invalid configuration data %s" %
581                                    str(unique_id))
582     vg_name, lv_name = unique_id
583     cls._ValidateName(vg_name)
584     cls._ValidateName(lv_name)
585     pvs_info = cls.GetPVInfo([vg_name])
586     if not pvs_info:
587       if excl_stor:
588         msg = "No (empty) PVs found"
589       else:
590         msg = "Can't compute PV info for vg %s" % vg_name
591       _ThrowError(msg)
592     pvs_info.sort(key=(lambda pv: pv.free), reverse=True)
593
594     pvlist = [pv.name for pv in pvs_info]
595     if compat.any(":" in v for v in pvlist):
596       _ThrowError("Some of your PVs have the invalid character ':' in their"
597                   " name, this is not supported - please filter them out"
598                   " in lvm.conf using either 'filter' or 'preferred_names'")
599
600     current_pvs = len(pvlist)
601     desired_stripes = params[constants.LDP_STRIPES]
602     stripes = min(current_pvs, desired_stripes)
603
604     if excl_stor:
605       (err_msgs, _) = utils.LvmExclusiveCheckNodePvs(pvs_info)
606       if err_msgs:
607         for m in err_msgs:
608           logging.warning(m)
609       req_pvs = cls._ComputeNumPvs(size, pvs_info)
610       pvlist = cls._GetEmptyPvNames(pvs_info, req_pvs)
611       current_pvs = len(pvlist)
612       if current_pvs < req_pvs:
613         _ThrowError("Not enough empty PVs to create a disk of %d MB:"
614                     " %d available, %d needed", size, current_pvs, req_pvs)
615       assert current_pvs == len(pvlist)
616       if stripes > current_pvs:
617         # No warning issued for this, as it's no surprise
618         stripes = current_pvs
619
620     else:
621       if stripes < desired_stripes:
622         logging.warning("Could not use %d stripes for VG %s, as only %d PVs are"
623                         " available.", desired_stripes, vg_name, current_pvs)
624       free_size = sum([pv.free for pv in pvs_info])
625       # The size constraint should have been checked from the master before
626       # calling the create function.
627       if free_size < size:
628         _ThrowError("Not enough free space: required %s,"
629                     " available %s", size, free_size)
630
631     # If the free space is not well distributed, we won't be able to
632     # create an optimally-striped volume; in that case, we want to try
633     # with N, N-1, ..., 2, and finally 1 (non-stripped) number of
634     # stripes
635     cmd = ["lvcreate", "-L%dm" % size, "-n%s" % lv_name]
636     for stripes_arg in range(stripes, 0, -1):
637       result = utils.RunCmd(cmd + ["-i%d" % stripes_arg] + [vg_name] + pvlist)
638       if not result.failed:
639         break
640     if result.failed:
641       _ThrowError("LV create failed (%s): %s",
642                   result.fail_reason, result.output)
643     return LogicalVolume(unique_id, children, size, params)
644
645   @staticmethod
646   def _GetVolumeInfo(lvm_cmd, fields):
647     """Returns LVM Volume infos using lvm_cmd
648
649     @param lvm_cmd: Should be one of "pvs", "vgs" or "lvs"
650     @param fields: Fields to return
651     @return: A list of dicts each with the parsed fields
652
653     """
654     if not fields:
655       raise errors.ProgrammerError("No fields specified")
656
657     sep = "|"
658     cmd = [lvm_cmd, "--noheadings", "--nosuffix", "--units=m", "--unbuffered",
659            "--separator=%s" % sep, "-o%s" % ",".join(fields)]
660
661     result = utils.RunCmd(cmd)
662     if result.failed:
663       raise errors.CommandError("Can't get the volume information: %s - %s" %
664                                 (result.fail_reason, result.output))
665
666     data = []
667     for line in result.stdout.splitlines():
668       splitted_fields = line.strip().split(sep)
669
670       if len(fields) != len(splitted_fields):
671         raise errors.CommandError("Can't parse %s output: line '%s'" %
672                                   (lvm_cmd, line))
673
674       data.append(splitted_fields)
675
676     return data
677
678   @classmethod
679   def GetPVInfo(cls, vg_names, filter_allocatable=True, include_lvs=False):
680     """Get the free space info for PVs in a volume group.
681
682     @param vg_names: list of volume group names, if empty all will be returned
683     @param filter_allocatable: whether to skip over unallocatable PVs
684     @param include_lvs: whether to include a list of LVs hosted on each PV
685
686     @rtype: list
687     @return: list of objects.LvmPvInfo objects
688
689     """
690     # We request "lv_name" field only if we care about LVs, so we don't get
691     # a long list of entries with many duplicates unless we really have to.
692     # The duplicate "pv_name" field will be ignored.
693     if include_lvs:
694       lvfield = "lv_name"
695     else:
696       lvfield = "pv_name"
697     try:
698       info = cls._GetVolumeInfo("pvs", ["pv_name", "vg_name", "pv_free",
699                                         "pv_attr", "pv_size", lvfield])
700     except errors.GenericError, err:
701       logging.error("Can't get PV information: %s", err)
702       return None
703
704     # When asked for LVs, "pvs" may return multiple entries for the same PV-LV
705     # pair. We sort entries by PV name and then LV name, so it's easy to weed
706     # out duplicates.
707     if include_lvs:
708       info.sort(key=(lambda i: (i[0], i[5])))
709     data = []
710     lastpvi = None
711     for (pv_name, vg_name, pv_free, pv_attr, pv_size, lv_name) in info:
712       # (possibly) skip over pvs which are not allocatable
713       if filter_allocatable and pv_attr[0] != "a":
714         continue
715       # (possibly) skip over pvs which are not in the right volume group(s)
716       if vg_names and vg_name not in vg_names:
717         continue
718       # Beware of duplicates (check before inserting)
719       if lastpvi and lastpvi.name == pv_name:
720         if include_lvs and lv_name:
721           if not lastpvi.lv_list or lastpvi.lv_list[-1] != lv_name:
722             lastpvi.lv_list.append(lv_name)
723       else:
724         if include_lvs and lv_name:
725           lvl = [lv_name]
726         else:
727           lvl = []
728         lastpvi = objects.LvmPvInfo(name=pv_name, vg_name=vg_name,
729                                     size=float(pv_size), free=float(pv_free),
730                                     attributes=pv_attr, lv_list=lvl)
731         data.append(lastpvi)
732
733     return data
734
735   @classmethod
736   def _GetExclusiveStorageVgFree(cls, vg_name):
737     """Return the free disk space in the given VG, in exclusive storage mode.
738
739     @type vg_name: string
740     @param vg_name: VG name
741     @rtype: float
742     @return: free space in MiB
743     """
744     pvs_info = cls.GetPVInfo([vg_name])
745     if not pvs_info:
746       return 0.0
747     pv_size = cls._GetStdPvSize(pvs_info)
748     num_pvs = len(cls._GetEmptyPvNames(pvs_info))
749     return pv_size * num_pvs
750
751   @classmethod
752   def GetVGInfo(cls, vg_names, excl_stor, filter_readonly=True):
753     """Get the free space info for specific VGs.
754
755     @param vg_names: list of volume group names, if empty all will be returned
756     @param excl_stor: whether exclusive_storage is enabled
757     @param filter_readonly: whether to skip over readonly VGs
758
759     @rtype: list
760     @return: list of tuples (free_space, total_size, name) with free_space in
761              MiB
762
763     """
764     try:
765       info = cls._GetVolumeInfo("vgs", ["vg_name", "vg_free", "vg_attr",
766                                         "vg_size"])
767     except errors.GenericError, err:
768       logging.error("Can't get VG information: %s", err)
769       return None
770
771     data = []
772     for vg_name, vg_free, vg_attr, vg_size in info:
773       # (possibly) skip over vgs which are not writable
774       if filter_readonly and vg_attr[0] == "r":
775         continue
776       # (possibly) skip over vgs which are not in the right volume group(s)
777       if vg_names and vg_name not in vg_names:
778         continue
779       # Exclusive storage needs a different concept of free space
780       if excl_stor:
781         es_free = cls._GetExclusiveStorageVgFree(vg_name)
782         assert es_free <= vg_free
783         vg_free = es_free
784       data.append((float(vg_free), float(vg_size), vg_name))
785
786     return data
787
788   @classmethod
789   def _ValidateName(cls, name):
790     """Validates that a given name is valid as VG or LV name.
791
792     The list of valid characters and restricted names is taken out of
793     the lvm(8) manpage, with the simplification that we enforce both
794     VG and LV restrictions on the names.
795
796     """
797     if (not cls._VALID_NAME_RE.match(name) or
798         name in cls._INVALID_NAMES or
799         compat.any(substring in name for substring in cls._INVALID_SUBSTRINGS)):
800       _ThrowError("Invalid LVM name '%s'", name)
801
802   def Remove(self):
803     """Remove this logical volume.
804
805     """
806     if not self.minor and not self.Attach():
807       # the LV does not exist
808       return
809     result = utils.RunCmd(["lvremove", "-f", "%s/%s" %
810                            (self._vg_name, self._lv_name)])
811     if result.failed:
812       _ThrowError("Can't lvremove: %s - %s", result.fail_reason, result.output)
813
814   def Rename(self, new_id):
815     """Rename this logical volume.
816
817     """
818     if not isinstance(new_id, (tuple, list)) or len(new_id) != 2:
819       raise errors.ProgrammerError("Invalid new logical id '%s'" % new_id)
820     new_vg, new_name = new_id
821     if new_vg != self._vg_name:
822       raise errors.ProgrammerError("Can't move a logical volume across"
823                                    " volume groups (from %s to to %s)" %
824                                    (self._vg_name, new_vg))
825     result = utils.RunCmd(["lvrename", new_vg, self._lv_name, new_name])
826     if result.failed:
827       _ThrowError("Failed to rename the logical volume: %s", result.output)
828     self._lv_name = new_name
829     self.dev_path = utils.PathJoin("/dev", self._vg_name, self._lv_name)
830
831   def Attach(self):
832     """Attach to an existing LV.
833
834     This method will try to see if an existing and active LV exists
835     which matches our name. If so, its major/minor will be
836     recorded.
837
838     """
839     self.attached = False
840     result = utils.RunCmd(["lvs", "--noheadings", "--separator=,",
841                            "--units=k", "--nosuffix",
842                            "-olv_attr,lv_kernel_major,lv_kernel_minor,"
843                            "vg_extent_size,stripes", self.dev_path])
844     if result.failed:
845       logging.error("Can't find LV %s: %s, %s",
846                     self.dev_path, result.fail_reason, result.output)
847       return False
848     # the output can (and will) have multiple lines for multi-segment
849     # LVs, as the 'stripes' parameter is a segment one, so we take
850     # only the last entry, which is the one we're interested in; note
851     # that with LVM2 anyway the 'stripes' value must be constant
852     # across segments, so this is a no-op actually
853     out = result.stdout.splitlines()
854     if not out: # totally empty result? splitlines() returns at least
855                 # one line for any non-empty string
856       logging.error("Can't parse LVS output, no lines? Got '%s'", str(out))
857       return False
858     out = out[-1].strip().rstrip(",")
859     out = out.split(",")
860     if len(out) != 5:
861       logging.error("Can't parse LVS output, len(%s) != 5", str(out))
862       return False
863
864     status, major, minor, pe_size, stripes = out
865     if len(status) < 6:
866       logging.error("lvs lv_attr is not at least 6 characters (%s)", status)
867       return False
868
869     try:
870       major = int(major)
871       minor = int(minor)
872     except (TypeError, ValueError), err:
873       logging.error("lvs major/minor cannot be parsed: %s", str(err))
874
875     try:
876       pe_size = int(float(pe_size))
877     except (TypeError, ValueError), err:
878       logging.error("Can't parse vg extent size: %s", err)
879       return False
880
881     try:
882       stripes = int(stripes)
883     except (TypeError, ValueError), err:
884       logging.error("Can't parse the number of stripes: %s", err)
885       return False
886
887     self.major = major
888     self.minor = minor
889     self.pe_size = pe_size
890     self.stripe_count = stripes
891     self._degraded = status[0] == "v" # virtual volume, i.e. doesn't backing
892                                       # storage
893     self.attached = True
894     return True
895
896   def Assemble(self):
897     """Assemble the device.
898
899     We always run `lvchange -ay` on the LV to ensure it's active before
900     use, as there were cases when xenvg was not active after boot
901     (also possibly after disk issues).
902
903     """
904     result = utils.RunCmd(["lvchange", "-ay", self.dev_path])
905     if result.failed:
906       _ThrowError("Can't activate lv %s: %s", self.dev_path, result.output)
907
908   def Shutdown(self):
909     """Shutdown the device.
910
911     This is a no-op for the LV device type, as we don't deactivate the
912     volumes on shutdown.
913
914     """
915     pass
916
917   def GetSyncStatus(self):
918     """Returns the sync status of the device.
919
920     If this device is a mirroring device, this function returns the
921     status of the mirror.
922
923     For logical volumes, sync_percent and estimated_time are always
924     None (no recovery in progress, as we don't handle the mirrored LV
925     case). The is_degraded parameter is the inverse of the ldisk
926     parameter.
927
928     For the ldisk parameter, we check if the logical volume has the
929     'virtual' type, which means it's not backed by existing storage
930     anymore (read from it return I/O error). This happens after a
931     physical disk failure and subsequent 'vgreduce --removemissing' on
932     the volume group.
933
934     The status was already read in Attach, so we just return it.
935
936     @rtype: objects.BlockDevStatus
937
938     """
939     if self._degraded:
940       ldisk_status = constants.LDS_FAULTY
941     else:
942       ldisk_status = constants.LDS_OKAY
943
944     return objects.BlockDevStatus(dev_path=self.dev_path,
945                                   major=self.major,
946                                   minor=self.minor,
947                                   sync_percent=None,
948                                   estimated_time=None,
949                                   is_degraded=self._degraded,
950                                   ldisk_status=ldisk_status)
951
952   def Open(self, force=False):
953     """Make the device ready for I/O.
954
955     This is a no-op for the LV device type.
956
957     """
958     pass
959
960   def Close(self):
961     """Notifies that the device will no longer be used for I/O.
962
963     This is a no-op for the LV device type.
964
965     """
966     pass
967
968   def Snapshot(self, size):
969     """Create a snapshot copy of an lvm block device.
970
971     @returns: tuple (vg, lv)
972
973     """
974     snap_name = self._lv_name + ".snap"
975
976     # remove existing snapshot if found
977     snap = LogicalVolume((self._vg_name, snap_name), None, size, self.params)
978     _IgnoreError(snap.Remove)
979
980     vg_info = self.GetVGInfo([self._vg_name], False)
981     if not vg_info:
982       _ThrowError("Can't compute VG info for vg %s", self._vg_name)
983     free_size, _, _ = vg_info[0]
984     if free_size < size:
985       _ThrowError("Not enough free space: required %s,"
986                   " available %s", size, free_size)
987
988     _CheckResult(utils.RunCmd(["lvcreate", "-L%dm" % size, "-s",
989                                "-n%s" % snap_name, self.dev_path]))
990
991     return (self._vg_name, snap_name)
992
993   def _RemoveOldInfo(self):
994     """Try to remove old tags from the lv.
995
996     """
997     result = utils.RunCmd(["lvs", "-o", "tags", "--noheadings", "--nosuffix",
998                            self.dev_path])
999     _CheckResult(result)
1000
1001     raw_tags = result.stdout.strip()
1002     if raw_tags:
1003       for tag in raw_tags.split(","):
1004         _CheckResult(utils.RunCmd(["lvchange", "--deltag",
1005                                    tag.strip(), self.dev_path]))
1006
1007   def SetInfo(self, text):
1008     """Update metadata with info text.
1009
1010     """
1011     BlockDev.SetInfo(self, text)
1012
1013     self._RemoveOldInfo()
1014
1015     # Replace invalid characters
1016     text = re.sub("^[^A-Za-z0-9_+.]", "_", text)
1017     text = re.sub("[^-A-Za-z0-9_+.]", "_", text)
1018
1019     # Only up to 128 characters are allowed
1020     text = text[:128]
1021
1022     _CheckResult(utils.RunCmd(["lvchange", "--addtag", text, self.dev_path]))
1023
1024   def Grow(self, amount, dryrun, backingstore):
1025     """Grow the logical volume.
1026
1027     """
1028     if not backingstore:
1029       return
1030     if self.pe_size is None or self.stripe_count is None:
1031       if not self.Attach():
1032         _ThrowError("Can't attach to LV during Grow()")
1033     full_stripe_size = self.pe_size * self.stripe_count
1034     # pe_size is in KB
1035     amount *= 1024
1036     rest = amount % full_stripe_size
1037     if rest != 0:
1038       amount += full_stripe_size - rest
1039     cmd = ["lvextend", "-L", "+%dk" % amount]
1040     if dryrun:
1041       cmd.append("--test")
1042     # we try multiple algorithms since the 'best' ones might not have
1043     # space available in the right place, but later ones might (since
1044     # they have less constraints); also note that only recent LVM
1045     # supports 'cling'
1046     for alloc_policy in "contiguous", "cling", "normal":
1047       result = utils.RunCmd(cmd + ["--alloc", alloc_policy, self.dev_path])
1048       if not result.failed:
1049         return
1050     _ThrowError("Can't grow LV %s: %s", self.dev_path, result.output)
1051
1052
1053 class DRBD8Status(object): # pylint: disable=R0902
1054   """A DRBD status representation class.
1055
1056   Note that this doesn't support unconfigured devices (cs:Unconfigured).
1057
1058   """
1059   UNCONF_RE = re.compile(r"\s*[0-9]+:\s*cs:Unconfigured$")
1060   LINE_RE = re.compile(r"\s*[0-9]+:\s*cs:(\S+)\s+(?:st|ro):([^/]+)/(\S+)"
1061                        "\s+ds:([^/]+)/(\S+)\s+.*$")
1062   SYNC_RE = re.compile(r"^.*\ssync'ed:\s*([0-9.]+)%.*"
1063                        # Due to a bug in drbd in the kernel, introduced in
1064                        # commit 4b0715f096 (still unfixed as of 2011-08-22)
1065                        "(?:\s|M)"
1066                        "finish: ([0-9]+):([0-9]+):([0-9]+)\s.*$")
1067
1068   CS_UNCONFIGURED = "Unconfigured"
1069   CS_STANDALONE = "StandAlone"
1070   CS_WFCONNECTION = "WFConnection"
1071   CS_WFREPORTPARAMS = "WFReportParams"
1072   CS_CONNECTED = "Connected"
1073   CS_STARTINGSYNCS = "StartingSyncS"
1074   CS_STARTINGSYNCT = "StartingSyncT"
1075   CS_WFBITMAPS = "WFBitMapS"
1076   CS_WFBITMAPT = "WFBitMapT"
1077   CS_WFSYNCUUID = "WFSyncUUID"
1078   CS_SYNCSOURCE = "SyncSource"
1079   CS_SYNCTARGET = "SyncTarget"
1080   CS_PAUSEDSYNCS = "PausedSyncS"
1081   CS_PAUSEDSYNCT = "PausedSyncT"
1082   CSET_SYNC = compat.UniqueFrozenset([
1083     CS_WFREPORTPARAMS,
1084     CS_STARTINGSYNCS,
1085     CS_STARTINGSYNCT,
1086     CS_WFBITMAPS,
1087     CS_WFBITMAPT,
1088     CS_WFSYNCUUID,
1089     CS_SYNCSOURCE,
1090     CS_SYNCTARGET,
1091     CS_PAUSEDSYNCS,
1092     CS_PAUSEDSYNCT,
1093     ])
1094
1095   DS_DISKLESS = "Diskless"
1096   DS_ATTACHING = "Attaching" # transient state
1097   DS_FAILED = "Failed" # transient state, next: diskless
1098   DS_NEGOTIATING = "Negotiating" # transient state
1099   DS_INCONSISTENT = "Inconsistent" # while syncing or after creation
1100   DS_OUTDATED = "Outdated"
1101   DS_DUNKNOWN = "DUnknown" # shown for peer disk when not connected
1102   DS_CONSISTENT = "Consistent"
1103   DS_UPTODATE = "UpToDate" # normal state
1104
1105   RO_PRIMARY = "Primary"
1106   RO_SECONDARY = "Secondary"
1107   RO_UNKNOWN = "Unknown"
1108
1109   def __init__(self, procline):
1110     u = self.UNCONF_RE.match(procline)
1111     if u:
1112       self.cstatus = self.CS_UNCONFIGURED
1113       self.lrole = self.rrole = self.ldisk = self.rdisk = None
1114     else:
1115       m = self.LINE_RE.match(procline)
1116       if not m:
1117         raise errors.BlockDeviceError("Can't parse input data '%s'" % procline)
1118       self.cstatus = m.group(1)
1119       self.lrole = m.group(2)
1120       self.rrole = m.group(3)
1121       self.ldisk = m.group(4)
1122       self.rdisk = m.group(5)
1123
1124     # end reading of data from the LINE_RE or UNCONF_RE
1125
1126     self.is_standalone = self.cstatus == self.CS_STANDALONE
1127     self.is_wfconn = self.cstatus == self.CS_WFCONNECTION
1128     self.is_connected = self.cstatus == self.CS_CONNECTED
1129     self.is_primary = self.lrole == self.RO_PRIMARY
1130     self.is_secondary = self.lrole == self.RO_SECONDARY
1131     self.peer_primary = self.rrole == self.RO_PRIMARY
1132     self.peer_secondary = self.rrole == self.RO_SECONDARY
1133     self.both_primary = self.is_primary and self.peer_primary
1134     self.both_secondary = self.is_secondary and self.peer_secondary
1135
1136     self.is_diskless = self.ldisk == self.DS_DISKLESS
1137     self.is_disk_uptodate = self.ldisk == self.DS_UPTODATE
1138     self.peer_disk_uptodate = self.rdisk == self.DS_UPTODATE
1139
1140     self.is_in_resync = self.cstatus in self.CSET_SYNC
1141     self.is_in_use = self.cstatus != self.CS_UNCONFIGURED
1142
1143     m = self.SYNC_RE.match(procline)
1144     if m:
1145       self.sync_percent = float(m.group(1))
1146       hours = int(m.group(2))
1147       minutes = int(m.group(3))
1148       seconds = int(m.group(4))
1149       self.est_time = hours * 3600 + minutes * 60 + seconds
1150     else:
1151       # we have (in this if branch) no percent information, but if
1152       # we're resyncing we need to 'fake' a sync percent information,
1153       # as this is how cmdlib determines if it makes sense to wait for
1154       # resyncing or not
1155       if self.is_in_resync:
1156         self.sync_percent = 0
1157       else:
1158         self.sync_percent = None
1159       self.est_time = None
1160
1161
1162 class BaseDRBD(BlockDev): # pylint: disable=W0223
1163   """Base DRBD class.
1164
1165   This class contains a few bits of common functionality between the
1166   0.7 and 8.x versions of DRBD.
1167
1168   """
1169   _VERSION_RE = re.compile(r"^version: (\d+)\.(\d+)\.(\d+)(?:\.\d+)?"
1170                            r" \(api:(\d+)/proto:(\d+)(?:-(\d+))?\)")
1171   _VALID_LINE_RE = re.compile("^ *([0-9]+): cs:([^ ]+).*$")
1172   _UNUSED_LINE_RE = re.compile("^ *([0-9]+): cs:Unconfigured$")
1173
1174   _DRBD_MAJOR = 147
1175   _ST_UNCONFIGURED = "Unconfigured"
1176   _ST_WFCONNECTION = "WFConnection"
1177   _ST_CONNECTED = "Connected"
1178
1179   _STATUS_FILE = constants.DRBD_STATUS_FILE
1180   _USERMODE_HELPER_FILE = "/sys/module/drbd/parameters/usermode_helper"
1181
1182   @staticmethod
1183   def _GetProcData(filename=_STATUS_FILE):
1184     """Return data from /proc/drbd.
1185
1186     """
1187     try:
1188       data = utils.ReadFile(filename).splitlines()
1189     except EnvironmentError, err:
1190       if err.errno == errno.ENOENT:
1191         _ThrowError("The file %s cannot be opened, check if the module"
1192                     " is loaded (%s)", filename, str(err))
1193       else:
1194         _ThrowError("Can't read the DRBD proc file %s: %s", filename, str(err))
1195     if not data:
1196       _ThrowError("Can't read any data from %s", filename)
1197     return data
1198
1199   @classmethod
1200   def _MassageProcData(cls, data):
1201     """Transform the output of _GetProdData into a nicer form.
1202
1203     @return: a dictionary of minor: joined lines from /proc/drbd
1204         for that minor
1205
1206     """
1207     results = {}
1208     old_minor = old_line = None
1209     for line in data:
1210       if not line: # completely empty lines, as can be returned by drbd8.0+
1211         continue
1212       lresult = cls._VALID_LINE_RE.match(line)
1213       if lresult is not None:
1214         if old_minor is not None:
1215           results[old_minor] = old_line
1216         old_minor = int(lresult.group(1))
1217         old_line = line
1218       else:
1219         if old_minor is not None:
1220           old_line += " " + line.strip()
1221     # add last line
1222     if old_minor is not None:
1223       results[old_minor] = old_line
1224     return results
1225
1226   @classmethod
1227   def _GetVersion(cls, proc_data):
1228     """Return the DRBD version.
1229
1230     This will return a dict with keys:
1231       - k_major
1232       - k_minor
1233       - k_point
1234       - api
1235       - proto
1236       - proto2 (only on drbd > 8.2.X)
1237
1238     """
1239     first_line = proc_data[0].strip()
1240     version = cls._VERSION_RE.match(first_line)
1241     if not version:
1242       raise errors.BlockDeviceError("Can't parse DRBD version from '%s'" %
1243                                     first_line)
1244
1245     values = version.groups()
1246     retval = {
1247       "k_major": int(values[0]),
1248       "k_minor": int(values[1]),
1249       "k_point": int(values[2]),
1250       "api": int(values[3]),
1251       "proto": int(values[4]),
1252       }
1253     if values[5] is not None:
1254       retval["proto2"] = values[5]
1255
1256     return retval
1257
1258   @staticmethod
1259   def GetUsermodeHelper(filename=_USERMODE_HELPER_FILE):
1260     """Returns DRBD usermode_helper currently set.
1261
1262     """
1263     try:
1264       helper = utils.ReadFile(filename).splitlines()[0]
1265     except EnvironmentError, err:
1266       if err.errno == errno.ENOENT:
1267         _ThrowError("The file %s cannot be opened, check if the module"
1268                     " is loaded (%s)", filename, str(err))
1269       else:
1270         _ThrowError("Can't read DRBD helper file %s: %s", filename, str(err))
1271     if not helper:
1272       _ThrowError("Can't read any data from %s", filename)
1273     return helper
1274
1275   @staticmethod
1276   def _DevPath(minor):
1277     """Return the path to a drbd device for a given minor.
1278
1279     """
1280     return "/dev/drbd%d" % minor
1281
1282   @classmethod
1283   def GetUsedDevs(cls):
1284     """Compute the list of used DRBD devices.
1285
1286     """
1287     data = cls._GetProcData()
1288
1289     used_devs = {}
1290     for line in data:
1291       match = cls._VALID_LINE_RE.match(line)
1292       if not match:
1293         continue
1294       minor = int(match.group(1))
1295       state = match.group(2)
1296       if state == cls._ST_UNCONFIGURED:
1297         continue
1298       used_devs[minor] = state, line
1299
1300     return used_devs
1301
1302   def _SetFromMinor(self, minor):
1303     """Set our parameters based on the given minor.
1304
1305     This sets our minor variable and our dev_path.
1306
1307     """
1308     if minor is None:
1309       self.minor = self.dev_path = None
1310       self.attached = False
1311     else:
1312       self.minor = minor
1313       self.dev_path = self._DevPath(minor)
1314       self.attached = True
1315
1316   @staticmethod
1317   def _CheckMetaSize(meta_device):
1318     """Check if the given meta device looks like a valid one.
1319
1320     This currently only checks the size, which must be around
1321     128MiB.
1322
1323     """
1324     result = utils.RunCmd(["blockdev", "--getsize", meta_device])
1325     if result.failed:
1326       _ThrowError("Failed to get device size: %s - %s",
1327                   result.fail_reason, result.output)
1328     try:
1329       sectors = int(result.stdout)
1330     except (TypeError, ValueError):
1331       _ThrowError("Invalid output from blockdev: '%s'", result.stdout)
1332     num_bytes = sectors * 512
1333     if num_bytes < 128 * 1024 * 1024: # less than 128MiB
1334       _ThrowError("Meta device too small (%.2fMib)", (num_bytes / 1024 / 1024))
1335     # the maximum *valid* size of the meta device when living on top
1336     # of LVM is hard to compute: it depends on the number of stripes
1337     # and the PE size; e.g. a 2-stripe, 64MB PE will result in a 128MB
1338     # (normal size), but an eight-stripe 128MB PE will result in a 1GB
1339     # size meta device; as such, we restrict it to 1GB (a little bit
1340     # too generous, but making assumptions about PE size is hard)
1341     if num_bytes > 1024 * 1024 * 1024:
1342       _ThrowError("Meta device too big (%.2fMiB)", (num_bytes / 1024 / 1024))
1343
1344   def Rename(self, new_id):
1345     """Rename a device.
1346
1347     This is not supported for drbd devices.
1348
1349     """
1350     raise errors.ProgrammerError("Can't rename a drbd device")
1351
1352
1353 class DRBD8(BaseDRBD):
1354   """DRBD v8.x block device.
1355
1356   This implements the local host part of the DRBD device, i.e. it
1357   doesn't do anything to the supposed peer. If you need a fully
1358   connected DRBD pair, you need to use this class on both hosts.
1359
1360   The unique_id for the drbd device is a (local_ip, local_port,
1361   remote_ip, remote_port, local_minor, secret) tuple, and it must have
1362   two children: the data device and the meta_device. The meta device
1363   is checked for valid size and is zeroed on create.
1364
1365   """
1366   _MAX_MINORS = 255
1367   _PARSE_SHOW = None
1368
1369   # timeout constants
1370   _NET_RECONFIG_TIMEOUT = 60
1371
1372   # command line options for barriers
1373   _DISABLE_DISK_OPTION = "--no-disk-barrier"  # -a
1374   _DISABLE_DRAIN_OPTION = "--no-disk-drain"   # -D
1375   _DISABLE_FLUSH_OPTION = "--no-disk-flushes" # -i
1376   _DISABLE_META_FLUSH_OPTION = "--no-md-flushes"  # -m
1377
1378   def __init__(self, unique_id, children, size, params):
1379     if children and children.count(None) > 0:
1380       children = []
1381     if len(children) not in (0, 2):
1382       raise ValueError("Invalid configuration data %s" % str(children))
1383     if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 6:
1384       raise ValueError("Invalid configuration data %s" % str(unique_id))
1385     (self._lhost, self._lport,
1386      self._rhost, self._rport,
1387      self._aminor, self._secret) = unique_id
1388     if children:
1389       if not _CanReadDevice(children[1].dev_path):
1390         logging.info("drbd%s: Ignoring unreadable meta device", self._aminor)
1391         children = []
1392     super(DRBD8, self).__init__(unique_id, children, size, params)
1393     self.major = self._DRBD_MAJOR
1394     version = self._GetVersion(self._GetProcData())
1395     if version["k_major"] != 8:
1396       _ThrowError("Mismatch in DRBD kernel version and requested ganeti"
1397                   " usage: kernel is %s.%s, ganeti wants 8.x",
1398                   version["k_major"], version["k_minor"])
1399
1400     if (self._lhost is not None and self._lhost == self._rhost and
1401         self._lport == self._rport):
1402       raise ValueError("Invalid configuration data, same local/remote %s" %
1403                        (unique_id,))
1404     self.Attach()
1405
1406   @classmethod
1407   def _InitMeta(cls, minor, dev_path):
1408     """Initialize a meta device.
1409
1410     This will not work if the given minor is in use.
1411
1412     """
1413     # Zero the metadata first, in order to make sure drbdmeta doesn't
1414     # try to auto-detect existing filesystems or similar (see
1415     # http://code.google.com/p/ganeti/issues/detail?id=182); we only
1416     # care about the first 128MB of data in the device, even though it
1417     # can be bigger
1418     result = utils.RunCmd([constants.DD_CMD,
1419                            "if=/dev/zero", "of=%s" % dev_path,
1420                            "bs=1048576", "count=128", "oflag=direct"])
1421     if result.failed:
1422       _ThrowError("Can't wipe the meta device: %s", result.output)
1423
1424     result = utils.RunCmd(["drbdmeta", "--force", cls._DevPath(minor),
1425                            "v08", dev_path, "0", "create-md"])
1426     if result.failed:
1427       _ThrowError("Can't initialize meta device: %s", result.output)
1428
1429   @classmethod
1430   def _FindUnusedMinor(cls):
1431     """Find an unused DRBD device.
1432
1433     This is specific to 8.x as the minors are allocated dynamically,
1434     so non-existing numbers up to a max minor count are actually free.
1435
1436     """
1437     data = cls._GetProcData()
1438
1439     highest = None
1440     for line in data:
1441       match = cls._UNUSED_LINE_RE.match(line)
1442       if match:
1443         return int(match.group(1))
1444       match = cls._VALID_LINE_RE.match(line)
1445       if match:
1446         minor = int(match.group(1))
1447         highest = max(highest, minor)
1448     if highest is None: # there are no minors in use at all
1449       return 0
1450     if highest >= cls._MAX_MINORS:
1451       logging.error("Error: no free drbd minors!")
1452       raise errors.BlockDeviceError("Can't find a free DRBD minor")
1453     return highest + 1
1454
1455   @classmethod
1456   def _GetShowParser(cls):
1457     """Return a parser for `drbd show` output.
1458
1459     This will either create or return an already-created parser for the
1460     output of the command `drbd show`.
1461
1462     """
1463     if cls._PARSE_SHOW is not None:
1464       return cls._PARSE_SHOW
1465
1466     # pyparsing setup
1467     lbrace = pyp.Literal("{").suppress()
1468     rbrace = pyp.Literal("}").suppress()
1469     lbracket = pyp.Literal("[").suppress()
1470     rbracket = pyp.Literal("]").suppress()
1471     semi = pyp.Literal(";").suppress()
1472     colon = pyp.Literal(":").suppress()
1473     # this also converts the value to an int
1474     number = pyp.Word(pyp.nums).setParseAction(lambda s, l, t: int(t[0]))
1475
1476     comment = pyp.Literal("#") + pyp.Optional(pyp.restOfLine)
1477     defa = pyp.Literal("_is_default").suppress()
1478     dbl_quote = pyp.Literal('"').suppress()
1479
1480     keyword = pyp.Word(pyp.alphanums + "-")
1481
1482     # value types
1483     value = pyp.Word(pyp.alphanums + "_-/.:")
1484     quoted = dbl_quote + pyp.CharsNotIn('"') + dbl_quote
1485     ipv4_addr = (pyp.Optional(pyp.Literal("ipv4")).suppress() +
1486                  pyp.Word(pyp.nums + ".") + colon + number)
1487     ipv6_addr = (pyp.Optional(pyp.Literal("ipv6")).suppress() +
1488                  pyp.Optional(lbracket) + pyp.Word(pyp.hexnums + ":") +
1489                  pyp.Optional(rbracket) + colon + number)
1490     # meta device, extended syntax
1491     meta_value = ((value ^ quoted) + lbracket + number + rbracket)
1492     # device name, extended syntax
1493     device_value = pyp.Literal("minor").suppress() + number
1494
1495     # a statement
1496     stmt = (~rbrace + keyword + ~lbrace +
1497             pyp.Optional(ipv4_addr ^ ipv6_addr ^ value ^ quoted ^ meta_value ^
1498                          device_value) +
1499             pyp.Optional(defa) + semi +
1500             pyp.Optional(pyp.restOfLine).suppress())
1501
1502     # an entire section
1503     section_name = pyp.Word(pyp.alphas + "_")
1504     section = section_name + lbrace + pyp.ZeroOrMore(pyp.Group(stmt)) + rbrace
1505
1506     bnf = pyp.ZeroOrMore(pyp.Group(section ^ stmt))
1507     bnf.ignore(comment)
1508
1509     cls._PARSE_SHOW = bnf
1510
1511     return bnf
1512
1513   @classmethod
1514   def _GetShowData(cls, minor):
1515     """Return the `drbdsetup show` data for a minor.
1516
1517     """
1518     result = utils.RunCmd(["drbdsetup", cls._DevPath(minor), "show"])
1519     if result.failed:
1520       logging.error("Can't display the drbd config: %s - %s",
1521                     result.fail_reason, result.output)
1522       return None
1523     return result.stdout
1524
1525   @classmethod
1526   def _GetDevInfo(cls, out):
1527     """Parse details about a given DRBD minor.
1528
1529     This return, if available, the local backing device (as a path)
1530     and the local and remote (ip, port) information from a string
1531     containing the output of the `drbdsetup show` command as returned
1532     by _GetShowData.
1533
1534     """
1535     data = {}
1536     if not out:
1537       return data
1538
1539     bnf = cls._GetShowParser()
1540     # run pyparse
1541
1542     try:
1543       results = bnf.parseString(out)
1544     except pyp.ParseException, err:
1545       _ThrowError("Can't parse drbdsetup show output: %s", str(err))
1546
1547     # and massage the results into our desired format
1548     for section in results:
1549       sname = section[0]
1550       if sname == "_this_host":
1551         for lst in section[1:]:
1552           if lst[0] == "disk":
1553             data["local_dev"] = lst[1]
1554           elif lst[0] == "meta-disk":
1555             data["meta_dev"] = lst[1]
1556             data["meta_index"] = lst[2]
1557           elif lst[0] == "address":
1558             data["local_addr"] = tuple(lst[1:])
1559       elif sname == "_remote_host":
1560         for lst in section[1:]:
1561           if lst[0] == "address":
1562             data["remote_addr"] = tuple(lst[1:])
1563     return data
1564
1565   def _MatchesLocal(self, info):
1566     """Test if our local config matches with an existing device.
1567
1568     The parameter should be as returned from `_GetDevInfo()`. This
1569     method tests if our local backing device is the same as the one in
1570     the info parameter, in effect testing if we look like the given
1571     device.
1572
1573     """
1574     if self._children:
1575       backend, meta = self._children
1576     else:
1577       backend = meta = None
1578
1579     if backend is not None:
1580       retval = ("local_dev" in info and info["local_dev"] == backend.dev_path)
1581     else:
1582       retval = ("local_dev" not in info)
1583
1584     if meta is not None:
1585       retval = retval and ("meta_dev" in info and
1586                            info["meta_dev"] == meta.dev_path)
1587       retval = retval and ("meta_index" in info and
1588                            info["meta_index"] == 0)
1589     else:
1590       retval = retval and ("meta_dev" not in info and
1591                            "meta_index" not in info)
1592     return retval
1593
1594   def _MatchesNet(self, info):
1595     """Test if our network config matches with an existing device.
1596
1597     The parameter should be as returned from `_GetDevInfo()`. This
1598     method tests if our network configuration is the same as the one
1599     in the info parameter, in effect testing if we look like the given
1600     device.
1601
1602     """
1603     if (((self._lhost is None and not ("local_addr" in info)) and
1604          (self._rhost is None and not ("remote_addr" in info)))):
1605       return True
1606
1607     if self._lhost is None:
1608       return False
1609
1610     if not ("local_addr" in info and
1611             "remote_addr" in info):
1612       return False
1613
1614     retval = (info["local_addr"] == (self._lhost, self._lport))
1615     retval = (retval and
1616               info["remote_addr"] == (self._rhost, self._rport))
1617     return retval
1618
1619   def _AssembleLocal(self, minor, backend, meta, size):
1620     """Configure the local part of a DRBD device.
1621
1622     """
1623     args = ["drbdsetup", self._DevPath(minor), "disk",
1624             backend, meta, "0",
1625             "-e", "detach",
1626             "--create-device"]
1627     if size:
1628       args.extend(["-d", "%sm" % size])
1629
1630     version = self._GetVersion(self._GetProcData())
1631     vmaj = version["k_major"]
1632     vmin = version["k_minor"]
1633     vrel = version["k_point"]
1634
1635     barrier_args = \
1636       self._ComputeDiskBarrierArgs(vmaj, vmin, vrel,
1637                                    self.params[constants.LDP_BARRIERS],
1638                                    self.params[constants.LDP_NO_META_FLUSH])
1639     args.extend(barrier_args)
1640
1641     if self.params[constants.LDP_DISK_CUSTOM]:
1642       args.extend(shlex.split(self.params[constants.LDP_DISK_CUSTOM]))
1643
1644     result = utils.RunCmd(args)
1645     if result.failed:
1646       _ThrowError("drbd%d: can't attach local disk: %s", minor, result.output)
1647
1648   @classmethod
1649   def _ComputeDiskBarrierArgs(cls, vmaj, vmin, vrel, disabled_barriers,
1650                               disable_meta_flush):
1651     """Compute the DRBD command line parameters for disk barriers
1652
1653     Returns a list of the disk barrier parameters as requested via the
1654     disabled_barriers and disable_meta_flush arguments, and according to the
1655     supported ones in the DRBD version vmaj.vmin.vrel
1656
1657     If the desired option is unsupported, raises errors.BlockDeviceError.
1658
1659     """
1660     disabled_barriers_set = frozenset(disabled_barriers)
1661     if not disabled_barriers_set in constants.DRBD_VALID_BARRIER_OPT:
1662       raise errors.BlockDeviceError("%s is not a valid option set for DRBD"
1663                                     " barriers" % disabled_barriers)
1664
1665     args = []
1666
1667     # The following code assumes DRBD 8.x, with x < 4 and x != 1 (DRBD 8.1.x
1668     # does not exist)
1669     if not vmaj == 8 and vmin in (0, 2, 3):
1670       raise errors.BlockDeviceError("Unsupported DRBD version: %d.%d.%d" %
1671                                     (vmaj, vmin, vrel))
1672
1673     def _AppendOrRaise(option, min_version):
1674       """Helper for DRBD options"""
1675       if min_version is not None and vrel >= min_version:
1676         args.append(option)
1677       else:
1678         raise errors.BlockDeviceError("Could not use the option %s as the"
1679                                       " DRBD version %d.%d.%d does not support"
1680                                       " it." % (option, vmaj, vmin, vrel))
1681
1682     # the minimum version for each feature is encoded via pairs of (minor
1683     # version -> x) where x is version in which support for the option was
1684     # introduced.
1685     meta_flush_supported = disk_flush_supported = {
1686       0: 12,
1687       2: 7,
1688       3: 0,
1689       }
1690
1691     disk_drain_supported = {
1692       2: 7,
1693       3: 0,
1694       }
1695
1696     disk_barriers_supported = {
1697       3: 0,
1698       }
1699
1700     # meta flushes
1701     if disable_meta_flush:
1702       _AppendOrRaise(cls._DISABLE_META_FLUSH_OPTION,
1703                      meta_flush_supported.get(vmin, None))
1704
1705     # disk flushes
1706     if constants.DRBD_B_DISK_FLUSH in disabled_barriers_set:
1707       _AppendOrRaise(cls._DISABLE_FLUSH_OPTION,
1708                      disk_flush_supported.get(vmin, None))
1709
1710     # disk drain
1711     if constants.DRBD_B_DISK_DRAIN in disabled_barriers_set:
1712       _AppendOrRaise(cls._DISABLE_DRAIN_OPTION,
1713                      disk_drain_supported.get(vmin, None))
1714
1715     # disk barriers
1716     if constants.DRBD_B_DISK_BARRIERS in disabled_barriers_set:
1717       _AppendOrRaise(cls._DISABLE_DISK_OPTION,
1718                      disk_barriers_supported.get(vmin, None))
1719
1720     return args
1721
1722   def _AssembleNet(self, minor, net_info, protocol,
1723                    dual_pri=False, hmac=None, secret=None):
1724     """Configure the network part of the device.
1725
1726     """
1727     lhost, lport, rhost, rport = net_info
1728     if None in net_info:
1729       # we don't want network connection and actually want to make
1730       # sure its shutdown
1731       self._ShutdownNet(minor)
1732       return
1733
1734     # Workaround for a race condition. When DRBD is doing its dance to
1735     # establish a connection with its peer, it also sends the
1736     # synchronization speed over the wire. In some cases setting the
1737     # sync speed only after setting up both sides can race with DRBD
1738     # connecting, hence we set it here before telling DRBD anything
1739     # about its peer.
1740     sync_errors = self._SetMinorSyncParams(minor, self.params)
1741     if sync_errors:
1742       _ThrowError("drbd%d: can't set the synchronization parameters: %s" %
1743                   (minor, utils.CommaJoin(sync_errors)))
1744
1745     if netutils.IP6Address.IsValid(lhost):
1746       if not netutils.IP6Address.IsValid(rhost):
1747         _ThrowError("drbd%d: can't connect ip %s to ip %s" %
1748                     (minor, lhost, rhost))
1749       family = "ipv6"
1750     elif netutils.IP4Address.IsValid(lhost):
1751       if not netutils.IP4Address.IsValid(rhost):
1752         _ThrowError("drbd%d: can't connect ip %s to ip %s" %
1753                     (minor, lhost, rhost))
1754       family = "ipv4"
1755     else:
1756       _ThrowError("drbd%d: Invalid ip %s" % (minor, lhost))
1757
1758     args = ["drbdsetup", self._DevPath(minor), "net",
1759             "%s:%s:%s" % (family, lhost, lport),
1760             "%s:%s:%s" % (family, rhost, rport), protocol,
1761             "-A", "discard-zero-changes",
1762             "-B", "consensus",
1763             "--create-device",
1764             ]
1765     if dual_pri:
1766       args.append("-m")
1767     if hmac and secret:
1768       args.extend(["-a", hmac, "-x", secret])
1769
1770     if self.params[constants.LDP_NET_CUSTOM]:
1771       args.extend(shlex.split(self.params[constants.LDP_NET_CUSTOM]))
1772
1773     result = utils.RunCmd(args)
1774     if result.failed:
1775       _ThrowError("drbd%d: can't setup network: %s - %s",
1776                   minor, result.fail_reason, result.output)
1777
1778     def _CheckNetworkConfig():
1779       info = self._GetDevInfo(self._GetShowData(minor))
1780       if not "local_addr" in info or not "remote_addr" in info:
1781         raise utils.RetryAgain()
1782
1783       if (info["local_addr"] != (lhost, lport) or
1784           info["remote_addr"] != (rhost, rport)):
1785         raise utils.RetryAgain()
1786
1787     try:
1788       utils.Retry(_CheckNetworkConfig, 1.0, 10.0)
1789     except utils.RetryTimeout:
1790       _ThrowError("drbd%d: timeout while configuring network", minor)
1791
1792   def AddChildren(self, devices):
1793     """Add a disk to the DRBD device.
1794
1795     """
1796     if self.minor is None:
1797       _ThrowError("drbd%d: can't attach to dbrd8 during AddChildren",
1798                   self._aminor)
1799     if len(devices) != 2:
1800       _ThrowError("drbd%d: need two devices for AddChildren", self.minor)
1801     info = self._GetDevInfo(self._GetShowData(self.minor))
1802     if "local_dev" in info:
1803       _ThrowError("drbd%d: already attached to a local disk", self.minor)
1804     backend, meta = devices
1805     if backend.dev_path is None or meta.dev_path is None:
1806       _ThrowError("drbd%d: children not ready during AddChildren", self.minor)
1807     backend.Open()
1808     meta.Open()
1809     self._CheckMetaSize(meta.dev_path)
1810     self._InitMeta(self._FindUnusedMinor(), meta.dev_path)
1811
1812     self._AssembleLocal(self.minor, backend.dev_path, meta.dev_path, self.size)
1813     self._children = devices
1814
1815   def RemoveChildren(self, devices):
1816     """Detach the drbd device from local storage.
1817
1818     """
1819     if self.minor is None:
1820       _ThrowError("drbd%d: can't attach to drbd8 during RemoveChildren",
1821                   self._aminor)
1822     # early return if we don't actually have backing storage
1823     info = self._GetDevInfo(self._GetShowData(self.minor))
1824     if "local_dev" not in info:
1825       return
1826     if len(self._children) != 2:
1827       _ThrowError("drbd%d: we don't have two children: %s", self.minor,
1828                   self._children)
1829     if self._children.count(None) == 2: # we don't actually have children :)
1830       logging.warning("drbd%d: requested detach while detached", self.minor)
1831       return
1832     if len(devices) != 2:
1833       _ThrowError("drbd%d: we need two children in RemoveChildren", self.minor)
1834     for child, dev in zip(self._children, devices):
1835       if dev != child.dev_path:
1836         _ThrowError("drbd%d: mismatch in local storage (%s != %s) in"
1837                     " RemoveChildren", self.minor, dev, child.dev_path)
1838
1839     self._ShutdownLocal(self.minor)
1840     self._children = []
1841
1842   @classmethod
1843   def _SetMinorSyncParams(cls, minor, params):
1844     """Set the parameters of the DRBD syncer.
1845
1846     This is the low-level implementation.
1847
1848     @type minor: int
1849     @param minor: the drbd minor whose settings we change
1850     @type params: dict
1851     @param params: LD level disk parameters related to the synchronization
1852     @rtype: list
1853     @return: a list of error messages
1854
1855     """
1856
1857     args = ["drbdsetup", cls._DevPath(minor), "syncer"]
1858     if params[constants.LDP_DYNAMIC_RESYNC]:
1859       version = cls._GetVersion(cls._GetProcData())
1860       vmin = version["k_minor"]
1861       vrel = version["k_point"]
1862
1863       # By definition we are using 8.x, so just check the rest of the version
1864       # number
1865       if vmin != 3 or vrel < 9:
1866         msg = ("The current DRBD version (8.%d.%d) does not support the "
1867                "dynamic resync speed controller" % (vmin, vrel))
1868         logging.error(msg)
1869         return [msg]
1870
1871       if params[constants.LDP_PLAN_AHEAD] == 0:
1872         msg = ("A value of 0 for c-plan-ahead disables the dynamic sync speed"
1873                " controller at DRBD level. If you want to disable it, please"
1874                " set the dynamic-resync disk parameter to False.")
1875         logging.error(msg)
1876         return [msg]
1877
1878       # add the c-* parameters to args
1879       args.extend(["--c-plan-ahead", params[constants.LDP_PLAN_AHEAD],
1880                    "--c-fill-target", params[constants.LDP_FILL_TARGET],
1881                    "--c-delay-target", params[constants.LDP_DELAY_TARGET],
1882                    "--c-max-rate", params[constants.LDP_MAX_RATE],
1883                    "--c-min-rate", params[constants.LDP_MIN_RATE],
1884                    ])
1885
1886     else:
1887       args.extend(["-r", "%d" % params[constants.LDP_RESYNC_RATE]])
1888
1889     args.append("--create-device")
1890     result = utils.RunCmd(args)
1891     if result.failed:
1892       msg = ("Can't change syncer rate: %s - %s" %
1893              (result.fail_reason, result.output))
1894       logging.error(msg)
1895       return [msg]
1896
1897     return []
1898
1899   def SetSyncParams(self, params):
1900     """Set the synchronization parameters of the DRBD syncer.
1901
1902     @type params: dict
1903     @param params: LD level disk parameters related to the synchronization
1904     @rtype: list
1905     @return: a list of error messages, emitted both by the current node and by
1906     children. An empty list means no errors
1907
1908     """
1909     if self.minor is None:
1910       err = "Not attached during SetSyncParams"
1911       logging.info(err)
1912       return [err]
1913
1914     children_result = super(DRBD8, self).SetSyncParams(params)
1915     children_result.extend(self._SetMinorSyncParams(self.minor, params))
1916     return children_result
1917
1918   def PauseResumeSync(self, pause):
1919     """Pauses or resumes the sync of a DRBD device.
1920
1921     @param pause: Wether to pause or resume
1922     @return: the success of the operation
1923
1924     """
1925     if self.minor is None:
1926       logging.info("Not attached during PauseSync")
1927       return False
1928
1929     children_result = super(DRBD8, self).PauseResumeSync(pause)
1930
1931     if pause:
1932       cmd = "pause-sync"
1933     else:
1934       cmd = "resume-sync"
1935
1936     result = utils.RunCmd(["drbdsetup", self.dev_path, cmd])
1937     if result.failed:
1938       logging.error("Can't %s: %s - %s", cmd,
1939                     result.fail_reason, result.output)
1940     return not result.failed and children_result
1941
1942   def GetProcStatus(self):
1943     """Return device data from /proc.
1944
1945     """
1946     if self.minor is None:
1947       _ThrowError("drbd%d: GetStats() called while not attached", self._aminor)
1948     proc_info = self._MassageProcData(self._GetProcData())
1949     if self.minor not in proc_info:
1950       _ThrowError("drbd%d: can't find myself in /proc", self.minor)
1951     return DRBD8Status(proc_info[self.minor])
1952
1953   def GetSyncStatus(self):
1954     """Returns the sync status of the device.
1955
1956
1957     If sync_percent is None, it means all is ok
1958     If estimated_time is None, it means we can't estimate
1959     the time needed, otherwise it's the time left in seconds.
1960
1961
1962     We set the is_degraded parameter to True on two conditions:
1963     network not connected or local disk missing.
1964
1965     We compute the ldisk parameter based on whether we have a local
1966     disk or not.
1967
1968     @rtype: objects.BlockDevStatus
1969
1970     """
1971     if self.minor is None and not self.Attach():
1972       _ThrowError("drbd%d: can't Attach() in GetSyncStatus", self._aminor)
1973
1974     stats = self.GetProcStatus()
1975     is_degraded = not stats.is_connected or not stats.is_disk_uptodate
1976
1977     if stats.is_disk_uptodate:
1978       ldisk_status = constants.LDS_OKAY
1979     elif stats.is_diskless:
1980       ldisk_status = constants.LDS_FAULTY
1981     else:
1982       ldisk_status = constants.LDS_UNKNOWN
1983
1984     return objects.BlockDevStatus(dev_path=self.dev_path,
1985                                   major=self.major,
1986                                   minor=self.minor,
1987                                   sync_percent=stats.sync_percent,
1988                                   estimated_time=stats.est_time,
1989                                   is_degraded=is_degraded,
1990                                   ldisk_status=ldisk_status)
1991
1992   def Open(self, force=False):
1993     """Make the local state primary.
1994
1995     If the 'force' parameter is given, the '-o' option is passed to
1996     drbdsetup. Since this is a potentially dangerous operation, the
1997     force flag should be only given after creation, when it actually
1998     is mandatory.
1999
2000     """
2001     if self.minor is None and not self.Attach():
2002       logging.error("DRBD cannot attach to a device during open")
2003       return False
2004     cmd = ["drbdsetup", self.dev_path, "primary"]
2005     if force:
2006       cmd.append("-o")
2007     result = utils.RunCmd(cmd)
2008     if result.failed:
2009       _ThrowError("drbd%d: can't make drbd device primary: %s", self.minor,
2010                   result.output)
2011
2012   def Close(self):
2013     """Make the local state secondary.
2014
2015     This will, of course, fail if the device is in use.
2016
2017     """
2018     if self.minor is None and not self.Attach():
2019       _ThrowError("drbd%d: can't Attach() in Close()", self._aminor)
2020     result = utils.RunCmd(["drbdsetup", self.dev_path, "secondary"])
2021     if result.failed:
2022       _ThrowError("drbd%d: can't switch drbd device to secondary: %s",
2023                   self.minor, result.output)
2024
2025   def DisconnectNet(self):
2026     """Removes network configuration.
2027
2028     This method shutdowns the network side of the device.
2029
2030     The method will wait up to a hardcoded timeout for the device to
2031     go into standalone after the 'disconnect' command before
2032     re-configuring it, as sometimes it takes a while for the
2033     disconnect to actually propagate and thus we might issue a 'net'
2034     command while the device is still connected. If the device will
2035     still be attached to the network and we time out, we raise an
2036     exception.
2037
2038     """
2039     if self.minor is None:
2040       _ThrowError("drbd%d: disk not attached in re-attach net", self._aminor)
2041
2042     if None in (self._lhost, self._lport, self._rhost, self._rport):
2043       _ThrowError("drbd%d: DRBD disk missing network info in"
2044                   " DisconnectNet()", self.minor)
2045
2046     class _DisconnectStatus:
2047       def __init__(self, ever_disconnected):
2048         self.ever_disconnected = ever_disconnected
2049
2050     dstatus = _DisconnectStatus(_IgnoreError(self._ShutdownNet, self.minor))
2051
2052     def _WaitForDisconnect():
2053       if self.GetProcStatus().is_standalone:
2054         return
2055
2056       # retry the disconnect, it seems possible that due to a well-time
2057       # disconnect on the peer, my disconnect command might be ignored and
2058       # forgotten
2059       dstatus.ever_disconnected = \
2060         _IgnoreError(self._ShutdownNet, self.minor) or dstatus.ever_disconnected
2061
2062       raise utils.RetryAgain()
2063
2064     # Keep start time
2065     start_time = time.time()
2066
2067     try:
2068       # Start delay at 100 milliseconds and grow up to 2 seconds
2069       utils.Retry(_WaitForDisconnect, (0.1, 1.5, 2.0),
2070                   self._NET_RECONFIG_TIMEOUT)
2071     except utils.RetryTimeout:
2072       if dstatus.ever_disconnected:
2073         msg = ("drbd%d: device did not react to the"
2074                " 'disconnect' command in a timely manner")
2075       else:
2076         msg = "drbd%d: can't shutdown network, even after multiple retries"
2077
2078       _ThrowError(msg, self.minor)
2079
2080     reconfig_time = time.time() - start_time
2081     if reconfig_time > (self._NET_RECONFIG_TIMEOUT * 0.25):
2082       logging.info("drbd%d: DisconnectNet: detach took %.3f seconds",
2083                    self.minor, reconfig_time)
2084
2085   def AttachNet(self, multimaster):
2086     """Reconnects the network.
2087
2088     This method connects the network side of the device with a
2089     specified multi-master flag. The device needs to be 'Standalone'
2090     but have valid network configuration data.
2091
2092     Args:
2093       - multimaster: init the network in dual-primary mode
2094
2095     """
2096     if self.minor is None:
2097       _ThrowError("drbd%d: device not attached in AttachNet", self._aminor)
2098
2099     if None in (self._lhost, self._lport, self._rhost, self._rport):
2100       _ThrowError("drbd%d: missing network info in AttachNet()", self.minor)
2101
2102     status = self.GetProcStatus()
2103
2104     if not status.is_standalone:
2105       _ThrowError("drbd%d: device is not standalone in AttachNet", self.minor)
2106
2107     self._AssembleNet(self.minor,
2108                       (self._lhost, self._lport, self._rhost, self._rport),
2109                       constants.DRBD_NET_PROTOCOL, dual_pri=multimaster,
2110                       hmac=constants.DRBD_HMAC_ALG, secret=self._secret)
2111
2112   def Attach(self):
2113     """Check if our minor is configured.
2114
2115     This doesn't do any device configurations - it only checks if the
2116     minor is in a state different from Unconfigured.
2117
2118     Note that this function will not change the state of the system in
2119     any way (except in case of side-effects caused by reading from
2120     /proc).
2121
2122     """
2123     used_devs = self.GetUsedDevs()
2124     if self._aminor in used_devs:
2125       minor = self._aminor
2126     else:
2127       minor = None
2128
2129     self._SetFromMinor(minor)
2130     return minor is not None
2131
2132   def Assemble(self):
2133     """Assemble the drbd.
2134
2135     Method:
2136       - if we have a configured device, we try to ensure that it matches
2137         our config
2138       - if not, we create it from zero
2139       - anyway, set the device parameters
2140
2141     """
2142     super(DRBD8, self).Assemble()
2143
2144     self.Attach()
2145     if self.minor is None:
2146       # local device completely unconfigured
2147       self._FastAssemble()
2148     else:
2149       # we have to recheck the local and network status and try to fix
2150       # the device
2151       self._SlowAssemble()
2152
2153     sync_errors = self.SetSyncParams(self.params)
2154     if sync_errors:
2155       _ThrowError("drbd%d: can't set the synchronization parameters: %s" %
2156                   (self.minor, utils.CommaJoin(sync_errors)))
2157
2158   def _SlowAssemble(self):
2159     """Assembles the DRBD device from a (partially) configured device.
2160
2161     In case of partially attached (local device matches but no network
2162     setup), we perform the network attach. If successful, we re-test
2163     the attach if can return success.
2164
2165     """
2166     # TODO: Rewrite to not use a for loop just because there is 'break'
2167     # pylint: disable=W0631
2168     net_data = (self._lhost, self._lport, self._rhost, self._rport)
2169     for minor in (self._aminor,):
2170       info = self._GetDevInfo(self._GetShowData(minor))
2171       match_l = self._MatchesLocal(info)
2172       match_r = self._MatchesNet(info)
2173
2174       if match_l and match_r:
2175         # everything matches
2176         break
2177
2178       if match_l and not match_r and "local_addr" not in info:
2179         # disk matches, but not attached to network, attach and recheck
2180         self._AssembleNet(minor, net_data, constants.DRBD_NET_PROTOCOL,
2181                           hmac=constants.DRBD_HMAC_ALG, secret=self._secret)
2182         if self._MatchesNet(self._GetDevInfo(self._GetShowData(minor))):
2183           break
2184         else:
2185           _ThrowError("drbd%d: network attach successful, but 'drbdsetup"
2186                       " show' disagrees", minor)
2187
2188       if match_r and "local_dev" not in info:
2189         # no local disk, but network attached and it matches
2190         self._AssembleLocal(minor, self._children[0].dev_path,
2191                             self._children[1].dev_path, self.size)
2192         if self._MatchesNet(self._GetDevInfo(self._GetShowData(minor))):
2193           break
2194         else:
2195           _ThrowError("drbd%d: disk attach successful, but 'drbdsetup"
2196                       " show' disagrees", minor)
2197
2198       # this case must be considered only if we actually have local
2199       # storage, i.e. not in diskless mode, because all diskless
2200       # devices are equal from the point of view of local
2201       # configuration
2202       if (match_l and "local_dev" in info and
2203           not match_r and "local_addr" in info):
2204         # strange case - the device network part points to somewhere
2205         # else, even though its local storage is ours; as we own the
2206         # drbd space, we try to disconnect from the remote peer and
2207         # reconnect to our correct one
2208         try:
2209           self._ShutdownNet(minor)
2210         except errors.BlockDeviceError, err:
2211           _ThrowError("drbd%d: device has correct local storage, wrong"
2212                       " remote peer and is unable to disconnect in order"
2213                       " to attach to the correct peer: %s", minor, str(err))
2214         # note: _AssembleNet also handles the case when we don't want
2215         # local storage (i.e. one or more of the _[lr](host|port) is
2216         # None)
2217         self._AssembleNet(minor, net_data, constants.DRBD_NET_PROTOCOL,
2218                           hmac=constants.DRBD_HMAC_ALG, secret=self._secret)
2219         if self._MatchesNet(self._GetDevInfo(self._GetShowData(minor))):
2220           break
2221         else:
2222           _ThrowError("drbd%d: network attach successful, but 'drbdsetup"
2223                       " show' disagrees", minor)
2224
2225     else:
2226       minor = None
2227
2228     self._SetFromMinor(minor)
2229     if minor is None:
2230       _ThrowError("drbd%d: cannot activate, unknown or unhandled reason",
2231                   self._aminor)
2232
2233   def _FastAssemble(self):
2234     """Assemble the drbd device from zero.
2235
2236     This is run when in Assemble we detect our minor is unused.
2237
2238     """
2239     minor = self._aminor
2240     if self._children and self._children[0] and self._children[1]:
2241       self._AssembleLocal(minor, self._children[0].dev_path,
2242                           self._children[1].dev_path, self.size)
2243     if self._lhost and self._lport and self._rhost and self._rport:
2244       self._AssembleNet(minor,
2245                         (self._lhost, self._lport, self._rhost, self._rport),
2246                         constants.DRBD_NET_PROTOCOL,
2247                         hmac=constants.DRBD_HMAC_ALG, secret=self._secret)
2248     self._SetFromMinor(minor)
2249
2250   @classmethod
2251   def _ShutdownLocal(cls, minor):
2252     """Detach from the local device.
2253
2254     I/Os will continue to be served from the remote device. If we
2255     don't have a remote device, this operation will fail.
2256
2257     """
2258     result = utils.RunCmd(["drbdsetup", cls._DevPath(minor), "detach"])
2259     if result.failed:
2260       _ThrowError("drbd%d: can't detach local disk: %s", minor, result.output)
2261
2262   @classmethod
2263   def _ShutdownNet(cls, minor):
2264     """Disconnect from the remote peer.
2265
2266     This fails if we don't have a local device.
2267
2268     """
2269     result = utils.RunCmd(["drbdsetup", cls._DevPath(minor), "disconnect"])
2270     if result.failed:
2271       _ThrowError("drbd%d: can't shutdown network: %s", minor, result.output)
2272
2273   @classmethod
2274   def _ShutdownAll(cls, minor):
2275     """Deactivate the device.
2276
2277     This will, of course, fail if the device is in use.
2278
2279     """
2280     result = utils.RunCmd(["drbdsetup", cls._DevPath(minor), "down"])
2281     if result.failed:
2282       _ThrowError("drbd%d: can't shutdown drbd device: %s",
2283                   minor, result.output)
2284
2285   def Shutdown(self):
2286     """Shutdown the DRBD device.
2287
2288     """
2289     if self.minor is None and not self.Attach():
2290       logging.info("drbd%d: not attached during Shutdown()", self._aminor)
2291       return
2292     minor = self.minor
2293     self.minor = None
2294     self.dev_path = None
2295     self._ShutdownAll(minor)
2296
2297   def Remove(self):
2298     """Stub remove for DRBD devices.
2299
2300     """
2301     self.Shutdown()
2302
2303   @classmethod
2304   def Create(cls, unique_id, children, size, params, excl_stor):
2305     """Create a new DRBD8 device.
2306
2307     Since DRBD devices are not created per se, just assembled, this
2308     function only initializes the metadata.
2309
2310     """
2311     if len(children) != 2:
2312       raise errors.ProgrammerError("Invalid setup for the drbd device")
2313     if excl_stor:
2314       raise errors.ProgrammerError("DRBD device requested with"
2315                                    " exclusive_storage")
2316     # check that the minor is unused
2317     aminor = unique_id[4]
2318     proc_info = cls._MassageProcData(cls._GetProcData())
2319     if aminor in proc_info:
2320       status = DRBD8Status(proc_info[aminor])
2321       in_use = status.is_in_use
2322     else:
2323       in_use = False
2324     if in_use:
2325       _ThrowError("drbd%d: minor is already in use at Create() time", aminor)
2326     meta = children[1]
2327     meta.Assemble()
2328     if not meta.Attach():
2329       _ThrowError("drbd%d: can't attach to meta device '%s'",
2330                   aminor, meta)
2331     cls._CheckMetaSize(meta.dev_path)
2332     cls._InitMeta(aminor, meta.dev_path)
2333     return cls(unique_id, children, size, params)
2334
2335   def Grow(self, amount, dryrun, backingstore):
2336     """Resize the DRBD device and its backing storage.
2337
2338     """
2339     if self.minor is None:
2340       _ThrowError("drbd%d: Grow called while not attached", self._aminor)
2341     if len(self._children) != 2 or None in self._children:
2342       _ThrowError("drbd%d: cannot grow diskless device", self.minor)
2343     self._children[0].Grow(amount, dryrun, backingstore)
2344     if dryrun or backingstore:
2345       # DRBD does not support dry-run mode and is not backing storage,
2346       # so we'll return here
2347       return
2348     result = utils.RunCmd(["drbdsetup", self.dev_path, "resize", "-s",
2349                            "%dm" % (self.size + amount)])
2350     if result.failed:
2351       _ThrowError("drbd%d: resize failed: %s", self.minor, result.output)
2352
2353
2354 class FileStorage(BlockDev):
2355   """File device.
2356
2357   This class represents a file storage backend device.
2358
2359   The unique_id for the file device is a (file_driver, file_path) tuple.
2360
2361   """
2362   def __init__(self, unique_id, children, size, params):
2363     """Initalizes a file device backend.
2364
2365     """
2366     if children:
2367       raise errors.BlockDeviceError("Invalid setup for file device")
2368     super(FileStorage, self).__init__(unique_id, children, size, params)
2369     if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
2370       raise ValueError("Invalid configuration data %s" % str(unique_id))
2371     self.driver = unique_id[0]
2372     self.dev_path = unique_id[1]
2373
2374     CheckFileStoragePath(self.dev_path)
2375
2376     self.Attach()
2377
2378   def Assemble(self):
2379     """Assemble the device.
2380
2381     Checks whether the file device exists, raises BlockDeviceError otherwise.
2382
2383     """
2384     if not os.path.exists(self.dev_path):
2385       _ThrowError("File device '%s' does not exist" % self.dev_path)
2386
2387   def Shutdown(self):
2388     """Shutdown the device.
2389
2390     This is a no-op for the file type, as we don't deactivate
2391     the file on shutdown.
2392
2393     """
2394     pass
2395
2396   def Open(self, force=False):
2397     """Make the device ready for I/O.
2398
2399     This is a no-op for the file type.
2400
2401     """
2402     pass
2403
2404   def Close(self):
2405     """Notifies that the device will no longer be used for I/O.
2406
2407     This is a no-op for the file type.
2408
2409     """
2410     pass
2411
2412   def Remove(self):
2413     """Remove the file backing the block device.
2414
2415     @rtype: boolean
2416     @return: True if the removal was successful
2417
2418     """
2419     try:
2420       os.remove(self.dev_path)
2421     except OSError, err:
2422       if err.errno != errno.ENOENT:
2423         _ThrowError("Can't remove file '%s': %s", self.dev_path, err)
2424
2425   def Rename(self, new_id):
2426     """Renames the file.
2427
2428     """
2429     # TODO: implement rename for file-based storage
2430     _ThrowError("Rename is not supported for file-based storage")
2431
2432   def Grow(self, amount, dryrun, backingstore):
2433     """Grow the file
2434
2435     @param amount: the amount (in mebibytes) to grow with
2436
2437     """
2438     if not backingstore:
2439       return
2440     # Check that the file exists
2441     self.Assemble()
2442     current_size = self.GetActualSize()
2443     new_size = current_size + amount * 1024 * 1024
2444     assert new_size > current_size, "Cannot Grow with a negative amount"
2445     # We can't really simulate the growth
2446     if dryrun:
2447       return
2448     try:
2449       f = open(self.dev_path, "a+")
2450       f.truncate(new_size)
2451       f.close()
2452     except EnvironmentError, err:
2453       _ThrowError("Error in file growth: %", str(err))
2454
2455   def Attach(self):
2456     """Attach to an existing file.
2457
2458     Check if this file already exists.
2459
2460     @rtype: boolean
2461     @return: True if file exists
2462
2463     """
2464     self.attached = os.path.exists(self.dev_path)
2465     return self.attached
2466
2467   def GetActualSize(self):
2468     """Return the actual disk size.
2469
2470     @note: the device needs to be active when this is called
2471
2472     """
2473     assert self.attached, "BlockDevice not attached in GetActualSize()"
2474     try:
2475       st = os.stat(self.dev_path)
2476       return st.st_size
2477     except OSError, err:
2478       _ThrowError("Can't stat %s: %s", self.dev_path, err)
2479
2480   @classmethod
2481   def Create(cls, unique_id, children, size, params, excl_stor):
2482     """Create a new file.
2483
2484     @param size: the size of file in MiB
2485
2486     @rtype: L{bdev.FileStorage}
2487     @return: an instance of FileStorage
2488
2489     """
2490     if excl_stor:
2491       raise errors.ProgrammerError("FileStorage device requested with"
2492                                    " exclusive_storage")
2493     if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
2494       raise ValueError("Invalid configuration data %s" % str(unique_id))
2495
2496     dev_path = unique_id[1]
2497
2498     CheckFileStoragePath(dev_path)
2499
2500     try:
2501       fd = os.open(dev_path, os.O_RDWR | os.O_CREAT | os.O_EXCL)
2502       f = os.fdopen(fd, "w")
2503       f.truncate(size * 1024 * 1024)
2504       f.close()
2505     except EnvironmentError, err:
2506       if err.errno == errno.EEXIST:
2507         _ThrowError("File already existing: %s", dev_path)
2508       _ThrowError("Error in file creation: %", str(err))
2509
2510     return FileStorage(unique_id, children, size, params)
2511
2512
2513 class PersistentBlockDevice(BlockDev):
2514   """A block device with persistent node
2515
2516   May be either directly attached, or exposed through DM (e.g. dm-multipath).
2517   udev helpers are probably required to give persistent, human-friendly
2518   names.
2519
2520   For the time being, pathnames are required to lie under /dev.
2521
2522   """
2523   def __init__(self, unique_id, children, size, params):
2524     """Attaches to a static block device.
2525
2526     The unique_id is a path under /dev.
2527
2528     """
2529     super(PersistentBlockDevice, self).__init__(unique_id, children, size,
2530                                                 params)
2531     if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
2532       raise ValueError("Invalid configuration data %s" % str(unique_id))
2533     self.dev_path = unique_id[1]
2534     if not os.path.realpath(self.dev_path).startswith("/dev/"):
2535       raise ValueError("Full path '%s' lies outside /dev" %
2536                               os.path.realpath(self.dev_path))
2537     # TODO: this is just a safety guard checking that we only deal with devices
2538     # we know how to handle. In the future this will be integrated with
2539     # external storage backends and possible values will probably be collected
2540     # from the cluster configuration.
2541     if unique_id[0] != constants.BLOCKDEV_DRIVER_MANUAL:
2542       raise ValueError("Got persistent block device of invalid type: %s" %
2543                        unique_id[0])
2544
2545     self.major = self.minor = None
2546     self.Attach()
2547
2548   @classmethod
2549   def Create(cls, unique_id, children, size, params, excl_stor):
2550     """Create a new device
2551
2552     This is a noop, we only return a PersistentBlockDevice instance
2553
2554     """
2555     if excl_stor:
2556       raise errors.ProgrammerError("Persistent block device requested with"
2557                                    " exclusive_storage")
2558     return PersistentBlockDevice(unique_id, children, 0, params)
2559
2560   def Remove(self):
2561     """Remove a device
2562
2563     This is a noop
2564
2565     """
2566     pass
2567
2568   def Rename(self, new_id):
2569     """Rename this device.
2570
2571     """
2572     _ThrowError("Rename is not supported for PersistentBlockDev storage")
2573
2574   def Attach(self):
2575     """Attach to an existing block device.
2576
2577
2578     """
2579     self.attached = False
2580     try:
2581       st = os.stat(self.dev_path)
2582     except OSError, err:
2583       logging.error("Error stat()'ing %s: %s", self.dev_path, str(err))
2584       return False
2585
2586     if not stat.S_ISBLK(st.st_mode):
2587       logging.error("%s is not a block device", self.dev_path)
2588       return False
2589
2590     self.major = os.major(st.st_rdev)
2591     self.minor = os.minor(st.st_rdev)
2592     self.attached = True
2593
2594     return True
2595
2596   def Assemble(self):
2597     """Assemble the device.
2598
2599     """
2600     pass
2601
2602   def Shutdown(self):
2603     """Shutdown the device.
2604
2605     """
2606     pass
2607
2608   def Open(self, force=False):
2609     """Make the device ready for I/O.
2610
2611     """
2612     pass
2613
2614   def Close(self):
2615     """Notifies that the device will no longer be used for I/O.
2616
2617     """
2618     pass
2619
2620   def Grow(self, amount, dryrun, backingstore):
2621     """Grow the logical volume.
2622
2623     """
2624     _ThrowError("Grow is not supported for PersistentBlockDev storage")
2625
2626
2627 class RADOSBlockDevice(BlockDev):
2628   """A RADOS Block Device (rbd).
2629
2630   This class implements the RADOS Block Device for the backend. You need
2631   the rbd kernel driver, the RADOS Tools and a working RADOS cluster for
2632   this to be functional.
2633
2634   """
2635   def __init__(self, unique_id, children, size, params):
2636     """Attaches to an rbd device.
2637
2638     """
2639     super(RADOSBlockDevice, self).__init__(unique_id, children, size, params)
2640     if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
2641       raise ValueError("Invalid configuration data %s" % str(unique_id))
2642
2643     self.driver, self.rbd_name = unique_id
2644
2645     self.major = self.minor = None
2646     self.Attach()
2647
2648   @classmethod
2649   def Create(cls, unique_id, children, size, params, excl_stor):
2650     """Create a new rbd device.
2651
2652     Provision a new rbd volume inside a RADOS pool.
2653
2654     """
2655     if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
2656       raise errors.ProgrammerError("Invalid configuration data %s" %
2657                                    str(unique_id))
2658     if excl_stor:
2659       raise errors.ProgrammerError("RBD device requested with"
2660                                    " exclusive_storage")
2661     rbd_pool = params[constants.LDP_POOL]
2662     rbd_name = unique_id[1]
2663
2664     # Provision a new rbd volume (Image) inside the RADOS cluster.
2665     cmd = [constants.RBD_CMD, "create", "-p", rbd_pool,
2666            rbd_name, "--size", "%s" % size]
2667     result = utils.RunCmd(cmd)
2668     if result.failed:
2669       _ThrowError("rbd creation failed (%s): %s",
2670                   result.fail_reason, result.output)
2671
2672     return RADOSBlockDevice(unique_id, children, size, params)
2673
2674   def Remove(self):
2675     """Remove the rbd device.
2676
2677     """
2678     rbd_pool = self.params[constants.LDP_POOL]
2679     rbd_name = self.unique_id[1]
2680
2681     if not self.minor and not self.Attach():
2682       # The rbd device doesn't exist.
2683       return
2684
2685     # First shutdown the device (remove mappings).
2686     self.Shutdown()
2687
2688     # Remove the actual Volume (Image) from the RADOS cluster.
2689     cmd = [constants.RBD_CMD, "rm", "-p", rbd_pool, rbd_name]
2690     result = utils.RunCmd(cmd)
2691     if result.failed:
2692       _ThrowError("Can't remove Volume from cluster with rbd rm: %s - %s",
2693                   result.fail_reason, result.output)
2694
2695   def Rename(self, new_id):
2696     """Rename this device.
2697
2698     """
2699     pass
2700
2701   def Attach(self):
2702     """Attach to an existing rbd device.
2703
2704     This method maps the rbd volume that matches our name with
2705     an rbd device and then attaches to this device.
2706
2707     """
2708     self.attached = False
2709
2710     # Map the rbd volume to a block device under /dev
2711     self.dev_path = self._MapVolumeToBlockdev(self.unique_id)
2712
2713     try:
2714       st = os.stat(self.dev_path)
2715     except OSError, err:
2716       logging.error("Error stat()'ing %s: %s", self.dev_path, str(err))
2717       return False
2718
2719     if not stat.S_ISBLK(st.st_mode):
2720       logging.error("%s is not a block device", self.dev_path)
2721       return False
2722
2723     self.major = os.major(st.st_rdev)
2724     self.minor = os.minor(st.st_rdev)
2725     self.attached = True
2726
2727     return True
2728
2729   def _MapVolumeToBlockdev(self, unique_id):
2730     """Maps existing rbd volumes to block devices.
2731
2732     This method should be idempotent if the mapping already exists.
2733
2734     @rtype: string
2735     @return: the block device path that corresponds to the volume
2736
2737     """
2738     pool = self.params[constants.LDP_POOL]
2739     name = unique_id[1]
2740
2741     # Check if the mapping already exists.
2742     rbd_dev = self._VolumeToBlockdev(pool, name)
2743     if rbd_dev:
2744       # The mapping exists. Return it.
2745       return rbd_dev
2746
2747     # The mapping doesn't exist. Create it.
2748     map_cmd = [constants.RBD_CMD, "map", "-p", pool, name]
2749     result = utils.RunCmd(map_cmd)
2750     if result.failed:
2751       _ThrowError("rbd map failed (%s): %s",
2752                   result.fail_reason, result.output)
2753
2754     # Find the corresponding rbd device.
2755     rbd_dev = self._VolumeToBlockdev(pool, name)
2756     if not rbd_dev:
2757       _ThrowError("rbd map succeeded, but could not find the rbd block"
2758                   " device in output of showmapped, for volume: %s", name)
2759
2760     # The device was successfully mapped. Return it.
2761     return rbd_dev
2762
2763   @classmethod
2764   def _VolumeToBlockdev(cls, pool, volume_name):
2765     """Do the 'volume name'-to-'rbd block device' resolving.
2766
2767     @type pool: string
2768     @param pool: RADOS pool to use
2769     @type volume_name: string
2770     @param volume_name: the name of the volume whose device we search for
2771     @rtype: string or None
2772     @return: block device path if the volume is mapped, else None
2773
2774     """
2775     try:
2776       # Newer versions of the rbd tool support json output formatting. Use it
2777       # if available.
2778       showmap_cmd = [
2779         constants.RBD_CMD,
2780         "showmapped",
2781         "-p",
2782         pool,
2783         "--format",
2784         "json"
2785         ]
2786       result = utils.RunCmd(showmap_cmd)
2787       if result.failed:
2788         logging.error("rbd JSON output formatting returned error (%s): %s,"
2789                       "falling back to plain output parsing",
2790                       result.fail_reason, result.output)
2791         raise RbdShowmappedJsonError
2792
2793       return cls._ParseRbdShowmappedJson(result.output, volume_name)
2794     except RbdShowmappedJsonError:
2795       # For older versions of rbd, we have to parse the plain / text output
2796       # manually.
2797       showmap_cmd = [constants.RBD_CMD, "showmapped", "-p", pool]
2798       result = utils.RunCmd(showmap_cmd)
2799       if result.failed:
2800         _ThrowError("rbd showmapped failed (%s): %s",
2801                     result.fail_reason, result.output)
2802
2803       return cls._ParseRbdShowmappedPlain(result.output, volume_name)
2804
2805   @staticmethod
2806   def _ParseRbdShowmappedJson(output, volume_name):
2807     """Parse the json output of `rbd showmapped'.
2808
2809     This method parses the json output of `rbd showmapped' and returns the rbd
2810     block device path (e.g. /dev/rbd0) that matches the given rbd volume.
2811
2812     @type output: string
2813     @param output: the json output of `rbd showmapped'
2814     @type volume_name: string
2815     @param volume_name: the name of the volume whose device we search for
2816     @rtype: string or None
2817     @return: block device path if the volume is mapped, else None
2818
2819     """
2820     try:
2821       devices = serializer.LoadJson(output)
2822     except ValueError, err:
2823       _ThrowError("Unable to parse JSON data: %s" % err)
2824
2825     rbd_dev = None
2826     for d in devices.values(): # pylint: disable=E1103
2827       try:
2828         name = d["name"]
2829       except KeyError:
2830         _ThrowError("'name' key missing from json object %s", devices)
2831
2832       if name == volume_name:
2833         if rbd_dev is not None:
2834           _ThrowError("rbd volume %s is mapped more than once", volume_name)
2835
2836         rbd_dev = d["device"]
2837
2838     return rbd_dev
2839
2840   @staticmethod
2841   def _ParseRbdShowmappedPlain(output, volume_name):
2842     """Parse the (plain / text) output of `rbd showmapped'.
2843
2844     This method parses the output of `rbd showmapped' and returns
2845     the rbd block device path (e.g. /dev/rbd0) that matches the
2846     given rbd volume.
2847
2848     @type output: string
2849     @param output: the plain text output of `rbd showmapped'
2850     @type volume_name: string
2851     @param volume_name: the name of the volume whose device we search for
2852     @rtype: string or None
2853     @return: block device path if the volume is mapped, else None
2854
2855     """
2856     allfields = 5
2857     volumefield = 2
2858     devicefield = 4
2859
2860     lines = output.splitlines()
2861
2862     # Try parsing the new output format (ceph >= 0.55).
2863     splitted_lines = map(lambda l: l.split(), lines)
2864
2865     # Check for empty output.
2866     if not splitted_lines:
2867       return None
2868
2869     # Check showmapped output, to determine number of fields.
2870     field_cnt = len(splitted_lines[0])
2871     if field_cnt != allfields:
2872       # Parsing the new format failed. Fallback to parsing the old output
2873       # format (< 0.55).
2874       splitted_lines = map(lambda l: l.split("\t"), lines)
2875       if field_cnt != allfields:
2876         _ThrowError("Cannot parse rbd showmapped output expected %s fields,"
2877                     " found %s", allfields, field_cnt)
2878
2879     matched_lines = \
2880       filter(lambda l: len(l) == allfields and l[volumefield] == volume_name,
2881              splitted_lines)
2882
2883     if len(matched_lines) > 1:
2884       _ThrowError("rbd volume %s mapped more than once", volume_name)
2885
2886     if matched_lines:
2887       # rbd block device found. Return it.
2888       rbd_dev = matched_lines[0][devicefield]
2889       return rbd_dev
2890
2891     # The given volume is not mapped.
2892     return None
2893
2894   def Assemble(self):
2895     """Assemble the device.
2896
2897     """
2898     pass
2899
2900   def Shutdown(self):
2901     """Shutdown the device.
2902
2903     """
2904     if not self.minor and not self.Attach():
2905       # The rbd device doesn't exist.
2906       return
2907
2908     # Unmap the block device from the Volume.
2909     self._UnmapVolumeFromBlockdev(self.unique_id)
2910
2911     self.minor = None
2912     self.dev_path = None
2913
2914   def _UnmapVolumeFromBlockdev(self, unique_id):
2915     """Unmaps the rbd device from the Volume it is mapped.
2916
2917     Unmaps the rbd device from the Volume it was previously mapped to.
2918     This method should be idempotent if the Volume isn't mapped.
2919
2920     """
2921     pool = self.params[constants.LDP_POOL]
2922     name = unique_id[1]
2923
2924     # Check if the mapping already exists.
2925     rbd_dev = self._VolumeToBlockdev(pool, name)
2926
2927     if rbd_dev:
2928       # The mapping exists. Unmap the rbd device.
2929       unmap_cmd = [constants.RBD_CMD, "unmap", "%s" % rbd_dev]
2930       result = utils.RunCmd(unmap_cmd)
2931       if result.failed:
2932         _ThrowError("rbd unmap failed (%s): %s",
2933                     result.fail_reason, result.output)
2934
2935   def Open(self, force=False):
2936     """Make the device ready for I/O.
2937
2938     """
2939     pass
2940
2941   def Close(self):
2942     """Notifies that the device will no longer be used for I/O.
2943
2944     """
2945     pass
2946
2947   def Grow(self, amount, dryrun, backingstore):
2948     """Grow the Volume.
2949
2950     @type amount: integer
2951     @param amount: the amount (in mebibytes) to grow with
2952     @type dryrun: boolean
2953     @param dryrun: whether to execute the operation in simulation mode
2954         only, without actually increasing the size
2955
2956     """
2957     if not backingstore:
2958       return
2959     if not self.Attach():
2960       _ThrowError("Can't attach to rbd device during Grow()")
2961
2962     if dryrun:
2963       # the rbd tool does not support dry runs of resize operations.
2964       # Since rbd volumes are thinly provisioned, we assume
2965       # there is always enough free space for the operation.
2966       return
2967
2968     rbd_pool = self.params[constants.LDP_POOL]
2969     rbd_name = self.unique_id[1]
2970     new_size = self.size + amount
2971
2972     # Resize the rbd volume (Image) inside the RADOS cluster.
2973     cmd = [constants.RBD_CMD, "resize", "-p", rbd_pool,
2974            rbd_name, "--size", "%s" % new_size]
2975     result = utils.RunCmd(cmd)
2976     if result.failed:
2977       _ThrowError("rbd resize failed (%s): %s",
2978                   result.fail_reason, result.output)
2979
2980
2981 class ExtStorageDevice(BlockDev):
2982   """A block device provided by an ExtStorage Provider.
2983
2984   This class implements the External Storage Interface, which means
2985   handling of the externally provided block devices.
2986
2987   """
2988   def __init__(self, unique_id, children, size, params):
2989     """Attaches to an extstorage block device.
2990
2991     """
2992     super(ExtStorageDevice, self).__init__(unique_id, children, size, params)
2993     if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
2994       raise ValueError("Invalid configuration data %s" % str(unique_id))
2995
2996     self.driver, self.vol_name = unique_id
2997     self.ext_params = params
2998
2999     self.major = self.minor = None
3000     self.Attach()
3001
3002   @classmethod
3003   def Create(cls, unique_id, children, size, params, excl_stor):
3004     """Create a new extstorage device.
3005
3006     Provision a new volume using an extstorage provider, which will
3007     then be mapped to a block device.
3008
3009     """
3010     if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
3011       raise errors.ProgrammerError("Invalid configuration data %s" %
3012                                    str(unique_id))
3013     if excl_stor:
3014       raise errors.ProgrammerError("extstorage device requested with"
3015                                    " exclusive_storage")
3016
3017     # Call the External Storage's create script,
3018     # to provision a new Volume inside the External Storage
3019     _ExtStorageAction(constants.ES_ACTION_CREATE, unique_id,
3020                       params, str(size))
3021
3022     return ExtStorageDevice(unique_id, children, size, params)
3023
3024   def Remove(self):
3025     """Remove the extstorage device.
3026
3027     """
3028     if not self.minor and not self.Attach():
3029       # The extstorage device doesn't exist.
3030       return
3031
3032     # First shutdown the device (remove mappings).
3033     self.Shutdown()
3034
3035     # Call the External Storage's remove script,
3036     # to remove the Volume from the External Storage
3037     _ExtStorageAction(constants.ES_ACTION_REMOVE, self.unique_id,
3038                       self.ext_params)
3039
3040   def Rename(self, new_id):
3041     """Rename this device.
3042
3043     """
3044     pass
3045
3046   def Attach(self):
3047     """Attach to an existing extstorage device.
3048
3049     This method maps the extstorage volume that matches our name with
3050     a corresponding block device and then attaches to this device.
3051
3052     """
3053     self.attached = False
3054
3055     # Call the External Storage's attach script,
3056     # to attach an existing Volume to a block device under /dev
3057     self.dev_path = _ExtStorageAction(constants.ES_ACTION_ATTACH,
3058                                       self.unique_id, self.ext_params)
3059
3060     try:
3061       st = os.stat(self.dev_path)
3062     except OSError, err:
3063       logging.error("Error stat()'ing %s: %s", self.dev_path, str(err))
3064       return False
3065
3066     if not stat.S_ISBLK(st.st_mode):
3067       logging.error("%s is not a block device", self.dev_path)
3068       return False
3069
3070     self.major = os.major(st.st_rdev)
3071     self.minor = os.minor(st.st_rdev)
3072     self.attached = True
3073
3074     return True
3075
3076   def Assemble(self):
3077     """Assemble the device.
3078
3079     """
3080     pass
3081
3082   def Shutdown(self):
3083     """Shutdown the device.
3084
3085     """
3086     if not self.minor and not self.Attach():
3087       # The extstorage device doesn't exist.
3088       return
3089
3090     # Call the External Storage's detach script,
3091     # to detach an existing Volume from it's block device under /dev
3092     _ExtStorageAction(constants.ES_ACTION_DETACH, self.unique_id,
3093                       self.ext_params)
3094
3095     self.minor = None
3096     self.dev_path = None
3097
3098   def Open(self, force=False):
3099     """Make the device ready for I/O.
3100
3101     """
3102     pass
3103
3104   def Close(self):
3105     """Notifies that the device will no longer be used for I/O.
3106
3107     """
3108     pass
3109
3110   def Grow(self, amount, dryrun, backingstore):
3111     """Grow the Volume.
3112
3113     @type amount: integer
3114     @param amount: the amount (in mebibytes) to grow with
3115     @type dryrun: boolean
3116     @param dryrun: whether to execute the operation in simulation mode
3117         only, without actually increasing the size
3118
3119     """
3120     if not backingstore:
3121       return
3122     if not self.Attach():
3123       _ThrowError("Can't attach to extstorage device during Grow()")
3124
3125     if dryrun:
3126       # we do not support dry runs of resize operations for now.
3127       return
3128
3129     new_size = self.size + amount
3130
3131     # Call the External Storage's grow script,
3132     # to grow an existing Volume inside the External Storage
3133     _ExtStorageAction(constants.ES_ACTION_GROW, self.unique_id,
3134                       self.ext_params, str(self.size), grow=str(new_size))
3135
3136   def SetInfo(self, text):
3137     """Update metadata with info text.
3138
3139     """
3140     # Replace invalid characters
3141     text = re.sub("^[^A-Za-z0-9_+.]", "_", text)
3142     text = re.sub("[^-A-Za-z0-9_+.]", "_", text)
3143
3144     # Only up to 128 characters are allowed
3145     text = text[:128]
3146
3147     # Call the External Storage's setinfo script,
3148     # to set metadata for an existing Volume inside the External Storage
3149     _ExtStorageAction(constants.ES_ACTION_SETINFO, self.unique_id,
3150                       self.ext_params, metadata=text)
3151
3152
3153 def _ExtStorageAction(action, unique_id, ext_params,
3154                       size=None, grow=None, metadata=None):
3155   """Take an External Storage action.
3156
3157   Take an External Storage action concerning or affecting
3158   a specific Volume inside the External Storage.
3159
3160   @type action: string
3161   @param action: which action to perform. One of:
3162                  create / remove / grow / attach / detach
3163   @type unique_id: tuple (driver, vol_name)
3164   @param unique_id: a tuple containing the type of ExtStorage (driver)
3165                     and the Volume name
3166   @type ext_params: dict
3167   @param ext_params: ExtStorage parameters
3168   @type size: integer
3169   @param size: the size of the Volume in mebibytes
3170   @type grow: integer
3171   @param grow: the new size in mebibytes (after grow)
3172   @type metadata: string
3173   @param metadata: metadata info of the Volume, for use by the provider
3174   @rtype: None or a block device path (during attach)
3175
3176   """
3177   driver, vol_name = unique_id
3178
3179   # Create an External Storage instance of type `driver'
3180   status, inst_es = ExtStorageFromDisk(driver)
3181   if not status:
3182     _ThrowError("%s" % inst_es)
3183
3184   # Create the basic environment for the driver's scripts
3185   create_env = _ExtStorageEnvironment(unique_id, ext_params, size,
3186                                       grow, metadata)
3187
3188   # Do not use log file for action `attach' as we need
3189   # to get the output from RunResult
3190   # TODO: find a way to have a log file for attach too
3191   logfile = None
3192   if action is not constants.ES_ACTION_ATTACH:
3193     logfile = _VolumeLogName(action, driver, vol_name)
3194
3195   # Make sure the given action results in a valid script
3196   if action not in constants.ES_SCRIPTS:
3197     _ThrowError("Action '%s' doesn't result in a valid ExtStorage script" %
3198                 action)
3199
3200   # Find out which external script to run according the given action
3201   script_name = action + "_script"
3202   script = getattr(inst_es, script_name)
3203
3204   # Run the external script
3205   result = utils.RunCmd([script], env=create_env,
3206                         cwd=inst_es.path, output=logfile,)
3207   if result.failed:
3208     logging.error("External storage's %s command '%s' returned"
3209                   " error: %s, logfile: %s, output: %s",
3210                   action, result.cmd, result.fail_reason,
3211                   logfile, result.output)
3212
3213     # If logfile is 'None' (during attach), it breaks TailFile
3214     # TODO: have a log file for attach too
3215     if action is not constants.ES_ACTION_ATTACH:
3216       lines = [utils.SafeEncode(val)
3217                for val in utils.TailFile(logfile, lines=20)]
3218     else:
3219       lines = result.output[-20:]
3220
3221     _ThrowError("External storage's %s script failed (%s), last"
3222                 " lines of output:\n%s",
3223                 action, result.fail_reason, "\n".join(lines))
3224
3225   if action == constants.ES_ACTION_ATTACH:
3226     return result.stdout
3227
3228
3229 def ExtStorageFromDisk(name, base_dir=None):
3230   """Create an ExtStorage instance from disk.
3231
3232   This function will return an ExtStorage instance
3233   if the given name is a valid ExtStorage name.
3234
3235   @type base_dir: string
3236   @keyword base_dir: Base directory containing ExtStorage installations.
3237                      Defaults to a search in all the ES_SEARCH_PATH dirs.
3238   @rtype: tuple
3239   @return: True and the ExtStorage instance if we find a valid one, or
3240       False and the diagnose message on error
3241
3242   """
3243   if base_dir is None:
3244     es_base_dir = pathutils.ES_SEARCH_PATH
3245   else:
3246     es_base_dir = [base_dir]
3247
3248   es_dir = utils.FindFile(name, es_base_dir, os.path.isdir)
3249
3250   if es_dir is None:
3251     return False, ("Directory for External Storage Provider %s not"
3252                    " found in search path" % name)
3253
3254   # ES Files dictionary, we will populate it with the absolute path
3255   # names; if the value is True, then it is a required file, otherwise
3256   # an optional one
3257   es_files = dict.fromkeys(constants.ES_SCRIPTS, True)
3258
3259   es_files[constants.ES_PARAMETERS_FILE] = True
3260
3261   for (filename, _) in es_files.items():
3262     es_files[filename] = utils.PathJoin(es_dir, filename)
3263
3264     try:
3265       st = os.stat(es_files[filename])
3266     except EnvironmentError, err:
3267       return False, ("File '%s' under path '%s' is missing (%s)" %
3268                      (filename, es_dir, utils.ErrnoOrStr(err)))
3269
3270     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
3271       return False, ("File '%s' under path '%s' is not a regular file" %
3272                      (filename, es_dir))
3273
3274     if filename in constants.ES_SCRIPTS:
3275       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
3276         return False, ("File '%s' under path '%s' is not executable" %
3277                        (filename, es_dir))
3278
3279   parameters = []
3280   if constants.ES_PARAMETERS_FILE in es_files:
3281     parameters_file = es_files[constants.ES_PARAMETERS_FILE]
3282     try:
3283       parameters = utils.ReadFile(parameters_file).splitlines()
3284     except EnvironmentError, err:
3285       return False, ("Error while reading the EXT parameters file at %s: %s" %
3286                      (parameters_file, utils.ErrnoOrStr(err)))
3287     parameters = [v.split(None, 1) for v in parameters]
3288
3289   es_obj = \
3290     objects.ExtStorage(name=name, path=es_dir,
3291                        create_script=es_files[constants.ES_SCRIPT_CREATE],
3292                        remove_script=es_files[constants.ES_SCRIPT_REMOVE],
3293                        grow_script=es_files[constants.ES_SCRIPT_GROW],
3294                        attach_script=es_files[constants.ES_SCRIPT_ATTACH],
3295                        detach_script=es_files[constants.ES_SCRIPT_DETACH],
3296                        setinfo_script=es_files[constants.ES_SCRIPT_SETINFO],
3297                        verify_script=es_files[constants.ES_SCRIPT_VERIFY],
3298                        supported_parameters=parameters)
3299   return True, es_obj
3300
3301
3302 def _ExtStorageEnvironment(unique_id, ext_params,
3303                            size=None, grow=None, metadata=None):
3304   """Calculate the environment for an External Storage script.
3305
3306   @type unique_id: tuple (driver, vol_name)
3307   @param unique_id: ExtStorage pool and name of the Volume
3308   @type ext_params: dict
3309   @param ext_params: the EXT parameters
3310   @type size: string
3311   @param size: size of the Volume (in mebibytes)
3312   @type grow: string
3313   @param grow: new size of Volume after grow (in mebibytes)
3314   @type metadata: string
3315   @param metadata: metadata info of the Volume
3316   @rtype: dict
3317   @return: dict of environment variables
3318
3319   """
3320   vol_name = unique_id[1]
3321
3322   result = {}
3323   result["VOL_NAME"] = vol_name
3324
3325   # EXT params
3326   for pname, pvalue in ext_params.items():
3327     result["EXTP_%s" % pname.upper()] = str(pvalue)
3328
3329   if size is not None:
3330     result["VOL_SIZE"] = size
3331
3332   if grow is not None:
3333     result["VOL_NEW_SIZE"] = grow
3334
3335   if metadata is not None:
3336     result["VOL_METADATA"] = metadata
3337
3338   return result
3339
3340
3341 def _VolumeLogName(kind, es_name, volume):
3342   """Compute the ExtStorage log filename for a given Volume and operation.
3343
3344   @type kind: string
3345   @param kind: the operation type (e.g. create, remove etc.)
3346   @type es_name: string
3347   @param es_name: the ExtStorage name
3348   @type volume: string
3349   @param volume: the name of the Volume inside the External Storage
3350
3351   """
3352   # Check if the extstorage log dir is a valid dir
3353   if not os.path.isdir(pathutils.LOG_ES_DIR):
3354     _ThrowError("Cannot find log directory: %s", pathutils.LOG_ES_DIR)
3355
3356   # TODO: Use tempfile.mkstemp to create unique filename
3357   base = ("%s-%s-%s-%s.log" %
3358           (kind, es_name, volume, utils.TimestampForFilename()))
3359   return utils.PathJoin(pathutils.LOG_ES_DIR, base)
3360
3361
3362 DEV_MAP = {
3363   constants.LD_LV: LogicalVolume,
3364   constants.LD_DRBD8: DRBD8,
3365   constants.LD_BLOCKDEV: PersistentBlockDevice,
3366   constants.LD_RBD: RADOSBlockDevice,
3367   constants.LD_EXT: ExtStorageDevice,
3368   }
3369
3370 if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
3371   DEV_MAP[constants.LD_FILE] = FileStorage
3372
3373
3374 def _VerifyDiskType(dev_type):
3375   if dev_type not in DEV_MAP:
3376     raise errors.ProgrammerError("Invalid block device type '%s'" % dev_type)
3377
3378
3379 def _VerifyDiskParams(disk):
3380   """Verifies if all disk parameters are set.
3381
3382   """
3383   missing = set(constants.DISK_LD_DEFAULTS[disk.dev_type]) - set(disk.params)
3384   if missing:
3385     raise errors.ProgrammerError("Block device is missing disk parameters: %s" %
3386                                  missing)
3387
3388
3389 def FindDevice(disk, children):
3390   """Search for an existing, assembled device.
3391
3392   This will succeed only if the device exists and is assembled, but it
3393   does not do any actions in order to activate the device.
3394
3395   @type disk: L{objects.Disk}
3396   @param disk: the disk object to find
3397   @type children: list of L{bdev.BlockDev}
3398   @param children: the list of block devices that are children of the device
3399                   represented by the disk parameter
3400
3401   """
3402   _VerifyDiskType(disk.dev_type)
3403   device = DEV_MAP[disk.dev_type](disk.physical_id, children, disk.size,
3404                                   disk.params)
3405   if not device.attached:
3406     return None
3407   return device
3408
3409
3410 def Assemble(disk, children):
3411   """Try to attach or assemble an existing device.
3412
3413   This will attach to assemble the device, as needed, to bring it
3414   fully up. It must be safe to run on already-assembled devices.
3415
3416   @type disk: L{objects.Disk}
3417   @param disk: the disk object to assemble
3418   @type children: list of L{bdev.BlockDev}
3419   @param children: the list of block devices that are children of the device
3420                   represented by the disk parameter
3421
3422   """
3423   _VerifyDiskType(disk.dev_type)
3424   _VerifyDiskParams(disk)
3425   device = DEV_MAP[disk.dev_type](disk.physical_id, children, disk.size,
3426                                   disk.params)
3427   device.Assemble()
3428   return device
3429
3430
3431 def Create(disk, children, excl_stor):
3432   """Create a device.
3433
3434   @type disk: L{objects.Disk}
3435   @param disk: the disk object to create
3436   @type children: list of L{bdev.BlockDev}
3437   @param children: the list of block devices that are children of the device
3438                   represented by the disk parameter
3439   @type excl_stor: boolean
3440   @param excl_stor: Whether exclusive_storage is active
3441
3442   """
3443   _VerifyDiskType(disk.dev_type)
3444   _VerifyDiskParams(disk)
3445   device = DEV_MAP[disk.dev_type].Create(disk.physical_id, children, disk.size,
3446                                          disk.params, excl_stor)
3447   return device