root / lib / cmdlib / instance_storage.py @ c7dd65be

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import opcodes
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


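# Editorial note (added comment): this table maps disk templates to the
# logical device type of the top-level disk object.  DT_DRBD8 is absent
# because GenerateDiskTemplate builds DRBD disks in a dedicated branch
# instead of going through this lookup table.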
_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }


def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device, node, instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
                                    info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeName(cfg, nodename):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type nodename: string
  @param nodename: The node
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(nodename)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node)
    result = lu.rpc.call_blockdev_remove(node, disk)
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, node), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node: string
  @param target_node: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node is None:
    pnode = instance.primary_node
    all_nodes = instance.all_nodes
  else:
    pnode = target_node
    all_nodes = [pnode]

  if disks is None:
    disks = instance.disks

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir, pnode))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node in all_nodes:
      f_create = node == pnode
      try:
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
        disks_created.append((node, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created


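# Editorial note (added comment): the helper below returns a dict mapping
# each volume group name to the space required in it, in MiB (disk sizes
# plus any per-disk metadata overhead implied by the disk template).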
def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


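# Editorial note (added comment): the DRBD8 disk built below is a parent
# LD_DRBD8 device whose logical_id is (primary, secondary, port, p_minor,
# s_minor, shared_secret) and whose two children are the data LV and the
# DRBD metadata LV.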
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev


def GenerateDiskTemplate(
  lu, template_name, instance_name, primary_node, secondary_nodes,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError when spindles are given and they should not

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support for changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    self.op.nodes = ial.result
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(ial.result))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifiable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.nodes:
      if len(self.op.nodes) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.nodes)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.nodes) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.nodes) == 1
      primary_node = self.op.nodes[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.nodes or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.nodes:
      nodes = self.op.nodes
    else:
      nodes = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodeNames(self.cfg, nodes).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    instance = self.instance

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.nodes) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.nodes:
      instance.primary_node = self.op.nodes[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.nodes:
      self.cfg.Update(instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(instance.all_nodes))
    new_disks = CreateDisks(self, instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)


def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
  # FIXME: This maps everything to storage type 'lvm-vg' to maintain
  # the current functionality. Refactor to make it more flexible.
  nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)], None,
                                   es_flags)
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, (vg_info, ), _) = info.payload
    vg_free = vg_info.get("vg_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node, vg, requested, vg_free),
                                 errors.ECODE_NORES)


def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node = instance.primary_node

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
                                           wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpPrereqError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup)
    raise


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %r,"
                                   " got %r" % (instance.disks, disks))
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are ignored only if ignore_primary is
  true.

  """
  lu.cfg.MarkInstanceDisksInactive(instance.name)
  all_result = True
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if ((node == instance.primary_node and not ignore_primary) or
            (node != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)


def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1217
                           ignore_size=False):
1218
  """Prepare the block devices for an instance.
1219

1220
  This sets up the block devices on all nodes.
1221

1222
  @type lu: L{LogicalUnit}
1223
  @param lu: the logical unit on whose behalf we execute
1224
  @type instance: L{objects.Instance}
1225
  @param instance: the instance for whose disks we assemble
1226
  @type disks: list of L{objects.Disk} or None
1227
  @param disks: which disks to assemble (or all, if None)
1228
  @type ignore_secondaries: boolean
1229
  @param ignore_secondaries: if true, errors on secondary nodes
1230
      won't result in an error return from the function
1231
  @type ignore_size: boolean
1232
  @param ignore_size: if true, the current known size of the disk
1233
      will not be used during the disk activation, useful for cases
1234
      when the size is wrong
1235
  @return: False if the operation failed, otherwise a list of
1236
      (host, instance_visible_name, node_visible_name)
1237
      with the mapping from node devices to instance devices
1238

1239
  """
1240
  device_info = []
1241
  disks_ok = True
1242
  iname = instance.name
1243
  disks = ExpandCheckDisks(instance, disks)
1244

    
1245
  # With the two passes mechanism we try to reduce the window of
1246
  # opportunity for the race condition of switching DRBD to primary
1247
  # before handshaking occured, but we do not eliminate it
1248

    
1249
  # The proper fix would be to wait (with some limits) until the
1250
  # connection has been made and drbd transitions from WFConnection
1251
  # into any other network-connected state (Connected, SyncTarget,
1252
  # SyncSource, etc.)
1253

    
1254
  # mark instance disks as active before doing actual work, so watcher does
1255
  # not try to shut them down erroneously
1256
  lu.cfg.MarkInstanceDisksActive(iname)
1257

    
1258
  # 1st pass, assemble on all nodes in secondary mode
1259
  for idx, inst_disk in enumerate(disks):
1260
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1261
      if ignore_size:
1262
        node_disk = node_disk.Copy()
1263
        node_disk.UnsetSize()
1264
      lu.cfg.SetDiskID(node_disk, node)
1265
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1266
                                             False, idx)
1267
      msg = result.fail_msg
1268
      if msg:
1269
        is_offline_secondary = (node in instance.secondary_nodes and
1270
                                result.offline)
1271
        lu.LogWarning("Could not prepare block device %s on node %s"
1272
                      " (is_primary=False, pass=1): %s",
1273
                      inst_disk.iv_name, node, msg)
1274
        if not (ignore_secondaries or is_offline_secondary):
1275
          disks_ok = False
1276

    
1277
  # FIXME: race condition on drbd migration to primary
1278

    
1279
  # 2nd pass, do only the primary node
1280
  for idx, inst_disk in enumerate(disks):
1281
    dev_path = None
1282

    
1283
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1284
      if node != instance.primary_node:
1285
        continue
1286
      if ignore_size:
1287
        node_disk = node_disk.Copy()
1288
        node_disk.UnsetSize()
1289
      lu.cfg.SetDiskID(node_disk, node)
1290
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1291
                                             True, idx)
1292
      msg = result.fail_msg
1293
      if msg:
1294
        lu.LogWarning("Could not prepare block device %s on node %s"
1295
                      " (is_primary=True, pass=2): %s",
1296
                      inst_disk.iv_name, node, msg)
1297
        disks_ok = False
1298
      else:
1299
        dev_path = result.payload
1300

    
1301
    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1302

    
1303
  # leave the disks configured for the primary node
1304
  # this is a workaround that would be fixed better by
1305
  # improving the logical/physical id handling
1306
  for disk in disks:
1307
    lu.cfg.SetDiskID(disk, instance.primary_node)
1308

    
1309
  if not disks_ok:
1310
    lu.cfg.MarkInstanceDisksInactive(iname)
1311

    
1312
  return disks_ok, device_info
1313

    
1314

    
1315
def StartInstanceDisks(lu, instance, force):
1316
  """Start the disks of an instance.
1317

1318
  """
1319
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
1320
                                      ignore_secondaries=force)
1321
  if not disks_ok:
1322
    ShutdownInstanceDisks(lu, instance)
1323
    if force is not None and not force:
1324
      lu.LogWarning("",
1325
                    hint=("If the message above refers to a secondary node,"
1326
                          " you can retry the operation using '--force'"))
1327
    raise errors.OpExecError("Disk consistency error")
1328

    
1329

    
1330
class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))

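  # Worked example (illustrative, not part of the original code): with
  # absolute=True, amount=10240 and a current disk size of 8192 MiB,
  # delta = 10240 - 8192 = 2048; with absolute=False and amount=2048 the
  # target becomes 8192 + 2048 = 10240.  Either way the LU only ever grows
  # disks; negative deltas are rejected in CheckPrereq above.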
  def _CheckDiskSpace(self, nodenames, req_vgspace):
    template = self.instance.disk_template
    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      nodes = map(self.cfg.GetNodeInfo, nodenames)
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
                        nodes)
      if es_nodes:
        # With exclusive storage we need to do something smarter than just
        # looking at free space; for now, let's simply abort the operation.
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
                                   " is enabled", errors.ECODE_STATE)
      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)

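  # Illustrative note (not in the original code): Exec below first issues the
  # grow RPC in dry-run mode on every node, then for real against the backing
  # storage everywhere, and finally against the logical (e.g. DRBD) layer on
  # the primary node only; the new space is optionally wiped and synced after.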
  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk

    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           True, True)
      result.Raise("Dry-run grow request failed to node %s" % node)

    if wipe_disks:
      # Get disk size from primary node for wiping
      result = self.rpc.call_blockdev_getdimensions(instance.primary_node,
                                                    [disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           False, True)
      result.Raise("Grow request failed to node %s" % node)

    # And now execute it for logical storage, on the primary node
    node = instance.primary_node
    self.cfg.SetDiskID(disk, node)
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                         False, False)
    result.Raise("Grow request failed to node %s" % node)

    disk.RecordGrow(self.delta)
    self.cfg.Update(instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert instance.disks[self.op.disk] == disk

      # Wipe newly added disk space
      WipeDisks(self, instance,
                disks=[(self.op.disk, disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, instance, disks=[disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not instance.disks_active:
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
    elif not instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


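# Illustrative note (assumption, not from the original source): the replace
# modes handled by TLReplaceDisks below map roughly to the options of
# "gnt-instance replace-disks": REPLACE_DISK_PRI rebuilds the copy on the
# primary node, REPLACE_DISK_SEC rebuilds the copy on the current secondary,
# REPLACE_DISK_CHG moves the secondary to a new node (given explicitly or
# chosen by an iallocator), and REPLACE_DISK_AUTO repairs whichever side
# reports faulty disks.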
class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    remote_node = self.op.remote_node
    ialloc = self.op.iallocator
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and ialloc is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif remote_node is not None or ialloc is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_name
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.name)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


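# Illustrative note (assumption, not from the original source): the two LUs
# above and below are typically exposed as the "gnt-instance activate-disks"
# and "gnt-instance deactivate-disks" commands; for deactivation, the opcode's
# force flag (self.op.force below) skips the safety check that the instance is
# down and shuts the disks down unconditionally.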
class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    if self.op.force:
      ShutdownInstanceDisks(self, instance)
    else:
      _SafeShutdownInstanceDisks(self, instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
                                                     on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node, disk)


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results


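# Illustrative note (not in the original code): _GenerateUniqueNames above
# combines a freshly generated unique ID with each extension, so for
# exts=[".disk0_data", ".disk0_meta"] it returns names of the form
#   ["<uuid>.disk0_data", "<uuid>.disk0_meta"]
# which are later used as LV names when building replacement storage.
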
class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node = None
    self.target_node = None
    self.other_node = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(name=instance_name,
                                   relocate_from=list(relocate_from))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name

  def _FindFaultyDisks(self, node_name):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_name, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    nodes = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    instance = self.instance
    secondary_node = instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       instance.name, instance.secondary_nodes)

    if remote_node is None:
      self.remote_node_info = None
    else:
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        CheckNodeNotDrained(self.lu, remote_node)
        CheckNodeVmCapable(self.lu, remote_node)

        old_node_info = self.cfg.GetNodeInfo(secondary_node)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internally
    # submitted opcodes and external ones. We should fix that.
    if self.remote_node_info:
      # We change the node, let's verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                             self.cfg, ignore=self.ignore_ipolicy)

    for node in check_nodes:
      CheckNodeOnline(self.lu, node)

    touched_nodes = frozenset(node_name for node_name in [self.new_node,
                                                          self.other_node,
                                                          self.target_node]
                              if node_name is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

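  # Illustrative note (not in the original code): after CheckPrereq has run,
  # self.target_node is the node whose storage gets rebuilt, self.other_node
  # is its healthy peer, self.new_node is only set when the secondary is being
  # changed, and self.disks holds the indices of the disks to replace.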
  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.instance.secondary_nodes))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)

    return iv_names

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = _BlockdevFind(self, node_name, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

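  # Illustrative note (not in the original code): the iv_names mapping built
  # by _CreateNewStorage and consumed by the helpers above has the shape
  #   {"disk/0": (drbd_dev, [old_data_lv, old_meta_lv],
  #               [new_data_lv, new_meta_lv])}
  # i.e. one entry per replaced disk, keyed by its iv_name.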
  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)

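  # Illustrative note (not in the original code): a DRBD8 disk's logical_id is
  # the tuple (node_a, node_b, port, minor_a, minor_b, secret); the method
  # below rewrites it so that the new secondary and its freshly allocated
  # minor replace the old ones, first without the port ("standalone") and
  # then with it once the network can be attached.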
  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
                             True, GetInstanceInfoText(self.instance), False,
                             excl_stor)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)