#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import opcodes
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }


def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device, node, instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device for which
      CreateOnSecondary() returns True
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
                                    info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeName(cfg, nodename):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type nodename: string
  @param nodename: The node
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(nodename)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node)
    result = lu.rpc.call_blockdev_remove(node, disk)
    if result.fail_msg:
      logging.warning("Failed to remove newly-created disk %s on node %s:"
                      " %s", disk, node, result.fail_msg)


def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node: string
  @param target_node: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node is None:
    pnode = instance.primary_node
    all_nodes = instance.all_nodes
  else:
    pnode = target_node
    all_nodes = [pnode]

  if disks is None:
    disks = instance.disks

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir, pnode))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node in all_nodes:
      f_create = node == pnode
      try:
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
        disks_created.append((node, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created
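
# Usage sketch (illustrative only, not taken from this module): callers keep
# the (node, disk) pairs returned above so that a failure in a later step can
# be rolled back, e.g.:
#
#   disks_created = CreateDisks(lu, instance)
#   # ... if a later setup step fails:
#   #   _UndoCreateDisks(lu, disks_created)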


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group.

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vg = disk[constants.IDISK_VG]
      vgs[vg] = vgs.get(vg, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
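
# Worked example (illustrative values): for
#   disks = [{constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 1024},
#            {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 2048}]
# ComputeDiskSizePerVG(constants.DT_PLAIN, disks) returns {"xenvg": 3072},
# while ComputeDiskSizePerVG(constants.DT_DRBD8, disks) adds
# constants.DRBD_META_SIZE (128 MiB) per disk, i.e. {"xenvg": 3328}.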


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
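
# Illustrative example (values assumed): for an opcode with
#   op.disk_template = constants.DT_PLAIN
#   op.disks = [{constants.IDISK_SIZE: "1024"}]
# and default_vg = "xenvg", ComputeDisks returns
#   [{constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR,
#     constants.IDISK_VG: "xenvg", constants.IDISK_NAME: None}]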


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev
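
# Resulting device tree (sketch of the code above): the returned LD_DRBD8
# disk carries logical_id = (primary, secondary, port, p_minor, s_minor,
# shared_secret) and has two LD_LV children, the data LV of `size` MiB on
# vgnames[0]/names[0] and the metadata LV of constants.DRBD_META_SIZE MiB on
# vgnames[1]/names[1].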


def GenerateDiskTemplate(
  lu, template_name, instance_name, primary_node, secondary_nodes,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found" %
                                       (constants.DT_EXT,
                                        constants.IDISK_PROVIDER))
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template, add the extra disk_info entries to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params)
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
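
# Summary of the logical_id values produced above (for reference):
# DT_PLAIN -> (vg, lv_name), DT_FILE/DT_SHARED_FILE ->
# (file_driver, "<file_storage_dir>/disk<index>"), DT_BLOCK ->
# (BLOCKDEV_DRIVER_MANUAL, adopted device path), DT_RBD -> ("rbd", name),
# DT_EXT -> (provider, name); DRBD8 disks get their logical_id from
# _GenerateDRBD8Branch instead.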


def CheckSpindlesExclusiveStorage(diskdict, es_flag):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @raise errors.OpPrereqError: when spindles are given and they should not be

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
    
537

    
538
class LUInstanceRecreateDisks(LogicalUnit):
539
  """Recreate an instance's missing disks.
540

541
  """
542
  HPATH = "instance-recreate-disks"
543
  HTYPE = constants.HTYPE_INSTANCE
544
  REQ_BGL = False
545

    
546
  _MODIFYABLE = compat.UniqueFrozenset([
547
    constants.IDISK_SIZE,
548
    constants.IDISK_MODE,
549
    constants.IDISK_SPINDLES,
550
    ])
551

    
552
  # New or changed disk parameters may have different semantics
553
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
554
    constants.IDISK_ADOPT,
555

    
556
    # TODO: Implement support changing VG while recreating
557
    constants.IDISK_VG,
558
    constants.IDISK_METAVG,
559
    constants.IDISK_PROVIDER,
560
    constants.IDISK_NAME,
561
    ]))
562

    
563
  def _RunAllocator(self):
564
    """Run the allocator based on input opcode.
565

566
    """
567
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
568

    
569
    # FIXME
570
    # The allocator should actually run in "relocate" mode, but current
571
    # allocators don't support relocating all the nodes of an instance at
572
    # the same time. As a workaround we use "allocate" mode, but this is
573
    # suboptimal for two reasons:
574
    # - The instance name passed to the allocator is present in the list of
575
    #   existing instances, so there could be a conflict within the
576
    #   internal structures of the allocator. This doesn't happen with the
577
    #   current allocators, but it's a liability.
578
    # - The allocator counts the resources used by the instance twice: once
579
    #   because the instance exists already, and once because it tries to
580
    #   allocate a new instance.
581
    # The allocator could choose some of the nodes on which the instance is
582
    # running, but that's not a problem. If the instance nodes are broken,
583
    # they should be already be marked as drained or offline, and hence
584
    # skipped by the allocator. If instance disks have been lost for other
585
    # reasons, then recreating the disks on the same nodes should be fine.
586
    disk_template = self.instance.disk_template
587
    spindle_use = be_full[constants.BE_SPINDLE_USE]
588
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
589
                                        disk_template=disk_template,
590
                                        tags=list(self.instance.GetTags()),
591
                                        os=self.instance.os,
592
                                        nics=[{}],
593
                                        vcpus=be_full[constants.BE_VCPUS],
594
                                        memory=be_full[constants.BE_MAXMEM],
595
                                        spindle_use=spindle_use,
596
                                        disks=[{constants.IDISK_SIZE: d.size,
597
                                                constants.IDISK_MODE: d.mode}
598
                                               for d in self.instance.disks],
599
                                        hypervisor=self.instance.hypervisor,
600
                                        node_whitelist=None)
601
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
602

    
603
    ial.Run(self.op.iallocator)
604

    
605
    assert req.RequiredNodes() == len(self.instance.all_nodes)
606

    
607
    if not ial.success:
608
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
609
                                 " %s" % (self.op.iallocator, ial.info),
610
                                 errors.ECODE_NORES)
611

    
612
    self.op.nodes = ial.result
613
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
614
                 self.op.instance_name, self.op.iallocator,
615
                 utils.CommaJoin(ial.result))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifiable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.nodes:
      if len(self.op.nodes) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.nodes)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.nodes) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.nodes) == 1
      primary_node = self.op.nodes[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.nodes or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.nodes:
      nodes = self.op.nodes
    else:
      nodes = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodeNames(self.cfg, nodes).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    instance = self.instance

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.nodes) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None))

    # change primary node, if needed
    if self.op.nodes:
      instance.primary_node = self.op.nodes[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the old nodes manually")

    if self.op.nodes:
      self.cfg.Update(instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(instance.all_nodes))
    new_disks = CreateDisks(self, instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)


def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
  # FIXME: This maps everything to storage type 'lvm-vg' to maintain
  # the current functionality. Refactor to make it more flexible.
  nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)], None,
                                   es_flags)
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, (vg_info, ), _) = info.payload
    vg_free = vg_info.get("vg_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node, vg, requested, vg_free),
                                 errors.ECODE_NORES)


def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib
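
# Worked example (illustrative): for size = 5 * 1024 * 1024 + 512 bytes,
# divmod() yields (5, 512), so a warning is logged that 1048064 bytes will
# not be wiped and the function returns 6.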


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
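
# Worked example (illustrative): with time_taken = 60 s, written = 512 MiB
# and total_size = 2048 MiB, avg_time is 60 / 512.0 = 0.1171875 s per MiB,
# so the returned ETA is (2048 - 512) * 0.1171875 = 180.0 seconds.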


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node = instance.primary_node

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
                                           wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)
                        " failed", idx, instance.name)
1048

    
1049

    
1050
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1051
  """Wrapper for L{WipeDisks} that handles errors.
1052

1053
  @type lu: L{LogicalUnit}
1054
  @param lu: the logical unit on whose behalf we execute
1055
  @type instance: L{objects.Instance}
1056
  @param instance: the instance whose disks we should wipe
1057
  @param disks: see L{WipeDisks}
1058
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1059
      case of error
1060
  @raise errors.OpPrereqError: in case of failure
1061

1062
  """
1063
  try:
1064
    WipeDisks(lu, instance, disks=disks)
1065
  except errors.OpExecError:
1066
    logging.warning("Wiping disks for instance '%s' failed",
1067
                    instance.name)
1068
    _UndoCreateDisks(lu, cleanup)
1069
    raise


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list.

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance")
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node = instance.primary_node

  for dev in disks:
    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  all_result = True
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if ((node == instance.primary_node and not ignore_primary) or
            (node != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  L{ShutdownInstanceDisks}.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)


def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a (disks_ok, device_info) tuple; disks_ok is False if the
      operation failed, and device_info is a list of
      (host, instance_visible_name, node_visible_name) tuples with the
      mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  disks = ExpandCheckDisks(instance, disks)

  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, node, msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
                                             True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, node, msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")
1308

    
1309

    
1310
class LUInstanceGrowDisk(LogicalUnit):
1311
  """Grow a disk of an instance.
1312

1313
  """
1314
  HPATH = "disk-grow"
1315
  HTYPE = constants.HTYPE_INSTANCE
1316
  REQ_BGL = False
1317

    
1318
  def ExpandNames(self):
1319
    self._ExpandAndLockInstance()
1320
    self.needed_locks[locking.LEVEL_NODE] = []
1321
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1322
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1323
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1324

    
1325
  def DeclareLocks(self, level):
1326
    if level == locking.LEVEL_NODE:
1327
      self._LockInstancesNodes()
1328
    elif level == locking.LEVEL_NODE_RES:
1329
      # Copy node locks
1330
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1331
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1332

    
1333
  def BuildHooksEnv(self):
1334
    """Build hooks env.
1335

1336
    This runs on the master, the primary and all the secondaries.
1337

1338
    """
1339
    env = {
1340
      "DISK": self.op.disk,
1341
      "AMOUNT": self.op.amount,
1342
      "ABSOLUTE": self.op.absolute,
1343
      }
1344
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
1345
    return env
1346

    
1347
  def BuildHooksNodes(self):
1348
    """Build hooks nodes.
1349

1350
    """
1351
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1352
    return (nl, nl)
1353

    
1354
  def CheckPrereq(self):
1355
    """Check prerequisites.
1356

1357
    This checks that the instance is in the cluster.
1358

1359
    """
1360
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1361
    assert instance is not None, \
1362
      "Cannot retrieve locked instance %s" % self.op.instance_name
1363
    nodenames = list(instance.all_nodes)
1364
    for node in nodenames:
1365
      CheckNodeOnline(self, node)
1366

    
1367
    self.instance = instance
1368

    
1369
    if instance.disk_template not in constants.DTS_GROWABLE:
1370
      raise errors.OpPrereqError("Instance's disk layout does not support"
1371
                                 " growing", errors.ECODE_INVAL)
1372

    
1373
    self.disk = instance.FindDisk(self.op.disk)
1374

    
1375
    if self.op.absolute:
1376
      self.target = self.op.amount
1377
      self.delta = self.target - self.disk.size
1378
      if self.delta < 0:
1379
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
1380
                                   "current disk size (%s)" %
1381
                                   (utils.FormatUnit(self.target, "h"),
1382
                                    utils.FormatUnit(self.disk.size, "h")),
1383
                                   errors.ECODE_STATE)
1384
    else:
1385
      self.delta = self.op.amount
1386
      self.target = self.disk.size + self.delta
1387
      if self.delta < 0:
1388
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
1389
                                   utils.FormatUnit(self.delta, "h"),
1390
                                   errors.ECODE_INVAL)
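    # (e.g. for a 10240 MiB disk, amount=2048 in relative mode gives
    # delta=2048 and target=12288; absolute mode with amount=12288 is
    # equivalent)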
1391

    
1392
    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1393

    
1394
  def _CheckDiskSpace(self, nodenames, req_vgspace):
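    # req_vgspace: per-VG space requirements, as computed by the disk's
    # ComputeGrowth() in CheckPrereq above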
1395
    template = self.instance.disk_template
1396
    if template not in constants.DTS_NO_FREE_SPACE_CHECK:
1397
      # TODO: check the free disk space for file, when that feature will be
1398
      # supported
1399
      nodes = map(self.cfg.GetNodeInfo, nodenames)
1400
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1401
                        nodes)
1402
      if es_nodes:
1403
        # With exclusive storage we need something smarter than just looking
1404
        # at free space; for now, let's simply abort the operation.
1405
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1406
                                   " is enabled", errors.ECODE_STATE)
1407
      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1408

    
1409
  def Exec(self, feedback_fn):
1410
    """Execute disk grow.
1411

1412
    """
1413
    instance = self.instance
1414
    disk = self.disk
1415

    
1416
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1417
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1418
            self.owned_locks(locking.LEVEL_NODE_RES))
1419

    
1420
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1421

    
1422
    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1423
    if not disks_ok:
1424
      raise errors.OpExecError("Cannot activate block device to grow")
1425

    
1426
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1427
                (self.op.disk, instance.name,
1428
                 utils.FormatUnit(self.delta, "h"),
1429
                 utils.FormatUnit(self.target, "h")))
1430

    
1431
    # First run all grow ops in dry-run mode
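    # (a dry-run failure on any node aborts the operation before any real
    # change has been made)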
1432
    for node in instance.all_nodes:
1433
      self.cfg.SetDiskID(disk, node)
1434
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1435
                                           True, True)
1436
      result.Raise("Dry-run grow request failed to node %s" % node)
1437

    
1438
    if wipe_disks:
1439
      # Get disk size from primary node for wiping
1440
      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1441
      result.Raise("Failed to retrieve disk size from node '%s'" %
1442
                   instance.primary_node)
1443

    
1444
      (disk_size_in_bytes, ) = result.payload
1445

    
1446
      if disk_size_in_bytes is None:
1447
        raise errors.OpExecError("Failed to retrieve disk size from primary"
1448
                                 " node '%s'" % instance.primary_node)
1449

    
1450
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1451

    
1452
      assert old_disk_size >= disk.size, \
1453
        ("Retrieved disk size too small (got %s, should be at least %s)" %
1454
         (old_disk_size, disk.size))
1455
    else:
1456
      old_disk_size = None
1457

    
1458
    # We know that (as far as we can test) operations across different
1459
    # nodes will succeed; time to run it for real on the backing storage
1460
    for node in instance.all_nodes:
1461
      self.cfg.SetDiskID(disk, node)
1462
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1463
                                           False, True)
1464
      result.Raise("Grow request failed to node %s" % node)
1465

    
1466
    # And now execute it for logical storage, on the primary node
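    # (for a DRBD disk this is where the DRBD device itself is resized, now
    # that its backing volumes have been grown on all nodes above)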
1467
    node = instance.primary_node
1468
    self.cfg.SetDiskID(disk, node)
1469
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1470
                                         False, False)
1471
    result.Raise("Grow request failed to node %s" % node)
1472

    
1473
    disk.RecordGrow(self.delta)
1474
    self.cfg.Update(instance, feedback_fn)
1475

    
1476
    # Changes have been recorded, release node lock
1477
    ReleaseLocks(self, locking.LEVEL_NODE)
1478

    
1479
    # Downgrade lock while waiting for sync
1480
    self.glm.downgrade(locking.LEVEL_INSTANCE)
1481

    
1482
    assert wipe_disks ^ (old_disk_size is None)
1483

    
1484
    if wipe_disks:
1485
      assert instance.disks[self.op.disk] == disk
1486

    
1487
      # Wipe newly added disk space
1488
      WipeDisks(self, instance,
1489
                disks=[(self.op.disk, disk, old_disk_size)])
1490

    
1491
    if self.op.wait_for_sync:
1492
      disk_abort = not WaitForSync(self, instance, disks=[disk])
1493
      if disk_abort:
1494
        self.LogWarning("Disk syncing has not returned a good status; check"
1495
                        " the instance")
1496
      if instance.admin_state != constants.ADMINST_UP:
1497
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1498
    elif instance.admin_state != constants.ADMINST_UP:
1499
      self.LogWarning("Not shutting down the disk even if the instance is"
1500
                      " not supposed to be running because no wait for"
1501
                      " sync mode was requested")
1502

    
1503
    assert self.owned_locks(locking.LEVEL_NODE_RES)
1504
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1505

    
1506

    
1507
class LUInstanceReplaceDisks(LogicalUnit):
1508
  """Replace the disks of an instance.
1509

1510
  """
1511
  HPATH = "mirrors-replace"
1512
  HTYPE = constants.HTYPE_INSTANCE
1513
  REQ_BGL = False
1514

    
1515
  def CheckArguments(self):
1516
    """Check arguments.
1517

1518
    """
1519
    remote_node = self.op.remote_node
1520
    ialloc = self.op.iallocator
1521
    if self.op.mode == constants.REPLACE_DISK_CHG:
1522
      if remote_node is None and ialloc is None:
1523
        raise errors.OpPrereqError("When changing the secondary either an"
1524
                                   " iallocator script must be used or the"
1525
                                   " new node given", errors.ECODE_INVAL)
1526
      else:
1527
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1528

    
1529
    elif remote_node is not None or ialloc is not None:
1530
      # Not replacing the secondary
1531
      raise errors.OpPrereqError("The iallocator and new node options can"
1532
                                 " only be used when changing the"
1533
                                 " secondary node", errors.ECODE_INVAL)
1534

    
1535
  def ExpandNames(self):
1536
    self._ExpandAndLockInstance()
1537

    
1538
    assert locking.LEVEL_NODE not in self.needed_locks
1539
    assert locking.LEVEL_NODE_RES not in self.needed_locks
1540
    assert locking.LEVEL_NODEGROUP not in self.needed_locks
1541

    
1542
    assert self.op.iallocator is None or self.op.remote_node is None, \
1543
      "Conflicting options"
1544

    
1545
    if self.op.remote_node is not None:
1546
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1547

    
1548
      # Warning: do not remove the locking of the new secondary here
1549
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
1550
      # currently it doesn't since parallel invocations of
1551
      # FindUnusedMinor will conflict
1552
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1553
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1554
    else:
1555
      self.needed_locks[locking.LEVEL_NODE] = []
1556
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1557

    
1558
      if self.op.iallocator is not None:
1559
        # iallocator will select a new node in the same group
1560
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
1561
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1562

    
1563
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1564

    
1565
    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1566
                                   self.op.iallocator, self.op.remote_node,
1567
                                   self.op.disks, self.op.early_release,
1568
                                   self.op.ignore_ipolicy)
1569

    
1570
    self.tasklets = [self.replacer]
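    # the base LogicalUnit runs the CheckPrereq and Exec of the tasklets
    # listed here (this LU's own CheckPrereq chains into it below)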
1571

    
1572
  def DeclareLocks(self, level):
1573
    if level == locking.LEVEL_NODEGROUP:
1574
      assert self.op.remote_node is None
1575
      assert self.op.iallocator is not None
1576
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1577

    
1578
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
1579
      # Lock all groups used by instance optimistically; this requires going
1580
      # via the node before it's locked, requiring verification later on
1581
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
1582
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)
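      # (CheckPrereq below re-verifies the owned groups via
      # CheckInstanceNodeGroups)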
1583

    
1584
    elif level == locking.LEVEL_NODE:
1585
      if self.op.iallocator is not None:
1586
        assert self.op.remote_node is None
1587
        assert not self.needed_locks[locking.LEVEL_NODE]
1588
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1589

    
1590
        # Lock member nodes of all locked groups
1591
        self.needed_locks[locking.LEVEL_NODE] = \
1592
          [node_name
1593
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1594
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1595
      else:
1596
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1597

    
1598
        self._LockInstancesNodes()
1599

    
1600
    elif level == locking.LEVEL_NODE_RES:
1601
      # Reuse node locks
1602
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1603
        self.needed_locks[locking.LEVEL_NODE]
1604

    
1605
  def BuildHooksEnv(self):
1606
    """Build hooks env.
1607

1608
    This runs on the master, the primary and all the secondaries.
1609

1610
    """
1611
    instance = self.replacer.instance
1612
    env = {
1613
      "MODE": self.op.mode,
1614
      "NEW_SECONDARY": self.op.remote_node,
1615
      "OLD_SECONDARY": instance.secondary_nodes[0],
1616
      }
1617
    env.update(BuildInstanceHookEnvByObject(self, instance))
1618
    return env
1619

    
1620
  def BuildHooksNodes(self):
1621
    """Build hooks nodes.
1622

1623
    """
1624
    instance = self.replacer.instance
1625
    nl = [
1626
      self.cfg.GetMasterNode(),
1627
      instance.primary_node,
1628
      ]
1629
    if self.op.remote_node is not None:
1630
      nl.append(self.op.remote_node)
1631
    return nl, nl
1632

    
1633
  def CheckPrereq(self):
1634
    """Check prerequisites.
1635

1636
    """
1637
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1638
            self.op.iallocator is None)
1639

    
1640
    # Verify if node group locks are still correct
1641
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1642
    if owned_groups:
1643
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1644

    
1645
    return LogicalUnit.CheckPrereq(self)
1646

    
1647

    
1648
class LUInstanceActivateDisks(NoHooksLU):
1649
  """Bring up an instance's disks.
1650

1651
  """
1652
  REQ_BGL = False
1653

    
1654
  def ExpandNames(self):
1655
    self._ExpandAndLockInstance()
1656
    self.needed_locks[locking.LEVEL_NODE] = []
1657
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1658

    
1659
  def DeclareLocks(self, level):
1660
    if level == locking.LEVEL_NODE:
1661
      self._LockInstancesNodes()
1662

    
1663
  def CheckPrereq(self):
1664
    """Check prerequisites.
1665

1666
    This checks that the instance is in the cluster.
1667

1668
    """
1669
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1670
    assert self.instance is not None, \
1671
      "Cannot retrieve locked instance %s" % self.op.instance_name
1672
    CheckNodeOnline(self, self.instance.primary_node)
1673

    
1674
  def Exec(self, feedback_fn):
1675
    """Activate the disks.
1676

1677
    """
1678
    disks_ok, disks_info = \
1679
              AssembleInstanceDisks(self, self.instance,
1680
                                    ignore_size=self.op.ignore_size)
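    # disks_info is a list of (node, iv_name, device_path) tuples for the
    # assembled devices; it is returned as the result of this opcode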
1681
    if not disks_ok:
1682
      raise errors.OpExecError("Cannot activate block devices")
1683

    
1684
    if self.op.wait_for_sync:
1685
      if not WaitForSync(self, self.instance):
1686
        raise errors.OpExecError("Some disks of the instance are degraded!")
1687

    
1688
    return disks_info
1689

    
1690

    
1691
class LUInstanceDeactivateDisks(NoHooksLU):
1692
  """Shutdown an instance's disks.
1693

1694
  """
1695
  REQ_BGL = False
1696

    
1697
  def ExpandNames(self):
1698
    self._ExpandAndLockInstance()
1699
    self.needed_locks[locking.LEVEL_NODE] = []
1700
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1701

    
1702
  def DeclareLocks(self, level):
1703
    if level == locking.LEVEL_NODE:
1704
      self._LockInstancesNodes()
1705

    
1706
  def CheckPrereq(self):
1707
    """Check prerequisites.
1708

1709
    This checks that the instance is in the cluster.
1710

1711
    """
1712
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1713
    assert self.instance is not None, \
1714
      "Cannot retrieve locked instance %s" % self.op.instance_name
1715

    
1716
  def Exec(self, feedback_fn):
1717
    """Deactivate the disks
1718

1719
    """
1720
    instance = self.instance
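    # unless force is set, _SafeShutdownInstanceDisks first checks that the
    # instance is not running before shutting down its disks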
1721
    if self.op.force:
1722
      ShutdownInstanceDisks(self, instance)
1723
    else:
1724
      _SafeShutdownInstanceDisks(self, instance)
1725

    
1726

    
1727
def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1728
                               ldisk=False):
1729
  """Check that mirrors are not degraded.
1730

1731
  @attention: The device has to be annotated already.
1732

1733
  The ldisk parameter, if True, will change the test from the
1734
  is_degraded attribute (which represents overall non-ok status for
1735
  the device(s)) to the ldisk (representing the local storage status).
1736

1737
  """
1738
  lu.cfg.SetDiskID(dev, node)
1739

    
1740
  result = True
1741

    
1742
  if on_primary or dev.AssembleOnSecondary():
1743
    rstats = lu.rpc.call_blockdev_find(node, dev)
1744
    msg = rstats.fail_msg
1745
    if msg:
1746
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1747
      result = False
1748
    elif not rstats.payload:
1749
      lu.LogWarning("Can't find disk on node %s", node)
1750
      result = False
1751
    else:
1752
      if ldisk:
1753
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1754
      else:
1755
        result = result and not rstats.payload.is_degraded
1756

    
1757
  if dev.children:
1758
    for child in dev.children:
1759
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1760
                                                     on_primary)
1761

    
1762
  return result
1763

    
1764

    
1765
def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1766
  """Wrapper around L{_CheckDiskConsistencyInner}.
1767

1768
  """
1769
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1770
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1771
                                    ldisk=ldisk)
1772

    
1773

    
1774
def _BlockdevFind(lu, node, dev, instance):
1775
  """Wrapper around call_blockdev_find to annotate diskparams.
1776

1777
  @param lu: A reference to the lu object
1778
  @param node: The node to call out
1779
  @param dev: The device to find
1780
  @param instance: The instance object the device belongs to
1781
  @returns The result of the rpc call
1782

1783
  """
1784
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1785
  return lu.rpc.call_blockdev_find(node, disk)
1786

    
1787

    
1788
def _GenerateUniqueNames(lu, exts):
1789
  """Generate a suitable LV name.
1790

1791
  This will generate a logical volume name for the given instance.
1792

1793
  """
1794
  results = []
1795
  for val in exts:
1796
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1797
    results.append("%s%s" % (new_id, val))
1798
  return results
1799

    
1800

    
1801
class TLReplaceDisks(Tasklet):
1802
  """Replaces disks for an instance.
1803

1804
  Note: Locking is not within the scope of this class.
1805

1806
  """
1807
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1808
               disks, early_release, ignore_ipolicy):
1809
    """Initializes this class.
1810

1811
    """
1812
    Tasklet.__init__(self, lu)
1813

    
1814
    # Parameters
1815
    self.instance_name = instance_name
1816
    self.mode = mode
1817
    self.iallocator_name = iallocator_name
1818
    self.remote_node = remote_node
1819
    self.disks = disks
1820
    self.early_release = early_release
1821
    self.ignore_ipolicy = ignore_ipolicy
1822

    
1823
    # Runtime data
1824
    self.instance = None
1825
    self.new_node = None
1826
    self.target_node = None
1827
    self.other_node = None
1828
    self.remote_node_info = None
1829
    self.node_secondary_ip = None
1830

    
1831
  @staticmethod
1832
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1833
    """Compute a new secondary node using an IAllocator.
1834

1835
    """
1836
    req = iallocator.IAReqRelocate(name=instance_name,
1837
                                   relocate_from=list(relocate_from))
1838
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1839

    
1840
    ial.Run(iallocator_name)
1841

    
1842
    if not ial.success:
1843
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1844
                                 " %s" % (iallocator_name, ial.info),
1845
                                 errors.ECODE_NORES)
1846

    
1847
    remote_node_name = ial.result[0]
1848

    
1849
    lu.LogInfo("Selected new secondary for instance '%s': %s",
1850
               instance_name, remote_node_name)
1851

    
1852
    return remote_node_name
1853

    
1854
  def _FindFaultyDisks(self, node_name):
1855
    """Wrapper for L{FindFaultyInstanceDisks}.
1856

1857
    """
1858
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1859
                                   node_name, True)
1860

    
1861
  def _CheckDisksActivated(self, instance):
1862
    """Checks if the instance disks are activated.
1863

1864
    @param instance: The instance to check disks
1865
    @return: True if they are activated, False otherwise
1866

1867
    """
1868
    nodes = instance.all_nodes
1869

    
1870
    for idx, dev in enumerate(instance.disks):
1871
      for node in nodes:
1872
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1873
        self.cfg.SetDiskID(dev, node)
1874

    
1875
        result = _BlockdevFind(self, node, dev, instance)
1876

    
1877
        if result.offline:
1878
          continue
1879
        elif result.fail_msg or not result.payload:
1880
          return False
1881

    
1882
    return True
1883

    
1884
  def CheckPrereq(self):
1885
    """Check prerequisites.
1886

1887
    This checks that the instance is in the cluster.
1888

1889
    """
1890
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1891
    assert instance is not None, \
1892
      "Cannot retrieve locked instance %s" % self.instance_name
1893

    
1894
    if instance.disk_template != constants.DT_DRBD8:
1895
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1896
                                 " instances", errors.ECODE_INVAL)
1897

    
1898
    if len(instance.secondary_nodes) != 1:
1899
      raise errors.OpPrereqError("The instance has a strange layout,"
1900
                                 " expected one secondary but found %d" %
1901
                                 len(instance.secondary_nodes),
1902
                                 errors.ECODE_FAULT)
1903

    
1904
    instance = self.instance
1905
    secondary_node = instance.secondary_nodes[0]
1906

    
1907
    if self.iallocator_name is None:
1908
      remote_node = self.remote_node
1909
    else:
1910
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1911
                                       instance.name, instance.secondary_nodes)
1912

    
1913
    if remote_node is None:
1914
      self.remote_node_info = None
1915
    else:
1916
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1917
             "Remote node '%s' is not locked" % remote_node
1918

    
1919
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1920
      assert self.remote_node_info is not None, \
1921
        "Cannot retrieve locked node %s" % remote_node
1922

    
1923
    if remote_node == self.instance.primary_node:
1924
      raise errors.OpPrereqError("The specified node is the primary node of"
1925
                                 " the instance", errors.ECODE_INVAL)
1926

    
1927
    if remote_node == secondary_node:
1928
      raise errors.OpPrereqError("The specified node is already the"
1929
                                 " secondary node of the instance",
1930
                                 errors.ECODE_INVAL)
1931

    
1932
    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1933
                                    constants.REPLACE_DISK_CHG):
1934
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
1935
                                 errors.ECODE_INVAL)
1936

    
1937
    if self.mode == constants.REPLACE_DISK_AUTO:
1938
      if not self._CheckDisksActivated(instance):
1939
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
1940
                                   " first" % self.instance_name,
1941
                                   errors.ECODE_STATE)
1942
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
1943
      faulty_secondary = self._FindFaultyDisks(secondary_node)
1944

    
1945
      if faulty_primary and faulty_secondary:
1946
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1947
                                   " one node and can not be repaired"
1948
                                   " automatically" % self.instance_name,
1949
                                   errors.ECODE_STATE)
1950

    
1951
      if faulty_primary:
1952
        self.disks = faulty_primary
1953
        self.target_node = instance.primary_node
1954
        self.other_node = secondary_node
1955
        check_nodes = [self.target_node, self.other_node]
1956
      elif faulty_secondary:
1957
        self.disks = faulty_secondary
1958
        self.target_node = secondary_node
1959
        self.other_node = instance.primary_node
1960
        check_nodes = [self.target_node, self.other_node]
1961
      else:
1962
        self.disks = []
1963
        check_nodes = []
1964

    
1965
    else:
1966
      # Non-automatic modes
1967
      if self.mode == constants.REPLACE_DISK_PRI:
1968
        self.target_node = instance.primary_node
1969
        self.other_node = secondary_node
1970
        check_nodes = [self.target_node, self.other_node]
1971

    
1972
      elif self.mode == constants.REPLACE_DISK_SEC:
1973
        self.target_node = secondary_node
1974
        self.other_node = instance.primary_node
1975
        check_nodes = [self.target_node, self.other_node]
1976

    
1977
      elif self.mode == constants.REPLACE_DISK_CHG:
1978
        self.new_node = remote_node
1979
        self.other_node = instance.primary_node
1980
        self.target_node = secondary_node
1981
        check_nodes = [self.new_node, self.other_node]
1982

    
1983
        CheckNodeNotDrained(self.lu, remote_node)
1984
        CheckNodeVmCapable(self.lu, remote_node)
1985

    
1986
        old_node_info = self.cfg.GetNodeInfo(secondary_node)
1987
        assert old_node_info is not None
1988
        if old_node_info.offline and not self.early_release:
1989
          # doesn't make sense to delay the release
1990
          self.early_release = True
1991
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1992
                          " early-release mode", secondary_node)
1993

    
1994
      else:
1995
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
1996
                                     self.mode)
1997

    
1998
      # If not specified all disks should be replaced
1999
      if not self.disks:
2000
        self.disks = range(len(self.instance.disks))
2001

    
2002
    # TODO: This is ugly, but right now we can't distinguish between internal
2003
    # submitted opcode and external one. We should fix that.
2004
    if self.remote_node_info:
2005
      # We change the node; let's verify it still meets the instance policy
2006
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2007
      cluster = self.cfg.GetClusterInfo()
2008
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2009
                                                              new_group_info)
2010
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
2011
                             self.cfg, ignore=self.ignore_ipolicy)
2012

    
2013
    for node in check_nodes:
2014
      CheckNodeOnline(self.lu, node)
2015

    
2016
    touched_nodes = frozenset(node_name for node_name in [self.new_node,
2017
                                                          self.other_node,
2018
                                                          self.target_node]
2019
                              if node_name is not None)
2020

    
2021
    # Release unneeded node and node resource locks
2022
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2023
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2024
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2025

    
2026
    # Release any owned node group
2027
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2028

    
2029
    # Check whether disks are valid
2030
    for disk_idx in self.disks:
2031
      instance.FindDisk(disk_idx)
2032

    
2033
    # Get secondary node IP addresses
2034
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
2035
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
2036

    
2037
  def Exec(self, feedback_fn):
2038
    """Execute disk replacement.
2039

2040
    This dispatches the disk replacement to the appropriate handler.
2041

2042
    """
2043
    if __debug__:
2044
      # Verify owned locks before starting operation
2045
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2046
      assert set(owned_nodes) == set(self.node_secondary_ip), \
2047
          ("Incorrect node locks, owning %s, expected %s" %
2048
           (owned_nodes, self.node_secondary_ip.keys()))
2049
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2050
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
2051
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2052

    
2053
      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2054
      assert list(owned_instances) == [self.instance_name], \
2055
          "Instance '%s' not locked" % self.instance_name
2056

    
2057
      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2058
          "Should not own any node group lock at this point"
2059

    
2060
    if not self.disks:
2061
      feedback_fn("No disks need replacement for instance '%s'" %
2062
                  self.instance.name)
2063
      return
2064

    
2065
    feedback_fn("Replacing disk(s) %s for instance '%s'" %
2066
                (utils.CommaJoin(self.disks), self.instance.name))
2067
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
2068
    feedback_fn("Current seconary node: %s" %
2069
                utils.CommaJoin(self.instance.secondary_nodes))
2070

    
2071
    activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
2072

    
2073
    # Activate the instance disks if we're replacing them on a down instance
2074
    if activate_disks:
2075
      StartInstanceDisks(self.lu, self.instance, True)
2076

    
2077
    try:
2078
      # Should we replace the secondary node?
2079
      if self.new_node is not None:
2080
        fn = self._ExecDrbd8Secondary
2081
      else:
2082
        fn = self._ExecDrbd8DiskOnly
2083

    
2084
      result = fn(feedback_fn)
2085
    finally:
2086
      # Deactivate the instance disks if we're replacing them on a
2087
      # down instance
2088
      if activate_disks:
2089
        _SafeShutdownInstanceDisks(self.lu, self.instance)
2090

    
2091
    assert not self.lu.owned_locks(locking.LEVEL_NODE)
2092

    
2093
    if __debug__:
2094
      # Verify owned locks
2095
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2096
      nodes = frozenset(self.node_secondary_ip)
2097
      assert ((self.early_release and not owned_nodes) or
2098
              (not self.early_release and not (set(owned_nodes) - nodes))), \
2099
        ("Not owning the correct locks, early_release=%s, owned=%r,"
2100
         " nodes=%r" % (self.early_release, owned_nodes, nodes))
2101

    
2102
    return result
2103

    
2104
  def _CheckVolumeGroup(self, nodes):
2105
    self.lu.LogInfo("Checking volume groups")
2106

    
2107
    vgname = self.cfg.GetVGName()
2108

    
2109
    # Make sure volume group exists on all involved nodes
2110
    results = self.rpc.call_vg_list(nodes)
2111
    if not results:
2112
      raise errors.OpExecError("Can't list volume groups on the nodes")
2113

    
2114
    for node in nodes:
2115
      res = results[node]
2116
      res.Raise("Error checking node %s" % node)
2117
      if vgname not in res.payload:
2118
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
2119
                                 (vgname, node))
2120

    
2121
  def _CheckDisksExistence(self, nodes):
2122
    # Check disk existence
2123
    for idx, dev in enumerate(self.instance.disks):
2124
      if idx not in self.disks:
2125
        continue
2126

    
2127
      for node in nodes:
2128
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2129
        self.cfg.SetDiskID(dev, node)
2130

    
2131
        result = _BlockdevFind(self, node, dev, self.instance)
2132

    
2133
        msg = result.fail_msg
2134
        if msg or not result.payload:
2135
          if not msg:
2136
            msg = "disk not found"
2137
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2138
                                   (idx, node, msg))
2139

    
2140
  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2141
    for idx, dev in enumerate(self.instance.disks):
2142
      if idx not in self.disks:
2143
        continue
2144

    
2145
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2146
                      (idx, node_name))
2147

    
2148
      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2149
                                  on_primary, ldisk=ldisk):
2150
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2151
                                 " replace disks for instance %s" %
2152
                                 (node_name, self.instance.name))
2153

    
2154
  def _CreateNewStorage(self, node_name):
2155
    """Create new storage on the primary or secondary node.
2156

2157
    This is only used for same-node replaces, not for changing the
2158
    secondary node, hence we don't want to modify the existing disk.
2159

2160
    """
2161
    iv_names = {}
2162

    
2163
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2164
    for idx, dev in enumerate(disks):
2165
      if idx not in self.disks:
2166
        continue
2167

    
2168
      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2169

    
2170
      self.cfg.SetDiskID(dev, node_name)
2171

    
2172
      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2173
      names = _GenerateUniqueNames(self.lu, lv_names)
2174

    
2175
      (data_disk, meta_disk) = dev.children
2176
      vg_data = data_disk.logical_id[0]
2177
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2178
                             logical_id=(vg_data, names[0]),
2179
                             params=data_disk.params)
2180
      vg_meta = meta_disk.logical_id[0]
2181
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
2182
                             size=constants.DRBD_META_SIZE,
2183
                             logical_id=(vg_meta, names[1]),
2184
                             params=meta_disk.params)
2185

    
2186
      new_lvs = [lv_data, lv_meta]
2187
      old_lvs = [child.Copy() for child in dev.children]
2188
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2189
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2190

    
2191
      # we pass force_create=True to force the LVM creation
2192
      for new_lv in new_lvs:
2193
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2194
                             GetInstanceInfoText(self.instance), False,
2195
                             excl_stor)
2196

    
2197
    return iv_names
2198

    
2199
  def _CheckDevices(self, node_name, iv_names):
2200
    for name, (dev, _, _) in iv_names.iteritems():
2201
      self.cfg.SetDiskID(dev, node_name)
2202

    
2203
      result = _BlockdevFind(self, node_name, dev, self.instance)
2204

    
2205
      msg = result.fail_msg
2206
      if msg or not result.payload:
2207
        if not msg:
2208
          msg = "disk not found"
2209
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
2210
                                 (name, msg))
2211

    
2212
      if result.payload.is_degraded:
2213
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
2214

    
2215
  def _RemoveOldStorage(self, node_name, iv_names):
2216
    for name, (_, old_lvs, _) in iv_names.iteritems():
2217
      self.lu.LogInfo("Remove logical volumes for %s", name)
2218

    
2219
      for lv in old_lvs:
2220
        self.cfg.SetDiskID(lv, node_name)
2221

    
2222
        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2223
        if msg:
2224
          self.lu.LogWarning("Can't remove old LV: %s", msg,
2225
                             hint="remove unused LVs manually")
2226

    
2227
  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2228
    """Replace a disk on the primary or secondary for DRBD 8.
2229

2230
    The algorithm for replace is quite complicated:
2231

2232
      1. for each disk to be replaced:
2233

2234
        1. create new LVs on the target node with unique names
2235
        1. detach old LVs from the drbd device
2236
        1. rename old LVs to name_replaced.<time_t>
2237
        1. rename new LVs to old LVs
2238
        1. attach the new LVs (with the old names now) to the drbd device
2239

2240
      1. wait for sync across all devices
2241

2242
      1. for each modified disk:
2243

2244
        1. remove old LVs (which have the name name_replaces.<time_t>)
2245

2246
    Failures are not very well handled.
2247

2248
    """
2249
    steps_total = 6
2250

    
2251
    # Step: check device activation
2252
    self.lu.LogStep(1, steps_total, "Check device existence")
2253
    self._CheckDisksExistence([self.other_node, self.target_node])
2254
    self._CheckVolumeGroup([self.target_node, self.other_node])
2255

    
2256
    # Step: check other node consistency
2257
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2258
    self._CheckDisksConsistency(self.other_node,
2259
                                self.other_node == self.instance.primary_node,
2260
                                False)
2261

    
2262
    # Step: create new storage
2263
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2264
    iv_names = self._CreateNewStorage(self.target_node)
2265

    
2266
    # Step: for each lv, detach+rename*2+attach
2267
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2268
    for dev, old_lvs, new_lvs in iv_names.itervalues():
2269
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2270

    
2271
      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2272
                                                     old_lvs)
2273
      result.Raise("Can't detach drbd from local storage on node"
2274
                   " %s for device %s" % (self.target_node, dev.iv_name))
2275
      #dev.children = []
2276
      #cfg.Update(instance)
2277

    
2278
      # ok, we created the new LVs, so now we know we have the needed
2279
      # storage; as such, we proceed on the target node to rename
2280
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2281
      # using the assumption that logical_id == physical_id (which in
2282
      # turn is the unique_id on that node)
2283

    
2284
      # FIXME(iustin): use a better name for the replaced LVs
2285
      temp_suffix = int(time.time())
2286
      ren_fn = lambda d, suff: (d.physical_id[0],
2287
                                d.physical_id[1] + "_replaced-%s" % suff)
2288

    
2289
      # Build the rename list based on what LVs exist on the node
2290
      rename_old_to_new = []
2291
      for to_ren in old_lvs:
2292
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2293
        if not result.fail_msg and result.payload:
2294
          # device exists
2295
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2296

    
2297
      self.lu.LogInfo("Renaming the old LVs on the target node")
2298
      result = self.rpc.call_blockdev_rename(self.target_node,
2299
                                             rename_old_to_new)
2300
      result.Raise("Can't rename old LVs on node %s" % self.target_node)
2301

    
2302
      # Now we rename the new LVs to the old LVs
2303
      self.lu.LogInfo("Renaming the new LVs on the target node")
2304
      rename_new_to_old = [(new, old.physical_id)
2305
                           for old, new in zip(old_lvs, new_lvs)]
2306
      result = self.rpc.call_blockdev_rename(self.target_node,
2307
                                             rename_new_to_old)
2308
      result.Raise("Can't rename new LVs on node %s" % self.target_node)
2309

    
2310
      # Intermediate steps of in memory modifications
2311
      for old, new in zip(old_lvs, new_lvs):
2312
        new.logical_id = old.logical_id
2313
        self.cfg.SetDiskID(new, self.target_node)
2314

    
2315
      # We need to modify old_lvs so that removal later removes the
2316
      # right LVs, not the newly added ones; note that old_lvs is a
2317
      # copy here
2318
      for disk in old_lvs:
2319
        disk.logical_id = ren_fn(disk, temp_suffix)
2320
        self.cfg.SetDiskID(disk, self.target_node)
2321

    
2322
      # Now that the new lvs have the old name, we can add them to the device
2323
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2324
      result = self.rpc.call_blockdev_addchildren(self.target_node,
2325
                                                  (dev, self.instance), new_lvs)
2326
      msg = result.fail_msg
2327
      if msg:
2328
        for new_lv in new_lvs:
2329
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
2330
                                               new_lv).fail_msg
2331
          if msg2:
2332
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2333
                               hint=("cleanup manually the unused logical"
2334
                                     "volumes"))
2335
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2336

    
2337
    cstep = itertools.count(5)
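    # cstep yields 5 and then 6; whether "Removing old storage" or "Sync
    # devices" gets which number depends on early_release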
2338

    
2339
    if self.early_release:
2340
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2341
      self._RemoveOldStorage(self.target_node, iv_names)
2342
      # TODO: Check if releasing locks early still makes sense
2343
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2344
    else:
2345
      # Release all resource locks except those used by the instance
2346
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2347
                   keep=self.node_secondary_ip.keys())
2348

    
2349
    # Release all node locks while waiting for sync
2350
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2351

    
2352
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2353
    # shutdown in the caller into consideration.
2354

    
2355
    # Wait for sync
2356
    # This can fail as the old devices are degraded and _WaitForSync
2357
    # does a combined result over all disks, so we don't check its return value
2358
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2359
    WaitForSync(self.lu, self.instance)
2360

    
2361
    # Check all devices manually
2362
    self._CheckDevices(self.instance.primary_node, iv_names)
2363

    
2364
    # Step: remove old storage
2365
    if not self.early_release:
2366
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2367
      self._RemoveOldStorage(self.target_node, iv_names)
2368

    
2369
  def _ExecDrbd8Secondary(self, feedback_fn):
2370
    """Replace the secondary node for DRBD 8.
2371

2372
    The algorithm for replace is quite complicated:
2373
      - for all disks of the instance:
2374
        - create new LVs on the new node with same names
2375
        - shutdown the drbd device on the old secondary
2376
        - disconnect the drbd network on the primary
2377
        - create the drbd device on the new secondary
2378
        - network attach the drbd on the primary, using an artifice:
2379
          the drbd code for Attach() will connect to the network if it
2380
          finds a device which is connected to the good local disks but
2381
          not network enabled
2382
      - wait for sync across all devices
2383
      - remove all disks from the old secondary
2384

2385
    Failures are not very well handled.
2386

2387
    """
2388
    steps_total = 6
2389

    
2390
    pnode = self.instance.primary_node
2391

    
2392
    # Step: check device activation
2393
    self.lu.LogStep(1, steps_total, "Check device existence")
2394
    self._CheckDisksExistence([self.instance.primary_node])
2395
    self._CheckVolumeGroup([self.instance.primary_node])
2396

    
2397
    # Step: check other node consistency
2398
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2399
    self._CheckDisksConsistency(self.instance.primary_node, True, True)
2400

    
2401
    # Step: create new storage
2402
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2403
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2404
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2405
    for idx, dev in enumerate(disks):
2406
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2407
                      (self.new_node, idx))
2408
      # we pass force_create=True to force LVM creation
2409
      for new_lv in dev.children:
2410
        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2411
                             True, GetInstanceInfoText(self.instance), False,
2412
                             excl_stor)
2413

    
2414
    # Step 4: drbd minors and drbd setup changes
2415
    # after this, we must manually remove the drbd minors on both the
2416
    # error and the success paths
2417
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2418
    minors = self.cfg.AllocateDRBDMinor([self.new_node
2419
                                         for dev in self.instance.disks],
2420
                                        self.instance.name)
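    # (one new minor is requested on the new node for every instance disk)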
2421
    logging.debug("Allocated minors %r", minors)
2422

    
2423
    iv_names = {}
2424
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2425
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2426
                      (self.new_node, idx))
2427
      # create new devices on new_node; note that we create two IDs:
2428
      # one without port, so the drbd will be activated without
2429
      # networking information on the new node at this stage, and one
2430
      # with network, for the latter activation in step 4
2431
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2432
      if self.instance.primary_node == o_node1:
2433
        p_minor = o_minor1
2434
      else:
2435
        assert self.instance.primary_node == o_node2, "Three-node instance?"
2436
        p_minor = o_minor2
2437

    
2438
      new_alone_id = (self.instance.primary_node, self.new_node, None,
2439
                      p_minor, new_minor, o_secret)
2440
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
2441
                    p_minor, new_minor, o_secret)
2442

    
2443
      iv_names[idx] = (dev, dev.children, new_net_id)
2444
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2445
                    new_net_id)
2446
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2447
                              logical_id=new_alone_id,
2448
                              children=dev.children,
2449
                              size=dev.size,
2450
                              params={})
2451
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2452
                                            self.cfg)
2453
      try:
2454
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2455
                             anno_new_drbd,
2456
                             GetInstanceInfoText(self.instance), False,
2457
                             excl_stor)
2458
      except errors.GenericError:
2459
        self.cfg.ReleaseDRBDMinors(self.instance.name)
2460
        raise
2461

    
2462
    # We have new devices, shutdown the drbd on the old secondary
2463
    for idx, dev in enumerate(self.instance.disks):
2464
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2465
      self.cfg.SetDiskID(dev, self.target_node)
2466
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
2467
                                            (dev, self.instance)).fail_msg
2468
      if msg:
2469
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2470
                           "node: %s" % (idx, msg),
2471
                           hint=("Please cleanup this device manually as"
2472
                                 " soon as possible"))
2473

    
2474
    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2475
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2476
                                               self.instance.disks)[pnode]
2477

    
2478
    msg = result.fail_msg
2479
    if msg:
2480
      # detaches didn't succeed (unlikely)
2481
      self.cfg.ReleaseDRBDMinors(self.instance.name)
2482
      raise errors.OpExecError("Can't detach the disks from the network on"
2483
                               " old node: %s" % (msg,))
2484

    
2485
    # if we managed to detach at least one, we update all the disks of
2486
    # the instance to point to the new secondary
2487
    self.lu.LogInfo("Updating instance configuration")
2488
    for dev, _, new_logical_id in iv_names.itervalues():
2489
      dev.logical_id = new_logical_id
2490
      self.cfg.SetDiskID(dev, self.instance.primary_node)
2491

    
2492
    self.cfg.Update(self.instance, feedback_fn)
2493

    
2494
    # Release all node locks (the configuration has been updated)
2495
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2496

    
2497
    # and now perform the drbd attach
2498
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2499
                    " (standalone => connected)")
2500
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2501
                                            self.new_node],
2502
                                           self.node_secondary_ip,
2503
                                           (self.instance.disks, self.instance),
2504
                                           self.instance.name,
2505
                                           False)
2506
    for to_node, to_result in result.items():
2507
      msg = to_result.fail_msg
2508
      if msg:
2509
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2510
                           to_node, msg,
2511
                           hint=("please do a gnt-instance info to see the"
2512
                                 " status of disks"))
2513

    
2514
    cstep = itertools.count(5)
2515

    
2516
    if self.early_release:
2517
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2518
      self._RemoveOldStorage(self.target_node, iv_names)
2519
      # TODO: Check if releasing locks early still makes sense
2520
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2521
    else:
2522
      # Release all resource locks except those used by the instance
2523
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2524
                   keep=self.node_secondary_ip.keys())
2525

    
2526
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2527
    # shutdown in the caller into consideration.
2528

    
2529
    # Wait for sync
2530
    # This can fail as the old devices are degraded and _WaitForSync
2531
    # does a combined result over all disks, so we don't check its return value
2532
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2533
    WaitForSync(self.lu, self.instance)
2534

    
2535
    # Check all devices manually
2536
    self._CheckDevices(self.instance.primary_node, iv_names)
2537

    
2538
    # Step: remove old storage
2539
    if not self.early_release:
2540
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2541
      self._RemoveOldStorage(self.target_node, iv_names)