#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import opcodes
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }


def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device, node, instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
                                    info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeName(cfg, nodename):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type nodename: string
  @param nodename: The node
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(nodename)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node)
    result = lu.rpc.call_blockdev_remove(node, disk)
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, node), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node: string
  @param target_node: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node is None:
    pnode = instance.primary_node
    all_nodes = instance.all_nodes
  else:
    pnode = target_node
    all_nodes = [pnode]

  if disks is None:
    disks = instance.disks

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir, pnode))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node in all_nodes:
      f_create = node == pnode
      try:
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
        disks_created.append((node, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created
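
# Note (illustrative sketch, not part of the original module): CreateDisks()
# returns a flat list of (node, disk) pairs, one entry per node on which a
# device was actually created, e.g.
#   [("node1.example.com", disk0), ("node2.example.com", disk0), ...]
# for a DRBD disk (node names here are hypothetical). This is exactly the
# shape that _UndoCreateDisks() above expects for cleanup.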


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vg = disk[constants.IDISK_VG]
      vgs[vg] = vgs.get(vg, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
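
# Note (illustrative sketch, not part of the original module): for a
# hypothetical request of two disks in the same volume group, e.g.
#   disks = [{constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 1024},
#            {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 2048}]
# ComputeDiskSizePerVG(constants.DT_DRBD8, disks) yields per-VG totals of the
# form {"xenvg": 1024 + 2048 + 2 * constants.DRBD_META_SIZE}, which is the
# input expected by CheckNodesFreeDiskPerVG() further below.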


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
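
# Note (illustrative sketch, not part of the original module): a single
# opcode disk specification such as {constants.IDISK_SIZE: 512} is normalized
# by ComputeDisks() into
#   {constants.IDISK_SIZE: 512, constants.IDISK_MODE: constants.DISK_RDWR,
#    constants.IDISK_VG: default_vg, constants.IDISK_NAME: None}
# with the optional keys (metavg, adoption, spindles and extstorage provider
# parameters) copied through only when present in the request.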


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev


def GenerateDiskTemplate(
  lu, template_name, instance_name, primary_node, secondary_nodes,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
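
# Note (illustrative sketch, not part of the original module): the logical_id
# of the generated disks depends on the template, as per logical_id_fn above:
# plain volumes get (vg_name, lv_name), file-based disks get
# (file_driver, "<file_storage_dir>/disk<N>"), blockdev adoption uses
# (constants.BLOCKDEV_DRIVER_MANUAL, adopt_path), RBD uses ("rbd", name) and
# extstorage uses (provider, name), while DRBD8 disks carry the
# (primary, secondary, port, p_minor, s_minor, secret) tuple built in
# _GenerateDRBD8Branch above.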


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    self.op.nodes = ial.result
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(ial.result))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.nodes:
      if len(self.op.nodes) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.nodes)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.nodes) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.nodes) == 1
      primary_node = self.op.nodes[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.nodes or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.nodes:
      nodes = self.op.nodes
    else:
      nodes = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodeNames(self.cfg, nodes).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    instance = self.instance

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.nodes) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.nodes:
      instance.primary_node = self.op.nodes[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.nodes:
      self.cfg.Update(instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(instance.all_nodes))
    new_disks = CreateDisks(self, instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)


def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
  # FIXME: This maps everything to storage type 'lvm-vg' to maintain
  # the current functionality. Refactor to make it more flexible.
  hvname = lu.cfg.GetHypervisorType()
  hvparams = lu.cfg.GetClusterInfo().hvparams
  nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)],
                                   [(hvname, hvparams[hvname])], es_flags)
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, (vg_info, ), _) = info.payload
    vg_free = vg_info.get("vg_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node, vg, requested, vg_free),
                                 errors.ECODE_NORES)


def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
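
# Note (illustrative sketch, not part of the original module): req_sizes is
# the per-VG mapping produced by ComputeDiskSizePerVG() above, e.g.
# {"xenvg": 10240} to require 10 GiB of free space in the (hypothetical)
# volume group "xenvg" on every node in nodenames; an empty dict (diskless
# or file-based templates) makes the check a no-op.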


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
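
# Note (illustrative sketch, not part of the original module): _CalcEta
# assumes a constant write rate; for example, after 30 seconds with 1024 MiB
# of a 4096 MiB disk written, the estimate is
#   (4096 - 1024) * (30 / 1024.0) = 90 seconds remaining.
# Similarly, _DiskSizeInBytesToMebibytes(lu, 1024 * 1024 + 1) logs the
# rounding warning and returns 2 (MiB).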


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node = instance.primary_node

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
                                           wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpPrereqError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup)
    raise


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %r,"
                                   " got %r" % (instance.disks, disks))
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored.

  """
  lu.cfg.MarkInstanceDisksInactive(instance.name)
  all_result = True
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if ((node == instance.primary_node and not ignore_primary) or
            (node != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)


def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  disks = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # mark instance disks as active before doing actual work, so watcher does
  # not try to shut them down erroneously
  lu.cfg.MarkInstanceDisksActive(iname)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, node, msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(iname)

  return disks_ok, device_info
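
# Note (illustrative sketch, not part of the original module): callers
# typically unpack the result as (disks_ok, device_info), where disks_ok is
# False if any required assembly failed and device_info is a list of
# (primary_node, iv_name, device_path) tuples, e.g.
#   ("node1.example.com", "disk/0", "/dev/drbd0")
# with hypothetical values shown here.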


def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, nodenames, req_vgspace):
    template = self.instance.disk_template
    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      nodes = map(self.cfg.GetNodeInfo, nodenames)
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
                        nodes)
      if es_nodes:
1425
        # With exclusive storage we need to something smarter than just looking
1426
        # at free space; for now, let's simply abort the operation.
1427
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1428
                                   " is enabled", errors.ECODE_STATE)
1429
      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1430

    
1431
  def Exec(self, feedback_fn):
1432
    """Execute disk grow.
1433

1434
    """
1435
    instance = self.instance
1436
    disk = self.disk
1437

    
1438
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1439
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1440
            self.owned_locks(locking.LEVEL_NODE_RES))
1441

    
1442
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1443

    
1444
    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1445
    if not disks_ok:
1446
      raise errors.OpExecError("Cannot activate block device to grow")
1447

    
1448
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1449
                (self.op.disk, instance.name,
1450
                 utils.FormatUnit(self.delta, "h"),
1451
                 utils.FormatUnit(self.target, "h")))
1452

    
1453
    # First run all grow ops in dry-run mode
1454
    for node in instance.all_nodes:
1455
      self.cfg.SetDiskID(disk, node)
1456
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1457
                                           True, True)
1458
      result.Raise("Dry-run grow request failed to node %s" % node)
1459

    
1460
    if wipe_disks:
1461
      # Get disk size from primary node for wiping
1462
      result = self.rpc.call_blockdev_getdimensions(instance.primary_node,
1463
                                                    [disk])
1464
      result.Raise("Failed to retrieve disk size from node '%s'" %
1465
                   instance.primary_node)
1466

    
1467
      (disk_dimensions, ) = result.payload
1468

    
1469
      if disk_dimensions is None:
1470
        raise errors.OpExecError("Failed to retrieve disk size from primary"
1471
                                 " node '%s'" % instance.primary_node)
1472
      (disk_size_in_bytes, _) = disk_dimensions
1473

    
1474
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1475

    
1476
      assert old_disk_size >= disk.size, \
1477
        ("Retrieved disk size too small (got %s, should be at least %s)" %
1478
         (old_disk_size, disk.size))
1479
    else:
1480
      old_disk_size = None
1481

    
1482
    # We know that (as far as we can test) operations across different
1483
    # nodes will succeed, time to run it for real on the backing storage
1484
    for node in instance.all_nodes:
1485
      self.cfg.SetDiskID(disk, node)
1486
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1487
                                           False, True)
1488
      result.Raise("Grow request failed to node %s" % node)
1489

    
1490
    # And now execute it for logical storage, on the primary node
1491
    node = instance.primary_node
1492
    self.cfg.SetDiskID(disk, node)
1493
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1494
                                         False, False)
1495
    result.Raise("Grow request failed to node %s" % node)
1496

    
1497
    disk.RecordGrow(self.delta)
1498
    self.cfg.Update(instance, feedback_fn)
1499

    
1500
    # Changes have been recorded, release node lock
1501
    ReleaseLocks(self, locking.LEVEL_NODE)
1502

    
1503
    # Downgrade lock while waiting for sync
1504
    self.glm.downgrade(locking.LEVEL_INSTANCE)
1505

    
1506
    assert wipe_disks ^ (old_disk_size is None)
1507

    
1508
    if wipe_disks:
1509
      assert instance.disks[self.op.disk] == disk
1510

    
1511
      # Wipe newly added disk space
1512
      WipeDisks(self, instance,
1513
                disks=[(self.op.disk, disk, old_disk_size)])
1514

    
1515
    if self.op.wait_for_sync:
1516
      disk_abort = not WaitForSync(self, instance, disks=[disk])
1517
      if disk_abort:
1518
        self.LogWarning("Disk syncing has not returned a good status; check"
1519
                        " the instance")
1520
      if not instance.disks_active:
1521
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1522
    elif not instance.disks_active:
1523
      self.LogWarning("Not shutting down the disk even if the instance is"
1524
                      " not supposed to be running because no wait for"
1525
                      " sync mode was requested")
1526

    
1527
    assert self.owned_locks(locking.LEVEL_NODE_RES)
1528
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1529

    
1530

    
1531
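# Illustrative sketch (not used by the code above): the two sizing modes of
# LUInstanceGrowDisk.  Disk sizes are kept in mebibytes; the opcode fields
# mirror the self.op attributes referenced in CheckPrereq/Exec.  The concrete
# values below are hypothetical.
#
#   # Relative mode: grow disk 0 by 2 GiB (target = size + delta)
#   op = opcodes.OpInstanceGrowDisk(instance_name="inst1.example.com",
#                                   disk=0, amount=2048, absolute=False,
#                                   wait_for_sync=True)
#
#   # Absolute mode: make disk 0 exactly 10 GiB (delta = amount - size,
#   # rejected if the delta would be negative)
#   op = opcodes.OpInstanceGrowDisk(instance_name="inst1.example.com",
#                                   disk=0, amount=10240, absolute=True,
#                                   wait_for_sync=True)

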
class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    remote_node = self.op.remote_node
    ialloc = self.op.iallocator
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and ialloc is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif remote_node is not None or ialloc is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_name
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)

    return LogicalUnit.CheckPrereq(self)


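# Illustrative sketch (not used by the code above): opcodes matching the
# argument checks in LUInstanceReplaceDisks.CheckArguments.  The instance
# name and iallocator name are hypothetical placeholders.
#
#   # Replace the copies on the current secondary node in place
#   op = opcodes.OpInstanceReplaceDisks(instance_name="inst1.example.com",
#                                       mode=constants.REPLACE_DISK_SEC,
#                                       disks=[0])
#
#   # Move the secondary to a node chosen by an iallocator; an explicit
#   # disk list is not allowed in this mode
#   op = opcodes.OpInstanceReplaceDisks(instance_name="inst1.example.com",
#                                       mode=constants.REPLACE_DISK_CHG,
#                                       iallocator="hail", disks=[],
#                                       early_release=True)

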
class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.name)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    if self.op.force:
      ShutdownInstanceDisks(self, instance)
    else:
      _SafeShutdownInstanceDisks(self, instance)


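# Illustrative sketch (not used by the code above): the opcodes handled by
# the two LUs above.  LUInstanceActivateDisks returns the device_info list
# produced by AssembleInstanceDisks; LUInstanceDeactivateDisks shuts the
# disks down unconditionally only when force is set, otherwise it goes
# through _SafeShutdownInstanceDisks.  The instance name is hypothetical.
#
#   op = opcodes.OpInstanceActivateDisks(instance_name="inst1.example.com",
#                                        ignore_size=False,
#                                        wait_for_sync=True)
#   op = opcodes.OpInstanceDeactivateDisks(instance_name="inst1.example.com",
#                                          force=False)

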
def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
                                                     on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node, disk)


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results


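# Illustrative sketch (not used by the code above): _GenerateUniqueNames
# simply prefixes each extension with a unique ID obtained from the config,
# so for the DRBD child LVs created in TLReplaceDisks._CreateNewStorage the
# result looks roughly like this (IDs shortened and hypothetical):
#
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
#   # -> ["5d0f468a-....disk0_data", "8ac47de2-....disk0_meta"]
#   # (a fresh unique ID is generated for every extension)

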
class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node = None
    self.target_node = None
    self.other_node = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(name=instance_name,
                                   relocate_from=list(relocate_from))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name

  def _FindFaultyDisks(self, node_name):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_name, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    nodes = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    instance = self.instance
    secondary_node = instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       instance.name, instance.secondary_nodes)

    if remote_node is None:
      self.remote_node_info = None
    else:
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        CheckNodeNotDrained(self.lu, remote_node)
        CheckNodeVmCapable(self.lu, remote_node)

        old_node_info = self.cfg.GetNodeInfo(secondary_node)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between an
    # internally submitted opcode and an external one. We should fix that.
    if self.remote_node_info:
      # We change the node, let's verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                             self.cfg, ignore=self.ignore_ipolicy)

    for node in check_nodes:
      CheckNodeOnline(self.lu, node)

    touched_nodes = frozenset(node_name for node_name in [self.new_node,
                                                          self.other_node,
                                                          self.target_node]
                              if node_name is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

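  # Summary of the node roles computed in CheckPrereq above (sketch, not
  # code used at runtime):
  #
  #   mode              target_node         other_node      new_node
  #   REPLACE_DISK_PRI  primary             secondary       None
  #   REPLACE_DISK_SEC  secondary           primary         None
  #   REPLACE_DISK_CHG  secondary (old)     primary         remote/iallocator
  #   REPLACE_DISK_AUTO node w/ faulty disks the other node None
  #
  # Exec then picks _ExecDrbd8Secondary when new_node is set and
  # _ExecDrbd8DiskOnly otherwise.
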
  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.instance.secondary_nodes))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)

    return iv_names

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = _BlockdevFind(self, node_name, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)

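  # The LV shuffle performed above for each replaced disk, expressed in
  # terms of names on the target node (sketch only; "xyz" stands for the
  # original unique LV name and <t> for int(time.time())):
  #
  #   1. detach:  the drbd device loses children [xyz.disk0_data,
  #               xyz.disk0_meta]
  #   2. rename:  xyz.disk0_data -> xyz.disk0_data_replaced-<t>  (old LVs)
  #   3. rename:  the freshly created LVs -> xyz.disk0_data, xyz.disk0_meta
  #   4. attach:  the drbd device gets the new LVs back under the old names
  #               and resyncs; the *_replaced-<t> LVs are removed in the
  #               final step (or earlier when early_release is set).
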
  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
                             True, GetInstanceInfoText(self.instance), False,
                             excl_stor)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the later activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)