4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Logical units dealing with storage of instances."""
import logging
import os
import time

from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
from ganeti import ht
from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import rpc
38 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
39 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
40 AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
41 CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
42 IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
43 CheckDiskTemplateEnabled
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45 CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46 BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
48 import ganeti.masterd.instance
51 _DISK_TEMPLATE_NAME_PREFIX = {
52 constants.DT_PLAIN: "",
53 constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59 constants.DT_PLAIN: constants.LD_LV,
60 constants.DT_FILE: constants.LD_FILE,
61 constants.DT_SHARED_FILE: constants.LD_FILE,
62 constants.DT_BLOCK: constants.LD_BLOCKDEV,
63 constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }
def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.
75 @param lu: the lu on whose behalf we execute
76 @param node_uuid: the node on which to create the device
77 @type instance: L{objects.Instance}
78 @param instance: the instance which owns the device
79 @type device: L{objects.Disk}
80 @param device: the device to create
81 @param info: the extra 'metadata' we should attach to the device
82 (this will be represented as a LVM tag)
83 @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
88 @type excl_stor: boolean
89 @param excl_stor: Whether exclusive_storage is active for the node
92 lu.cfg.SetDiskID(device, node_uuid)
  result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device,
                                             lu.cfg.GetNodeName(node_uuid),
                                             instance.name))
100 if device.physical_id is None:
101 device.physical_id = result.payload
104 def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
105 info, force_open, excl_stor):
106 """Create a tree of block devices on a given node.
  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.
113 @attention: The device has to be annotated already.
115 @param lu: the lu on whose behalf we execute
116 @param node_uuid: the node on which to create the device
117 @type instance: L{objects.Instance}
118 @param instance: the instance which owns the device
119 @type device: L{objects.Disk}
120 @param device: the device to create
121 @type force_create: boolean
122 @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
124 CreateOnSecondary() attribute
125 @param info: the extra 'metadata' we should attach to the device
126 (this will be represented as a LVM tag)
127 @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution
132 @type excl_stor: boolean
133 @param excl_stor: Whether exclusive_storage is active for the node
  @return: list of created devices

  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True
    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)
    if not force_create:
      return created_devices
    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    e.created_devices.extend(created_devices)
    raise
  except errors.OpExecError, e:
    raise errors.DeviceCreationError(str(e), created_devices)
165 def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
166 """Whether exclusive_storage is in effect for the given node.
168 @type cfg: L{config.ConfigWriter}
169 @param cfg: The cluster configuration
170 @type node_uuid: string
171 @param node_uuid: The node UUID
173 @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given UUID

  """
  ni = cfg.GetNodeInfo(node_uuid)
  if ni is None:
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
186 """Wrapper around L{_CreateBlockDevInner}.
188 This method annotates the root device first.
191 (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
192 excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
193 return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
194 force_open, excl_stor)
197 def _UndoCreateDisks(lu, disks_created):
198 """Undo the work performed by L{CreateDisks}.
  This function is called in case of an error to undo the work of
  L{CreateDisks}.
203 @type lu: L{LogicalUnit}
204 @param lu: the logical unit on whose behalf we execute
205 @param disks_created: the result returned by L{CreateDisks}
208 for (node_uuid, disk) in disks_created:
209 lu.cfg.SetDiskID(disk, node_uuid)
210 result = lu.rpc.call_blockdev_remove(node_uuid, disk)
211 result.Warn("Failed to remove newly-created disk %s on node %s" %
212 (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
215 def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
216 """Create all disks for an instance.
218 This abstracts away some work from AddInstance.
220 @type lu: L{LogicalUnit}
221 @param lu: the logical unit on whose behalf we execute
222 @type instance: L{objects.Instance}
223 @param instance: the instance whose disks we should create
225 @param to_skip: list of indices to skip
226 @type target_node_uuid: string
227 @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of L{objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error
236 info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks
247 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)
249 if instance.disk_template in constants.DTS_FILEBASED:
250 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
251 result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
253 result.Raise("Failed to create directory '%s' on"
254 " node %s" % (file_storage_dir,
255 lu.cfg.GetNodeName(pnode_uuid)))
  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)

  return disks_created
277 def ComputeDiskSizePerVG(disk_template, disks):
278 """Compute disk size requirements in the volume group
  def _compute(disks, payload):
    """Universal algorithm."""
    vgs = {}
    for disk in disks:
      vgs[disk[constants.IDISK_VG]] = \
        vgs.get(disk[constants.IDISK_VG], 0) + \
        disk[constants.IDISK_SIZE] + payload
    return vgs
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
295 constants.DT_PLAIN: _compute(disks, 0),
296 # 128 MB are added for drbd metadata for each disk
297 constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
298 constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }
302 if disk_template not in req_size_dict:
303 raise errors.ProgrammerError("Disk template '%s' size requirement"
304 " is unknown" % disk_template)
306 return req_size_dict[disk_template]
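# Illustrative example (not part of the original code): for a DRBD8 instance
# with two disks of 1024 MiB and 2048 MiB, both in volume group "xenvg", the
# result is {"xenvg": (1024 + 128) + (2048 + 128)}, i.e. each disk contributes
# its size plus the 128 MB of DRBD metadata mentioned above.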
309 def ComputeDisks(op, default_vg):
310 """Computes the instance disks.
312 @param op: The instance opcode
313 @param default_vg: The default_vg to assume
315 @return: The computed disks
319 for disk in op.disks:
320 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
321 if mode not in constants.DISK_ACCESS_SET:
322 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
323 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)
333 ext_provider = disk.get(constants.IDISK_PROVIDER, None)
334 if ext_provider and op.disk_template != constants.DT_EXT:
335 raise errors.OpPrereqError("The '%s' option is only valid for the %s"
336 " disk template, not %s" %
337 (constants.IDISK_PROVIDER, constants.DT_EXT,
338 op.disk_template), errors.ECODE_INVAL)
340 data_vg = disk.get(constants.IDISK_VG, default_vg)
341 name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
346 constants.IDISK_MODE: mode,
347 constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    for key in [
      constants.IDISK_METAVG,
      constants.IDISK_ADOPT,
      constants.IDISK_SPINDLES,
      ]:
      if key in disk:
        new_disk[key] = disk[key]
359 # For extstorage, demand the `provider' option and add any
360 # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)
371 disks.append(new_disk)
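    # Illustrative example (assumption, not from the original opcode data): a
    # disk specification {IDISK_SIZE: 1024, IDISK_MODE: "rw"} becomes
    # {IDISK_SIZE: 1024, IDISK_MODE: "rw", IDISK_VG: default_vg,
    #  IDISK_NAME: None} before any template-specific keys are merged in.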
376 def CheckRADOSFreeSpace():
377 """Compute disk size requirements inside the RADOS cluster.
380 # For the RADOS cluster we assume there is always enough space.
384 def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
385 iv_name, p_minor, s_minor):
386 """Generate a drbd8 device complete with its children.
389 assert len(vgnames) == len(names) == 2
390 port = lu.cfg.AllocatePort()
391 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
393 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
394 logical_id=(vgnames[0], names[0]),
396 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
397 dev_meta = objects.Disk(dev_type=constants.LD_LV,
398 size=constants.DRBD_META_SIZE,
399 logical_id=(vgnames[1], names[1]),
401 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
402 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
403 logical_id=(primary_uuid, secondary_uuid, port,
406 children=[dev_data, dev_meta],
407 iv_name=iv_name, params={})
408 drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
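  # Note: the logical_id of the DRBD8 device built above is the 6-tuple
  # (primary_uuid, secondary_uuid, port, p_minor, s_minor, shared_secret);
  # LUInstanceRecreateDisks relies on this layout when rebuilding the id for
  # new nodes and minors.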
412 def GenerateDiskTemplate(
413 lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
414 disk_info, file_storage_dir, file_driver, base_index,
415 feedback_fn, full_disk_params):
416 """Generate the entire disk layout for a given template type.
419 vgname = lu.cfg.GetVGName()
420 disk_count = len(disk_info)
423 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
425 if template_name == constants.DT_DISKLESS:
427 elif template_name == constants.DT_DRBD8:
428 if len(secondary_node_uuids) != 1:
429 raise errors.ProgrammerError("Wrong template configuration")
430 remote_node_uuid = secondary_node_uuids[0]
431 minors = lu.cfg.AllocateDRBDMinor(
432 [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
434 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
436 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
439 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
440 for i in range(disk_count)]):
441 names.append(lv_prefix + "_data")
442 names.append(lv_prefix + "_meta")
443 for idx, disk in enumerate(disk_info):
444 disk_index = idx + base_index
445 data_vg = disk.get(constants.IDISK_VG, vgname)
446 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
447 disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
448 disk[constants.IDISK_SIZE],
450 names[idx * 2:idx * 2 + 2],
451 "disk/%d" % disk_index,
452 minors[idx * 2], minors[idx * 2 + 1])
453 disk_dev.mode = disk[constants.IDISK_MODE]
454 disk_dev.name = disk.get(constants.IDISK_NAME, None)
455 disks.append(disk_dev)
457 if secondary_node_uuids:
458 raise errors.ProgrammerError("Wrong template configuration")
460 name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
461 if name_prefix is None:
464 names = _GenerateUniqueNames(lu, ["%s.disk%s" %
465 (name_prefix, base_index + i)
466 for i in range(disk_count)])
468 if template_name == constants.DT_PLAIN:
470 def logical_id_fn(idx, _, disk):
471 vg = disk.get(constants.IDISK_VG, vgname)
472 return (vg, names[idx])
474 elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
476 lambda _, disk_index, disk: (file_driver,
477 "%s/disk%d" % (file_storage_dir,
479 elif template_name == constants.DT_BLOCK:
481 lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
482 disk[constants.IDISK_ADOPT])
483 elif template_name == constants.DT_RBD:
484 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
485 elif template_name == constants.DT_EXT:
486 def logical_id_fn(idx, _, disk):
487 provider = disk.get(constants.IDISK_PROVIDER, None)
489 raise errors.ProgrammerError("Disk template is %s, but '%s' is"
490 " not found", constants.DT_EXT,
491 constants.IDISK_PROVIDER)
492 return (provider, names[idx])
494 raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
496 dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
498 for idx, disk in enumerate(disk_info):
500 # Only for the Ext template add disk_info to params
501 if template_name == constants.DT_EXT:
502 params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
504 if key not in constants.IDISK_PARAMS:
505 params[key] = disk[key]
506 disk_index = idx + base_index
507 size = disk[constants.IDISK_SIZE]
508 feedback_fn("* disk %s, size %s" %
509 (disk_index, utils.FormatUnit(size, "h")))
510 disk_dev = objects.Disk(dev_type=dev_type, size=size,
511 logical_id=logical_id_fn(idx, disk_index, disk),
512 iv_name="disk/%d" % disk_index,
513 mode=disk[constants.IDISK_MODE],
515 spindles=disk.get(constants.IDISK_SPINDLES))
516 disk_dev.name = disk.get(constants.IDISK_NAME, None)
517 disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
518 disks.append(disk_dev)
523 def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
524 """Check the presence of the spindle options with exclusive_storage.
527 @param diskdict: disk parameters
  @param es_flag: the effective value of the exclusive_storage flag
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError: when spindles are given and they should not
      be
535 if (not es_flag and constants.IDISK_SPINDLES in diskdict and
536 diskdict[constants.IDISK_SPINDLES] is not None):
537 raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
538 " when exclusive storage is not active",
540 if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
541 diskdict[constants.IDISK_SPINDLES] is None)):
542 raise errors.OpPrereqError("You must specify spindles in instance disks"
543 " when exclusive storage is active",
547 class LUInstanceRecreateDisks(LogicalUnit):
548 """Recreate an instance's missing disks.
551 HPATH = "instance-recreate-disks"
552 HTYPE = constants.HTYPE_INSTANCE
555 _MODIFYABLE = compat.UniqueFrozenset([
556 constants.IDISK_SIZE,
557 constants.IDISK_MODE,
558 constants.IDISK_SPINDLES,
561 # New or changed disk parameters may have different semantics
562 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
563 constants.IDISK_ADOPT,
565 # TODO: Implement support changing VG while recreating
567 constants.IDISK_METAVG,
568 constants.IDISK_PROVIDER,
569 constants.IDISK_NAME,
572 def _RunAllocator(self):
573 """Run the allocator based on input opcode.
576 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
579 # The allocator should actually run in "relocate" mode, but current
580 # allocators don't support relocating all the nodes of an instance at
581 # the same time. As a workaround we use "allocate" mode, but this is
582 # suboptimal for two reasons:
583 # - The instance name passed to the allocator is present in the list of
584 # existing instances, so there could be a conflict within the
585 # internal structures of the allocator. This doesn't happen with the
586 # current allocators, but it's a liability.
587 # - The allocator counts the resources used by the instance twice: once
588 # because the instance exists already, and once because it tries to
589 # allocate a new instance.
590 # The allocator could choose some of the nodes on which the instance is
591 # running, but that's not a problem. If the instance nodes are broken,
    # they should already be marked as drained or offline, and hence
593 # skipped by the allocator. If instance disks have been lost for other
594 # reasons, then recreating the disks on the same nodes should be fine.
595 disk_template = self.instance.disk_template
596 spindle_use = be_full[constants.BE_SPINDLE_USE]
598 constants.IDISK_SIZE: d.size,
599 constants.IDISK_MODE: d.mode,
600 constants.IDISK_SPINDLES: d.spindles,
601 } for d in self.instance.disks]
602 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
603 disk_template=disk_template,
604 tags=list(self.instance.GetTags()),
607 vcpus=be_full[constants.BE_VCPUS],
608 memory=be_full[constants.BE_MAXMEM],
609 spindle_use=spindle_use,
611 hypervisor=self.instance.hypervisor,
613 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
615 ial.Run(self.op.iallocator)
617 assert req.RequiredNodes() == len(self.instance.all_nodes)
620 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
621 " %s" % (self.op.iallocator, ial.info),
624 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
625 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
626 self.op.instance_name, self.op.iallocator,
627 utils.CommaJoin(self.op.nodes))
629 def CheckArguments(self):
630 if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
631 # Normalize and convert deprecated list of disk indices
632 self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
634 duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
636 raise errors.OpPrereqError("Some disks have been specified more than"
637 " once: %s" % utils.CommaJoin(duplicates),
640 # We don't want _CheckIAllocatorOrNode selecting the default iallocator
641 # when neither iallocator nor nodes are specified
642 if self.op.iallocator or self.op.nodes:
643 CheckIAllocatorOrNode(self, "iallocator", "nodes")
645 for (idx, params) in self.op.disks:
646 utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
647 unsupported = frozenset(params.keys()) - self._MODIFYABLE
649 raise errors.OpPrereqError("Parameters for disk %s try to change"
650 " unmodifyable parameter(s): %s" %
651 (idx, utils.CommaJoin(unsupported)),
654 def ExpandNames(self):
655 self._ExpandAndLockInstance()
656 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
659 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
660 self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
662 self.needed_locks[locking.LEVEL_NODE] = []
663 if self.op.iallocator:
664 # iallocator will select a new node in the same group
665 self.needed_locks[locking.LEVEL_NODEGROUP] = []
666 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
668 self.needed_locks[locking.LEVEL_NODE_RES] = []
670 def DeclareLocks(self, level):
671 if level == locking.LEVEL_NODEGROUP:
672 assert self.op.iallocator is not None
673 assert not self.op.nodes
674 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
675 self.share_locks[locking.LEVEL_NODEGROUP] = 1
676 # Lock the primary group used by the instance optimistically; this
677 # requires going via the node before it's locked, requiring
678 # verification later on
679 self.needed_locks[locking.LEVEL_NODEGROUP] = \
680 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
682 elif level == locking.LEVEL_NODE:
683 # If an allocator is used, then we lock all the nodes in the current
684 # instance group, as we don't know yet which ones will be selected;
685 # if we replace the nodes without using an allocator, locks are
686 # already declared in ExpandNames; otherwise, we need to lock all the
687 # instance nodes for disk re-creation
688 if self.op.iallocator:
689 assert not self.op.nodes
690 assert not self.needed_locks[locking.LEVEL_NODE]
691 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
693 # Lock member nodes of the group of the primary node
694 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
695 self.needed_locks[locking.LEVEL_NODE].extend(
696 self.cfg.GetNodeGroup(group_uuid).members)
698 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
699 elif not self.op.nodes:
700 self._LockInstancesNodes(primary_only=False)
701 elif level == locking.LEVEL_NODE_RES:
703 self.needed_locks[locking.LEVEL_NODE_RES] = \
704 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
706 def BuildHooksEnv(self):
709 This runs on master, primary and secondary nodes of the instance.
712 return BuildInstanceHookEnvByObject(self, self.instance)
714 def BuildHooksNodes(self):
715 """Build hooks nodes.
718 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
721 def CheckPrereq(self):
722 """Check prerequisites.
724 This checks that the instance is in the cluster and is not running.
727 instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
728 assert instance is not None, \
729 "Cannot retrieve locked instance %s" % self.op.instance_name
730 if self.op.node_uuids:
731 if len(self.op.node_uuids) != len(instance.all_nodes):
732 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
733 " %d replacement nodes were specified" %
734 (instance.name, len(instance.all_nodes),
735 len(self.op.node_uuids)),
737 assert instance.disk_template != constants.DT_DRBD8 or \
738 len(self.op.node_uuids) == 2
739 assert instance.disk_template != constants.DT_PLAIN or \
740 len(self.op.node_uuids) == 1
741 primary_node = self.op.node_uuids[0]
743 primary_node = instance.primary_node
744 if not self.op.iallocator:
745 CheckNodeOnline(self, primary_node)
747 if instance.disk_template == constants.DT_DISKLESS:
748 raise errors.OpPrereqError("Instance '%s' has no disks" %
749 self.op.instance_name, errors.ECODE_INVAL)
751 # Verify if node group locks are still correct
752 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
754 # Node group locks are acquired only for the primary node (and only
755 # when the allocator is used)
756 CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
759 # if we replace nodes *and* the old primary is offline, we don't
760 # check the instance state
761 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
762 if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
763 CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
764 msg="cannot recreate disks")
    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)
776 if ((self.op.node_uuids or self.op.iallocator) and
777 sorted(self.disks.keys()) != range(len(instance.disks))):
778 raise errors.OpPrereqError("Can't recreate disks partially and"
779 " change the nodes at the same time",
782 self.instance = instance
784 if self.op.iallocator:
786 # Release unneeded node and node resource locks
787 ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
788 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
789 ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
791 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
793 if self.op.node_uuids:
794 node_uuids = self.op.node_uuids
796 node_uuids = instance.all_nodes
797 excl_stor = compat.any(
798 rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
800 for new_params in self.disks.values():
801 CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
803 def Exec(self, feedback_fn):
804 """Recreate the disks.
807 assert (self.owned_locks(locking.LEVEL_NODE) ==
808 self.owned_locks(locking.LEVEL_NODE_RES))
811 mods = [] # keeps track of needed changes
813 for idx, disk in enumerate(self.instance.disks):
815 changes = self.disks[idx]
817 # Disk should not be recreated
821 # update secondaries for disks, if needed
822 if self.op.node_uuids and disk.dev_type == constants.LD_DRBD8:
823 # need to update the nodes and minors
824 assert len(self.op.node_uuids) == 2
825 assert len(disk.logical_id) == 6 # otherwise disk internals
827 (_, _, old_port, _, _, old_secret) = disk.logical_id
828 new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
830 new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
831 new_minors[0], new_minors[1], old_secret)
832 assert len(disk.logical_id) == len(new_id)
836 mods.append((idx, new_id, changes))
838 # now that we have passed all asserts above, we can apply the mods
839 # in a single run (to avoid partial changes)
840 for idx, new_id, changes in mods:
841 disk = self.instance.disks[idx]
842 if new_id is not None:
843 assert disk.dev_type == constants.LD_DRBD8
844 disk.logical_id = new_id
846 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
847 mode=changes.get(constants.IDISK_MODE, None),
848 spindles=changes.get(constants.IDISK_SPINDLES, None))
850 # change primary node, if needed
851 if self.op.node_uuids:
852 self.instance.primary_node = self.op.node_uuids[0]
853 self.LogWarning("Changing the instance's nodes, you will have to"
854 " remove any disks left on the older nodes manually")
856 if self.op.node_uuids:
857 self.cfg.Update(self.instance, feedback_fn)
859 # All touched nodes must be locked
860 mylocks = self.owned_locks(locking.LEVEL_NODE)
861 assert mylocks.issuperset(frozenset(self.instance.all_nodes))
862 new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
864 # TODO: Release node locks before wiping, or explain why it's not possible
865 if self.cfg.GetClusterInfo().prealloc_wipe_disks:
866 wipedisks = [(idx, disk, 0)
867 for (idx, disk) in enumerate(self.instance.disks)
868 if idx not in to_skip]
869 WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
873 def _PerformNodeInfoCall(lu, node_uuids, vg):
874 """Prepares the input and performs a node info call.
876 @type lu: C{LogicalUnit}
877 @param lu: a logical unit from which we get configuration data
878 @type node_uuids: list of string
879 @param node_uuids: list of node UUIDs to perform the call for
881 @param vg: the volume group's name
884 lvm_storage_units = [(constants.ST_LVM_VG, vg)]
885 storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
887 hvname = lu.cfg.GetHypervisorType()
888 hvparams = lu.cfg.GetClusterInfo().hvparams
889 nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
890 [(hvname, hvparams[hvname])])
894 def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
895 """Checks the vg capacity for a given node.
897 @type node_info: tuple (_, list of dicts, _)
898 @param node_info: the result of the node info call for one node
899 @type node_name: string
900 @param node_name: the name of the node
902 @param vg: volume group name
904 @param requested: the amount of disk in MiB to check for
905 @raise errors.OpPrereqError: if the node doesn't have enough disk,
906 or we cannot check the node
909 (_, space_info, _) = node_info
910 lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
911 space_info, constants.ST_LVM_VG)
913 raise errors.OpPrereqError("Can't retrieve storage information for LVM")
914 vg_free = lvm_vg_info.get("storage_free", None)
915 if not isinstance(vg_free, int):
916 raise errors.OpPrereqError("Can't compute free disk space on node"
917 " %s for vg %s, result was '%s'" %
918 (node_name, vg, vg_free), errors.ECODE_ENVIRON)
919 if requested > vg_free:
920 raise errors.OpPrereqError("Not enough disk space on target node %s"
921 " vg %s: required %d MiB, available %d MiB" %
922 (node_name, vg, requested, vg_free),
926 def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
927 """Checks if nodes have enough free disk space in the specified VG.
929 This function checks if all given nodes have the needed amount of
930 free disk. In case any node has less disk or we cannot get the
931 information from the node, this function raises an OpPrereqError
934 @type lu: C{LogicalUnit}
935 @param lu: a logical unit from which we get configuration data
936 @type node_uuids: C{list}
937 @param node_uuids: the list of node UUIDs to check
939 @param vg: the volume group to check
940 @type requested: C{int}
941 @param requested: the amount of disk in MiB to check for
942 @raise errors.OpPrereqError: if the node doesn't have enough disk,
943 or we cannot check the node
946 nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
947 for node in node_uuids:
948 node_name = lu.cfg.GetNodeName(node)
949 info = nodeinfo[node]
950 info.Raise("Cannot get current information from node %s" % node_name,
951 prereq=True, ecode=errors.ECODE_ENVIRON)
952 _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
955 def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
956 """Checks if nodes have enough free disk space in all the VGs.
958 This function checks if all given nodes have the needed amount of
959 free disk. In case any node has less disk or we cannot get the
960 information from the node, this function raises an OpPrereqError
963 @type lu: C{LogicalUnit}
964 @param lu: a logical unit from which we get configuration data
965 @type node_uuids: C{list}
966 @param node_uuids: the list of node UUIDs to check
967 @type req_sizes: C{dict}
968 @param req_sizes: the hash of vg and corresponding amount of disk in
970 @raise errors.OpPrereqError: if the node doesn't have enough disk,
971 or we cannot check the node
974 for vg, req_size in req_sizes.items():
975 _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
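  # The req_sizes mapping is typically the dictionary returned by
  # ComputeDiskSizePerVG above, e.g. {"xenvg": 2304} (the volume group name
  # is only illustrative).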
978 def _DiskSizeInBytesToMebibytes(lu, size):
979 """Converts a disk size in bytes to mebibytes.
981 Warns and rounds up if the size isn't an even multiple of 1 MiB.
  (mib, remainder) = divmod(size, 1024 * 1024)
  if remainder != 0:
    lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
                  " to not overwrite existing data (%s bytes will not be"
                  " wiped)", (1024 * 1024) - remainder)
    mib += 1
  return mib
995 def _CalcEta(time_taken, written, total_size):
996 """Calculates the ETA based on size written and total size.
998 @param time_taken: The time taken so far
999 @param written: amount written so far
1000 @param total_size: The total size of data to be written
1001 @return: The remaining time in seconds
1004 avg_time = time_taken / float(written)
1005 return (total_size - written) * avg_time
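# Worked example (illustrative only): if 256 MiB have been written in 100
# seconds, the average is 100.0 / 256 seconds per MiB, so for a total of
# 1024 MiB _CalcEta(100.0, 256, 1024) == (1024 - 256) * (100.0 / 256) == 300.0.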
1008 def WipeDisks(lu, instance, disks=None):
1009 """Wipes instance disks.
1011 @type lu: L{LogicalUnit}
1012 @param lu: the logical unit on whose behalf we execute
1013 @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
      start offset
1020 node_uuid = instance.primary_node
1021 node_name = lu.cfg.GetNodeName(node_uuid)
1024 disks = [(idx, disk, 0)
1025 for (idx, disk) in enumerate(instance.disks)]
1027 for (_, device, _) in disks:
1028 lu.cfg.SetDiskID(device, node_uuid)
1030 logging.info("Pausing synchronization of disks of instance '%s'",
1032 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1033 (map(compat.snd, disks),
1036 result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1038 for idx, success in enumerate(result.payload):
1040 logging.warn("Pausing synchronization of disk %s of instance '%s'"
1041 " failed", idx, instance.name)
1044 for (idx, device, offset) in disks:
1045 # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1046 # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1048 int(min(constants.MAX_WIPE_CHUNK,
1049 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
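    # Illustrative example (assuming the usual values of a 1 GiB maximum
    # chunk and a 10% minimum): a 100 GiB disk is wiped in 1 GiB chunks,
    # while a 5 GiB disk uses 512 MiB chunks.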
1053 start_time = time.time()
1058 info_text = (" (from %s to %s)" %
1059 (utils.FormatUnit(offset, "h"),
1060 utils.FormatUnit(size, "h")))
1062 lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1064 logging.info("Wiping disk %d for instance %s on node %s using"
1065 " chunk size %s", idx, instance.name, node_name,
1068 while offset < size:
1069 wipe_size = min(wipe_chunk_size, size - offset)
1071 logging.debug("Wiping disk %d, offset %s, chunk %s",
1072 idx, offset, wipe_size)
1074 result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1076 result.Raise("Could not wipe disk %d at offset %d for size %d" %
1077 (idx, offset, wipe_size))
1081 if now - last_output >= 60:
1082 eta = _CalcEta(now - start_time, offset, size)
1083 lu.LogInfo(" - done: %.1f%% ETA: %s",
1084 offset / float(size) * 100, utils.FormatSeconds(eta))
1087 logging.info("Resuming synchronization of disks for instance '%s'",
1090 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1091 (map(compat.snd, disks),
1096 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1097 node_name, result.fail_msg)
1099 for idx, success in enumerate(result.payload):
1101 lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1102 " failed", idx, instance.name)
1105 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1106 """Wrapper for L{WipeDisks} that handles errors.
1108 @type lu: L{LogicalUnit}
1109 @param lu: the logical unit on whose behalf we execute
1110 @type instance: L{objects.Instance}
1111 @param instance: the instance whose disks we should wipe
1112 @param disks: see L{WipeDisks}
1113 @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1115 @raise errors.OpPrereqError: in case of failure
1119 WipeDisks(lu, instance, disks=disks)
1120 except errors.OpExecError:
1121 logging.warning("Wiping disks for instance '%s' failed",
1123 _UndoCreateDisks(lu, cleanup)
1127 def ExpandCheckDisks(instance, disks):
1128 """Return the instance disks selected by the disks list
1130 @type disks: list of L{objects.Disk} or None
1131 @param disks: selected disks
1132 @rtype: list of L{objects.Disk}
1133 @return: selected instance disks to act on
1137 return instance.disks
1139 if not set(disks).issubset(instance.disks):
1140 raise errors.ProgrammerError("Can only act on disks belonging to the"
1141 " target instance: expected a subset of %r,"
1142 " got %r" % (instance.disks, disks))
1146 def WaitForSync(lu, instance, disks=None, oneshot=False):
1147 """Sleep and poll for an instance's disk to sync.
1150 if not instance.disks or disks is not None and not disks:
1153 disks = ExpandCheckDisks(instance, disks)
1156 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1158 node_uuid = instance.primary_node
1159 node_name = lu.cfg.GetNodeName(node_uuid)
1162 lu.cfg.SetDiskID(dev, node_uuid)
1164 # TODO: Convert to utils.Retry
1167 degr_retries = 10 # in seconds, as we sleep 1 second each time
1171 cumul_degraded = False
1172 rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1173 msg = rstats.fail_msg
1175 lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1178 raise errors.RemoteError("Can't contact node %s for mirror data,"
1179 " aborting." % node_name)
1182 rstats = rstats.payload
1184 for i, mstat in enumerate(rstats):
1186 lu.LogWarning("Can't compute data for node %s/%s",
1187 node_name, disks[i].iv_name)
1190 cumul_degraded = (cumul_degraded or
1191 (mstat.is_degraded and mstat.sync_percent is None))
1192 if mstat.sync_percent is not None:
1194 if mstat.estimated_time is not None:
1195 rem_time = ("%s remaining (estimated)" %
1196 utils.FormatSeconds(mstat.estimated_time))
1197 max_time = mstat.estimated_time
1199 rem_time = "no time estimate"
1200 lu.LogInfo("- device %s: %5.2f%% done, %s",
1201 disks[i].iv_name, mstat.sync_percent, rem_time)
1203 # if we're done but degraded, let's do a few small retries, to
1204 # make sure we see a stable and not transient situation; therefore
1205 # we force restart of the loop
1206 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1207 logging.info("Degraded disks found, %d retries left", degr_retries)
1215 time.sleep(min(60, max_time))
1218 lu.LogInfo("Instance %s's disks are in sync", instance.name)
1220 return not cumul_degraded
1223 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1224 """Shutdown block devices of an instance.
1226 This does the shutdown on all nodes of the instance.
  If ignore_primary is false, errors on the primary node are not
  ignored and cause the function to report failure.
1232 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1234 disks = ExpandCheckDisks(instance, disks)
1237 for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1238 lu.cfg.SetDiskID(top_disk, node_uuid)
1239 result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1240 msg = result.fail_msg
1242 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1243 disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1244 if ((node_uuid == instance.primary_node and not ignore_primary) or
1245 (node_uuid != instance.primary_node and not result.offline)):
1250 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1251 """Shutdown block devices of an instance.
1253 This function checks if an instance is running, before calling
1254 _ShutdownInstanceDisks.
1257 CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1258 ShutdownInstanceDisks(lu, instance, disks=disks)
1261 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1263 """Prepare the block devices for an instance.
1265 This sets up the block devices on all nodes.
1267 @type lu: L{LogicalUnit}
1268 @param lu: the logical unit on whose behalf we execute
1269 @type instance: L{objects.Instance}
1270 @param instance: the instance for whose disks we assemble
1271 @type disks: list of L{objects.Disk} or None
1272 @param disks: which disks to assemble (or all, if None)
1273 @type ignore_secondaries: boolean
1274 @param ignore_secondaries: if true, errors on secondary nodes
1275 won't result in an error return from the function
1276 @type ignore_size: boolean
1277 @param ignore_size: if true, the current known size of the disk
1278 will not be used during the disk activation, useful for cases
1279 when the size is wrong
1280 @return: False if the operation failed, otherwise a list of
1281 (host, instance_visible_name, node_visible_name)
1282 with the mapping from node devices to instance devices
1287 disks = ExpandCheckDisks(instance, disks)
1289 # With the two passes mechanism we try to reduce the window of
1290 # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it
1293 # The proper fix would be to wait (with some limits) until the
1294 # connection has been made and drbd transitions from WFConnection
1295 # into any other network-connected state (Connected, SyncTarget,
1298 # mark instance disks as active before doing actual work, so watcher does
1299 # not try to shut them down erroneously
1300 lu.cfg.MarkInstanceDisksActive(instance.uuid)
1302 # 1st pass, assemble on all nodes in secondary mode
1303 for idx, inst_disk in enumerate(disks):
1304 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1305 instance.primary_node):
1307 node_disk = node_disk.Copy()
1308 node_disk.UnsetSize()
1309 lu.cfg.SetDiskID(node_disk, node_uuid)
1310 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1311 instance.name, False, idx)
1312 msg = result.fail_msg
1314 is_offline_secondary = (node_uuid in instance.secondary_nodes and
1316 lu.LogWarning("Could not prepare block device %s on node %s"
1317 " (is_primary=False, pass=1): %s",
1318 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1319 if not (ignore_secondaries or is_offline_secondary):
1322 # FIXME: race condition on drbd migration to primary
1324 # 2nd pass, do only the primary node
1325 for idx, inst_disk in enumerate(disks):
1328 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1329 instance.primary_node):
1330 if node_uuid != instance.primary_node:
1333 node_disk = node_disk.Copy()
1334 node_disk.UnsetSize()
1335 lu.cfg.SetDiskID(node_disk, node_uuid)
1336 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1337 instance.name, True, idx)
1338 msg = result.fail_msg
1340 lu.LogWarning("Could not prepare block device %s on node %s"
1341 " (is_primary=True, pass=2): %s",
1342 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1345 dev_path = result.payload
1347 device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1348 inst_disk.iv_name, dev_path))
1350 # leave the disks configured for the primary node
1351 # this is a workaround that would be fixed better by
1352 # improving the logical/physical id handling
1354 lu.cfg.SetDiskID(disk, instance.primary_node)
1357 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1359 return disks_ok, device_info
1362 def StartInstanceDisks(lu, instance, force):
1363 """Start the disks of an instance.
1366 disks_ok, _ = AssembleInstanceDisks(lu, instance,
1367 ignore_secondaries=force)
1369 ShutdownInstanceDisks(lu, instance)
1370 if force is not None and not force:
1372 hint=("If the message above refers to a secondary node,"
1373 " you can retry the operation using '--force'"))
1374 raise errors.OpExecError("Disk consistency error")
1377 class LUInstanceGrowDisk(LogicalUnit):
1378 """Grow a disk of an instance.
1382 HTYPE = constants.HTYPE_INSTANCE
1385 def ExpandNames(self):
1386 self._ExpandAndLockInstance()
1387 self.needed_locks[locking.LEVEL_NODE] = []
1388 self.needed_locks[locking.LEVEL_NODE_RES] = []
1389 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1390 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1392 def DeclareLocks(self, level):
1393 if level == locking.LEVEL_NODE:
1394 self._LockInstancesNodes()
1395 elif level == locking.LEVEL_NODE_RES:
1397 self.needed_locks[locking.LEVEL_NODE_RES] = \
1398 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1400 def BuildHooksEnv(self):
1403 This runs on the master, the primary and all the secondaries.
1407 "DISK": self.op.disk,
1408 "AMOUNT": self.op.amount,
1409 "ABSOLUTE": self.op.absolute,
1411 env.update(BuildInstanceHookEnvByObject(self, self.instance))
1414 def BuildHooksNodes(self):
1415 """Build hooks nodes.
1418 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1421 def CheckPrereq(self):
1422 """Check prerequisites.
1424 This checks that the instance is in the cluster.
1427 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1428 assert self.instance is not None, \
1429 "Cannot retrieve locked instance %s" % self.op.instance_name
1430 node_uuids = list(self.instance.all_nodes)
1431 for node_uuid in node_uuids:
1432 CheckNodeOnline(self, node_uuid)
1433 self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1435 if self.instance.disk_template not in constants.DTS_GROWABLE:
1436 raise errors.OpPrereqError("Instance's disk layout does not support"
1437 " growing", errors.ECODE_INVAL)
1439 self.disk = self.instance.FindDisk(self.op.disk)
    if self.op.absolute:
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)
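    # Illustrative example (not from the original source): growing a 10240 MiB
    # disk with amount=2048 gives delta=2048 and target=12288; with
    # absolute=True and amount=12288 the outcome is identical.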
1458 self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1460 def _CheckDiskSpace(self, node_uuids, req_vgspace):
1461 template = self.instance.disk_template
1462 if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
1463 not any(self.node_es_flags.values())):
1464 # TODO: check the free disk space for file, when that feature will be
1466 # With exclusive storage we need to do something smarter than just looking
1467 # at free space, which, in the end, is basically a dry run. So we rely on
1468 # the dry run performed in Exec() instead.
1469 CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1471 def Exec(self, feedback_fn):
1472 """Execute disk grow.
1475 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1476 assert (self.owned_locks(locking.LEVEL_NODE) ==
1477 self.owned_locks(locking.LEVEL_NODE_RES))
1479 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1481 disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1483 raise errors.OpExecError("Cannot activate block device to grow")
1485 feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1486 (self.op.disk, self.instance.name,
1487 utils.FormatUnit(self.delta, "h"),
1488 utils.FormatUnit(self.target, "h")))
1490 # First run all grow ops in dry-run mode
1491 for node_uuid in self.instance.all_nodes:
1492 self.cfg.SetDiskID(self.disk, node_uuid)
1493 result = self.rpc.call_blockdev_grow(node_uuid,
1494 (self.disk, self.instance),
1495 self.delta, True, True,
1496 self.node_es_flags[node_uuid])
1497 result.Raise("Dry-run grow request failed to node %s" %
1498 self.cfg.GetNodeName(node_uuid))
1501 # Get disk size from primary node for wiping
1502 self.cfg.SetDiskID(self.disk, self.instance.primary_node)
1503 result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
1505 result.Raise("Failed to retrieve disk size from node '%s'" %
1506 self.instance.primary_node)
1508 (disk_dimensions, ) = result.payload
1510 if disk_dimensions is None:
1511 raise errors.OpExecError("Failed to retrieve disk size from primary"
1512 " node '%s'" % self.instance.primary_node)
1513 (disk_size_in_bytes, _) = disk_dimensions
1515 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1517 assert old_disk_size >= self.disk.size, \
1518 ("Retrieved disk size too small (got %s, should be at least %s)" %
1519 (old_disk_size, self.disk.size))
1521 old_disk_size = None
1523 # We know that (as far as we can test) operations across different
1524 # nodes will succeed, time to run it for real on the backing storage
1525 for node_uuid in self.instance.all_nodes:
1526 self.cfg.SetDiskID(self.disk, node_uuid)
1527 result = self.rpc.call_blockdev_grow(node_uuid,
1528 (self.disk, self.instance),
1529 self.delta, False, True,
1530 self.node_es_flags[node_uuid])
1531 result.Raise("Grow request failed to node %s" %
1532 self.cfg.GetNodeName(node_uuid))
1534 # And now execute it for logical storage, on the primary node
1535 node_uuid = self.instance.primary_node
1536 self.cfg.SetDiskID(self.disk, node_uuid)
1537 result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1538 self.delta, False, False,
1539 self.node_es_flags[node_uuid])
1540 result.Raise("Grow request failed to node %s" %
1541 self.cfg.GetNodeName(node_uuid))
1543 self.disk.RecordGrow(self.delta)
1544 self.cfg.Update(self.instance, feedback_fn)
1546 # Changes have been recorded, release node lock
1547 ReleaseLocks(self, locking.LEVEL_NODE)
1549 # Downgrade lock while waiting for sync
1550 self.glm.downgrade(locking.LEVEL_INSTANCE)
1552 assert wipe_disks ^ (old_disk_size is None)
1555 assert self.instance.disks[self.op.disk] == self.disk
1557 # Wipe newly added disk space
1558 WipeDisks(self, self.instance,
1559 disks=[(self.op.disk, self.disk, old_disk_size)])
1561 if self.op.wait_for_sync:
1562 disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1564 self.LogWarning("Disk syncing has not returned a good status; check"
1566 if not self.instance.disks_active:
1567 _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1568 elif not self.instance.disks_active:
1569 self.LogWarning("Not shutting down the disk even if the instance is"
1570 " not supposed to be running because no wait for"
1571 " sync mode was requested")
1573 assert self.owned_locks(locking.LEVEL_NODE_RES)
1574 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1577 class LUInstanceReplaceDisks(LogicalUnit):
1578 """Replace the disks of an instance.
1581 HPATH = "mirrors-replace"
1582 HTYPE = constants.HTYPE_INSTANCE
1585 def CheckArguments(self):
1589 if self.op.mode == constants.REPLACE_DISK_CHG:
1590 if self.op.remote_node is None and self.op.iallocator is None:
1591 raise errors.OpPrereqError("When changing the secondary either an"
1592 " iallocator script must be used or the"
1593 " new node given", errors.ECODE_INVAL)
1595 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1597 elif self.op.remote_node is not None or self.op.iallocator is not None:
1598 # Not replacing the secondary
1599 raise errors.OpPrereqError("The iallocator and new node options can"
1600 " only be used when changing the"
1601 " secondary node", errors.ECODE_INVAL)
1603 def ExpandNames(self):
1604 self._ExpandAndLockInstance()
1606 assert locking.LEVEL_NODE not in self.needed_locks
1607 assert locking.LEVEL_NODE_RES not in self.needed_locks
1608 assert locking.LEVEL_NODEGROUP not in self.needed_locks
1610 assert self.op.iallocator is None or self.op.remote_node is None, \
1611 "Conflicting options"
1613 if self.op.remote_node is not None:
1614 (self.op.remote_node_uuid, self.op.remote_node) = \
1615 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1616 self.op.remote_node)
1618 # Warning: do not remove the locking of the new secondary here
1619 # unless DRBD8Dev.AddChildren is changed to work in parallel;
1620 # currently it doesn't since parallel invocations of
1621 # FindUnusedMinor will conflict
1622 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1623 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1625 self.needed_locks[locking.LEVEL_NODE] = []
1626 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1628 if self.op.iallocator is not None:
1629 # iallocator will select a new node in the same group
1630 self.needed_locks[locking.LEVEL_NODEGROUP] = []
1631 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1633 self.needed_locks[locking.LEVEL_NODE_RES] = []
1635 self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1636 self.op.instance_name, self.op.mode,
1637 self.op.iallocator, self.op.remote_node_uuid,
1638 self.op.disks, self.op.early_release,
1639 self.op.ignore_ipolicy)
1641 self.tasklets = [self.replacer]
1643 def DeclareLocks(self, level):
1644 if level == locking.LEVEL_NODEGROUP:
1645 assert self.op.remote_node_uuid is None
1646 assert self.op.iallocator is not None
1647 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1649 self.share_locks[locking.LEVEL_NODEGROUP] = 1
1650 # Lock all groups used by instance optimistically; this requires going
1651 # via the node before it's locked, requiring verification later on
1652 self.needed_locks[locking.LEVEL_NODEGROUP] = \
1653 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1655 elif level == locking.LEVEL_NODE:
1656 if self.op.iallocator is not None:
1657 assert self.op.remote_node_uuid is None
1658 assert not self.needed_locks[locking.LEVEL_NODE]
1659 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1661 # Lock member nodes of all locked groups
1662 self.needed_locks[locking.LEVEL_NODE] = \
1664 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1665 for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1667 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1669 self._LockInstancesNodes()
1671 elif level == locking.LEVEL_NODE_RES:
1673 self.needed_locks[locking.LEVEL_NODE_RES] = \
1674 self.needed_locks[locking.LEVEL_NODE]
1676 def BuildHooksEnv(self):
1679 This runs on the master, the primary and all the secondaries.
1682 instance = self.replacer.instance
1684 "MODE": self.op.mode,
1685 "NEW_SECONDARY": self.op.remote_node,
1686 "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
1688 env.update(BuildInstanceHookEnvByObject(self, instance))
1691 def BuildHooksNodes(self):
1692 """Build hooks nodes.
1695 instance = self.replacer.instance
1697 self.cfg.GetMasterNode(),
1698 instance.primary_node,
1700 if self.op.remote_node_uuid is not None:
1701 nl.append(self.op.remote_node_uuid)
1704 def CheckPrereq(self):
1705 """Check prerequisites.
1708 assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1709 self.op.iallocator is None)
1711 # Verify if node group locks are still correct
1712 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1714 CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
1716 return LogicalUnit.CheckPrereq(self)
1719 class LUInstanceActivateDisks(NoHooksLU):
1720 """Bring up an instance's disks.
1725 def ExpandNames(self):
1726 self._ExpandAndLockInstance()
1727 self.needed_locks[locking.LEVEL_NODE] = []
1728 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1730 def DeclareLocks(self, level):
1731 if level == locking.LEVEL_NODE:
1732 self._LockInstancesNodes()
1734 def CheckPrereq(self):
1735 """Check prerequisites.
1737 This checks that the instance is in the cluster.
1740 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1741 assert self.instance is not None, \
1742 "Cannot retrieve locked instance %s" % self.op.instance_name
1743 CheckNodeOnline(self, self.instance.primary_node)
1745 def Exec(self, feedback_fn):
1746 """Activate the disks.
1749 disks_ok, disks_info = \
1750 AssembleInstanceDisks(self, self.instance,
1751 ignore_size=self.op.ignore_size)
1753 raise errors.OpExecError("Cannot activate block devices")
1755 if self.op.wait_for_sync:
1756 if not WaitForSync(self, self.instance):
1757 self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
1758 raise errors.OpExecError("Some disks of the instance are degraded!")
1763 class LUInstanceDeactivateDisks(NoHooksLU):
1764 """Shutdown an instance's disks.
1769 def ExpandNames(self):
1770 self._ExpandAndLockInstance()
1771 self.needed_locks[locking.LEVEL_NODE] = []
1772 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1774 def DeclareLocks(self, level):
1775 if level == locking.LEVEL_NODE:
1776 self._LockInstancesNodes()
1778 def CheckPrereq(self):
1779 """Check prerequisites.
1781 This checks that the instance is in the cluster.
1784 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1785 assert self.instance is not None, \
1786 "Cannot retrieve locked instance %s" % self.op.instance_name
1788 def Exec(self, feedback_fn):
1789 """Deactivate the disks
1792 if self.op.force:
1793 ShutdownInstanceDisks(self, self.instance)
1794 else:
1795 _SafeShutdownInstanceDisks(self, self.instance)
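# When force is not set, _SafeShutdownInstanceDisks is expected to verify that
# the instance is not running before tearing down its block devices; force
# skips that safety check and shuts the disks down unconditionally.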
1798 def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
1799 ldisk=False):
1800 """Check that mirrors are not degraded.
1802 @attention: The device has to be annotated already.
1804 The ldisk parameter, if True, will change the test from the
1805 is_degraded attribute (which represents overall non-ok status for
1806 the device(s)) to the ldisk (representing the local storage status).
1809 lu.cfg.SetDiskID(dev, node_uuid)
1811 result = True
1813 if on_primary or dev.AssembleOnSecondary():
1814 rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
1815 msg = rstats.fail_msg
1816 if msg:
1817 lu.LogWarning("Can't find disk on node %s: %s",
1818 lu.cfg.GetNodeName(node_uuid), msg)
1819 result = False
1820 elif not rstats.payload:
1821 lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1822 result = False
1823 else:
1824 if ldisk:
1825 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1826 else:
1827 result = result and not rstats.payload.is_degraded
1829 if dev.children:
1830 for child in dev.children:
1831 result = result and _CheckDiskConsistencyInner(lu, instance, child,
1832 node_uuid, on_primary)
1834 return result
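# Rough usage sketch (same names as above): calling CheckDiskConsistency with
# ldisk=True asks "is the local storage of this device healthy?" via
# ldisk_status, while ldisk=False asks the broader "is the mirror in sync?"
# question via is_degraded.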
1837 def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
1838 """Wrapper around L{_CheckDiskConsistencyInner}.
1841 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1842 return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
1843 ldisk=ldisk)
1846 def _BlockdevFind(lu, node_uuid, dev, instance):
1847 """Wrapper around call_blockdev_find to annotate diskparams.
1849 @param lu: A reference to the lu object
1850 @param node_uuid: The node to call out
1851 @param dev: The device to find
1852 @param instance: The instance object the device belongs to
1853 @returns The result of the rpc call
1856 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1857 return lu.rpc.call_blockdev_find(node_uuid, disk)
1860 def _GenerateUniqueNames(lu, exts):
1861 """Generate a suitable LV name.
1863 This will generate a logical volume name for the given instance.
1866 results = []
1867 for val in exts:
1868 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1869 results.append("%s%s" % (new_id, val))
1871 return results
1873 class TLReplaceDisks(Tasklet):
1874 """Replaces disks for an instance.
1876 Note: Locking is not within the scope of this class.
1879 def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1880 remote_node_uuid, disks, early_release, ignore_ipolicy):
1881 """Initializes this class.
1884 Tasklet.__init__(self, lu)
1887 self.instance_uuid = instance_uuid
1888 self.instance_name = instance_name
1889 self.mode = mode
1890 self.iallocator_name = iallocator_name
1891 self.remote_node_uuid = remote_node_uuid
1892 self.disks = disks
1893 self.early_release = early_release
1894 self.ignore_ipolicy = ignore_ipolicy
1897 self.instance = None
1898 self.new_node_uuid = None
1899 self.target_node_uuid = None
1900 self.other_node_uuid = None
1901 self.remote_node_info = None
1902 self.node_secondary_ip = None
1905 def _RunAllocator(lu, iallocator_name, instance_uuid,
1906 relocate_from_node_uuids):
1907 """Compute a new secondary node using an IAllocator.
1910 req = iallocator.IAReqRelocate(
1911 inst_uuid=instance_uuid,
1912 relocate_from_node_uuids=list(relocate_from_node_uuids))
1913 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1915 ial.Run(iallocator_name)
1918 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1919 " %s" % (iallocator_name, ial.info),
1922 remote_node_name = ial.result[0]
1923 remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1925 if remote_node is None:
1926 raise errors.OpPrereqError("Node %s not found in configuration" %
1927 remote_node_name, errors.ECODE_NOENT)
1929 lu.LogInfo("Selected new secondary for instance '%s': %s",
1930 instance_uuid, remote_node_name)
1932 return remote_node.uuid
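# In short: an IAReqRelocate request for the instance is handed to the chosen
# iallocator script, the single node name it returns is mapped back to a node
# UUID, and any allocator failure surfaces as an OpPrereqError.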
1934 def _FindFaultyDisks(self, node_uuid):
1935 """Wrapper for L{FindFaultyInstanceDisks}.
1938 return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1939 node_uuid, True)
1941 def _CheckDisksActivated(self, instance):
1942 """Checks if the instance disks are activated.
1944 @param instance: The instance to check disks
1945 @return: True if they are activated, False otherwise
1948 node_uuids = instance.all_nodes
1950 for idx, dev in enumerate(instance.disks):
1951 for node_uuid in node_uuids:
1952 self.lu.LogInfo("Checking disk/%d on %s", idx,
1953 self.cfg.GetNodeName(node_uuid))
1954 self.cfg.SetDiskID(dev, node_uuid)
1956 result = _BlockdevFind(self, node_uuid, dev, instance)
1958 if result.offline:
1959 continue
1960 elif result.fail_msg or not result.payload:
1961 return False
1963 return True
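# This check matters for REPLACE_DISK_AUTO below: finding the faulty side by
# inspecting the live block devices only works if the disks are currently
# assembled on their nodes.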
1965 def CheckPrereq(self):
1966 """Check prerequisites.
1968 This checks that the instance is in the cluster.
1971 self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
1972 assert self.instance is not None, \
1973 "Cannot retrieve locked instance %s" % self.instance_name
1975 if self.instance.disk_template != constants.DT_DRBD8:
1976 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1977 " instances", errors.ECODE_INVAL)
1979 if len(self.instance.secondary_nodes) != 1:
1980 raise errors.OpPrereqError("The instance has a strange layout,"
1981 " expected one secondary but found %d" %
1982 len(self.instance.secondary_nodes),
1985 secondary_node_uuid = self.instance.secondary_nodes[0]
1987 if self.iallocator_name is None:
1988 remote_node_uuid = self.remote_node_uuid
1989 else:
1990 remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
1991 self.instance_uuid,
1992 self.instance.secondary_nodes)
1994 if remote_node_uuid is None:
1995 self.remote_node_info = None
1996 else:
1997 assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
1998 "Remote node '%s' is not locked" % remote_node_uuid
2000 self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
2001 assert self.remote_node_info is not None, \
2002 "Cannot retrieve locked node %s" % remote_node_uuid
2004 if remote_node_uuid == self.instance.primary_node:
2005 raise errors.OpPrereqError("The specified node is the primary node of"
2006 " the instance", errors.ECODE_INVAL)
2008 if remote_node_uuid == secondary_node_uuid:
2009 raise errors.OpPrereqError("The specified node is already the"
2010 " secondary node of the instance",
2013 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
2014 constants.REPLACE_DISK_CHG):
2015 raise errors.OpPrereqError("Cannot specify disks to be replaced",
2018 if self.mode == constants.REPLACE_DISK_AUTO:
2019 if not self._CheckDisksActivated(self.instance):
2020 raise errors.OpPrereqError("Please run activate-disks on instance %s"
2021 " first" % self.instance_name,
2023 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2024 faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2026 if faulty_primary and faulty_secondary:
2027 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2028 " one node and can not be repaired"
2029 " automatically" % self.instance_name,
2032 if faulty_primary:
2033 self.disks = faulty_primary
2034 self.target_node_uuid = self.instance.primary_node
2035 self.other_node_uuid = secondary_node_uuid
2036 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2037 elif faulty_secondary:
2038 self.disks = faulty_secondary
2039 self.target_node_uuid = secondary_node_uuid
2040 self.other_node_uuid = self.instance.primary_node
2041 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2046 else:
2047 # Non-automatic modes
2048 if self.mode == constants.REPLACE_DISK_PRI:
2049 self.target_node_uuid = self.instance.primary_node
2050 self.other_node_uuid = secondary_node_uuid
2051 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2053 elif self.mode == constants.REPLACE_DISK_SEC:
2054 self.target_node_uuid = secondary_node_uuid
2055 self.other_node_uuid = self.instance.primary_node
2056 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2058 elif self.mode == constants.REPLACE_DISK_CHG:
2059 self.new_node_uuid = remote_node_uuid
2060 self.other_node_uuid = self.instance.primary_node
2061 self.target_node_uuid = secondary_node_uuid
2062 check_nodes = [self.new_node_uuid, self.other_node_uuid]
2064 CheckNodeNotDrained(self.lu, remote_node_uuid)
2065 CheckNodeVmCapable(self.lu, remote_node_uuid)
2067 old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2068 assert old_node_info is not None
2069 if old_node_info.offline and not self.early_release:
2070 # doesn't make sense to delay the release
2071 self.early_release = True
2072 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2073 " early-release mode", secondary_node_uuid)
2076 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2079 # If not specified all disks should be replaced
2080 if not self.disks:
2081 self.disks = range(len(self.instance.disks))
2083 # TODO: This is ugly, but right now we can't distinguish between internal
2084 # submitted opcode and external one. We should fix that.
2085 if self.remote_node_info:
2086 # We change the node, lets verify it still meets instance policy
2087 new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2088 cluster = self.cfg.GetClusterInfo()
2089 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2090 new_group_info)
2091 CheckTargetNodeIPolicy(self, ipolicy, self.instance,
2092 self.remote_node_info, self.cfg,
2093 ignore=self.ignore_ipolicy)
2095 for node_uuid in check_nodes:
2096 CheckNodeOnline(self.lu, node_uuid)
2098 touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2099 self.other_node_uuid,
2100 self.target_node_uuid]
2101 if node_uuid is not None)
2103 # Release unneeded node and node resource locks
2104 ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2105 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2106 ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2108 # Release any owned node group
2109 ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2111 # Check whether disks are valid
2112 for disk_idx in self.disks:
2113 self.instance.FindDisk(disk_idx)
2115 # Get secondary node IP addresses
2116 self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2117 in self.cfg.GetMultiNodeInfo(touched_nodes))
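# By the end of CheckPrereq the locks have been narrowed to exactly the nodes
# involved in the replacement (primary, secondary and, when changing the
# secondary, the new node); node_secondary_ip is kept for the DRBD
# disconnect/attach calls later on.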
2119 def Exec(self, feedback_fn):
2120 """Execute disk replacement.
2122 This dispatches the disk replacement to the appropriate handler.
2126 # Verify owned locks before starting operation
2127 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2128 assert set(owned_nodes) == set(self.node_secondary_ip), \
2129 ("Incorrect node locks, owning %s, expected %s" %
2130 (owned_nodes, self.node_secondary_ip.keys()))
2131 assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2132 self.lu.owned_locks(locking.LEVEL_NODE_RES))
2133 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2135 owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2136 assert list(owned_instances) == [self.instance_name], \
2137 "Instance '%s' not locked" % self.instance_name
2139 assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2140 "Should not own any node group lock at this point"
2143 feedback_fn("No disks need replacement for instance '%s'" %
2147 feedback_fn("Replacing disk(s) %s for instance '%s'" %
2148 (utils.CommaJoin(self.disks), self.instance.name))
2149 feedback_fn("Current primary node: %s" %
2150 self.cfg.GetNodeName(self.instance.primary_node))
2151 feedback_fn("Current seconary node: %s" %
2152 utils.CommaJoin(self.cfg.GetNodeNames(
2153 self.instance.secondary_nodes)))
2155 activate_disks = not self.instance.disks_active
2157 # Activate the instance disks if we're replacing them on a down instance
2158 if activate_disks:
2159 StartInstanceDisks(self.lu, self.instance, True)
2161 try:
2162 # Should we replace the secondary node?
2163 if self.new_node_uuid is not None:
2164 fn = self._ExecDrbd8Secondary
2165 else:
2166 fn = self._ExecDrbd8DiskOnly
2168 result = fn(feedback_fn)
2169 finally:
2170 # Deactivate the instance disks if we're replacing them on a
2171 # down instance
2172 if activate_disks:
2173 _SafeShutdownInstanceDisks(self.lu, self.instance)
2175 assert not self.lu.owned_locks(locking.LEVEL_NODE)
2178 # Verify owned locks
2179 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2180 nodes = frozenset(self.node_secondary_ip)
2181 assert ((self.early_release and not owned_nodes) or
2182 (not self.early_release and not (set(owned_nodes) - nodes))), \
2183 ("Not owning the correct locks, early_release=%s, owned=%r,"
2184 " nodes=%r" % (self.early_release, owned_nodes, nodes))
2188 def _CheckVolumeGroup(self, node_uuids):
2189 self.lu.LogInfo("Checking volume groups")
2191 vgname = self.cfg.GetVGName()
2193 # Make sure volume group exists on all involved nodes
2194 results = self.rpc.call_vg_list(node_uuids)
2196 raise errors.OpExecError("Can't list volume groups on the nodes")
2198 for node_uuid in node_uuids:
2199 res = results[node_uuid]
2200 res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2201 if vgname not in res.payload:
2202 raise errors.OpExecError("Volume group '%s' not found on node %s" %
2203 (vgname, self.cfg.GetNodeName(node_uuid)))
2205 def _CheckDisksExistence(self, node_uuids):
2206 # Check disk existence
2207 for idx, dev in enumerate(self.instance.disks):
2208 if idx not in self.disks:
2209 continue
2211 for node_uuid in node_uuids:
2212 self.lu.LogInfo("Checking disk/%d on %s", idx,
2213 self.cfg.GetNodeName(node_uuid))
2214 self.cfg.SetDiskID(dev, node_uuid)
2216 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2218 msg = result.fail_msg
2219 if msg or not result.payload:
2221 msg = "disk not found"
2222 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2223 (idx, self.cfg.GetNodeName(node_uuid), msg))
2225 def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2226 for idx, dev in enumerate(self.instance.disks):
2227 if idx not in self.disks:
2228 continue
2230 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2231 (idx, self.cfg.GetNodeName(node_uuid)))
2233 if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2234 on_primary, ldisk=ldisk):
2235 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2236 " replace disks for instance %s" %
2237 (self.cfg.GetNodeName(node_uuid),
2238 self.instance.name))
2240 def _CreateNewStorage(self, node_uuid):
2241 """Create new storage on the primary or secondary node.
2243 This is only used for same-node replaces, not for changing the
2244 secondary node, hence we don't want to modify the existing disk.
2247 iv_names = {}
2249 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2250 for idx, dev in enumerate(disks):
2251 if idx not in self.disks:
2252 continue
2254 self.lu.LogInfo("Adding storage on %s for disk/%d",
2255 self.cfg.GetNodeName(node_uuid), idx)
2257 self.cfg.SetDiskID(dev, node_uuid)
2259 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2260 names = _GenerateUniqueNames(self.lu, lv_names)
2262 (data_disk, meta_disk) = dev.children
2263 vg_data = data_disk.logical_id[0]
2264 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2265 logical_id=(vg_data, names[0]),
2266 params=data_disk.params)
2267 vg_meta = meta_disk.logical_id[0]
2268 lv_meta = objects.Disk(dev_type=constants.LD_LV,
2269 size=constants.DRBD_META_SIZE,
2270 logical_id=(vg_meta, names[1]),
2271 params=meta_disk.params)
2273 new_lvs = [lv_data, lv_meta]
2274 old_lvs = [child.Copy() for child in dev.children]
2275 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2276 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2278 # we pass force_create=True to force the LVM creation
2279 for new_lv in new_lvs:
2280 try:
2281 _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2282 GetInstanceInfoText(self.instance), False,
2283 excl_stor)
2284 except errors.DeviceCreationError, e:
2285 raise errors.OpExecError("Can't create block device: %s" % e.message)
2289 def _CheckDevices(self, node_uuid, iv_names):
2290 for name, (dev, _, _) in iv_names.iteritems():
2291 self.cfg.SetDiskID(dev, node_uuid)
2293 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2295 msg = result.fail_msg
2296 if msg or not result.payload:
2298 msg = "disk not found"
2299 raise errors.OpExecError("Can't find DRBD device %s: %s" %
2302 if result.payload.is_degraded:
2303 raise errors.OpExecError("DRBD device %s is degraded!" % name)
2305 def _RemoveOldStorage(self, node_uuid, iv_names):
2306 for name, (_, old_lvs, _) in iv_names.iteritems():
2307 self.lu.LogInfo("Remove logical volumes for %s", name)
2309 for lv in old_lvs:
2310 self.cfg.SetDiskID(lv, node_uuid)
2312 msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
2313 if msg:
2314 self.lu.LogWarning("Can't remove old LV: %s", msg,
2315 hint="remove unused LVs manually")
2317 def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2318 """Replace a disk on the primary or secondary for DRBD 8.
2320 The algorithm for replace is quite complicated:
2322 1. for each disk to be replaced:
2324 1. create new LVs on the target node with unique names
2325 1. detach old LVs from the drbd device
2326 1. rename old LVs to name_replaced.<time_t>
2327 1. rename new LVs to old LVs
2328 1. attach the new LVs (with the old names now) to the drbd device
2330 1. wait for sync across all devices
2332 1. for each modified disk:
2334 1. remove old LVs (which have the name name_replaced.<time_t>)
2336 Failures are not very well handled.
2339 steps_total = 6
2341 # Step: check device activation
2342 self.lu.LogStep(1, steps_total, "Check device existence")
2343 self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2344 self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2346 # Step: check other node consistency
2347 self.lu.LogStep(2, steps_total, "Check peer consistency")
2348 self._CheckDisksConsistency(
2349 self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2352 # Step: create new storage
2353 self.lu.LogStep(3, steps_total, "Allocate new storage")
2354 iv_names = self._CreateNewStorage(self.target_node_uuid)
2356 # Step: for each lv, detach+rename*2+attach
2357 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2358 for dev, old_lvs, new_lvs in iv_names.itervalues():
2359 self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2361 result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
2362 old_lvs)
2363 result.Raise("Can't detach drbd from local storage on node"
2364 " %s for device %s" %
2365 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2367 #cfg.Update(instance)
2369 # ok, we created the new LVs, so now we know we have the needed
2370 # storage; as such, we proceed on the target node to rename
2371 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2372 # using the assumption that logical_id == physical_id (which in
2373 # turn is the unique_id on that node)
2375 # FIXME(iustin): use a better name for the replaced LVs
2376 temp_suffix = int(time.time())
2377 ren_fn = lambda d, suff: (d.physical_id[0],
2378 d.physical_id[1] + "_replaced-%s" % suff)
2380 # Build the rename list based on what LVs exist on the node
2381 rename_old_to_new = []
2382 for to_ren in old_lvs:
2383 result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
2384 if not result.fail_msg and result.payload:
2386 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2388 self.lu.LogInfo("Renaming the old LVs on the target node")
2389 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2390 rename_old_to_new)
2391 result.Raise("Can't rename old LVs on node %s" %
2392 self.cfg.GetNodeName(self.target_node_uuid))
2394 # Now we rename the new LVs to the old LVs
2395 self.lu.LogInfo("Renaming the new LVs on the target node")
2396 rename_new_to_old = [(new, old.physical_id)
2397 for old, new in zip(old_lvs, new_lvs)]
2398 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2399 rename_new_to_old)
2400 result.Raise("Can't rename new LVs on node %s" %
2401 self.cfg.GetNodeName(self.target_node_uuid))
2403 # Intermediate steps of in memory modifications
2404 for old, new in zip(old_lvs, new_lvs):
2405 new.logical_id = old.logical_id
2406 self.cfg.SetDiskID(new, self.target_node_uuid)
2408 # We need to modify old_lvs so that removal later removes the
2409 # right LVs, not the newly added ones; note that old_lvs is a
2410 # copy here
2411 for disk in old_lvs:
2412 disk.logical_id = ren_fn(disk, temp_suffix)
2413 self.cfg.SetDiskID(disk, self.target_node_uuid)
2415 # Now that the new lvs have the old name, we can add them to the device
2416 self.lu.LogInfo("Adding new mirror component on %s",
2417 self.cfg.GetNodeName(self.target_node_uuid))
2418 result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2419 (dev, self.instance), new_lvs)
2420 msg = result.fail_msg
2421 if msg:
2422 for new_lv in new_lvs:
2423 msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2424 new_lv).fail_msg
2425 if msg2:
2426 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2427 hint=("cleanup manually the unused logical"
2429 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2431 cstep = itertools.count(5)
2433 if self.early_release:
2434 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2435 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2436 # TODO: Check if releasing locks early still makes sense
2437 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2438 else:
2439 # Release all resource locks except those used by the instance
2440 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2441 keep=self.node_secondary_ip.keys())
2443 # Release all node locks while waiting for sync
2444 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2446 # TODO: Can the instance lock be downgraded here? Take the optional disk
2447 # shutdown in the caller into consideration.
2450 # This can fail as the old devices are degraded and _WaitForSync
2451 # does a combined result over all disks, so we don't check its return value
2452 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2453 WaitForSync(self.lu, self.instance)
2455 # Check all devices manually
2456 self._CheckDevices(self.instance.primary_node, iv_names)
2458 # Step: remove old storage
2459 if not self.early_release:
2460 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2461 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2463 def _ExecDrbd8Secondary(self, feedback_fn):
2464 """Replace the secondary node for DRBD 8.
2466 The algorithm for replace is quite complicated:
2467 - for all disks of the instance:
2468 - create new LVs on the new node with same names
2469 - shutdown the drbd device on the old secondary
2470 - disconnect the drbd network on the primary
2471 - create the drbd device on the new secondary
2472 - network attach the drbd on the primary, using an artifice:
2473 the drbd code for Attach() will connect to the network if it
2474 finds a device which is connected to the good local disks but
2475 not network enabled
2476 - wait for sync across all devices
2477 - remove all disks from the old secondary
2479 Failures are not very well handled.
2482 steps_total = 6
2484 pnode = self.instance.primary_node
2486 # Step: check device activation
2487 self.lu.LogStep(1, steps_total, "Check device existence")
2488 self._CheckDisksExistence([self.instance.primary_node])
2489 self._CheckVolumeGroup([self.instance.primary_node])
2491 # Step: check other node consistency
2492 self.lu.LogStep(2, steps_total, "Check peer consistency")
2493 self._CheckDisksConsistency(self.instance.primary_node, True, True)
2495 # Step: create new storage
2496 self.lu.LogStep(3, steps_total, "Allocate new storage")
2497 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2498 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2499 self.new_node_uuid)
2500 for idx, dev in enumerate(disks):
2501 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2502 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2503 # we pass force_create=True to force LVM creation
2504 for new_lv in dev.children:
2505 try:
2506 _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2507 new_lv, True, GetInstanceInfoText(self.instance),
2508 False, excl_stor)
2509 except errors.DeviceCreationError, e:
2510 raise errors.OpExecError("Can't create block device: %s" % e.message)
2512 # Step 4: drbd minors and drbd setup changes
2513 # after this, we must manually remove the drbd minors on both the
2514 # error and the success paths
2515 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2516 minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2517 for _ in self.instance.disks],
2518 self.instance.uuid)
2519 logging.debug("Allocated minors %r", minors)
2521 iv_names = {}
2522 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2523 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2524 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2525 # create new devices on new_node; note that we create two IDs:
2526 # one without port, so the drbd will be activated without
2527 # networking information on the new node at this stage, and one
2528 # with network, for the latter activation in step 4
2529 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2530 if self.instance.primary_node == o_node1:
2531 p_minor = o_minor1
2532 else:
2533 assert self.instance.primary_node == o_node2, "Three-node instance?"
2534 p_minor = o_minor2
2536 new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2537 p_minor, new_minor, o_secret)
2538 new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2539 p_minor, new_minor, o_secret)
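# Illustrative layout of a DRBD8 logical_id tuple, as unpacked above:
#   (node_A_uuid, node_B_uuid, port, minor_A, minor_B, shared_secret)
# new_alone_id deliberately leaves the port unset so the device can be brought
# up without networking first; new_net_id is the fully networked variant used
# once the primary re-attaches.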
2541 iv_names[idx] = (dev, dev.children, new_net_id)
2542 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2544 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2545 logical_id=new_alone_id,
2546 children=dev.children,
2547 size=dev.size,
2548 params={})
2549 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2550 self.cfg)
2551 try:
2552 CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2553 anno_new_drbd,
2554 GetInstanceInfoText(self.instance), False,
2555 excl_stor)
2556 except errors.GenericError:
2557 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2558 raise
2560 # We have new devices, shutdown the drbd on the old secondary
2561 for idx, dev in enumerate(self.instance.disks):
2562 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2563 self.cfg.SetDiskID(dev, self.target_node_uuid)
2564 msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2565 (dev, self.instance)).fail_msg
2567 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2568 "node: %s" % (idx, msg),
2569 hint=("Please cleanup this device manually as"
2570 " soon as possible"))
2572 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2573 result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2574 self.instance.disks)[pnode]
2576 msg = result.fail_msg
2577 if msg:
2578 # detaches didn't succeed (unlikely)
2579 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2580 raise errors.OpExecError("Can't detach the disks from the network on"
2581 " old node: %s" % (msg,))
2583 # if we managed to detach at least one, we update all the disks of
2584 # the instance to point to the new secondary
2585 self.lu.LogInfo("Updating instance configuration")
2586 for dev, _, new_logical_id in iv_names.itervalues():
2587 dev.logical_id = new_logical_id
2588 self.cfg.SetDiskID(dev, self.instance.primary_node)
2590 self.cfg.Update(self.instance, feedback_fn)
2592 # Release all node locks (the configuration has been updated)
2593 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2595 # and now perform the drbd attach
2596 self.lu.LogInfo("Attaching primary drbds to new secondary"
2597 " (standalone => connected)")
2598 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2599 self.new_node_uuid],
2600 self.node_secondary_ip,
2601 (self.instance.disks, self.instance),
2604 for to_node, to_result in result.items():
2605 msg = to_result.fail_msg
2606 if msg:
2607 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2608 self.cfg.GetNodeName(to_node), msg,
2609 hint=("please do a gnt-instance info to see the"
2610 " status of disks"))
2612 cstep = itertools.count(5)
2614 if self.early_release:
2615 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2616 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2617 # TODO: Check if releasing locks early still makes sense
2618 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2619 else:
2620 # Release all resource locks except those used by the instance
2621 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2622 keep=self.node_secondary_ip.keys())
2624 # TODO: Can the instance lock be downgraded here? Take the optional disk
2625 # shutdown in the caller into consideration.
2628 # This can fail as the old devices are degraded and _WaitForSync
2629 # does a combined result over all disks, so we don't check its return value
2630 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2631 WaitForSync(self.lu, self.instance)
2633 # Check all devices manually
2634 self._CheckDevices(self.instance.primary_node, iv_names)
2636 # Step: remove old storage
2637 if not self.early_release:
2638 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2639 self._RemoveOldStorage(self.target_node_uuid, iv_names)