#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
  CheckDiskTemplateEnabled
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }

def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
                                       device.size, instance.name, force_open,
                                       info, excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)


def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
                              force_open, excl_stor)

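# Illustrative sketch (not part of the original module): for a DRBD8 disk
# with two LVM children (placeholder names data_lv/meta_lv below),
# _CreateBlockDevInner first recurses into the children and only then
# creates the DRBD device itself, so a failure mid-way reports the devices
# created so far via DeviceCreationError:
#
#   drbd_disk = objects.Disk(dev_type=constants.DT_DRBD8, size=1024,
#                            children=[data_lv, meta_lv], params={})
#   created = _CreateBlockDevInner(lu, node_uuid, instance, drbd_disk,
#                                  False, info, force_open, excl_stor)
#   # 'created' is [(node_uuid, drbd_disk)] once the whole tree exists;
#   # on failure the exception carries the partially created children.
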
def _UndoCreateDisks(lu, disks_created, instance):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}
  @type instance: L{objects.Instance}
  @param instance: the instance for which disks were created

  """
  for (node_uuid, disk) in disks_created:
    result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created, instance)
        raise errors.OpExecError(e.message)
  return disks_created


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group.

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + \
        payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]

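# Illustrative sketch (not part of the original module): two 10 GiB disks
# in volume group "xenvg" ("xenvg" is just a placeholder name) under
# DT_DRBD8 each reserve DRBD_META_SIZE additional MiB for metadata:
#
#   disks = [{constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 10240},
#            {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 10240}]
#   ComputeDiskSizePerVG(constants.DT_DRBD8, disks)
#   # -> {"xenvg": 2 * (10240 + constants.DRBD_META_SIZE)}
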
def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks

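# Illustrative sketch (not part of the original module): ComputeDisks
# normalizes a minimal opcode-style disk specification; sizes given as
# strings are converted and missing fields pick up defaults ("xenvg" below
# is a placeholder default VG):
#
#   op.disks = [{constants.IDISK_SIZE: "2048"}]
#   ComputeDisks(op, "xenvg")
#   # -> [{constants.IDISK_SIZE: 2048,
#   #      constants.IDISK_MODE: constants.DISK_RDWR,
#   #      constants.IDISK_VG: "xenvg",
#   #      constants.IDISK_NAME: None}]
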
def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev


def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found" %
                                       (constants.DT_EXT,
                                        constants.IDISK_PROVIDER))
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks

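# Illustrative sketch (not part of the original module): the non-DRBD branch
# derives logical_ids from the template; e.g. with base_index=0 and a single
# disk, DT_PLAIN typically yields ("<vg>", "<unique>.disk0") while DT_RBD
# yields ("rbd", "<unique>.rbd.disk0"), the ".rbd" part coming from
# _DISK_TEMPLATE_NAME_PREFIX ("<vg>" and "<unique>" are placeholders for the
# volume group and the name produced by _GenerateUniqueNames).
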
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not be

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)

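# Illustrative sketch (not part of the original module): with exclusive
# storage active and spindles required,
#   CheckSpindlesExclusiveStorage({constants.IDISK_SPINDLES: 2}, True, True)
# passes, while CheckSpindlesExclusiveStorage({}, True, True) raises
# OpPrereqError; specifying spindles with es_flag=False also raises.
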
class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.DT_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(self.instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)

def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Prepares the input and performs a node info call.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name

  """
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
                                                  node_uuids)
  hvname = lu.cfg.GetHypervisorType()
  hvparams = lu.cfg.GetClusterInfo().hvparams
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
                                   [(hvname, hvparams[hvname])])
  return nodeinfo


def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type node_name: string
  @param node_name: the name of the node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)


def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
  for node_uuid in node_uuids:
    node_name = lu.cfg.GetNodeName(node_uuid)
    info = nodeinfo[node_uuid]
    info.Raise("Cannot get current information from node %s" % node_name,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)


def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib

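# Illustrative sketch (not part of the original module): sizes that are not
# a whole number of MiB are rounded up, e.g.
#   _DiskSizeInBytesToMebibytes(lu, 1024 * 1024)      # -> 1
#   _DiskSizeInBytesToMebibytes(lu, 1024 * 1024 + 1)  # -> 2, with a warning
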
def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time

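# Illustrative sketch (not part of the original module): if 1024 MiB out of
# 4096 MiB were written in 60 seconds, the remaining time is estimated as
#   _CalcEta(60, 1024, 4096)  # -> (4096 - 1024) * (60 / 1024.0) = 180.0
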
def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)

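# Illustrative sketch (not part of the original module), assuming the usual
# values MIN_WIPE_CHUNK_PERCENT = 10 and MAX_WIPE_CHUNK = 1024 MiB (see
# ganeti.constants for the authoritative numbers): a 4096 MiB disk is wiped
# in chunks of int(min(1024, 4096 / 100.0 * 10)) = 409 MiB, while a
# 102400 MiB (100 GiB) disk is capped at 1024 MiB per chunk.
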
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpPrereqError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup, instance)
    raise


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list.

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %r,"
                                   " got %r" % (instance.disks, disks))
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disks to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded

def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1206
  """Shutdown block devices of an instance.
1207

1208
  This does the shutdown on all nodes of the instance.
1209

1210
  If the ignore_primary is false, errors on the primary node are
1211
  ignored.
1212

1213
  """
1214
  lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1215
  all_result = True
1216
  disks = ExpandCheckDisks(instance, disks)
1217

    
1218
  for disk in disks:
1219
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1220
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1221
      msg = result.fail_msg
1222
      if msg:
1223
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1224
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1225
        if ((node_uuid == instance.primary_node and not ignore_primary) or
1226
            (node_uuid != instance.primary_node and not result.offline)):
1227
          all_result = False
1228
  return all_result
1229

    
1230

    
1231
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1232
  """Shutdown block devices of an instance.
1233

1234
  This function checks if an instance is running, before calling
1235
  _ShutdownInstanceDisks.
1236

1237
  """
1238
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1239
  ShutdownInstanceDisks(lu, instance, disks=disks)


def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  disks = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # mark instance disks as active before doing actual work, so watcher does
  # not try to shut them down erroneously
  lu.cfg.MarkInstanceDisksActive(instance.uuid)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info
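
# Illustrative sketch (comment only, not called from this module): callers of
# AssembleInstanceDisks() above typically check the boolean result and abort
# the operation if activation failed, e.g.:
#
#   disks_ok, dev_info = AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")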


def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
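    # Worked example (comment only): with a current disk size of 10240 MiB,
    # an absolute request of amount=12288 gives delta=2048 and target=12288,
    # while a relative request of amount=2048 gives the same target; either
    # way a negative delta is rejected above.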

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    template = self.instance.disk_template
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node_uuid in self.instance.all_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Get disk size from primary node for wiping
      result = self.rpc.call_blockdev_getdimensions(
                 self.instance.primary_node, [([self.disk], self.instance)])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   self.instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % self.instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in self.instance.all_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert self.instance.disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
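    # Summary (comment only): the grow above is a three-stage sequence -- a
    # dry-run call_blockdev_grow on every node, the real grow of the backing
    # storage on every node, and finally the grow of the logical (DRBD) layer
    # on the primary node only -- before RecordGrow() is persisted via
    # cfg.Update().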


class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if self.op.remote_node is None and self.op.iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif self.op.remote_node is not None or self.op.iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    if self.op.force:
      ShutdownInstanceDisks(self, self.instance)
    else:
      _SafeShutdownInstanceDisks(self, self.instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
                                                     node_uuid, on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node_uuid, (disk, instance))


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results
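
# Illustrative sketch (comment only): for disk index 0,
# _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"]) returns names of
# the form "<unique-id>.disk0_data" and "<unique-id>.disk0_meta"; the exact
# shape of the id produced by cfg.GenerateUniqueID() is an assumption of this
# comment, not something the callers below rely on.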


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
               remote_node_uuid, disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node_uuid = remote_node_uuid
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node_uuid = None
    self.target_node_uuid = None
    self.other_node_uuid = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_uuid,
                    relocate_from_node_uuids):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(
          inst_uuid=instance_uuid,
          relocate_from_node_uuids=list(relocate_from_node_uuids))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)

    if remote_node is None:
      raise errors.OpPrereqError("Node %s not found in configuration" %
                                 remote_node_name, errors.ECODE_NOENT)

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_uuid, remote_node_name)

    return remote_node.uuid

  def _FindFaultyDisks(self, node_uuid):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    node_uuids = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))

        result = _BlockdevFind(self, node_uuid, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if self.instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(self.instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(self.instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    secondary_node_uuid = self.instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node_uuid = self.remote_node_uuid
    else:
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                            self.instance.uuid,
                                            self.instance.secondary_nodes)

    if remote_node_uuid is None:
      self.remote_node_info = None
    else:
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node_uuid

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node_uuid

    if remote_node_uuid == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node_uuid == secondary_node_uuid:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(self.instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node_uuid = remote_node_uuid
        self.other_node_uuid = self.instance.primary_node
        self.target_node_uuid = secondary_node_uuid
        check_nodes = [self.new_node_uuid, self.other_node_uuid]

        CheckNodeNotDrained(self.lu, remote_node_uuid)
        CheckNodeVmCapable(self.lu, remote_node_uuid)

        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node_uuid)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internal
    # submitted opcode and external one. We should fix that.
    if self.remote_node_info:
      # We change the node, lets verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
                             self.remote_node_info, self.cfg,
                             ignore=self.ignore_ipolicy)

    for node_uuid in check_nodes:
      CheckNodeOnline(self.lu, node_uuid)

    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                          self.other_node_uuid,
                                                          self.target_node_uuid]
                              if node_uuid is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      self.instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" %
                self.cfg.GetNodeName(self.instance.primary_node))
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.cfg.GetNodeNames(
                                  self.instance.secondary_nodes)))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node_uuid is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result
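    # Dispatch note (comment only): as implemented above, Exec() picks
    # _ExecDrbd8Secondary when a new secondary was requested (new_node_uuid is
    # set, i.e. mode "change secondary") and _ExecDrbd8DiskOnly otherwise.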

  def _CheckVolumeGroup(self, node_uuids):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(node_uuids)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node_uuid in node_uuids:
      res = results[node_uuid]
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, self.cfg.GetNodeName(node_uuid)))

  def _CheckDisksExistence(self, node_uuids):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))

        result = _BlockdevFind(self, node_uuid, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, self.cfg.GetNodeName(node_uuid), msg))

  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, self.cfg.GetNodeName(node_uuid)))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (self.cfg.GetNodeName(node_uuid),
                                  self.instance.name))

  def _CreateNewStorage(self, node_uuid):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d",
                      self.cfg.GetNodeName(node_uuid), idx)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        try:
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                               GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    return iv_names
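    # Shape of the returned mapping (comment sketch): iv_names is keyed by the
    # DRBD disk's iv_name, e.g. roughly
    #   {"disk/0": (drbd_dev, [old_data_lv, old_meta_lv],
    #               [new_data_lv, new_meta_lv])}
    # and is consumed by _CheckDevices() and _RemoveOldStorage() below.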

  def _CheckDevices(self, node_uuid, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      result = _BlockdevFind(self, node_uuid, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_uuid, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \
                .fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
      False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node_uuid)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
                                                     (dev, self.instance),
                                                     (old_lvs, self.instance))
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" %
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == unique_id on that node

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.logical_id[0],
                                d.logical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node_uuid,
                                             (to_ren, self.instance))
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.logical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s",
                      self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                  (dev, self.instance),
                                                  (new_lvs, self.instance))
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                               (new_lv, self.instance)).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2422

    
2423
  def _ExecDrbd8Secondary(self, feedback_fn):
2424
    """Replace the secondary node for DRBD 8.
2425

2426
    The algorithm for replace is quite complicated:
2427
      - for all disks of the instance:
2428
        - create new LVs on the new node with same names
2429
        - shutdown the drbd device on the old secondary
2430
        - disconnect the drbd network on the primary
2431
        - create the drbd device on the new secondary
2432
        - network attach the drbd on the primary, using an artifice:
2433
          the drbd code for Attach() will connect to the network if it
2434
          finds a device which is connected to the good local disks but
2435
          not network enabled
2436
      - wait for sync across all devices
2437
      - remove all disks from the old secondary
2438

2439
    Failures are not very well handled.
2440

2441
    """
2442
    steps_total = 6
2443

    
2444
    pnode = self.instance.primary_node
2445

    
2446
    # Step: check device activation
2447
    self.lu.LogStep(1, steps_total, "Check device existence")
2448
    self._CheckDisksExistence([self.instance.primary_node])
2449
    self._CheckVolumeGroup([self.instance.primary_node])
2450

    
2451
    # Step: check other node consistency
2452
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2453
    self._CheckDisksConsistency(self.instance.primary_node, True, True)
2454

    
2455
    # Step: create new storage
2456
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2457
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2458
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2459
                                                  self.new_node_uuid)
2460
    for idx, dev in enumerate(disks):
2461
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2462
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2463
      # we pass force_create=True to force LVM creation
2464
      for new_lv in dev.children:
2465
        try:
2466
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2467
                               new_lv, True, GetInstanceInfoText(self.instance),
2468
                               False, excl_stor)
2469
        except errors.DeviceCreationError, e:
2470
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2471

    
2472
    # Step 4: dbrd minors and drbd setups changes
2473
    # after this, we must manually remove the drbd minors on both the
2474
    # error and the success paths
2475
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2476
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2477
                                         for _ in self.instance.disks],
2478
                                        self.instance.uuid)
2479
    logging.debug("Allocated minors %r", minors)
2480

    
2481
    iv_names = {}
2482
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2483
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2484
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2485
      # create new devices on new_node; note that we create two IDs:
2486
      # one without port, so the drbd will be activated without
2487
      # networking information on the new node at this stage, and one
2488
      # with network, for the latter activation in step 4
2489
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2490
      if self.instance.primary_node == o_node1:
2491
        p_minor = o_minor1
2492
      else:
2493
        assert self.instance.primary_node == o_node2, "Three-node instance?"
2494
        p_minor = o_minor2
2495

    
2496
      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2497
                      p_minor, new_minor, o_secret)
2498
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2499
                    p_minor, new_minor, o_secret)
2500

    
2501
      iv_names[idx] = (dev, dev.children, new_net_id)
2502
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2503
                    new_net_id)
2504
      new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
2505
                              logical_id=new_alone_id,
2506
                              children=dev.children,
2507
                              size=dev.size,
2508
                              params={})
2509
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2510
                                            self.cfg)
2511
      try:
2512
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2513
                             anno_new_drbd,
2514
                             GetInstanceInfoText(self.instance), False,
2515
                             excl_stor)
2516
      except errors.GenericError:
2517
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2518
        raise
2519

    
2520
    # We have new devices, shutdown the drbd on the old secondary
2521
    for idx, dev in enumerate(self.instance.disks):
2522
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2523
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2524
                                            (dev, self.instance)).fail_msg
2525
      if msg:
2526
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2527
                           "node: %s" % (idx, msg),
2528
                           hint=("Please cleanup this device manually as"
2529
                                 " soon as possible"))
2530

    
2531
    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2532
    result = self.rpc.call_drbd_disconnect_net(
2533
               [pnode], (self.instance.disks, self.instance))[pnode]
2534

    
2535
    msg = result.fail_msg
2536
    if msg:
2537
      # detaches didn't succeed (unlikely)
2538
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2539
      raise errors.OpExecError("Can't detach the disks from the network on"
2540
                               " old node: %s" % (msg,))
2541

    
2542
    # if we managed to detach at least one, we update all the disks of
2543
    # the instance to point to the new secondary
2544
    self.lu.LogInfo("Updating instance configuration")
2545
    for dev, _, new_logical_id in iv_names.itervalues():
2546
      dev.logical_id = new_logical_id
2547

    
2548
    self.cfg.Update(self.instance, feedback_fn)
2549

    
2550
    # Release all node locks (the configuration has been updated)
2551
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2552

    
2553
    # and now perform the drbd attach
2554
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2555
                    " (standalone => connected)")
2556
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2557
                                            self.new_node_uuid],
2558
                                           (self.instance.disks, self.instance),
2559
                                           self.instance.name,
2560
                                           False)
2561
    for to_node, to_result in result.items():
2562
      msg = to_result.fail_msg
2563
      if msg:
2564
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2565
                           self.cfg.GetNodeName(to_node), msg,
2566
                           hint=("please do a gnt-instance info to see the"
2567
                                 " status of disks"))
2568

    
2569
    cstep = itertools.count(5)
2570

    
2571
    if self.early_release:
2572
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2573
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2574
      # TODO: Check if releasing locks early still makes sense
2575
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2576
    else:
2577
      # Release all resource locks except those used by the instance
2578
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2579
                   keep=self.node_secondary_ip.keys())
2580

    
2581
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2582
    # shutdown in the caller into consideration.
2583

    
2584
    # Wait for sync
2585
    # This can fail as the old devices are degraded and _WaitForSync
2586
    # does a combined result over all disks, so we don't check its return value
2587
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2588
    WaitForSync(self.lu, self.instance)
2589

    
2590
    # Check all devices manually
2591
    self._CheckDevices(self.instance.primary_node, iv_names)
2592

    
2593
    # Step: remove old storage
2594
    if not self.early_release:
2595
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2596
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
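    # DRBD8 logical_id layout (comment only), as unpacked above:
    #   (node_A_uuid, node_B_uuid, port, minor_A, minor_B, secret)
    # new_alone_id deliberately carries port=None so the device comes up
    # without networking first; new_net_id carries the real port for the later
    # call_drbd_attach_net step.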