Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / instance_storage.py @ 4869595d

History | View | Annotate | Download (98.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with storage of instances."""
23

    
24
import itertools
25
import logging
26
import os
27
import time
28

    
29
from ganeti import compat
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import ht
33
from ganeti import locking
34
from ganeti.masterd import iallocator
35
from ganeti import objects
36
from ganeti import utils
37
import ganeti.rpc.node as rpc
38
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
39
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
40
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
41
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
42
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
43
  CheckDiskTemplateEnabled
44
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47

    
48
import ganeti.masterd.instance
49

    
50

    
51
_DISK_TEMPLATE_NAME_PREFIX = {
52
  constants.DT_PLAIN: "",
53
  constants.DT_RBD: ".rbd",
54
  constants.DT_EXT: ".ext",
55
  }
56

    
57

    
58
def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
59
                         excl_stor):
60
  """Create a single block device on a given node.
61

62
  This will not recurse over children of the device, so they must be
63
  created in advance.
64

65
  @param lu: the lu on whose behalf we execute
66
  @param node_uuid: the node on which to create the device
67
  @type instance: L{objects.Instance}
68
  @param instance: the instance which owns the device
69
  @type device: L{objects.Disk}
70
  @param device: the device to create
71
  @param info: the extra 'metadata' we should attach to the device
72
      (this will be represented as a LVM tag)
73
  @type force_open: boolean
74
  @param force_open: this parameter will be passes to the
75
      L{backend.BlockdevCreate} function where it specifies
76
      whether we run on primary or not, and it affects both
77
      the child assembly and the device own Open() execution
78
  @type excl_stor: boolean
79
  @param excl_stor: Whether exclusive_storage is active for the node
80

81
  """
82
  result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
83
                                       device.size, instance.name, force_open,
84
                                       info, excl_stor)
85
  result.Raise("Can't create block device %s on"
86
               " node %s for instance %s" % (device,
87
                                             lu.cfg.GetNodeName(node_uuid),
88
                                             instance.name))
89

    
90

    
91
def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
92
                         info, force_open, excl_stor):
93
  """Create a tree of block devices on a given node.
94

95
  If this device type has to be created on secondaries, create it and
96
  all its children.
97

98
  If not, just recurse to children keeping the same 'force' value.
99

100
  @attention: The device has to be annotated already.
101

102
  @param lu: the lu on whose behalf we execute
103
  @param node_uuid: the node on which to create the device
104
  @type instance: L{objects.Instance}
105
  @param instance: the instance which owns the device
106
  @type device: L{objects.Disk}
107
  @param device: the device to create
108
  @type force_create: boolean
109
  @param force_create: whether to force creation of this device; this
110
      will be change to True whenever we find a device which has
111
      CreateOnSecondary() attribute
112
  @param info: the extra 'metadata' we should attach to the device
113
      (this will be represented as a LVM tag)
114
  @type force_open: boolean
115
  @param force_open: this parameter will be passes to the
116
      L{backend.BlockdevCreate} function where it specifies
117
      whether we run on primary or not, and it affects both
118
      the child assembly and the device own Open() execution
119
  @type excl_stor: boolean
120
  @param excl_stor: Whether exclusive_storage is active for the node
121

122
  @return: list of created devices
123
  """
124
  created_devices = []
125
  try:
126
    if device.CreateOnSecondary():
127
      force_create = True
128

    
129
    if device.children:
130
      for child in device.children:
131
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
132
                                    force_create, info, force_open, excl_stor)
133
        created_devices.extend(devs)
134

    
135
    if not force_create:
136
      return created_devices
137

    
138
    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
139
                         excl_stor)
140
    # The device has been completely created, so there is no point in keeping
141
    # its subdevices in the list. We just add the device itself instead.
142
    created_devices = [(node_uuid, device)]
143
    return created_devices
144

    
145
  except errors.DeviceCreationError, e:
146
    e.created_devices.extend(created_devices)
147
    raise e
148
  except errors.OpExecError, e:
149
    raise errors.DeviceCreationError(str(e), created_devices)
150

    
151

    
152
def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
153
  """Whether exclusive_storage is in effect for the given node.
154

155
  @type cfg: L{config.ConfigWriter}
156
  @param cfg: The cluster configuration
157
  @type node_uuid: string
158
  @param node_uuid: The node UUID
159
  @rtype: bool
160
  @return: The effective value of exclusive_storage
161
  @raise errors.OpPrereqError: if no node exists with the given name
162

163
  """
164
  ni = cfg.GetNodeInfo(node_uuid)
165
  if ni is None:
166
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
167
                               errors.ECODE_NOENT)
168
  return IsExclusiveStorageEnabledNode(cfg, ni)
169

    
170

    
171
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
172
                    force_open):
173
  """Wrapper around L{_CreateBlockDevInner}.
174

175
  This method annotates the root device first.
176

177
  """
178
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
179
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
180
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
181
                              force_open, excl_stor)
182

    
183

    
184
def _UndoCreateDisks(lu, disks_created, instance):
185
  """Undo the work performed by L{CreateDisks}.
186

187
  This function is called in case of an error to undo the work of
188
  L{CreateDisks}.
189

190
  @type lu: L{LogicalUnit}
191
  @param lu: the logical unit on whose behalf we execute
192
  @param disks_created: the result returned by L{CreateDisks}
193
  @type instance: L{objects.Instance}
194
  @param instance: the instance for which disks were created
195

196
  """
197
  for (node_uuid, disk) in disks_created:
198
    result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
199
    result.Warn("Failed to remove newly-created disk %s on node %s" %
200
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
201

    
202

    
203
def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
204
  """Create all disks for an instance.
205

206
  This abstracts away some work from AddInstance.
207

208
  @type lu: L{LogicalUnit}
209
  @param lu: the logical unit on whose behalf we execute
210
  @type instance: L{objects.Instance}
211
  @param instance: the instance whose disks we should create
212
  @type to_skip: list
213
  @param to_skip: list of indices to skip
214
  @type target_node_uuid: string
215
  @param target_node_uuid: if passed, overrides the target node for creation
216
  @type disks: list of {objects.Disk}
217
  @param disks: the disks to create; if not specified, all the disks of the
218
      instance are created
219
  @return: information about the created disks, to be used to call
220
      L{_UndoCreateDisks}
221
  @raise errors.OpPrereqError: in case of error
222

223
  """
224
  info = GetInstanceInfoText(instance)
225
  if target_node_uuid is None:
226
    pnode_uuid = instance.primary_node
227
    all_node_uuids = instance.all_nodes
228
  else:
229
    pnode_uuid = target_node_uuid
230
    all_node_uuids = [pnode_uuid]
231

    
232
  if disks is None:
233
    disks = instance.disks
234

    
235
  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)
236

    
237
  if instance.disk_template in constants.DTS_FILEBASED:
238
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
239
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
240

    
241
    result.Raise("Failed to create directory '%s' on"
242
                 " node %s" % (file_storage_dir,
243
                               lu.cfg.GetNodeName(pnode_uuid)))
244

    
245
  disks_created = []
246
  for idx, device in enumerate(disks):
247
    if to_skip and idx in to_skip:
248
      continue
249
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
250
    for node_uuid in all_node_uuids:
251
      f_create = node_uuid == pnode_uuid
252
      try:
253
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
254
                        f_create)
255
        disks_created.append((node_uuid, device))
256
      except errors.DeviceCreationError, e:
257
        logging.warning("Creating disk %s for instance '%s' failed",
258
                        idx, instance.name)
259
        disks_created.extend(e.created_devices)
260
        _UndoCreateDisks(lu, disks_created, instance)
261
        raise errors.OpExecError(e.message)
262
  return disks_created
263

    
264

    
265
def ComputeDiskSizePerVG(disk_template, disks):
266
  """Compute disk size requirements in the volume group
267

268
  """
269
  def _compute(disks, payload):
270
    """Universal algorithm.
271

272
    """
273
    vgs = {}
274
    for disk in disks:
275
      vgs[disk[constants.IDISK_VG]] = \
276
        vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
277

    
278
    return vgs
279

    
280
  # Required free disk space as a function of disk and swap space
281
  req_size_dict = {
282
    constants.DT_DISKLESS: {},
283
    constants.DT_PLAIN: _compute(disks, 0),
284
    # 128 MB are added for drbd metadata for each disk
285
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
286
    constants.DT_FILE: {},
287
    constants.DT_SHARED_FILE: {},
288
    constants.DT_GLUSTER: {},
289
    }
290

    
291
  if disk_template not in req_size_dict:
292
    raise errors.ProgrammerError("Disk template '%s' size requirement"
293
                                 " is unknown" % disk_template)
294

    
295
  return req_size_dict[disk_template]
296

    
297

    
298
def ComputeDisks(op, default_vg):
299
  """Computes the instance disks.
300

301
  @param op: The instance opcode
302
  @param default_vg: The default_vg to assume
303

304
  @return: The computed disks
305

306
  """
307
  disks = []
308
  for disk in op.disks:
309
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
310
    if mode not in constants.DISK_ACCESS_SET:
311
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
312
                                 mode, errors.ECODE_INVAL)
313
    size = disk.get(constants.IDISK_SIZE, None)
314
    if size is None:
315
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
316
    try:
317
      size = int(size)
318
    except (TypeError, ValueError):
319
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
320
                                 errors.ECODE_INVAL)
321

    
322
    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
323
    if ext_provider and op.disk_template != constants.DT_EXT:
324
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
325
                                 " disk template, not %s" %
326
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
327
                                  op.disk_template), errors.ECODE_INVAL)
328

    
329
    data_vg = disk.get(constants.IDISK_VG, default_vg)
330
    name = disk.get(constants.IDISK_NAME, None)
331
    if name is not None and name.lower() == constants.VALUE_NONE:
332
      name = None
333
    new_disk = {
334
      constants.IDISK_SIZE: size,
335
      constants.IDISK_MODE: mode,
336
      constants.IDISK_VG: data_vg,
337
      constants.IDISK_NAME: name,
338
      }
339

    
340
    for key in [
341
      constants.IDISK_METAVG,
342
      constants.IDISK_ADOPT,
343
      constants.IDISK_SPINDLES,
344
      ]:
345
      if key in disk:
346
        new_disk[key] = disk[key]
347

    
348
    # For extstorage, demand the `provider' option and add any
349
    # additional parameters (ext-params) to the dict
350
    if op.disk_template == constants.DT_EXT:
351
      if ext_provider:
352
        new_disk[constants.IDISK_PROVIDER] = ext_provider
353
        for key in disk:
354
          if key not in constants.IDISK_PARAMS:
355
            new_disk[key] = disk[key]
356
      else:
357
        raise errors.OpPrereqError("Missing provider for template '%s'" %
358
                                   constants.DT_EXT, errors.ECODE_INVAL)
359

    
360
    disks.append(new_disk)
361

    
362
  return disks
363

    
364

    
365
def CheckRADOSFreeSpace():
366
  """Compute disk size requirements inside the RADOS cluster.
367

368
  """
369
  # For the RADOS cluster we assume there is always enough space.
370
  pass
371

    
372

    
373
def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
374
                         iv_name, p_minor, s_minor):
375
  """Generate a drbd8 device complete with its children.
376

377
  """
378
  assert len(vgnames) == len(names) == 2
379
  port = lu.cfg.AllocatePort()
380
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
381

    
382
  dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
383
                          logical_id=(vgnames[0], names[0]),
384
                          params={})
385
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
386
  dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
387
                          size=constants.DRBD_META_SIZE,
388
                          logical_id=(vgnames[1], names[1]),
389
                          params={})
390
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
391
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
392
                          logical_id=(primary_uuid, secondary_uuid, port,
393
                                      p_minor, s_minor,
394
                                      shared_secret),
395
                          children=[dev_data, dev_meta],
396
                          iv_name=iv_name, params={})
397
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
398
  return drbd_dev
399

    
400

    
401
def GenerateDiskTemplate(
402
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
403
  disk_info, file_storage_dir, file_driver, base_index,
404
  feedback_fn, full_disk_params):
405
  """Generate the entire disk layout for a given template type.
406

407
  """
408
  vgname = lu.cfg.GetVGName()
409
  disk_count = len(disk_info)
410
  disks = []
411

    
412
  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
413

    
414
  if template_name == constants.DT_DISKLESS:
415
    pass
416
  elif template_name == constants.DT_DRBD8:
417
    if len(secondary_node_uuids) != 1:
418
      raise errors.ProgrammerError("Wrong template configuration")
419
    remote_node_uuid = secondary_node_uuids[0]
420
    minors = lu.cfg.AllocateDRBDMinor(
421
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
422

    
423
    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
424
                                                       full_disk_params)
425
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
426

    
427
    names = []
428
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
429
                                               for i in range(disk_count)]):
430
      names.append(lv_prefix + "_data")
431
      names.append(lv_prefix + "_meta")
432
    for idx, disk in enumerate(disk_info):
433
      disk_index = idx + base_index
434
      data_vg = disk.get(constants.IDISK_VG, vgname)
435
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
436
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
437
                                      disk[constants.IDISK_SIZE],
438
                                      [data_vg, meta_vg],
439
                                      names[idx * 2:idx * 2 + 2],
440
                                      "disk/%d" % disk_index,
441
                                      minors[idx * 2], minors[idx * 2 + 1])
442
      disk_dev.mode = disk[constants.IDISK_MODE]
443
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
444
      disks.append(disk_dev)
445
  else:
446
    if secondary_node_uuids:
447
      raise errors.ProgrammerError("Wrong template configuration")
448

    
449
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
450
    if name_prefix is None:
451
      names = None
452
    else:
453
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
454
                                        (name_prefix, base_index + i)
455
                                        for i in range(disk_count)])
456

    
457
    if template_name == constants.DT_PLAIN:
458

    
459
      def logical_id_fn(idx, _, disk):
460
        vg = disk.get(constants.IDISK_VG, vgname)
461
        return (vg, names[idx])
462

    
463
    elif template_name == constants.DT_GLUSTER:
464
      logical_id_fn = lambda _1, disk_index, _2: \
465
        (file_driver, "ganeti/%s.%d" % (instance_uuid,
466
                                        disk_index))
467

    
468
    elif template_name in constants.DTS_FILEBASED: # Gluster handled above
469
      logical_id_fn = \
470
        lambda _, disk_index, disk: (file_driver,
471
                                     "%s/disk%d" % (file_storage_dir,
472
                                                    disk_index))
473
    elif template_name == constants.DT_BLOCK:
474
      logical_id_fn = \
475
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
476
                                       disk[constants.IDISK_ADOPT])
477
    elif template_name == constants.DT_RBD:
478
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
479
    elif template_name == constants.DT_EXT:
480
      def logical_id_fn(idx, _, disk):
481
        provider = disk.get(constants.IDISK_PROVIDER, None)
482
        if provider is None:
483
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
484
                                       " not found", constants.DT_EXT,
485
                                       constants.IDISK_PROVIDER)
486
        return (provider, names[idx])
487
    else:
488
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
489

    
490
    dev_type = template_name
491

    
492
    for idx, disk in enumerate(disk_info):
493
      params = {}
494
      # Only for the Ext template add disk_info to params
495
      if template_name == constants.DT_EXT:
496
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
497
        for key in disk:
498
          if key not in constants.IDISK_PARAMS:
499
            params[key] = disk[key]
500
      disk_index = idx + base_index
501
      size = disk[constants.IDISK_SIZE]
502
      feedback_fn("* disk %s, size %s" %
503
                  (disk_index, utils.FormatUnit(size, "h")))
504
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
505
                              logical_id=logical_id_fn(idx, disk_index, disk),
506
                              iv_name="disk/%d" % disk_index,
507
                              mode=disk[constants.IDISK_MODE],
508
                              params=params,
509
                              spindles=disk.get(constants.IDISK_SPINDLES))
510
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
511
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
512
      disks.append(disk_dev)
513

    
514
  return disks
515

    
516

    
517
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
518
  """Check the presence of the spindle options with exclusive_storage.
519

520
  @type diskdict: dict
521
  @param diskdict: disk parameters
522
  @type es_flag: bool
523
  @param es_flag: the effective value of the exlusive_storage flag
524
  @type required: bool
525
  @param required: whether spindles are required or just optional
526
  @raise errors.OpPrereqError when spindles are given and they should not
527

528
  """
529
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
530
      diskdict[constants.IDISK_SPINDLES] is not None):
531
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
532
                               " when exclusive storage is not active",
533
                               errors.ECODE_INVAL)
534
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
535
                                diskdict[constants.IDISK_SPINDLES] is None)):
536
    raise errors.OpPrereqError("You must specify spindles in instance disks"
537
                               " when exclusive storage is active",
538
                               errors.ECODE_INVAL)
539

    
540

    
541
class LUInstanceRecreateDisks(LogicalUnit):
542
  """Recreate an instance's missing disks.
543

544
  """
545
  HPATH = "instance-recreate-disks"
546
  HTYPE = constants.HTYPE_INSTANCE
547
  REQ_BGL = False
548

    
549
  _MODIFYABLE = compat.UniqueFrozenset([
550
    constants.IDISK_SIZE,
551
    constants.IDISK_MODE,
552
    constants.IDISK_SPINDLES,
553
    ])
554

    
555
  # New or changed disk parameters may have different semantics
556
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
557
    constants.IDISK_ADOPT,
558

    
559
    # TODO: Implement support changing VG while recreating
560
    constants.IDISK_VG,
561
    constants.IDISK_METAVG,
562
    constants.IDISK_PROVIDER,
563
    constants.IDISK_NAME,
564
    ]))
565

    
566
  def _RunAllocator(self):
567
    """Run the allocator based on input opcode.
568

569
    """
570
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
571

    
572
    # FIXME
573
    # The allocator should actually run in "relocate" mode, but current
574
    # allocators don't support relocating all the nodes of an instance at
575
    # the same time. As a workaround we use "allocate" mode, but this is
576
    # suboptimal for two reasons:
577
    # - The instance name passed to the allocator is present in the list of
578
    #   existing instances, so there could be a conflict within the
579
    #   internal structures of the allocator. This doesn't happen with the
580
    #   current allocators, but it's a liability.
581
    # - The allocator counts the resources used by the instance twice: once
582
    #   because the instance exists already, and once because it tries to
583
    #   allocate a new instance.
584
    # The allocator could choose some of the nodes on which the instance is
585
    # running, but that's not a problem. If the instance nodes are broken,
586
    # they should be already be marked as drained or offline, and hence
587
    # skipped by the allocator. If instance disks have been lost for other
588
    # reasons, then recreating the disks on the same nodes should be fine.
589
    disk_template = self.instance.disk_template
590
    spindle_use = be_full[constants.BE_SPINDLE_USE]
591
    disks = [{
592
      constants.IDISK_SIZE: d.size,
593
      constants.IDISK_MODE: d.mode,
594
      constants.IDISK_SPINDLES: d.spindles,
595
      } for d in self.instance.disks]
596
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
597
                                        disk_template=disk_template,
598
                                        tags=list(self.instance.GetTags()),
599
                                        os=self.instance.os,
600
                                        nics=[{}],
601
                                        vcpus=be_full[constants.BE_VCPUS],
602
                                        memory=be_full[constants.BE_MAXMEM],
603
                                        spindle_use=spindle_use,
604
                                        disks=disks,
605
                                        hypervisor=self.instance.hypervisor,
606
                                        node_whitelist=None)
607
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
608

    
609
    ial.Run(self.op.iallocator)
610

    
611
    assert req.RequiredNodes() == len(self.instance.all_nodes)
612

    
613
    if not ial.success:
614
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
615
                                 " %s" % (self.op.iallocator, ial.info),
616
                                 errors.ECODE_NORES)
617

    
618
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
619
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
620
                 self.op.instance_name, self.op.iallocator,
621
                 utils.CommaJoin(self.op.nodes))
622

    
623
  def CheckArguments(self):
624
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
625
      # Normalize and convert deprecated list of disk indices
626
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
627

    
628
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
629
    if duplicates:
630
      raise errors.OpPrereqError("Some disks have been specified more than"
631
                                 " once: %s" % utils.CommaJoin(duplicates),
632
                                 errors.ECODE_INVAL)
633

    
634
    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
635
    # when neither iallocator nor nodes are specified
636
    if self.op.iallocator or self.op.nodes:
637
      CheckIAllocatorOrNode(self, "iallocator", "nodes")
638

    
639
    for (idx, params) in self.op.disks:
640
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
641
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
642
      if unsupported:
643
        raise errors.OpPrereqError("Parameters for disk %s try to change"
644
                                   " unmodifyable parameter(s): %s" %
645
                                   (idx, utils.CommaJoin(unsupported)),
646
                                   errors.ECODE_INVAL)
647

    
648
  def ExpandNames(self):
649
    self._ExpandAndLockInstance()
650
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
651

    
652
    if self.op.nodes:
653
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
654
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
655
    else:
656
      self.needed_locks[locking.LEVEL_NODE] = []
657
      if self.op.iallocator:
658
        # iallocator will select a new node in the same group
659
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
660
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
661

    
662
    self.needed_locks[locking.LEVEL_NODE_RES] = []
663

    
664
  def DeclareLocks(self, level):
665
    if level == locking.LEVEL_NODEGROUP:
666
      assert self.op.iallocator is not None
667
      assert not self.op.nodes
668
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
669
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
670
      # Lock the primary group used by the instance optimistically; this
671
      # requires going via the node before it's locked, requiring
672
      # verification later on
673
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
674
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
675

    
676
    elif level == locking.LEVEL_NODE:
677
      # If an allocator is used, then we lock all the nodes in the current
678
      # instance group, as we don't know yet which ones will be selected;
679
      # if we replace the nodes without using an allocator, locks are
680
      # already declared in ExpandNames; otherwise, we need to lock all the
681
      # instance nodes for disk re-creation
682
      if self.op.iallocator:
683
        assert not self.op.nodes
684
        assert not self.needed_locks[locking.LEVEL_NODE]
685
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
686

    
687
        # Lock member nodes of the group of the primary node
688
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
689
          self.needed_locks[locking.LEVEL_NODE].extend(
690
            self.cfg.GetNodeGroup(group_uuid).members)
691

    
692
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
693
      elif not self.op.nodes:
694
        self._LockInstancesNodes(primary_only=False)
695
    elif level == locking.LEVEL_NODE_RES:
696
      # Copy node locks
697
      self.needed_locks[locking.LEVEL_NODE_RES] = \
698
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
699

    
700
  def BuildHooksEnv(self):
701
    """Build hooks env.
702

703
    This runs on master, primary and secondary nodes of the instance.
704

705
    """
706
    return BuildInstanceHookEnvByObject(self, self.instance)
707

    
708
  def BuildHooksNodes(self):
709
    """Build hooks nodes.
710

711
    """
712
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
713
    return (nl, nl)
714

    
715
  def CheckPrereq(self):
716
    """Check prerequisites.
717

718
    This checks that the instance is in the cluster and is not running.
719

720
    """
721
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
722
    assert instance is not None, \
723
      "Cannot retrieve locked instance %s" % self.op.instance_name
724
    if self.op.node_uuids:
725
      if len(self.op.node_uuids) != len(instance.all_nodes):
726
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
727
                                   " %d replacement nodes were specified" %
728
                                   (instance.name, len(instance.all_nodes),
729
                                    len(self.op.node_uuids)),
730
                                   errors.ECODE_INVAL)
731
      assert instance.disk_template != constants.DT_DRBD8 or \
732
             len(self.op.node_uuids) == 2
733
      assert instance.disk_template != constants.DT_PLAIN or \
734
             len(self.op.node_uuids) == 1
735
      primary_node = self.op.node_uuids[0]
736
    else:
737
      primary_node = instance.primary_node
738
    if not self.op.iallocator:
739
      CheckNodeOnline(self, primary_node)
740

    
741
    if instance.disk_template == constants.DT_DISKLESS:
742
      raise errors.OpPrereqError("Instance '%s' has no disks" %
743
                                 self.op.instance_name, errors.ECODE_INVAL)
744

    
745
    # Verify if node group locks are still correct
746
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
747
    if owned_groups:
748
      # Node group locks are acquired only for the primary node (and only
749
      # when the allocator is used)
750
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
751
                              primary_only=True)
752

    
753
    # if we replace nodes *and* the old primary is offline, we don't
754
    # check the instance state
755
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
756
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
757
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
758
                         msg="cannot recreate disks")
759

    
760
    if self.op.disks:
761
      self.disks = dict(self.op.disks)
762
    else:
763
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
764

    
765
    maxidx = max(self.disks.keys())
766
    if maxidx >= len(instance.disks):
767
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
768
                                 errors.ECODE_INVAL)
769

    
770
    if ((self.op.node_uuids or self.op.iallocator) and
771
         sorted(self.disks.keys()) != range(len(instance.disks))):
772
      raise errors.OpPrereqError("Can't recreate disks partially and"
773
                                 " change the nodes at the same time",
774
                                 errors.ECODE_INVAL)
775

    
776
    self.instance = instance
777

    
778
    if self.op.iallocator:
779
      self._RunAllocator()
780
      # Release unneeded node and node resource locks
781
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
782
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
783
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
784

    
785
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
786

    
787
    if self.op.node_uuids:
788
      node_uuids = self.op.node_uuids
789
    else:
790
      node_uuids = instance.all_nodes
791
    excl_stor = compat.any(
792
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
793
      )
794
    for new_params in self.disks.values():
795
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
796

    
797
  def Exec(self, feedback_fn):
798
    """Recreate the disks.
799

800
    """
801
    assert (self.owned_locks(locking.LEVEL_NODE) ==
802
            self.owned_locks(locking.LEVEL_NODE_RES))
803

    
804
    to_skip = []
805
    mods = [] # keeps track of needed changes
806

    
807
    for idx, disk in enumerate(self.instance.disks):
808
      try:
809
        changes = self.disks[idx]
810
      except KeyError:
811
        # Disk should not be recreated
812
        to_skip.append(idx)
813
        continue
814

    
815
      # update secondaries for disks, if needed
816
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
817
        # need to update the nodes and minors
818
        assert len(self.op.node_uuids) == 2
819
        assert len(disk.logical_id) == 6 # otherwise disk internals
820
                                         # have changed
821
        (_, _, old_port, _, _, old_secret) = disk.logical_id
822
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
823
                                                self.instance.uuid)
824
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
825
                  new_minors[0], new_minors[1], old_secret)
826
        assert len(disk.logical_id) == len(new_id)
827
      else:
828
        new_id = None
829

    
830
      mods.append((idx, new_id, changes))
831

    
832
    # now that we have passed all asserts above, we can apply the mods
833
    # in a single run (to avoid partial changes)
834
    for idx, new_id, changes in mods:
835
      disk = self.instance.disks[idx]
836
      if new_id is not None:
837
        assert disk.dev_type == constants.DT_DRBD8
838
        disk.logical_id = new_id
839
      if changes:
840
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
841
                    mode=changes.get(constants.IDISK_MODE, None),
842
                    spindles=changes.get(constants.IDISK_SPINDLES, None))
843

    
844
    # change primary node, if needed
845
    if self.op.node_uuids:
846
      self.instance.primary_node = self.op.node_uuids[0]
847
      self.LogWarning("Changing the instance's nodes, you will have to"
848
                      " remove any disks left on the older nodes manually")
849

    
850
    if self.op.node_uuids:
851
      self.cfg.Update(self.instance, feedback_fn)
852

    
853
    # All touched nodes must be locked
854
    mylocks = self.owned_locks(locking.LEVEL_NODE)
855
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
856
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
857

    
858
    # TODO: Release node locks before wiping, or explain why it's not possible
859
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
860
      wipedisks = [(idx, disk, 0)
861
                   for (idx, disk) in enumerate(self.instance.disks)
862
                   if idx not in to_skip]
863
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
864
                         cleanup=new_disks)
865

    
866

    
867
def _PerformNodeInfoCall(lu, node_uuids, vg):
868
  """Prepares the input and performs a node info call.
869

870
  @type lu: C{LogicalUnit}
871
  @param lu: a logical unit from which we get configuration data
872
  @type node_uuids: list of string
873
  @param node_uuids: list of node UUIDs to perform the call for
874
  @type vg: string
875
  @param vg: the volume group's name
876

877
  """
878
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
879
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
880
                                                  node_uuids)
881
  hvname = lu.cfg.GetHypervisorType()
882
  hvparams = lu.cfg.GetClusterInfo().hvparams
883
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
884
                                   [(hvname, hvparams[hvname])])
885
  return nodeinfo
886

    
887

    
888
def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
889
  """Checks the vg capacity for a given node.
890

891
  @type node_info: tuple (_, list of dicts, _)
892
  @param node_info: the result of the node info call for one node
893
  @type node_name: string
894
  @param node_name: the name of the node
895
  @type vg: string
896
  @param vg: volume group name
897
  @type requested: int
898
  @param requested: the amount of disk in MiB to check for
899
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
900
      or we cannot check the node
901

902
  """
903
  (_, space_info, _) = node_info
904
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
905
      space_info, constants.ST_LVM_VG)
906
  if not lvm_vg_info:
907
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
908
  vg_free = lvm_vg_info.get("storage_free", None)
909
  if not isinstance(vg_free, int):
910
    raise errors.OpPrereqError("Can't compute free disk space on node"
911
                               " %s for vg %s, result was '%s'" %
912
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
913
  if requested > vg_free:
914
    raise errors.OpPrereqError("Not enough disk space on target node %s"
915
                               " vg %s: required %d MiB, available %d MiB" %
916
                               (node_name, vg, requested, vg_free),
917
                               errors.ECODE_NORES)
918

    
919

    
920
def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
921
  """Checks if nodes have enough free disk space in the specified VG.
922

923
  This function checks if all given nodes have the needed amount of
924
  free disk. In case any node has less disk or we cannot get the
925
  information from the node, this function raises an OpPrereqError
926
  exception.
927

928
  @type lu: C{LogicalUnit}
929
  @param lu: a logical unit from which we get configuration data
930
  @type node_uuids: C{list}
931
  @param node_uuids: the list of node UUIDs to check
932
  @type vg: C{str}
933
  @param vg: the volume group to check
934
  @type requested: C{int}
935
  @param requested: the amount of disk in MiB to check for
936
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
937
      or we cannot check the node
938

939
  """
940
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
941
  for node_uuid in node_uuids:
942
    node_name = lu.cfg.GetNodeName(node_uuid)
943
    info = nodeinfo[node_uuid]
944
    info.Raise("Cannot get current information from node %s" % node_name,
945
               prereq=True, ecode=errors.ECODE_ENVIRON)
946
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
947

    
948

    
949
def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
950
  """Checks if nodes have enough free disk space in all the VGs.
951

952
  This function checks if all given nodes have the needed amount of
953
  free disk. In case any node has less disk or we cannot get the
954
  information from the node, this function raises an OpPrereqError
955
  exception.
956

957
  @type lu: C{LogicalUnit}
958
  @param lu: a logical unit from which we get configuration data
959
  @type node_uuids: C{list}
960
  @param node_uuids: the list of node UUIDs to check
961
  @type req_sizes: C{dict}
962
  @param req_sizes: the hash of vg and corresponding amount of disk in
963
      MiB to check for
964
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
965
      or we cannot check the node
966

967
  """
968
  for vg, req_size in req_sizes.items():
969
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
970

    
971

    
972
def _DiskSizeInBytesToMebibytes(lu, size):
973
  """Converts a disk size in bytes to mebibytes.
974

975
  Warns and rounds up if the size isn't an even multiple of 1 MiB.
976

977
  """
978
  (mib, remainder) = divmod(size, 1024 * 1024)
979

    
980
  if remainder != 0:
981
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
982
                  " to not overwrite existing data (%s bytes will not be"
983
                  " wiped)", (1024 * 1024) - remainder)
984
    mib += 1
985

    
986
  return mib
987

    
988

    
989
def _CalcEta(time_taken, written, total_size):
990
  """Calculates the ETA based on size written and total size.
991

992
  @param time_taken: The time taken so far
993
  @param written: amount written so far
994
  @param total_size: The total size of data to be written
995
  @return: The remaining time in seconds
996

997
  """
998
  avg_time = time_taken / float(written)
999
  return (total_size - written) * avg_time
1000

    
1001

    
1002
def WipeDisks(lu, instance, disks=None):
1003
  """Wipes instance disks.
1004

1005
  @type lu: L{LogicalUnit}
1006
  @param lu: the logical unit on whose behalf we execute
1007
  @type instance: L{objects.Instance}
1008
  @param instance: the instance whose disks we should create
1009
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1010
  @param disks: Disk details; tuple contains disk index, disk object and the
1011
    start offset
1012

1013
  """
1014
  node_uuid = instance.primary_node
1015
  node_name = lu.cfg.GetNodeName(node_uuid)
1016

    
1017
  if disks is None:
1018
    disks = [(idx, disk, 0)
1019
             for (idx, disk) in enumerate(instance.disks)]
1020

    
1021
  logging.info("Pausing synchronization of disks of instance '%s'",
1022
               instance.name)
1023
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1024
                                                  (map(compat.snd, disks),
1025
                                                   instance),
1026
                                                  True)
1027
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1028

    
1029
  for idx, success in enumerate(result.payload):
1030
    if not success:
1031
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
1032
                   " failed", idx, instance.name)
1033

    
1034
  try:
1035
    for (idx, device, offset) in disks:
1036
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1037
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1038
      wipe_chunk_size = \
1039
        int(min(constants.MAX_WIPE_CHUNK,
1040
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1041

    
1042
      size = device.size
1043
      last_output = 0
1044
      start_time = time.time()
1045

    
1046
      if offset == 0:
1047
        info_text = ""
1048
      else:
1049
        info_text = (" (from %s to %s)" %
1050
                     (utils.FormatUnit(offset, "h"),
1051
                      utils.FormatUnit(size, "h")))
1052

    
1053
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1054

    
1055
      logging.info("Wiping disk %d for instance %s on node %s using"
1056
                   " chunk size %s", idx, instance.name, node_name,
1057
                   wipe_chunk_size)
1058

    
1059
      while offset < size:
1060
        wipe_size = min(wipe_chunk_size, size - offset)
1061

    
1062
        logging.debug("Wiping disk %d, offset %s, chunk %s",
1063
                      idx, offset, wipe_size)
1064

    
1065
        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1066
                                           offset, wipe_size)
1067
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
1068
                     (idx, offset, wipe_size))
1069

    
1070
        now = time.time()
1071
        offset += wipe_size
1072
        if now - last_output >= 60:
1073
          eta = _CalcEta(now - start_time, offset, size)
1074
          lu.LogInfo(" - done: %.1f%% ETA: %s",
1075
                     offset / float(size) * 100, utils.FormatSeconds(eta))
1076
          last_output = now
1077
  finally:
1078
    logging.info("Resuming synchronization of disks for instance '%s'",
1079
                 instance.name)
1080

    
1081
    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1082
                                                    (map(compat.snd, disks),
1083
                                                     instance),
1084
                                                    False)
1085

    
1086
    if result.fail_msg:
1087
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1088
                    node_name, result.fail_msg)
1089
    else:
1090
      for idx, success in enumerate(result.payload):
1091
        if not success:
1092
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1093
                        " failed", idx, instance.name)
1094

    
1095

    
1096
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1097
  """Wrapper for L{WipeDisks} that handles errors.
1098

1099
  @type lu: L{LogicalUnit}
1100
  @param lu: the logical unit on whose behalf we execute
1101
  @type instance: L{objects.Instance}
1102
  @param instance: the instance whose disks we should wipe
1103
  @param disks: see L{WipeDisks}
1104
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1105
      case of error
1106
  @raise errors.OpPrereqError: in case of failure
1107

1108
  """
1109
  try:
1110
    WipeDisks(lu, instance, disks=disks)
1111
  except errors.OpExecError:
1112
    logging.warning("Wiping disks for instance '%s' failed",
1113
                    instance.name)
1114
    _UndoCreateDisks(lu, cleanup, instance)
1115
    raise
1116

    
1117

    
1118
def ExpandCheckDisks(instance, disks):
1119
  """Return the instance disks selected by the disks list
1120

1121
  @type disks: list of L{objects.Disk} or None
1122
  @param disks: selected disks
1123
  @rtype: list of L{objects.Disk}
1124
  @return: selected instance disks to act on
1125

1126
  """
1127
  if disks is None:
1128
    return instance.disks
1129
  else:
1130
    if not set(disks).issubset(instance.disks):
1131
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1132
                                   " target instance: expected a subset of %r,"
1133
                                   " got %r" % (instance.disks, disks))
1134
    return disks
1135

    
1136

    
1137
def WaitForSync(lu, instance, disks=None, oneshot=False):
1138
  """Sleep and poll for an instance's disk to sync.
1139

1140
  """
1141
  if not instance.disks or disks is not None and not disks:
1142
    return True
1143

    
1144
  disks = ExpandCheckDisks(instance, disks)
1145

    
1146
  if not oneshot:
1147
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1148

    
1149
  node_uuid = instance.primary_node
1150
  node_name = lu.cfg.GetNodeName(node_uuid)
1151

    
1152
  # TODO: Convert to utils.Retry
1153

    
1154
  retries = 0
1155
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1156
  while True:
1157
    max_time = 0
1158
    done = True
1159
    cumul_degraded = False
1160
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1161
    msg = rstats.fail_msg
1162
    if msg:
1163
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1164
      retries += 1
1165
      if retries >= 10:
1166
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1167
                                 " aborting." % node_name)
1168
      time.sleep(6)
1169
      continue
1170
    rstats = rstats.payload
1171
    retries = 0
1172
    for i, mstat in enumerate(rstats):
1173
      if mstat is None:
1174
        lu.LogWarning("Can't compute data for node %s/%s",
1175
                      node_name, disks[i].iv_name)
1176
        continue
1177

    
1178
      cumul_degraded = (cumul_degraded or
1179
                        (mstat.is_degraded and mstat.sync_percent is None))
1180
      if mstat.sync_percent is not None:
1181
        done = False
1182
        if mstat.estimated_time is not None:
1183
          rem_time = ("%s remaining (estimated)" %
1184
                      utils.FormatSeconds(mstat.estimated_time))
1185
          max_time = mstat.estimated_time
1186
        else:
1187
          rem_time = "no time estimate"
1188
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1189
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1190

    
1191
    # if we're done but degraded, let's do a few small retries, to
1192
    # make sure we see a stable and not transient situation; therefore
1193
    # we force restart of the loop
1194
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1195
      logging.info("Degraded disks found, %d retries left", degr_retries)
1196
      degr_retries -= 1
1197
      time.sleep(1)
1198
      continue
1199

    
1200
    if done or oneshot:
1201
      break
1202

    
1203
    time.sleep(min(60, max_time))
1204

    
1205
  if done:
1206
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1207

    
1208
  return not cumul_degraded
1209

    
1210

    
1211
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1212
  """Shutdown block devices of an instance.
1213

1214
  This does the shutdown on all nodes of the instance.
1215

1216
  If the ignore_primary is false, errors on the primary node are
1217
  ignored.
1218

1219
  """
1220
  all_result = True
1221

    
1222
  if disks is None:
1223
    # only mark instance disks as inactive if all disks are affected
1224
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1225
  disks = ExpandCheckDisks(instance, disks)
1226

    
1227
  for disk in disks:
1228
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1229
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1230
      msg = result.fail_msg
1231
      if msg:
1232
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1233
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1234
        if ((node_uuid == instance.primary_node and not ignore_primary) or
1235
            (node_uuid != instance.primary_node and not result.offline)):
1236
          all_result = False
1237
  return all_result
1238

    
1239

    
1240
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1241
  """Shutdown block devices of an instance.
1242

1243
  This function checks if an instance is running, before calling
1244
  _ShutdownInstanceDisks.
1245

1246
  """
1247
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1248
  ShutdownInstanceDisks(lu, instance, disks=disks)
1249

    
1250

    
1251
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1252
                          ignore_size=False):
1253
  """Prepare the block devices for an instance.
1254

1255
  This sets up the block devices on all nodes.
1256

1257
  @type lu: L{LogicalUnit}
1258
  @param lu: the logical unit on whose behalf we execute
1259
  @type instance: L{objects.Instance}
1260
  @param instance: the instance for whose disks we assemble
1261
  @type disks: list of L{objects.Disk} or None
1262
  @param disks: which disks to assemble (or all, if None)
1263
  @type ignore_secondaries: boolean
1264
  @param ignore_secondaries: if true, errors on secondary nodes
1265
      won't result in an error return from the function
1266
  @type ignore_size: boolean
1267
  @param ignore_size: if true, the current known size of the disk
1268
      will not be used during the disk activation, useful for cases
1269
      when the size is wrong
1270
  @return: False if the operation failed, otherwise a list of
1271
      (host, instance_visible_name, node_visible_name)
1272
      with the mapping from node devices to instance devices
1273

1274
  """
1275
  device_info = []
1276
  disks_ok = True
1277

    
1278
  if disks is None:
1279
    # only mark instance disks as active if all disks are affected
1280
    lu.cfg.MarkInstanceDisksActive(instance.uuid)
1281

    
1282
  disks = ExpandCheckDisks(instance, disks)
1283

    
1284
  # With the two passes mechanism we try to reduce the window of
1285
  # opportunity for the race condition of switching DRBD to primary
1286
  # before handshaking occured, but we do not eliminate it
1287

    
1288
  # The proper fix would be to wait (with some limits) until the
1289
  # connection has been made and drbd transitions from WFConnection
1290
  # into any other network-connected state (Connected, SyncTarget,
1291
  # SyncSource, etc.)
1292

    
1293
  # 1st pass, assemble on all nodes in secondary mode
1294
  for idx, inst_disk in enumerate(disks):
1295
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1296
                                  instance.primary_node):
1297
      if ignore_size:
1298
        node_disk = node_disk.Copy()
1299
        node_disk.UnsetSize()
1300
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1301
                                             instance.name, False, idx)
1302
      msg = result.fail_msg
1303
      if msg:
1304
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
1305
                                result.offline)
1306
        lu.LogWarning("Could not prepare block device %s on node %s"
1307
                      " (is_primary=False, pass=1): %s",
1308
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1309
        if not (ignore_secondaries or is_offline_secondary):
1310
          disks_ok = False
1311

    
1312
  # FIXME: race condition on drbd migration to primary
1313

    
1314
  # 2nd pass, do only the primary node
1315
  for idx, inst_disk in enumerate(disks):
1316
    dev_path = None
1317

    
1318
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1319
                                  instance.primary_node):
1320
      if node_uuid != instance.primary_node:
1321
        continue
1322
      if ignore_size:
1323
        node_disk = node_disk.Copy()
1324
        node_disk.UnsetSize()
1325
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1326
                                             instance.name, True, idx)
1327
      msg = result.fail_msg
1328
      if msg:
1329
        lu.LogWarning("Could not prepare block device %s on node %s"
1330
                      " (is_primary=True, pass=2): %s",
1331
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1332
        disks_ok = False
1333
      else:
1334
        dev_path, _ = result.payload
1335

    
1336
    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1337
                        inst_disk.iv_name, dev_path))
1338

    
1339
  if not disks_ok:
1340
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1341

    
1342
  return disks_ok, device_info
1343

    
1344

    
1345
def StartInstanceDisks(lu, instance, force):
1346
  """Start the disks of an instance.
1347

1348
  """
1349
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
1350
                                      ignore_secondaries=force)
1351
  if not disks_ok:
1352
    ShutdownInstanceDisks(lu, instance)
1353
    if force is not None and not force:
1354
      lu.LogWarning("",
1355
                    hint=("If the message above refers to a secondary node,"
1356
                          " you can retry the operation using '--force'"))
1357
    raise errors.OpExecError("Disk consistency error")
1358

    
1359

    
1360
class LUInstanceGrowDisk(LogicalUnit):
1361
  """Grow a disk of an instance.
1362

1363
  """
1364
  HPATH = "disk-grow"
1365
  HTYPE = constants.HTYPE_INSTANCE
1366
  REQ_BGL = False
1367

    
1368
  def ExpandNames(self):
1369
    self._ExpandAndLockInstance()
1370
    self.needed_locks[locking.LEVEL_NODE] = []
1371
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1372
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1373
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1374

    
1375
  def DeclareLocks(self, level):
1376
    if level == locking.LEVEL_NODE:
1377
      self._LockInstancesNodes()
1378
    elif level == locking.LEVEL_NODE_RES:
1379
      # Copy node locks
1380
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1381
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1382

    
1383
  def BuildHooksEnv(self):
1384
    """Build hooks env.
1385

1386
    This runs on the master, the primary and all the secondaries.
1387

1388
    """
1389
    env = {
1390
      "DISK": self.op.disk,
1391
      "AMOUNT": self.op.amount,
1392
      "ABSOLUTE": self.op.absolute,
1393
      }
1394
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
1395
    return env
1396

    
1397
  def BuildHooksNodes(self):
1398
    """Build hooks nodes.
1399

1400
    """
1401
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1402
    return (nl, nl)
1403

    
1404
  def CheckPrereq(self):
1405
    """Check prerequisites.
1406

1407
    This checks that the instance is in the cluster.
1408

1409
    """
1410
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1411
    assert self.instance is not None, \
1412
      "Cannot retrieve locked instance %s" % self.op.instance_name
1413
    node_uuids = list(self.instance.all_nodes)
1414
    for node_uuid in node_uuids:
1415
      CheckNodeOnline(self, node_uuid)
1416
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1417

    
1418
    if self.instance.disk_template not in constants.DTS_GROWABLE:
1419
      raise errors.OpPrereqError("Instance's disk layout does not support"
1420
                                 " growing", errors.ECODE_INVAL)
1421

    
1422
    self.disk = self.instance.FindDisk(self.op.disk)
1423

    
1424
    if self.op.absolute:
1425
      self.target = self.op.amount
1426
      self.delta = self.target - self.disk.size
1427
      if self.delta < 0:
1428
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
1429
                                   "current disk size (%s)" %
1430
                                   (utils.FormatUnit(self.target, "h"),
1431
                                    utils.FormatUnit(self.disk.size, "h")),
1432
                                   errors.ECODE_STATE)
1433
    else:
1434
      self.delta = self.op.amount
1435
      self.target = self.disk.size + self.delta
1436
      if self.delta < 0:
1437
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
1438
                                   utils.FormatUnit(self.delta, "h"),
1439
                                   errors.ECODE_INVAL)
1440

    
1441
    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1442

    
1443
  def _CheckDiskSpace(self, node_uuids, req_vgspace):
1444
    template = self.instance.disk_template
1445
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
1446
        not any(self.node_es_flags.values())):
1447
      # TODO: check the free disk space for file, when that feature will be
1448
      # supported
1449
      # With exclusive storage we need to do something smarter than just looking
1450
      # at free space, which, in the end, is basically a dry run. So we rely on
1451
      # the dry run performed in Exec() instead.
1452
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1453

    
1454
  def Exec(self, feedback_fn):
1455
    """Execute disk grow.
1456

1457
    """
1458
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1459
    assert (self.owned_locks(locking.LEVEL_NODE) ==
1460
            self.owned_locks(locking.LEVEL_NODE_RES))
1461

    
1462
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1463

    
1464
    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1465
    if not disks_ok:
1466
      raise errors.OpExecError("Cannot activate block device to grow")
1467

    
1468
    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1469
                (self.op.disk, self.instance.name,
1470
                 utils.FormatUnit(self.delta, "h"),
1471
                 utils.FormatUnit(self.target, "h")))
1472

    
1473
    # First run all grow ops in dry-run mode
1474
    for node_uuid in self.instance.all_nodes:
1475
      result = self.rpc.call_blockdev_grow(node_uuid,
1476
                                           (self.disk, self.instance),
1477
                                           self.delta, True, True,
1478
                                           self.node_es_flags[node_uuid])
1479
      result.Raise("Dry-run grow request failed to node %s" %
1480
                   self.cfg.GetNodeName(node_uuid))
1481

    
1482
    if wipe_disks:
1483
      # Get disk size from primary node for wiping
1484
      result = self.rpc.call_blockdev_getdimensions(
1485
                 self.instance.primary_node, [([self.disk], self.instance)])
1486
      result.Raise("Failed to retrieve disk size from node '%s'" %
1487
                   self.instance.primary_node)
1488

    
1489
      (disk_dimensions, ) = result.payload
1490

    
1491
      if disk_dimensions is None:
1492
        raise errors.OpExecError("Failed to retrieve disk size from primary"
1493
                                 " node '%s'" % self.instance.primary_node)
1494
      (disk_size_in_bytes, _) = disk_dimensions
1495

    
1496
      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1497

    
1498
      assert old_disk_size >= self.disk.size, \
1499
        ("Retrieved disk size too small (got %s, should be at least %s)" %
1500
         (old_disk_size, self.disk.size))
1501
    else:
1502
      old_disk_size = None
1503

    
1504
    # We know that (as far as we can test) operations across different
1505
    # nodes will succeed, time to run it for real on the backing storage
1506
    for node_uuid in self.instance.all_nodes:
1507
      result = self.rpc.call_blockdev_grow(node_uuid,
1508
                                           (self.disk, self.instance),
1509
                                           self.delta, False, True,
1510
                                           self.node_es_flags[node_uuid])
1511
      result.Raise("Grow request failed to node %s" %
1512
                   self.cfg.GetNodeName(node_uuid))
1513

    
1514
    # And now execute it for logical storage, on the primary node
1515
    node_uuid = self.instance.primary_node
1516
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1517
                                         self.delta, False, False,
1518
                                         self.node_es_flags[node_uuid])
1519
    result.Raise("Grow request failed to node %s" %
1520
                 self.cfg.GetNodeName(node_uuid))
1521

    
1522
    self.disk.RecordGrow(self.delta)
1523
    self.cfg.Update(self.instance, feedback_fn)
1524

    
1525
    # Changes have been recorded, release node lock
1526
    ReleaseLocks(self, locking.LEVEL_NODE)
1527

    
1528
    # Downgrade lock while waiting for sync
1529
    self.glm.downgrade(locking.LEVEL_INSTANCE)
1530

    
1531
    assert wipe_disks ^ (old_disk_size is None)
1532

    
1533
    if wipe_disks:
1534
      assert self.instance.disks[self.op.disk] == self.disk
1535

    
1536
      # Wipe newly added disk space
1537
      WipeDisks(self, self.instance,
1538
                disks=[(self.op.disk, self.disk, old_disk_size)])
1539

    
1540
    if self.op.wait_for_sync:
1541
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1542
      if disk_abort:
1543
        self.LogWarning("Disk syncing has not returned a good status; check"
1544
                        " the instance")
1545
      if not self.instance.disks_active:
1546
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1547
    elif not self.instance.disks_active:
1548
      self.LogWarning("Not shutting down the disk even if the instance is"
1549
                      " not supposed to be running because no wait for"
1550
                      " sync mode was requested")
1551

    
1552
    assert self.owned_locks(locking.LEVEL_NODE_RES)
1553
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1554

    
1555

    
1556
class LUInstanceReplaceDisks(LogicalUnit):
1557
  """Replace the disks of an instance.
1558

1559
  """
1560
  HPATH = "mirrors-replace"
1561
  HTYPE = constants.HTYPE_INSTANCE
1562
  REQ_BGL = False
1563

    
1564
  def CheckArguments(self):
1565
    """Check arguments.
1566

1567
    """
1568
    if self.op.mode == constants.REPLACE_DISK_CHG:
1569
      if self.op.remote_node is None and self.op.iallocator is None:
1570
        raise errors.OpPrereqError("When changing the secondary either an"
1571
                                   " iallocator script must be used or the"
1572
                                   " new node given", errors.ECODE_INVAL)
1573
      else:
1574
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1575

    
1576
    elif self.op.remote_node is not None or self.op.iallocator is not None:
1577
      # Not replacing the secondary
1578
      raise errors.OpPrereqError("The iallocator and new node options can"
1579
                                 " only be used when changing the"
1580
                                 " secondary node", errors.ECODE_INVAL)
1581

    
1582
  def ExpandNames(self):
1583
    self._ExpandAndLockInstance()
1584

    
1585
    assert locking.LEVEL_NODE not in self.needed_locks
1586
    assert locking.LEVEL_NODE_RES not in self.needed_locks
1587
    assert locking.LEVEL_NODEGROUP not in self.needed_locks
1588

    
1589
    assert self.op.iallocator is None or self.op.remote_node is None, \
1590
      "Conflicting options"
1591

    
1592
    if self.op.remote_node is not None:
1593
      (self.op.remote_node_uuid, self.op.remote_node) = \
1594
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1595
                              self.op.remote_node)
1596

    
1597
      # Warning: do not remove the locking of the new secondary here
1598
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
1599
      # currently it doesn't since parallel invocations of
1600
      # FindUnusedMinor will conflict
1601
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1602
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1603
    else:
1604
      self.needed_locks[locking.LEVEL_NODE] = []
1605
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1606

    
1607
      if self.op.iallocator is not None:
1608
        # iallocator will select a new node in the same group
1609
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
1610
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1611

    
1612
    self.needed_locks[locking.LEVEL_NODE_RES] = []
1613

    
1614
    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1615
                                   self.op.instance_name, self.op.mode,
1616
                                   self.op.iallocator, self.op.remote_node_uuid,
1617
                                   self.op.disks, self.op.early_release,
1618
                                   self.op.ignore_ipolicy)
1619

    
1620
    self.tasklets = [self.replacer]
1621

    
1622
  def DeclareLocks(self, level):
1623
    if level == locking.LEVEL_NODEGROUP:
1624
      assert self.op.remote_node_uuid is None
1625
      assert self.op.iallocator is not None
1626
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1627

    
1628
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
1629
      # Lock all groups used by instance optimistically; this requires going
1630
      # via the node before it's locked, requiring verification later on
1631
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
1632
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1633

    
1634
    elif level == locking.LEVEL_NODE:
1635
      if self.op.iallocator is not None:
1636
        assert self.op.remote_node_uuid is None
1637
        assert not self.needed_locks[locking.LEVEL_NODE]
1638
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1639

    
1640
        # Lock member nodes of all locked groups
1641
        self.needed_locks[locking.LEVEL_NODE] = \
1642
          [node_uuid
1643
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1644
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1645
      else:
1646
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1647

    
1648
        self._LockInstancesNodes()
1649

    
1650
    elif level == locking.LEVEL_NODE_RES:
1651
      # Reuse node locks
1652
      self.needed_locks[locking.LEVEL_NODE_RES] = \
1653
        self.needed_locks[locking.LEVEL_NODE]
1654

    
1655
  def BuildHooksEnv(self):
1656
    """Build hooks env.
1657

1658
    This runs on the master, the primary and all the secondaries.
1659

1660
    """
1661
    instance = self.replacer.instance
1662
    env = {
1663
      "MODE": self.op.mode,
1664
      "NEW_SECONDARY": self.op.remote_node,
1665
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
1666
      }
1667
    env.update(BuildInstanceHookEnvByObject(self, instance))
1668
    return env
1669

    
1670
  def BuildHooksNodes(self):
1671
    """Build hooks nodes.
1672

1673
    """
1674
    instance = self.replacer.instance
1675
    nl = [
1676
      self.cfg.GetMasterNode(),
1677
      instance.primary_node,
1678
      ]
1679
    if self.op.remote_node_uuid is not None:
1680
      nl.append(self.op.remote_node_uuid)
1681
    return nl, nl
1682

    
1683
  def CheckPrereq(self):
1684
    """Check prerequisites.
1685

1686
    """
1687
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1688
            self.op.iallocator is None)
1689

    
1690
    # Verify if node group locks are still correct
1691
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1692
    if owned_groups:
1693
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
1694

    
1695
    return LogicalUnit.CheckPrereq(self)
1696

    
1697

    
1698
class LUInstanceActivateDisks(NoHooksLU):
1699
  """Bring up an instance's disks.
1700

1701
  """
1702
  REQ_BGL = False
1703

    
1704
  def ExpandNames(self):
1705
    self._ExpandAndLockInstance()
1706
    self.needed_locks[locking.LEVEL_NODE] = []
1707
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1708

    
1709
  def DeclareLocks(self, level):
1710
    if level == locking.LEVEL_NODE:
1711
      self._LockInstancesNodes()
1712

    
1713
  def CheckPrereq(self):
1714
    """Check prerequisites.
1715

1716
    This checks that the instance is in the cluster.
1717

1718
    """
1719
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1720
    assert self.instance is not None, \
1721
      "Cannot retrieve locked instance %s" % self.op.instance_name
1722
    CheckNodeOnline(self, self.instance.primary_node)
1723

    
1724
  def Exec(self, feedback_fn):
1725
    """Activate the disks.
1726

1727
    """
1728
    disks_ok, disks_info = \
1729
              AssembleInstanceDisks(self, self.instance,
1730
                                    ignore_size=self.op.ignore_size)
1731
    if not disks_ok:
1732
      raise errors.OpExecError("Cannot activate block devices")
1733

    
1734
    if self.op.wait_for_sync:
1735
      if not WaitForSync(self, self.instance):
1736
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
1737
        raise errors.OpExecError("Some disks of the instance are degraded!")
1738

    
1739
    return disks_info
1740

    
1741

    
1742
class LUInstanceDeactivateDisks(NoHooksLU):
1743
  """Shutdown an instance's disks.
1744

1745
  """
1746
  REQ_BGL = False
1747

    
1748
  def ExpandNames(self):
1749
    self._ExpandAndLockInstance()
1750
    self.needed_locks[locking.LEVEL_NODE] = []
1751
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1752

    
1753
  def DeclareLocks(self, level):
1754
    if level == locking.LEVEL_NODE:
1755
      self._LockInstancesNodes()
1756

    
1757
  def CheckPrereq(self):
1758
    """Check prerequisites.
1759

1760
    This checks that the instance is in the cluster.
1761

1762
    """
1763
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1764
    assert self.instance is not None, \
1765
      "Cannot retrieve locked instance %s" % self.op.instance_name
1766

    
1767
  def Exec(self, feedback_fn):
1768
    """Deactivate the disks
1769

1770
    """
1771
    if self.op.force:
1772
      ShutdownInstanceDisks(self, self.instance)
1773
    else:
1774
      _SafeShutdownInstanceDisks(self, self.instance)
1775

    
1776

    
1777
def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
1778
                               ldisk=False):
1779
  """Check that mirrors are not degraded.
1780

1781
  @attention: The device has to be annotated already.
1782

1783
  The ldisk parameter, if True, will change the test from the
1784
  is_degraded attribute (which represents overall non-ok status for
1785
  the device(s)) to the ldisk (representing the local storage status).
1786

1787
  """
1788
  result = True
1789

    
1790
  if on_primary or dev.AssembleOnSecondary():
1791
    rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
1792
    msg = rstats.fail_msg
1793
    if msg:
1794
      lu.LogWarning("Can't find disk on node %s: %s",
1795
                    lu.cfg.GetNodeName(node_uuid), msg)
1796
      result = False
1797
    elif not rstats.payload:
1798
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1799
      result = False
1800
    else:
1801
      if ldisk:
1802
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1803
      else:
1804
        result = result and not rstats.payload.is_degraded
1805

    
1806
  if dev.children:
1807
    for child in dev.children:
1808
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
1809
                                                     node_uuid, on_primary)
1810

    
1811
  return result
1812

    
1813

    
1814
def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
1815
  """Wrapper around L{_CheckDiskConsistencyInner}.
1816

1817
  """
1818
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1819
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
1820
                                    ldisk=ldisk)
1821

    
1822

    
1823
def _BlockdevFind(lu, node_uuid, dev, instance):
1824
  """Wrapper around call_blockdev_find to annotate diskparams.
1825

1826
  @param lu: A reference to the lu object
1827
  @param node_uuid: The node to call out
1828
  @param dev: The device to find
1829
  @param instance: The instance object the device belongs to
1830
  @returns The result of the rpc call
1831

1832
  """
1833
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1834
  return lu.rpc.call_blockdev_find(node_uuid, (disk, instance))
1835

    
1836

    
1837
def _GenerateUniqueNames(lu, exts):
1838
  """Generate a suitable LV name.
1839

1840
  This will generate a logical volume name for the given instance.
1841

1842
  """
1843
  results = []
1844
  for val in exts:
1845
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1846
    results.append("%s%s" % (new_id, val))
1847
  return results
1848

    
1849

    
1850
class TLReplaceDisks(Tasklet):
1851
  """Replaces disks for an instance.
1852

1853
  Note: Locking is not within the scope of this class.
1854

1855
  """
1856
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1857
               remote_node_uuid, disks, early_release, ignore_ipolicy):
1858
    """Initializes this class.
1859

1860
    """
1861
    Tasklet.__init__(self, lu)
1862

    
1863
    # Parameters
1864
    self.instance_uuid = instance_uuid
1865
    self.instance_name = instance_name
1866
    self.mode = mode
1867
    self.iallocator_name = iallocator_name
1868
    self.remote_node_uuid = remote_node_uuid
1869
    self.disks = disks
1870
    self.early_release = early_release
1871
    self.ignore_ipolicy = ignore_ipolicy
1872

    
1873
    # Runtime data
1874
    self.instance = None
1875
    self.new_node_uuid = None
1876
    self.target_node_uuid = None
1877
    self.other_node_uuid = None
1878
    self.remote_node_info = None
1879
    self.node_secondary_ip = None
1880

    
1881
  @staticmethod
1882
  def _RunAllocator(lu, iallocator_name, instance_uuid,
1883
                    relocate_from_node_uuids):
1884
    """Compute a new secondary node using an IAllocator.
1885

1886
    """
1887
    req = iallocator.IAReqRelocate(
1888
          inst_uuid=instance_uuid,
1889
          relocate_from_node_uuids=list(relocate_from_node_uuids))
1890
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1891

    
1892
    ial.Run(iallocator_name)
1893

    
1894
    if not ial.success:
1895
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1896
                                 " %s" % (iallocator_name, ial.info),
1897
                                 errors.ECODE_NORES)
1898

    
1899
    remote_node_name = ial.result[0]
1900
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1901

    
1902
    if remote_node is None:
1903
      raise errors.OpPrereqError("Node %s not found in configuration" %
1904
                                 remote_node_name, errors.ECODE_NOENT)
1905

    
1906
    lu.LogInfo("Selected new secondary for instance '%s': %s",
1907
               instance_uuid, remote_node_name)
1908

    
1909
    return remote_node.uuid
1910

    
1911
  def _FindFaultyDisks(self, node_uuid):
1912
    """Wrapper for L{FindFaultyInstanceDisks}.
1913

1914
    """
1915
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1916
                                   node_uuid, True)
1917

    
1918
  def _CheckDisksActivated(self, instance):
1919
    """Checks if the instance disks are activated.
1920

1921
    @param instance: The instance to check disks
1922
    @return: True if they are activated, False otherwise
1923

1924
    """
1925
    node_uuids = instance.all_nodes
1926

    
1927
    for idx, dev in enumerate(instance.disks):
1928
      for node_uuid in node_uuids:
1929
        self.lu.LogInfo("Checking disk/%d on %s", idx,
1930
                        self.cfg.GetNodeName(node_uuid))
1931

    
1932
        result = _BlockdevFind(self, node_uuid, dev, instance)
1933

    
1934
        if result.offline:
1935
          continue
1936
        elif result.fail_msg or not result.payload:
1937
          return False
1938

    
1939
    return True
1940

    
1941
  def CheckPrereq(self):
1942
    """Check prerequisites.
1943

1944
    This checks that the instance is in the cluster.
1945

1946
    """
1947
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
1948
    assert self.instance is not None, \
1949
      "Cannot retrieve locked instance %s" % self.instance_name
1950

    
1951
    if self.instance.disk_template != constants.DT_DRBD8:
1952
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1953
                                 " instances", errors.ECODE_INVAL)
1954

    
1955
    if len(self.instance.secondary_nodes) != 1:
1956
      raise errors.OpPrereqError("The instance has a strange layout,"
1957
                                 " expected one secondary but found %d" %
1958
                                 len(self.instance.secondary_nodes),
1959
                                 errors.ECODE_FAULT)
1960

    
1961
    secondary_node_uuid = self.instance.secondary_nodes[0]
1962

    
1963
    if self.iallocator_name is None:
1964
      remote_node_uuid = self.remote_node_uuid
1965
    else:
1966
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
1967
                                            self.instance.uuid,
1968
                                            self.instance.secondary_nodes)
1969

    
1970
    if remote_node_uuid is None:
1971
      self.remote_node_info = None
1972
    else:
1973
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
1974
             "Remote node '%s' is not locked" % remote_node_uuid
1975

    
1976
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
1977
      assert self.remote_node_info is not None, \
1978
        "Cannot retrieve locked node %s" % remote_node_uuid
1979

    
1980
    if remote_node_uuid == self.instance.primary_node:
1981
      raise errors.OpPrereqError("The specified node is the primary node of"
1982
                                 " the instance", errors.ECODE_INVAL)
1983

    
1984
    if remote_node_uuid == secondary_node_uuid:
1985
      raise errors.OpPrereqError("The specified node is already the"
1986
                                 " secondary node of the instance",
1987
                                 errors.ECODE_INVAL)
1988

    
1989
    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1990
                                    constants.REPLACE_DISK_CHG):
1991
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
1992
                                 errors.ECODE_INVAL)
1993

    
1994
    if self.mode == constants.REPLACE_DISK_AUTO:
1995
      if not self._CheckDisksActivated(self.instance):
1996
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
1997
                                   " first" % self.instance_name,
1998
                                   errors.ECODE_STATE)
1999
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2000
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2001

    
2002
      if faulty_primary and faulty_secondary:
2003
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2004
                                   " one node and can not be repaired"
2005
                                   " automatically" % self.instance_name,
2006
                                   errors.ECODE_STATE)
2007

    
2008
      if faulty_primary:
2009
        self.disks = faulty_primary
2010
        self.target_node_uuid = self.instance.primary_node
2011
        self.other_node_uuid = secondary_node_uuid
2012
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2013
      elif faulty_secondary:
2014
        self.disks = faulty_secondary
2015
        self.target_node_uuid = secondary_node_uuid
2016
        self.other_node_uuid = self.instance.primary_node
2017
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2018
      else:
2019
        self.disks = []
2020
        check_nodes = []
2021

    
2022
    else:
2023
      # Non-automatic modes
2024
      if self.mode == constants.REPLACE_DISK_PRI:
2025
        self.target_node_uuid = self.instance.primary_node
2026
        self.other_node_uuid = secondary_node_uuid
2027
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2028

    
2029
      elif self.mode == constants.REPLACE_DISK_SEC:
2030
        self.target_node_uuid = secondary_node_uuid
2031
        self.other_node_uuid = self.instance.primary_node
2032
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
2033

    
2034
      elif self.mode == constants.REPLACE_DISK_CHG:
2035
        self.new_node_uuid = remote_node_uuid
2036
        self.other_node_uuid = self.instance.primary_node
2037
        self.target_node_uuid = secondary_node_uuid
2038
        check_nodes = [self.new_node_uuid, self.other_node_uuid]
2039

    
2040
        CheckNodeNotDrained(self.lu, remote_node_uuid)
2041
        CheckNodeVmCapable(self.lu, remote_node_uuid)
2042

    
2043
        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2044
        assert old_node_info is not None
2045
        if old_node_info.offline and not self.early_release:
2046
          # doesn't make sense to delay the release
2047
          self.early_release = True
2048
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2049
                          " early-release mode", secondary_node_uuid)
2050

    
2051
      else:
2052
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2053
                                     self.mode)
2054

    
2055
      # If not specified all disks should be replaced
2056
      if not self.disks:
2057
        self.disks = range(len(self.instance.disks))
2058

    
2059
    # TODO: This is ugly, but right now we can't distinguish between internal
2060
    # submitted opcode and external one. We should fix that.
2061
    if self.remote_node_info:
2062
      # We change the node, lets verify it still meets instance policy
2063
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2064
      cluster = self.cfg.GetClusterInfo()
2065
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2066
                                                              new_group_info)
2067
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
2068
                             self.remote_node_info, self.cfg,
2069
                             ignore=self.ignore_ipolicy)
2070

    
2071
    for node_uuid in check_nodes:
2072
      CheckNodeOnline(self.lu, node_uuid)
2073

    
2074
    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2075
                                                          self.other_node_uuid,
2076
                                                          self.target_node_uuid]
2077
                              if node_uuid is not None)
2078

    
2079
    # Release unneeded node and node resource locks
2080
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2081
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2082
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2083

    
2084
    # Release any owned node group
2085
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2086

    
2087
    # Check whether disks are valid
2088
    for disk_idx in self.disks:
2089
      self.instance.FindDisk(disk_idx)
2090

    
2091
    # Get secondary node IP addresses
2092
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2093
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
2094

    
2095
  def Exec(self, feedback_fn):
2096
    """Execute disk replacement.
2097

2098
    This dispatches the disk replacement to the appropriate handler.
2099

2100
    """
2101
    if __debug__:
2102
      # Verify owned locks before starting operation
2103
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2104
      assert set(owned_nodes) == set(self.node_secondary_ip), \
2105
          ("Incorrect node locks, owning %s, expected %s" %
2106
           (owned_nodes, self.node_secondary_ip.keys()))
2107
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2108
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
2109
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2110

    
2111
      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2112
      assert list(owned_instances) == [self.instance_name], \
2113
          "Instance '%s' not locked" % self.instance_name
2114

    
2115
      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2116
          "Should not own any node group lock at this point"
2117

    
2118
    if not self.disks:
2119
      feedback_fn("No disks need replacement for instance '%s'" %
2120
                  self.instance.name)
2121
      return
2122

    
2123
    feedback_fn("Replacing disk(s) %s for instance '%s'" %
2124
                (utils.CommaJoin(self.disks), self.instance.name))
2125
    feedback_fn("Current primary node: %s" %
2126
                self.cfg.GetNodeName(self.instance.primary_node))
2127
    feedback_fn("Current seconary node: %s" %
2128
                utils.CommaJoin(self.cfg.GetNodeNames(
2129
                                  self.instance.secondary_nodes)))
2130

    
2131
    activate_disks = not self.instance.disks_active
2132

    
2133
    # Activate the instance disks if we're replacing them on a down instance
2134
    if activate_disks:
2135
      StartInstanceDisks(self.lu, self.instance, True)
2136

    
2137
    try:
2138
      # Should we replace the secondary node?
2139
      if self.new_node_uuid is not None:
2140
        fn = self._ExecDrbd8Secondary
2141
      else:
2142
        fn = self._ExecDrbd8DiskOnly
2143

    
2144
      result = fn(feedback_fn)
2145
    finally:
2146
      # Deactivate the instance disks if we're replacing them on a
2147
      # down instance
2148
      if activate_disks:
2149
        _SafeShutdownInstanceDisks(self.lu, self.instance)
2150

    
2151
    assert not self.lu.owned_locks(locking.LEVEL_NODE)
2152

    
2153
    if __debug__:
2154
      # Verify owned locks
2155
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2156
      nodes = frozenset(self.node_secondary_ip)
2157
      assert ((self.early_release and not owned_nodes) or
2158
              (not self.early_release and not (set(owned_nodes) - nodes))), \
2159
        ("Not owning the correct locks, early_release=%s, owned=%r,"
2160
         " nodes=%r" % (self.early_release, owned_nodes, nodes))
2161

    
2162
    return result
2163

    
2164
  def _CheckVolumeGroup(self, node_uuids):
2165
    self.lu.LogInfo("Checking volume groups")
2166

    
2167
    vgname = self.cfg.GetVGName()
2168

    
2169
    # Make sure volume group exists on all involved nodes
2170
    results = self.rpc.call_vg_list(node_uuids)
2171
    if not results:
2172
      raise errors.OpExecError("Can't list volume groups on the nodes")
2173

    
2174
    for node_uuid in node_uuids:
2175
      res = results[node_uuid]
2176
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2177
      if vgname not in res.payload:
2178
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
2179
                                 (vgname, self.cfg.GetNodeName(node_uuid)))
2180

    
2181
  def _CheckDisksExistence(self, node_uuids):
2182
    # Check disk existence
2183
    for idx, dev in enumerate(self.instance.disks):
2184
      if idx not in self.disks:
2185
        continue
2186

    
2187
      for node_uuid in node_uuids:
2188
        self.lu.LogInfo("Checking disk/%d on %s", idx,
2189
                        self.cfg.GetNodeName(node_uuid))
2190

    
2191
        result = _BlockdevFind(self, node_uuid, dev, self.instance)
2192

    
2193
        msg = result.fail_msg
2194
        if msg or not result.payload:
2195
          if not msg:
2196
            msg = "disk not found"
2197
          if not self._CheckDisksActivated(self.instance):
2198
            extra_hint = ("\nDisks seem to be not properly activated. Try"
2199
                          " running activate-disks on the instance before"
2200
                          " using replace-disks.")
2201
          else:
2202
            extra_hint = ""
2203
          raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2204
                                   (idx, self.cfg.GetNodeName(node_uuid), msg,
2205
                                    extra_hint))
2206

    
2207
  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2208
    for idx, dev in enumerate(self.instance.disks):
2209
      if idx not in self.disks:
2210
        continue
2211

    
2212
      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2213
                      (idx, self.cfg.GetNodeName(node_uuid)))
2214

    
2215
      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2216
                                  on_primary, ldisk=ldisk):
2217
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2218
                                 " replace disks for instance %s" %
2219
                                 (self.cfg.GetNodeName(node_uuid),
2220
                                  self.instance.name))
2221

    
2222
  def _CreateNewStorage(self, node_uuid):
2223
    """Create new storage on the primary or secondary node.
2224

2225
    This is only used for same-node replaces, not for changing the
2226
    secondary node, hence we don't want to modify the existing disk.
2227

2228
    """
2229
    iv_names = {}
2230

    
2231
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2232
    for idx, dev in enumerate(disks):
2233
      if idx not in self.disks:
2234
        continue
2235

    
2236
      self.lu.LogInfo("Adding storage on %s for disk/%d",
2237
                      self.cfg.GetNodeName(node_uuid), idx)
2238

    
2239
      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2240
      names = _GenerateUniqueNames(self.lu, lv_names)
2241

    
2242
      (data_disk, meta_disk) = dev.children
2243
      vg_data = data_disk.logical_id[0]
2244
      lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
2245
                             logical_id=(vg_data, names[0]),
2246
                             params=data_disk.params)
2247
      vg_meta = meta_disk.logical_id[0]
2248
      lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
2249
                             size=constants.DRBD_META_SIZE,
2250
                             logical_id=(vg_meta, names[1]),
2251
                             params=meta_disk.params)
2252

    
2253
      new_lvs = [lv_data, lv_meta]
2254
      old_lvs = [child.Copy() for child in dev.children]
2255
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2256
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2257

    
2258
      # we pass force_create=True to force the LVM creation
2259
      for new_lv in new_lvs:
2260
        try:
2261
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2262
                               GetInstanceInfoText(self.instance), False,
2263
                               excl_stor)
2264
        except errors.DeviceCreationError, e:
2265
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2266

    
2267
    return iv_names
2268

    
2269
  def _CheckDevices(self, node_uuid, iv_names):
2270
    for name, (dev, _, _) in iv_names.iteritems():
2271
      result = _BlockdevFind(self, node_uuid, dev, self.instance)
2272

    
2273
      msg = result.fail_msg
2274
      if msg or not result.payload:
2275
        if not msg:
2276
          msg = "disk not found"
2277
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
2278
                                 (name, msg))
2279

    
2280
      if result.payload.is_degraded:
2281
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
2282

    
2283
  def _RemoveOldStorage(self, node_uuid, iv_names):
2284
    for name, (_, old_lvs, _) in iv_names.iteritems():
2285
      self.lu.LogInfo("Remove logical volumes for %s", name)
2286

    
2287
      for lv in old_lvs:
2288
        msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \
2289
                .fail_msg
2290
        if msg:
2291
          self.lu.LogWarning("Can't remove old LV: %s", msg,
2292
                             hint="remove unused LVs manually")
2293

    
2294
  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2295
    """Replace a disk on the primary or secondary for DRBD 8.
2296

2297
    The algorithm for replace is quite complicated:
2298

2299
      1. for each disk to be replaced:
2300

2301
        1. create new LVs on the target node with unique names
2302
        1. detach old LVs from the drbd device
2303
        1. rename old LVs to name_replaced.<time_t>
2304
        1. rename new LVs to old LVs
2305
        1. attach the new LVs (with the old names now) to the drbd device
2306

2307
      1. wait for sync across all devices
2308

2309
      1. for each modified disk:
2310

2311
        1. remove old LVs (which have the name name_replaces.<time_t>)
2312

2313
    Failures are not very well handled.
2314

2315
    """
2316
    steps_total = 6
2317

    
2318
    # Step: check device activation
2319
    self.lu.LogStep(1, steps_total, "Check device existence")
2320
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2321
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2322

    
2323
    # Step: check other node consistency
2324
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2325
    self._CheckDisksConsistency(
2326
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2327
      False)
2328

    
2329
    # Step: create new storage
2330
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2331
    iv_names = self._CreateNewStorage(self.target_node_uuid)
2332

    
2333
    # Step: for each lv, detach+rename*2+attach
2334
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2335
    for dev, old_lvs, new_lvs in iv_names.itervalues():
2336
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2337

    
2338
      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
2339
                                                     (dev, self.instance),
2340
                                                     (old_lvs, self.instance))
2341
      result.Raise("Can't detach drbd from local storage on node"
2342
                   " %s for device %s" %
2343
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2344
      #dev.children = []
2345
      #cfg.Update(instance)
2346

    
2347
      # ok, we created the new LVs, so now we know we have the needed
2348
      # storage; as such, we proceed on the target node to rename
2349
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2350
      # using the assumption that logical_id == unique_id on that node
2351

    
2352
      # FIXME(iustin): use a better name for the replaced LVs
2353
      temp_suffix = int(time.time())
2354
      ren_fn = lambda d, suff: (d.logical_id[0],
2355
                                d.logical_id[1] + "_replaced-%s" % suff)
2356

    
2357
      # Build the rename list based on what LVs exist on the node
2358
      rename_old_to_new = []
2359
      for to_ren in old_lvs:
2360
        result = self.rpc.call_blockdev_find(self.target_node_uuid,
2361
                                             (to_ren, self.instance))
2362
        if not result.fail_msg and result.payload:
2363
          # device exists
2364
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2365

    
2366
      self.lu.LogInfo("Renaming the old LVs on the target node")
2367
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2368
                                             rename_old_to_new)
2369
      result.Raise("Can't rename old LVs on node %s" %
2370
                   self.cfg.GetNodeName(self.target_node_uuid))
2371

    
2372
      # Now we rename the new LVs to the old LVs
2373
      self.lu.LogInfo("Renaming the new LVs on the target node")
2374
      rename_new_to_old = [(new, old.logical_id)
2375
                           for old, new in zip(old_lvs, new_lvs)]
2376
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2377
                                             rename_new_to_old)
2378
      result.Raise("Can't rename new LVs on node %s" %
2379
                   self.cfg.GetNodeName(self.target_node_uuid))
2380

    
2381
      # Intermediate steps of in memory modifications
2382
      for old, new in zip(old_lvs, new_lvs):
2383
        new.logical_id = old.logical_id
2384

    
2385
      # We need to modify old_lvs so that removal later removes the
2386
      # right LVs, not the newly added ones; note that old_lvs is a
2387
      # copy here
2388
      for disk in old_lvs:
2389
        disk.logical_id = ren_fn(disk, temp_suffix)
2390

    
2391
      # Now that the new lvs have the old name, we can add them to the device
2392
      self.lu.LogInfo("Adding new mirror component on %s",
2393
                      self.cfg.GetNodeName(self.target_node_uuid))
2394
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2395
                                                  (dev, self.instance),
2396
                                                  (new_lvs, self.instance))
2397
      msg = result.fail_msg
2398
      if msg:
2399
        for new_lv in new_lvs:
2400
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2401
                                               (new_lv, self.instance)).fail_msg
2402
          if msg2:
2403
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2404
                               hint=("cleanup manually the unused logical"
2405
                                     "volumes"))
2406
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2407

    
2408
    cstep = itertools.count(5)
2409

    
2410
    if self.early_release:
2411
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2412
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2413
      # TODO: Check if releasing locks early still makes sense
2414
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2415
    else:
2416
      # Release all resource locks except those used by the instance
2417
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2418
                   keep=self.node_secondary_ip.keys())
2419

    
2420
    # Release all node locks while waiting for sync
2421
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2422

    
2423
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2424
    # shutdown in the caller into consideration.
2425

    
2426
    # Wait for sync
2427
    # This can fail as the old devices are degraded and _WaitForSync
2428
    # does a combined result over all disks, so we don't check its return value
2429
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2430
    WaitForSync(self.lu, self.instance)
2431

    
2432
    # Check all devices manually
2433
    self._CheckDevices(self.instance.primary_node, iv_names)
2434

    
2435
    # Step: remove old storage
2436
    if not self.early_release:
2437
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2438
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2439

    
2440
  def _ExecDrbd8Secondary(self, feedback_fn):
2441
    """Replace the secondary node for DRBD 8.
2442

2443
    The algorithm for replace is quite complicated:
2444
      - for all disks of the instance:
2445
        - create new LVs on the new node with same names
2446
        - shutdown the drbd device on the old secondary
2447
        - disconnect the drbd network on the primary
2448
        - create the drbd device on the new secondary
2449
        - network attach the drbd on the primary, using an artifice:
2450
          the drbd code for Attach() will connect to the network if it
2451
          finds a device which is connected to the good local disks but
2452
          not network enabled
2453
      - wait for sync across all devices
2454
      - remove all disks from the old secondary
2455

2456
    Failures are not very well handled.
2457

2458
    """
2459
    steps_total = 6
2460

    
2461
    pnode = self.instance.primary_node
2462

    
2463
    # Step: check device activation
2464
    self.lu.LogStep(1, steps_total, "Check device existence")
2465
    self._CheckDisksExistence([self.instance.primary_node])
2466
    self._CheckVolumeGroup([self.instance.primary_node])
2467

    
2468
    # Step: check other node consistency
2469
    self.lu.LogStep(2, steps_total, "Check peer consistency")
2470
    self._CheckDisksConsistency(self.instance.primary_node, True, True)
2471

    
2472
    # Step: create new storage
2473
    self.lu.LogStep(3, steps_total, "Allocate new storage")
2474
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2475
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2476
                                                  self.new_node_uuid)
2477
    for idx, dev in enumerate(disks):
2478
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2479
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2480
      # we pass force_create=True to force LVM creation
2481
      for new_lv in dev.children:
2482
        try:
2483
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2484
                               new_lv, True, GetInstanceInfoText(self.instance),
2485
                               False, excl_stor)
2486
        except errors.DeviceCreationError, e:
2487
          raise errors.OpExecError("Can't create block device: %s" % e.message)
2488

    
2489
    # Step 4: dbrd minors and drbd setups changes
2490
    # after this, we must manually remove the drbd minors on both the
2491
    # error and the success paths
2492
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2493
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2494
                                         for _ in self.instance.disks],
2495
                                        self.instance.uuid)
2496
    logging.debug("Allocated minors %r", minors)
2497

    
2498
    iv_names = {}
2499
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2500
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2501
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
2502
      # create new devices on new_node; note that we create two IDs:
2503
      # one without port, so the drbd will be activated without
2504
      # networking information on the new node at this stage, and one
2505
      # with network, for the latter activation in step 4
2506
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2507
      if self.instance.primary_node == o_node1:
2508
        p_minor = o_minor1
2509
      else:
2510
        assert self.instance.primary_node == o_node2, "Three-node instance?"
2511
        p_minor = o_minor2
2512

    
2513
      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2514
                      p_minor, new_minor, o_secret)
2515
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2516
                    p_minor, new_minor, o_secret)
2517

    
2518
      iv_names[idx] = (dev, dev.children, new_net_id)
2519
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2520
                    new_net_id)
2521
      new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
2522
                              logical_id=new_alone_id,
2523
                              children=dev.children,
2524
                              size=dev.size,
2525
                              params={})
2526
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2527
                                            self.cfg)
2528
      try:
2529
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2530
                             anno_new_drbd,
2531
                             GetInstanceInfoText(self.instance), False,
2532
                             excl_stor)
2533
      except errors.GenericError:
2534
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2535
        raise
2536

    
2537
    # We have new devices, shutdown the drbd on the old secondary
2538
    for idx, dev in enumerate(self.instance.disks):
2539
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2540
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2541
                                            (dev, self.instance)).fail_msg
2542
      if msg:
2543
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2544
                           "node: %s" % (idx, msg),
2545
                           hint=("Please cleanup this device manually as"
2546
                                 " soon as possible"))
2547

    
2548
    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2549
    result = self.rpc.call_drbd_disconnect_net(
2550
               [pnode], (self.instance.disks, self.instance))[pnode]
2551

    
2552
    msg = result.fail_msg
2553
    if msg:
2554
      # detaches didn't succeed (unlikely)
2555
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2556
      raise errors.OpExecError("Can't detach the disks from the network on"
2557
                               " old node: %s" % (msg,))
2558

    
2559
    # if we managed to detach at least one, we update all the disks of
2560
    # the instance to point to the new secondary
2561
    self.lu.LogInfo("Updating instance configuration")
2562
    for dev, _, new_logical_id in iv_names.itervalues():
2563
      dev.logical_id = new_logical_id
2564

    
2565
    self.cfg.Update(self.instance, feedback_fn)
2566

    
2567
    # Release all node locks (the configuration has been updated)
2568
    ReleaseLocks(self.lu, locking.LEVEL_NODE)
2569

    
2570
    # and now perform the drbd attach
2571
    self.lu.LogInfo("Attaching primary drbds to new secondary"
2572
                    " (standalone => connected)")
2573
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2574
                                            self.new_node_uuid],
2575
                                           (self.instance.disks, self.instance),
2576
                                           self.instance.name,
2577
                                           False)
2578
    for to_node, to_result in result.items():
2579
      msg = to_result.fail_msg
2580
      if msg:
2581
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2582
                           self.cfg.GetNodeName(to_node), msg,
2583
                           hint=("please do a gnt-instance info to see the"
2584
                                 " status of disks"))
2585

    
2586
    cstep = itertools.count(5)
2587

    
2588
    if self.early_release:
2589
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2590
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
2591
      # TODO: Check if releasing locks early still makes sense
2592
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2593
    else:
2594
      # Release all resource locks except those used by the instance
2595
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2596
                   keep=self.node_secondary_ip.keys())
2597

    
2598
    # TODO: Can the instance lock be downgraded here? Take the optional disk
2599
    # shutdown in the caller into consideration.
2600

    
2601
    # Wait for sync
2602
    # This can fail as the old devices are degraded and _WaitForSync
2603
    # does a combined result over all disks, so we don't check its return value
2604
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2605
    WaitForSync(self.lu, self.instance)
2606

    
2607
    # Check all devices manually
2608
    self._CheckDevices(self.instance.primary_node, iv_names)
2609

    
2610
    # Step: remove old storage
2611
    if not self.early_release:
2612
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2613
      self._RemoveOldStorage(self.target_node_uuid, iv_names)