1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with storage of instances."""
23

    
24
import itertools
25
import logging
26
import os
27
import time
28

    
29
from ganeti import compat
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import ht
33
from ganeti import locking
34
from ganeti.masterd import iallocator
35
from ganeti import objects
36
from ganeti import utils
37
from ganeti import opcodes
38
from ganeti import rpc
39
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
42
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes
44
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47

    
48
import ganeti.masterd.instance
49

    
50

    
51
_DISK_TEMPLATE_NAME_PREFIX = {
52
  constants.DT_PLAIN: "",
53
  constants.DT_RBD: ".rbd",
54
  constants.DT_EXT: ".ext",
55
  }
56

    
57

    
58
_DISK_TEMPLATE_DEVICE_TYPE = {
59
  constants.DT_PLAIN: constants.LD_LV,
60
  constants.DT_FILE: constants.LD_FILE,
61
  constants.DT_SHARED_FILE: constants.LD_FILE,
62
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
63
  constants.DT_RBD: constants.LD_RBD,
64
  constants.DT_EXT: constants.LD_EXT,
65
  }
66

    
67

    
68
def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
69
                         excl_stor):
70
  """Create a single block device on a given node.
71

72
  This will not recurse over children of the device, so they must be
73
  created in advance.
74

75
  @param lu: the lu on whose behalf we execute
76
  @param node_uuid: the node on which to create the device
77
  @type instance: L{objects.Instance}
78
  @param instance: the instance which owns the device
79
  @type device: L{objects.Disk}
80
  @param device: the device to create
81
  @param info: the extra 'metadata' we should attach to the device
82
      (this will be represented as a LVM tag)
83
  @type force_open: boolean
84
  @param force_open: this parameter will be passed to the
85
      L{backend.BlockdevCreate} function where it specifies
86
      whether we run on primary or not, and it affects both
87
      the child assembly and the device's own Open() execution
88
  @type excl_stor: boolean
89
  @param excl_stor: Whether exclusive_storage is active for the node
90

91
  """
92
  lu.cfg.SetDiskID(device, node_uuid)
93
  result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
94
                                       instance.name, force_open, info,
95
                                       excl_stor)
96
  result.Raise("Can't create block device %s on"
97
               " node %s for instance %s" % (device,
98
                                             lu.cfg.GetNodeName(node_uuid),
99
                                             instance.name))
100
  if device.physical_id is None:
101
    device.physical_id = result.payload
102

    
103

    
104
def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
105
                         info, force_open, excl_stor):
106
  """Create a tree of block devices on a given node.
107

108
  If this device type has to be created on secondaries, create it and
109
  all its children.
110

111
  If not, just recurse to children keeping the same 'force' value.
112

113
  @attention: The device has to be annotated already.
114

115
  @param lu: the lu on whose behalf we execute
116
  @param node_uuid: the node on which to create the device
117
  @type instance: L{objects.Instance}
118
  @param instance: the instance which owns the device
119
  @type device: L{objects.Disk}
120
  @param device: the device to create
121
  @type force_create: boolean
122
  @param force_create: whether to force creation of this device; this
123
      will be changed to True whenever we find a device whose
124
      CreateOnSecondary() method returns True
125
  @param info: the extra 'metadata' we should attach to the device
126
      (this will be represented as a LVM tag)
127
  @type force_open: boolean
128
  @param force_open: this parameter will be passed to the
129
      L{backend.BlockdevCreate} function where it specifies
130
      whether we run on primary or not, and it affects both
131
      the child assembly and the device's own Open() execution
132
  @type excl_stor: boolean
133
  @param excl_stor: Whether exclusive_storage is active for the node
134

135
  @return: list of created devices
136
  """
137
  created_devices = []
138
  try:
139
    if device.CreateOnSecondary():
140
      force_create = True
141

    
142
    if device.children:
143
      for child in device.children:
144
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
145
                                    force_create, info, force_open, excl_stor)
146
        created_devices.extend(devs)
147

    
148
    if not force_create:
149
      return created_devices
150

    
151
    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
152
                         excl_stor)
153
    # The device has been completely created, so there is no point in keeping
154
    # its subdevices in the list. We just add the device itself instead.
155
    created_devices = [(node_uuid, device)]
156
    return created_devices
157

    
158
  except errors.DeviceCreationError, e:
159
    e.created_devices.extend(created_devices)
160
    raise e
161
  except errors.OpExecError, e:
162
    raise errors.DeviceCreationError(str(e), created_devices)
163

    
164

    
165
def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
166
  """Whether exclusive_storage is in effect for the given node.
167

168
  @type cfg: L{config.ConfigWriter}
169
  @param cfg: The cluster configuration
170
  @type node_uuid: string
171
  @param node_uuid: The node UUID
172
  @rtype: bool
173
  @return: The effective value of exclusive_storage
174
  @raise errors.OpPrereqError: if no node exists with the given UUID
175

176
  """
177
  ni = cfg.GetNodeInfo(node_uuid)
178
  if ni is None:
179
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
180
                               errors.ECODE_NOENT)
181
  return IsExclusiveStorageEnabledNode(cfg, ni)
182

    
183

    
184
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
185
                    force_open):
186
  """Wrapper around L{_CreateBlockDevInner}.
187

188
  This method annotates the root device first.
189

190
  """
191
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
192
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
193
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
194
                              force_open, excl_stor)
195

    
196

    
197
def _UndoCreateDisks(lu, disks_created):
198
  """Undo the work performed by L{CreateDisks}.
199

200
  This function is called in case of an error to undo the work of
201
  L{CreateDisks}.
202

203
  @type lu: L{LogicalUnit}
204
  @param lu: the logical unit on whose behalf we execute
205
  @param disks_created: the result returned by L{CreateDisks}
206

207
  """
208
  for (node_uuid, disk) in disks_created:
209
    lu.cfg.SetDiskID(disk, node_uuid)
210
    result = lu.rpc.call_blockdev_remove(node_uuid, disk)
211
    result.Warn("Failed to remove newly-created disk %s on node %s" %
212
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
213

    
214

    
215
def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
216
  """Create all disks for an instance.
217

218
  This abstracts away some work from AddInstance.
219

220
  @type lu: L{LogicalUnit}
221
  @param lu: the logical unit on whose behalf we execute
222
  @type instance: L{objects.Instance}
223
  @param instance: the instance whose disks we should create
224
  @type to_skip: list
225
  @param to_skip: list of indices to skip
226
  @type target_node_uuid: string
227
  @param target_node_uuid: if passed, overrides the target node for creation
228
  @type disks: list of L{objects.Disk}
229
  @param disks: the disks to create; if not specified, all the disks of the
230
      instance are created
231
  @return: information about the created disks, to be used to call
232
      L{_UndoCreateDisks}
233
  @raise errors.OpPrereqError: in case of error
234

235
  """
236
  info = GetInstanceInfoText(instance)
237
  if target_node_uuid is None:
238
    pnode_uuid = instance.primary_node
239
    all_node_uuids = instance.all_nodes
240
  else:
241
    pnode_uuid = target_node_uuid
242
    all_node_uuids = [pnode_uuid]
243

    
244
  if disks is None:
245
    disks = instance.disks
246

    
247
  if instance.disk_template in constants.DTS_FILEBASED:
248
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
249
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
250

    
251
    result.Raise("Failed to create directory '%s' on"
252
                 " node %s" % (file_storage_dir,
253
                               lu.cfg.GetNodeName(pnode_uuid)))
254

    
255
  disks_created = []
256
  for idx, device in enumerate(disks):
257
    if to_skip and idx in to_skip:
258
      continue
259
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
260
    for node_uuid in all_node_uuids:
261
      f_create = node_uuid == pnode_uuid
262
      try:
263
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
264
                        f_create)
265
        disks_created.append((node_uuid, device))
266
      except errors.DeviceCreationError, e:
267
        logging.warning("Creating disk %s for instance '%s' failed",
268
                        idx, instance.name)
269
        disks_created.extend(e.created_devices)
270
        _UndoCreateDisks(lu, disks_created)
271
        raise errors.OpExecError(e.message)
272
  return disks_created
273

    
274

    
275
def ComputeDiskSizePerVG(disk_template, disks):
276
  """Compute disk size requirements in the volume group
277

278
  """
279
  def _compute(disks, payload):
280
    """Universal algorithm.
281

282
    """
283
    vgs = {}
284
    for disk in disks:
285
      vgs[disk[constants.IDISK_VG]] = \
286
        vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
287

    
288
    return vgs
289

    
290
  # Required free disk space as a function of disk and swap space
291
  req_size_dict = {
292
    constants.DT_DISKLESS: {},
293
    constants.DT_PLAIN: _compute(disks, 0),
294
    # 128 MiB are added for DRBD metadata for each disk
295
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
296
    constants.DT_FILE: {},
297
    constants.DT_SHARED_FILE: {},
298
    }
299

    
300
  if disk_template not in req_size_dict:
301
    raise errors.ProgrammerError("Disk template '%s' size requirement"
302
                                 " is unknown" % disk_template)
303

    
304
  return req_size_dict[disk_template]
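# Illustrative sketch (not part of the original module): with the per-VG
# accumulation above, a DRBD8 instance with two disks in the same
# (hypothetical) volume group "xenvg" pays DRBD_META_SIZE extra per disk:
#
#   disks = [{constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 1024},
#            {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 2048}]
#   ComputeDiskSizePerVG(constants.DT_DRBD8, disks)
#   # => {"xenvg": 1024 + 2048 + 2 * constants.DRBD_META_SIZE}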
305

    
306

    
307
def ComputeDisks(op, default_vg):
308
  """Computes the instance disks.
309

310
  @param op: The instance opcode
311
  @param default_vg: The default_vg to assume
312

313
  @return: The computed disks
314

315
  """
316
  disks = []
317
  for disk in op.disks:
318
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
319
    if mode not in constants.DISK_ACCESS_SET:
320
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
321
                                 mode, errors.ECODE_INVAL)
322
    size = disk.get(constants.IDISK_SIZE, None)
323
    if size is None:
324
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
325
    try:
326
      size = int(size)
327
    except (TypeError, ValueError):
328
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
329
                                 errors.ECODE_INVAL)
330

    
331
    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
332
    if ext_provider and op.disk_template != constants.DT_EXT:
333
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
334
                                 " disk template, not %s" %
335
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
336
                                  op.disk_template), errors.ECODE_INVAL)
337

    
338
    data_vg = disk.get(constants.IDISK_VG, default_vg)
339
    name = disk.get(constants.IDISK_NAME, None)
340
    if name is not None and name.lower() == constants.VALUE_NONE:
341
      name = None
342
    new_disk = {
343
      constants.IDISK_SIZE: size,
344
      constants.IDISK_MODE: mode,
345
      constants.IDISK_VG: data_vg,
346
      constants.IDISK_NAME: name,
347
      }
348

    
349
    for key in [
350
      constants.IDISK_METAVG,
351
      constants.IDISK_ADOPT,
352
      constants.IDISK_SPINDLES,
353
      ]:
354
      if key in disk:
355
        new_disk[key] = disk[key]
356

    
357
    # For extstorage, demand the `provider' option and add any
358
    # additional parameters (ext-params) to the dict
359
    if op.disk_template == constants.DT_EXT:
360
      if ext_provider:
361
        new_disk[constants.IDISK_PROVIDER] = ext_provider
362
        for key in disk:
363
          if key not in constants.IDISK_PARAMS:
364
            new_disk[key] = disk[key]
365
      else:
366
        raise errors.OpPrereqError("Missing provider for template '%s'" %
367
                                   constants.DT_EXT, errors.ECODE_INVAL)
368

    
369
    disks.append(new_disk)
370

    
371
  return disks
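# Illustrative sketch (not part of the original module): for a plain-LVM
# opcode carrying a single 1 GiB disk spec, ComputeDisks fills in the
# defaults (the opcode fields and VG name below are hypothetical):
#
#   op.disk_template = constants.DT_PLAIN
#   op.disks = [{constants.IDISK_SIZE: 1024}]
#   ComputeDisks(op, "xenvg")
#   # => [{constants.IDISK_SIZE: 1024,
#   #      constants.IDISK_MODE: constants.DISK_RDWR,
#   #      constants.IDISK_VG: "xenvg",
#   #      constants.IDISK_NAME: None}]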
372

    
373

    
374
def CheckRADOSFreeSpace():
375
  """Compute disk size requirements inside the RADOS cluster.
376

377
  """
378
  # For the RADOS cluster we assume there is always enough space.
379
  pass
380

    
381

    
382
def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
383
                         iv_name, p_minor, s_minor):
384
  """Generate a drbd8 device complete with its children.
385

386
  """
387
  assert len(vgnames) == len(names) == 2
388
  port = lu.cfg.AllocatePort()
389
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
390

    
391
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
392
                          logical_id=(vgnames[0], names[0]),
393
                          params={})
394
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
395
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
396
                          size=constants.DRBD_META_SIZE,
397
                          logical_id=(vgnames[1], names[1]),
398
                          params={})
399
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
400
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
401
                          logical_id=(primary_uuid, secondary_uuid, port,
402
                                      p_minor, s_minor,
403
                                      shared_secret),
404
                          children=[dev_data, dev_meta],
405
                          iv_name=iv_name, params={})
406
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
407
  return drbd_dev
408

    
409

    
410
def GenerateDiskTemplate(
411
  lu, template_name, instance_name, primary_node_uuid, secondary_node_uuids,
412
  disk_info, file_storage_dir, file_driver, base_index,
413
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
414
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
415
  """Generate the entire disk layout for a given template type.
416

417
  """
418
  vgname = lu.cfg.GetVGName()
419
  disk_count = len(disk_info)
420
  disks = []
421

    
422
  if template_name == constants.DT_DISKLESS:
423
    pass
424
  elif template_name == constants.DT_DRBD8:
425
    if len(secondary_node_uuids) != 1:
426
      raise errors.ProgrammerError("Wrong template configuration")
427
    remote_node_uuid = secondary_node_uuids[0]
428
    minors = lu.cfg.AllocateDRBDMinor(
429
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_name)
430

    
431
    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
432
                                                       full_disk_params)
433
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
434

    
435
    names = []
436
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
437
                                               for i in range(disk_count)]):
438
      names.append(lv_prefix + "_data")
439
      names.append(lv_prefix + "_meta")
440
    for idx, disk in enumerate(disk_info):
441
      disk_index = idx + base_index
442
      data_vg = disk.get(constants.IDISK_VG, vgname)
443
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
444
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
445
                                      disk[constants.IDISK_SIZE],
446
                                      [data_vg, meta_vg],
447
                                      names[idx * 2:idx * 2 + 2],
448
                                      "disk/%d" % disk_index,
449
                                      minors[idx * 2], minors[idx * 2 + 1])
450
      disk_dev.mode = disk[constants.IDISK_MODE]
451
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
452
      disks.append(disk_dev)
453
  else:
454
    if secondary_node_uuids:
455
      raise errors.ProgrammerError("Wrong template configuration")
456

    
457
    if template_name == constants.DT_FILE:
458
      _req_file_storage()
459
    elif template_name == constants.DT_SHARED_FILE:
460
      _req_shr_file_storage()
461

    
462
    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
463
    if name_prefix is None:
464
      names = None
465
    else:
466
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
467
                                        (name_prefix, base_index + i)
468
                                        for i in range(disk_count)])
469

    
470
    if template_name == constants.DT_PLAIN:
471

    
472
      def logical_id_fn(idx, _, disk):
473
        vg = disk.get(constants.IDISK_VG, vgname)
474
        return (vg, names[idx])
475

    
476
    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
477
      logical_id_fn = \
478
        lambda _, disk_index, disk: (file_driver,
479
                                     "%s/disk%d" % (file_storage_dir,
480
                                                    disk_index))
481
    elif template_name == constants.DT_BLOCK:
482
      logical_id_fn = \
483
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
484
                                       disk[constants.IDISK_ADOPT])
485
    elif template_name == constants.DT_RBD:
486
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
487
    elif template_name == constants.DT_EXT:
488
      def logical_id_fn(idx, _, disk):
489
        provider = disk.get(constants.IDISK_PROVIDER, None)
490
        if provider is None:
491
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
492
                                       " not found", constants.DT_EXT,
493
                                       constants.IDISK_PROVIDER)
494
        return (provider, names[idx])
495
    else:
496
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
497

    
498
    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
499

    
500
    for idx, disk in enumerate(disk_info):
501
      params = {}
502
      # Only for the Ext template add disk_info to params
503
      if template_name == constants.DT_EXT:
504
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
505
        for key in disk:
506
          if key not in constants.IDISK_PARAMS:
507
            params[key] = disk[key]
508
      disk_index = idx + base_index
509
      size = disk[constants.IDISK_SIZE]
510
      feedback_fn("* disk %s, size %s" %
511
                  (disk_index, utils.FormatUnit(size, "h")))
512
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
513
                              logical_id=logical_id_fn(idx, disk_index, disk),
514
                              iv_name="disk/%d" % disk_index,
515
                              mode=disk[constants.IDISK_MODE],
516
                              params=params,
517
                              spindles=disk.get(constants.IDISK_SPINDLES))
518
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
519
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
520
      disks.append(disk_dev)
521

    
522
  return disks
523

    
524

    
525
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
526
  """Check the presence of the spindle options with exclusive_storage.
527

528
  @type diskdict: dict
529
  @param diskdict: disk parameters
530
  @type es_flag: bool
531
  @param es_flag: the effective value of the exclusive_storage flag
532
  @type required: bool
533
  @param required: whether spindles are required or just optional
534
  @raise errors.OpPrereqError: when spindles are given but should not be
535

536
  """
537
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
538
      diskdict[constants.IDISK_SPINDLES] is not None):
539
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
540
                               " when exclusive storage is not active",
541
                               errors.ECODE_INVAL)
542
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
543
                                diskdict[constants.IDISK_SPINDLES] is None)):
544
    raise errors.OpPrereqError("You must specify spindles in instance disks"
545
                               " when exclusive storage is active",
546
                               errors.ECODE_INVAL)
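# Illustrative summary (not part of the original module) of the combinations
# accepted by the checks above, where "spindles given" means a non-None
# IDISK_SPINDLES entry in diskdict:
#
#   exclusive_storage  required  spindles given   result
#   False              any       yes              OpPrereqError
#   False              any       no               ok
#   True               True      no               OpPrereqError
#   True               any       yes              ok
#   True               False     no               ok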
547

    
548

    
549
class LUInstanceRecreateDisks(LogicalUnit):
550
  """Recreate an instance's missing disks.
551

552
  """
553
  HPATH = "instance-recreate-disks"
554
  HTYPE = constants.HTYPE_INSTANCE
555
  REQ_BGL = False
556

    
557
  _MODIFYABLE = compat.UniqueFrozenset([
558
    constants.IDISK_SIZE,
559
    constants.IDISK_MODE,
560
    constants.IDISK_SPINDLES,
561
    ])
562

    
563
  # New or changed disk parameters may have different semantics
564
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
565
    constants.IDISK_ADOPT,
566

    
567
    # TODO: Implement support changing VG while recreating
568
    constants.IDISK_VG,
569
    constants.IDISK_METAVG,
570
    constants.IDISK_PROVIDER,
571
    constants.IDISK_NAME,
572
    ]))
573

    
574
  def _RunAllocator(self):
575
    """Run the allocator based on input opcode.
576

577
    """
578
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
579

    
580
    # FIXME
581
    # The allocator should actually run in "relocate" mode, but current
582
    # allocators don't support relocating all the nodes of an instance at
583
    # the same time. As a workaround we use "allocate" mode, but this is
584
    # suboptimal for two reasons:
585
    # - The instance name passed to the allocator is present in the list of
586
    #   existing instances, so there could be a conflict within the
587
    #   internal structures of the allocator. This doesn't happen with the
588
    #   current allocators, but it's a liability.
589
    # - The allocator counts the resources used by the instance twice: once
590
    #   because the instance exists already, and once because it tries to
591
    #   allocate a new instance.
592
    # The allocator could choose some of the nodes on which the instance is
593
    # running, but that's not a problem. If the instance nodes are broken,
594
    # they should already be marked as drained or offline, and hence
595
    # skipped by the allocator. If instance disks have been lost for other
596
    # reasons, then recreating the disks on the same nodes should be fine.
597
    disk_template = self.instance.disk_template
598
    spindle_use = be_full[constants.BE_SPINDLE_USE]
599
    disks = [{
600
      constants.IDISK_SIZE: d.size,
601
      constants.IDISK_MODE: d.mode,
602
      constants.IDISK_SPINDLES: d.spindles,
603
      } for d in self.instance.disks]
604
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
605
                                        disk_template=disk_template,
606
                                        tags=list(self.instance.GetTags()),
607
                                        os=self.instance.os,
608
                                        nics=[{}],
609
                                        vcpus=be_full[constants.BE_VCPUS],
610
                                        memory=be_full[constants.BE_MAXMEM],
611
                                        spindle_use=spindle_use,
612
                                        disks=disks,
613
                                        hypervisor=self.instance.hypervisor,
614
                                        node_whitelist=None)
615
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
616

    
617
    ial.Run(self.op.iallocator)
618

    
619
    assert req.RequiredNodes() == len(self.instance.all_nodes)
620

    
621
    if not ial.success:
622
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
623
                                 " %s" % (self.op.iallocator, ial.info),
624
                                 errors.ECODE_NORES)
625

    
626
    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
627
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
628
                 self.op.instance_name, self.op.iallocator,
629
                 utils.CommaJoin(self.op.nodes))
630

    
631
  def CheckArguments(self):
632
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
633
      # Normalize and convert deprecated list of disk indices
634
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
635

    
636
    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
637
    if duplicates:
638
      raise errors.OpPrereqError("Some disks have been specified more than"
639
                                 " once: %s" % utils.CommaJoin(duplicates),
640
                                 errors.ECODE_INVAL)
641

    
642
    # We don't want CheckIAllocatorOrNode selecting the default iallocator
643
    # when neither iallocator nor nodes are specified
644
    if self.op.iallocator or self.op.nodes:
645
      CheckIAllocatorOrNode(self, "iallocator", "nodes")
646

    
647
    for (idx, params) in self.op.disks:
648
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
649
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
650
      if unsupported:
651
        raise errors.OpPrereqError("Parameters for disk %s try to change"
652
                                   " unmodifyable parameter(s): %s" %
653
                                   (idx, utils.CommaJoin(unsupported)),
654
                                   errors.ECODE_INVAL)
655

    
656
  def ExpandNames(self):
657
    self._ExpandAndLockInstance()
658
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
659

    
660
    if self.op.nodes:
661
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
662
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
663
    else:
664
      self.needed_locks[locking.LEVEL_NODE] = []
665
      if self.op.iallocator:
666
        # iallocator will select a new node in the same group
667
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
668
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
669

    
670
    self.needed_locks[locking.LEVEL_NODE_RES] = []
671

    
672
  def DeclareLocks(self, level):
673
    if level == locking.LEVEL_NODEGROUP:
674
      assert self.op.iallocator is not None
675
      assert not self.op.nodes
676
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
677
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
678
      # Lock the primary group used by the instance optimistically; this
679
      # requires going via the node before it's locked, requiring
680
      # verification later on
681
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
682
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
683

    
684
    elif level == locking.LEVEL_NODE:
685
      # If an allocator is used, then we lock all the nodes in the current
686
      # instance group, as we don't know yet which ones will be selected;
687
      # if we replace the nodes without using an allocator, locks are
688
      # already declared in ExpandNames; otherwise, we need to lock all the
689
      # instance nodes for disk re-creation
690
      if self.op.iallocator:
691
        assert not self.op.nodes
692
        assert not self.needed_locks[locking.LEVEL_NODE]
693
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
694

    
695
        # Lock member nodes of the group of the primary node
696
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
697
          self.needed_locks[locking.LEVEL_NODE].extend(
698
            self.cfg.GetNodeGroup(group_uuid).members)
699

    
700
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
701
      elif not self.op.nodes:
702
        self._LockInstancesNodes(primary_only=False)
703
    elif level == locking.LEVEL_NODE_RES:
704
      # Copy node locks
705
      self.needed_locks[locking.LEVEL_NODE_RES] = \
706
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
707

    
708
  def BuildHooksEnv(self):
709
    """Build hooks env.
710

711
    This runs on master, primary and secondary nodes of the instance.
712

713
    """
714
    return BuildInstanceHookEnvByObject(self, self.instance)
715

    
716
  def BuildHooksNodes(self):
717
    """Build hooks nodes.
718

719
    """
720
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
721
    return (nl, nl)
722

    
723
  def CheckPrereq(self):
724
    """Check prerequisites.
725

726
    This checks that the instance is in the cluster and is not running.
727

728
    """
729
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
730
    assert instance is not None, \
731
      "Cannot retrieve locked instance %s" % self.op.instance_name
732
    if self.op.node_uuids:
733
      if len(self.op.node_uuids) != len(instance.all_nodes):
734
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
735
                                   " %d replacement nodes were specified" %
736
                                   (instance.name, len(instance.all_nodes),
737
                                    len(self.op.node_uuids)),
738
                                   errors.ECODE_INVAL)
739
      assert instance.disk_template != constants.DT_DRBD8 or \
740
             len(self.op.node_uuids) == 2
741
      assert instance.disk_template != constants.DT_PLAIN or \
742
             len(self.op.node_uuids) == 1
743
      primary_node = self.op.node_uuids[0]
744
    else:
745
      primary_node = instance.primary_node
746
    if not self.op.iallocator:
747
      CheckNodeOnline(self, primary_node)
748

    
749
    if instance.disk_template == constants.DT_DISKLESS:
750
      raise errors.OpPrereqError("Instance '%s' has no disks" %
751
                                 self.op.instance_name, errors.ECODE_INVAL)
752

    
753
    # Verify if node group locks are still correct
754
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
755
    if owned_groups:
756
      # Node group locks are acquired only for the primary node (and only
757
      # when the allocator is used)
758
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
759
                              primary_only=True)
760

    
761
    # if we replace nodes *and* the old primary is offline, we don't
762
    # check the instance state
763
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
764
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
765
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
766
                         msg="cannot recreate disks")
767

    
768
    if self.op.disks:
769
      self.disks = dict(self.op.disks)
770
    else:
771
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
772

    
773
    maxidx = max(self.disks.keys())
774
    if maxidx >= len(instance.disks):
775
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
776
                                 errors.ECODE_INVAL)
777

    
778
    if ((self.op.node_uuids or self.op.iallocator) and
779
         sorted(self.disks.keys()) != range(len(instance.disks))):
780
      raise errors.OpPrereqError("Can't recreate disks partially and"
781
                                 " change the nodes at the same time",
782
                                 errors.ECODE_INVAL)
783

    
784
    self.instance = instance
785

    
786
    if self.op.iallocator:
787
      self._RunAllocator()
788
      # Release unneeded node and node resource locks
789
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
790
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
791
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
792

    
793
    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
794

    
795
    if self.op.node_uuids:
796
      node_uuids = self.op.node_uuids
797
    else:
798
      node_uuids = instance.all_nodes
799
    excl_stor = compat.any(
800
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
801
      )
802
    for new_params in self.disks.values():
803
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
804

    
805
  def Exec(self, feedback_fn):
806
    """Recreate the disks.
807

808
    """
809
    instance = self.instance
810

    
811
    assert (self.owned_locks(locking.LEVEL_NODE) ==
812
            self.owned_locks(locking.LEVEL_NODE_RES))
813

    
814
    to_skip = []
815
    mods = [] # keeps track of needed changes
816

    
817
    for idx, disk in enumerate(instance.disks):
818
      try:
819
        changes = self.disks[idx]
820
      except KeyError:
821
        # Disk should not be recreated
822
        to_skip.append(idx)
823
        continue
824

    
825
      # update secondaries for disks, if needed
826
      if self.op.node_uuids and disk.dev_type == constants.LD_DRBD8:
827
        # need to update the nodes and minors
828
        assert len(self.op.node_uuids) == 2
829
        assert len(disk.logical_id) == 6 # otherwise disk internals
830
                                         # have changed
831
        (_, _, old_port, _, _, old_secret) = disk.logical_id
832
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
833
                                                instance.name)
834
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
835
                  new_minors[0], new_minors[1], old_secret)
836
        assert len(disk.logical_id) == len(new_id)
837
      else:
838
        new_id = None
839

    
840
      mods.append((idx, new_id, changes))
841

    
842
    # now that we have passed all asserts above, we can apply the mods
843
    # in a single run (to avoid partial changes)
844
    for idx, new_id, changes in mods:
845
      disk = instance.disks[idx]
846
      if new_id is not None:
847
        assert disk.dev_type == constants.LD_DRBD8
848
        disk.logical_id = new_id
849
      if changes:
850
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
851
                    mode=changes.get(constants.IDISK_MODE, None),
852
                    spindles=changes.get(constants.IDISK_SPINDLES, None))
853

    
854
    # change primary node, if needed
855
    if self.op.node_uuids:
856
      instance.primary_node = self.op.node_uuids[0]
857
      self.LogWarning("Changing the instance's nodes, you will have to"
858
                      " remove any disks left on the older nodes manually")
859

    
860
    if self.op.node_uuids:
861
      self.cfg.Update(instance, feedback_fn)
862

    
863
    # All touched nodes must be locked
864
    mylocks = self.owned_locks(locking.LEVEL_NODE)
865
    assert mylocks.issuperset(frozenset(instance.all_nodes))
866
    new_disks = CreateDisks(self, instance, to_skip=to_skip)
867

    
868
    # TODO: Release node locks before wiping, or explain why it's not possible
869
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
870
      wipedisks = [(idx, disk, 0)
871
                   for (idx, disk) in enumerate(instance.disks)
872
                   if idx not in to_skip]
873
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
874

    
875

    
876
def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
877
  """Checks if nodes have enough free disk space in the specified VG.
878

879
  This function checks if all given nodes have the needed amount of
880
  free disk. In case any node has less disk or we cannot get the
881
  information from the node, this function raises an OpPrereqError
882
  exception.
883

884
  @type lu: C{LogicalUnit}
885
  @param lu: a logical unit from which we get configuration data
886
  @type node_uuids: C{list}
887
  @param node_uuids: the list of node UUIDs to check
888
  @type vg: C{str}
889
  @param vg: the volume group to check
890
  @type requested: C{int}
891
  @param requested: the amount of disk in MiB to check for
892
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
893
      or we cannot check the node
894

895
  """
896
  es_flags = rpc.GetExclusiveStorageForNodes(lu.cfg, node_uuids)
897
  # FIXME: This maps everything to storage type 'lvm-vg' to maintain
898
  # the current functionality. Refactor to make it more flexible.
899
  hvname = lu.cfg.GetHypervisorType()
900
  hvparams = lu.cfg.GetClusterInfo().hvparams
901
  nodeinfo = lu.rpc.call_node_info(node_uuids, [(constants.ST_LVM_VG, vg)],
902
                                   [(hvname, hvparams[hvname])], es_flags)
903
  for node in node_uuids:
904
    node_name = lu.cfg.GetNodeName(node)
905

    
906
    info = nodeinfo[node]
907
    info.Raise("Cannot get current information from node %s" % node_name,
908
               prereq=True, ecode=errors.ECODE_ENVIRON)
909
    (_, (vg_info, ), _) = info.payload
910
    vg_free = vg_info.get("vg_free", None)
911
    if not isinstance(vg_free, int):
912
      raise errors.OpPrereqError("Can't compute free disk space on node"
913
                                 " %s for vg %s, result was '%s'" %
914
                                 (node_name, vg, vg_free), errors.ECODE_ENVIRON)
915
    if requested > vg_free:
916
      raise errors.OpPrereqError("Not enough disk space on target node %s"
917
                                 " vg %s: required %d MiB, available %d MiB" %
918
                                 (node_name, vg, requested, vg_free),
919
                                 errors.ECODE_NORES)
920

    
921

    
922
def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
923
  """Checks if nodes have enough free disk space in all the VGs.
924

925
  This function checks if all given nodes have the needed amount of
926
  free disk. In case any node has less disk or we cannot get the
927
  information from the node, this function raises an OpPrereqError
928
  exception.
929

930
  @type lu: C{LogicalUnit}
931
  @param lu: a logical unit from which we get configuration data
932
  @type node_uuids: C{list}
933
  @param node_uuids: the list of node UUIDs to check
934
  @type req_sizes: C{dict}
935
  @param req_sizes: the hash of vg and corresponding amount of disk in
936
      MiB to check for
937
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
938
      or we cannot check the node
939

940
  """
941
  for vg, req_size in req_sizes.items():
942
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
943

    
944

    
945
def _DiskSizeInBytesToMebibytes(lu, size):
946
  """Converts a disk size in bytes to mebibytes.
947

948
  Warns and rounds up if the size isn't an even multiple of 1 MiB.
949

950
  """
951
  (mib, remainder) = divmod(size, 1024 * 1024)
952

    
953
  if remainder != 0:
954
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
955
                  " to not overwrite existing data (%s bytes will not be"
956
                  " wiped)", (1024 * 1024) - remainder)
957
    mib += 1
958

    
959
  return mib
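# Worked example (not part of the original module): one byte over 1 GiB is
# rounded up to the next mebibyte, after logging a warning about the bytes
# that will not be wiped:
#
#   _DiskSizeInBytesToMebibytes(lu, 1024 * 1024 * 1024 + 1)
#   # divmod(...) == (1024, 1), so the LU warns and 1025 is returned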
960

    
961

    
962
def _CalcEta(time_taken, written, total_size):
963
  """Calculates the ETA based on size written and total size.
964

965
  @param time_taken: The time taken so far
966
  @param written: amount written so far
967
  @param total_size: The total size of data to be written
968
  @return: The remaining time in seconds
969

970
  """
971
  avg_time = time_taken / float(written)
972
  return (total_size - written) * avg_time
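# Worked example (not part of the original module): if 1024 MiB out of
# 4096 MiB were written in 64 seconds, the average is 0.0625 s/MiB and the
# estimated remaining time is (4096 - 1024) * 0.0625 = 192 seconds:
#
#   _CalcEta(64, 1024, 4096)  # => 192.0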
973

    
974

    
975
def WipeDisks(lu, instance, disks=None):
976
  """Wipes instance disks.
977

978
  @type lu: L{LogicalUnit}
979
  @param lu: the logical unit on whose behalf we execute
980
  @type instance: L{objects.Instance}
981
  @param instance: the instance whose disks we should create
982
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
983
  @param disks: Disk details; tuple contains disk index, disk object and the
984
    start offset
985

986
  """
987
  node_uuid = instance.primary_node
988
  node_name = lu.cfg.GetNodeName(node_uuid)
989

    
990
  if disks is None:
991
    disks = [(idx, disk, 0)
992
             for (idx, disk) in enumerate(instance.disks)]
993

    
994
  for (_, device, _) in disks:
995
    lu.cfg.SetDiskID(device, node_uuid)
996

    
997
  logging.info("Pausing synchronization of disks of instance '%s'",
998
               instance.name)
999
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1000
                                                  (map(compat.snd, disks),
1001
                                                   instance),
1002
                                                  True)
1003
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1004

    
1005
  for idx, success in enumerate(result.payload):
1006
    if not success:
1007
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
1008
                   " failed", idx, instance.name)
1009

    
1010
  try:
1011
    for (idx, device, offset) in disks:
1012
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1013
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1014
      wipe_chunk_size = \
1015
        int(min(constants.MAX_WIPE_CHUNK,
1016
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1017

    
1018
      size = device.size
1019
      last_output = 0
1020
      start_time = time.time()
1021

    
1022
      if offset == 0:
1023
        info_text = ""
1024
      else:
1025
        info_text = (" (from %s to %s)" %
1026
                     (utils.FormatUnit(offset, "h"),
1027
                      utils.FormatUnit(size, "h")))
1028

    
1029
      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1030

    
1031
      logging.info("Wiping disk %d for instance %s on node %s using"
1032
                   " chunk size %s", idx, instance.name, node_name,
1033
                   wipe_chunk_size)
1034

    
1035
      while offset < size:
1036
        wipe_size = min(wipe_chunk_size, size - offset)
1037

    
1038
        logging.debug("Wiping disk %d, offset %s, chunk %s",
1039
                      idx, offset, wipe_size)
1040

    
1041
        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1042
                                           offset, wipe_size)
1043
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
1044
                     (idx, offset, wipe_size))
1045

    
1046
        now = time.time()
1047
        offset += wipe_size
1048
        if now - last_output >= 60:
1049
          eta = _CalcEta(now - start_time, offset, size)
1050
          lu.LogInfo(" - done: %.1f%% ETA: %s",
1051
                     offset / float(size) * 100, utils.FormatSeconds(eta))
1052
          last_output = now
1053
  finally:
1054
    logging.info("Resuming synchronization of disks for instance '%s'",
1055
                 instance.name)
1056

    
1057
    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1058
                                                    (map(compat.snd, disks),
1059
                                                     instance),
1060
                                                    False)
1061

    
1062
    if result.fail_msg:
1063
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1064
                    node_name, result.fail_msg)
1065
    else:
1066
      for idx, success in enumerate(result.payload):
1067
        if not success:
1068
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1069
                        " failed", idx, instance.name)
1070

    
1071

    
1072
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1073
  """Wrapper for L{WipeDisks} that handles errors.
1074

1075
  @type lu: L{LogicalUnit}
1076
  @param lu: the logical unit on whose behalf we execute
1077
  @type instance: L{objects.Instance}
1078
  @param instance: the instance whose disks we should wipe
1079
  @param disks: see L{WipeDisks}
1080
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1081
      case of error
1082
  @raise errors.OpExecError: in case of failure
1083

1084
  """
1085
  try:
1086
    WipeDisks(lu, instance, disks=disks)
1087
  except errors.OpExecError:
1088
    logging.warning("Wiping disks for instance '%s' failed",
1089
                    instance.name)
1090
    _UndoCreateDisks(lu, cleanup)
1091
    raise
1092

    
1093

    
1094
def ExpandCheckDisks(instance, disks):
1095
  """Return the instance disks selected by the disks list
1096

1097
  @type disks: list of L{objects.Disk} or None
1098
  @param disks: selected disks
1099
  @rtype: list of L{objects.Disk}
1100
  @return: selected instance disks to act on
1101

1102
  """
1103
  if disks is None:
1104
    return instance.disks
1105
  else:
1106
    if not set(disks).issubset(instance.disks):
1107
      raise errors.ProgrammerError("Can only act on disks belonging to the"
1108
                                   " target instance: expected a subset of %r,"
1109
                                   " got %r" % (instance.disks, disks))
1110
    return disks
1111

    
1112

    
1113
def WaitForSync(lu, instance, disks=None, oneshot=False):
1114
  """Sleep and poll for an instance's disk to sync.
1115

1116
  """
1117
  if not instance.disks or disks is not None and not disks:
1118
    return True
1119

    
1120
  disks = ExpandCheckDisks(instance, disks)
1121

    
1122
  if not oneshot:
1123
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1124

    
1125
  node_uuid = instance.primary_node
1126
  node_name = lu.cfg.GetNodeName(node_uuid)
1127

    
1128
  for dev in disks:
1129
    lu.cfg.SetDiskID(dev, node_uuid)
1130

    
1131
  # TODO: Convert to utils.Retry
1132

    
1133
  retries = 0
1134
  degr_retries = 10 # in seconds, as we sleep 1 second each time
1135
  while True:
1136
    max_time = 0
1137
    done = True
1138
    cumul_degraded = False
1139
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1140
    msg = rstats.fail_msg
1141
    if msg:
1142
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1143
      retries += 1
1144
      if retries >= 10:
1145
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1146
                                 " aborting." % node_name)
1147
      time.sleep(6)
1148
      continue
1149
    rstats = rstats.payload
1150
    retries = 0
1151
    for i, mstat in enumerate(rstats):
1152
      if mstat is None:
1153
        lu.LogWarning("Can't compute data for node %s/%s",
1154
                      node_name, disks[i].iv_name)
1155
        continue
1156

    
1157
      cumul_degraded = (cumul_degraded or
1158
                        (mstat.is_degraded and mstat.sync_percent is None))
1159
      if mstat.sync_percent is not None:
1160
        done = False
1161
        if mstat.estimated_time is not None:
1162
          rem_time = ("%s remaining (estimated)" %
1163
                      utils.FormatSeconds(mstat.estimated_time))
1164
          max_time = mstat.estimated_time
1165
        else:
1166
          rem_time = "no time estimate"
1167
        lu.LogInfo("- device %s: %5.2f%% done, %s",
1168
                   disks[i].iv_name, mstat.sync_percent, rem_time)
1169

    
1170
    # if we're done but degraded, let's do a few small retries, to
1171
    # make sure we see a stable and not transient situation; therefore
1172
    # we force restart of the loop
1173
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
1174
      logging.info("Degraded disks found, %d retries left", degr_retries)
1175
      degr_retries -= 1
1176
      time.sleep(1)
1177
      continue
1178

    
1179
    if done or oneshot:
1180
      break
1181

    
1182
    time.sleep(min(60, max_time))
1183

    
1184
  if done:
1185
    lu.LogInfo("Instance %s's disks are in sync", instance.name)
1186

    
1187
  return not cumul_degraded
1188

    
1189

    
1190
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1191
  """Shutdown block devices of an instance.
1192

1193
  This does the shutdown on all nodes of the instance.
1194

1195
  If ignore_primary is true, errors on the primary node are
1196
  ignored.
1197

1198
  """
1199
  lu.cfg.MarkInstanceDisksInactive(instance.name)
1200
  all_result = True
1201
  disks = ExpandCheckDisks(instance, disks)
1202

    
1203
  for disk in disks:
1204
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1205
      lu.cfg.SetDiskID(top_disk, node_uuid)
1206
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1207
      msg = result.fail_msg
1208
      if msg:
1209
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1210
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1211
        if ((node_uuid == instance.primary_node and not ignore_primary) or
1212
            (node_uuid != instance.primary_node and not result.offline)):
1213
          all_result = False
1214
  return all_result
1215

    
1216

    
1217
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1218
  """Shutdown block devices of an instance.
1219

1220
  This function checks if an instance is running, before calling
1221
  L{ShutdownInstanceDisks}.
1222

1223
  """
1224
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1225
  ShutdownInstanceDisks(lu, instance, disks=disks)
1226

    
1227

    
1228
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1229
                           ignore_size=False):
1230
  """Prepare the block devices for an instance.
1231

1232
  This sets up the block devices on all nodes.
1233

1234
  @type lu: L{LogicalUnit}
1235
  @param lu: the logical unit on whose behalf we execute
1236
  @type instance: L{objects.Instance}
1237
  @param instance: the instance for whose disks we assemble
1238
  @type disks: list of L{objects.Disk} or None
1239
  @param disks: which disks to assemble (or all, if None)
1240
  @type ignore_secondaries: boolean
1241
  @param ignore_secondaries: if true, errors on secondary nodes
1242
      won't result in an error return from the function
1243
  @type ignore_size: boolean
1244
  @param ignore_size: if true, the current known size of the disk
1245
      will not be used during the disk activation, useful for cases
1246
      when the size is wrong
1247
  @return: a tuple of (disks_ok, device_info); device_info is a list of
1248
      (host, instance_visible_name, node_visible_name)
1249
      with the mapping from node devices to instance devices
1250

1251
  """
1252
  device_info = []
1253
  disks_ok = True
1254
  iname = instance.name
1255
  disks = ExpandCheckDisks(instance, disks)
1256

    
1257
  # With the two passes mechanism we try to reduce the window of
1258
  # opportunity for the race condition of switching DRBD to primary
1259
  # before handshaking occurred, but we do not eliminate it
1260

    
1261
  # The proper fix would be to wait (with some limits) until the
1262
  # connection has been made and drbd transitions from WFConnection
1263
  # into any other network-connected state (Connected, SyncTarget,
1264
  # SyncSource, etc.)
1265

    
1266
  # mark instance disks as active before doing actual work, so watcher does
1267
  # not try to shut them down erroneously
1268
  lu.cfg.MarkInstanceDisksActive(iname)
1269

    
1270
  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             iname, False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             iname, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(iname)

  return disks_ok, device_info
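
# Typical call pattern for AssembleInstanceDisks (illustrative sketch only;
# real callers include StartInstanceDisks below and
# LUInstanceActivateDisks.Exec further down):
#
#   disks_ok, device_info = AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#     ShutdownInstanceDisks(lu, instance)
#   for node_name, iv_name, dev_path in device_info:
#     # dev_path is the device path on the primary node, or None if
#     # assembling that disk failed there
#     ...

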
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")
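
# StartInstanceDisks is the convenience wrapper used when a logical unit needs
# the instance's disks assembled before touching them; for example,
# TLReplaceDisks.Exec below calls it with force=True when replacing the disks
# of a stopped instance, and shuts the disks down again afterwards.

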
class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)

    self.instance = instance

    if instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    template = self.instance.disk_template
    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      nodes = map(self.cfg.GetNodeInfo, node_uuids)
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
                        nodes)
      if es_nodes:
        # With exclusive storage we need to do something smarter than just
        # looking at free space; for now, let's simply abort the operation.
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
                                   " is enabled", errors.ECODE_STATE)
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk

    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node_uuid in instance.all_nodes:
      self.cfg.SetDiskID(disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid, (disk, instance),
                                           self.delta, True, True)
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Get disk size from primary node for wiping
      result = self.rpc.call_blockdev_getdimensions(instance.primary_node,
                                                    [disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in instance.all_nodes:
      self.cfg.SetDiskID(disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid, (disk, instance),
                                           self.delta, False, True)
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = instance.primary_node
    self.cfg.SetDiskID(disk, node_uuid)
    result = self.rpc.call_blockdev_grow(node_uuid, (disk, instance),
                                         self.delta, False, False)
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    disk.RecordGrow(self.delta)
    self.cfg.Update(instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert instance.disks[self.op.disk] == disk

      # Wipe newly added disk space
      WipeDisks(self, instance,
                disks=[(self.op.disk, disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, instance, disks=[disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not instance.disks_active:
        _SafeShutdownInstanceDisks(self, instance, disks=[disk])
    elif not instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
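
# In brief, the grow above happens in stages: a dry-run grow RPC on every node
# of the instance, the real grow of the backing storage on all nodes, a final
# grow of the logical device on the primary node only, an optional wipe of the
# newly added space, and a sync wait.  With the "absolute" flag the requested
# amount is the new total size instead of a delta (see CheckPrereq above).
# Illustrative CLI invocation (exact syntax may vary between Ganeti versions):
#
#   gnt-instance grow-disk inst1.example.com 0 2g

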
class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    remote_node = self.op.remote_node
    ialloc = self.op.iallocator
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and ialloc is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif remote_node is not None or ialloc is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
              AssembleInstanceDisks(self, self.instance,
                                    ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.name)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info
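
# LUInstanceActivateDisks returns the device list produced by
# AssembleInstanceDisks, so the caller (for example the
# "gnt-instance activate-disks" command) can report where each disk is
# visible on the primary node.

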
class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    if self.op.force:
      ShutdownInstanceDisks(self, instance)
    else:
      _SafeShutdownInstanceDisks(self, instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node_uuid)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
                                                     node_uuid, on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node_uuid, disk)


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))
  return results
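
# The generated names are the concatenation of a cluster-unique ID (typically
# a UUID-style string returned by lu.cfg.GenerateUniqueID) and the requested
# suffix; for example (illustrative only), for exts == [".disk0_data"] the
# result would look like "b1e5c2ae-....disk0_data".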


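# The tasklet below implements all four replace-disks modes:
#   - constants.REPLACE_DISK_PRI:  rebuild the LVs backing DRBD on the primary
#   - constants.REPLACE_DISK_SEC:  the same, on the current secondary
#   - constants.REPLACE_DISK_CHG:  move the secondary to a new node
#   - constants.REPLACE_DISK_AUTO: detect the faulty side and repair it
# The first, second and fourth modes are handled by _ExecDrbd8DiskOnly, the
# third by _ExecDrbd8Secondary (see CheckPrereq and Exec below).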
class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_name, mode, iallocator_name, remote_node_uuid,
               disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node_uuid = remote_node_uuid
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node_uuid = None
    self.target_node_uuid = None
    self.other_node_uuid = None
    self.remote_node_info = None
    self.node_secondary_ip = None
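
  # The runtime attributes above are filled in by CheckPrereq:
  #   - target_node_uuid: the node whose storage is being replaced
  #   - other_node_uuid: its DRBD peer, used for the consistency checks
  #   - new_node_uuid: the replacement secondary (only when changing the
  #     secondary node)
  #   - node_secondary_ip: secondary IPs of the touched nodes, needed for the
  #     drbd_disconnect_net/drbd_attach_net RPC calls
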
  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name,
                    relocate_from_node_uuids):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(
          name=instance_name,
          relocate_from_node_uuids=list(relocate_from_node_uuids))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)

    if remote_node is None:
      raise errors.OpPrereqError("Node %s not found in configuration" %
                                 remote_node_name, errors.ECODE_NOENT)

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node.uuid

  def _FindFaultyDisks(self, node_uuid):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    node_uuids = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    instance = self.instance
    secondary_node_uuid = instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node_uuid = self.remote_node_uuid
    else:
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                            instance.name,
                                            instance.secondary_nodes)

    if remote_node_uuid is None:
      self.remote_node_info = None
    else:
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node_uuid

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node_uuid

    if remote_node_uuid == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node_uuid == secondary_node_uuid:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node_uuid = instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node_uuid = instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node_uuid = remote_node_uuid
        self.other_node_uuid = instance.primary_node
        self.target_node_uuid = secondary_node_uuid
        check_nodes = [self.new_node_uuid, self.other_node_uuid]

        CheckNodeNotDrained(self.lu, remote_node_uuid)
        CheckNodeVmCapable(self.lu, remote_node_uuid)

        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node_uuid)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

      # If not specified all disks should be replaced
      if not self.disks:
        self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internal
    # submitted opcode and external one. We should fix that.
    if self.remote_node_info:
      # We change the node, let's verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                             self.cfg, ignore=self.ignore_ipolicy)

    for node_uuid in check_nodes:
      CheckNodeOnline(self.lu, node_uuid)

    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                          self.other_node_uuid,
                                                          self.target_node_uuid]
                              if node_uuid is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" %
                self.cfg.GetNodeName(self.instance.primary_node))
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.cfg.GetNodeNames(
                                  self.instance.secondary_nodes)))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node_uuid is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, node_uuids):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(node_uuids)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node_uuid in node_uuids:
      res = results[node_uuid]
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, self.cfg.GetNodeName(node_uuid)))

  def _CheckDisksExistence(self, node_uuids):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, self.cfg.GetNodeName(node_uuid), msg))

  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, self.cfg.GetNodeName(node_uuid)))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (self.cfg.GetNodeName(node_uuid),
                                  self.instance.name))

  def _CreateNewStorage(self, node_uuid):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d",
                      self.cfg.GetNodeName(node_uuid), idx)

      self.cfg.SetDiskID(dev, node_uuid)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)

    return iv_names

  def _CheckDevices(self, node_uuid, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_uuid)

      result = _BlockdevFind(self, node_uuid, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_uuid, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_uuid)

        msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
      False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node_uuid)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" %
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node_uuid)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node_uuid)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s",
                      self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
                                                  self.new_node_uuid)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance, new_lv,
                             True, GetInstanceInfoText(self.instance), False,
                             excl_stor)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
                                         for _ in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node_uuid)
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node_uuid],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           self.cfg.GetNodeName(to_node), msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)