lib/cmdlib/instance_storage.py @ 7c848a6a

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import opcodes
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }

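# Illustrative note, not part of the original module: templates that are
# absent from _DISK_TEMPLATE_NAME_PREFIX (e.g. DT_FILE, DT_SHARED_FILE and
# DT_BLOCK) get no generated names in GenerateDiskTemplate below; their
# logical IDs are derived from the file path or the adopted block device
# instead.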
def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device, node, instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload

def _CreateBlockDevInner(lu, node, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
                                    info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)

def IsExclusiveStorageEnabledNodeName(cfg, nodename):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type nodename: string
  @param nodename: The node
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(nodename)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)

def _CreateBlockDev(lu, node, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
                              force_open, excl_stor)

def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node)
    result = lu.rpc.call_blockdev_remove(node, disk)
    if result.fail_msg:
      logging.warning("Failed to remove newly-created disk %s on node %s:"
                      " %s", disk, node, result.fail_msg)

def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node: string
  @param target_node: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node is None:
    pnode = instance.primary_node
    all_nodes = instance.all_nodes
  else:
    pnode = target_node
    all_nodes = [pnode]

  if disks is None:
    disks = instance.disks

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir, pnode))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node in all_nodes:
      f_create = node == pnode
      try:
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
        disks_created.append((node, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created

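# Illustrative note, not part of the original module: CreateDisks returns a
# list of (node_name, disk_object) pairs, one entry per node on which a disk
# tree was created; this is exactly the structure that _UndoCreateDisks
# expects when a later step fails and the new disks have to be removed again.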
def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]

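# Illustrative sketch, not used by the module: how the per-VG accumulation in
# ComputeDiskSizePerVG behaves for plain LVM disks. The plain string keys
# "vg" and "size" stand in for the IDISK_* constants used above; volume group
# names and sizes (in MiB) are hypothetical.
def _ExampleComputeDiskSizePerVG():
  disks = [
    {"vg": "xenvg", "size": 1024},
    {"vg": "xenvg", "size": 2048},
    {"vg": "fastvg", "size": 512},
    ]
  vgs = {}
  for disk in disks:
    # payload is 0 for DT_PLAIN; DT_DRBD8 would add DRBD_META_SIZE per disk
    vgs[disk["vg"]] = vgs.get(disk["vg"], 0) + disk["size"]
  return vgs  # -> {"xenvg": 3072, "fastvg": 512}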
def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks

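# Illustrative sketch, not used by the module: the shape of one disk
# dictionary returned by ComputeDisks for a single 10 GiB read-write disk.
# The volume group name "xenvg" is a hypothetical example.
_EXAMPLE_COMPUTED_DISK = {
  constants.IDISK_SIZE: 10240,                # MiB, normalized to int
  constants.IDISK_MODE: constants.DISK_RDWR,
  constants.IDISK_VG: "xenvg",                # defaults to default_vg if unset
  constants.IDISK_NAME: None,                 # the string "none" becomes None
  }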
def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass

def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev

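# Illustrative sketch, not used by the module: the logical_id of the DRBD8
# device built by _GenerateDRBD8Branch is a 6-tuple laid out as below; the
# node names, port, minors and secret are hypothetical placeholders.
_EXAMPLE_DRBD8_LOGICAL_ID = (
  "node1.example.com",    # primary node
  "node2.example.com",    # secondary node
  11000,                  # TCP port from AllocatePort()
  0,                      # DRBD minor on the primary
  1,                      # DRBD minor on the secondary
  "shared-secret",        # secret from GenerateDRBDSecret()
  )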
def GenerateDiskTemplate(
  lu, template_name, instance_name, primary_node, secondary_nodes,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks

def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not be

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)

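# Illustrative sketch, not used by the module: the accept/reject cases of
# CheckSpindlesExclusiveStorage, exercised with minimal disk dictionaries.
def _ExampleSpindleChecks():
  # Accepted: exclusive storage active and spindles given.
  CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 1}, True, True)
  # Rejected: spindles given while exclusive storage is inactive.
  try:
    CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 1}, False, False)
  except errors.OpPrereqError:
    pass
  # Rejected: exclusive storage active, spindles required but missing.
  try:
    CheckSpindlesExclusiveStorage({}, True, True)
  except errors.OpPrereqError:
    pass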
class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=[{constants.IDISK_SIZE: d.size,
                                                constants.IDISK_MODE: d.mode}
                                               for d in self.instance.disks],
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    self.op.nodes = ial.result
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(ial.result))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifiable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.nodes:
      if len(self.op.nodes) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.nodes)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.nodes) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.nodes) == 1
      primary_node = self.op.nodes[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.nodes or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.nodes:
      nodes = self.op.nodes
    else:
      nodes = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodeNames(self.cfg, nodes).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    instance = self.instance

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.nodes) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.nodes:
      instance.primary_node = self.op.nodes[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.nodes:
      self.cfg.Update(instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(instance.all_nodes))
    new_disks = CreateDisks(self, instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)


def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
  # FIXME: This maps everything to storage type 'lvm-vg' to maintain
  # the current functionality. Refactor to make it more flexible.
  nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)], None,
                                   es_flags)
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, (vg_info, ), _) = info.payload
    vg_free = vg_info.get("vg_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node, vg, requested, vg_free),
                                 errors.ECODE_NORES)

def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)

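# Illustrative sketch, not used by the module: the req_sizes mapping consumed
# by CheckNodesFreeDiskPerVG is the output of ComputeDiskSizePerVG, i.e. a
# dictionary from VG name to required space in MiB. Names and sizes below are
# hypothetical.
_EXAMPLE_REQ_SIZES = {
  "xenvg": 3072,
  "fastvg": 512,
  }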
def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib

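# Illustrative sketch, not used by the module: the rounding rule of
# _DiskSizeInBytesToMebibytes without the LogWarning side effect. One GiB plus
# a single byte is not an even multiple of 1 MiB and is rounded up to 1025.
def _ExampleBytesToMebibytes(size=(1024 * 1024 * 1024) + 1):
  (mib, remainder) = divmod(size, 1024 * 1024)
  if remainder != 0:
    mib += 1
  return mib  # -> 1025 for the default argument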
def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time

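# Illustrative sketch, not used by the module: with 30 seconds elapsed and
# 1024 MiB of 4096 MiB written, the average time per MiB is 30/1024 s, so
# _CalcEta(30.0, 1024, 4096) returns (4096 - 1024) * (30.0 / 1024) = 90.0.
def _ExampleCalcEta():
  return _CalcEta(30.0, 1024, 4096)  # -> 90.0 seconds remaining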
def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node = instance.primary_node

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
                                           wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)

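# Illustrative sketch, not used by the module: the chunk-size rule applied by
# WipeDisks, with the two cluster constants passed in explicitly so that no
# particular values are assumed. For a 20480 MiB disk, a 10 percent minimum
# chunk and a 1024 MiB cap this yields min(1024, 2048) = 1024 MiB.
def _ExampleWipeChunkSize(disk_size_mib, max_wipe_chunk, min_chunk_percent):
  return int(min(max_wipe_chunk, disk_size_mib / 100.0 * min_chunk_percent))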
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpPrereqError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup)
    raise

def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %r,"
                                   " got %r" % (instance.disks, disks))
    return disks

def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded

def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored.

  """
  all_result = True
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if ((node == instance.primary_node and not ignore_primary) or
            (node != instance.primary_node and not result.offline)):
          all_result = False
  return all_result

def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  ShutdownInstanceDisks.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)

def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                           ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  disks = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, node, msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info

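# Illustrative note (hypothetical values): on success the second element
# returned by AssembleInstanceDisks is a list of tuples such as
# ("node1.example.com", "disk/0", "/dev/drbd0"), i.e. the primary node, the
# instance-visible disk name and the node-visible device path.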
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")

class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

1343
  def BuildHooksEnv(self):
1344
    """Build hooks env.
1345

1346
    This runs on the master, the primary and all the secondaries.
1347

1348
    """
1349
    env = {
1350
      "DISK": self.op.disk,
1351
      "AMOUNT": self.op.amount,
1352
      "ABSOLUTE": self.op.absolute,
1353
      }
1354
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
1355
    return env
1356

    
1357
  def BuildHooksNodes(self):
1358
    """Build hooks nodes.
1359

1360
    """
1361
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1362
    return (nl, nl)
1363

    
1364
  def CheckPrereq(self):
1365
    """Check prerequisites.
1366

1367
    This checks that the instance is in the cluster.
1368

1369
    """
1370
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1371
    assert instance is not None, \
1372
      "Cannot retrieve locked instance %s" % self.op.instance_name
1373
    nodenames = list(instance.all_nodes)
1374
    for node in nodenames:
1375
      CheckNodeOnline(self, node)
1376

    
1377
    self.instance = instance
1378

    
1379
    if instance.disk_template not in constants.DTS_GROWABLE:
1380
      raise errors.OpPrereqError("Instance's disk layout does not support"
1381
                                 " growing", errors.ECODE_INVAL)
1382

    
1383
    self.disk = instance.FindDisk(self.op.disk)
1384

    
1385
    if self.op.absolute:
1386
      self.target = self.op.amount
1387
      self.delta = self.target - self.disk.size
1388
      if self.delta < 0:
1389
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
1390
                                   "current disk size (%s)" %
1391
                                   (utils.FormatUnit(self.target, "h"),
1392
                                    utils.FormatUnit(self.disk.size, "h")),
1393
                                   errors.ECODE_STATE)
1394
    else:
1395
      self.delta = self.op.amount
1396
      self.target = self.disk.size + self.delta
1397
      if self.delta < 0:
1398
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
1399
                                   utils.FormatUnit(self.delta, "h"),
1400
                                   errors.ECODE_INVAL)
1401

    
1402
    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1403

    
1404
  def _CheckDiskSpace(self, nodenames, req_vgspace):
1405
    template = self.instance.disk_template
1406
    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1407
      # TODO: check the free disk space for file, when that feature will be
1408
      # supported
1409
      nodes = map(self.cfg.GetNodeInfo, nodenames)
1410
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1411
                        nodes)
1412
      if es_nodes:
1413
        # With exclusive storage we need to something smarter than just looking
1414
        # at free space; for now, let's simply abort the operation.
1415
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1416
                                   " is enabled", errors.ECODE_STATE)
1417
      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1418

    
1419

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk

    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
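    # Judging by the calls further down, the two trailing booleans passed to
    # call_blockdev_grow are the dry-run and backing-storage flags: here every
    # node is only asked to verify that the grow would succeed.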
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           True, True)
      result.Raise("Dry-run grow request failed to node %s" % node)

    if wipe_disks:
      # Get disk size from primary node for wiping
      result = self.rpc.call_blockdev_getdimensions(instance.primary_node,
                                                    [disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                           False, True)
      result.Raise("Grow request failed to node %s" % node)

    # And now execute it for logical storage, on the primary node
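    # For mirrored templates such as DRBD the loop above only grew the backing
    # storage (the underlying LVs) on each node; this final call, with the
    # backing-storage flag off, resizes the logical device itself, which is
    # done on the primary node only.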
    node = instance.primary_node
    self.cfg.SetDiskID(disk, node)
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
                                         False, False)
    result.Raise("Grow request failed to node %s" % node)

    disk.RecordGrow(self.delta)
    self.cfg.Update(instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert instance.disks[self.op.disk] == disk

      # Wipe newly added disk space
      WipeDisks(self, instance,
                disks=[(self.op.disk, disk, old_disk_size)])
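      # old_disk_size is the pre-grow size, so WipeDisks only has to wipe the
      # space that was just added rather than the whole disk.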

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, instance, disks=[disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if instance.admin_state != constants.ADMINST_UP:
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
    elif instance.admin_state != constants.ADMINST_UP:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    remote_node = self.op.remote_node
    ialloc = self.op.iallocator
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and ialloc is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif remote_node is not None or ialloc is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]
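    # The actual disk replacement is delegated to the TLReplaceDisks tasklet;
    # the tasklet machinery in the base classes runs its CheckPrereq and Exec,
    # which is why this LU's own CheckPrereq below only re-validates the node
    # group locks and then defers to LogicalUnit.CheckPrereq.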

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_name
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
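    # _SafeShutdownInstanceDisks is defined elsewhere in this module;
    # presumably it refuses to shut down the disks while the instance is still
    # marked as running, which is why the forced path below calls
    # ShutdownInstanceDisks directly.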
    if self.op.force:
      ShutdownInstanceDisks(self, instance)
    else:
      _SafeShutdownInstanceDisks(self, instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
                                                     on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node, disk)


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
               disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node = remote_node
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node = None
    self.target_node = None
    self.other_node = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(name=instance_name,
                                   relocate_from=list(relocate_from))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name

  def _FindFaultyDisks(self, node_name):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_name, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    nodes = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    instance = self.instance
    secondary_node = instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       instance.name, instance.secondary_nodes)

    if remote_node is None:
      self.remote_node_info = None
    else:
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        CheckNodeNotDrained(self.lu, remote_node)
        CheckNodeVmCapable(self.lu, remote_node)

        old_node_info = self.cfg.GetNodeInfo(secondary_node)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))
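    # At this point the roles are fixed: target_node is the node whose storage
    # gets rebuilt, other_node is the peer used for the consistency checks,
    # and new_node is only set when the secondary is being changed
    # (REPLACE_DISK_CHG).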

    # TODO: This is ugly, but right now we can't distinguish between internal
    # submitted opcode and external one. We should fix that.
    if self.remote_node_info:
      # We change the node, let's verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                             self.cfg, ignore=self.ignore_ipolicy)

    for node in check_nodes:
      CheckNodeOnline(self.lu, node)

    touched_nodes = frozenset(node_name for node_name in [self.new_node,
                                                          self.other_node,
                                                          self.target_node]
                              if node_name is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
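    # node_secondary_ip (node name -> secondary IP) is what the DRBD
    # disconnect/attach RPCs in the Exec path use to address the peer nodes.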

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.instance.secondary_nodes))

    activate_disks = (self.instance.admin_state != constants.ADMINST_UP)

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}
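    # iv_names maps each DRBD device's iv_name to a tuple of (device, old LV
    # children, newly created LVs); it drives the detach/rename/attach steps
    # below and the final removal of the old storage.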

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)

    return iv_names

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = _BlockdevFind(self, node_name, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)
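    # Steps 1-4 are fixed above; the numbering of the remaining steps depends
    # on whether the old storage is removed before or after the resync, hence
    # the counter starting at 5.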

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
                             True, GetInstanceInfoText(self.instance), False,
                             excl_stor)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)
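    # One DRBD minor per instance disk has now been reserved on the new node;
    # if anything below fails before the configuration is updated, the error
    # paths call ReleaseDRBDMinors to return them.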

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the later activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)