lib/cmdlib/instance_storage.py @ 4f3bdf5a

History | View | Annotate | Download (98.9 kB)

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
  CheckDiskTemplateEnabled
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  constants.DT_FILE: ".file",
  constants.DT_SHARED_FILE: ".sharedfile",
  }


def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
                                       device.size, instance.name, force_open,
                                       info, excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))
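
# Illustrative call (sketch with hypothetical arguments): assuming `disk` is an
# already-annotated L{objects.Disk} whose children already exist on the node,
# CreateSingleBlockDev(lu, node_uuid, instance, disk, info, force_open=True,
# excl_stor=False) issues a single blockdev_create RPC and lets result.Raise
# turn any node-side failure into an exception.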


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      the CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)
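
# Illustrative return shape (sketch): for a DRBD8 disk with two LV children,
# a successful call with force_create=True returns only the top-level device,
# e.g. [(node_uuid, <DRBD8 disk>)], since its children are implied by it; on a
# partial failure the raised DeviceCreationError carries every (node_uuid,
# device) pair created so far, so that callers such as _UndoCreateDisks can
# roll them back.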


def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given UUID

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created, instance):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}
  @type instance: L{objects.Instance}
  @param instance: the instance for which disks were created

  """
  for (node_uuid, disk) in disks_created:
    result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created, instance)
        raise errors.OpExecError(e.message)
  return disks_created


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MiB are added for DRBD metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
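
# Worked example (illustrative, hypothetical values): for
# disk_template=constants.DT_DRBD8 and two disks of 1024 MiB in "xenvg" and
# 512 MiB in "fastvg", each disk needs its size plus constants.DRBD_META_SIZE
# (128 MiB) of metadata, so the result maps "xenvg" to 1152 MiB and "fastvg"
# to 640 MiB; for constants.DT_PLAIN no metadata overhead is added.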


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
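
# Illustrative example (hypothetical input): for an opcode whose op.disks is
# [{constants.IDISK_SIZE: 1024}] and default_vg "xenvg", this returns
# [{constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR,
#   constants.IDISK_VG: "xenvg", constants.IDISK_NAME: None}], i.e. missing
# fields are filled with defaults; an invalid mode or size, or a missing
# 'provider' for the ext template, raises errors.OpPrereqError instead.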


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev
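
# Resulting layout (sketch): the returned DRBD8 disk has
# logical_id = (primary_uuid, secondary_uuid, port, p_minor, s_minor, secret)
# and two plain-LV children, a data volume of the requested size on
# vgnames[0]/names[0] and a constants.DRBD_META_SIZE metadata volume on
# vgnames[1]/names[1]; all three objects get fresh UUIDs from the config.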


def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/%s" % (file_storage_dir,
                                                names[idx]))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      # Add IDISK_ACCESS param to disk params
      if (template_name in constants.DTS_HAVE_ACCESS and
          constants.IDISK_ACCESS in disk):
        params[constants.IDISK_ACCESS] = disk[constants.IDISK_ACCESS]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
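
# Logical id shapes produced above (sketch): DT_PLAIN uses (vg_name, lv_name),
# DT_FILE/DT_SHARED_FILE use (file_driver, "<file_storage_dir>/<name>"),
# DT_BLOCK uses (constants.BLOCKDEV_DRIVER_MANUAL, <adopted device path>),
# DT_RBD uses ("rbd", <name>) and DT_EXT uses (<provider>, <name>), while the
# DRBD8 branch above builds its own six-element logical_id.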


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not be

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)
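
# In short (sketch of the cases above): spindles given without exclusive
# storage are always rejected; exclusive storage with required=True and no
# spindles is rejected; every other combination passes silently.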


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support for changing the VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    constants.IDISK_ACCESS,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.DT_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(self.instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)


def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Prepares the input and performs a node info call.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name

  """
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
                                                  node_uuids)
  hvname = lu.cfg.GetHypervisorType()
  hvparams = lu.cfg.GetClusterInfo().hvparams
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
                                   [(hvname, hvparams[hvname])])
  return nodeinfo


def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type node_name: string
  @param node_name: the name of the node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)


def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
  for node_uuid in node_uuids:
    node_name = lu.cfg.GetNodeName(node_uuid)
    info = nodeinfo[node_uuid]
    info.Raise("Cannot get current information from node %s" % node_name,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)


def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
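
# Illustrative tie-in (hypothetical values): req_sizes is a vg-to-MiB mapping
# like the one built by ComputeDiskSizePerVG above, e.g. {"xenvg": 1152,
# "fastvg": 640}; every node in node_uuids must have at least that much free
# space in each listed volume group, otherwise an OpPrereqError is raised.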


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib
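
# Worked example: for size = 1 GiB + 1 byte (1073741825 bytes), divmod by
# 1048576 gives mib=1024 with a remainder of 1, so the function warns that
# 1048575 bytes will not be wiped and returns 1025 MiB.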


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
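
# Worked example: after 120 seconds with 256 MiB of a 1024 MiB disk written,
# avg_time is 120/256 = 0.46875 s/MiB and the ETA is
# (1024 - 256) * 0.46875 = 360 seconds.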


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
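      # For instance (sketch, assuming MIN_WIPE_CHUNK_PERCENT is 10 and
      # MAX_WIPE_CHUNK is expressed in the same MiB units as device.size):
      # a 20480 MiB disk yields min(constants.MAX_WIPE_CHUNK, 2048) MiB per
      # wipe request, so large disks are wiped in several bounded chunks.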
1042

    
1043
      size = device.size
1044
      last_output = 0
1045
      start_time = time.time()
1046

    
1047
      if offset == 0:
1048
        info_text = ""
1049
      else:
1050
        info_text = (" (from %s to %s)" %
1051
                     (utils.FormatUnit(offset, "h"),
1052
                      utils.FormatUnit(size, "h")))
1053

    
1054
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1055

    
1056
      logging.info("Wiping disk %d for instance %s on node %s using"
1057
                   " chunk size %s", idx, instance.name, node_name,
1058
                   wipe_chunk_size)
1059

    
1060
      while offset < size:
1061
        wipe_size = min(wipe_chunk_size, size - offset)
1062

    
1063
        logging.debug("Wiping disk %d, offset %s, chunk %s",
1064
                      idx, offset, wipe_size)
1065

    
1066
        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1067
                                           offset, wipe_size)
1068
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
1069
                     (idx, offset, wipe_size))
1070

    
1071
        now = time.time()
1072
        offset += wipe_size
1073
        if now - last_output >= 60:
1074
          eta = _CalcEta(now - start_time, offset, size)
1075
          lu.LogInfo(" - done: %.1f%% ETA: %s",
1076
                     offset / float(size) * 100, utils.FormatSeconds(eta))
1077
          last_output = now
1078
  finally:
1079
    logging.info("Resuming synchronization of disks for instance '%s'",
1080
                 instance.name)
1081

    
1082
    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1083
                                                    (map(compat.snd, disks),
1084
                                                     instance),
1085
                                                    False)
1086

    
1087
    if result.fail_msg:
1088
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1089
                    node_name, result.fail_msg)
1090
    else:
1091
      for idx, success in enumerate(result.payload):
1092
        if not success:
1093
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1094
                        " failed", idx, instance.name)
1095

    
1096

    
1097
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1098
  """Wrapper for L{WipeDisks} that handles errors.
1099

1100
  @type lu: L{LogicalUnit}
1101
  @param lu: the logical unit on whose behalf we execute
1102
  @type instance: L{objects.Instance}
1103
  @param instance: the instance whose disks we should wipe
1104
  @param disks: see L{WipeDisks}
1105
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1106
      case of error
1107
  @raise errors.OpPrereqError: in case of failure
1108

1109
  """
1110
  try:
1111
    WipeDisks(lu, instance, disks=disks)
1112
  except errors.OpExecError:
1113
    logging.warning("Wiping disks for instance '%s' failed",
1114
                    instance.name)
1115
    _UndoCreateDisks(lu, cleanup, instance)
1116
    raise
1117

    
1118

    
1119
def ExpandCheckDisks(instance, disks):
1120
  """Return the instance disks selected by the disks list
1121

1122
  @type disks: list of L{objects.Disk} or None
1123
  @param disks: selected disks
1124
  @rtype: list of L{objects.Disk}
1125
  @return: selected instance disks to act on
1126

1127
  """
1128
  if disks is None:
1129
    return instance.disks
1130
  else:
1131
    if not set(disks).issubset(instance.disks):
1132
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1133
                                   " target instance: expected a subset of %r,"
1134
                                   " got %r" % (instance.disks, disks))
1135
    return disks
1136

    
1137

    
1138
def WaitForSync(lu, instance, disks=None, oneshot=False):
1139
  """Sleep and poll for an instance's disk to sync.
1140

1141
  """
1142
  if not instance.disks or disks is not None and not disks:
1143
    return True
1144

    
1145
  disks = ExpandCheckDisks(instance, disks)
1146

    
1147
  if not oneshot:
1148
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1149

    
1150
  node_uuid = instance.primary_node
1151
  node_name = lu.cfg.GetNodeName(node_uuid)
1152

    
1153
  # TODO: Convert to utils.Retry
1154

    
1155
  retries = 0
1156
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1157
  while True:
1158
    max_time = 0
1159
    done = True
1160
    cumul_degraded = False
1161
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1162
    msg = rstats.fail_msg
1163
    if msg:
1164
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1165
      retries += 1
1166
      if retries >= 10:
1167
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1168
                                 " aborting." % node_name)
1169
      time.sleep(6)
1170
      continue
1171
    rstats = rstats.payload
1172
    retries = 0
1173
    for i, mstat in enumerate(rstats):
1174
      if mstat is None:
1175
        lu.LogWarning("Can't compute data for node %s/%s",
1176
                      node_name, disks[i].iv_name)
1177
        continue
1178

    
1179
      cumul_degraded = (cumul_degraded or
1180
                        (mstat.is_degraded and mstat.sync_percent is None))
1181
      if mstat.sync_percent is not None:
1182
        done = False
1183
        if mstat.estimated_time is not None:
1184
          rem_time = ("%s remaining (estimated)" %
1185
                      utils.FormatSeconds(mstat.estimated_time))
1186
          max_time = mstat.estimated_time
1187
        else:
1188
          rem_time = "no time estimate"
1189
          max_time = 5 # sleep at least a bit between retries
1190
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1191
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1192

    
1193
    # if we're done but degraded, let's do a few small retries, to
1194
    # make sure we see a stable and not transient situation; therefore
1195
    # we force restart of the loop
1196
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1197
      logging.info("Degraded disks found, %d retries left", degr_retries)
1198
      degr_retries -= 1
1199
      time.sleep(1)
1200
      continue
1201

    
1202
    if done or oneshot:
1203
      break
1204

    
1205
    time.sleep(min(60, max_time))
1206

    
1207
  if done:
1208
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1209

    
1210
  return not cumul_degraded
1211

    
1212

    
1213
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1214
  """Shutdown block devices of an instance.
1215

1216
  This does the shutdown on all nodes of the instance.
1217

1218
  If the ignore_primary is false, errors on the primary node are
1219
  ignored.
1220

1221
  """
1222
  all_result = True
1223

    
1224
  if disks is None:
1225
    # only mark instance disks as inactive if all disks are affected
1226
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1227
  disks = ExpandCheckDisks(instance, disks)
1228

    
1229
  for disk in disks:
1230
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1231
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1232
      msg = result.fail_msg
1233
      if msg:
1234
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1235
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1236
        if ((node_uuid == instance.primary_node and not ignore_primary) or
1237
            (node_uuid != instance.primary_node and not result.offline)):
1238
          all_result = False
1239
  return all_result
1240

    
1241

    
1242
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1243
  """Shutdown block devices of an instance.
1244

1245
  This function checks if an instance is running, before calling
1246
  _ShutdownInstanceDisks.
1247

1248
  """
1249
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1250
  ShutdownInstanceDisks(lu, instance, disks=disks)
1251

    
1252

    
1253
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1254
                          ignore_size=False):
1255
  """Prepare the block devices for an instance.
1256

1257
  This sets up the block devices on all nodes.
1258

1259
  @type lu: L{LogicalUnit}
1260
  @param lu: the logical unit on whose behalf we execute
1261
  @type instance: L{objects.Instance}
1262
  @param instance: the instance for whose disks we assemble
1263
  @type disks: list of L{objects.Disk} or None
1264
  @param disks: which disks to assemble (or all, if None)
1265
  @type ignore_secondaries: boolean
1266
  @param ignore_secondaries: if true, errors on secondary nodes
1267
      won't result in an error return from the function
1268
  @type ignore_size: boolean
1269
  @param ignore_size: if true, the current known size of the disk
1270
      will not be used during the disk activation, useful for cases
1271
      when the size is wrong
1272
  @return: False if the operation failed, otherwise a list of
1273
      (host, instance_visible_name, node_visible_name)
1274
      with the mapping from node devices to instance devices
1275

1276
  """
1277
  device_info = []
1278
  disks_ok = True
1279

    
1280
  if disks is None:
1281
    # only mark instance disks as active if all disks are affected
1282
    lu.cfg.MarkInstanceDisksActive(instance.uuid)
1283

    
1284
  disks = ExpandCheckDisks(instance, disks)
1285

    
1286
  # With the two passes mechanism we try to reduce the window of
1287
  # opportunity for the race condition of switching DRBD to primary
1288
  # before handshaking occured, but we do not eliminate it
1289

    
1290
  # The proper fix would be to wait (with some limits) until the
1291
  # connection has been made and drbd transitions from WFConnection
1292
  # into any other network-connected state (Connected, SyncTarget,
1293
  # SyncSource, etc.)
1294

    
1295
  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path, _ = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info


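# Note (editor's addition): callers of AssembleInstanceDisks() follow the
# pattern used in LUInstanceActivateDisks.Exec further below, e.g.:
#   disks_ok, disks_info = AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
# StartInstanceDisks() below wraps the same call and additionally shuts the
# disks down again if the assembly was not fully successful.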
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.instance.FindDisk(self.op.disk)

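    # The opcode either carries the final size (absolute mode) or the
    # increment; both branches below normalise this into (self.target,
    # self.delta) and reject any request that would shrink the disk.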
    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    template = self.instance.disk_template
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file-based disks, when that
      # feature will be supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

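    # The grow happens in three phases below: a dry-run on every node, the
    # real grow of the backing storage on every node, and finally the grow
    # of the logical (top-level) device on the primary node only.  The two
    # boolean arguments passed to call_blockdev_grow appear to select
    # dry-run and backing-storage mode respectively (editor's reading of
    # the surrounding comments, not verified against the RPC definition).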
    # First run all grow ops in dry-run mode
    for node_uuid in self.instance.all_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

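    # Capture the current size from the primary node *before* growing, so
    # that WipeDisks() further down only has to wipe the newly added space.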
    if wipe_disks:
      # Get disk size from primary node for wiping
      result = self.rpc.call_blockdev_getdimensions(
                 self.instance.primary_node, [([self.disk], self.instance)])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   self.instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % self.instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in self.instance.all_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert self.instance.disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if self.op.remote_node is None and self.op.iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif self.op.remote_node is not None or self.op.iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

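    # The actual disk replacement is implemented by the TLReplaceDisks
    # tasklet below; this LU only deals with opcode checking, locking and
    # hooks.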
    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
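    # With --force we shut the disks down unconditionally; the safe variant
    # additionally checks that the instance is not running.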
    if self.op.force:
      ShutdownInstanceDisks(self, self.instance)
    else:
      _SafeShutdownInstanceDisks(self, self.instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
                                                     node_uuid, on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @return: The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node_uuid, (disk, instance))


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
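  # For example (illustrative only), _CreateNewStorage() below calls this
  # with exts like [".disk0_data", ".disk0_meta"], yielding names such as
  # "<unique-id>.disk0_data" and "<unique-id>.disk0_meta".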
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
               remote_node_uuid, disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node_uuid = remote_node_uuid
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node_uuid = None
    self.target_node_uuid = None
    self.other_node_uuid = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_uuid,
                    relocate_from_node_uuids):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(
          inst_uuid=instance_uuid,
          relocate_from_node_uuids=list(relocate_from_node_uuids))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)

    if remote_node is None:
      raise errors.OpPrereqError("Node %s not found in configuration" %
                                 remote_node_name, errors.ECODE_NOENT)

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_uuid, remote_node_name)

    return remote_node.uuid

  def _FindFaultyDisks(self, node_uuid):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    node_uuids = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))

        result = _BlockdevFind(self, node_uuid, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if self.instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(self.instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(self.instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    secondary_node_uuid = self.instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node_uuid = self.remote_node_uuid
    else:
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                            self.instance.uuid,
                                            self.instance.secondary_nodes)

    if remote_node_uuid is None:
      self.remote_node_info = None
    else:
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node_uuid

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node_uuid

    if remote_node_uuid == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node_uuid == secondary_node_uuid:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

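    # Roughly: target_node_uuid is the node whose storage gets replaced (and
    # whose old LVs are removed at the end), other_node_uuid is the peer used
    # for the consistency checks, and new_node_uuid is only set when the
    # secondary node itself is being changed.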
    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(self.instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node_uuid = remote_node_uuid
        self.other_node_uuid = self.instance.primary_node
        self.target_node_uuid = secondary_node_uuid
        check_nodes = [self.new_node_uuid, self.other_node_uuid]

        CheckNodeNotDrained(self.lu, remote_node_uuid)
        CheckNodeVmCapable(self.lu, remote_node_uuid)

        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node_uuid)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internal
    # submitted opcode and external one. We should fix that.
    if self.remote_node_info:
      # We change the node, let's verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
                             self.remote_node_info, self.cfg,
                             ignore=self.ignore_ipolicy)

    for node_uuid in check_nodes:
      CheckNodeOnline(self.lu, node_uuid)

    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                          self.other_node_uuid,
                                                          self.target_node_uuid]
                              if node_uuid is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      self.instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" %
                self.cfg.GetNodeName(self.instance.primary_node))
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.cfg.GetNodeNames(
                                  self.instance.secondary_nodes)))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

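    # _ExecDrbd8DiskOnly recreates the LVs on one of the current nodes,
    # while _ExecDrbd8Secondary moves the mirror to a new secondary node
    # (see the docstrings of both methods below).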
    try:
      # Should we replace the secondary node?
      if self.new_node_uuid is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, node_uuids):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(node_uuids)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node_uuid in node_uuids:
      res = results[node_uuid]
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, self.cfg.GetNodeName(node_uuid)))

  def _CheckDisksExistence(self, node_uuids):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))

        result = _BlockdevFind(self, node_uuid, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          if not self._CheckDisksActivated(self.instance):
            extra_hint = ("\nDisks seem to be not properly activated. Try"
                          " running activate-disks on the instance before"
                          " using replace-disks.")
          else:
            extra_hint = ""
          raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
                                   (idx, self.cfg.GetNodeName(node_uuid), msg,
                                    extra_hint))

  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, self.cfg.GetNodeName(node_uuid)))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (self.cfg.GetNodeName(node_uuid),
                                  self.instance.name))

  def _CreateNewStorage(self, node_uuid):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d",
                      self.cfg.GetNodeName(node_uuid), idx)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        try:
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                               GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    return iv_names

  def _CheckDevices(self, node_uuid, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      result = _BlockdevFind(self, node_uuid, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_uuid, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \
                .fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
      False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node_uuid)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
                                                     (dev, self.instance),
                                                     (old_lvs, self.instance))
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" %
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == unique_id on that node

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.logical_id[0],
                                d.logical_id[1] + "_replaced-%s" % suff)

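      # Illustration (editor's note): a data LV named
      # "<unique-id>.disk0_data" is first renamed to
      # "<unique-id>.disk0_data_replaced-<time_t>", freeing its name for the
      # freshly created LV, which is then renamed to the original name below.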
      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node_uuid,
                                             (to_ren, self.instance))
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.logical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s",
                      self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                  (dev, self.instance),
                                                  (new_lvs, self.instance))
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                               (new_lv, self.instance)).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
                                                  self.new_node_uuid)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        try:
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
                               new_lv, True, GetInstanceInfoText(self.instance),
                               False, excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
                                         for _ in self.instance.disks],
                                        self.instance.uuid)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

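      # The DRBD8 logical_id is the tuple
      # (node_A, node_B, port, minor_A, minor_B, secret), as unpacked above;
      # new_alone_id leaves the port unset so the device comes up without
      # networking first, while new_net_id carries the full network
      # information for the later attach.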
      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net(
               [pnode], (self.instance.disks, self.instance))[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
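    # The attach is requested on both the primary and the new secondary so
    # that the two sides can complete the DRBD handshake; any per-node
    # failure is turned into an error below.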
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node_uuid],
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        raise errors.OpExecError(
          "Can't attach drbd disks on node %s: %s (please do a gnt-instance "
          "info to see the status of disks)" %
          (self.cfg.GetNodeName(to_node), msg))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)