#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with storage of instances."""

import itertools
import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import opcodes
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }


def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node_uuid)
  result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as an LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)
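# Note on the error-handling contract above: a DeviceCreationError always
# carries the list of devices that were successfully created before the
# failure, so callers such as CreateDisks can roll them back again via
# _UndoCreateDisks.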


def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given UUID

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
                              force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node_uuid, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node_uuid)
    result = lu.rpc.call_blockdev_remove(node_uuid, disk)
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created
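# Illustrative use of the return value (hypothetical caller, not code from
# this module): the list returned by CreateDisks is exactly what
# _UndoCreateDisks expects, e.g.
#   disks_created = CreateDisks(lu, instance)
#   try:
#     ...  # further setup
#   except errors.OpExecError:
#     _UndoCreateDisks(lu, disks_created)
#     raise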


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vg_name = disk[constants.IDISK_VG]
      vgs[vg_name] = \
        vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
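# Example (illustrative, with a hypothetical volume group name): for two disks
# of 10240 MiB each in VG "xenvg", ComputeDiskSizePerVG returns
# {"xenvg": 20480} for DT_PLAIN and
# {"xenvg": 20480 + 2 * constants.DRBD_META_SIZE} for DT_DRBD8; file-based and
# diskless templates yield an empty dict.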


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
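# Example (illustrative): an opcode disk specification such as
#   {constants.IDISK_SIZE: 10240, constants.IDISK_MODE: constants.DISK_RDWR}
# is normalized by ComputeDisks into
#   {constants.IDISK_SIZE: 10240, constants.IDISK_MODE: constants.DISK_RDWR,
#    constants.IDISK_VG: default_vg, constants.IDISK_NAME: None}
# with the optional keys (metavg, adopt, spindles, ext-storage provider
# parameters) copied through only when present in the input.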


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev

    
409

    
410
def GenerateDiskTemplate(
411
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
412
  disk_info, file_storage_dir, file_driver, base_index,
413
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
414
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
415
  """Generate the entire disk layout for a given template type.
416

417
  """
418
  vgname = lu.cfg.GetVGName()
419
  disk_count = len(disk_info)
420
  disks = []
421

    
422
  if template_name == constants.DT_DISKLESS:
423
    pass
424
  elif template_name == constants.DT_DRBD8:
425
    if len(secondary_node_uuids) != 1:
426
      raise errors.ProgrammerError("Wrong template configuration")
427
    remote_node_uuid = secondary_node_uuids[0]
428
    minors = lu.cfg.AllocateDRBDMinor(
429
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
430

    
431
    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
432
                                                       full_disk_params)
433
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
434

    
435
    names = []
436
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
437
                                               for i in range(disk_count)]):
438
      names.append(lv_prefix + "_data")
439
      names.append(lv_prefix + "_meta")
440
    for idx, disk in enumerate(disk_info):
441
      disk_index = idx + base_index
442
      data_vg = disk.get(constants.IDISK_VG, vgname)
443
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
444
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
445
                                      disk[constants.IDISK_SIZE],
446
                                      [data_vg, meta_vg],
447
                                      names[idx * 2:idx * 2 + 2],
448
                                      "disk/%d" % disk_index,
449
                                      minors[idx * 2], minors[idx * 2 + 1])
450
      disk_dev.mode = disk[constants.IDISK_MODE]
451
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
452
      disks.append(disk_dev)
453
  else:
454
    if secondary_node_uuids:
455
      raise errors.ProgrammerError("Wrong template configuration")
456

    
457
    if template_name == constants.DT_FILE:
458
      _req_file_storage()
459
    elif template_name == constants.DT_SHARED_FILE:
460
      _req_shr_file_storage()
461

    
462
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
463
    if name_prefix is None:
464
      names = None
465
    else:
466
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
467
                                        (name_prefix, base_index + i)
468
                                        for i in range(disk_count)])
469

    
470
    if template_name == constants.DT_PLAIN:
471

    
472
      def logical_id_fn(idx, _, disk):
473
        vg = disk.get(constants.IDISK_VG, vgname)
474
        return (vg, names[idx])
475

    
476
    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
477
      logical_id_fn = \
478
        lambda _, disk_index, disk: (file_driver,
479
                                     "%s/disk%d" % (file_storage_dir,
480
                                                    disk_index))
481
    elif template_name == constants.DT_BLOCK:
482
      logical_id_fn = \
483
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
484
                                       disk[constants.IDISK_ADOPT])
485
    elif template_name == constants.DT_RBD:
486
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
487
    elif template_name == constants.DT_EXT:
488
      def logical_id_fn(idx, _, disk):
489
        provider = disk.get(constants.IDISK_PROVIDER, None)
490
        if provider is None:
491
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
492
                                       " not found", constants.DT_EXT,
493
                                       constants.IDISK_PROVIDER)
494
        return (provider, names[idx])
495
    else:
496
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
497

    
498
    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
499

    
500
    for idx, disk in enumerate(disk_info):
501
      params = {}
502
      # Only for the Ext template add disk_info to params
503
      if template_name == constants.DT_EXT:
504
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
505
        for key in disk:
506
          if key not in constants.IDISK_PARAMS:
507
            params[key] = disk[key]
508
      disk_index = idx + base_index
509
      size = disk[constants.IDISK_SIZE]
510
      feedback_fn("* disk %s, size %s" %
511
                  (disk_index, utils.FormatUnit(size, "h")))
512
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
513
                              logical_id=logical_id_fn(idx, disk_index, disk),
514
                              iv_name="disk/%d" % disk_index,
515
                              mode=disk[constants.IDISK_MODE],
516
                              params=params,
517
                              spindles=disk.get(constants.IDISK_SPINDLES))
518
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
519
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
520
      disks.append(disk_dev)
521

    
522
  return disks
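# Shape of the logical_id produced above, per template (as implemented by the
# logical_id_fn variants): DT_PLAIN -> (vg_name, lv_name); DT_FILE and
# DT_SHARED_FILE -> (file_driver, "<file_storage_dir>/disk<index>");
# DT_BLOCK -> (constants.BLOCKDEV_DRIVER_MANUAL, adopted device path);
# DT_RBD -> ("rbd", name); DT_EXT -> (provider, name).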


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not be

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)
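# In short: with exclusive storage disabled, a disk may not specify spindles
# at all; with exclusive storage enabled and required=True, every disk must
# specify spindles; enabled with required=False accepts either form.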
547

    
548

    
549
class LUInstanceRecreateDisks(LogicalUnit):
550
  """Recreate an instance's missing disks.
551

552
  """
553
  HPATH = "instance-recreate-disks"
554
  HTYPE = constants.HTYPE_INSTANCE
555
  REQ_BGL = False
556

    
557
  _MODIFYABLE = compat.UniqueFrozenset([
558
    constants.IDISK_SIZE,
559
    constants.IDISK_MODE,
560
    constants.IDISK_SPINDLES,
561
    ])
562

    
563
  # New or changed disk parameters may have different semantics
564
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
565
    constants.IDISK_ADOPT,
566

    
567
    # TODO: Implement support changing VG while recreating
568
    constants.IDISK_VG,
569
    constants.IDISK_METAVG,
570
    constants.IDISK_PROVIDER,
571
    constants.IDISK_NAME,
572
    ]))
573

    
574
  def _RunAllocator(self):
575
    """Run the allocator based on input opcode.
576

577
    """
578
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
579

    
580
    # FIXME
581
    # The allocator should actually run in "relocate" mode, but current
582
    # allocators don't support relocating all the nodes of an instance at
583
    # the same time. As a workaround we use "allocate" mode, but this is
584
    # suboptimal for two reasons:
585
    # - The instance name passed to the allocator is present in the list of
586
    #   existing instances, so there could be a conflict within the
587
    #   internal structures of the allocator. This doesn't happen with the
588
    #   current allocators, but it's a liability.
589
    # - The allocator counts the resources used by the instance twice: once
590
    #   because the instance exists already, and once because it tries to
591
    #   allocate a new instance.
592
    # The allocator could choose some of the nodes on which the instance is
593
    # running, but that's not a problem. If the instance nodes are broken,
594
    # they should be already be marked as drained or offline, and hence
595
    # skipped by the allocator. If instance disks have been lost for other
596
    # reasons, then recreating the disks on the same nodes should be fine.
597
    disk_template = self.instance.disk_template
598
    spindle_use = be_full[constants.BE_SPINDLE_USE]
599
    disks = [{
600
      constants.IDISK_SIZE: d.size,
601
      constants.IDISK_MODE: d.mode,
602
      constants.IDISK_SPINDLES: d.spindles,
603
      } for d in self.instance.disks]
604
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
605
                                        disk_template=disk_template,
606
                                        tags=list(self.instance.GetTags()),
607
                                        os=self.instance.os,
608
                                        nics=[{}],
609
                                        vcpus=be_full[constants.BE_VCPUS],
610
                                        memory=be_full[constants.BE_MAXMEM],
611
                                        spindle_use=spindle_use,
612
                                        disks=disks,
613
                                        hypervisor=self.instance.hypervisor,
614
                                        node_whitelist=None)
615
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
616

    
617
    ial.Run(self.op.iallocator)
618

    
619
    assert req.RequiredNodes() == len(self.instance.all_nodes)
620

    
621
    if not ial.success:
622
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
623
                                 " %s" % (self.op.iallocator, ial.info),
624
                                 errors.ECODE_NORES)
625

    
626
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
627
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
628
                 self.op.instance_name, self.op.iallocator,
629
                 utils.CommaJoin(self.op.nodes))
630

    
631
  def CheckArguments(self):
632
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
633
      # Normalize and convert deprecated list of disk indices
634
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
635

    
636
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
637
    if duplicates:
638
      raise errors.OpPrereqError("Some disks have been specified more than"
639
                                 " once: %s" % utils.CommaJoin(duplicates),
640
                                 errors.ECODE_INVAL)
641

    
642
    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
643
    # when neither iallocator nor nodes are specified
644
    if self.op.iallocator or self.op.nodes:
645
      CheckIAllocatorOrNode(self, "iallocator", "nodes")
646

    
647
    for (idx, params) in self.op.disks:
648
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
649
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
650
      if unsupported:
651
        raise errors.OpPrereqError("Parameters for disk %s try to change"
652
                                   " unmodifyable parameter(s): %s" %
653
                                   (idx, utils.CommaJoin(unsupported)),
654
                                   errors.ECODE_INVAL)
655

    
656
  def ExpandNames(self):
657
    self._ExpandAndLockInstance()
658
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
659

    
660
    if self.op.nodes:
661
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
662
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
663
    else:
664
      self.needed_locks[locking.LEVEL_NODE] = []
665
      if self.op.iallocator:
666
        # iallocator will select a new node in the same group
667
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
668
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
669

    
670
    self.needed_locks[locking.LEVEL_NODE_RES] = []
671

    
672
  def DeclareLocks(self, level):
673
    if level == locking.LEVEL_NODEGROUP:
674
      assert self.op.iallocator is not None
675
      assert not self.op.nodes
676
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
677
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
678
      # Lock the primary group used by the instance optimistically; this
679
      # requires going via the node before it's locked, requiring
680
      # verification later on
681
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
682
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
683

    
684
    elif level == locking.LEVEL_NODE:
685
      # If an allocator is used, then we lock all the nodes in the current
686
      # instance group, as we don't know yet which ones will be selected;
687
      # if we replace the nodes without using an allocator, locks are
688
      # already declared in ExpandNames; otherwise, we need to lock all the
689
      # instance nodes for disk re-creation
690
      if self.op.iallocator:
691
        assert not self.op.nodes
692
        assert not self.needed_locks[locking.LEVEL_NODE]
693
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
694

    
695
        # Lock member nodes of the group of the primary node
696
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
697
          self.needed_locks[locking.LEVEL_NODE].extend(
698
            self.cfg.GetNodeGroup(group_uuid).members)
699

    
700
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
701
      elif not self.op.nodes:
702
        self._LockInstancesNodes(primary_only=False)
703
    elif level == locking.LEVEL_NODE_RES:
704
      # Copy node locks
705
      self.needed_locks[locking.LEVEL_NODE_RES] = \
706
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
707

    
708
  def BuildHooksEnv(self):
709
    """Build hooks env.
710

711
    This runs on master, primary and secondary nodes of the instance.
712

713
    """
714
    return BuildInstanceHookEnvByObject(self, self.instance)
715

    
716
  def BuildHooksNodes(self):
717
    """Build hooks nodes.
718

719
    """
720
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
721
    return (nl, nl)
722

    
723
  def CheckPrereq(self):
724
    """Check prerequisites.
725

726
    This checks that the instance is in the cluster and is not running.
727

728
    """
729
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
730
    assert instance is not None, \
731
      "Cannot retrieve locked instance %s" % self.op.instance_name
732
    if self.op.node_uuids:
733
      if len(self.op.node_uuids) != len(instance.all_nodes):
734
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
735
                                   " %d replacement nodes were specified" %
736
                                   (instance.name, len(instance.all_nodes),
737
                                    len(self.op.node_uuids)),
738
                                   errors.ECODE_INVAL)
739
      assert instance.disk_template != constants.DT_DRBD8 or \
740
             len(self.op.node_uuids) == 2
741
      assert instance.disk_template != constants.DT_PLAIN or \
742
             len(self.op.node_uuids) == 1
743
      primary_node = self.op.node_uuids[0]
744
    else:
745
      primary_node = instance.primary_node
746
    if not self.op.iallocator:
747
      CheckNodeOnline(self, primary_node)
748

    
749
    if instance.disk_template == constants.DT_DISKLESS:
750
      raise errors.OpPrereqError("Instance '%s' has no disks" %
751
                                 self.op.instance_name, errors.ECODE_INVAL)
752

    
753
    # Verify if node group locks are still correct
754
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
755
    if owned_groups:
756
      # Node group locks are acquired only for the primary node (and only
757
      # when the allocator is used)
758
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
759
                              primary_only=True)
760

    
761
    # if we replace nodes *and* the old primary is offline, we don't
762
    # check the instance state
763
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
764
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
765
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
766
                         msg="cannot recreate disks")
767

    
768
    if self.op.disks:
769
      self.disks = dict(self.op.disks)
770
    else:
771
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
772

    
773
    maxidx = max(self.disks.keys())
774
    if maxidx >= len(instance.disks):
775
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
776
                                 errors.ECODE_INVAL)
777

    
778
    if ((self.op.node_uuids or self.op.iallocator) and
779
         sorted(self.disks.keys()) != range(len(instance.disks))):
780
      raise errors.OpPrereqError("Can't recreate disks partially and"
781
                                 " change the nodes at the same time",
782
                                 errors.ECODE_INVAL)
783

    
784
    self.instance = instance
785

    
786
    if self.op.iallocator:
787
      self._RunAllocator()
788
      # Release unneeded node and node resource locks
789
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
790
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
791
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
792

    
793
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
794

    
795
    if self.op.node_uuids:
796
      node_uuids = self.op.node_uuids
797
    else:
798
      node_uuids = instance.all_nodes
799
    excl_stor = compat.any(
800
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
801
      )
802
    for new_params in self.disks.values():
803
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
804

    
805
  def Exec(self, feedback_fn):
806
    """Recreate the disks.
807

808
    """
809
    assert (self.owned_locks(locking.LEVEL_NODE) ==
810
            self.owned_locks(locking.LEVEL_NODE_RES))
811

    
812
    to_skip = []
813
    mods = [] # keeps track of needed changes
814

    
815
    for idx, disk in enumerate(self.instance.disks):
816
      try:
817
        changes = self.disks[idx]
818
      except KeyError:
819
        # Disk should not be recreated
820
        to_skip.append(idx)
821
        continue
822

    
823
      # update secondaries for disks, if needed
824
      if self.op.node_uuids and disk.dev_type == constants.LD_DRBD8:
825
        # need to update the nodes and minors
826
        assert len(self.op.node_uuids) == 2
827
        assert len(disk.logical_id) == 6 # otherwise disk internals
828
                                         # have changed
829
        (_, _, old_port, _, _, old_secret) = disk.logical_id
830
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
831
                                                self.instance.uuid)
832
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
833
                  new_minors[0], new_minors[1], old_secret)
834
        assert len(disk.logical_id) == len(new_id)
835
      else:
836
        new_id = None
837

    
838
      mods.append((idx, new_id, changes))
839

    
840
    # now that we have passed all asserts above, we can apply the mods
841
    # in a single run (to avoid partial changes)
842
    for idx, new_id, changes in mods:
843
      disk = self.instance.disks[idx]
844
      if new_id is not None:
845
        assert disk.dev_type == constants.LD_DRBD8
846
        disk.logical_id = new_id
847
      if changes:
848
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
849
                    mode=changes.get(constants.IDISK_MODE, None),
850
                    spindles=changes.get(constants.IDISK_SPINDLES, None))
851

    
852
    # change primary node, if needed
853
    if self.op.node_uuids:
854
      self.instance.primary_node = self.op.node_uuids[0]
855
      self.LogWarning("Changing the instance's nodes, you will have to"
856
                      " remove any disks left on the older nodes manually")
857

    
858
    if self.op.node_uuids:
859
      self.cfg.Update(self.instance, feedback_fn)
860

    
861
    # All touched nodes must be locked
862
    mylocks = self.owned_locks(locking.LEVEL_NODE)
863
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
864
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
865

    
866
    # TODO: Release node locks before wiping, or explain why it's not possible
867
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
868
      wipedisks = [(idx, disk, 0)
869
                   for (idx, disk) in enumerate(self.instance.disks)
870
                   if idx not in to_skip]
871
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
872
                         cleanup=new_disks)
873

    
874

    
875
def _PerformNodeInfoCall(lu, node_uuids, vg):
876
  """Prepares the input and performs a node info call.
877

878
  @type lu: C{LogicalUnit}
879
  @param lu: a logical unit from which we get configuration data
880
  @type node_uuids: list of string
881
  @param node_uuids: list of node UUIDs to perform the call for
882
  @type vg: string
883
  @param vg: the volume group's name
884

885
  """
886
  lvm_storage_units = [(constants.ST_LVM_VG, vg)]
887
  storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
888
                                                  node_uuids)
889
  hvname = lu.cfg.GetHypervisorType()
890
  hvparams = lu.cfg.GetClusterInfo().hvparams
891
  nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
892
                                   [(hvname, hvparams[hvname])])
893
  return nodeinfo
894

    
895

    
896
def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
897
  """Checks the vg capacity for a given node.
898

899
  @type node_info: tuple (_, list of dicts, _)
900
  @param node_info: the result of the node info call for one node
901
  @type node_name: string
902
  @param node_name: the name of the node
903
  @type vg: string
904
  @param vg: volume group name
905
  @type requested: int
906
  @param requested: the amount of disk in MiB to check for
907
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
908
      or we cannot check the node
909

910
  """
911
  (_, space_info, _) = node_info
912
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
913
      space_info, constants.ST_LVM_VG)
914
  if not lvm_vg_info:
915
    raise errors.OpPrereqError("Can't retrieve storage information for LVM")
916
  vg_free = lvm_vg_info.get("storage_free", None)
917
  if not isinstance(vg_free, int):
918
    raise errors.OpPrereqError("Can't compute free disk space on node"
919
                               " %s for vg %s, result was '%s'" %
920
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
921
  if requested > vg_free:
922
    raise errors.OpPrereqError("Not enough disk space on target node %s"
923
                               " vg %s: required %d MiB, available %d MiB" %
924
                               (node_name, vg, requested, vg_free),
925
                               errors.ECODE_NORES)
926

    
927

    
928
def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
929
  """Checks if nodes have enough free disk space in the specified VG.
930

931
  This function checks if all given nodes have the needed amount of
932
  free disk. In case any node has less disk or we cannot get the
933
  information from the node, this function raises an OpPrereqError
934
  exception.
935

936
  @type lu: C{LogicalUnit}
937
  @param lu: a logical unit from which we get configuration data
938
  @type node_uuids: C{list}
939
  @param node_uuids: the list of node UUIDs to check
940
  @type vg: C{str}
941
  @param vg: the volume group to check
942
  @type requested: C{int}
943
  @param requested: the amount of disk in MiB to check for
944
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
945
      or we cannot check the node
946

947
  """
948
  nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
949
  for node in node_uuids:
950
    node_name = lu.cfg.GetNodeName(node)
951
    info = nodeinfo[node]
952
    info.Raise("Cannot get current information from node %s" % node_name,
953
               prereq=True, ecode=errors.ECODE_ENVIRON)
954
    _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
955

    
956

    
957
def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
958
  """Checks if nodes have enough free disk space in all the VGs.
959

960
  This function checks if all given nodes have the needed amount of
961
  free disk. In case any node has less disk or we cannot get the
962
  information from the node, this function raises an OpPrereqError
963
  exception.
964

965
  @type lu: C{LogicalUnit}
966
  @param lu: a logical unit from which we get configuration data
967
  @type node_uuids: C{list}
968
  @param node_uuids: the list of node UUIDs to check
969
  @type req_sizes: C{dict}
970
  @param req_sizes: the hash of vg and corresponding amount of disk in
971
      MiB to check for
972
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
973
      or we cannot check the node
974

975
  """
976
  for vg, req_size in req_sizes.items():
977
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
978

    
979

    
980
def _DiskSizeInBytesToMebibytes(lu, size):
981
  """Converts a disk size in bytes to mebibytes.
982

983
  Warns and rounds up if the size isn't an even multiple of 1 MiB.
984

985
  """
986
  (mib, remainder) = divmod(size, 1024 * 1024)
987

    
988
  if remainder != 0:
989
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
990
                  " to not overwrite existing data (%s bytes will not be"
991
                  " wiped)", (1024 * 1024) - remainder)
992
    mib += 1
993

    
994
  return mib
995

    
996

    
997
def _CalcEta(time_taken, written, total_size):
998
  """Calculates the ETA based on size written and total size.
999

1000
  @param time_taken: The time taken so far
1001
  @param written: amount written so far
1002
  @param total_size: The total size of data to be written
1003
  @return: The remaining time in seconds
1004

1005
  """
1006
  avg_time = time_taken / float(written)
1007
  return (total_size - written) * avg_time
1008

    
1009

    
1010
def WipeDisks(lu, instance, disks=None):
1011
  """Wipes instance disks.
1012

1013
  @type lu: L{LogicalUnit}
1014
  @param lu: the logical unit on whose behalf we execute
1015
  @type instance: L{objects.Instance}
1016
  @param instance: the instance whose disks we should create
1017
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1018
  @param disks: Disk details; tuple contains disk index, disk object and the
1019
    start offset
1020

1021
  """
1022
  node_uuid = instance.primary_node
1023
  node_name = lu.cfg.GetNodeName(node_uuid)
1024

    
1025
  if disks is None:
1026
    disks = [(idx, disk, 0)
1027
             for (idx, disk) in enumerate(instance.disks)]
1028

    
1029
  for (_, device, _) in disks:
1030
    lu.cfg.SetDiskID(device, node_uuid)
1031

    
1032
  logging.info("Pausing synchronization of disks of instance '%s'",
1033
               instance.name)
1034
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1035
                                                  (map(compat.snd, disks),
1036
                                                   instance),
1037
                                                  True)
1038
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1039

    
1040
  for idx, success in enumerate(result.payload):
1041
    if not success:
1042
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
1043
                   " failed", idx, instance.name)
1044

    
1045
  try:
1046
    for (idx, device, offset) in disks:
1047
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1048
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1049
      wipe_chunk_size = \
1050
        int(min(constants.MAX_WIPE_CHUNK,
1051
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1052

    
1053
      size = device.size
1054
      last_output = 0
1055
      start_time = time.time()
1056

    
1057
      if offset == 0:
1058
        info_text = ""
1059
      else:
1060
        info_text = (" (from %s to %s)" %
1061
                     (utils.FormatUnit(offset, "h"),
1062
                      utils.FormatUnit(size, "h")))
1063

    
1064
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1065

    
1066
      logging.info("Wiping disk %d for instance %s on node %s using"
1067
                   " chunk size %s", idx, instance.name, node_name,
1068
                   wipe_chunk_size)
1069

    
1070
      while offset < size:
1071
        wipe_size = min(wipe_chunk_size, size - offset)
1072

    
1073
        logging.debug("Wiping disk %d, offset %s, chunk %s",
1074
                      idx, offset, wipe_size)
1075

    
1076
        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1077
                                           offset, wipe_size)
1078
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
1079
                     (idx, offset, wipe_size))
1080

    
1081
        now = time.time()
1082
        offset += wipe_size
1083
        if now - last_output >= 60:
1084
          eta = _CalcEta(now - start_time, offset, size)
1085
          lu.LogInfo(" - done: %.1f%% ETA: %s",
1086
                     offset / float(size) * 100, utils.FormatSeconds(eta))
1087
          last_output = now
1088
  finally:
1089
    logging.info("Resuming synchronization of disks for instance '%s'",
1090
                 instance.name)
1091

    
1092
    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1093
                                                    (map(compat.snd, disks),
1094
                                                     instance),
1095
                                                    False)
1096

    
1097
    if result.fail_msg:
1098
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1099
                    node_name, result.fail_msg)
1100
    else:
1101
      for idx, success in enumerate(result.payload):
1102
        if not success:
1103
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1104
                        " failed", idx, instance.name)
1105

    
1106

    
1107
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1108
  """Wrapper for L{WipeDisks} that handles errors.
1109

1110
  @type lu: L{LogicalUnit}
1111
  @param lu: the logical unit on whose behalf we execute
1112
  @type instance: L{objects.Instance}
1113
  @param instance: the instance whose disks we should wipe
1114
  @param disks: see L{WipeDisks}
1115
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1116
      case of error
1117
  @raise errors.OpPrereqError: in case of failure
1118

1119
  """
1120
  try:
1121
    WipeDisks(lu, instance, disks=disks)
1122
  except errors.OpExecError:
1123
    logging.warning("Wiping disks for instance '%s' failed",
1124
                    instance.name)
1125
    _UndoCreateDisks(lu, cleanup)
1126
    raise
1127

    
1128

    
1129
def ExpandCheckDisks(instance, disks):
1130
  """Return the instance disks selected by the disks list
1131

1132
  @type disks: list of L{objects.Disk} or None
1133
  @param disks: selected disks
1134
  @rtype: list of L{objects.Disk}
1135
  @return: selected instance disks to act on
1136

1137
  """
1138
  if disks is None:
1139
    return instance.disks
1140
  else:
1141
    if not set(disks).issubset(instance.disks):
1142
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1143
                                   " target instance: expected a subset of %r,"
1144
                                   " got %r" % (instance.disks, disks))
1145
    return disks
1146

    
1147

    
1148
def WaitForSync(lu, instance, disks=None, oneshot=False):
1149
  """Sleep and poll for an instance's disk to sync.
1150

1151
  """
1152
  if not instance.disks or disks is not None and not disks:
1153
    return True
1154

    
1155
  disks = ExpandCheckDisks(instance, disks)
1156

    
1157
  if not oneshot:
1158
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1159

    
1160
  node_uuid = instance.primary_node
1161
  node_name = lu.cfg.GetNodeName(node_uuid)
1162

    
1163
  for dev in disks:
1164
    lu.cfg.SetDiskID(dev, node_uuid)
1165

    
1166
  # TODO: Convert to utils.Retry
1167

    
1168
  retries = 0
1169
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1170
  while True:
1171
    max_time = 0
1172
    done = True
1173
    cumul_degraded = False
1174
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1175
    msg = rstats.fail_msg
1176
    if msg:
1177
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1178
      retries += 1
1179
      if retries >= 10:
1180
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1181
                                 " aborting." % node_name)
1182
      time.sleep(6)
1183
      continue
1184
    rstats = rstats.payload
1185
    retries = 0
1186
    for i, mstat in enumerate(rstats):
1187
      if mstat is None:
1188
        lu.LogWarning("Can't compute data for node %s/%s",
1189
                      node_name, disks[i].iv_name)
1190
        continue
1191

    
1192
      cumul_degraded = (cumul_degraded or
1193
                        (mstat.is_degraded and mstat.sync_percent is None))
1194
      if mstat.sync_percent is not None:
1195
        done = False
1196
        if mstat.estimated_time is not None:
1197
          rem_time = ("%s remaining (estimated)" %
1198
                      utils.FormatSeconds(mstat.estimated_time))
1199
          max_time = mstat.estimated_time
1200
        else:
1201
          rem_time = "no time estimate"
1202
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1203
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1204

    
1205
    # if we're done but degraded, let's do a few small retries, to
1206
    # make sure we see a stable and not transient situation; therefore
1207
    # we force restart of the loop
1208
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1209
      logging.info("Degraded disks found, %d retries left", degr_retries)
1210
      degr_retries -= 1
1211
      time.sleep(1)
1212
      continue
1213

    
1214
    if done or oneshot:
1215
      break
1216

    
1217
    time.sleep(min(60, max_time))
1218

    
1219
  if done:
1220
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1221

    
1222
  return not cumul_degraded
1223

    
1224

    
1225
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1226
  """Shutdown block devices of an instance.
1227

1228
  This does the shutdown on all nodes of the instance.
1229

1230
  If the ignore_primary is false, errors on the primary node are
1231
  ignored.
1232

1233
  """
1234
  lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1235
  all_result = True
1236
  disks = ExpandCheckDisks(instance, disks)
1237

    
1238
  for disk in disks:
1239
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1240
      lu.cfg.SetDiskID(top_disk, node_uuid)
1241
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1242
      msg = result.fail_msg
1243
      if msg:
1244
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1245
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1246
        if ((node_uuid == instance.primary_node and not ignore_primary) or
1247
            (node_uuid != instance.primary_node and not result.offline)):
1248
          all_result = False
1249
  return all_result
1250

    
1251

    
1252
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1253
  """Shutdown block devices of an instance.
1254

1255
  This function checks if an instance is running, before calling
1256
  _ShutdownInstanceDisks.
1257

1258
  """
1259
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1260
  ShutdownInstanceDisks(lu, instance, disks=disks)
1261

    
1262

    
1263
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a pair (disks_ok, device_info); disks_ok is False if the
      operation failed, and device_info is a list of
      (host, instance_visible_name, node_visible_name) tuples
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  disks = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # mark instance disks as active before doing actual work, so watcher does
  # not try to shut them down erroneously
  lu.cfg.MarkInstanceDisksActive(instance.uuid)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info


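# A minimal usage sketch (illustrative only, not part of the original module):
# it shows how a caller is expected to consume the (disks_ok, device_info)
# pair returned by AssembleInstanceDisks; "lu" and "instance" are assumed to
# be a LogicalUnit and an objects.Instance already looked up by the caller.
def _ExampleAssembleAndReport(lu, instance):
  """Illustrative helper, not used by Ganeti itself.

  """
  disks_ok, device_info = AssembleInstanceDisks(lu, instance)
  if not disks_ok:
    raise errors.OpExecError("Cannot activate block devices")
  for node_name, iv_name, dev_path in device_info:
    lu.LogInfo("Disk %s is visible on %s as %s", iv_name, node_name, dev_path)

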
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


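# Illustrative note (not part of the original module): "force" is effectively
# tri-state above.  force=True ignores assembly errors on secondary nodes,
# force=False (an explicit user "no") additionally prints the --force hint on
# failure, and force=None suppresses that hint, e.g.:
#
#   StartInstanceDisks(lu, instance, force=False)

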
class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

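  # Worked example (illustrative, not in the original module): for a 10 GiB
  # disk (self.disk.size == 10240 MiB) the checks above behave as follows:
  #   absolute=False, amount=2048  -> delta=2048,  target=12288 MiB
  #   absolute=True,  amount=12288 -> delta=2048,  target=12288 MiB
  #   absolute=True,  amount=8192  -> delta=-2048, rejected as a shrink
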
  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    template = self.instance.disk_template
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Get disk size from primary node for wiping
      self.cfg.SetDiskID(self.disk, self.instance.primary_node)
      result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
                                                    [self.disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   self.instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % self.instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    self.cfg.SetDiskID(self.disk, node_uuid)
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert self.instance.disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


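# Illustrative sketch (not part of the original module): the opcode handled by
# LUInstanceGrowDisk above, shown only to document the relative vs. absolute
# semantics resolved in CheckPrereq.  Field names are taken from the self.op
# references in the LU; how the opcode is submitted is up to the caller.
#
#   op = opcodes.OpInstanceGrowDisk(instance_name="inst1.example.com",
#                                   disk=0, amount=2048, absolute=False,
#                                   wait_for_sync=True)

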
class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if self.op.remote_node is None and self.op.iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif self.op.remote_node is not None or self.op.iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

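  # Illustrative summary (not in the original module) of the argument rules
  # enforced in CheckArguments above:
  #   mode=REPLACE_DISK_CHG with exactly one of iallocator/remote_node -> ok
  #   mode=REPLACE_DISK_CHG with neither option                        -> error
  #   any other mode with iallocator or remote_node set                -> error
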
  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    if self.op.force:
      ShutdownInstanceDisks(self, self.instance)
    else:
      _SafeShutdownInstanceDisks(self, self.instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node_uuid)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
                                                     node_uuid, on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
                                    ldisk=ldisk)


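# A minimal sketch (illustrative only, not part of the original module)
# contrasting the two flavours of the check above: with ldisk=False the
# overall is_degraded flag is tested, with ldisk=True only the local disk
# status (LDS_OKAY) is required.
def _ExampleConsistencyProbe(lu, instance, dev, node_uuid):
  """Illustrative helper, not used by Ganeti itself.

  """
  overall_ok = CheckDiskConsistency(lu, instance, dev, node_uuid,
                                    on_primary=True)
  local_ok = CheckDiskConsistency(lu, instance, dev, node_uuid,
                                  on_primary=True, ldisk=True)
  return (overall_ok, local_ok)

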
def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node_uuid, disk)


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results


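# Illustrative sketch (not part of the original module): _CreateNewStorage in
# TLReplaceDisks below builds LV names by pairing the cluster-unique ID with
# per-disk suffixes, e.g. for disk/0 it requests the extensions ".disk0_data"
# and ".disk0_meta":
#
#   lv_names = [".disk%d_%s" % (0, suffix) for suffix in ["data", "meta"]]
#   names = _GenerateUniqueNames(lu, lv_names)
#   # names might be ["<unique-id>.disk0_data", "<unique-id>.disk0_meta"]

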
class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
               remote_node_uuid, disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node_uuid = remote_node_uuid
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node_uuid = None
    self.target_node_uuid = None
    self.other_node_uuid = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_uuid,
                    relocate_from_node_uuids):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(
          inst_uuid=instance_uuid,
          relocate_from_node_uuids=list(relocate_from_node_uuids))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)

    if remote_node is None:
      raise errors.OpPrereqError("Node %s not found in configuration" %
                                 remote_node_name, errors.ECODE_NOENT)

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_uuid, remote_node_name)

    return remote_node.uuid

  def _FindFaultyDisks(self, node_uuid):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    node_uuids = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if self.instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(self.instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(self.instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    secondary_node_uuid = self.instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node_uuid = self.remote_node_uuid
    else:
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                            self.instance.uuid,
                                            self.instance.secondary_nodes)

    if remote_node_uuid is None:
      self.remote_node_info = None
    else:
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node_uuid

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node_uuid

    if remote_node_uuid == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node_uuid == secondary_node_uuid:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(self.instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node_uuid = remote_node_uuid
        self.other_node_uuid = self.instance.primary_node
        self.target_node_uuid = secondary_node_uuid
        check_nodes = [self.new_node_uuid, self.other_node_uuid]

        CheckNodeNotDrained(self.lu, remote_node_uuid)
        CheckNodeVmCapable(self.lu, remote_node_uuid)

        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node_uuid)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between an
    # internally submitted opcode and an external one. We should fix that.
    if self.remote_node_info:
      # We change the node, let's verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
                             self.remote_node_info, self.cfg,
                             ignore=self.ignore_ipolicy)

    for node_uuid in check_nodes:
      CheckNodeOnline(self.lu, node_uuid)

    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                          self.other_node_uuid,
                                                          self.target_node_uuid]
                              if node_uuid is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      self.instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" %
                self.cfg.GetNodeName(self.instance.primary_node))
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.cfg.GetNodeNames(
                                  self.instance.secondary_nodes)))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node_uuid is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, node_uuids):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(node_uuids)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node_uuid in node_uuids:
      res = results[node_uuid]
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, self.cfg.GetNodeName(node_uuid)))

  def _CheckDisksExistence(self, node_uuids):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, self.cfg.GetNodeName(node_uuid), msg))

  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, self.cfg.GetNodeName(node_uuid)))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (self.cfg.GetNodeName(node_uuid),
                                  self.instance.name))

  def _CreateNewStorage(self, node_uuid):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d",
                      self.cfg.GetNodeName(node_uuid), idx)

      self.cfg.SetDiskID(dev, node_uuid)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        try:
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                               GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    return iv_names

  def _CheckDevices(self, node_uuid, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_uuid)

      result = _BlockdevFind(self, node_uuid, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_uuid, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_uuid)

        msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced-<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced-<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
      False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node_uuid)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" %
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
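
      # Illustrative example (not in the original code): for a physical_id of
      # ("xenvg", "<uuid>.disk0_data") and temp_suffix 1365000000, ren_fn
      # yields ("xenvg", "<uuid>.disk0_data_replaced-1365000000"); the volume
      # group component is kept and only the LV name gets the suffix.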

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node_uuid)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node_uuid)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s",
                      self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
                                                  self.new_node_uuid)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        try:
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
                               new_lv, True, GetInstanceInfoText(self.instance),
                               False, excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
                                         for _ in self.instance.disks],
                                        self.instance.uuid)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                    p_minor, new_minor, o_secret)
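      # Illustrative values (assumption, for documentation only): if the old
      # logical_id was (nodeA, nodeB, 11000, 0, 1, secret) and nodeA is the
      # primary, then
      #   new_alone_id = (nodeA, new_node, None,  p_minor, new_minor, secret)
      #   new_net_id   = (nodeA, new_node, 11000, p_minor, new_minor, secret)
      # i.e. they differ only in the port, which keeps the new DRBD device
      # standalone until the network attach performed further down.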

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node_uuid)
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node_uuid],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           self.cfg.GetNodeName(to_node), msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)