#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
  CheckDiskTemplateEnabled
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
                                       device.size, instance.name, force_open,
                                       info, excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has the
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given UUID

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created, instance):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}
  @type instance: L{objects.Instance}
  @param instance: the instance for which disks were created

  """
  for (node_uuid, disk) in disks_created:
    result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created, instance)
        raise errors.OpExecError(e.message)
  return disks_created


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group.

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vg = disk[constants.IDISK_VG]
      vgs[vg] = vgs.get(vg, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
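# Illustration for ComputeDiskSizePerVG above (a sketch with hypothetical
# values): given two disks of 1024 MiB and 2048 MiB that both use volume
# group "xenvg", the DT_PLAIN entry accumulates to {"xenvg": 3072}, while the
# DT_DRBD8 entry additionally reserves constants.DRBD_META_SIZE per disk for
# its metadata volume, i.e. {"xenvg": 3072 + 2 * constants.DRBD_META_SIZE}.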


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
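# Illustration for ComputeDisks above (hypothetical opcode values): a disk
# specification such as
#   {constants.IDISK_SIZE: "10240", constants.IDISK_MODE: constants.DISK_RDWR}
# is normalized to
#   {constants.IDISK_SIZE: 10240, constants.IDISK_MODE: constants.DISK_RDWR,
#    constants.IDISK_VG: default_vg, constants.IDISK_NAME: None}
# whereas a missing size or an unknown access mode raises
# errors.OpPrereqError.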


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev


def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in constants.DTS_FILEBASED:
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
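# Summary of the logical_id layouts produced by GenerateDiskTemplate above,
# as read from its branches:
#   DT_DRBD8:   (primary_uuid, secondary_uuid, port, p_minor, s_minor, secret)
#   DT_PLAIN:   (vg_name, lv_name)
#   file-based: (file_driver, "<file_storage_dir>/disk<index>")
#   DT_BLOCK:   (constants.BLOCKDEV_DRIVER_MANUAL, adopted device path)
#   DT_RBD:     ("rbd", generated name)
#   DT_EXT:     (provider, generated name)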


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not be

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)
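# Illustrative calls for CheckSpindlesExclusiveStorage above (hypothetical
# values):
#   CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 2}, False, False)
#     raises OpPrereqError (spindles given while exclusive storage is off)
#   CheckSpindlesExclusiveStorage({}, True, True)
#     raises OpPrereqError (spindles required but not given)
#   CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 2}, True, True)
#     passes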


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support for changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifiable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.DT_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(self.instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)


def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Prepares the input and performs a node info call.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name

  """
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
                                                  node_uuids)
  hvname = lu.cfg.GetHypervisorType()
  hvparams = lu.cfg.GetClusterInfo().hvparams
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
                                   [(hvname, hvparams[hvname])])
  return nodeinfo


def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type node_name: string
  @param node_name: the name of the node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)


def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
  for node_uuid in node_uuids:
    node_name = lu.cfg.GetNodeName(node_uuid)
    info = nodeinfo[node_uuid]
    info.Raise("Cannot get current information from node %s" % node_name,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)


def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib
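# Worked example for _DiskSizeInBytesToMebibytes above: 1073741825 bytes
# (1 GiB plus one byte) gives divmod(size, 1024 * 1024) == (1024, 1), so a
# warning is logged and the result is rounded up to 1025 MiB; an exact 1 GiB
# (1073741824 bytes) returns 1024 without a warning.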


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
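# Worked example for _CalcEta above (hypothetical numbers): if 30 seconds
# were spent writing 1024 MiB of a 4096 MiB disk, the average is 30 / 1024.0
# seconds per MiB, so the remaining 3072 MiB give an ETA of
# 3072 * (30 / 1024.0) = 90.0 seconds.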


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)
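# Sketch of the chunk-size computation in WipeDisks above, assuming the stock
# values MAX_WIPE_CHUNK = 1024 (MiB) and MIN_WIPE_CHUNK_PERCENT = 10 (to be
# checked against constants.py): a 4096 MiB disk is wiped in
# int(min(1024, 4096 / 100.0 * 10)) = 409 MiB chunks, while larger disks are
# capped at 1024 MiB per call_blockdev_wipe request.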


def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpExecError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup, instance)
    raise


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list.

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %r,"
                                   " got %r" % (instance.disks, disks))
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  otherwise they make the result unsuccessful.

  """
  all_result = True

  if disks is None:
    # only mark instance disks as inactive if all disks are affected
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if ((node_uuid == instance.primary_node and not ignore_primary) or
            (node_uuid != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  ShutdownInstanceDisks.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1246
                          ignore_size=False):
1247
  """Prepare the block devices for an instance.
1248

1249
  This sets up the block devices on all nodes.
1250

1251
  @type lu: L{LogicalUnit}
1252
  @param lu: the logical unit on whose behalf we execute
1253
  @type instance: L{objects.Instance}
1254
  @param instance: the instance for whose disks we assemble
1255
  @type disks: list of L{objects.Disk} or None
1256
  @param disks: which disks to assemble (or all, if None)
1257
  @type ignore_secondaries: boolean
1258
  @param ignore_secondaries: if true, errors on secondary nodes
1259
      won't result in an error return from the function
1260
  @type ignore_size: boolean
1261
  @param ignore_size: if true, the current known size of the disk
1262
      will not be used during the disk activation, useful for cases
1263
      when the size is wrong
1264
  @return: False if the operation failed, otherwise a list of
1265
      (host, instance_visible_name, node_visible_name)
1266
      with the mapping from node devices to instance devices
1267

1268
  """
1269
  device_info = []
1270
  disks_ok = True
1271

    
1272
  if disks is None:
1273
    # only mark instance disks as active if all disks are affected
1274
    lu.cfg.MarkInstanceDisksActive(instance.uuid)
1275

    
1276
  disks = ExpandCheckDisks(instance, disks)
1277

    
1278
  # With the two passes mechanism we try to reduce the window of
1279
  # opportunity for the race condition of switching DRBD to primary
1280
  # before handshaking occured, but we do not eliminate it
1281

    
1282
  # The proper fix would be to wait (with some limits) until the
1283
  # connection has been made and drbd transitions from WFConnection
1284
  # into any other network-connected state (Connected, SyncTarget,
1285
  # SyncSource, etc.)
1286

    
1287
  # 1st pass, assemble on all nodes in secondary mode
1288
  for idx, inst_disk in enumerate(disks):
1289
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1290
                                  instance.primary_node):
1291
      if ignore_size:
1292
        node_disk = node_disk.Copy()
1293
        node_disk.UnsetSize()
1294
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1295
                                             instance.name, False, idx)
1296
      msg = result.fail_msg
1297
      if msg:
1298
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
1299
                                result.offline)
1300
        lu.LogWarning("Could not prepare block device %s on node %s"
1301
                      " (is_primary=False, pass=1): %s",
1302
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1303
        if not (ignore_secondaries or is_offline_secondary):
1304
          disks_ok = False
1305

    
1306
  # FIXME: race condition on drbd migration to primary
1307

    
1308
  # 2nd pass, do only the primary node
1309
  for idx, inst_disk in enumerate(disks):
1310
    dev_path = None
1311

    
1312
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1313
                                  instance.primary_node):
1314
      if node_uuid != instance.primary_node:
1315
        continue
1316
      if ignore_size:
1317
        node_disk = node_disk.Copy()
1318
        node_disk.UnsetSize()
1319
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1320
                                             instance.name, True, idx)
1321
      msg = result.fail_msg
1322
      if msg:
1323
        lu.LogWarning("Could not prepare block device %s on node %s"
1324
                      " (is_primary=True, pass=2): %s",
1325
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1326
        disks_ok = False
1327
      else:
1328
        dev_path, _ = result.payload
1329

    
1330
    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1331
                        inst_disk.iv_name, dev_path))
1332

    
1333
  if not disks_ok:
1334
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1335

    
1336
  return disks_ok, device_info
1337

    
1338

    
1339
def StartInstanceDisks(lu, instance, force):
1340
  """Start the disks of an instance.
1341

1342
  """
1343
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
1344
                                      ignore_secondaries=force)
1345
  if not disks_ok:
1346
    ShutdownInstanceDisks(lu, instance)
1347
    if force is not None and not force:
1348
      lu.LogWarning("",
1349
                    hint=("If the message above refers to a secondary node,"
1350
                          " you can retry the operation using '--force'"))
1351
    raise errors.OpExecError("Disk consistency error")
1352

    
1353

    
1354
class LUInstanceGrowDisk(LogicalUnit):
1355
  """Grow a disk of an instance.
1356

1357
  """
1358
  HPATH = "disk-grow"
1359
  HTYPE = constants.HTYPE_INSTANCE
1360
  REQ_BGL = False
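  # Rough flow of this LU (sketch): ExpandNames/DeclareLocks acquire the
  # instance and node locks, CheckPrereq computes self.delta and self.target
  # from the opcode, and Exec runs a dry-run grow on every node before the
  # real grow, the optional wipe of the added space and the wait for sync.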
1361

    
1362
  def ExpandNames(self):
1363
    self._ExpandAndLockInstance()
1364
    self.needed_locks[locking.LEVEL_NODE] = []
1365
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1366
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1367
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1368

    
1369
  def DeclareLocks(self, level):
1370
    if level == locking.LEVEL_NODE:
1371
      self._LockInstancesNodes()
1372
    elif level == locking.LEVEL_NODE_RES:
1373
      # Copy node locks
1374
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1375
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1376

    
1377
  def BuildHooksEnv(self):
1378
    """Build hooks env.
1379

1380
    This runs on the master, the primary and all the secondaries.
1381

1382
    """
1383
    env = {
1384
      "DISK": self.op.disk,
1385
      "AMOUNT": self.op.amount,
1386
      "ABSOLUTE": self.op.absolute,
1387
      }
1388
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
1389
    return env
1390

    
1391
  def BuildHooksNodes(self):
1392
    """Build hooks nodes.
1393

1394
    """
1395
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1396
    return (nl, nl)
1397

    
1398
  def CheckPrereq(self):
1399
    """Check prerequisites.
1400

1401
    This checks that the instance is in the cluster.
1402

1403
    """
1404
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1405
    assert self.instance is not None, \
1406
      "Cannot retrieve locked instance %s" % self.op.instance_name
1407
    node_uuids = list(self.instance.all_nodes)
1408
    for node_uuid in node_uuids:
1409
      CheckNodeOnline(self, node_uuid)
1410
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1411

    
1412
    if self.instance.disk_template not in constants.DTS_GROWABLE:
1413
      raise errors.OpPrereqError("Instance's disk layout does not support"
1414
                                 " growing", errors.ECODE_INVAL)
1415

    
1416
    self.disk = self.instance.FindDisk(self.op.disk)
1417

    
1418
    if self.op.absolute:
1419
      self.target = self.op.amount
1420
      self.delta = self.target - self.disk.size
1421
      if self.delta < 0:
1422
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
1423
                                   "current disk size (%s)" %
1424
                                   (utils.FormatUnit(self.target, "h"),
1425
                                    utils.FormatUnit(self.disk.size, "h")),
1426
                                   errors.ECODE_STATE)
1427
    else:
1428
      self.delta = self.op.amount
1429
      self.target = self.disk.size + self.delta
1430
      if self.delta < 0:
1431
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
1432
                                   utils.FormatUnit(self.delta, "h"),
1433
                                   errors.ECODE_INVAL)
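    # Worked example (hypothetical sizes): growing a 10240 MiB disk with
    # amount=2048 and absolute=False gives delta=2048 and target=12288,
    # while the same amount with absolute=True is rejected above because
    # 2048 is smaller than the current disk size.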
1434

    
1435
    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1436

    
1437
  def _CheckDiskSpace(self, node_uuids, req_vgspace):
1438
    template = self.instance.disk_template
1439
    if (template not in constants.DTS_NO_FREE_SPACE_CHECK and
1440
        not any(self.node_es_flags.values())):
1441
      # TODO: check the free disk space for file, when that feature will be
1442
      # supported
1443
      # With exclusive storage we need to do something smarter than just looking
1444
      # at free space, which, in the end, is basically a dry run. So we rely on
1445
      # the dry run performed in Exec() instead.
1446
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1447

    
1448
  def Exec(self, feedback_fn):
1449
    """Execute disk grow.
1450

1451
    """
1452
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1453
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1454
            self.owned_locks(locking.LEVEL_NODE_RES))
1455

    
1456
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1457

    
1458
    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1459
    if not disks_ok:
1460
      raise errors.OpExecError("Cannot activate block device to grow")
1461

    
1462
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1463
                (self.op.disk, self.instance.name,
1464
                 utils.FormatUnit(self.delta, "h"),
1465
                 utils.FormatUnit(self.target, "h")))
1466

    
1467
    # First run all grow ops in dry-run mode
1468
    for node_uuid in self.instance.all_nodes:
1469
      result = self.rpc.call_blockdev_grow(node_uuid,
1470
                                           (self.disk, self.instance),
1471
                                           self.delta, True, True,
1472
                                           self.node_es_flags[node_uuid])
1473
      result.Raise("Dry-run grow request failed to node %s" %
1474
                   self.cfg.GetNodeName(node_uuid))
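    # The same grow RPC is issued three times in this method: as a dry-run on
    # all nodes (above), for real on the backing storage of all nodes, and
    # finally for the logical device on the primary node only; the boolean
    # arguments to call_blockdev_grow select between these modes (see the
    # comments further down).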
1475

    
1476
    if wipe_disks:
1477
      # Get disk size from primary node for wiping
1478
      result = self.rpc.call_blockdev_getdimensions(
1479
                 self.instance.primary_node, [([self.disk], self.instance)])
1480
      result.Raise("Failed to retrieve disk size from node '%s'" %
1481
                   self.instance.primary_node)
1482

    
1483
      (disk_dimensions, ) = result.payload
1484

    
1485
      if disk_dimensions is None:
1486
        raise errors.OpExecError("Failed to retrieve disk size from primary"
1487
                                 " node '%s'" % self.instance.primary_node)
1488
      (disk_size_in_bytes, _) = disk_dimensions
1489

    
1490
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1491

    
1492
      assert old_disk_size >= self.disk.size, \
1493
        ("Retrieved disk size too small (got %s, should be at least %s)" %
1494
         (old_disk_size, self.disk.size))
1495
    else:
1496
      old_disk_size = None
1497

    
1498
    # We know that (as far as we can test) operations across different
1499
    # nodes will succeed; time to run it for real on the backing storage
1500
    for node_uuid in self.instance.all_nodes:
1501
      result = self.rpc.call_blockdev_grow(node_uuid,
1502
                                           (self.disk, self.instance),
1503
                                           self.delta, False, True,
1504
                                           self.node_es_flags[node_uuid])
1505
      result.Raise("Grow request failed to node %s" %
1506
                   self.cfg.GetNodeName(node_uuid))
1507

    
1508
    # And now execute it for logical storage, on the primary node
1509
    node_uuid = self.instance.primary_node
1510
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1511
                                         self.delta, False, False,
1512
                                         self.node_es_flags[node_uuid])
1513
    result.Raise("Grow request failed to node %s" %
1514
                 self.cfg.GetNodeName(node_uuid))
1515

    
1516
    self.disk.RecordGrow(self.delta)
1517
    self.cfg.Update(self.instance, feedback_fn)
1518

    
1519
    # Changes have been recorded, release node lock
1520
    ReleaseLocks(self, locking.LEVEL_NODE)
1521

    
1522
    # Downgrade lock while waiting for sync
1523
    self.glm.downgrade(locking.LEVEL_INSTANCE)
1524

    
1525
    assert wipe_disks ^ (old_disk_size is None)
1526

    
1527
    if wipe_disks:
1528
      assert self.instance.disks[self.op.disk] == self.disk
1529

    
1530
      # Wipe newly added disk space
1531
      WipeDisks(self, self.instance,
1532
                disks=[(self.op.disk, self.disk, old_disk_size)])
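      # Only the newly added space needs wiping; old_disk_size (queried from
      # the primary node above) is presumably used by WipeDisks as the offset
      # at which wiping starts.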
1533

    
1534
    if self.op.wait_for_sync:
1535
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1536
      if disk_abort:
1537
        self.LogWarning("Disk syncing has not returned a good status; check"
1538
                        " the instance")
1539
      if not self.instance.disks_active:
1540
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1541
    elif not self.instance.disks_active:
1542
      self.LogWarning("Not shutting down the disk even if the instance is"
1543
                      " not supposed to be running because no wait for"
1544
                      " sync mode was requested")
1545

    
1546
    assert self.owned_locks(locking.LEVEL_NODE_RES)
1547
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1548

    
1549

    
1550
class LUInstanceReplaceDisks(LogicalUnit):
1551
  """Replace the disks of an instance.
1552

1553
  """
1554
  HPATH = "mirrors-replace"
1555
  HTYPE = constants.HTYPE_INSTANCE
1556
  REQ_BGL = False
1557

    
1558
  def CheckArguments(self):
1559
    """Check arguments.
1560

1561
    """
1562
    if self.op.mode == constants.REPLACE_DISK_CHG:
1563
      if self.op.remote_node is None and self.op.iallocator is None:
1564
        raise errors.OpPrereqError("When changing the secondary either an"
1565
                                   " iallocator script must be used or the"
1566
                                   " new node given", errors.ECODE_INVAL)
1567
      else:
1568
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1569

    
1570
    elif self.op.remote_node is not None or self.op.iallocator is not None:
1571
      # Not replacing the secondary
1572
      raise errors.OpPrereqError("The iallocator and new node options can"
1573
                                 " only be used when changing the"
1574
                                 " secondary node", errors.ECODE_INVAL)
1575

    
1576
  def ExpandNames(self):
1577
    self._ExpandAndLockInstance()
1578

    
1579
    assert locking.LEVEL_NODE not in self.needed_locks
1580
    assert locking.LEVEL_NODE_RES not in self.needed_locks
1581
    assert locking.LEVEL_NODEGROUP not in self.needed_locks
1582

    
1583
    assert self.op.iallocator is None or self.op.remote_node is None, \
1584
      "Conflicting options"
1585

    
1586
    if self.op.remote_node is not None:
1587
      (self.op.remote_node_uuid, self.op.remote_node) = \
1588
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1589
                              self.op.remote_node)
1590

    
1591
      # Warning: do not remove the locking of the new secondary here
1592
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
1593
      # currently it doesn't since parallel invocations of
1594
      # FindUnusedMinor will conflict
1595
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1596
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1597
    else:
1598
      self.needed_locks[locking.LEVEL_NODE] = []
1599
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1600

    
1601
      if self.op.iallocator is not None:
1602
        # iallocator will select a new node in the same group
1603
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
1604
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1605

    
1606
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1607

    
1608
    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1609
                                   self.op.instance_name, self.op.mode,
1610
                                   self.op.iallocator, self.op.remote_node_uuid,
1611
                                   self.op.disks, self.op.early_release,
1612
                                   self.op.ignore_ipolicy)
1613

    
1614
    self.tasklets = [self.replacer]
1615

    
1616
  def DeclareLocks(self, level):
1617
    if level == locking.LEVEL_NODEGROUP:
1618
      assert self.op.remote_node_uuid is None
1619
      assert self.op.iallocator is not None
1620
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1621

    
1622
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
1623
      # Lock all groups used by instance optimistically; this requires going
1624
      # via the node before it's locked, requiring verification later on
1625
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
1626
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1627

    
1628
    elif level == locking.LEVEL_NODE:
1629
      if self.op.iallocator is not None:
1630
        assert self.op.remote_node_uuid is None
1631
        assert not self.needed_locks[locking.LEVEL_NODE]
1632
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1633

    
1634
        # Lock member nodes of all locked groups
1635
        self.needed_locks[locking.LEVEL_NODE] = \
1636
          [node_uuid
1637
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1638
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1639
      else:
1640
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1641

    
1642
        self._LockInstancesNodes()
1643

    
1644
    elif level == locking.LEVEL_NODE_RES:
1645
      # Reuse node locks
1646
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1647
        self.needed_locks[locking.LEVEL_NODE]
1648

    
1649
  def BuildHooksEnv(self):
1650
    """Build hooks env.
1651

1652
    This runs on the master, the primary and all the secondaries.
1653

1654
    """
1655
    instance = self.replacer.instance
1656
    env = {
1657
      "MODE": self.op.mode,
1658
      "NEW_SECONDARY": self.op.remote_node,
1659
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
1660
      }
1661
    env.update(BuildInstanceHookEnvByObject(self, instance))
1662
    return env
1663

    
1664
  def BuildHooksNodes(self):
1665
    """Build hooks nodes.
1666

1667
    """
1668
    instance = self.replacer.instance
1669
    nl = [
1670
      self.cfg.GetMasterNode(),
1671
      instance.primary_node,
1672
      ]
1673
    if self.op.remote_node_uuid is not None:
1674
      nl.append(self.op.remote_node_uuid)
1675
    return nl, nl
1676

    
1677
  def CheckPrereq(self):
1678
    """Check prerequisites.
1679

1680
    """
1681
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1682
            self.op.iallocator is None)
1683

    
1684
    # Verify if node group locks are still correct
1685
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1686
    if owned_groups:
1687
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
1688

    
1689
    return LogicalUnit.CheckPrereq(self)
1690

    
1691

    
1692
class LUInstanceActivateDisks(NoHooksLU):
1693
  """Bring up an instance's disks.
1694

1695
  """
1696
  REQ_BGL = False
1697

    
1698
  def ExpandNames(self):
1699
    self._ExpandAndLockInstance()
1700
    self.needed_locks[locking.LEVEL_NODE] = []
1701
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1702

    
1703
  def DeclareLocks(self, level):
1704
    if level == locking.LEVEL_NODE:
1705
      self._LockInstancesNodes()
1706

    
1707
  def CheckPrereq(self):
1708
    """Check prerequisites.
1709

1710
    This checks that the instance is in the cluster.
1711

1712
    """
1713
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1714
    assert self.instance is not None, \
1715
      "Cannot retrieve locked instance %s" % self.op.instance_name
1716
    CheckNodeOnline(self, self.instance.primary_node)
1717

    
1718
  def Exec(self, feedback_fn):
1719
    """Activate the disks.
1720

1721
    """
1722
    disks_ok, disks_info = \
1723
              AssembleInstanceDisks(self, self.instance,
1724
                                    ignore_size=self.op.ignore_size)
1725
    if not disks_ok:
1726
      raise errors.OpExecError("Cannot activate block devices")
1727

    
1728
    if self.op.wait_for_sync:
1729
      if not WaitForSync(self, self.instance):
1730
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
1731
        raise errors.OpExecError("Some disks of the instance are degraded!")
1732

    
1733
    return disks_info
1734

    
1735

    
1736
class LUInstanceDeactivateDisks(NoHooksLU):
1737
  """Shutdown an instance's disks.
1738

1739
  """
1740
  REQ_BGL = False
1741

    
1742
  def ExpandNames(self):
1743
    self._ExpandAndLockInstance()
1744
    self.needed_locks[locking.LEVEL_NODE] = []
1745
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1746

    
1747
  def DeclareLocks(self, level):
1748
    if level == locking.LEVEL_NODE:
1749
      self._LockInstancesNodes()
1750

    
1751
  def CheckPrereq(self):
1752
    """Check prerequisites.
1753

1754
    This checks that the instance is in the cluster.
1755

1756
    """
1757
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1758
    assert self.instance is not None, \
1759
      "Cannot retrieve locked instance %s" % self.op.instance_name
1760

    
1761
  def Exec(self, feedback_fn):
1762
    """Deactivate the disks
1763

1764
    """
1765
    if self.op.force:
1766
      ShutdownInstanceDisks(self, self.instance)
1767
    else:
1768
      _SafeShutdownInstanceDisks(self, self.instance)
1769

    
1770

    
1771
def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
1772
                               ldisk=False):
1773
  """Check that mirrors are not degraded.
1774

1775
  @attention: The device has to be annotated already.
1776

1777
  The ldisk parameter, if True, will change the test from the
1778
  is_degraded attribute (which represents overall non-ok status for
1779
  the device(s)) to the ldisk (representing the local storage status).
1780

1781
  """
1782
  result = True
1783

    
1784
  if on_primary or dev.AssembleOnSecondary():
1785
    rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
1786
    msg = rstats.fail_msg
1787
    if msg:
1788
      lu.LogWarning("Can't find disk on node %s: %s",
1789
                    lu.cfg.GetNodeName(node_uuid), msg)
1790
      result = False
1791
    elif not rstats.payload:
1792
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1793
      result = False
1794
    else:
1795
      if ldisk:
1796
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1797
      else:
1798
        result = result and not rstats.payload.is_degraded
1799

    
1800
  if dev.children:
1801
    for child in dev.children:
1802
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
1803
                                                     node_uuid, on_primary)
1804

    
1805
  return result
1806

    
1807

    
1808
def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
1809
  """Wrapper around L{_CheckDiskConsistencyInner}.
1810

1811
  """
1812
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1813
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
1814
                                    ldisk=ldisk)
1815

    
1816

    
1817
def _BlockdevFind(lu, node_uuid, dev, instance):
1818
  """Wrapper around call_blockdev_find to annotate diskparams.
1819

1820
  @param lu: A reference to the lu object
1821
  @param node_uuid: The node to call out
1822
  @param dev: The device to find
1823
  @param instance: The instance object the device belongs to
1824
  @returns The result of the rpc call
1825

1826
  """
1827
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1828
  return lu.rpc.call_blockdev_find(node_uuid, (disk, instance))
1829

    
1830

    
1831
def _GenerateUniqueNames(lu, exts):
1832
  """Generate a suitable LV name.
1833

1834
  This will generate a logical volume name for the given instance.
1835

1836
  """
1837
  results = []
1838
  for val in exts:
1839
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1840
    results.append("%s%s" % (new_id, val))
1841
  return results
1842

    
1843

    
1844
class TLReplaceDisks(Tasklet):
1845
  """Replaces disks for an instance.
1846

1847
  Note: Locking is not within the scope of this class.
1848

1849
  """
1850
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1851
               remote_node_uuid, disks, early_release, ignore_ipolicy):
1852
    """Initializes this class.
1853

1854
    """
1855
    Tasklet.__init__(self, lu)
1856

    
1857
    # Parameters
1858
    self.instance_uuid = instance_uuid
1859
    self.instance_name = instance_name
1860
    self.mode = mode
1861
    self.iallocator_name = iallocator_name
1862
    self.remote_node_uuid = remote_node_uuid
1863
    self.disks = disks
1864
    self.early_release = early_release
1865
    self.ignore_ipolicy = ignore_ipolicy
1866

    
1867
    # Runtime data
1868
    self.instance = None
1869
    self.new_node_uuid = None
1870
    self.target_node_uuid = None
1871
    self.other_node_uuid = None
1872
    self.remote_node_info = None
1873
    self.node_secondary_ip = None
1874

    
1875
  @staticmethod
1876
  def _RunAllocator(lu, iallocator_name, instance_uuid,
1877
                    relocate_from_node_uuids):
1878
    """Compute a new secondary node using an IAllocator.
1879

1880
    """
1881
    req = iallocator.IAReqRelocate(
1882
          inst_uuid=instance_uuid,
1883
          relocate_from_node_uuids=list(relocate_from_node_uuids))
1884
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1885

    
1886
    ial.Run(iallocator_name)
1887

    
1888
    if not ial.success:
1889
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1890
                                 " %s" % (iallocator_name, ial.info),
1891
                                 errors.ECODE_NORES)
1892

    
1893
    remote_node_name = ial.result[0]
1894
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1895

    
1896
    if remote_node is None:
1897
      raise errors.OpPrereqError("Node %s not found in configuration" %
1898
                                 remote_node_name, errors.ECODE_NOENT)
1899

    
1900
    lu.LogInfo("Selected new secondary for instance '%s': %s",
1901
               instance_uuid, remote_node_name)
1902

    
1903
    return remote_node.uuid
1904

    
1905
  def _FindFaultyDisks(self, node_uuid):
1906
    """Wrapper for L{FindFaultyInstanceDisks}.
1907

1908
    """
1909
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1910
                                   node_uuid, True)
1911

    
1912
  def _CheckDisksActivated(self, instance):
1913
    """Checks if the instance disks are activated.
1914

1915
    @param instance: The instance to check disks
1916
    @return: True if they are activated, False otherwise
1917

1918
    """
1919
    node_uuids = instance.all_nodes
1920

    
1921
    for idx, dev in enumerate(instance.disks):
1922
      for node_uuid in node_uuids:
1923
        self.lu.LogInfo("Checking disk/%d on %s", idx,
1924
                        self.cfg.GetNodeName(node_uuid))
1925

    
1926
        result = _BlockdevFind(self, node_uuid, dev, instance)
1927

    
1928
        if result.offline:
1929
          continue
1930
        elif result.fail_msg or not result.payload:
1931
          return False
1932

    
1933
    return True
1934

    
1935
  def CheckPrereq(self):
1936
    """Check prerequisites.
1937

1938
    This checks that the instance is in the cluster.
1939

1940
    """
1941
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
1942
    assert self.instance is not None, \
1943
      "Cannot retrieve locked instance %s" % self.instance_name
1944

    
1945
    if self.instance.disk_template != constants.DT_DRBD8:
1946
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1947
                                 " instances", errors.ECODE_INVAL)
1948

    
1949
    if len(self.instance.secondary_nodes) != 1:
1950
      raise errors.OpPrereqError("The instance has a strange layout,"
1951
                                 " expected one secondary but found %d" %
1952
                                 len(self.instance.secondary_nodes),
1953
                                 errors.ECODE_FAULT)
1954

    
1955
    secondary_node_uuid = self.instance.secondary_nodes[0]
1956

    
1957
    if self.iallocator_name is None:
1958
      remote_node_uuid = self.remote_node_uuid
1959
    else:
1960
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
1961
                                            self.instance.uuid,
1962
                                            self.instance.secondary_nodes)
1963

    
1964
    if remote_node_uuid is None:
1965
      self.remote_node_info = None
1966
    else:
1967
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
1968
             "Remote node '%s' is not locked" % remote_node_uuid
1969

    
1970
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
1971
      assert self.remote_node_info is not None, \
1972
        "Cannot retrieve locked node %s" % remote_node_uuid
1973

    
1974
    if remote_node_uuid == self.instance.primary_node:
1975
      raise errors.OpPrereqError("The specified node is the primary node of"
1976
                                 " the instance", errors.ECODE_INVAL)
1977

    
1978
    if remote_node_uuid == secondary_node_uuid:
1979
      raise errors.OpPrereqError("The specified node is already the"
1980
                                 " secondary node of the instance",
1981
                                 errors.ECODE_INVAL)
1982

    
1983
    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1984
                                    constants.REPLACE_DISK_CHG):
1985
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
1986
                                 errors.ECODE_INVAL)
1987

    
1988
    if self.mode == constants.REPLACE_DISK_AUTO:
1989
      if not self._CheckDisksActivated(self.instance):
1990
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
1991
                                   " first" % self.instance_name,
1992
                                   errors.ECODE_STATE)
1993
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
1994
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
1995

    
1996
      if faulty_primary and faulty_secondary:
1997
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1998
                                   " one node and can not be repaired"
1999
                                   " automatically" % self.instance_name,
2000
                                   errors.ECODE_STATE)
2001

    
2002
      if faulty_primary:
2003
        self.disks = faulty_primary
2004
        self.target_node_uuid = self.instance.primary_node
2005
        self.other_node_uuid = secondary_node_uuid
2006
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2007
      elif faulty_secondary:
2008
        self.disks = faulty_secondary
2009
        self.target_node_uuid = secondary_node_uuid
2010
        self.other_node_uuid = self.instance.primary_node
2011
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2012
      else:
2013
        self.disks = []
2014
        check_nodes = []
2015

    
2016
    else:
2017
      # Non-automatic modes
2018
      if self.mode == constants.REPLACE_DISK_PRI:
2019
        self.target_node_uuid = self.instance.primary_node
2020
        self.other_node_uuid = secondary_node_uuid
2021
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2022

    
2023
      elif self.mode == constants.REPLACE_DISK_SEC:
2024
        self.target_node_uuid = secondary_node_uuid
2025
        self.other_node_uuid = self.instance.primary_node
2026
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2027

    
2028
      elif self.mode == constants.REPLACE_DISK_CHG:
2029
        self.new_node_uuid = remote_node_uuid
2030
        self.other_node_uuid = self.instance.primary_node
2031
        self.target_node_uuid = secondary_node_uuid
2032
        check_nodes = [self.new_node_uuid, self.other_node_uuid]
2033

    
2034
        CheckNodeNotDrained(self.lu, remote_node_uuid)
2035
        CheckNodeVmCapable(self.lu, remote_node_uuid)
2036

    
2037
        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2038
        assert old_node_info is not None
2039
        if old_node_info.offline and not self.early_release:
2040
          # doesn't make sense to delay the release
2041
          self.early_release = True
2042
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2043
                          " early-release mode", secondary_node_uuid)
2044

    
2045
      else:
2046
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2047
                                     self.mode)
2048

    
2049
      # If not specified all disks should be replaced
2050
      if not self.disks:
2051
        self.disks = range(len(self.instance.disks))
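    # Summary of the roles computed above (sketch): target_node_uuid is the
    # node whose existing storage is being replaced (and later removed),
    # other_node_uuid is the peer that must stay consistent, and
    # new_node_uuid is only set when the secondary node is being changed.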
2052

    
2053
    # TODO: This is ugly, but right now we can't distinguish between an
2054
    # internally submitted opcode and an external one. We should fix that.
2055
    if self.remote_node_info:
2056
      # We change the node, lets verify it still meets instance policy
2057
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2058
      cluster = self.cfg.GetClusterInfo()
2059
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2060
                                                              new_group_info)
2061
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
2062
                             self.remote_node_info, self.cfg,
2063
                             ignore=self.ignore_ipolicy)
2064

    
2065
    for node_uuid in check_nodes:
2066
      CheckNodeOnline(self.lu, node_uuid)
2067

    
2068
    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2069
                                                          self.other_node_uuid,
2070
                                                          self.target_node_uuid]
2071
                              if node_uuid is not None)
2072

    
2073
    # Release unneeded node and node resource locks
2074
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2075
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2076
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2077

    
2078
    # Release any owned node group
2079
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2080

    
2081
    # Check whether disks are valid
2082
    for disk_idx in self.disks:
2083
      self.instance.FindDisk(disk_idx)
2084

    
2085
    # Get secondary node IP addresses
2086
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2087
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
2088

    
2089
  def Exec(self, feedback_fn):
2090
    """Execute disk replacement.
2091

2092
    This dispatches the disk replacement to the appropriate handler.
2093

2094
    """
2095
    if __debug__:
2096
      # Verify owned locks before starting operation
2097
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2098
      assert set(owned_nodes) == set(self.node_secondary_ip), \
2099
          ("Incorrect node locks, owning %s, expected %s" %
2100
           (owned_nodes, self.node_secondary_ip.keys()))
2101
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2102
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
2103
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2104

    
2105
      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2106
      assert list(owned_instances) == [self.instance_name], \
2107
          "Instance '%s' not locked" % self.instance_name
2108

    
2109
      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2110
          "Should not own any node group lock at this point"
2111

    
2112
    if not self.disks:
2113
      feedback_fn("No disks need replacement for instance '%s'" %
2114
                  self.instance.name)
2115
      return
2116

    
2117
    feedback_fn("Replacing disk(s) %s for instance '%s'" %
2118
                (utils.CommaJoin(self.disks), self.instance.name))
2119
    feedback_fn("Current primary node: %s" %
2120
                self.cfg.GetNodeName(self.instance.primary_node))
2121
    feedback_fn("Current seconary node: %s" %
2122
                utils.CommaJoin(self.cfg.GetNodeNames(
2123
                                  self.instance.secondary_nodes)))
2124

    
2125
    activate_disks = not self.instance.disks_active
2126

    
2127
    # Activate the instance disks if we're replacing them on a down instance
2128
    if activate_disks:
2129
      StartInstanceDisks(self.lu, self.instance, True)
2130

    
2131
    try:
2132
      # Should we replace the secondary node?
2133
      if self.new_node_uuid is not None:
2134
        fn = self._ExecDrbd8Secondary
2135
      else:
2136
        fn = self._ExecDrbd8DiskOnly
2137

    
2138
      result = fn(feedback_fn)
2139
    finally:
2140
      # Deactivate the instance disks if we're replacing them on a
2141
      # down instance
2142
      if activate_disks:
2143
        _SafeShutdownInstanceDisks(self.lu, self.instance)
2144

    
2145
    assert not self.lu.owned_locks(locking.LEVEL_NODE)
2146

    
2147
    if __debug__:
2148
      # Verify owned locks
2149
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2150
      nodes = frozenset(self.node_secondary_ip)
2151
      assert ((self.early_release and not owned_nodes) or
2152
              (not self.early_release and not (set(owned_nodes) - nodes))), \
2153
        ("Not owning the correct locks, early_release=%s, owned=%r,"
2154
         " nodes=%r" % (self.early_release, owned_nodes, nodes))
2155

    
2156
    return result
2157

    
2158
  def _CheckVolumeGroup(self, node_uuids):
2159
    self.lu.LogInfo("Checking volume groups")
2160

    
2161
    vgname = self.cfg.GetVGName()
2162

    
2163
    # Make sure volume group exists on all involved nodes
2164
    results = self.rpc.call_vg_list(node_uuids)
2165
    if not results:
2166
      raise errors.OpExecError("Can't list volume groups on the nodes")
2167

    
2168
    for node_uuid in node_uuids:
2169
      res = results[node_uuid]
2170
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2171
      if vgname not in res.payload:
2172
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
2173
                                 (vgname, self.cfg.GetNodeName(node_uuid)))
2174

    
2175
  def _CheckDisksExistence(self, node_uuids):
2176
    # Check disk existence
2177
    for idx, dev in enumerate(self.instance.disks):
2178
      if idx not in self.disks:
2179
        continue
2180

    
2181
      for node_uuid in node_uuids:
2182
        self.lu.LogInfo("Checking disk/%d on %s", idx,
2183
                        self.cfg.GetNodeName(node_uuid))
2184

    
2185
        result = _BlockdevFind(self, node_uuid, dev, self.instance)
2186

    
2187
        msg = result.fail_msg
2188
        if msg or not result.payload:
2189
          if not msg:
2190
            msg = "disk not found"
2191
          if not self._CheckDisksActivated(self.instance):
2192
            extra_hint = ("\nDisks seem to be not properly activated. Try"
2193
                          " running activate-disks on the instance before"
2194
                          " using replace-disks.")
2195
          else:
2196
            extra_hint = ""
2197
          raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2198
                                   (idx, self.cfg.GetNodeName(node_uuid), msg,
2199
                                    extra_hint))
2200

    
2201
  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2202
    for idx, dev in enumerate(self.instance.disks):
2203
      if idx not in self.disks:
2204
        continue
2205

    
2206
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2207
                      (idx, self.cfg.GetNodeName(node_uuid)))
2208

    
2209
      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2210
                                  on_primary, ldisk=ldisk):
2211
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2212
                                 " replace disks for instance %s" %
2213
                                 (self.cfg.GetNodeName(node_uuid),
2214
                                  self.instance.name))
2215

    
2216
  def _CreateNewStorage(self, node_uuid):
2217
    """Create new storage on the primary or secondary node.
2218

2219
    This is only used for same-node replaces, not for changing the
2220
    secondary node, hence we don't want to modify the existing disk.
2221

2222
    """
2223
    iv_names = {}
2224

    
2225
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2226
    for idx, dev in enumerate(disks):
2227
      if idx not in self.disks:
2228
        continue
2229

    
2230
      self.lu.LogInfo("Adding storage on %s for disk/%d",
2231
                      self.cfg.GetNodeName(node_uuid), idx)
2232

    
2233
      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2234
      names = _GenerateUniqueNames(self.lu, lv_names)
2235

    
2236
      (data_disk, meta_disk) = dev.children
2237
      vg_data = data_disk.logical_id[0]
2238
      lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
2239
                             logical_id=(vg_data, names[0]),
2240
                             params=data_disk.params)
2241
      vg_meta = meta_disk.logical_id[0]
2242
      lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
2243
                             size=constants.DRBD_META_SIZE,
2244
                             logical_id=(vg_meta, names[1]),
2245
                             params=meta_disk.params)
2246

    
2247
      new_lvs = [lv_data, lv_meta]
2248
      old_lvs = [child.Copy() for child in dev.children]
2249
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2250
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2251

    
2252
      # we pass force_create=True to force the LVM creation
2253
      for new_lv in new_lvs:
2254
        try:
2255
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2256
                               GetInstanceInfoText(self.instance), False,
2257
                               excl_stor)
2258
        except errors.DeviceCreationError, e:
2259
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2260

    
2261
    return iv_names
2262

    
2263
  def _CheckDevices(self, node_uuid, iv_names):
2264
    for name, (dev, _, _) in iv_names.iteritems():
2265
      result = _BlockdevFind(self, node_uuid, dev, self.instance)
2266

    
2267
      msg = result.fail_msg
2268
      if msg or not result.payload:
2269
        if not msg:
2270
          msg = "disk not found"
2271
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
2272
                                 (name, msg))
2273

    
2274
      if result.payload.is_degraded:
2275
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
2276

    
2277
  def _RemoveOldStorage(self, node_uuid, iv_names):
2278
    for name, (_, old_lvs, _) in iv_names.iteritems():
2279
      self.lu.LogInfo("Remove logical volumes for %s", name)
2280

    
2281
      for lv in old_lvs:
2282
        msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \
2283
                .fail_msg
2284
        if msg:
2285
          self.lu.LogWarning("Can't remove old LV: %s", msg,
2286
                             hint="remove unused LVs manually")
2287

    
2288
  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2289
    """Replace a disk on the primary or secondary for DRBD 8.
2290

2291
    The algorithm for replace is quite complicated:
2292

2293
      1. for each disk to be replaced:
2294

2295
        1. create new LVs on the target node with unique names
2296
        1. detach old LVs from the drbd device
2297
        1. rename old LVs to name_replaced.<time_t>
2298
        1. rename new LVs to old LVs
2299
        1. attach the new LVs (with the old names now) to the drbd device
2300

2301
      1. wait for sync across all devices
2302

2303
      1. for each modified disk:
2304

2305
        1. remove old LVs (which have the name name_replaces.<time_t>)
2306

2307
    Failures are not very well handled.
2308

2309
    """
2310
    steps_total = 6
2311

    
2312
    # Step: check device activation
2313
    self.lu.LogStep(1, steps_total, "Check device existence")
2314
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2315
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2316

    
2317
    # Step: check other node consistency
2318
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2319
    self._CheckDisksConsistency(
2320
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2321
      False)
2322

    
2323
    # Step: create new storage
2324
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2325
    iv_names = self._CreateNewStorage(self.target_node_uuid)
2326

    
2327
    # Step: for each lv, detach+rename*2+attach
2328
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2329
    for dev, old_lvs, new_lvs in iv_names.itervalues():
2330
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2331

    
2332
      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
2333
                                                     (dev, self.instance),
2334
                                                     (old_lvs, self.instance))
2335
      result.Raise("Can't detach drbd from local storage on node"
2336
                   " %s for device %s" %
2337
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2338
      #dev.children = []
2339
      #cfg.Update(instance)
2340

    
2341
      # ok, we created the new LVs, so now we know we have the needed
2342
      # storage; as such, we proceed on the target node to rename
2343
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2344
      # using the assumption that logical_id == unique_id on that node
2345

    
2346
      # FIXME(iustin): use a better name for the replaced LVs
2347
      temp_suffix = int(time.time())
2348
      ren_fn = lambda d, suff: (d.logical_id[0],
2349
                                d.logical_id[1] + "_replaced-%s" % suff)
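      # Illustrative rename sequence (hypothetical names): an old data LV
      #   xenvg/<uuid>.disk0_data -> xenvg/<uuid>.disk0_data_replaced-<time_t>
      # after which the freshly created LV is renamed to the original
      # xenvg/<uuid>.disk0_data, so DRBD can re-attach it under the old name.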
2350

    
2351
      # Build the rename list based on what LVs exist on the node
2352
      rename_old_to_new = []
2353
      for to_ren in old_lvs:
2354
        result = self.rpc.call_blockdev_find(self.target_node_uuid,
2355
                                             (to_ren, self.instance))
2356
        if not result.fail_msg and result.payload:
2357
          # device exists
2358
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2359

    
2360
      self.lu.LogInfo("Renaming the old LVs on the target node")
2361
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2362
                                             rename_old_to_new)
2363
      result.Raise("Can't rename old LVs on node %s" %
2364
                   self.cfg.GetNodeName(self.target_node_uuid))
2365

    
2366
      # Now we rename the new LVs to the old LVs
2367
      self.lu.LogInfo("Renaming the new LVs on the target node")
2368
      rename_new_to_old = [(new, old.logical_id)
2369
                           for old, new in zip(old_lvs, new_lvs)]
2370
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2371
                                             rename_new_to_old)
2372
      result.Raise("Can't rename new LVs on node %s" %
2373
                   self.cfg.GetNodeName(self.target_node_uuid))
2374

    
2375
      # Intermediate steps of in-memory modifications
2376
      for old, new in zip(old_lvs, new_lvs):
2377
        new.logical_id = old.logical_id
2378

    
2379
      # We need to modify old_lvs so that removal later removes the
2380
      # right LVs, not the newly added ones; note that old_lvs is a
2381
      # copy here
2382
      for disk in old_lvs:
2383
        disk.logical_id = ren_fn(disk, temp_suffix)
2384

    
2385
      # Now that the new lvs have the old name, we can add them to the device
2386
      self.lu.LogInfo("Adding new mirror component on %s",
2387
                      self.cfg.GetNodeName(self.target_node_uuid))
2388
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2389
                                                  (dev, self.instance),
2390
                                                  (new_lvs, self.instance))
2391
      msg = result.fail_msg
2392
      if msg:
2393
        for new_lv in new_lvs:
2394
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2395
                                               (new_lv, self.instance)).fail_msg
2396
          if msg2:
2397
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2398
                               hint=("cleanup manually the unused logical"
2399
                                     "volumes"))
2400
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2401

    
2402
    cstep = itertools.count(5)
2403

    
2404
    if self.early_release:
2405
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2406
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2407
      # TODO: Check if releasing locks early still makes sense
2408
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2409
    else:
2410
      # Release all resource locks except those used by the instance
2411
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2412
                   keep=self.node_secondary_ip.keys())
2413

    
2414
    # Release all node locks while waiting for sync
2415
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2416

    
2417
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2418
    # shutdown in the caller into consideration.
2419

    
2420
    # Wait for sync
2421
    # This can fail as the old devices are degraded and _WaitForSync
2422
    # does a combined result over all disks, so we don't check its return value
2423
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2424
    WaitForSync(self.lu, self.instance)
2425

    
2426
    # Check all devices manually
2427
    self._CheckDevices(self.instance.primary_node, iv_names)
2428

    
2429
    # Step: remove old storage
2430
    if not self.early_release:
2431
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2432
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2433

    
2434
  def _ExecDrbd8Secondary(self, feedback_fn):
2435
    """Replace the secondary node for DRBD 8.
2436

2437
    The algorithm for replace is quite complicated:
2438
      - for all disks of the instance:
2439
        - create new LVs on the new node with same names
2440
        - shutdown the drbd device on the old secondary
2441
        - disconnect the drbd network on the primary
2442
        - create the drbd device on the new secondary
2443
        - network attach the drbd on the primary, using an artifice:
2444
          the drbd code for Attach() will connect to the network if it
2445
          finds a device which is connected to the good local disks but
2446
          not network enabled
2447
      - wait for sync across all devices
2448
      - remove all disks from the old secondary
2449

2450
    Failures are not very well handled.
2451

2452
    """
2453
    steps_total = 6
2454

    
2455
    pnode = self.instance.primary_node
2456

    
2457
    # Step: check device activation
2458
    self.lu.LogStep(1, steps_total, "Check device existence")
2459
    self._CheckDisksExistence([self.instance.primary_node])
2460
    self._CheckVolumeGroup([self.instance.primary_node])
2461

    
2462
    # Step: check other node consistency
2463
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2464
    self._CheckDisksConsistency(self.instance.primary_node, True, True)
2465

    
2466
    # Step: create new storage
2467
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2468
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2469
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2470
                                                  self.new_node_uuid)
2471
    for idx, dev in enumerate(disks):
2472
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2473
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2474
      # we pass force_create=True to force LVM creation
2475
      for new_lv in dev.children:
2476
        try:
2477
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2478
                               new_lv, True, GetInstanceInfoText(self.instance),
2479
                               False, excl_stor)
2480
        except errors.DeviceCreationError, e:
2481
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2482

    
2483
    # Step 4: drbd minors and drbd setup changes
2484
    # after this, we must manually remove the drbd minors on both the
2485
    # error and the success paths
2486
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2487
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2488
                                         for _ in self.instance.disks],
2489
                                        self.instance.uuid)
2490
    logging.debug("Allocated minors %r", minors)
2491

    
2492
    iv_names = {}
2493
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2494
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2495
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2496
      # create new devices on new_node; note that we create two IDs:
2497
      # one without port, so the drbd will be activated without
2498
      # networking information on the new node at this stage, and one
2499
      # with network, for the latter activation in step 4
2500
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2501
      if self.instance.primary_node == o_node1:
2502
        p_minor = o_minor1
2503
      else:
2504
        assert self.instance.primary_node == o_node2, "Three-node instance?"
2505
        p_minor = o_minor2
2506

    
2507
      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2508
                      p_minor, new_minor, o_secret)
2509
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2510
                    p_minor, new_minor, o_secret)
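      # new_alone_id and new_net_id differ only in the port field (None vs.
      # o_port): the former lets the new DRBD device be created standalone on
      # the new node, the latter is what ends up in the configuration for the
      # later network attach.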
2511

    
2512
      iv_names[idx] = (dev, dev.children, new_net_id)
2513
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2514
                    new_net_id)
2515
      new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
2516
                              logical_id=new_alone_id,
2517
                              children=dev.children,
2518
                              size=dev.size,
2519
                              params={})
2520
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2521
                                            self.cfg)
2522
      try:
2523
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2524
                             anno_new_drbd,
2525
                             GetInstanceInfoText(self.instance), False,
2526
                             excl_stor)
2527
      except errors.GenericError:
2528
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2529
        raise
2530

    
2531
    # We have new devices, shutdown the drbd on the old secondary
2532
    for idx, dev in enumerate(self.instance.disks):
2533
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2534
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2535
                                            (dev, self.instance)).fail_msg
2536
      if msg:
2537
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2538
                           "node: %s" % (idx, msg),
2539
                           hint=("Please cleanup this device manually as"
2540
                                 " soon as possible"))
2541

    
2542
    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2543
    result = self.rpc.call_drbd_disconnect_net(
2544
               [pnode], (self.instance.disks, self.instance))[pnode]
2545

    
2546
    msg = result.fail_msg
2547
    if msg:
2548
      # detaches didn't succeed (unlikely)
2549
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2550
      raise errors.OpExecError("Can't detach the disks from the network on"
2551
                               " old node: %s" % (msg,))
2552

    
2553
    # if we managed to detach at least one, we update all the disks of
2554
    # the instance to point to the new secondary
2555
    self.lu.LogInfo("Updating instance configuration")
2556
    for dev, _, new_logical_id in iv_names.itervalues():
2557
      dev.logical_id = new_logical_id
2558

    
2559
    self.cfg.Update(self.instance, feedback_fn)
2560

    
2561
    # Release all node locks (the configuration has been updated)
2562
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2563

    
2564
    # and now perform the drbd attach
2565
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2566
                    " (standalone => connected)")
2567
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2568
                                            self.new_node_uuid],
2569
                                           (self.instance.disks, self.instance),
2570
                                           self.instance.name,
2571
                                           False)
2572
    for to_node, to_result in result.items():
2573
      msg = to_result.fail_msg
2574
      if msg:
2575
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2576
                           self.cfg.GetNodeName(to_node), msg,
2577
                           hint=("please do a gnt-instance info to see the"
2578
                                 " status of disks"))
2579

    
2580
    cstep = itertools.count(5)
2581

    
2582
    if self.early_release:
2583
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2584
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2585
      # TODO: Check if releasing locks early still makes sense
2586
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2587
    else:
2588
      # Release all resource locks except those used by the instance
2589
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2590
                   keep=self.node_secondary_ip.keys())
2591

    
2592
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2593
    # shutdown in the caller into consideration.
2594

    
2595
    # Wait for sync
2596
    # This can fail as the old devices are degraded and _WaitForSync
2597
    # does a combined result over all disks, so we don't check its return value
2598
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2599
    WaitForSync(self.lu, self.instance)
2600

    
2601
    # Check all devices manually
2602
    self._CheckDevices(self.instance.primary_node, iv_names)
2603

    
2604
    # Step: remove old storage
2605
    if not self.early_release:
2606
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2607
      self._RemoveOldStorage(self.target_node_uuid, iv_names)