# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Logical units dealing with storage of instances."""

import logging
import os
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import ht
from ganeti import locking
from ganeti.masterd import iallocator
from ganeti import objects
from ganeti import utils
from ganeti import opcodes
from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes
from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy

import ganeti.masterd.instance


_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }


_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }
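

# Illustrative note (not part of the original module): these two maps drive
# name generation and device instantiation in GenerateDiskTemplate below.
# For example, an RBD disk gets a "<uuid>.rbd.disk0"-style name and is
# created as a constants.LD_RBD device.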


def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node_uuid)
  result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))
  if device.physical_id is None:
    device.physical_id = result.payload


def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices

  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError as e:
    e.created_devices.extend(created_devices)
    raise e

  except errors.OpExecError as e:
    raise errors.DeviceCreationError(str(e), created_devices)
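

# Illustrative sketch (not part of the original module): for a DRBD8 disk
# with two LV children, the recursion above creates the children first.
# With hypothetical objects:
#
#   drbd_disk = objects.Disk(dev_type=constants.LD_DRBD8, ...,
#                            children=[lv_data, lv_meta])
#   created = _CreateBlockDevInner(lu, node_uuid, instance, drbd_disk,
#                                  False, info, False, excl_stor)
#
# If force_create ends up True for the root, 'created' collapses to
# [(node_uuid, drbd_disk)]; otherwise it lists only the created children.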


def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)


def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create,
                              info, force_open, excl_stor)


def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node_uuid, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node_uuid)
    result = lu.rpc.call_blockdev_remove(node_uuid, disk)
    result.Warn("Failed to remove newly-created disk %s on node %s" %
                (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)


def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of {objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  if instance.disk_template in constants.DTS_FILEBASED:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError as e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)
  return disks_created


def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group.

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
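

# Worked example (illustrative, hypothetical values): for two disks of
# 1024 MiB and 2048 MiB in volume group "xenvg", the plain template needs
# 3072 MiB in "xenvg", while DRBD8 adds constants.DRBD_META_SIZE per disk:
#
#   disks = [{constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 1024},
#            {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 2048}]
#   ComputeDiskSizePerVG(constants.DT_PLAIN, disks)
#   # -> {"xenvg": 3072}
#   ComputeDiskSizePerVG(constants.DT_DRBD8, disks)
#   # -> {"xenvg": 3072 + 2 * constants.DRBD_META_SIZE}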


def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
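

# Example (illustrative): a minimal opcode disk specification is normalized
# into a full dict; assuming an op whose disk_template is not DT_EXT, with a
# single 512 MiB disk and default VG "xenvg":
#
#   op.disks = [{constants.IDISK_SIZE: 512}]
#   ComputeDisks(op, "xenvg")
#   # -> [{constants.IDISK_SIZE: 512,
#   #      constants.IDISK_MODE: constants.DISK_RDWR,
#   #      constants.IDISK_VG: "xenvg",
#   #      constants.IDISK_NAME: None}]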


def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  """
  # For the RADOS cluster we assume there is always enough space.
  pass


def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary_uuid, secondary_uuid, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev


def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks


def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not be

  """
  if (not es_flag and constants.IDISK_SPINDLES in diskdict and
      diskdict[constants.IDISK_SPINDLES] is not None):
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
                                diskdict[constants.IDISK_SPINDLES] is None)):
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)


class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifiable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_PARAMS)

    if ((self.op.node_uuids or self.op.iallocator) and
        sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

      assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(self.instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)


def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  es_flags = rpc.GetExclusiveStorageForNodes(lu.cfg, node_uuids)
  hvname = lu.cfg.GetHypervisorType()
  hvparams = lu.cfg.GetClusterInfo().hvparams
  nodeinfo = lu.rpc.call_node_info(node_uuids, [(constants.ST_LVM_VG, vg)],
                                   [(hvname, hvparams[hvname])], es_flags)
  for node in node_uuids:
    node_name = lu.cfg.GetNodeName(node)

    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node_name,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, (vg_info, ), _) = info.payload
    vg_free = vg_info.get("storage_free", None)
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node_name, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node_name, vg, requested, vg_free),
                                 errors.ECODE_NORES)


def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg, req_size in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)


def _DiskSizeInBytesToMebibytes(lu, size):
  """Converts a disk size in bytes to mebibytes.

  Warns and rounds up if the size isn't an even multiple of 1 MiB.

  """
  (mib, remainder) = divmod(size, 1024 * 1024)

  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1

  return mib


def _CalcEta(time_taken, written, total_size):
  """Calculates the ETA based on size written and total size.

  @param time_taken: The time taken so far
  @param written: amount written so far
  @param total_size: The total size of data to be written
  @return: The remaining time in seconds

  """
  avg_time = time_taken / float(written)
  return (total_size - written) * avg_time
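

# Example (illustrative): plain linear extrapolation; if 512 MiB were written
# in 30 seconds, the remaining 1536 MiB of a 2048 MiB disk take:
#
#   _CalcEta(30.0, 512, 2048)
#   # -> (2048 - 512) * (30.0 / 512) = 90.0 seconds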


def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
      start offset

  """
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node_uuid)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
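      # Illustrative numbers (hypothetical constant values, e.g. a 10%
      # minimum chunk and a 1024 MiB cap): a 4 GiB disk would be wiped in
      # int(4096 * 0.10) = 409 MiB chunks, while larger disks are capped
      # at the MAX_WIPE_CHUNK value.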

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)


def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpPrereqError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    _UndoCreateDisks(lu, cleanup)
    raise


def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list.

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    return instance.disks
  else:
    if not set(disks).issubset(instance.disks):
      raise errors.ProgrammerError("Can only act on disks belonging to the"
                                   " target instance: expected a subset of %r,"
                                   " got %r" % (instance.disks, disks))
    return disks


def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  for dev in disks:
    lu.cfg.SetDiskID(dev, node_uuid)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded


def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are taken
  into account and the whole operation is reported as failed.

  """
  lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  all_result = True
  disks = ExpandCheckDisks(instance, disks)

  for disk in disks:
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node_uuid)
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if ((node_uuid == instance.primary_node and not ignore_primary) or
            (node_uuid != instance.primary_node and not result.offline)):
          all_result = False
  return all_result


def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)


def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  disks = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # mark instance disks as active before doing actual work, so watcher does
  # not try to shut them down erroneously
  lu.cfg.MarkInstanceDisksActive(instance.uuid)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, False, idx)
      msg = result.fail_msg
      if msg:
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node_uuid)
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance.name, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info


def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
  if not disks_ok:
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.LogWarning("",
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")


class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.instance.FindDisk(self.op.disk)

    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    template = self.instance.disk_template
    if template not in constants.DTS_NO_FREE_SPACE_CHECK:
      # TODO: check the free disk space for file, when that feature will be
      # implemented
      if any(self.node_es_flags.values()):
        # With exclusive storage we need to do something smarter than just
        # looking at free space; for now, let's simply abort the operation.
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
                                   " is enabled", errors.ECODE_STATE)
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Get disk size from primary node for wiping
      self.cfg.SetDiskID(self.disk, self.instance.primary_node)
      result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
                                                    [self.disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   self.instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % self.instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in self.instance.all_nodes:
      self.cfg.SetDiskID(self.disk, node_uuid)
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    self.cfg.SetDiskID(self.disk, node_uuid)
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert self.instance.disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)


class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if self.op.remote_node is None and self.op.iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif self.op.remote_node is not None or self.op.iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)

    return LogicalUnit.CheckPrereq(self)


class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
      AssembleInstanceDisks(self, self.instance,
                            ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info


class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    if self.op.force:
      ShutdownInstanceDisks(self, self.instance)
    else:
      _SafeShutdownInstanceDisks(self, self.instance)


def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node_uuid)

  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded
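
  # Recurse into the children (for DRBD8 these are the backing LVs); note
  # that ldisk is not passed down, so children are checked with the default
  # is_degraded test.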
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
                                                     node_uuid, on_primary)

  return result


def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
                                    ldisk=ldisk)


def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node_uuid, disk)


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
    results.append("%s%s" % (new_id, val))

  return results
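
# Example (illustrative): calling _GenerateUniqueNames(lu, [".disk0_data",
# ".disk0_meta"]) yields names of the form "<unique-id>.disk0_data" and
# "<unique-id>.disk0_meta", which are then used as new LV names.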


class TLReplaceDisks(Tasklet):
  """Replaces disks for an instance.

  Note: Locking is not within the scope of this class.

  """
  def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
               remote_node_uuid, disks, early_release, ignore_ipolicy):
    """Initializes this class.

    """
    Tasklet.__init__(self, lu)

    # Parameters
    self.instance_uuid = instance_uuid
    self.instance_name = instance_name
    self.mode = mode
    self.iallocator_name = iallocator_name
    self.remote_node_uuid = remote_node_uuid
    self.disks = disks
    self.early_release = early_release
    self.ignore_ipolicy = ignore_ipolicy

    # Runtime data
    self.instance = None
    self.new_node_uuid = None
    self.target_node_uuid = None
    self.other_node_uuid = None
    self.remote_node_info = None
    self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_uuid,
                    relocate_from_node_uuids):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(
          inst_uuid=instance_uuid,
          relocate_from_node_uuids=list(relocate_from_node_uuids))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]
    remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)

    if remote_node is None:
      raise errors.OpPrereqError("Node %s not found in configuration" %
                                 remote_node_name, errors.ECODE_NOENT)

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_uuid, remote_node_name)

    return remote_node.uuid

  def _FindFaultyDisks(self, node_uuid):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_uuid, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    node_uuids = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if self.instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(self.instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(self.instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    secondary_node_uuid = self.instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node_uuid = self.remote_node_uuid
    else:
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                            self.instance.uuid,
                                            self.instance.secondary_nodes)

    if remote_node_uuid is None:
      self.remote_node_info = None
    else:
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
        "Remote node '%s' is not locked" % remote_node_uuid

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node_uuid

    if remote_node_uuid == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node_uuid == secondary_node_uuid:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(self.instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)

      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
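      # The handlers below use three node roles: "target" is the node whose
      # storage is (re)built, "other" is its peer, and "new" is only set
      # when the secondary node itself is being replaced.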
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node_uuid = remote_node_uuid
        self.other_node_uuid = self.instance.primary_node
        self.target_node_uuid = secondary_node_uuid
        check_nodes = [self.new_node_uuid, self.other_node_uuid]

        CheckNodeNotDrained(self.lu, remote_node_uuid)
        CheckNodeVmCapable(self.lu, remote_node_uuid)

        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node_uuid)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

    # If not specified all disks should be replaced
    if not self.disks:
      self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internal
    # submitted opcode and external one. We should fix that.
    if self.remote_node_info:
      # We change the node, lets verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
                             self.remote_node_info, self.cfg,
                             ignore=self.ignore_ipolicy)

    for node_uuid in check_nodes:
      CheckNodeOnline(self.lu, node_uuid)

    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                          self.other_node_uuid,
                                                          self.target_node_uuid]
                              if node_uuid is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      self.instance.FindDisk(disk_idx)
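
    # DRBD replication runs over the nodes' secondary network, so the
    # attach/disconnect RPCs in the Exec path need the secondary IPs.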
    # Get secondary node IP addresses
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"
2108 feedback_fn("No disks need replacement for instance '%s'" %
2112 feedback_fn("Replacing disk(s) %s for instance '%s'" %
2113 (utils.CommaJoin(self.disks), self.instance.name))
2114 feedback_fn("Current primary node: %s" %
2115 self.cfg.GetNodeName(self.instance.primary_node))
2116 feedback_fn("Current seconary node: %s" %
2117 utils.CommaJoin(self.cfg.GetNodeNames(
2118 self.instance.secondary_nodes)))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node_uuid is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, node_uuids):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(node_uuids)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node_uuid in node_uuids:
      res = results[node_uuid]
      res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, self.cfg.GetNodeName(node_uuid)))

  def _CheckDisksExistence(self, node_uuids):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node_uuid in node_uuids:
        self.lu.LogInfo("Checking disk/%d on %s", idx,
                        self.cfg.GetNodeName(node_uuid))
        self.cfg.SetDiskID(dev, node_uuid)

        result = _BlockdevFind(self, node_uuid, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, self.cfg.GetNodeName(node_uuid), msg))

  def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, self.cfg.GetNodeName(node_uuid)))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (self.cfg.GetNodeName(node_uuid),
                                  self.instance.name))

  def _CreateNewStorage(self, node_uuid):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d",
                      self.cfg.GetNodeName(node_uuid), idx)

      self.cfg.SetDiskID(dev, node_uuid)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)
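
      # A DRBD8 disk has exactly two LV children: the data volume, sized
      # like the disk itself, and a small metadata volume of DRBD_META_SIZE.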
      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        try:
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                               GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    return iv_names

  def _CheckDevices(self, node_uuid, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_uuid)

      result = _BlockdevFind(self, node_uuid, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_uuid, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_uuid)

        msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
    self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(
      self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
      False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node_uuid)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" %
                   (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
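
      # Example (illustrative names): an old LV ("xenvg", "abc123.disk0_data")
      # becomes ("xenvg", "abc123.disk0_data_replaced-1381237722"), freeing
      # the original name for the newly created LV.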

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node_uuid,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" %
                   self.cfg.GetNodeName(self.target_node_uuid))

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node_uuid)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node_uuid)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s",
                      self.cfg.GetNodeName(self.target_node_uuid))
      result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
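
    # Steps 5 and 6 (removing the old storage and waiting for sync) swap
    # order under early_release, so number them from a shared counter.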
    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
                                                  self.new_node_uuid)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        try:
          _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
                               new_lv, True, GetInstanceInfoText(self.instance),
                               False, excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
                                         for _ in self.instance.disks],
                                        self.instance.uuid)
    logging.debug("Allocated minors %r", minors)
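
    # Note: a DRBD8 disk's logical_id is the 6-tuple
    # (node_A, node_B, port, minor_A, minor_B, secret); below it is rebuilt
    # with the new secondary node and its freshly allocated minor.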
    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.cfg.GetNodeName(self.new_node_uuid), idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.uuid)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node_uuid)
      msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.uuid)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node_uuid],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           self.cfg.GetNodeName(to_node), msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node_uuid, iv_names)