Statistics
| Branch: | Tag: | Revision:

root / lib / bdev.py @ 59726e15

History | View | Annotate | Download (102.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Block device abstraction"""
23

    
24
import re
25
import time
26
import errno
27
import shlex
28
import stat
29
import pyparsing as pyp
30
import os
31
import logging
32

    
33
from ganeti import utils
34
from ganeti import errors
35
from ganeti import constants
36
from ganeti import objects
37
from ganeti import compat
38
from ganeti import netutils
39
from ganeti import pathutils
40

    
41

    
42
# Size of reads in _CanReadDevice
43
_DEVICE_READ_SIZE = 128 * 1024
44

    
45

    
46
def _IgnoreError(fn, *args, **kwargs):
47
  """Executes the given function, ignoring BlockDeviceErrors.
48

49
  This is used in order to simplify the execution of cleanup or
50
  rollback functions.
51

52
  @rtype: boolean
53
  @return: True when fn didn't raise an exception, False otherwise
54

55
  """
56
  try:
57
    fn(*args, **kwargs)
58
    return True
59
  except errors.BlockDeviceError, err:
60
    logging.warning("Caught BlockDeviceError but ignoring: %s", str(err))
61
    return False
62

    
63

    
64
def _ThrowError(msg, *args):
  """Log an error to the node daemon and then raise an exception.

  @type msg: string
  @param msg: the text of the exception; interpolated with C{args}
      when any are given
  @raise errors.BlockDeviceError: always, carrying the formatted message

  """
  text = msg % args if args else msg
  logging.error(text)
  raise errors.BlockDeviceError(text)
76

    
77

    
78
def _CheckResult(result):
  """Throws an error if the given result is a failed one.

  @param result: result from RunCmd
  @raise errors.BlockDeviceError: when the command failed

  """
  if not result.failed:
    return
  _ThrowError("Command: %s error: %s - %s", result.cmd, result.fail_reason,
              result.output)
87

    
88

    
89
def _CanReadDevice(path):
  """Check if we can read from the given device.

  This tries to read the first 128k of the device; any environment
  error (missing device, permission problem, I/O error) is treated
  as "not readable" and logged.

  @rtype: boolean

  """
  try:
    utils.ReadFile(path, size=_DEVICE_READ_SIZE)
  except EnvironmentError:
    logging.warning("Can't read from device %s", path, exc_info=True)
    return False
  return True
101

    
102

    
103
def _GetForbiddenFileStoragePaths():
  """Builds a list of path prefixes which shouldn't be used for file storage.

  Covers a fixed set of system directories plus the binary/library
  directories under "/", "/usr" and "/usr/local".

  @rtype: frozenset

  """
  paths = set([
    "/boot",
    "/dev",
    "/etc",
    "/home",
    "/proc",
    "/root",
    "/sys",
    ])

  for prefix in ["", "/usr", "/usr/local"]:
    for dirname in ["bin", "lib", "lib32", "lib64", "sbin"]:
      paths.add("%s/%s" % (prefix, dirname))

  return compat.UniqueFrozenset(map(os.path.normpath, paths))
124

    
125

    
126
def _ComputeWrongFileStoragePaths(paths,
                                  _forbidden=_GetForbiddenFileStoragePaths()):
  """Cross-checks a list of paths for prefixes considered bad.

  Some paths, e.g. "/bin", should not be used for file storage.

  @type paths: list
  @param paths: List of paths to be checked
  @rtype: list
  @return: Sorted list of paths for which the user should be warned

  """
  # Note: _forbidden is deliberately evaluated once at definition time
  def _IsWrong(path):
    # A path is bad when relative, an exact forbidden entry, or located
    # below one of the forbidden prefixes
    if not os.path.isabs(path) or path in _forbidden:
      return True
    return compat.any(utils.IsBelowDir(p, path) for p in _forbidden)

  normalized = map(os.path.normpath, paths)
  return utils.NiceSort([path for path in normalized if _IsWrong(path)])
144

    
145

    
146
def ComputeWrongFileStoragePaths(_filename=pathutils.FILE_STORAGE_PATHS_FILE):
  """Returns a list of file storage paths whose prefix is considered bad.

  See L{_ComputeWrongFileStoragePaths}.

  """
  configured = _LoadAllowedFileStoragePaths(_filename)
  return _ComputeWrongFileStoragePaths(configured)
153

    
154

    
155
def _CheckFileStoragePath(path, allowed):
  """Checks if a path is in a list of allowed paths for file storage.

  @type path: string
  @param path: Path to check
  @type allowed: list
  @param allowed: List of allowed paths; relative entries are skipped
      with an informational log message
  @raise errors.FileStoragePathError: If the path is not allowed

  """
  if not os.path.isabs(path):
    raise errors.FileStoragePathError("File storage path must be absolute,"
                                      " got '%s'" % path)

  acceptable = False
  for candidate in allowed:
    if not os.path.isabs(candidate):
      logging.info("Ignoring relative path '%s' for file storage", candidate)
    elif utils.IsBelowDir(candidate, path):
      acceptable = True
      break

  if not acceptable:
    raise errors.FileStoragePathError("Path '%s' is not acceptable for file"
                                      " storage" % path)
179

    
180

    
181
def _LoadAllowedFileStoragePaths(filename):
  """Loads file containing allowed file storage paths.

  A missing or unreadable file yields an empty list rather than an
  error.

  @rtype: list
  @return: List of allowed paths (can be an empty list)

  """
  try:
    contents = utils.ReadFile(filename)
  except EnvironmentError:
    return []
  return utils.FilterEmptyLinesAndComments(contents)
194

    
195

    
196
def CheckFileStoragePath(path, _filename=pathutils.FILE_STORAGE_PATHS_FILE):
  """Checks if a path is allowed for file storage.

  @type path: string
  @param path: Path to check
  @raise errors.FileStoragePathError: If the path is not allowed

  """
  allowed = _LoadAllowedFileStoragePaths(_filename)

  # First reject known-bad prefixes, then require an allowed ancestor
  wrong = _ComputeWrongFileStoragePaths([path])
  if wrong:
    raise errors.FileStoragePathError("Path '%s' uses a forbidden prefix" %
                                      path)

  _CheckFileStoragePath(path, allowed)
211

    
212

    
213
class BlockDev(object):
  """Block device abstract class.

  A block device can be in the following states:
    - not existing on the system, and by `Create()` it goes into:
    - existing but not setup/not active, and by `Assemble()` goes into:
    - active read-write and by `Open()` it goes into
    - online (=used, or ready for use)

  A device can also be online but read-only, however we are not using
  the readonly state (LV has it, if needed in the future) and we are
  usually looking at this like at a stack, so it's easier to
  conceptualise the transition from not-existing to online and back
  like a linear one.

  The many different states of the device are due to the fact that we
  need to cover many device types:
    - logical volumes are created, lvchange -a y $lv, and used
    - drbd devices are attached to a local disk/remote peer and made primary

  A block device is identified by three items:
    - the /dev path of the device (dynamic)
    - a unique ID of the device (static)
    - it's major/minor pair (dynamic)

  Not all devices implement both the first two as distinct items. LVM
  logical volumes have their unique ID (the pair volume group, logical
  volume name) in a 1-to-1 relation to the dev path. For DRBD devices,
  the /dev path is again dynamic and the unique id is the pair (host1,
  dev1), (host2, dev2).

  You can get to a device in two ways:
    - creating the (real) device, which returns you
      an attached instance (lvcreate)
    - attaching of a python instance to an existing (real) device

  The second point, the attachement to a device, is different
  depending on whether the device is assembled or not. At init() time,
  we search for a device with the same unique_id as us. If found,
  good. It also means that the device is already assembled. If not,
  after assembly we'll have our correct major/minor.

  """
  def __init__(self, unique_id, children, size, params):
    # child block devices (e.g. the components of a mirror); may be None
    self._children = children
    # /dev path; set by subclasses once the device is found/assembled
    self.dev_path = None
    self.unique_id = unique_id
    # kernel major/minor numbers, only known once attached
    self.major = None
    self.minor = None
    # whether this instance is attached to a real device
    self.attached = False
    self.size = size
    # LD-level disk parameters (dict-like; semantics per subclass)
    self.params = params

  def Assemble(self):
    """Assemble the device from its components.

    Implementations of this method by child classes must ensure that:
      - after the device has been assembled, it knows its major/minor
        numbers; this allows other devices (usually parents) to probe
        correctly for their children
      - calling this method on an existing, in-use device is safe
      - if the device is already configured (and in an OK state),
        this method is idempotent

    """
    pass

  def Attach(self):
    """Find a device which matches our config and attach to it.

    """
    raise NotImplementedError

  def Close(self):
    """Notifies that the device will no longer be used for I/O.

    """
    raise NotImplementedError

  @classmethod
  def Create(cls, unique_id, children, size, params):
    """Create the device.

    If the device cannot be created, it will return None
    instead. Error messages go to the logging system.

    Note that for some devices, the unique_id is used, and for other,
    the children. The idea is that these two, taken together, are
    enough for both creation and assembly (later).

    """
    raise NotImplementedError

  def Remove(self):
    """Remove this device.

    This makes sense only for some of the device types: LV and file
    storage. Also note that if the device can't attach, the removal
    can't be completed.

    """
    raise NotImplementedError

  def Rename(self, new_id):
    """Rename this device.

    This may or may not make sense for a given device type.

    """
    raise NotImplementedError

  def Open(self, force=False):
    """Make the device ready for use.

    This makes the device ready for I/O. For now, just the DRBD
    devices need this.

    The force parameter signifies that if the device has any kind of
    --force thing, it should be used, we know what we are doing.

    """
    raise NotImplementedError

  def Shutdown(self):
    """Shut down the device, freeing its children.

    This undoes the `Assemble()` work, except for the child
    assembling; as such, the children on the device are still
    assembled after this call.

    """
    raise NotImplementedError

  def SetSyncParams(self, params):
    """Adjust the synchronization parameters of the mirror.

    In case this is not a mirroring device, this is no-op.

    @param params: dictionary of LD level disk parameters related to the
    synchronization.
    @rtype: list
    @return: a list of error messages, emitted both by the current node and by
    children. An empty list means no errors.

    """
    # Default implementation only recurses into the children; leaf
    # devices that support sync parameters override this method
    result = []
    if self._children:
      for child in self._children:
        result.extend(child.SetSyncParams(params))
    return result

  def PauseResumeSync(self, pause):
    """Pause/Resume the sync of the mirror.

    In case this is not a mirroring device, this is no-op.

    @param pause: Whether to pause or resume

    """
    # A single failing child makes the overall result False
    result = True
    if self._children:
      for child in self._children:
        result = result and child.PauseResumeSync(pause)
    return result

  def GetSyncStatus(self):
    """Returns the sync status of the device.

    If this device is a mirroring device, this function returns the
    status of the mirror.

    If sync_percent is None, it means the device is not syncing.

    If estimated_time is None, it means we can't estimate
    the time needed, otherwise it's the time left in seconds.

    If is_degraded is True, it means the device is missing
    redundancy. This is usually a sign that something went wrong in
    the device setup, if sync_percent is None.

    The ldisk parameter represents the degradation of the local
    data. This is only valid for some devices, the rest will always
    return False (not degraded).

    @rtype: objects.BlockDevStatus

    """
    return objects.BlockDevStatus(dev_path=self.dev_path,
                                  major=self.major,
                                  minor=self.minor,
                                  sync_percent=None,
                                  estimated_time=None,
                                  is_degraded=False,
                                  ldisk_status=constants.LDS_OKAY)

  def CombinedSyncStatus(self):
    """Calculate the mirror status recursively for our children.

    The return value is the same as for `GetSyncStatus()` except the
    minimum percent and maximum time are calculated across our
    children.

    @rtype: objects.BlockDevStatus

    """
    status = self.GetSyncStatus()

    min_percent = status.sync_percent
    max_time = status.estimated_time
    is_degraded = status.is_degraded
    ldisk_status = status.ldisk_status

    if self._children:
      for child in self._children:
        child_status = child.GetSyncStatus()

        # worst-case (lowest) sync percentage across the tree
        if min_percent is None:
          min_percent = child_status.sync_percent
        elif child_status.sync_percent is not None:
          min_percent = min(min_percent, child_status.sync_percent)

        # worst-case (highest) estimated time across the tree
        if max_time is None:
          max_time = child_status.estimated_time
        elif child_status.estimated_time is not None:
          max_time = max(max_time, child_status.estimated_time)

        is_degraded = is_degraded or child_status.is_degraded

        # ldisk statuses are compared numerically; the maximum is kept,
        # which presumably is the most severe one -- TODO confirm against
        # the constants.LDS_* ordering
        if ldisk_status is None:
          ldisk_status = child_status.ldisk_status
        elif child_status.ldisk_status is not None:
          ldisk_status = max(ldisk_status, child_status.ldisk_status)

    return objects.BlockDevStatus(dev_path=self.dev_path,
                                  major=self.major,
                                  minor=self.minor,
                                  sync_percent=min_percent,
                                  estimated_time=max_time,
                                  is_degraded=is_degraded,
                                  ldisk_status=ldisk_status)

  def SetInfo(self, text):
    """Update metadata with info text.

    Only supported for some device types.

    """
    for child in self._children:
      child.SetInfo(text)

  def Grow(self, amount, dryrun, backingstore):
    """Grow the block device.

    @type amount: integer
    @param amount: the amount (in mebibytes) to grow with
    @type dryrun: boolean
    @param dryrun: whether to execute the operation in simulation mode
        only, without actually increasing the size
    @param backingstore: whether to execute the operation on backing storage
        only, or on "logical" storage only; e.g. DRBD is logical storage,
        whereas LVM, file, RBD are backing storage

    """
    raise NotImplementedError

  def GetActualSize(self):
    """Return the actual disk size.

    @note: the device needs to be active when this is called
    @rtype: integer
    @return: size in bytes, as reported by "blockdev --getsize64"
    @raise errors.BlockDeviceError: if blockdev fails or its output
        cannot be parsed as an integer

    """
    assert self.attached, "BlockDevice not attached in GetActualSize()"
    result = utils.RunCmd(["blockdev", "--getsize64", self.dev_path])
    if result.failed:
      _ThrowError("blockdev failed (%s): %s",
                  result.fail_reason, result.output)
    try:
      sz = int(result.output.strip())
    except (ValueError, TypeError), err:
      _ThrowError("Failed to parse blockdev output: %s", str(err))
    return sz

  def __repr__(self):
    return ("<%s: unique_id: %s, children: %s, %s:%s, %s>" %
            (self.__class__, self.unique_id, self._children,
             self.major, self.minor, self.dev_path))
499

    
500

    
501
class LogicalVolume(BlockDev):
  """Logical Volume block device.

  """
  # characters allowed in VG/LV names, per lvm(8)
  _VALID_NAME_RE = re.compile("^[a-zA-Z0-9+_.-]*$")
  # names reserved by LVM itself
  _INVALID_NAMES = compat.UniqueFrozenset([".", "..", "snapshot", "pvmove"])
  # substrings reserved for LVM-internal mirror volumes
  _INVALID_SUBSTRINGS = compat.UniqueFrozenset(["_mlog", "_mimage"])

  def __init__(self, unique_id, children, size, params):
    """Attaches to a LV device.

    The unique_id is a tuple (vg_name, lv_name)

    @raise ValueError: if unique_id is not a 2-element tuple/list

    """
    super(LogicalVolume, self).__init__(unique_id, children, size, params)
    if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
      raise ValueError("Invalid configuration data %s" % str(unique_id))
    self._vg_name, self._lv_name = unique_id
    self._ValidateName(self._vg_name)
    self._ValidateName(self._lv_name)
    self.dev_path = utils.PathJoin("/dev", self._vg_name, self._lv_name)
    # assume degraded until Attach() reads the real lv_attr status
    self._degraded = True
    self.major = self.minor = self.pe_size = self.stripe_count = None
    self.Attach()

  @classmethod
  def Create(cls, unique_id, children, size, params):
    """Create a new logical volume.

    @param unique_id: tuple (vg_name, lv_name)
    @param size: requested size in mebibytes
    @raise errors.ProgrammerError: on invalid unique_id
    @raise errors.BlockDeviceError: when PV info or free space is
        insufficient, or lvcreate fails for every stripe count

    """
    if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
      raise errors.ProgrammerError("Invalid configuration data %s" %
                                   str(unique_id))
    vg_name, lv_name = unique_id
    cls._ValidateName(vg_name)
    cls._ValidateName(lv_name)
    pvs_info = cls.GetPVInfo([vg_name])
    if not pvs_info:
      _ThrowError("Can't compute PV info for vg %s", vg_name)
    # prefer the PVs with the most free space
    pvs_info.sort(key=(lambda pv: pv.free), reverse=True)

    pvlist = [pv.name for pv in pvs_info]
    if compat.any(":" in v for v in pvlist):
      _ThrowError("Some of your PVs have the invalid character ':' in their"
                  " name, this is not supported - please filter them out"
                  " in lvm.conf using either 'filter' or 'preferred_names'")
    free_size = sum([pv.free for pv in pvs_info])
    current_pvs = len(pvlist)
    desired_stripes = params[constants.LDP_STRIPES]
    # can't stripe wider than the number of PVs available
    stripes = min(current_pvs, desired_stripes)
    if stripes < desired_stripes:
      logging.warning("Could not use %d stripes for VG %s, as only %d PVs are"
                      " available.", desired_stripes, vg_name, current_pvs)

    # The size constraint should have been checked from the master before
    # calling the create function.
    if free_size < size:
      _ThrowError("Not enough free space: required %s,"
                  " available %s", size, free_size)
    cmd = ["lvcreate", "-L%dm" % size, "-n%s" % lv_name]
    # If the free space is not well distributed, we won't be able to
    # create an optimally-striped volume; in that case, we want to try
    # with N, N-1, ..., 2, and finally 1 (non-stripped) number of
    # stripes
    for stripes_arg in range(stripes, 0, -1):
      result = utils.RunCmd(cmd + ["-i%d" % stripes_arg] + [vg_name] + pvlist)
      if not result.failed:
        break
    if result.failed:
      _ThrowError("LV create failed (%s): %s",
                  result.fail_reason, result.output)
    return LogicalVolume(unique_id, children, size, params)

  @staticmethod
  def _GetVolumeInfo(lvm_cmd, fields):
    """Returns LVM Volumen infos using lvm_cmd

    @param lvm_cmd: Should be one of "pvs", "vgs" or "lvs"
    @param fields: Fields to return
    @return: A list of dicts each with the parsed fields
    @raise errors.ProgrammerError: when no fields are given
    @raise errors.CommandError: when the command fails or its output
        doesn't match the requested field count

    """
    if not fields:
      raise errors.ProgrammerError("No fields specified")

    # "|" is used as field separator; sizes are in MiB without suffix
    sep = "|"
    cmd = [lvm_cmd, "--noheadings", "--nosuffix", "--units=m", "--unbuffered",
           "--separator=%s" % sep, "-o%s" % ",".join(fields)]

    result = utils.RunCmd(cmd)
    if result.failed:
      raise errors.CommandError("Can't get the volume information: %s - %s" %
                                (result.fail_reason, result.output))

    data = []
    for line in result.stdout.splitlines():
      splitted_fields = line.strip().split(sep)

      if len(fields) != len(splitted_fields):
        raise errors.CommandError("Can't parse %s output: line '%s'" %
                                  (lvm_cmd, line))

      data.append(splitted_fields)

    return data

  @classmethod
  def GetPVInfo(cls, vg_names, filter_allocatable=True):
    """Get the free space info for PVs in a volume group.

    @param vg_names: list of volume group names, if empty all will be returned
    @param filter_allocatable: whether to skip over unallocatable PVs

    @rtype: list
    @return: list of objects.LvmPvInfo objects, or None when the PV
        information cannot be read

    """
    try:
      info = cls._GetVolumeInfo("pvs", ["pv_name", "vg_name", "pv_free",
                                        "pv_attr", "pv_size"])
    except errors.GenericError, err:
      logging.error("Can't get PV information: %s", err)
      return None

    data = []
    for (pv_name, vg_name, pv_free, pv_attr, pv_size) in info:
      # (possibly) skip over pvs which are not allocatable
      if filter_allocatable and pv_attr[0] != "a":
        continue
      # (possibly) skip over pvs which are not in the right volume group(s)
      if vg_names and vg_name not in vg_names:
        continue
      pvi = objects.LvmPvInfo(name=pv_name, vg_name=vg_name,
                              size=float(pv_size), free=float(pv_free),
                              attributes=pv_attr)
      data.append(pvi)

    return data

  @classmethod
  def GetVGInfo(cls, vg_names, filter_readonly=True):
    """Get the free space info for specific VGs.

    @param vg_names: list of volume group names, if empty all will be returned
    @param filter_readonly: whether to skip over readonly VGs

    @rtype: list
    @return: list of tuples (free_space, total_size, name) with free_space in
             MiB; None when the VG information cannot be read

    """
    try:
      info = cls._GetVolumeInfo("vgs", ["vg_name", "vg_free", "vg_attr",
                                        "vg_size"])
    except errors.GenericError, err:
      logging.error("Can't get VG information: %s", err)
      return None

    data = []
    for vg_name, vg_free, vg_attr, vg_size in info:
      # (possibly) skip over vgs which are not writable
      if filter_readonly and vg_attr[0] == "r":
        continue
      # (possibly) skip over vgs which are not in the right volume group(s)
      if vg_names and vg_name not in vg_names:
        continue
      data.append((float(vg_free), float(vg_size), vg_name))

    return data

  @classmethod
  def _ValidateName(cls, name):
    """Validates that a given name is valid as VG or LV name.

    The list of valid characters and restricted names is taken out of
    the lvm(8) manpage, with the simplification that we enforce both
    VG and LV restrictions on the names.

    @raise errors.BlockDeviceError: when the name is invalid

    """
    if (not cls._VALID_NAME_RE.match(name) or
        name in cls._INVALID_NAMES or
        compat.any(substring in name for substring in cls._INVALID_SUBSTRINGS)):
      _ThrowError("Invalid LVM name '%s'", name)

  def Remove(self):
    """Remove this logical volume.

    A missing LV (failure to attach) is treated as success.

    """
    if not self.minor and not self.Attach():
      # the LV does not exist
      return
    result = utils.RunCmd(["lvremove", "-f", "%s/%s" %
                           (self._vg_name, self._lv_name)])
    if result.failed:
      _ThrowError("Can't lvremove: %s - %s", result.fail_reason, result.output)

  def Rename(self, new_id):
    """Rename this logical volume.

    @param new_id: tuple (vg_name, lv_name); the VG must stay the same
    @raise errors.ProgrammerError: on invalid new_id or VG change

    """
    if not isinstance(new_id, (tuple, list)) or len(new_id) != 2:
      raise errors.ProgrammerError("Invalid new logical id '%s'" % new_id)
    new_vg, new_name = new_id
    if new_vg != self._vg_name:
      raise errors.ProgrammerError("Can't move a logical volume across"
                                   " volume groups (from %s to to %s)" %
                                   (self._vg_name, new_vg))
    result = utils.RunCmd(["lvrename", new_vg, self._lv_name, new_name])
    if result.failed:
      _ThrowError("Failed to rename the logical volume: %s", result.output)
    self._lv_name = new_name
    self.dev_path = utils.PathJoin("/dev", self._vg_name, self._lv_name)

  def Attach(self):
    """Attach to an existing LV.

    This method will try to see if an existing and active LV exists
    which matches our name. If so, its major/minor will be
    recorded.

    @rtype: boolean
    @return: True when the LV was found and its attributes parsed

    """
    self.attached = False
    result = utils.RunCmd(["lvs", "--noheadings", "--separator=,",
                           "--units=m", "--nosuffix",
                           "-olv_attr,lv_kernel_major,lv_kernel_minor,"
                           "vg_extent_size,stripes", self.dev_path])
    if result.failed:
      logging.error("Can't find LV %s: %s, %s",
                    self.dev_path, result.fail_reason, result.output)
      return False
    # the output can (and will) have multiple lines for multi-segment
    # LVs, as the 'stripes' parameter is a segment one, so we take
    # only the last entry, which is the one we're interested in; note
    # that with LVM2 anyway the 'stripes' value must be constant
    # across segments, so this is a no-op actually
    out = result.stdout.splitlines()
    if not out: # totally empty result? splitlines() returns at least
                # one line for any non-empty string
      logging.error("Can't parse LVS output, no lines? Got '%s'", str(out))
      return False
    out = out[-1].strip().rstrip(",")
    out = out.split(",")
    if len(out) != 5:
      logging.error("Can't parse LVS output, len(%s) != 5", str(out))
      return False

    status, major, minor, pe_size, stripes = out
    if len(status) < 6:
      logging.error("lvs lv_attr is not at least 6 characters (%s)", status)
      return False

    try:
      major = int(major)
      minor = int(minor)
    except (TypeError, ValueError), err:
      # note: unlike the checks below, a parse failure here is only
      # logged and the (string) values are kept -- no early return
      logging.error("lvs major/minor cannot be parsed: %s", str(err))

    try:
      pe_size = int(float(pe_size))
    except (TypeError, ValueError), err:
      logging.error("Can't parse vg extent size: %s", err)
      return False

    try:
      stripes = int(stripes)
    except (TypeError, ValueError), err:
      logging.error("Can't parse the number of stripes: %s", err)
      return False

    self.major = major
    self.minor = minor
    self.pe_size = pe_size
    self.stripe_count = stripes
    self._degraded = status[0] == "v" # virtual volume, i.e. doesn't backing
                                      # storage
    self.attached = True
    return True

  def Assemble(self):
    """Assemble the device.

    We always run `lvchange -ay` on the LV to ensure it's active before
    use, as there were cases when xenvg was not active after boot
    (also possibly after disk issues).

    """
    result = utils.RunCmd(["lvchange", "-ay", self.dev_path])
    if result.failed:
      _ThrowError("Can't activate lv %s: %s", self.dev_path, result.output)

  def Shutdown(self):
    """Shutdown the device.

    This is a no-op for the LV device type, as we don't deactivate the
    volumes on shutdown.

    """
    pass

  def GetSyncStatus(self):
    """Returns the sync status of the device.

    If this device is a mirroring device, this function returns the
    status of the mirror.

    For logical volumes, sync_percent and estimated_time are always
    None (no recovery in progress, as we don't handle the mirrored LV
    case). The is_degraded parameter is the inverse of the ldisk
    parameter.

    For the ldisk parameter, we check if the logical volume has the
    'virtual' type, which means it's not backed by existing storage
    anymore (read from it return I/O error). This happens after a
    physical disk failure and subsequent 'vgreduce --removemissing' on
    the volume group.

    The status was already read in Attach, so we just return it.

    @rtype: objects.BlockDevStatus

    """
    if self._degraded:
      ldisk_status = constants.LDS_FAULTY
    else:
      ldisk_status = constants.LDS_OKAY

    return objects.BlockDevStatus(dev_path=self.dev_path,
                                  major=self.major,
                                  minor=self.minor,
                                  sync_percent=None,
                                  estimated_time=None,
                                  is_degraded=self._degraded,
                                  ldisk_status=ldisk_status)

  def Open(self, force=False):
    """Make the device ready for I/O.

    This is a no-op for the LV device type.

    """
    pass

  def Close(self):
    """Notifies that the device will no longer be used for I/O.

    This is a no-op for the LV device type.

    """
    pass

  def Snapshot(self, size):
    """Create a snapshot copy of an lvm block device.

    @param size: snapshot size in mebibytes
    @returns: tuple (vg, lv)

    """
    snap_name = self._lv_name + ".snap"

    # remove existing snapshot if found
    snap = LogicalVolume((self._vg_name, snap_name), None, size, self.params)
    _IgnoreError(snap.Remove)

    vg_info = self.GetVGInfo([self._vg_name])
    if not vg_info:
      _ThrowError("Can't compute VG info for vg %s", self._vg_name)
    free_size, _, _ = vg_info[0]
    if free_size < size:
      _ThrowError("Not enough free space: required %s,"
                  " available %s", size, free_size)

    _CheckResult(utils.RunCmd(["lvcreate", "-L%dm" % size, "-s",
                               "-n%s" % snap_name, self.dev_path]))

    return (self._vg_name, snap_name)

  def _RemoveOldInfo(self):
    """Try to remove old tags from the lv.

    """
    result = utils.RunCmd(["lvs", "-o", "tags", "--noheadings", "--nosuffix",
                           self.dev_path])
    _CheckResult(result)

    raw_tags = result.stdout.strip()
    if raw_tags:
      # tags are reported comma-separated; delete them one by one
      for tag in raw_tags.split(","):
        _CheckResult(utils.RunCmd(["lvchange", "--deltag",
                                   tag.strip(), self.dev_path]))

  def SetInfo(self, text):
    """Update metadata with info text.

    The text is stored as an LVM tag after stripping characters LVM
    does not accept and truncating to the tag length limit.

    """
    BlockDev.SetInfo(self, text)

    self._RemoveOldInfo()

    # Replace invalid characters
    text = re.sub("^[^A-Za-z0-9_+.]", "_", text)
    text = re.sub("[^-A-Za-z0-9_+.]", "_", text)

    # Only up to 128 characters are allowed
    text = text[:128]

    _CheckResult(utils.RunCmd(["lvchange", "--addtag", text, self.dev_path]))

  def Grow(self, amount, dryrun, backingstore):
    """Grow the logical volume.

    @param amount: growth amount in mebibytes; rounded up to a
        multiple of the full stripe size
    @param dryrun: when True, only run "lvextend --test"
    @param backingstore: LVs are backing storage, so a False value
        makes this a no-op

    """
    if not backingstore:
      return
    if self.pe_size is None or self.stripe_count is None:
      if not self.Attach():
        _ThrowError("Can't attach to LV during Grow()")
    full_stripe_size = self.pe_size * self.stripe_count
    rest = amount % full_stripe_size
    if rest != 0:
      amount += full_stripe_size - rest
    cmd = ["lvextend", "-L", "+%dm" % amount]
    if dryrun:
      cmd.append("--test")
    # we try multiple algorithms since the 'best' ones might not have
    # space available in the right place, but later ones might (since
    # they have less constraints); also note that only recent LVM
    # supports 'cling'
    for alloc_policy in "contiguous", "cling", "normal":
      result = utils.RunCmd(cmd + ["--alloc", alloc_policy, self.dev_path])
      if not result.failed:
        return
    _ThrowError("Can't grow LV %s: %s", self.dev_path, result.output)
932

    
933

    
934
class DRBD8Status(object):
  """A DRBD status representation class.

  Parses a single per-device line from /proc/drbd into attributes
  describing the connection state, the local/remote roles and disk
  states, plus convenience booleans and sync progress information.

  Note that this doesn't support unconfigured devices (cs:Unconfigured).
  # NOTE(review): UNCONF_RE below does match such lines and sets
  # cstatus to CS_UNCONFIGURED, so the limitation stated above looks
  # historical — confirm against callers.

  """
  # a minor that is allocated but carries no configuration at all
  UNCONF_RE = re.compile(r"\s*[0-9]+:\s*cs:Unconfigured$")
  # captures: (1) connection state, (2)/(3) local/remote role,
  # (4)/(5) local/remote disk state; "st:" is the older spelling of
  # the role field, "ro:" the newer one
  LINE_RE = re.compile(r"\s*[0-9]+:\s*cs:(\S+)\s+(?:st|ro):([^/]+)/(\S+)"
                       "\s+ds:([^/]+)/(\S+)\s+.*$")
  # captures: (1) sync percentage, (2)-(4) estimated hh:mm:ss to finish
  SYNC_RE = re.compile(r"^.*\ssync'ed:\s*([0-9.]+)%.*"
                       # Due to a bug in drbd in the kernel, introduced in
                       # commit 4b0715f096 (still unfixed as of 2011-08-22)
                       "(?:\s|M)"
                       "finish: ([0-9]+):([0-9]+):([0-9]+)\s.*$")

  # connection states (cs: field)
  CS_UNCONFIGURED = "Unconfigured"
  CS_STANDALONE = "StandAlone"
  CS_WFCONNECTION = "WFConnection"
  CS_WFREPORTPARAMS = "WFReportParams"
  CS_CONNECTED = "Connected"
  CS_STARTINGSYNCS = "StartingSyncS"
  CS_STARTINGSYNCT = "StartingSyncT"
  CS_WFBITMAPS = "WFBitMapS"
  CS_WFBITMAPT = "WFBitMapT"
  CS_WFSYNCUUID = "WFSyncUUID"
  CS_SYNCSOURCE = "SyncSource"
  CS_SYNCTARGET = "SyncTarget"
  CS_PAUSEDSYNCS = "PausedSyncS"
  CS_PAUSEDSYNCT = "PausedSyncT"
  # the set of connection states that count as "synchronization in
  # progress" for is_in_resync below
  CSET_SYNC = compat.UniqueFrozenset([
    CS_WFREPORTPARAMS,
    CS_STARTINGSYNCS,
    CS_STARTINGSYNCT,
    CS_WFBITMAPS,
    CS_WFBITMAPT,
    CS_WFSYNCUUID,
    CS_SYNCSOURCE,
    CS_SYNCTARGET,
    CS_PAUSEDSYNCS,
    CS_PAUSEDSYNCT,
    ])

  # disk states (ds: field)
  DS_DISKLESS = "Diskless"
  DS_ATTACHING = "Attaching" # transient state
  DS_FAILED = "Failed" # transient state, next: diskless
  DS_NEGOTIATING = "Negotiating" # transient state
  DS_INCONSISTENT = "Inconsistent" # while syncing or after creation
  DS_OUTDATED = "Outdated"
  DS_DUNKNOWN = "DUnknown" # shown for peer disk when not connected
  DS_CONSISTENT = "Consistent"
  DS_UPTODATE = "UpToDate" # normal state

  # node roles (ro:/st: field)
  RO_PRIMARY = "Primary"
  RO_SECONDARY = "Secondary"
  RO_UNKNOWN = "Unknown"

  def __init__(self, procline):
    """Parse one /proc/drbd device line.

    @type procline: string
    @param procline: a (joined) per-minor line from /proc/drbd
    @raise errors.BlockDeviceError: if the line matches neither
        UNCONF_RE nor LINE_RE

    """
    u = self.UNCONF_RE.match(procline)
    if u:
      # unconfigured minor: no roles or disk states are available
      self.cstatus = self.CS_UNCONFIGURED
      self.lrole = self.rrole = self.ldisk = self.rdisk = None
    else:
      m = self.LINE_RE.match(procline)
      if not m:
        raise errors.BlockDeviceError("Can't parse input data '%s'" % procline)
      self.cstatus = m.group(1)
      self.lrole = m.group(2)
      self.rrole = m.group(3)
      self.ldisk = m.group(4)
      self.rdisk = m.group(5)

    # end reading of data from the LINE_RE or UNCONF_RE

    # convenience booleans derived from the raw state strings
    self.is_standalone = self.cstatus == self.CS_STANDALONE
    self.is_wfconn = self.cstatus == self.CS_WFCONNECTION
    self.is_connected = self.cstatus == self.CS_CONNECTED
    self.is_primary = self.lrole == self.RO_PRIMARY
    self.is_secondary = self.lrole == self.RO_SECONDARY
    self.peer_primary = self.rrole == self.RO_PRIMARY
    self.peer_secondary = self.rrole == self.RO_SECONDARY
    self.both_primary = self.is_primary and self.peer_primary
    self.both_secondary = self.is_secondary and self.peer_secondary

    self.is_diskless = self.ldisk == self.DS_DISKLESS
    self.is_disk_uptodate = self.ldisk == self.DS_UPTODATE

    self.is_in_resync = self.cstatus in self.CSET_SYNC
    self.is_in_use = self.cstatus != self.CS_UNCONFIGURED

    # sync progress, if the line carries it
    m = self.SYNC_RE.match(procline)
    if m:
      self.sync_percent = float(m.group(1))
      hours = int(m.group(2))
      minutes = int(m.group(3))
      seconds = int(m.group(4))
      # estimated time to finish, in seconds
      self.est_time = hours * 3600 + minutes * 60 + seconds
    else:
      # we have (in this if branch) no percent information, but if
      # we're resyncing we need to 'fake' a sync percent information,
      # as this is how cmdlib determines if it makes sense to wait for
      # resyncing or not
      if self.is_in_resync:
        self.sync_percent = 0
      else:
        self.sync_percent = None
      self.est_time = None
1040

    
1041

    
1042
class BaseDRBD(BlockDev): # pylint: disable=W0223
  """Base DRBD class.

  This class contains a few bits of common functionality between the
  0.7 and 8.x versions of DRBD.

  """
  # parses the first line of the status file, e.g.
  # "version: 8.3.11 (api:88/proto:86-96)"
  _VERSION_RE = re.compile(r"^version: (\d+)\.(\d+)\.(\d+)(?:\.\d+)?"
                           r" \(api:(\d+)/proto:(\d+)(?:-(\d+))?\)")
  # a device line with any connection state
  _VALID_LINE_RE = re.compile("^ *([0-9]+): cs:([^ ]+).*$")
  # a device line for an allocated but unconfigured minor
  _UNUSED_LINE_RE = re.compile("^ *([0-9]+): cs:Unconfigured$")

  # block device major number assigned to DRBD devices
  _DRBD_MAJOR = 147
  # connection state names as they appear in the status file
  _ST_UNCONFIGURED = "Unconfigured"
  _ST_WFCONNECTION = "WFConnection"
  _ST_CONNECTED = "Connected"

  # default status file (normally /proc/drbd) and the sysfs knob
  # holding the configured usermode helper
  _STATUS_FILE = constants.DRBD_STATUS_FILE
  _USERMODE_HELPER_FILE = "/sys/module/drbd/parameters/usermode_helper"

  @staticmethod
  def _GetProcData(filename=_STATUS_FILE):
    """Return data from /proc/drbd.

    @rtype: list of strings
    @return: the lines of the given status file
    @raise errors.BlockDeviceError: if the file is missing (module
        probably not loaded), unreadable or empty

    """
    try:
      data = utils.ReadFile(filename).splitlines()
    except EnvironmentError, err:
      if err.errno == errno.ENOENT:
        _ThrowError("The file %s cannot be opened, check if the module"
                    " is loaded (%s)", filename, str(err))
      else:
        _ThrowError("Can't read the DRBD proc file %s: %s", filename, str(err))
    if not data:
      _ThrowError("Can't read any data from %s", filename)
    return data

  @classmethod
  def _MassageProcData(cls, data):
    """Transform the output of _GetProdData into a nicer form.

    @return: a dictionary of minor: joined lines from /proc/drbd
        for that minor

    """
    results = {}
    old_minor = old_line = None
    for line in data:
      if not line: # completely empty lines, as can be returned by drbd8.0+
        continue
      lresult = cls._VALID_LINE_RE.match(line)
      if lresult is not None:
        # a new device line starts; flush the one accumulated so far
        if old_minor is not None:
          results[old_minor] = old_line
        old_minor = int(lresult.group(1))
        old_line = line
      else:
        # continuation line: fold it into the current device's entry
        if old_minor is not None:
          old_line += " " + line.strip()
    # add last line
    if old_minor is not None:
      results[old_minor] = old_line
    return results

  @classmethod
  def _GetVersion(cls, proc_data):
    """Return the DRBD version.

    This will return a dict with keys:
      - k_major
      - k_minor
      - k_point
      - api
      - proto
      - proto2 (only on drbd > 8.2.X)

    @raise errors.BlockDeviceError: if the first line of proc_data
        does not match _VERSION_RE

    """
    first_line = proc_data[0].strip()
    version = cls._VERSION_RE.match(first_line)
    if not version:
      raise errors.BlockDeviceError("Can't parse DRBD version from '%s'" %
                                    first_line)

    values = version.groups()
    retval = {
      "k_major": int(values[0]),
      "k_minor": int(values[1]),
      "k_point": int(values[2]),
      "api": int(values[3]),
      "proto": int(values[4]),
      }
    # the second protocol number is only present for ranges (proto:X-Y)
    if values[5] is not None:
      retval["proto2"] = values[5]

    return retval

  @staticmethod
  def GetUsermodeHelper(filename=_USERMODE_HELPER_FILE):
    """Returns DRBD usermode_helper currently set.

    @rtype: string
    @return: the first line of the sysfs helper file
    @raise errors.BlockDeviceError: if the file is missing, unreadable
        or empty

    """
    try:
      helper = utils.ReadFile(filename).splitlines()[0]
    except EnvironmentError, err:
      if err.errno == errno.ENOENT:
        _ThrowError("The file %s cannot be opened, check if the module"
                    " is loaded (%s)", filename, str(err))
      else:
        _ThrowError("Can't read DRBD helper file %s: %s", filename, str(err))
    if not helper:
      _ThrowError("Can't read any data from %s", filename)
    return helper

  @staticmethod
  def _DevPath(minor):
    """Return the path to a drbd device for a given minor.

    """
    return "/dev/drbd%d" % minor

  @classmethod
  def GetUsedDevs(cls):
    """Compute the list of used DRBD devices.

    @rtype: dict
    @return: minor -> (connection state, full status line) for every
        configured minor

    """
    data = cls._GetProcData()

    used_devs = {}
    for line in data:
      match = cls._VALID_LINE_RE.match(line)
      if not match:
        continue
      minor = int(match.group(1))
      state = match.group(2)
      # unconfigured minors do not count as "used"
      if state == cls._ST_UNCONFIGURED:
        continue
      used_devs[minor] = state, line

    return used_devs

  def _SetFromMinor(self, minor):
    """Set our parameters based on the given minor.

    This sets our minor variable and our dev_path.

    @type minor: int or None
    @param minor: the minor to record, or None to mark the device as
        detached

    """
    if minor is None:
      self.minor = self.dev_path = None
      self.attached = False
    else:
      self.minor = minor
      self.dev_path = self._DevPath(minor)
      self.attached = True

  @staticmethod
  def _CheckMetaSize(meta_device):
    """Check if the given meta device looks like a valid one.

    This currently only checks the size, which must be around
    128MiB.

    @type meta_device: string
    @param meta_device: the path of the device to check
    @raise errors.BlockDeviceError: if the size cannot be read or is
        outside the accepted [128MiB, 1GiB] range

    """
    result = utils.RunCmd(["blockdev", "--getsize", meta_device])
    if result.failed:
      _ThrowError("Failed to get device size: %s - %s",
                  result.fail_reason, result.output)
    try:
      sectors = int(result.stdout)
    except (TypeError, ValueError):
      _ThrowError("Invalid output from blockdev: '%s'", result.stdout)
    # blockdev --getsize reports 512-byte sectors
    num_bytes = sectors * 512
    if num_bytes < 128 * 1024 * 1024: # less than 128MiB
      _ThrowError("Meta device too small (%.2fMib)", (num_bytes / 1024 / 1024))
    # the maximum *valid* size of the meta device when living on top
    # of LVM is hard to compute: it depends on the number of stripes
    # and the PE size; e.g. a 2-stripe, 64MB PE will result in a 128MB
    # (normal size), but an eight-stripe 128MB PE will result in a 1GB
    # size meta device; as such, we restrict it to 1GB (a little bit
    # too generous, but making assumptions about PE size is hard)
    if num_bytes > 1024 * 1024 * 1024:
      _ThrowError("Meta device too big (%.2fMiB)", (num_bytes / 1024 / 1024))

  def Rename(self, new_id):
    """Rename a device.

    This is not supported for drbd devices.

    @raise errors.ProgrammerError: always

    """
    raise errors.ProgrammerError("Can't rename a drbd device")
1231

    
1232

    
1233
class DRBD8(BaseDRBD):
1234
  """DRBD v8.x block device.
1235

1236
  This implements the local host part of the DRBD device, i.e. it
1237
  doesn't do anything to the supposed peer. If you need a fully
1238
  connected DRBD pair, you need to use this class on both hosts.
1239

1240
  The unique_id for the drbd device is a (local_ip, local_port,
1241
  remote_ip, remote_port, local_minor, secret) tuple, and it must have
1242
  two children: the data device and the meta_device. The meta device
1243
  is checked for valid size and is zeroed on create.
1244

1245
  """
1246
  _MAX_MINORS = 255
1247
  _PARSE_SHOW = None
1248

    
1249
  # timeout constants
1250
  _NET_RECONFIG_TIMEOUT = 60
1251

    
1252
  # command line options for barriers
1253
  _DISABLE_DISK_OPTION = "--no-disk-barrier"  # -a
1254
  _DISABLE_DRAIN_OPTION = "--no-disk-drain"   # -D
1255
  _DISABLE_FLUSH_OPTION = "--no-disk-flushes" # -i
1256
  _DISABLE_META_FLUSH_OPTION = "--no-md-flushes"  # -m
1257

    
1258
  def __init__(self, unique_id, children, size, params):
    """Initialize and attach a DRBD8 device.

    @param unique_id: (local_ip, local_port, remote_ip, remote_port,
        local_minor, secret) tuple
    @param children: either empty, or the (data, meta) pair of backing
        devices
    @raise ValueError: on invalid configuration data

    """
    # a children list containing None entries is treated as no children
    if children and children.count(None) > 0:
      children = []
    if len(children) not in (0, 2):
      raise ValueError("Invalid configuration data %s" % str(children))
    if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 6:
      raise ValueError("Invalid configuration data %s" % str(unique_id))
    (self._lhost, self._lport,
     self._rhost, self._rport,
     self._aminor, self._secret) = unique_id
    if children:
      # an unreadable meta device is handled as if no children were given
      if not _CanReadDevice(children[1].dev_path):
        logging.info("drbd%s: Ignoring unreadable meta device", self._aminor)
        children = []
    super(DRBD8, self).__init__(unique_id, children, size, params)
    self.major = self._DRBD_MAJOR
    # refuse to run against a non-8.x kernel module
    version = self._GetVersion(self._GetProcData())
    if version["k_major"] != 8:
      _ThrowError("Mismatch in DRBD kernel version and requested ganeti"
                  " usage: kernel is %s.%s, ganeti wants 8.x",
                  version["k_major"], version["k_minor"])

    # a device may not have itself as its network peer
    if (self._lhost is not None and self._lhost == self._rhost and
        self._lport == self._rport):
      raise ValueError("Invalid configuration data, same local/remote %s" %
                       (unique_id,))
    self.Attach()
1285

    
1286
  @classmethod
  def _InitMeta(cls, minor, dev_path):
    """Initialize a meta device.

    This will not work if the given minor is in use.

    """
    # Zero the metadata first, in order to make sure drbdmeta doesn't
    # try to auto-detect existing filesystems or similar (see
    # http://code.google.com/p/ganeti/issues/detail?id=182); we only
    # care about the first 128MB of data in the device, even though it
    # can be bigger
    wipe_result = utils.RunCmd([constants.DD_CMD,
                                "if=/dev/zero", "of=%s" % dev_path,
                                "bs=1048576", "count=128", "oflag=direct"])
    if wipe_result.failed:
      _ThrowError("Can't wipe the meta device: %s", wipe_result.output)

    # now let drbdmeta lay out a fresh v08 metadata area on the device
    md_result = utils.RunCmd(["drbdmeta", "--force", cls._DevPath(minor),
                              "v08", dev_path, "0", "create-md"])
    if md_result.failed:
      _ThrowError("Can't initialize meta device: %s", md_result.output)
1308

    
1309
  @classmethod
  def _FindUnusedMinor(cls):
    """Find an unused DRBD device.

    This is specific to 8.x as the minors are allocated dynamically,
    so non-existing numbers up to a max minor count are actually free.

    """
    proc_lines = cls._GetProcData()

    top_minor = None
    for entry in proc_lines:
      unused = cls._UNUSED_LINE_RE.match(entry)
      if unused:
        # an allocated-but-unconfigured minor can be reused directly
        return int(unused.group(1))
      used = cls._VALID_LINE_RE.match(entry)
      if used:
        # track the highest configured minor seen so far
        top_minor = max(top_minor, int(used.group(1)))
    if top_minor is None:
      # there are no minors in use at all
      return 0
    if top_minor >= cls._MAX_MINORS:
      logging.error("Error: no free drbd minors!")
      raise errors.BlockDeviceError("Can't find a free DRBD minor")
    return top_minor + 1
1334

    
1335
  @classmethod
  def _GetShowParser(cls):
    """Return a parser for `drbd show` output.

    This will either create or return an already-created parser for the
    output of the command `drbd show`.

    @rtype: pyparsing parser element

    """
    if cls._PARSE_SHOW is not None:
      # the grammar is static, so it is built only once per process
      return cls._PARSE_SHOW

    # pyparsing setup
    lbrace = pyp.Literal("{").suppress()
    rbrace = pyp.Literal("}").suppress()
    lbracket = pyp.Literal("[").suppress()
    rbracket = pyp.Literal("]").suppress()
    semi = pyp.Literal(";").suppress()
    colon = pyp.Literal(":").suppress()
    # this also converts the value to an int
    number = pyp.Word(pyp.nums).setParseAction(lambda s, l, t: int(t[0]))

    comment = pyp.Literal("#") + pyp.Optional(pyp.restOfLine)
    defa = pyp.Literal("_is_default").suppress()
    dbl_quote = pyp.Literal('"').suppress()

    keyword = pyp.Word(pyp.alphanums + "-")

    # value types
    value = pyp.Word(pyp.alphanums + "_-/.:")
    quoted = dbl_quote + pyp.CharsNotIn('"') + dbl_quote
    # address forms: an optional family keyword, then addr:port
    ipv4_addr = (pyp.Optional(pyp.Literal("ipv4")).suppress() +
                 pyp.Word(pyp.nums + ".") + colon + number)
    ipv6_addr = (pyp.Optional(pyp.Literal("ipv6")).suppress() +
                 pyp.Optional(lbracket) + pyp.Word(pyp.hexnums + ":") +
                 pyp.Optional(rbracket) + colon + number)
    # meta device, extended syntax
    meta_value = ((value ^ quoted) + lbracket + number + rbracket)
    # device name, extended syntax
    device_value = pyp.Literal("minor").suppress() + number

    # a statement
    stmt = (~rbrace + keyword + ~lbrace +
            pyp.Optional(ipv4_addr ^ ipv6_addr ^ value ^ quoted ^ meta_value ^
                         device_value) +
            pyp.Optional(defa) + semi +
            pyp.Optional(pyp.restOfLine).suppress())

    # an entire section
    section_name = pyp.Word(pyp.alphas + "_")
    section = section_name + lbrace + pyp.ZeroOrMore(pyp.Group(stmt)) + rbrace

    bnf = pyp.ZeroOrMore(pyp.Group(section ^ stmt))
    bnf.ignore(comment)

    # cache the grammar for subsequent calls
    cls._PARSE_SHOW = bnf

    return bnf
1392

    
1393
  @classmethod
  def _GetShowData(cls, minor):
    """Return the `drbdsetup show` data for a minor.

    Returns the command's stdout, or None (after logging the error)
    if the command failed.

    """
    show_cmd = utils.RunCmd(["drbdsetup", cls._DevPath(minor), "show"])
    if not show_cmd.failed:
      return show_cmd.stdout
    logging.error("Can't display the drbd config: %s - %s",
                  show_cmd.fail_reason, show_cmd.output)
    return None
1404

    
1405
  @classmethod
  def _GetDevInfo(cls, out):
    """Parse details about a given DRBD minor.

    This return, if available, the local backing device (as a path)
    and the local and remote (ip, port) information from a string
    containing the output of the `drbdsetup show` command as returned
    by _GetShowData.

    @rtype: dict
    @return: possibly-empty dict with any of the keys local_dev,
        meta_dev, meta_index, local_addr, remote_addr

    """
    data = {}
    if not out:
      return data

    bnf = cls._GetShowParser()
    # run pyparse

    try:
      results = bnf.parseString(out)
    except pyp.ParseException, err:
      _ThrowError("Can't parse drbdsetup show output: %s", str(err))

    # and massage the results into our desired format
    for section in results:
      sname = section[0]
      if sname == "_this_host":
        # our side: backing disk, meta device/index, local address
        for lst in section[1:]:
          if lst[0] == "disk":
            data["local_dev"] = lst[1]
          elif lst[0] == "meta-disk":
            data["meta_dev"] = lst[1]
            data["meta_index"] = lst[2]
          elif lst[0] == "address":
            data["local_addr"] = tuple(lst[1:])
      elif sname == "_remote_host":
        # the peer: only its address is of interest
        for lst in section[1:]:
          if lst[0] == "address":
            data["remote_addr"] = tuple(lst[1:])
    return data
1444

    
1445
  def _MatchesLocal(self, info):
    """Test if our local config matches with an existing device.

    The parameter should be as returned from `_GetDevInfo()`. This
    method tests if our local backing device is the same as the one in
    the info parameter, in effect testing if we look like the given
    device.

    """
    if self._children:
      backend, meta = self._children
    else:
      backend = meta = None

    # the backing disk must match (or both be absent)
    if backend is not None:
      ok = "local_dev" in info and info["local_dev"] == backend.dev_path
    else:
      ok = "local_dev" not in info
    if not ok:
      return False

    # likewise for the meta device and its index
    if meta is None:
      return "meta_dev" not in info and "meta_index" not in info
    return ("meta_dev" in info and info["meta_dev"] == meta.dev_path and
            "meta_index" in info and info["meta_index"] == 0)
1473

    
1474
  def _MatchesNet(self, info):
    """Test if our network config matches with an existing device.

    The parameter should be as returned from `_GetDevInfo()`. This
    method tests if our network configuration is the same as the one
    in the info parameter, in effect testing if we look like the given
    device.

    """
    we_unconfigured = self._lhost is None and "local_addr" not in info
    peer_unconfigured = self._rhost is None and "remote_addr" not in info
    if we_unconfigured and peer_unconfigured:
      # neither we nor the device have any network configuration
      return True

    if self._lhost is None:
      # we expect no network, but the device has some
      return False

    if "local_addr" not in info or "remote_addr" not in info:
      return False

    return (info["local_addr"] == (self._lhost, self._lport) and
            info["remote_addr"] == (self._rhost, self._rport))
1498

    
1499
  def _AssembleLocal(self, minor, backend, meta, size):
    """Configure the local part of a DRBD device.

    @type minor: int
    @param minor: the minor to attach the disk to
    @param backend: path of the data device
    @param meta: path of the meta device
    @param size: explicit device size, passed to drbdsetup with an
        "m" suffix; falsy to use the full backing device

    """
    args = ["drbdsetup", self._DevPath(minor), "disk",
            backend, meta, "0",
            "-e", "detach",
            "--create-device"]
    if size:
      args.extend(["-d", "%sm" % size])

    version = self._GetVersion(self._GetProcData())
    vmaj = version["k_major"]
    vmin = version["k_minor"]
    vrel = version["k_point"]

    # translate the barrier/flush disk parameters into the drbdsetup
    # options supported by this kernel version
    barrier_args = \
      self._ComputeDiskBarrierArgs(vmaj, vmin, vrel,
                                   self.params[constants.LDP_BARRIERS],
                                   self.params[constants.LDP_NO_META_FLUSH])
    args.extend(barrier_args)

    # user-supplied extra drbdsetup disk options, if any
    if self.params[constants.LDP_DISK_CUSTOM]:
      args.extend(shlex.split(self.params[constants.LDP_DISK_CUSTOM]))

    result = utils.RunCmd(args)
    if result.failed:
      _ThrowError("drbd%d: can't attach local disk: %s", minor, result.output)
1527

    
1528
  @classmethod
  def _ComputeDiskBarrierArgs(cls, vmaj, vmin, vrel, disabled_barriers,
                              disable_meta_flush):
    """Compute the DRBD command line parameters for disk barriers

    Returns a list of the disk barrier parameters as requested via the
    disabled_barriers and disable_meta_flush arguments, and according to the
    supported ones in the DRBD version vmaj.vmin.vrel

    If the desired option is unsupported, raises errors.BlockDeviceError.

    """
    disabled_barriers_set = frozenset(disabled_barriers)
    if not disabled_barriers_set in constants.DRBD_VALID_BARRIER_OPT:
      raise errors.BlockDeviceError("%s is not a valid option set for DRBD"
                                    " barriers" % disabled_barriers)

    args = []

    # The following code assumes DRBD 8.x, with x < 4 and x != 1 (DRBD 8.1.x
    # does not exist).  Note: the previous check was
    # "not vmaj == 8 and vmin in (0, 2, 3)", which due to operator
    # precedence only rejected non-8 majors whose minor happened to be
    # 0, 2 or 3; reject everything that is not a supported 8.x release.
    if vmaj != 8 or vmin not in (0, 2, 3):
      raise errors.BlockDeviceError("Unsupported DRBD version: %d.%d.%d" %
                                    (vmaj, vmin, vrel))

    def _AppendOrRaise(option, min_version):
      """Helper for DRBD options"""
      if min_version is not None and vrel >= min_version:
        args.append(option)
      else:
        raise errors.BlockDeviceError("Could not use the option %s as the"
                                      " DRBD version %d.%d.%d does not support"
                                      " it." % (option, vmaj, vmin, vrel))

    # the minimum version for each feature is encoded via pairs of (minor
    # version -> x) where x is version in which support for the option was
    # introduced.
    meta_flush_supported = disk_flush_supported = {
      0: 12,
      2: 7,
      3: 0,
      }

    disk_drain_supported = {
      2: 7,
      3: 0,
      }

    disk_barriers_supported = {
      3: 0,
      }

    # meta flushes
    if disable_meta_flush:
      _AppendOrRaise(cls._DISABLE_META_FLUSH_OPTION,
                     meta_flush_supported.get(vmin, None))

    # disk flushes
    if constants.DRBD_B_DISK_FLUSH in disabled_barriers_set:
      _AppendOrRaise(cls._DISABLE_FLUSH_OPTION,
                     disk_flush_supported.get(vmin, None))

    # disk drain
    if constants.DRBD_B_DISK_DRAIN in disabled_barriers_set:
      _AppendOrRaise(cls._DISABLE_DRAIN_OPTION,
                     disk_drain_supported.get(vmin, None))

    # disk barriers
    if constants.DRBD_B_DISK_BARRIERS in disabled_barriers_set:
      _AppendOrRaise(cls._DISABLE_DISK_OPTION,
                     disk_barriers_supported.get(vmin, None))

    return args
1601

    
1602
  def _AssembleNet(self, minor, net_info, protocol,
                   dual_pri=False, hmac=None, secret=None):
    """Configure the network part of the device.

    @type minor: int
    @param minor: the minor to configure
    @type net_info: tuple
    @param net_info: (lhost, lport, rhost, rport); if any element is
        None the network configuration is torn down instead
    @param protocol: the DRBD replication protocol to use
    @param dual_pri: whether to allow both nodes to be primary ("-m")
    @param hmac: hash algorithm for peer authentication
    @param secret: shared secret for peer authentication

    """
    lhost, lport, rhost, rport = net_info
    if None in net_info:
      # we don't want network connection and actually want to make
      # sure its shutdown
      self._ShutdownNet(minor)
      return

    # Workaround for a race condition. When DRBD is doing its dance to
    # establish a connection with its peer, it also sends the
    # synchronization speed over the wire. In some cases setting the
    # sync speed only after setting up both sides can race with DRBD
    # connecting, hence we set it here before telling DRBD anything
    # about its peer.
    sync_errors = self._SetMinorSyncParams(minor, self.params)
    if sync_errors:
      _ThrowError("drbd%d: can't set the synchronization parameters: %s" %
                  (minor, utils.CommaJoin(sync_errors)))

    # both endpoints must be valid addresses of the same family
    if netutils.IP6Address.IsValid(lhost):
      if not netutils.IP6Address.IsValid(rhost):
        _ThrowError("drbd%d: can't connect ip %s to ip %s" %
                    (minor, lhost, rhost))
      family = "ipv6"
    elif netutils.IP4Address.IsValid(lhost):
      if not netutils.IP4Address.IsValid(rhost):
        _ThrowError("drbd%d: can't connect ip %s to ip %s" %
                    (minor, lhost, rhost))
      family = "ipv4"
    else:
      _ThrowError("drbd%d: Invalid ip %s" % (minor, lhost))

    args = ["drbdsetup", self._DevPath(minor), "net",
            "%s:%s:%s" % (family, lhost, lport),
            "%s:%s:%s" % (family, rhost, rport), protocol,
            "-A", "discard-zero-changes",
            "-B", "consensus",
            "--create-device",
            ]
    if dual_pri:
      args.append("-m")
    if hmac and secret:
      args.extend(["-a", hmac, "-x", secret])

    # user-supplied extra drbdsetup net options, if any
    if self.params[constants.LDP_NET_CUSTOM]:
      args.extend(shlex.split(self.params[constants.LDP_NET_CUSTOM]))

    result = utils.RunCmd(args)
    if result.failed:
      _ThrowError("drbd%d: can't setup network: %s - %s",
                  minor, result.fail_reason, result.output)

    def _CheckNetworkConfig():
      # retry until `drbdsetup show` reports exactly the endpoints we set
      info = self._GetDevInfo(self._GetShowData(minor))
      if not "local_addr" in info or not "remote_addr" in info:
        raise utils.RetryAgain()

      if (info["local_addr"] != (lhost, lport) or
          info["remote_addr"] != (rhost, rport)):
        raise utils.RetryAgain()

    try:
      utils.Retry(_CheckNetworkConfig, 1.0, 10.0)
    except utils.RetryTimeout:
      _ThrowError("drbd%d: timeout while configuring network", minor)
1671

    
1672
  def AddChildren(self, devices):
    """Add a disk to the DRBD device.

    @param devices: the (data, meta) pair of block devices to attach
        as local storage
    @raise errors.BlockDeviceError: if we are not attached, get the
        wrong number of devices, already have a local disk, or the
        children are not ready

    """
    if self.minor is None:
      _ThrowError("drbd%d: can't attach to dbrd8 during AddChildren",
                  self._aminor)
    if len(devices) != 2:
      _ThrowError("drbd%d: need two devices for AddChildren", self.minor)
    info = self._GetDevInfo(self._GetShowData(self.minor))
    if "local_dev" in info:
      _ThrowError("drbd%d: already attached to a local disk", self.minor)
    backend, meta = devices
    if backend.dev_path is None or meta.dev_path is None:
      _ThrowError("drbd%d: children not ready during AddChildren", self.minor)
    backend.Open()
    meta.Open()
    # validate and (re-)initialize the metadata before attaching
    self._CheckMetaSize(meta.dev_path)
    self._InitMeta(self._FindUnusedMinor(), meta.dev_path)

    self._AssembleLocal(self.minor, backend.dev_path, meta.dev_path, self.size)
    self._children = devices
1694

    
1695
  def RemoveChildren(self, devices):
    """Detach the drbd device from local storage.

    @param devices: the two device paths to detach; they must match
        the paths of our recorded children

    """
    if self.minor is None:
      _ThrowError("drbd%d: can't attach to drbd8 during RemoveChildren",
                  self._aminor)
    # early return if we don't actually have backing storage
    info = self._GetDevInfo(self._GetShowData(self.minor))
    if "local_dev" not in info:
      return
    if len(self._children) != 2:
      _ThrowError("drbd%d: we don't have two children: %s", self.minor,
                  self._children)
    if self._children.count(None) == 2: # we don't actually have children :)
      logging.warning("drbd%d: requested detach while detached", self.minor)
      return
    if len(devices) != 2:
      _ThrowError("drbd%d: we need two children in RemoveChildren", self.minor)
    # the given paths must match our recorded children, pairwise
    for child, dev in zip(self._children, devices):
      if dev != child.dev_path:
        _ThrowError("drbd%d: mismatch in local storage (%s != %s) in"
                    " RemoveChildren", self.minor, dev, child.dev_path)

    self._ShutdownLocal(self.minor)
    self._children = []
1721

    
1722
  @classmethod
  def _SetMinorSyncParams(cls, minor, params):
    """Set the parameters of the DRBD syncer.

    This is the low-level implementation.

    @type minor: int
    @param minor: the drbd minor whose settings we change
    @type params: dict
    @param params: LD level disk parameters related to the synchronization
    @rtype: list
    @return: a list of error messages

    """

    args = ["drbdsetup", cls._DevPath(minor), "syncer"]
    if params[constants.LDP_DYNAMIC_RESYNC]:
      # the dynamic resync speed controller needs a recent enough kernel
      version = cls._GetVersion(cls._GetProcData())
      vmin = version["k_minor"]
      vrel = version["k_point"]

      # By definition we are using 8.x, so just check the rest of the version
      # number
      if vmin != 3 or vrel < 9:
        msg = ("The current DRBD version (8.%d.%d) does not support the "
               "dynamic resync speed controller" % (vmin, vrel))
        logging.error(msg)
        return [msg]

      if params[constants.LDP_PLAN_AHEAD] == 0:
        msg = ("A value of 0 for c-plan-ahead disables the dynamic sync speed"
               " controller at DRBD level. If you want to disable it, please"
               " set the dynamic-resync disk parameter to False.")
        logging.error(msg)
        return [msg]

      # add the c-* parameters to args
      args.extend(["--c-plan-ahead", params[constants.LDP_PLAN_AHEAD],
                   "--c-fill-target", params[constants.LDP_FILL_TARGET],
                   "--c-delay-target", params[constants.LDP_DELAY_TARGET],
                   "--c-max-rate", params[constants.LDP_MAX_RATE],
                   "--c-min-rate", params[constants.LDP_MIN_RATE],
                   ])

    else:
      # static resync rate (-r)
      args.extend(["-r", "%d" % params[constants.LDP_RESYNC_RATE]])

    args.append("--create-device")
    result = utils.RunCmd(args)
    if result.failed:
      msg = ("Can't change syncer rate: %s - %s" %
             (result.fail_reason, result.output))
      logging.error(msg)
      return [msg]

    return []
1778

    
1779
  def SetSyncParams(self, params):
    """Set the synchronization parameters of the DRBD syncer.

    @type params: dict
    @param params: LD level disk parameters related to the synchronization
    @rtype: list
    @return: a list of error messages, emitted both by the current node and by
    children. An empty list means no errors

    """
    if self.minor is None:
      err = "Not attached during SetSyncParams"
      logging.info(err)
      return [err]

    # gather error messages from the children first, then from our own minor
    msgs = super(DRBD8, self).SetSyncParams(params)
    msgs.extend(self._SetMinorSyncParams(self.minor, params))
    return msgs
1797

    
1798
  def PauseResumeSync(self, pause):
    """Pauses or resumes the sync of a DRBD device.

    @param pause: whether to pause (True) or resume (False) the sync
    @return: the success of the operation

    """
    if self.minor is None:
      logging.info("Not attached during PauseSync")
      return False

    # propagate the request to the children first
    children_result = super(DRBD8, self).PauseResumeSync(pause)

    cmd = "pause-sync" if pause else "resume-sync"

    result = utils.RunCmd(["drbdsetup", self.dev_path, cmd])
    if result.failed:
      logging.error("Can't %s: %s - %s", cmd,
                    result.fail_reason, result.output)
    # success only if both our command and all children succeeded
    return not result.failed and children_result
1821

    
1822
  def GetProcStatus(self):
    """Return this device's status as parsed from /proc/drbd.

    @rtype: DRBD8Status

    """
    if self.minor is None:
      _ThrowError("drbd%d: GetStats() called while not attached", self._aminor)
    # parse /proc/drbd into a minor -> raw-data mapping, then pick our entry
    minors = self._MassageProcData(self._GetProcData())
    if self.minor not in minors:
      _ThrowError("drbd%d: can't find myself in /proc", self.minor)
    return DRBD8Status(minors[self.minor])
1832

    
1833
  def GetSyncStatus(self):
    """Returns the sync status of the device.

    If sync_percent is None, it means all is ok.
    If estimated_time is None, it means we can't estimate
    the time needed, otherwise it's the time left in seconds.

    We set the is_degraded parameter to True on two conditions:
    network not connected or local disk missing.

    We compute the ldisk parameter based on whether we have a local
    disk or not.

    @rtype: objects.BlockDevStatus

    """
    if self.minor is None and not self.Attach():
      _ThrowError("drbd%d: can't Attach() in GetSyncStatus", self._aminor)

    stats = self.GetProcStatus()
    # degraded unless both the network and the local disk are healthy
    degraded = not (stats.is_connected and stats.is_disk_uptodate)

    if stats.is_disk_uptodate:
      ldisk = constants.LDS_OKAY
    elif stats.is_diskless:
      ldisk = constants.LDS_FAULTY
    else:
      # disk present but in some transient/inconsistent state
      ldisk = constants.LDS_UNKNOWN

    return objects.BlockDevStatus(dev_path=self.dev_path,
                                  major=self.major,
                                  minor=self.minor,
                                  sync_percent=stats.sync_percent,
                                  estimated_time=stats.est_time,
                                  is_degraded=degraded,
                                  ldisk_status=ldisk)
1871

    
1872
  def Open(self, force=False):
    """Make the local state primary.

    If the 'force' parameter is given, the '-o' option is passed to
    drbdsetup. Since this is a potentially dangerous operation, the
    force flag should be only given after creation, when it actually
    is mandatory.

    """
    if self.minor is None and not self.Attach():
      logging.error("DRBD cannot attach to a device during open")
      return False
    args = ["drbdsetup", self.dev_path, "primary"]
    if force:
      # "-o" overrides DRBD's consistency checks; only safe right after
      # creation
      args.append("-o")
    result = utils.RunCmd(args)
    if result.failed:
      _ThrowError("drbd%d: can't make drbd device primary: %s", self.minor,
                  result.output)
1891

    
1892
  def Close(self):
    """Make the local state secondary.

    This will, of course, fail if the device is in use.

    """
    if self.minor is None and not self.Attach():
      _ThrowError("drbd%d: can't Attach() in Close()", self._aminor)
    demote = utils.RunCmd(["drbdsetup", self.dev_path, "secondary"])
    if demote.failed:
      _ThrowError("drbd%d: can't switch drbd device to secondary: %s",
                  self.minor, demote.output)
1904

    
1905
  def DisconnectNet(self):
    """Removes network configuration.

    This method shutdowns the network side of the device.

    The method will wait up to a hardcoded timeout for the device to
    go into standalone after the 'disconnect' command before
    re-configuring it, as sometimes it takes a while for the
    disconnect to actually propagate and thus we might issue a 'net'
    command while the device is still connected. If the device will
    still be attached to the network and we time out, we raise an
    exception.

    """
    if self.minor is None:
      _ThrowError("drbd%d: disk not attached in re-attach net", self._aminor)

    if None in (self._lhost, self._lport, self._rhost, self._rport):
      _ThrowError("drbd%d: DRBD disk missing network info in"
                  " DisconnectNet()", self.minor)

    # Mutable holder so the nested retry callback below can update the
    # flag (Python 2 has no "nonlocal" statement)
    class _DisconnectStatus:
      def __init__(self, ever_disconnected):
        self.ever_disconnected = ever_disconnected

    # Record whether the first disconnect attempt succeeded (errors ignored)
    dstatus = _DisconnectStatus(_IgnoreError(self._ShutdownNet, self.minor))

    def _WaitForDisconnect():
      # Retry callback for utils.Retry: returns normally once the device
      # reports StandAlone, otherwise re-issues the disconnect and asks
      # for another retry round
      if self.GetProcStatus().is_standalone:
        return

      # retry the disconnect, it seems possible that due to a well-time
      # disconnect on the peer, my disconnect command might be ignored and
      # forgotten
      dstatus.ever_disconnected = \
        _IgnoreError(self._ShutdownNet, self.minor) or dstatus.ever_disconnected

      raise utils.RetryAgain()

    # Keep start time
    start_time = time.time()

    try:
      # Start delay at 100 milliseconds and grow up to 2 seconds
      utils.Retry(_WaitForDisconnect, (0.1, 1.5, 2.0),
                  self._NET_RECONFIG_TIMEOUT)
    except utils.RetryTimeout:
      # Distinguish "disconnect was accepted but never took effect" from
      # "disconnect itself kept failing" in the error message
      if dstatus.ever_disconnected:
        msg = ("drbd%d: device did not react to the"
               " 'disconnect' command in a timely manner")
      else:
        msg = "drbd%d: can't shutdown network, even after multiple retries"

      _ThrowError(msg, self.minor)

    # Log slow disconnects (more than a quarter of the timeout) to help
    # diagnose misbehaving devices/peers
    reconfig_time = time.time() - start_time
    if reconfig_time > (self._NET_RECONFIG_TIMEOUT * 0.25):
      logging.info("drbd%d: DisconnectNet: detach took %.3f seconds",
                   self.minor, reconfig_time)
1964

    
1965
  def AttachNet(self, multimaster):
    """Reconnects the network.

    This method connects the network side of the device with a
    specified multi-master flag. The device needs to be 'Standalone'
    but have valid network configuration data.

    @param multimaster: init the network in dual-primary mode

    """
    if self.minor is None:
      _ThrowError("drbd%d: device not attached in AttachNet", self._aminor)

    if None in (self._lhost, self._lport, self._rhost, self._rport):
      _ThrowError("drbd%d: missing network info in AttachNet()", self.minor)

    # reconnecting is only valid from the StandAlone state
    if not self.GetProcStatus().is_standalone:
      _ThrowError("drbd%d: device is not standalone in AttachNet", self.minor)

    net_info = (self._lhost, self._lport, self._rhost, self._rport)
    self._AssembleNet(self.minor, net_info, constants.DRBD_NET_PROTOCOL,
                      dual_pri=multimaster, hmac=constants.DRBD_HMAC_ALG,
                      secret=self._secret)
1991

    
1992
  def Attach(self):
    """Check if our minor is configured.

    This doesn't do any device configurations - it only checks if the
    minor is in a state different from Unconfigured.

    Note that this function will not change the state of the system in
    any way (except in case of side-effects caused by reading from
    /proc).

    """
    # the minor counts as attached only if it shows up among the used ones
    in_use = self._aminor in self.GetUsedDevs()
    minor = self._aminor if in_use else None

    self._SetFromMinor(minor)
    return minor is not None
2011

    
2012
  def Assemble(self):
    """Assemble the drbd.

    Method:
      - if we have a configured device, we try to ensure that it matches
        our config
      - if not, we create it from zero
      - anyway, set the device parameters

    """
    super(DRBD8, self).Assemble()

    self.Attach()
    if self.minor is None:
      # nothing configured for our minor: build it from scratch
      self._FastAssemble()
    else:
      # something is already there: verify local/network state and try to
      # bring it in line with our configuration
      self._SlowAssemble()

    errs = self.SetSyncParams(self.params)
    if errs:
      _ThrowError("drbd%d: can't set the synchronization parameters: %s" %
                  (self.minor, utils.CommaJoin(errs)))
2037

    
2038
  def _SlowAssemble(self):
    """Assembles the DRBD device from a (partially) configured device.

    In case of partially attached (local device matches but no network
    setup), we perform the network attach. If successful, we re-test
    the attach if can return success.

    """
    # TODO: Rewrite to not use a for loop just because there is 'break'
    # pylint: disable=W0631
    net_data = (self._lhost, self._lport, self._rhost, self._rport)
    # Single-iteration loop: "break" marks success; falling through to the
    # for/else clause below resets minor to None (failure)
    for minor in (self._aminor,):
      info = self._GetDevInfo(self._GetShowData(minor))
      match_l = self._MatchesLocal(info)
      match_r = self._MatchesNet(info)

      if match_l and match_r:
        # everything matches
        break

      if match_l and not match_r and "local_addr" not in info:
        # disk matches, but not attached to network, attach and recheck
        self._AssembleNet(minor, net_data, constants.DRBD_NET_PROTOCOL,
                          hmac=constants.DRBD_HMAC_ALG, secret=self._secret)
        if self._MatchesNet(self._GetDevInfo(self._GetShowData(minor))):
          break
        else:
          _ThrowError("drbd%d: network attach successful, but 'drbdsetup"
                      " show' disagrees", minor)

      if match_r and "local_dev" not in info:
        # no local disk, but network attached and it matches
        self._AssembleLocal(minor, self._children[0].dev_path,
                            self._children[1].dev_path, self.size)
        if self._MatchesNet(self._GetDevInfo(self._GetShowData(minor))):
          break
        else:
          _ThrowError("drbd%d: disk attach successful, but 'drbdsetup"
                      " show' disagrees", minor)

      # this case must be considered only if we actually have local
      # storage, i.e. not in diskless mode, because all diskless
      # devices are equal from the point of view of local
      # configuration
      if (match_l and "local_dev" in info and
          not match_r and "local_addr" in info):
        # strange case - the device network part points to somewhere
        # else, even though its local storage is ours; as we own the
        # drbd space, we try to disconnect from the remote peer and
        # reconnect to our correct one
        try:
          self._ShutdownNet(minor)
        except errors.BlockDeviceError, err:
          _ThrowError("drbd%d: device has correct local storage, wrong"
                      " remote peer and is unable to disconnect in order"
                      " to attach to the correct peer: %s", minor, str(err))
        # note: _AssembleNet also handles the case when we don't want
        # local storage (i.e. one or more of the _[lr](host|port) is
        # None)
        self._AssembleNet(minor, net_data, constants.DRBD_NET_PROTOCOL,
                          hmac=constants.DRBD_HMAC_ALG, secret=self._secret)
        if self._MatchesNet(self._GetDevInfo(self._GetShowData(minor))):
          break
        else:
          _ThrowError("drbd%d: network attach successful, but 'drbdsetup"
                      " show' disagrees", minor)

    else:
      # loop finished without "break": no recovery path applied
      minor = None

    self._SetFromMinor(minor)
    if minor is None:
      _ThrowError("drbd%d: cannot activate, unknown or unhandled reason",
                  self._aminor)
2112

    
2113
  def _FastAssemble(self):
    """Assemble the drbd device from zero.

    This is run when in Assemble we detect our minor is unused.

    """
    minor = self._aminor
    kids = self._children
    # attach local storage only when both data and meta children exist
    if kids and kids[0] and kids[1]:
      self._AssembleLocal(minor, kids[0].dev_path, kids[1].dev_path,
                          self.size)
    # attach the network only when the full endpoint info is available
    if all((self._lhost, self._lport, self._rhost, self._rport)):
      self._AssembleNet(minor,
                        (self._lhost, self._lport, self._rhost, self._rport),
                        constants.DRBD_NET_PROTOCOL,
                        hmac=constants.DRBD_HMAC_ALG, secret=self._secret)
    self._SetFromMinor(minor)
2129

    
2130
  @classmethod
  def _ShutdownLocal(cls, minor):
    """Detach from the local device.

    I/Os will continue to be served from the remote device. If we
    don't have a remote device, this operation will fail.

    """
    detach = utils.RunCmd(["drbdsetup", cls._DevPath(minor), "detach"])
    if detach.failed:
      _ThrowError("drbd%d: can't detach local disk: %s", minor, detach.output)
2141

    
2142
  @classmethod
  def _ShutdownNet(cls, minor):
    """Disconnect from the remote peer.

    This fails if we don't have a local device.

    """
    disconnect = utils.RunCmd(["drbdsetup", cls._DevPath(minor), "disconnect"])
    if disconnect.failed:
      _ThrowError("drbd%d: can't shutdown network: %s", minor,
                  disconnect.output)
2152

    
2153
  @classmethod
  def _ShutdownAll(cls, minor):
    """Deactivate the device.

    This will, of course, fail if the device is in use.

    """
    down = utils.RunCmd(["drbdsetup", cls._DevPath(minor), "down"])
    if down.failed:
      _ThrowError("drbd%d: can't shutdown drbd device: %s",
                  minor, down.output)
2164

    
2165
  def Shutdown(self):
    """Shutdown the DRBD device.

    """
    if self.minor is None and not self.Attach():
      logging.info("drbd%d: not attached during Shutdown()", self._aminor)
      return
    # remember the minor, then mark ourselves detached before issuing the
    # actual "down" command
    minor = self.minor
    self.minor = None
    self.dev_path = None
    self._ShutdownAll(minor)
2176

    
2177
  def Remove(self):
    """Stub remove for DRBD devices.

    DRBD devices are only assembled, never created per se, so removing
    one amounts to shutting it down.

    """
    self.Shutdown()
2182

    
2183
  @classmethod
  def Create(cls, unique_id, children, size, params):
    """Create a new DRBD8 device.

    Since DRBD devices are not created per se, just assembled, this
    function only initializes the metadata.

    """
    if len(children) != 2:
      raise errors.ProgrammerError("Invalid setup for the drbd device")
    # refuse to create over a minor that /proc/drbd reports as in use
    aminor = unique_id[4]
    proc_info = cls._MassageProcData(cls._GetProcData())
    if aminor in proc_info and DRBD8Status(proc_info[aminor]).is_in_use:
      _ThrowError("drbd%d: minor is already in use at Create() time", aminor)
    # children[1] is by convention the metadata device
    meta = children[1]
    meta.Assemble()
    if not meta.Attach():
      _ThrowError("drbd%d: can't attach to meta device '%s'",
                  aminor, meta)
    cls._CheckMetaSize(meta.dev_path)
    cls._InitMeta(aminor, meta.dev_path)
    return cls(unique_id, children, size, params)
2211

    
2212
  def Grow(self, amount, dryrun, backingstore):
    """Resize the DRBD device and its backing storage.

    @param amount: the amount (in MiB) to grow with
    @param dryrun: whether to only simulate the growth
    @param backingstore: whether to grow only the backing storage

    """
    if self.minor is None:
      _ThrowError("drbd%d: Grow called while not attached", self._aminor)
    if len(self._children) != 2 or None in self._children:
      _ThrowError("drbd%d: cannot grow diskless device", self.minor)
    # grow the data child (index 0) first; the meta device stays as-is
    self._children[0].Grow(amount, dryrun, backingstore)
    # DRBD does not support dry-run mode and is not backing storage,
    # so in those cases we are done after growing the child
    if dryrun or backingstore:
      return
    resize = utils.RunCmd(["drbdsetup", self.dev_path, "resize", "-s",
                           "%dm" % (self.size + amount)])
    if resize.failed:
      _ThrowError("drbd%d: resize failed: %s", self.minor, resize.output)
2229

    
2230

    
2231
class FileStorage(BlockDev):
2232
  """File device.
2233

2234
  This class represents the a file storage backend device.
2235

2236
  The unique_id for the file device is a (file_driver, file_path) tuple.
2237

2238
  """
2239
  def __init__(self, unique_id, children, size, params):
2240
    """Initalizes a file device backend.
2241

2242
    """
2243
    if children:
2244
      raise errors.BlockDeviceError("Invalid setup for file device")
2245
    super(FileStorage, self).__init__(unique_id, children, size, params)
2246
    if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
2247
      raise ValueError("Invalid configuration data %s" % str(unique_id))
2248
    self.driver = unique_id[0]
2249
    self.dev_path = unique_id[1]
2250

    
2251
    CheckFileStoragePath(self.dev_path)
2252

    
2253
    self.Attach()
2254

    
2255
  def Assemble(self):
2256
    """Assemble the device.
2257

2258
    Checks whether the file device exists, raises BlockDeviceError otherwise.
2259

2260
    """
2261
    if not os.path.exists(self.dev_path):
2262
      _ThrowError("File device '%s' does not exist" % self.dev_path)
2263

    
2264
  def Shutdown(self):
2265
    """Shutdown the device.
2266

2267
    This is a no-op for the file type, as we don't deactivate
2268
    the file on shutdown.
2269

2270
    """
2271
    pass
2272

    
2273
  def Open(self, force=False):
2274
    """Make the device ready for I/O.
2275

2276
    This is a no-op for the file type.
2277

2278
    """
2279
    pass
2280

    
2281
  def Close(self):
2282
    """Notifies that the device will no longer be used for I/O.
2283

2284
    This is a no-op for the file type.
2285

2286
    """
2287
    pass
2288

    
2289
  def Remove(self):
2290
    """Remove the file backing the block device.
2291

2292
    @rtype: boolean
2293
    @return: True if the removal was successful
2294

2295
    """
2296
    try:
2297
      os.remove(self.dev_path)
2298
    except OSError, err:
2299
      if err.errno != errno.ENOENT:
2300
        _ThrowError("Can't remove file '%s': %s", self.dev_path, err)
2301

    
2302
  def Rename(self, new_id):
2303
    """Renames the file.
2304

2305
    """
2306
    # TODO: implement rename for file-based storage
2307
    _ThrowError("Rename is not supported for file-based storage")
2308

    
2309
  def Grow(self, amount, dryrun, backingstore):
2310
    """Grow the file
2311

2312
    @param amount: the amount (in mebibytes) to grow with
2313

2314
    """
2315
    if not backingstore:
2316
      return
2317
    # Check that the file exists
2318
    self.Assemble()
2319
    current_size = self.GetActualSize()
2320
    new_size = current_size + amount * 1024 * 1024
2321
    assert new_size > current_size, "Cannot Grow with a negative amount"
2322
    # We can't really simulate the growth
2323
    if dryrun:
2324
      return
2325
    try:
2326
      f = open(self.dev_path, "a+")
2327
      f.truncate(new_size)
2328
      f.close()
2329
    except EnvironmentError, err:
2330
      _ThrowError("Error in file growth: %", str(err))
2331

    
2332
  def Attach(self):
2333
    """Attach to an existing file.
2334

2335
    Check if this file already exists.
2336

2337
    @rtype: boolean
2338
    @return: True if file exists
2339

2340
    """
2341
    self.attached = os.path.exists(self.dev_path)
2342
    return self.attached
2343

    
2344
  def GetActualSize(self):
2345
    """Return the actual disk size.
2346

2347
    @note: the device needs to be active when this is called
2348

2349
    """
2350
    assert self.attached, "BlockDevice not attached in GetActualSize()"
2351
    try:
2352
      st = os.stat(self.dev_path)
2353
      return st.st_size
2354
    except OSError, err:
2355
      _ThrowError("Can't stat %s: %s", self.dev_path, err)
2356

    
2357
  @classmethod
2358
  def Create(cls, unique_id, children, size, params):
2359
    """Create a new file.
2360

2361
    @param size: the size of file in MiB
2362

2363
    @rtype: L{bdev.FileStorage}
2364
    @return: an instance of FileStorage
2365

2366
    """
2367
    if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
2368
      raise ValueError("Invalid configuration data %s" % str(unique_id))
2369

    
2370
    dev_path = unique_id[1]
2371

    
2372
    CheckFileStoragePath(dev_path)
2373

    
2374
    try:
2375
      fd = os.open(dev_path, os.O_RDWR | os.O_CREAT | os.O_EXCL)
2376
      f = os.fdopen(fd, "w")
2377
      f.truncate(size * 1024 * 1024)
2378
      f.close()
2379
    except EnvironmentError, err:
2380
      if err.errno == errno.EEXIST:
2381
        _ThrowError("File already existing: %s", dev_path)
2382
      _ThrowError("Error in file creation: %", str(err))
2383

    
2384
    return FileStorage(unique_id, children, size, params)
2385

    
2386

    
2387
class PersistentBlockDevice(BlockDev):
2388
  """A block device with persistent node
2389

2390
  May be either directly attached, or exposed through DM (e.g. dm-multipath).
2391
  udev helpers are probably required to give persistent, human-friendly
2392
  names.
2393

2394
  For the time being, pathnames are required to lie under /dev.
2395

2396
  """
2397
  def __init__(self, unique_id, children, size, params):
2398
    """Attaches to a static block device.
2399

2400
    The unique_id is a path under /dev.
2401

2402
    """
2403
    super(PersistentBlockDevice, self).__init__(unique_id, children, size,
2404
                                                params)
2405
    if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
2406
      raise ValueError("Invalid configuration data %s" % str(unique_id))
2407
    self.dev_path = unique_id[1]
2408
    if not os.path.realpath(self.dev_path).startswith("/dev/"):
2409
      raise ValueError("Full path '%s' lies outside /dev" %
2410
                              os.path.realpath(self.dev_path))
2411
    # TODO: this is just a safety guard checking that we only deal with devices
2412
    # we know how to handle. In the future this will be integrated with
2413
    # external storage backends and possible values will probably be collected
2414
    # from the cluster configuration.
2415
    if unique_id[0] != constants.BLOCKDEV_DRIVER_MANUAL:
2416
      raise ValueError("Got persistent block device of invalid type: %s" %
2417
                       unique_id[0])
2418

    
2419
    self.major = self.minor = None
2420
    self.Attach()
2421

    
2422
  @classmethod
2423
  def Create(cls, unique_id, children, size, params):
2424
    """Create a new device
2425

2426
    This is a noop, we only return a PersistentBlockDevice instance
2427

2428
    """
2429
    return PersistentBlockDevice(unique_id, children, 0, params)
2430

    
2431
  def Remove(self):
2432
    """Remove a device
2433

2434
    This is a noop
2435

2436
    """
2437
    pass
2438

    
2439
  def Rename(self, new_id):
2440
    """Rename this device.
2441

2442
    """
2443
    _ThrowError("Rename is not supported for PersistentBlockDev storage")
2444

    
2445
  def Attach(self):
2446
    """Attach to an existing block device.
2447

2448

2449
    """
2450
    self.attached = False
2451
    try:
2452
      st = os.stat(self.dev_path)
2453
    except OSError, err:
2454
      logging.error("Error stat()'ing %s: %s", self.dev_path, str(err))
2455
      return False
2456

    
2457
    if not stat.S_ISBLK(st.st_mode):
2458
      logging.error("%s is not a block device", self.dev_path)
2459
      return False
2460

    
2461
    self.major = os.major(st.st_rdev)
2462
    self.minor = os.minor(st.st_rdev)
2463
    self.attached = True
2464

    
2465
    return True
2466

    
2467
  def Assemble(self):
2468
    """Assemble the device.
2469

2470
    """
2471
    pass
2472

    
2473
  def Shutdown(self):
2474
    """Shutdown the device.
2475

2476
    """
2477
    pass
2478

    
2479
  def Open(self, force=False):
2480
    """Make the device ready for I/O.
2481

2482
    """
2483
    pass
2484

    
2485
  def Close(self):
2486
    """Notifies that the device will no longer be used for I/O.
2487

2488
    """
2489
    pass
2490

    
2491
  def Grow(self, amount, dryrun, backingstore):
2492
    """Grow the logical volume.
2493

2494
    """
2495
    _ThrowError("Grow is not supported for PersistentBlockDev storage")
2496

    
2497

    
2498
class RADOSBlockDevice(BlockDev):
  """A RADOS Block Device (rbd).

  This class implements the RADOS Block Device for the backend. You need
  the rbd kernel driver, the RADOS Tools and a working RADOS cluster for
  this to be functional.

  """
  def __init__(self, unique_id, children, size, params):
    """Attaches to an rbd device.

    @type unique_id: tuple of (driver, rbd_name)
    @param unique_id: the rbd driver and the volume name

    """
    super(RADOSBlockDevice, self).__init__(unique_id, children, size, params)
    if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
      raise ValueError("Invalid configuration data %s" % str(unique_id))

    self.driver, self.rbd_name = unique_id

    # major/minor are filled in by Attach() when a mapping exists
    self.major = self.minor = None
    self.Attach()

  @classmethod
  def Create(cls, unique_id, children, size, params):
    """Create a new rbd device.

    Provision a new rbd volume inside a RADOS pool.

    """
    if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
      raise errors.ProgrammerError("Invalid configuration data %s" %
                                   str(unique_id))
    rbd_pool = params[constants.LDP_POOL]
    rbd_name = unique_id[1]

    # Provision a new rbd volume (Image) inside the RADOS cluster.
    cmd = [constants.RBD_CMD, "create", "-p", rbd_pool,
           rbd_name, "--size", "%s" % size]
    result = utils.RunCmd(cmd)
    if result.failed:
      _ThrowError("rbd creation failed (%s): %s",
                  result.fail_reason, result.output)

    return RADOSBlockDevice(unique_id, children, size, params)

  def Remove(self):
    """Remove the rbd device.

    """
    rbd_pool = self.params[constants.LDP_POOL]
    rbd_name = self.unique_id[1]

    # Use "is None" and not boolean truth: minor number 0 (/dev/rbd0) is
    # a perfectly valid attached device.
    if self.minor is None and not self.Attach():
      # The rbd device doesn't exist.
      return

    # First shutdown the device (remove mappings).
    self.Shutdown()

    # Remove the actual Volume (Image) from the RADOS cluster.
    cmd = [constants.RBD_CMD, "rm", "-p", rbd_pool, rbd_name]
    result = utils.RunCmd(cmd)
    if result.failed:
      _ThrowError("Can't remove Volume from cluster with rbd rm: %s - %s",
                  result.fail_reason, result.output)

  def Rename(self, new_id):
    """Rename this device.

    """
    pass

  def Attach(self):
    """Attach to an existing rbd device.

    This method maps the rbd volume that matches our name with
    an rbd device and then attaches to this device.

    @rtype: boolean
    @return: True on success, False if the mapped path is missing or is
        not a block device

    """
    self.attached = False

    # Map the rbd volume to a block device under /dev
    self.dev_path = self._MapVolumeToBlockdev(self.unique_id)

    try:
      st = os.stat(self.dev_path)
    except OSError as err:
      logging.error("Error stat()'ing %s: %s", self.dev_path, str(err))
      return False

    if not stat.S_ISBLK(st.st_mode):
      logging.error("%s is not a block device", self.dev_path)
      return False

    self.major = os.major(st.st_rdev)
    self.minor = os.minor(st.st_rdev)
    self.attached = True

    return True

  def _MapVolumeToBlockdev(self, unique_id):
    """Maps existing rbd volumes to block devices.

    This method should be idempotent if the mapping already exists.

    @rtype: string
    @return: the block device path that corresponds to the volume

    """
    pool = self.params[constants.LDP_POOL]
    name = unique_id[1]

    # Check if the mapping already exists.
    showmap_cmd = [constants.RBD_CMD, "showmapped", "-p", pool]
    result = utils.RunCmd(showmap_cmd)
    if result.failed:
      _ThrowError("rbd showmapped failed (%s): %s",
                  result.fail_reason, result.output)

    rbd_dev = self._ParseRbdShowmappedOutput(result.output, name)

    if rbd_dev:
      # The mapping exists. Return it.
      return rbd_dev

    # The mapping doesn't exist. Create it.
    map_cmd = [constants.RBD_CMD, "map", "-p", pool, name]
    result = utils.RunCmd(map_cmd)
    if result.failed:
      _ThrowError("rbd map failed (%s): %s",
                  result.fail_reason, result.output)

    # Find the corresponding rbd device.
    showmap_cmd = [constants.RBD_CMD, "showmapped", "-p", pool]
    result = utils.RunCmd(showmap_cmd)
    if result.failed:
      _ThrowError("rbd map succeeded, but showmapped failed (%s): %s",
                  result.fail_reason, result.output)

    rbd_dev = self._ParseRbdShowmappedOutput(result.output, name)

    if not rbd_dev:
      _ThrowError("rbd map succeeded, but could not find the rbd block"
                  " device in output of showmapped, for volume: %s", name)

    # The device was successfully mapped. Return it.
    return rbd_dev

  @staticmethod
  def _ParseRbdShowmappedOutput(output, volume_name):
    """Parse the output of `rbd showmapped'.

    This method parses the output of `rbd showmapped' and returns
    the rbd block device path (e.g. /dev/rbd0) that matches the
    given rbd volume.

    @type output: string
    @param output: the whole output of `rbd showmapped'
    @type volume_name: string
    @param volume_name: the name of the volume whose device we search for
    @rtype: string or None
    @return: block device path if the volume is mapped, else None

    """
    # Expected tab-separated format: id pool image snap device
    allfields = 5
    volumefield = 2
    devicefield = 4

    field_sep = "\t"

    lines = output.splitlines()
    # Comprehensions instead of map/filter: identical behavior, but the
    # intermediate lists stay explicit (and py3-safe).
    splitted_lines = [line.split(field_sep) for line in lines]

    # Check empty output.
    if not splitted_lines:
      _ThrowError("rbd showmapped returned empty output")

    # Check showmapped header line, to determine number of fields.
    field_cnt = len(splitted_lines[0])
    if field_cnt != allfields:
      _ThrowError("Cannot parse rbd showmapped output because its format"
                  " seems to have changed; expected %s fields, found %s",
                  allfields, field_cnt)

    matched_lines = \
      [line for line in splitted_lines
       if len(line) == allfields and line[volumefield] == volume_name]

    if len(matched_lines) > 1:
      _ThrowError("The rbd volume %s is mapped more than once."
                  " This shouldn't happen, try to unmap the extra"
                  " devices manually.", volume_name)

    if matched_lines:
      # rbd block device found. Return it.
      return matched_lines[0][devicefield]

    # The given volume is not mapped.
    return None

  def Assemble(self):
    """Assemble the device.

    """
    pass

  def Shutdown(self):
    """Shutdown the device.

    """
    # "is None" and not boolean truth: minor 0 is a valid attached device.
    if self.minor is None and not self.Attach():
      # The rbd device doesn't exist.
      return

    # Unmap the block device from the Volume.
    self._UnmapVolumeFromBlockdev(self.unique_id)

    self.minor = None
    self.dev_path = None

  def _UnmapVolumeFromBlockdev(self, unique_id):
    """Unmaps the rbd device from the Volume it is mapped.

    Unmaps the rbd device from the Volume it was previously mapped to.
    This method should be idempotent if the Volume isn't mapped.

    """
    pool = self.params[constants.LDP_POOL]
    name = unique_id[1]

    # Check if the mapping already exists.
    showmap_cmd = [constants.RBD_CMD, "showmapped", "-p", pool]
    result = utils.RunCmd(showmap_cmd)
    if result.failed:
      _ThrowError("rbd showmapped failed [during unmap](%s): %s",
                  result.fail_reason, result.output)

    rbd_dev = self._ParseRbdShowmappedOutput(result.output, name)

    if rbd_dev:
      # The mapping exists. Unmap the rbd device.
      unmap_cmd = [constants.RBD_CMD, "unmap", "%s" % rbd_dev]
      result = utils.RunCmd(unmap_cmd)
      if result.failed:
        _ThrowError("rbd unmap failed (%s): %s",
                    result.fail_reason, result.output)

  def Open(self, force=False):
    """Make the device ready for I/O.

    """
    pass

  def Close(self):
    """Notifies that the device will no longer be used for I/O.

    """
    pass

  def Grow(self, amount, dryrun, backingstore):
    """Grow the Volume.

    @type amount: integer
    @param amount: the amount (in mebibytes) to grow with
    @type dryrun: boolean
    @param dryrun: whether to execute the operation in simulation mode
        only, without actually increasing the size

    """
    if not backingstore:
      return
    if not self.Attach():
      _ThrowError("Can't attach to rbd device during Grow()")

    if dryrun:
      # the rbd tool does not support dry runs of resize operations.
      # Since rbd volumes are thinly provisioned, we assume
      # there is always enough free space for the operation.
      return

    rbd_pool = self.params[constants.LDP_POOL]
    rbd_name = self.unique_id[1]
    new_size = self.size + amount

    # Resize the rbd volume (Image) inside the RADOS cluster.
    cmd = [constants.RBD_CMD, "resize", "-p", rbd_pool,
           rbd_name, "--size", "%s" % new_size]
    result = utils.RunCmd(cmd)
    if result.failed:
      _ThrowError("rbd resize failed (%s): %s",
                  result.fail_reason, result.output)
2789

    
2790

    
2791
class ExtStorageDevice(BlockDev):
  """A block device provided by an ExtStorage Provider.

  This class implements the External Storage Interface, which means
  handling of the externally provided block devices.

  """
  def __init__(self, unique_id, children, size, params):
    """Attaches to an extstorage block device.

    @type unique_id: tuple of (driver, vol_name)
    @param unique_id: the ExtStorage driver and the volume name

    """
    super(ExtStorageDevice, self).__init__(unique_id, children, size, params)
    if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
      raise ValueError("Invalid configuration data %s" % str(unique_id))

    self.driver, self.vol_name = unique_id
    self.ext_params = params

    # major/minor are filled in by Attach() when the volume is mapped
    self.major = self.minor = None
    self.Attach()

  @classmethod
  def Create(cls, unique_id, children, size, params):
    """Create a new extstorage device.

    Provision a new volume using an extstorage provider, which will
    then be mapped to a block device.

    """
    if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 2:
      raise errors.ProgrammerError("Invalid configuration data %s" %
                                   str(unique_id))

    # Call the External Storage's create script,
    # to provision a new Volume inside the External Storage
    _ExtStorageAction(constants.ES_ACTION_CREATE, unique_id,
                      params, str(size))

    return ExtStorageDevice(unique_id, children, size, params)

  def Remove(self):
    """Remove the extstorage device.

    """
    # Use "is None" and not boolean truth: minor number 0 is a perfectly
    # valid attached device.
    if self.minor is None and not self.Attach():
      # The extstorage device doesn't exist.
      return

    # First shutdown the device (remove mappings).
    self.Shutdown()

    # Call the External Storage's remove script,
    # to remove the Volume from the External Storage
    _ExtStorageAction(constants.ES_ACTION_REMOVE, self.unique_id,
                      self.ext_params)

  def Rename(self, new_id):
    """Rename this device.

    """
    pass

  def Attach(self):
    """Attach to an existing extstorage device.

    This method maps the extstorage volume that matches our name with
    a corresponding block device and then attaches to this device.

    @rtype: boolean
    @return: True on success, False if the attached path is missing or
        is not a block device

    """
    self.attached = False

    # Call the External Storage's attach script,
    # to attach an existing Volume to a block device under /dev
    self.dev_path = _ExtStorageAction(constants.ES_ACTION_ATTACH,
                                      self.unique_id, self.ext_params)

    try:
      st = os.stat(self.dev_path)
    except OSError as err:
      logging.error("Error stat()'ing %s: %s", self.dev_path, str(err))
      return False

    if not stat.S_ISBLK(st.st_mode):
      logging.error("%s is not a block device", self.dev_path)
      return False

    self.major = os.major(st.st_rdev)
    self.minor = os.minor(st.st_rdev)
    self.attached = True

    return True

  def Assemble(self):
    """Assemble the device.

    """
    pass

  def Shutdown(self):
    """Shutdown the device.

    """
    # "is None" and not boolean truth: minor 0 is a valid attached device.
    if self.minor is None and not self.Attach():
      # The extstorage device doesn't exist.
      return

    # Call the External Storage's detach script,
    # to detach an existing Volume from it's block device under /dev
    _ExtStorageAction(constants.ES_ACTION_DETACH, self.unique_id,
                      self.ext_params)

    self.minor = None
    self.dev_path = None

  def Open(self, force=False):
    """Make the device ready for I/O.

    """
    pass

  def Close(self):
    """Notifies that the device will no longer be used for I/O.

    """
    pass

  def Grow(self, amount, dryrun, backingstore):
    """Grow the Volume.

    @type amount: integer
    @param amount: the amount (in mebibytes) to grow with
    @type dryrun: boolean
    @param dryrun: whether to execute the operation in simulation mode
        only, without actually increasing the size

    """
    if not backingstore:
      return
    if not self.Attach():
      _ThrowError("Can't attach to extstorage device during Grow()")

    if dryrun:
      # we do not support dry runs of resize operations for now.
      return

    new_size = self.size + amount

    # Call the External Storage's grow script,
    # to grow an existing Volume inside the External Storage
    _ExtStorageAction(constants.ES_ACTION_GROW, self.unique_id,
                      self.ext_params, str(self.size), grow=str(new_size))

  def SetInfo(self, text):
    """Update metadata with info text.

    """
    # Replace invalid characters (leading char, then the rest)
    text = re.sub("^[^A-Za-z0-9_+.]", "_", text)
    text = re.sub("[^-A-Za-z0-9_+.]", "_", text)

    # Only up to 128 characters are allowed
    text = text[:128]

    # Call the External Storage's setinfo script,
    # to set metadata for an existing Volume inside the External Storage
    _ExtStorageAction(constants.ES_ACTION_SETINFO, self.unique_id,
                      self.ext_params, metadata=text)
2958

    
2959

    
2960
def _ExtStorageAction(action, unique_id, ext_params,
                      size=None, grow=None, metadata=None):
  """Take an External Storage action.

  Take an External Storage action concerning or affecting
  a specific Volume inside the External Storage.

  @type action: string
  @param action: which action to perform. One of:
                 create / remove / grow / attach / detach
  @type unique_id: tuple (driver, vol_name)
  @param unique_id: a tuple containing the type of ExtStorage (driver)
                    and the Volume name
  @type ext_params: dict
  @param ext_params: ExtStorage parameters
  @type size: integer
  @param size: the size of the Volume in mebibytes
  @type grow: integer
  @param grow: the new size in mebibytes (after grow)
  @type metadata: string
  @param metadata: metadata info of the Volume, for use by the provider
  @rtype: None or a block device path (during attach)

  """
  driver, vol_name = unique_id

  # Create an External Storage instance of type `driver'
  status, inst_es = ExtStorageFromDisk(driver)
  if not status:
    _ThrowError("%s" % inst_es)

  # Make sure the given action results in a valid script; check this
  # first so a bogus action fails before any other work is done.
  if action not in constants.ES_SCRIPTS:
    _ThrowError("Action '%s' doesn't result in a valid ExtStorage script" %
                action)

  # Create the basic environment for the driver's scripts
  create_env = _ExtStorageEnvironment(unique_id, ext_params, size,
                                      grow, metadata)

  # Do not use log file for action `attach' as we need
  # to get the output from RunResult
  # TODO: find a way to have a log file for attach too
  logfile = None
  # Compare with "!=", not "is not": identity comparison of strings only
  # works by accident of interning and is not guaranteed.
  if action != constants.ES_ACTION_ATTACH:
    logfile = _VolumeLogName(action, driver, vol_name)

  # Find out which external script to run according the given action
  script_name = action + "_script"
  script = getattr(inst_es, script_name)

  # Run the external script
  result = utils.RunCmd([script], env=create_env,
                        cwd=inst_es.path, output=logfile,)
  if result.failed:
    logging.error("External storage's %s command '%s' returned"
                  " error: %s, logfile: %s, output: %s",
                  action, result.cmd, result.fail_reason,
                  logfile, result.output)

    # If logfile is 'None' (during attach), it breaks TailFile
    # TODO: have a log file for attach too
    if action != constants.ES_ACTION_ATTACH:
      lines = [utils.SafeEncode(val)
               for val in utils.TailFile(logfile, lines=20)]
    else:
      lines = result.output[-20:]

    _ThrowError("External storage's %s script failed (%s), last"
                " lines of output:\n%s",
                action, result.fail_reason, "\n".join(lines))

  if action == constants.ES_ACTION_ATTACH:
    return result.stdout
3034

    
3035

    
3036
def ExtStorageFromDisk(name, base_dir=None):
  """Create an ExtStorage instance from disk.

  This function will return an ExtStorage instance
  if the given name is a valid ExtStorage name.

  @type base_dir: string
  @keyword base_dir: Base directory containing ExtStorage installations.
                     Defaults to a search in all the ES_SEARCH_PATH dirs.
  @rtype: tuple
  @return: True and the ExtStorage instance if we find a valid one, or
      False and the diagnose message on error

  """
  if base_dir is None:
    search_dirs = pathutils.ES_SEARCH_PATH
  else:
    search_dirs = [base_dir]

  es_dir = utils.FindFile(name, search_dirs, os.path.isdir)

  if es_dir is None:
    return False, ("Directory for External Storage Provider %s not"
                   " found in search path" % name)

  # Files the provider must ship: every action script plus the
  # parameters file; all of them are required.
  es_files = dict.fromkeys(constants.ES_SCRIPTS, True)
  es_files[constants.ES_PARAMETERS_FILE] = True

  # Turn each entry into its absolute path and validate it.
  for filename in list(es_files):
    es_files[filename] = utils.PathJoin(es_dir, filename)

    try:
      st = os.stat(es_files[filename])
    except EnvironmentError as err:
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, es_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, es_dir))

    # Action scripts must carry the owner-executable bit.
    if filename in constants.ES_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, es_dir))

  parameters = []
  if constants.ES_PARAMETERS_FILE in es_files:
    parameters_file = es_files[constants.ES_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError as err:
      return False, ("Error while reading the EXT parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    # Each line is "<name> <description>"; split once on whitespace.
    parameters = [v.split(None, 1) for v in parameters]

  es_obj = \
    objects.ExtStorage(name=name, path=es_dir,
                       create_script=es_files[constants.ES_SCRIPT_CREATE],
                       remove_script=es_files[constants.ES_SCRIPT_REMOVE],
                       grow_script=es_files[constants.ES_SCRIPT_GROW],
                       attach_script=es_files[constants.ES_SCRIPT_ATTACH],
                       detach_script=es_files[constants.ES_SCRIPT_DETACH],
                       setinfo_script=es_files[constants.ES_SCRIPT_SETINFO],
                       verify_script=es_files[constants.ES_SCRIPT_VERIFY],
                       supported_parameters=parameters)
  return True, es_obj
3107

    
3108

    
3109
def _ExtStorageEnvironment(unique_id, ext_params,
3110
                           size=None, grow=None, metadata=None):
3111
  """Calculate the environment for an External Storage script.
3112

3113
  @type unique_id: tuple (driver, vol_name)
3114
  @param unique_id: ExtStorage pool and name of the Volume
3115
  @type ext_params: dict
3116
  @param ext_params: the EXT parameters
3117
  @type size: string
3118
  @param size: size of the Volume (in mebibytes)
3119
  @type grow: string
3120
  @param grow: new size of Volume after grow (in mebibytes)
3121
  @type metadata: string
3122
  @param metadata: metadata info of the Volume
3123
  @rtype: dict
3124
  @return: dict of environment variables
3125

3126
  """
3127
  vol_name = unique_id[1]
3128

    
3129
  result = {}
3130
  result["VOL_NAME"] = vol_name
3131

    
3132
  # EXT params
3133
  for pname, pvalue in ext_params.items():
3134
    result["EXTP_%s" % pname.upper()] = str(pvalue)
3135

    
3136
  if size is not None:
3137
    result["VOL_SIZE"] = size
3138

    
3139
  if grow is not None:
3140
    result["VOL_NEW_SIZE"] = grow
3141

    
3142
  if metadata is not None:
3143
    result["VOL_METADATA"] = metadata
3144

    
3145
  return result
3146

    
3147

    
3148
def _VolumeLogName(kind, es_name, volume):
  """Compute the ExtStorage log filename for a given Volume and operation.

  @type kind: string
  @param kind: the operation type (e.g. create, remove etc.)
  @type es_name: string
  @param es_name: the ExtStorage name
  @type volume: string
  @param volume: the name of the Volume inside the External Storage

  """
  # Check if the extstorage log dir is a valid dir
  if not os.path.isdir(pathutils.LOG_ES_DIR):
    _ThrowError("Cannot find log directory: %s", pathutils.LOG_ES_DIR)

  # TODO: Use tempfile.mkstemp to create unique filename
  basename = ("%s-%s-%s-%s.log" %
              (kind, es_name, volume, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_ES_DIR, basename)
3167

    
3168

    
3169
# Map from logical-disk type constants to the class implementing them;
# used by FindDevice/Assemble/Create below to dispatch on disk.dev_type.
DEV_MAP = {
  constants.LD_LV: LogicalVolume,
  constants.LD_DRBD8: DRBD8,
  constants.LD_BLOCKDEV: PersistentBlockDevice,
  constants.LD_RBD: RADOSBlockDevice,
  constants.LD_EXT: ExtStorageDevice,
  }

# File-based storage is only registered when enabled in the build-time
# configuration.
if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
  DEV_MAP[constants.LD_FILE] = FileStorage
3179

    
3180

    
3181
def _VerifyDiskType(dev_type):
  """Check that the given disk type has a registered implementation.

  @raise errors.ProgrammerError: if the type is not in L{DEV_MAP}

  """
  if dev_type in DEV_MAP:
    return
  raise errors.ProgrammerError("Invalid block device type '%s'" % dev_type)
3184

    
3185

    
3186
def _VerifyDiskParams(disk):
  """Verifies if all disk parameters are set.

  @raise errors.ProgrammerError: if any expected parameter is missing

  """
  expected = set(constants.DISK_LD_DEFAULTS[disk.dev_type])
  missing = expected - set(disk.params)
  if missing:
    raise errors.ProgrammerError("Block device is missing disk parameters: %s" %
                                 missing)
3194

    
3195

    
3196
def FindDevice(disk, children):
  """Search for an existing, assembled device.

  This will succeed only if the device exists and is assembled, but it
  does not do any actions in order to activate the device.

  @type disk: L{objects.Disk}
  @param disk: the disk object to find
  @type children: list of L{bdev.BlockDev}
  @param children: the list of block devices that are children of the device
                  represented by the disk parameter

  """
  _VerifyDiskType(disk.dev_type)
  dev_class = DEV_MAP[disk.dev_type]
  device = dev_class(disk.physical_id, children, disk.size, disk.params)
  if device.attached:
    return device
  return None
3215

    
3216

    
3217
def Assemble(disk, children):
  """Try to attach or assemble an existing device.

  This will attach to assemble the device, as needed, to bring it
  fully up. It must be safe to run on already-assembled devices.

  @type disk: L{objects.Disk}
  @param disk: the disk object to assemble
  @type children: list of L{bdev.BlockDev}
  @param children: the list of block devices that are children of the device
                  represented by the disk parameter

  """
  _VerifyDiskType(disk.dev_type)
  _VerifyDiskParams(disk)
  dev_class = DEV_MAP[disk.dev_type]
  device = dev_class(disk.physical_id, children, disk.size, disk.params)
  device.Assemble()
  return device
3236

    
3237

    
3238
def Create(disk, children):
  """Create a device.

  @type disk: L{objects.Disk}
  @param disk: the disk object to create
  @type children: list of L{bdev.BlockDev}
  @param children: the list of block devices that are children of the device
                  represented by the disk parameter

  """
  _VerifyDiskType(disk.dev_type)
  _VerifyDiskParams(disk)
  dev_class = DEV_MAP[disk.dev_type]
  return dev_class.Create(disk.physical_id, children, disk.size, disk.params)