#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import opcodes
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }
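
# Editorial note: both maps above are consumed by GenerateDiskTemplate() below;
# the name prefix feeds the generated volume names for non-DRBD templates, and
# the device type map selects the objects.Disk dev_type for each template.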


def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node_uuid)
  result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      the CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)
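
# Both creation helpers above report failures via errors.DeviceCreationError,
# which carries the list of devices that were successfully created before the
# failure; CreateDisks() below relies on that list to roll back partially
# created disks through _UndoCreateDisks().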


def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given UUID

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node_uuid, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node_uuid)
    result = lu.rpc.call_blockdev_remove(node_uuid, disk)
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created
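
# Typical call pattern (as in LUInstanceRecreateDisks.Exec below): create the
# disks, then wipe them, passing the CreateDisks() result as the cleanup
# argument so a failed wipe can undo the creation:
#
#   new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
#   WipeOrCleanupDisks(self, self.instance, disks=wipedisks, cleanup=new_disks)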


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group.

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + \
        payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
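
# Worked example (illustrative, with a sample VG name): for two disks of
# 1024 MiB each in a volume group named "xenvg", the DT_PLAIN requirement is
# {"xenvg": 2048}, while DT_DRBD8 adds constants.DRBD_META_SIZE (128 MiB, per
# the comment above) per disk, i.e. {"xenvg": 2304}.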


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
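
# Illustrative input/output (not from the original source): given an opcode
# whose op.disks is [{constants.IDISK_SIZE: 1024}] and a default_vg of
# "xenvg", ComputeDisks() returns
# [{constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR,
#   constants.IDISK_VG: "xenvg", constants.IDISK_NAME: None}].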


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev
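
# The resulting device tree, sketched for reference: an LD_DRBD8 disk whose
# logical_id is (primary_uuid, secondary_uuid, port, p_minor, s_minor,
# shared_secret), with two LD_LV children: a data volume of the requested
# size and a metadata volume of constants.DRBD_META_SIZE.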


def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given but should not be, or
      are required but missing

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)
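
# Restated: passing spindles while exclusive storage is inactive is always
# rejected; omitting spindles while exclusive storage is active is rejected
# only when the caller sets required=True.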


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support for changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifiable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(self.instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)


def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
                                                  node_uuids)
  hvname = lu.cfg.GetHypervisorType()
  hvparams = lu.cfg.GetClusterInfo().hvparams
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
                                   [(hvname, hvparams[hvname])])
  for node in node_uuids:
    node_name = lu.cfg.GetNodeName(node)

    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node_name,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, space_info, _) = info.payload
    lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
        space_info, constants.ST_LVM_VG)
    if not lvm_vg_info:
      raise errors.OpPrereqError("Can't retrieve storage information for LVM")
    vg_free = lvm_vg_info.get("storage_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node_name, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node_name, vg, requested, vg_free),
                                 errors.ECODE_NORES)


def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
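
# req_sizes maps volume-group names to required MiB, the same shape as the
# dict returned by ComputeDiskSizePerVG() above; e.g. a hypothetical
# {"xenvg": 2304} asks for 2304 MiB of free space in VG "xenvg" on every node
# in node_uuids.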


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib
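
# Worked example (illustrative): a size of 1048577 bytes (1 MiB + 1 byte)
# yields divmod() == (1, 1); the non-zero remainder triggers the warning
# (reporting 1048575 bytes as not wiped) and the result is rounded up to
# 2 MiB.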


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
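
# Worked example (illustrative): with 1024 MiB written out of 4096 MiB in 60
# seconds, the average is 60/1024 seconds per MiB, so the estimated remaining
# time is (4096 - 1024) * 60/1024 = 180 seconds.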


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node_uuid)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)
1073

    
1074

    
1075
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1076
  """Wrapper for L{WipeDisks} that handles errors.
1077

1078
  @type lu: L{LogicalUnit}
1079
  @param lu: the logical unit on whose behalf we execute
1080
  @type instance: L{objects.Instance}
1081
  @param instance: the instance whose disks we should wipe
1082
  @param disks: see L{WipeDisks}
1083
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1084
      case of error
1085
  @raise errors.OpPrereqError: in case of failure
1086

1087
  """
1088
  try:
1089
    WipeDisks(lu, instance, disks=disks)
1090
  except errors.OpExecError:
1091
    logging.warning("Wiping disks for instance '%s' failed",
1092
                    instance.name)
1093
    _UndoCreateDisks(lu, cleanup)
1094
    raise
1095

    
1096

    
1097
def ExpandCheckDisks(instance, disks):
1098
  """Return the instance disks selected by the disks list
1099

1100
  @type disks: list of L{objects.Disk} or None
1101
  @param disks: selected disks
1102
  @rtype: list of L{objects.Disk}
1103
  @return: selected instance disks to act on
1104

1105
  """
1106
  if disks is None:
1107
    return instance.disks
1108
  else:
1109
    if not set(disks).issubset(instance.disks):
1110
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1111
                                   " target instance: expected a subset of %r,"
1112
                                   " got %r" % (instance.disks, disks))
1113
    return disks
1114

    
1115

    
1116
def WaitForSync(lu, instance, disks=None, oneshot=False):
1117
  """Sleep and poll for an instance's disk to sync.
1118

1119
  """
1120
  if not instance.disks or disks is not None and not disks:
1121
    return True
1122

    
1123
  disks = ExpandCheckDisks(instance, disks)
1124

    
1125
  if not oneshot:
1126
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1127

    
1128
  node_uuid = instance.primary_node
1129
  node_name = lu.cfg.GetNodeName(node_uuid)
1130

    
1131
  for dev in disks:
1132
    lu.cfg.SetDiskID(dev, node_uuid)
1133

    
1134
  # TODO: Convert to utils.Retry
1135

    
1136
  retries = 0
1137
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1138
  while True:
1139
    max_time = 0
1140
    done = True
1141
    cumul_degraded = False
1142
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1143
    msg = rstats.fail_msg
1144
    if msg:
1145
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1146
      retries += 1
1147
      if retries >= 10:
1148
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1149
                                 " aborting." % node_name)
1150
      time.sleep(6)
1151
      continue
1152
    rstats = rstats.payload
1153
    retries = 0
1154
    for i, mstat in enumerate(rstats):
1155
      if mstat is None:
1156
        lu.LogWarning("Can't compute data for node %s/%s",
1157
                      node_name, disks[i].iv_name)
1158
        continue
1159

    
1160
      cumul_degraded = (cumul_degraded or
1161
                        (mstat.is_degraded and mstat.sync_percent is None))
1162
      if mstat.sync_percent is not None:
1163
        done = False
1164
        if mstat.estimated_time is not None:
1165
          rem_time = ("%s remaining (estimated)" %
1166
                      utils.FormatSeconds(mstat.estimated_time))
1167
          max_time = mstat.estimated_time
1168
        else:
1169
          rem_time = "no time estimate"
1170
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1171
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1172

    
1173
    # if we're done but degraded, let's do a few small retries, to
1174
    # make sure we see a stable and not transient situation; therefore
1175
    # we force restart of the loop
1176
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1177
      logging.info("Degraded disks found, %d retries left", degr_retries)
1178
      degr_retries -= 1
1179
      time.sleep(1)
1180
      continue
1181

    
1182
    if done or oneshot:
1183
      break
1184

    
1185
    time.sleep(min(60, max_time))
1186

    
1187
  if done:
1188
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1189

    
1190
  return not cumul_degraded
1191

    
1192

    
1193
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1194
  """Shutdown block devices of an instance.
1195

1196
  This does the shutdown on all nodes of the instance.
1197

1198
  If the ignore_primary is false, errors on the primary node are
1199
  ignored.
1200

1201
  """
1202
  lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1203
  all_result = True
1204
  disks = ExpandCheckDisks(instance, disks)
1205

    
1206
  for disk in disks:
1207
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1208
      lu.cfg.SetDiskID(top_disk, node_uuid)
1209
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1210
      msg = result.fail_msg
1211
      if msg:
1212
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1213
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1214
        if ((node_uuid == instance.primary_node and not ignore_primary) or
1215
            (node_uuid != instance.primary_node and not result.offline)):
1216
          all_result = False
1217
  return all_result
1218

    
1219

    
1220
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1221
  """Shutdown block devices of an instance.
1222

1223
  This function checks if an instance is running, before calling
1224
  _ShutdownInstanceDisks.
1225

1226
  """
1227
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1228
  ShutdownInstanceDisks(lu, instance, disks=disks)
1229

    
1230

    
1231
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1232
                           ignore_size=False):
1233
  """Prepare the block devices for an instance.
1234

1235
  This sets up the block devices on all nodes.
1236

1237
  @type lu: L{LogicalUnit}
1238
  @param lu: the logical unit on whose behalf we execute
1239
  @type instance: L{objects.Instance}
1240
  @param instance: the instance for whose disks we assemble
1241
  @type disks: list of L{objects.Disk} or None
1242
  @param disks: which disks to assemble (or all, if None)
1243
  @type ignore_secondaries: boolean
1244
  @param ignore_secondaries: if true, errors on secondary nodes
1245
      won't result in an error return from the function
1246
  @type ignore_size: boolean
1247
  @param ignore_size: if true, the current known size of the disk
1248
      will not be used during the disk activation, useful for cases
1249
      when the size is wrong
1250
  @return: False if the operation failed, otherwise a list of
1251
      (host, instance_visible_name, node_visible_name)
1252
      with the mapping from node devices to instance devices
1253

1254
  """
1255
  device_info = []
1256
  disks_ok = True
1257
  disks = ExpandCheckDisks(instance, disks)
1258

    
1259
  # With the two passes mechanism we try to reduce the window of
1260
  # opportunity for the race condition of switching DRBD to primary
1261
  # before handshaking occured, but we do not eliminate it
1262

    
1263
  # The proper fix would be to wait (with some limits) until the
1264
  # connection has been made and drbd transitions from WFConnection
1265
  # into any other network-connected state (Connected, SyncTarget,
1266
  # SyncSource, etc.)
1267

    
1268
  # mark instance disks as active before doing actual work, so watcher does
1269
  # not try to shut them down erroneously
1270
  lu.cfg.MarkInstanceDisksActive(instance.uuid)
1271

    
1272
  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info
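
# Illustration (hypothetical node names and device paths): on success,
# AssembleInstanceDisks returns a pair shaped roughly like
#   (True, [("node1.example.com", "disk/0", "/dev/drbd0"),
#           ("node1.example.com", "disk/1", "/dev/drbd1")])
# i.e. the overall status plus one (node, iv_name, device path) triple per
# disk, all referring to the instance's primary node.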


def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")
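
# Usage sketch (hypothetical caller, mirroring what TLReplaceDisks.Exec below
# does for a down instance): activate the disks, do the work, and always shut
# them down again afterwards:
#
#   StartInstanceDisks(self.lu, self.instance, True)
#   try:
#     ...  # operate on the assembled block devices
#   finally:
#     _SafeShutdownInstanceDisks(self.lu, self.instance)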


class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)
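
    # Worked example (hypothetical numbers; sizes are in mebibytes): for a
    # 10240 MiB disk, amount=2048 with absolute=False yields delta=2048 and
    # target=12288, while amount=12288 with absolute=True yields the same
    # delta and target.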

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    template = self.instance.disk_template
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

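    # The grow runs in three phases (see the calls below): a dry-run pass on
    # every node, a real pass on the backing storage of every node, and a
    # final pass on the logical (non-backing) storage on the primary node
    # only; the two boolean arguments of call_blockdev_grow select dry-run
    # mode and whether the backing storage is targeted.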
    # First run all grow ops in dry-run mode
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Get disk size from primary node for wiping
      self.cfg.SetDiskID(self.disk, self.instance.primary_node)
      result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
                                                    [self.disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   self.instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % self.instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    self.cfg.SetDiskID(self.disk, node_uuid)
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert self.instance.disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if self.op.remote_node is None and self.op.iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif self.op.remote_node is not None or self.op.iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shut down an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    if self.op.force:
      ShutdownInstanceDisks(self, self.instance)
    else:
      _SafeShutdownInstanceDisks(self, self.instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node_uuid)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
                                                     node_uuid, on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out to
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @return: The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node_uuid, disk)


def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate a logical volume name for each of the given extensions.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results
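
# Illustration (hypothetical values): called with exts=[".disk0_data",
# ".disk0_meta"], the helper above returns one name per extension, each of
# the form "<unique-id>.disk0_data" / "<unique-id>.disk0_meta"; these are the
# LV names used by TLReplaceDisks._CreateNewStorage below.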


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
               remote_node_uuid, disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node_uuid = remote_node_uuid
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node_uuid = None
    self.target_node_uuid = None
    self.other_node_uuid = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_uuid,
                    relocate_from_node_uuids):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(
          inst_uuid=instance_uuid,
          relocate_from_node_uuids=list(relocate_from_node_uuids))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)

    if remote_node is None:
      raise errors.OpPrereqError("Node %s not found in configuration" %
                                 remote_node_name, errors.ECODE_NOENT)

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_uuid, remote_node_name)

    return remote_node.uuid

  def _FindFaultyDisks(self, node_uuid):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    node_uuids = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if self.instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(self.instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(self.instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    secondary_node_uuid = self.instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node_uuid = self.remote_node_uuid
    else:
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                            self.instance.uuid,
                                            self.instance.secondary_nodes)

    if remote_node_uuid is None:
      self.remote_node_info = None
    else:
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node_uuid

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node_uuid

    if remote_node_uuid == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node_uuid == secondary_node_uuid:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

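    # Quick map of the modes handled below: REPLACE_DISK_AUTO repairs whatever
    # faulty disks it finds in place (on the primary or the secondary),
    # REPLACE_DISK_PRI/_SEC rebuild the LVs on the primary/secondary node
    # respectively, and REPLACE_DISK_CHG moves the DRBD secondary to
    # remote_node_uuid (the new node of this tasklet).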
    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(self.instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node_uuid = remote_node_uuid
        self.other_node_uuid = self.instance.primary_node
        self.target_node_uuid = secondary_node_uuid
        check_nodes = [self.new_node_uuid, self.other_node_uuid]

        CheckNodeNotDrained(self.lu, remote_node_uuid)
        CheckNodeVmCapable(self.lu, remote_node_uuid)

        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node_uuid)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between an
    # internally submitted opcode and an external one. We should fix that.
    if self.remote_node_info:
      # We change the node, let's verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
                             self.remote_node_info, self.cfg,
                             ignore=self.ignore_ipolicy)

    for node_uuid in check_nodes:
      CheckNodeOnline(self.lu, node_uuid)

    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                          self.other_node_uuid,
                                                          self.target_node_uuid]
                              if node_uuid is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      self.instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" %
                self.cfg.GetNodeName(self.instance.primary_node))
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.cfg.GetNodeNames(
                                  self.instance.secondary_nodes)))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node_uuid is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, node_uuids):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(node_uuids)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node_uuid in node_uuids:
      res = results[node_uuid]
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, self.cfg.GetNodeName(node_uuid)))

  def _CheckDisksExistence(self, node_uuids):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, self.cfg.GetNodeName(node_uuid), msg))

  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, self.cfg.GetNodeName(node_uuid)))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (self.cfg.GetNodeName(node_uuid),
                                  self.instance.name))

  def _CreateNewStorage(self, node_uuid):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d",
                      self.cfg.GetNodeName(node_uuid), idx)

      self.cfg.SetDiskID(dev, node_uuid)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

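      # A DRBD8 disk has exactly two children, the data LV and the (small,
      # fixed-size) metadata LV; the replacements are created in the same
      # volume groups as the originals, taken from each child's logical_id.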
      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        try:
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                               GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    return iv_names

  def _CheckDevices(self, node_uuid, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_uuid)

      result = _BlockdevFind(self, node_uuid, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_uuid, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_uuid)

        msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
      False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node_uuid)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" %
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
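      # e.g. (hypothetical values) ("xenvg", "abc123.disk0_data") is renamed
      # to ("xenvg", "abc123.disk0_data_replaced-1400000000"), keeping the
      # volume group and appending the timestamp-based suffix.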

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node_uuid)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node_uuid)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s",
                      self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("manually clean up the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
                                                  self.new_node_uuid)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        try:
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
                               new_lv, True, GetInstanceInfoText(self.instance),
                               False, excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
                                         for _ in self.instance.disks],
                                        self.instance.uuid)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the later activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                    p_minor, new_minor, o_secret)
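      # The two tuples above differ only in the port field: None keeps the
      # freshly created DRBD standalone (no networking yet), while new_net_id
      # carries the original port and is what the disk's logical_id is set to
      # before the drbd_attach_net call further down.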

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node_uuid)
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please clean up this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node_uuid],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           self.cfg.GetNodeName(to_node), msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)