4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Logical units dealing with storage of instances."""
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import rpc
38 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
39 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
40 AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
41 CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
42 IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
43 CheckDiskTemplateEnabled
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45 CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46 BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
48 import ganeti.masterd.instance
51 _DISK_TEMPLATE_NAME_PREFIX = {
52 constants.DT_PLAIN: "",
53 constants.DT_RBD: ".rbd",
54 constants.DT_EXT: ".ext",
55 constants.DT_FILE: ".file",
56 constants.DT_SHARED_FILE: ".sharedfile",
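# Illustrative note (not part of the original module): together with
# _GenerateUniqueNames and GenerateDiskTemplate below, these prefixes shape
# the generated disk names. A sketch, assuming a generated unique ID of
# "1f2e3d" and base_index 0:
#
#   prefix = _DISK_TEMPLATE_NAME_PREFIX[constants.DT_RBD]          # ".rbd"
#   names = _GenerateUniqueNames(lu, ["%s.disk%s" % (prefix, 0)])
#   # -> ["1f2e3d.rbd.disk0"]
#
# The unique ID value is made up for illustration.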
60 def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
62 """Create a single block device on a given node.
64 This will not recurse over children of the device, so they must be
67 @param lu: the lu on whose behalf we execute
68 @param node_uuid: the node on which to create the device
69 @type instance: L{objects.Instance}
70 @param instance: the instance which owns the device
71 @type device: L{objects.Disk}
72 @param device: the device to create
73 @param info: the extra 'metadata' we should attach to the device
74 (this will be represented as an LVM tag)
75 @type force_open: boolean
76 @param force_open: this parameter will be passed to the
77 L{backend.BlockdevCreate} function where it specifies
78 whether we run on primary or not, and it affects both
79 the child assembly and the device's own Open() execution
80 @type excl_stor: boolean
81 @param excl_stor: Whether exclusive_storage is active for the node
84 lu.cfg.SetDiskID(device, node_uuid)
85 result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
86 instance.name, force_open, info,
88 result.Raise("Can't create block device %s on"
89 " node %s for instance %s" % (device,
90 lu.cfg.GetNodeName(node_uuid),
92 if device.physical_id is None:
93 device.physical_id = result.payload
96 def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
97 info, force_open, excl_stor):
98 """Create a tree of block devices on a given node.
100 If this device type has to be created on secondaries, create it and
103 If not, just recurse to children keeping the same 'force' value.
105 @attention: The device has to be annotated already.
107 @param lu: the lu on whose behalf we execute
108 @param node_uuid: the node on which to create the device
109 @type instance: L{objects.Instance}
110 @param instance: the instance which owns the device
111 @type device: L{objects.Disk}
112 @param device: the device to create
113 @type force_create: boolean
114 @param force_create: whether to force creation of this device; this
115 will be changed to True whenever we find a device which has the
116 CreateOnSecondary() attribute
117 @param info: the extra 'metadata' we should attach to the device
118 (this will be represented as an LVM tag)
119 @type force_open: boolean
120 @param force_open: this parameter will be passed to the
121 L{backend.BlockdevCreate} function where it specifies
122 whether we run on primary or not, and it affects both
123 the child assembly and the device's own Open() execution
124 @type excl_stor: boolean
125 @param excl_stor: Whether exclusive_storage is active for the node
127 @return: list of created devices
131 if device.CreateOnSecondary():
135 for child in device.children:
136 devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
137 force_create, info, force_open, excl_stor)
138 created_devices.extend(devs)
141 return created_devices
143 CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
145 # The device has been completely created, so there is no point in keeping
146 # its subdevices in the list. We just add the device itself instead.
147 created_devices = [(node_uuid, device)]
148 return created_devices
150 except errors.DeviceCreationError, e:
151 e.created_devices.extend(created_devices)
153 except errors.OpExecError, e:
154 raise errors.DeviceCreationError(str(e), created_devices)
157 def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
158 """Whether exclusive_storage is in effect for the given node.
160 @type cfg: L{config.ConfigWriter}
161 @param cfg: The cluster configuration
162 @type node_uuid: string
163 @param node_uuid: The node UUID
165 @return: The effective value of exclusive_storage
166 @raise errors.OpPrereqError: if no node exists with the given UUID
169 ni = cfg.GetNodeInfo(node_uuid)
171 raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
173 return IsExclusiveStorageEnabledNode(cfg, ni)
176 def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
178 """Wrapper around L{_CreateBlockDevInner}.
180 This method annotates the root device first.
183 (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
184 excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
185 return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
186 force_open, excl_stor)
189 def _UndoCreateDisks(lu, disks_created):
190 """Undo the work performed by L{CreateDisks}.
192 This function is called in case of an error to undo the work of
195 @type lu: L{LogicalUnit}
196 @param lu: the logical unit on whose behalf we execute
197 @param disks_created: the result returned by L{CreateDisks}
200 for (node_uuid, disk) in disks_created:
201 lu.cfg.SetDiskID(disk, node_uuid)
202 result = lu.rpc.call_blockdev_remove(node_uuid, disk)
203 result.Warn("Failed to remove newly-created disk %s on node %s" %
204 (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
207 def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
208 """Create all disks for an instance.
210 This abstracts away some work from AddInstance.
212 @type lu: L{LogicalUnit}
213 @param lu: the logical unit on whose behalf we execute
214 @type instance: L{objects.Instance}
215 @param instance: the instance whose disks we should create
217 @param to_skip: list of indices to skip
218 @type target_node_uuid: string
219 @param target_node_uuid: if passed, overrides the target node for creation
220 @type disks: list of L{objects.Disk}
221 @param disks: the disks to create; if not specified, all the disks of the
223 @return: information about the created disks, to be used to call
225 @raise errors.OpPrereqError: in case of error
228 info = GetInstanceInfoText(instance)
229 if target_node_uuid is None:
230 pnode_uuid = instance.primary_node
231 all_node_uuids = instance.all_nodes
233 pnode_uuid = target_node_uuid
234 all_node_uuids = [pnode_uuid]
237 disks = instance.disks
239 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)
241 if instance.disk_template in constants.DTS_FILEBASED:
242 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
243 result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
245 result.Raise("Failed to create directory '%s' on"
246 " node %s" % (file_storage_dir,
247 lu.cfg.GetNodeName(pnode_uuid)))
250 for idx, device in enumerate(disks):
251 if to_skip and idx in to_skip:
253 logging.info("Creating disk %s for instance '%s'", idx, instance.name)
254 for node_uuid in all_node_uuids:
255 f_create = node_uuid == pnode_uuid
257 _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
259 disks_created.append((node_uuid, device))
260 except errors.DeviceCreationError, e:
261 logging.warning("Creating disk %s for instance '%s' failed",
263 disks_created.extend(e.created_devices)
264 _UndoCreateDisks(lu, disks_created)
265 raise errors.OpExecError(e.message)
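# Illustrative note (not part of the original module): the bookkeeping above
# pairs every created device with the node UUID it was created on, e.g.
#
#   disks_created = [(pnode_uuid, disk0), (snode_uuid, disk0), ...]
#
# so that _UndoCreateDisks(lu, disks_created) can remove exactly those
# devices again if a later step fails. The names pnode_uuid, snode_uuid and
# disk0 are placeholders.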
269 def ComputeDiskSizePerVG(disk_template, disks):
270 """Compute disk size requirements in the volume group
273 def _compute(disks, payload):
274 """Universal algorithm.
279 vgs[disk[constants.IDISK_VG]] = \
280 vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
284 # Required free disk space as a function of disk and swap space
286 constants.DT_DISKLESS: {},
287 constants.DT_PLAIN: _compute(disks, 0),
288 # 128 MiB are added for DRBD metadata for each disk
289 constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
290 constants.DT_FILE: {},
291 constants.DT_SHARED_FILE: {},
294 if disk_template not in req_size_dict:
295 raise errors.ProgrammerError("Disk template '%s' size requirement"
296 " is unknown" % disk_template)
298 return req_size_dict[disk_template]
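# Illustrative example (not part of the original module), assuming two disks
# on a volume group named "xenvg":
#
#   ComputeDiskSizePerVG(constants.DT_DRBD8,
#                        [{constants.IDISK_VG: "xenvg",
#                          constants.IDISK_SIZE: 1024},
#                         {constants.IDISK_VG: "xenvg",
#                          constants.IDISK_SIZE: 2048}])
#   # -> {"xenvg": 1024 + 2048 + 2 * constants.DRBD_META_SIZE}
#
# The VG name and sizes (in MiB) are made-up values; DT_DISKLESS, DT_FILE and
# DT_SHARED_FILE have no volume group requirements and yield an empty dict.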
301 def ComputeDisks(op, default_vg):
302 """Computes the instance disks.
304 @param op: The instance opcode
305 @param default_vg: The default_vg to assume
307 @return: The computed disks
311 for disk in op.disks:
312 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
313 if mode not in constants.DISK_ACCESS_SET:
314 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
315 mode, errors.ECODE_INVAL)
316 size = disk.get(constants.IDISK_SIZE, None)
318 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
321 except (TypeError, ValueError):
322 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
325 ext_provider = disk.get(constants.IDISK_PROVIDER, None)
326 if ext_provider and op.disk_template != constants.DT_EXT:
327 raise errors.OpPrereqError("The '%s' option is only valid for the %s"
328 " disk template, not %s" %
329 (constants.IDISK_PROVIDER, constants.DT_EXT,
330 op.disk_template), errors.ECODE_INVAL)
332 data_vg = disk.get(constants.IDISK_VG, default_vg)
333 name = disk.get(constants.IDISK_NAME, None)
334 if name is not None and name.lower() == constants.VALUE_NONE:
337 constants.IDISK_SIZE: size,
338 constants.IDISK_MODE: mode,
339 constants.IDISK_VG: data_vg,
340 constants.IDISK_NAME: name,
344 constants.IDISK_METAVG,
345 constants.IDISK_ADOPT,
346 constants.IDISK_SPINDLES,
349 new_disk[key] = disk[key]
351 # For extstorage, demand the `provider' option and add any
352 # additional parameters (ext-params) to the dict
353 if op.disk_template == constants.DT_EXT:
355 new_disk[constants.IDISK_PROVIDER] = ext_provider
357 if key not in constants.IDISK_PARAMS:
358 new_disk[key] = disk[key]
360 raise errors.OpPrereqError("Missing provider for template '%s'" %
361 constants.DT_EXT, errors.ECODE_INVAL)
363 disks.append(new_disk)
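# Illustrative example (not part of the original module): a minimal opcode
# disk specification such as
#
#   {constants.IDISK_SIZE: 10240, constants.IDISK_MODE: constants.DISK_RDWR}
#
# would be normalized by the loop above into roughly
#
#   {constants.IDISK_SIZE: 10240,
#    constants.IDISK_MODE: constants.DISK_RDWR,
#    constants.IDISK_VG: default_vg,
#    constants.IDISK_NAME: None}
#
# (sizes are in MiB; the 10240 value is made up for illustration).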
368 def CheckRADOSFreeSpace():
369 """Compute disk size requirements inside the RADOS cluster.
372 # For the RADOS cluster we assume there is always enough space.
376 def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
377 iv_name, p_minor, s_minor):
378 """Generate a drbd8 device complete with its children.
381 assert len(vgnames) == len(names) == 2
382 port = lu.cfg.AllocatePort()
383 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
385 dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
386 logical_id=(vgnames[0], names[0]),
388 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
389 dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
390 size=constants.DRBD_META_SIZE,
391 logical_id=(vgnames[1], names[1]),
393 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
394 drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
395 logical_id=(primary_uuid, secondary_uuid, port,
398 children=[dev_data, dev_meta],
399 iv_name=iv_name, params={})
400 drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
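# Illustrative sketch (not part of the original module) of the object tree
# built above: a DT_DRBD8 disk whose children are the data and metadata LVs,
#
#   drbd_dev (DT_DRBD8, size=size)
#     +- dev_data (DT_PLAIN, size=size, logical_id=(vgnames[0], names[0]))
#     +- dev_meta (DT_PLAIN, size=DRBD_META_SIZE,
#                  logical_id=(vgnames[1], names[1]))
#
# with the DRBD logical_id carrying the node pair, port, minors and shared
# secret allocated above.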
404 def GenerateDiskTemplate(
405 lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
406 disk_info, file_storage_dir, file_driver, base_index,
407 feedback_fn, full_disk_params):
408 """Generate the entire disk layout for a given template type.
411 vgname = lu.cfg.GetVGName()
412 disk_count = len(disk_info)
415 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
417 if template_name == constants.DT_DISKLESS:
419 elif template_name == constants.DT_DRBD8:
420 if len(secondary_node_uuids) != 1:
421 raise errors.ProgrammerError("Wrong template configuration")
422 remote_node_uuid = secondary_node_uuids[0]
423 minors = lu.cfg.AllocateDRBDMinor(
424 [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
426 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
428 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
431 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
432 for i in range(disk_count)]):
433 names.append(lv_prefix + "_data")
434 names.append(lv_prefix + "_meta")
435 for idx, disk in enumerate(disk_info):
436 disk_index = idx + base_index
437 data_vg = disk.get(constants.IDISK_VG, vgname)
438 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
439 disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
440 disk[constants.IDISK_SIZE],
442 names[idx * 2:idx * 2 + 2],
443 "disk/%d" % disk_index,
444 minors[idx * 2], minors[idx * 2 + 1])
445 disk_dev.mode = disk[constants.IDISK_MODE]
446 disk_dev.name = disk.get(constants.IDISK_NAME, None)
447 disks.append(disk_dev)
449 if secondary_node_uuids:
450 raise errors.ProgrammerError("Wrong template configuration")
452 name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
453 if name_prefix is None:
456 names = _GenerateUniqueNames(lu, ["%s.disk%s" %
457 (name_prefix, base_index + i)
458 for i in range(disk_count)])
460 if template_name == constants.DT_PLAIN:
462 def logical_id_fn(idx, _, disk):
463 vg = disk.get(constants.IDISK_VG, vgname)
464 return (vg, names[idx])
466 elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
468 lambda _, disk_index, disk: (file_driver,
469 "%s/%s" % (file_storage_dir,
471 elif template_name == constants.DT_BLOCK:
473 lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
474 disk[constants.IDISK_ADOPT])
475 elif template_name == constants.DT_RBD:
476 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
477 elif template_name == constants.DT_EXT:
478 def logical_id_fn(idx, _, disk):
479 provider = disk.get(constants.IDISK_PROVIDER, None)
481 raise errors.ProgrammerError("Disk template is %s, but '%s' is"
482 " not found" % (constants.DT_EXT,
483 constants.IDISK_PROVIDER))
484 return (provider, names[idx])
486 raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
488 dev_type = template_name
490 for idx, disk in enumerate(disk_info):
492 # Only for the Ext template add disk_info to params
493 if template_name == constants.DT_EXT:
494 params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
496 if key not in constants.IDISK_PARAMS:
497 params[key] = disk[key]
498 disk_index = idx + base_index
499 size = disk[constants.IDISK_SIZE]
500 feedback_fn("* disk %s, size %s" %
501 (disk_index, utils.FormatUnit(size, "h")))
502 disk_dev = objects.Disk(dev_type=dev_type, size=size,
503 logical_id=logical_id_fn(idx, disk_index, disk),
504 iv_name="disk/%d" % disk_index,
505 mode=disk[constants.IDISK_MODE],
507 spindles=disk.get(constants.IDISK_SPINDLES))
508 disk_dev.name = disk.get(constants.IDISK_NAME, None)
509 disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
510 disks.append(disk_dev)
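# Illustrative note (not part of the original module): the logical_id_fn
# selected above yields per-template logical IDs along these lines:
#
#   DT_PLAIN                 -> (vg_name, lv_name)
#   DT_FILE / DT_SHARED_FILE -> (file_driver, "<file_storage_dir>/<name>")
#   DT_BLOCK                 -> (constants.BLOCKDEV_DRIVER_MANUAL, adopted_dev)
#   DT_RBD                   -> ("rbd", generated_name)
#   DT_EXT                   -> (provider, generated_name)
#
# The angle-bracket placeholders and identifiers on the right are
# illustrative only.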
515 def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
516 """Check the presence of the spindle options with exclusive_storage.
519 @param diskdict: disk parameters
521 @param es_flag: the effective value of the exclusive_storage flag
523 @param required: whether spindles are required or just optional
524 @raise errors.OpPrereqError: when spindles are given and they should not
527 if (not es_flag and constants.IDISK_SPINDLES in diskdict and
528 diskdict[constants.IDISK_SPINDLES] is not None):
529 raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
530 " when exclusive storage is not active",
532 if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
533 diskdict[constants.IDISK_SPINDLES] is None)):
534 raise errors.OpPrereqError("You must specify spindles in instance disks"
535 " when exclusive storage is active",
539 class LUInstanceRecreateDisks(LogicalUnit):
540 """Recreate an instance's missing disks.
543 HPATH = "instance-recreate-disks"
544 HTYPE = constants.HTYPE_INSTANCE
547 _MODIFYABLE = compat.UniqueFrozenset([
548 constants.IDISK_SIZE,
549 constants.IDISK_MODE,
550 constants.IDISK_SPINDLES,
553 # New or changed disk parameters may have different semantics
554 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
555 constants.IDISK_ADOPT,
557 # TODO: Implement support for changing VG while recreating
559 constants.IDISK_METAVG,
560 constants.IDISK_PROVIDER,
561 constants.IDISK_NAME,
564 def _RunAllocator(self):
565 """Run the allocator based on input opcode.
568 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
571 # The allocator should actually run in "relocate" mode, but current
572 # allocators don't support relocating all the nodes of an instance at
573 # the same time. As a workaround we use "allocate" mode, but this is
574 # suboptimal for two reasons:
575 # - The instance name passed to the allocator is present in the list of
576 # existing instances, so there could be a conflict within the
577 # internal structures of the allocator. This doesn't happen with the
578 # current allocators, but it's a liability.
579 # - The allocator counts the resources used by the instance twice: once
580 # because the instance exists already, and once because it tries to
581 # allocate a new instance.
582 # The allocator could choose some of the nodes on which the instance is
583 # running, but that's not a problem. If the instance nodes are broken,
584 # they should already be marked as drained or offline, and hence
585 # skipped by the allocator. If instance disks have been lost for other
586 # reasons, then recreating the disks on the same nodes should be fine.
587 disk_template = self.instance.disk_template
588 spindle_use = be_full[constants.BE_SPINDLE_USE]
590 constants.IDISK_SIZE: d.size,
591 constants.IDISK_MODE: d.mode,
592 constants.IDISK_SPINDLES: d.spindles,
593 } for d in self.instance.disks]
594 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
595 disk_template=disk_template,
596 tags=list(self.instance.GetTags()),
599 vcpus=be_full[constants.BE_VCPUS],
600 memory=be_full[constants.BE_MAXMEM],
601 spindle_use=spindle_use,
603 hypervisor=self.instance.hypervisor,
605 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
607 ial.Run(self.op.iallocator)
609 assert req.RequiredNodes() == len(self.instance.all_nodes)
612 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
613 " %s" % (self.op.iallocator, ial.info),
616 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
617 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
618 self.op.instance_name, self.op.iallocator,
619 utils.CommaJoin(self.op.nodes))
621 def CheckArguments(self):
622 if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
623 # Normalize and convert deprecated list of disk indices
624 self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
626 duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
628 raise errors.OpPrereqError("Some disks have been specified more than"
629 " once: %s" % utils.CommaJoin(duplicates),
632 # We don't want _CheckIAllocatorOrNode selecting the default iallocator
633 # when neither iallocator nor nodes are specified
634 if self.op.iallocator or self.op.nodes:
635 CheckIAllocatorOrNode(self, "iallocator", "nodes")
637 for (idx, params) in self.op.disks:
638 utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
639 unsupported = frozenset(params.keys()) - self._MODIFYABLE
641 raise errors.OpPrereqError("Parameters for disk %s try to change"
642 " unmodifyable parameter(s): %s" %
643 (idx, utils.CommaJoin(unsupported)),
646 def ExpandNames(self):
647 self._ExpandAndLockInstance()
648 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
651 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
652 self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
654 self.needed_locks[locking.LEVEL_NODE] = []
655 if self.op.iallocator:
656 # iallocator will select a new node in the same group
657 self.needed_locks[locking.LEVEL_NODEGROUP] = []
658 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
660 self.needed_locks[locking.LEVEL_NODE_RES] = []
662 def DeclareLocks(self, level):
663 if level == locking.LEVEL_NODEGROUP:
664 assert self.op.iallocator is not None
665 assert not self.op.nodes
666 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
667 self.share_locks[locking.LEVEL_NODEGROUP] = 1
668 # Lock the primary group used by the instance optimistically; this
669 # requires going via the node before it's locked, requiring
670 # verification later on
671 self.needed_locks[locking.LEVEL_NODEGROUP] = \
672 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
674 elif level == locking.LEVEL_NODE:
675 # If an allocator is used, then we lock all the nodes in the current
676 # instance group, as we don't know yet which ones will be selected;
677 # if we replace the nodes without using an allocator, locks are
678 # already declared in ExpandNames; otherwise, we need to lock all the
679 # instance nodes for disk re-creation
680 if self.op.iallocator:
681 assert not self.op.nodes
682 assert not self.needed_locks[locking.LEVEL_NODE]
683 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
685 # Lock member nodes of the group of the primary node
686 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
687 self.needed_locks[locking.LEVEL_NODE].extend(
688 self.cfg.GetNodeGroup(group_uuid).members)
690 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
691 elif not self.op.nodes:
692 self._LockInstancesNodes(primary_only=False)
693 elif level == locking.LEVEL_NODE_RES:
695 self.needed_locks[locking.LEVEL_NODE_RES] = \
696 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
698 def BuildHooksEnv(self):
701 This runs on the master, primary and secondary nodes of the instance.
704 return BuildInstanceHookEnvByObject(self, self.instance)
706 def BuildHooksNodes(self):
707 """Build hooks nodes.
710 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
713 def CheckPrereq(self):
714 """Check prerequisites.
716 This checks that the instance is in the cluster and is not running.
719 instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
720 assert instance is not None, \
721 "Cannot retrieve locked instance %s" % self.op.instance_name
722 if self.op.node_uuids:
723 if len(self.op.node_uuids) != len(instance.all_nodes):
724 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
725 " %d replacement nodes were specified" %
726 (instance.name, len(instance.all_nodes),
727 len(self.op.node_uuids)),
729 assert instance.disk_template != constants.DT_DRBD8 or \
730 len(self.op.node_uuids) == 2
731 assert instance.disk_template != constants.DT_PLAIN or \
732 len(self.op.node_uuids) == 1
733 primary_node = self.op.node_uuids[0]
735 primary_node = instance.primary_node
736 if not self.op.iallocator:
737 CheckNodeOnline(self, primary_node)
739 if instance.disk_template == constants.DT_DISKLESS:
740 raise errors.OpPrereqError("Instance '%s' has no disks" %
741 self.op.instance_name, errors.ECODE_INVAL)
743 # Verify if node group locks are still correct
744 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
746 # Node group locks are acquired only for the primary node (and only
747 # when the allocator is used)
748 CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
751 # if we replace nodes *and* the old primary is offline, we don't
752 # check the instance state
753 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
754 if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
755 CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
756 msg="cannot recreate disks")
759 self.disks = dict(self.op.disks)
761 self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
763 maxidx = max(self.disks.keys())
764 if maxidx >= len(instance.disks):
765 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
768 if ((self.op.node_uuids or self.op.iallocator) and
769 sorted(self.disks.keys()) != range(len(instance.disks))):
770 raise errors.OpPrereqError("Can't recreate disks partially and"
771 " change the nodes at the same time",
774 self.instance = instance
776 if self.op.iallocator:
778 # Release unneeded node and node resource locks
779 ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
780 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
781 ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
783 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
785 if self.op.node_uuids:
786 node_uuids = self.op.node_uuids
788 node_uuids = instance.all_nodes
789 excl_stor = compat.any(
790 rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
792 for new_params in self.disks.values():
793 CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
795 def Exec(self, feedback_fn):
796 """Recreate the disks.
799 assert (self.owned_locks(locking.LEVEL_NODE) ==
800 self.owned_locks(locking.LEVEL_NODE_RES))
803 mods = [] # keeps track of needed changes
805 for idx, disk in enumerate(self.instance.disks):
807 changes = self.disks[idx]
809 # Disk should not be recreated
813 # update secondaries for disks, if needed
814 if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
815 # need to update the nodes and minors
816 assert len(self.op.node_uuids) == 2
817 assert len(disk.logical_id) == 6 # otherwise disk internals
819 (_, _, old_port, _, _, old_secret) = disk.logical_id
820 new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
822 new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
823 new_minors[0], new_minors[1], old_secret)
824 assert len(disk.logical_id) == len(new_id)
828 mods.append((idx, new_id, changes))
830 # now that we have passed all asserts above, we can apply the mods
831 # in a single run (to avoid partial changes)
832 for idx, new_id, changes in mods:
833 disk = self.instance.disks[idx]
834 if new_id is not None:
835 assert disk.dev_type == constants.DT_DRBD8
836 disk.logical_id = new_id
838 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
839 mode=changes.get(constants.IDISK_MODE, None),
840 spindles=changes.get(constants.IDISK_SPINDLES, None))
842 # change primary node, if needed
843 if self.op.node_uuids:
844 self.instance.primary_node = self.op.node_uuids[0]
845 self.LogWarning("Changing the instance's nodes, you will have to"
846 " remove any disks left on the older nodes manually")
848 if self.op.node_uuids:
849 self.cfg.Update(self.instance, feedback_fn)
851 # All touched nodes must be locked
852 mylocks = self.owned_locks(locking.LEVEL_NODE)
853 assert mylocks.issuperset(frozenset(self.instance.all_nodes))
854 new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
856 # TODO: Release node locks before wiping, or explain why it's not possible
857 if self.cfg.GetClusterInfo().prealloc_wipe_disks:
858 wipedisks = [(idx, disk, 0)
859 for (idx, disk) in enumerate(self.instance.disks)
860 if idx not in to_skip]
861 WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
865 def _PerformNodeInfoCall(lu, node_uuids, vg):
866 """Prepares the input and performs a node info call.
868 @type lu: C{LogicalUnit}
869 @param lu: a logical unit from which we get configuration data
870 @type node_uuids: list of string
871 @param node_uuids: list of node UUIDs to perform the call for
873 @param vg: the volume group's name
876 lvm_storage_units = [(constants.ST_LVM_VG, vg)]
877 storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
879 hvname = lu.cfg.GetHypervisorType()
880 hvparams = lu.cfg.GetClusterInfo().hvparams
881 nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
882 [(hvname, hvparams[hvname])])
886 def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
887 """Checks the vg capacity for a given node.
889 @type node_info: tuple (_, list of dicts, _)
890 @param node_info: the result of the node info call for one node
891 @type node_name: string
892 @param node_name: the name of the node
894 @param vg: volume group name
896 @param requested: the amount of disk in MiB to check for
897 @raise errors.OpPrereqError: if the node doesn't have enough disk,
898 or we cannot check the node
901 (_, space_info, _) = node_info
902 lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
903 space_info, constants.ST_LVM_VG)
905 raise errors.OpPrereqError("Can't retrieve storage information for LVM")
906 vg_free = lvm_vg_info.get("storage_free", None)
907 if not isinstance(vg_free, int):
908 raise errors.OpPrereqError("Can't compute free disk space on node"
909 " %s for vg %s, result was '%s'" %
910 (node_name, vg, vg_free), errors.ECODE_ENVIRON)
911 if requested > vg_free:
912 raise errors.OpPrereqError("Not enough disk space on target node %s"
913 " vg %s: required %d MiB, available %d MiB" %
914 (node_name, vg, requested, vg_free),
918 def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
919 """Checks if nodes have enough free disk space in the specified VG.
921 This function checks if all given nodes have the needed amount of
922 free disk. In case any node has less disk or we cannot get the
923 information from the node, this function raises an OpPrereqError
926 @type lu: C{LogicalUnit}
927 @param lu: a logical unit from which we get configuration data
928 @type node_uuids: C{list}
929 @param node_uuids: the list of node UUIDs to check
931 @param vg: the volume group to check
932 @type requested: C{int}
933 @param requested: the amount of disk in MiB to check for
934 @raise errors.OpPrereqError: if the node doesn't have enough disk,
935 or we cannot check the node
938 nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
939 for node in node_uuids:
940 node_name = lu.cfg.GetNodeName(node)
941 info = nodeinfo[node]
942 info.Raise("Cannot get current information from node %s" % node_name,
943 prereq=True, ecode=errors.ECODE_ENVIRON)
944 _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
947 def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
948 """Checks if nodes have enough free disk space in all the VGs.
950 This function checks if all given nodes have the needed amount of
951 free disk. In case any node has less disk or we cannot get the
952 information from the node, this function raises an OpPrereqError
955 @type lu: C{LogicalUnit}
956 @param lu: a logical unit from which we get configuration data
957 @type node_uuids: C{list}
958 @param node_uuids: the list of node UUIDs to check
959 @type req_sizes: C{dict}
960 @param req_sizes: the hash of vg and corresponding amount of disk in
962 @raise errors.OpPrereqError: if the node doesn't have enough disk,
963 or we cannot check the node
966 for vg, req_size in req_sizes.items():
967 _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
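# Illustrative example (not part of the original module): req_sizes is the
# per-VG requirement mapping as produced by ComputeDiskSizePerVG, e.g.
#
#   CheckNodesFreeDiskPerVG(lu, node_uuids, {"xenvg": 3072, "ssdvg": 1024})
#
# which verifies 3072 MiB of free space in "xenvg" and 1024 MiB in "ssdvg"
# on every node given; the VG names and sizes are made up.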
970 def _DiskSizeInBytesToMebibytes(lu, size):
971 """Converts a disk size in bytes to mebibytes.
973 Warns and rounds up if the size isn't an even multiple of 1 MiB.
976 (mib, remainder) = divmod(size, 1024 * 1024)
979 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
980 " to not overwrite existing data (%s bytes will not be"
981 " wiped)", (1024 * 1024) - remainder)
987 def _CalcEta(time_taken, written, total_size):
988 """Calculates the ETA based on size written and total size.
990 @param time_taken: The time taken so far
991 @param written: amount written so far
992 @param total_size: The total size of data to be written
993 @return: The remaining time in seconds
996 avg_time = time_taken / float(written)
997 return (total_size - written) * avg_time
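# Illustrative example (not part of the original module): after 120 seconds
# with 512 units written out of 2048,
#
#   _CalcEta(120.0, 512, 2048)
#   # avg_time = 120.0 / 512 = 0.234375 seconds per unit
#   # -> (2048 - 512) * 0.234375 = 360.0 seconds remaining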
1000 def WipeDisks(lu, instance, disks=None):
1001 """Wipes instance disks.
1003 @type lu: L{LogicalUnit}
1004 @param lu: the logical unit on whose behalf we execute
1005 @type instance: L{objects.Instance}
1006 @param instance: the instance whose disks we should create
1007 @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1008 @param disks: Disk details; tuple contains disk index, disk object and the
1012 node_uuid = instance.primary_node
1013 node_name = lu.cfg.GetNodeName(node_uuid)
1016 disks = [(idx, disk, 0)
1017 for (idx, disk) in enumerate(instance.disks)]
1019 for (_, device, _) in disks:
1020 lu.cfg.SetDiskID(device, node_uuid)
1022 logging.info("Pausing synchronization of disks of instance '%s'",
1024 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1025 (map(compat.snd, disks),
1028 result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1030 for idx, success in enumerate(result.payload):
1032 logging.warning("Pausing synchronization of disk %s of instance '%s'"
1033 " failed", idx, instance.name)
1036 for (idx, device, offset) in disks:
1037 # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1038 # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1040 int(min(constants.MAX_WIPE_CHUNK,
1041 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1045 start_time = time.time()
1050 info_text = (" (from %s to %s)" %
1051 (utils.FormatUnit(offset, "h"),
1052 utils.FormatUnit(size, "h")))
1054 lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1056 logging.info("Wiping disk %d for instance %s on node %s using"
1057 " chunk size %s", idx, instance.name, node_name,
1060 while offset < size:
1061 wipe_size = min(wipe_chunk_size, size - offset)
1063 logging.debug("Wiping disk %d, offset %s, chunk %s",
1064 idx, offset, wipe_size)
1066 result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1068 result.Raise("Could not wipe disk %d at offset %d for size %d" %
1069 (idx, offset, wipe_size))
1073 if now - last_output >= 60:
1074 eta = _CalcEta(now - start_time, offset, size)
1075 lu.LogInfo(" - done: %.1f%% ETA: %s",
1076 offset / float(size) * 100, utils.FormatSeconds(eta))
1079 logging.info("Resuming synchronization of disks for instance '%s'",
1082 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1083 (map(compat.snd, disks),
1088 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1089 node_name, result.fail_msg)
1091 for idx, success in enumerate(result.payload):
1093 lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1094 " failed", idx, instance.name)
1097 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1098 """Wrapper for L{WipeDisks} that handles errors.
1100 @type lu: L{LogicalUnit}
1101 @param lu: the logical unit on whose behalf we execute
1102 @type instance: L{objects.Instance}
1103 @param instance: the instance whose disks we should wipe
1104 @param disks: see L{WipeDisks}
1105 @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1107 @raise errors.OpPrereqError: in case of failure
1111 WipeDisks(lu, instance, disks=disks)
1112 except errors.OpExecError:
1113 logging.warning("Wiping disks for instance '%s' failed",
1115 _UndoCreateDisks(lu, cleanup)
1119 def ExpandCheckDisks(instance, disks):
1120 """Return the instance disks selected by the disks list
1122 @type disks: list of L{objects.Disk} or None
1123 @param disks: selected disks
1124 @rtype: list of L{objects.Disk}
1125 @return: selected instance disks to act on
1129 return instance.disks
1131 if not set(disks).issubset(instance.disks):
1132 raise errors.ProgrammerError("Can only act on disks belonging to the"
1133 " target instance: expected a subset of %r,"
1134 " got %r" % (instance.disks, disks))
1138 def WaitForSync(lu, instance, disks=None, oneshot=False):
1139 """Sleep and poll for an instance's disk to sync.
1142 if not instance.disks or disks is not None and not disks:
1145 disks = ExpandCheckDisks(instance, disks)
1148 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1150 node_uuid = instance.primary_node
1151 node_name = lu.cfg.GetNodeName(node_uuid)
1154 lu.cfg.SetDiskID(dev, node_uuid)
1156 # TODO: Convert to utils.Retry
1159 degr_retries = 10 # in seconds, as we sleep 1 second each time
1163 cumul_degraded = False
1164 rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1165 msg = rstats.fail_msg
1167 lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1170 raise errors.RemoteError("Can't contact node %s for mirror data,"
1171 " aborting." % node_name)
1174 rstats = rstats.payload
1176 for i, mstat in enumerate(rstats):
1178 lu.LogWarning("Can't compute data for node %s/%s",
1179 node_name, disks[i].iv_name)
1182 cumul_degraded = (cumul_degraded or
1183 (mstat.is_degraded and mstat.sync_percent is None))
1184 if mstat.sync_percent is not None:
1186 if mstat.estimated_time is not None:
1187 rem_time = ("%s remaining (estimated)" %
1188 utils.FormatSeconds(mstat.estimated_time))
1189 max_time = mstat.estimated_time
1191 rem_time = "no time estimate"
1192 lu.LogInfo("- device %s: %5.2f%% done, %s",
1193 disks[i].iv_name, mstat.sync_percent, rem_time)
1195 # if we're done but degraded, let's do a few small retries, to
1196 # make sure we see a stable and not transient situation; therefore
1197 # we force restart of the loop
1198 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1199 logging.info("Degraded disks found, %d retries left", degr_retries)
1207 time.sleep(min(60, max_time))
1210 lu.LogInfo("Instance %s's disks are in sync", instance.name)
1212 return not cumul_degraded
1215 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1216 """Shutdown block devices of an instance.
1218 This does the shutdown on all nodes of the instance.
1220 If ignore_primary is false, errors on the primary node are
1224 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1226 disks = ExpandCheckDisks(instance, disks)
1229 for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1230 lu.cfg.SetDiskID(top_disk, node_uuid)
1231 result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1232 msg = result.fail_msg
1234 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1235 disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1236 if ((node_uuid == instance.primary_node and not ignore_primary) or
1237 (node_uuid != instance.primary_node and not result.offline)):
1242 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1243 """Shutdown block devices of an instance.
1245 This function checks if an instance is running before calling
1246 L{ShutdownInstanceDisks}.
1249 CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1250 ShutdownInstanceDisks(lu, instance, disks=disks)
1253 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1255 """Prepare the block devices for an instance.
1257 This sets up the block devices on all nodes.
1259 @type lu: L{LogicalUnit}
1260 @param lu: the logical unit on whose behalf we execute
1261 @type instance: L{objects.Instance}
1262 @param instance: the instance whose disks we assemble
1263 @type disks: list of L{objects.Disk} or None
1264 @param disks: which disks to assemble (or all, if None)
1265 @type ignore_secondaries: boolean
1266 @param ignore_secondaries: if true, errors on secondary nodes
1267 won't result in an error return from the function
1268 @type ignore_size: boolean
1269 @param ignore_size: if true, the current known size of the disk
1270 will not be used during the disk activation, useful for cases
1271 when the size is wrong
1272 @return: False if the operation failed, otherwise a list of
1273 (host, instance_visible_name, node_visible_name)
1274 with the mapping from node devices to instance devices
1279 disks = ExpandCheckDisks(instance, disks)
1281 # With the two passes mechanism we try to reduce the window of
1282 # opportunity for the race condition of switching DRBD to primary
1283 # before handshaking occurred, but we do not eliminate it
1285 # The proper fix would be to wait (with some limits) until the
1286 # connection has been made and drbd transitions from WFConnection
1287 # into any other network-connected state (Connected, SyncTarget,
1290 # mark instance disks as active before doing actual work, so watcher does
1291 # not try to shut them down erroneously
1292 lu.cfg.MarkInstanceDisksActive(instance.uuid)
1294 # 1st pass, assemble on all nodes in secondary mode
1295 for idx, inst_disk in enumerate(disks):
1296 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1297 instance.primary_node):
1299 node_disk = node_disk.Copy()
1300 node_disk.UnsetSize()
1301 lu.cfg.SetDiskID(node_disk, node_uuid)
1302 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1303 instance.name, False, idx)
1304 msg = result.fail_msg
1306 is_offline_secondary = (node_uuid in instance.secondary_nodes and
1308 lu.LogWarning("Could not prepare block device %s on node %s"
1309 " (is_primary=False, pass=1): %s",
1310 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1311 if not (ignore_secondaries or is_offline_secondary):
1314 # FIXME: race condition on drbd migration to primary
1316 # 2nd pass, do only the primary node
1317 for idx, inst_disk in enumerate(disks):
1320 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1321 instance.primary_node):
1322 if node_uuid != instance.primary_node:
1325 node_disk = node_disk.Copy()
1326 node_disk.UnsetSize()
1327 lu.cfg.SetDiskID(node_disk, node_uuid)
1328 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1329 instance.name, True, idx)
1330 msg = result.fail_msg
1332 lu.LogWarning("Could not prepare block device %s on node %s"
1333 " (is_primary=True, pass=2): %s",
1334 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1337 dev_path = result.payload
1339 device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1340 inst_disk.iv_name, dev_path))
1342 # leave the disks configured for the primary node
1343 # this is a workaround that would be fixed better by
1344 # improving the logical/physical id handling
1346 lu.cfg.SetDiskID(disk, instance.primary_node)
1349 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1351 return disks_ok, device_info
1354 def StartInstanceDisks(lu, instance, force):
1355 """Start the disks of an instance.
1358 disks_ok, _ = AssembleInstanceDisks(lu, instance,
1359 ignore_secondaries=force)
1361 ShutdownInstanceDisks(lu, instance)
1362 if force is not None and not force:
1364 hint=("If the message above refers to a secondary node,"
1365 " you can retry the operation using '--force'"))
1366 raise errors.OpExecError("Disk consistency error")
1369 class LUInstanceGrowDisk(LogicalUnit):
1370 """Grow a disk of an instance.
1374 HTYPE = constants.HTYPE_INSTANCE
1377 def ExpandNames(self):
1378 self._ExpandAndLockInstance()
1379 self.needed_locks[locking.LEVEL_NODE] = []
1380 self.needed_locks[locking.LEVEL_NODE_RES] = []
1381 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1382 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1384 def DeclareLocks(self, level):
1385 if level == locking.LEVEL_NODE:
1386 self._LockInstancesNodes()
1387 elif level == locking.LEVEL_NODE_RES:
1389 self.needed_locks[locking.LEVEL_NODE_RES] = \
1390 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1392 def BuildHooksEnv(self):
1395 This runs on the master, the primary and all the secondaries.
1399 "DISK": self.op.disk,
1400 "AMOUNT": self.op.amount,
1401 "ABSOLUTE": self.op.absolute,
1403 env.update(BuildInstanceHookEnvByObject(self, self.instance))
1406 def BuildHooksNodes(self):
1407 """Build hooks nodes.
1410 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1413 def CheckPrereq(self):
1414 """Check prerequisites.
1416 This checks that the instance is in the cluster.
1419 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1420 assert self.instance is not None, \
1421 "Cannot retrieve locked instance %s" % self.op.instance_name
1422 node_uuids = list(self.instance.all_nodes)
1423 for node_uuid in node_uuids:
1424 CheckNodeOnline(self, node_uuid)
1425 self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1427 if self.instance.disk_template not in constants.DTS_GROWABLE:
1428 raise errors.OpPrereqError("Instance's disk layout does not support"
1429 " growing", errors.ECODE_INVAL)
1431 self.disk = self.instance.FindDisk(self.op.disk)
1433 if self.op.absolute:
1434 self.target = self.op.amount
1435 self.delta = self.target - self.disk.size
1437 raise errors.OpPrereqError("Requested size (%s) is smaller than "
1438 "current disk size (%s)" %
1439 (utils.FormatUnit(self.target, "h"),
1440 utils.FormatUnit(self.disk.size, "h")),
1443 self.delta = self.op.amount
1444 self.target = self.disk.size + self.delta
1446 raise errors.OpPrereqError("Requested increment (%s) is negative" %
1447 utils.FormatUnit(self.delta, "h"),
1450 self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1452 def _CheckDiskSpace(self, node_uuids, req_vgspace):
1453 template = self.instance.disk_template
1454 if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
1455 not any(self.node_es_flags.values())):
1456 # TODO: check the free disk space for file, when that feature will be
1458 # With exclusive storage we need to do something smarter than just looking
1459 # at free space, which, in the end, is basically a dry run. So we rely on
1460 # the dry run performed in Exec() instead.
1461 CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1463 def Exec(self, feedback_fn):
1464 """Execute disk grow.
1467 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1468 assert (self.owned_locks(locking.LEVEL_NODE) ==
1469 self.owned_locks(locking.LEVEL_NODE_RES))
1471 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1473 disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1475 raise errors.OpExecError("Cannot activate block device to grow")
1477 feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1478 (self.op.disk, self.instance.name,
1479 utils.FormatUnit(self.delta, "h"),
1480 utils.FormatUnit(self.target, "h")))
1482 # First run all grow ops in dry-run mode
1483 for node_uuid in self.instance.all_nodes:
1484 self.cfg.SetDiskID(self.disk, node_uuid)
1485 result = self.rpc.call_blockdev_grow(node_uuid,
1486 (self.disk, self.instance),
1487 self.delta, True, True,
1488 self.node_es_flags[node_uuid])
1489 result.Raise("Dry-run grow request failed to node %s" %
1490 self.cfg.GetNodeName(node_uuid))
1493 # Get disk size from primary node for wiping
1494 self.cfg.SetDiskID(self.disk, self.instance.primary_node)
1495 result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
1497 result.Raise("Failed to retrieve disk size from node '%s'" %
1498 self.instance.primary_node)
1500 (disk_dimensions, ) = result.payload
1502 if disk_dimensions is None:
1503 raise errors.OpExecError("Failed to retrieve disk size from primary"
1504 " node '%s'" % self.instance.primary_node)
1505 (disk_size_in_bytes, _) = disk_dimensions
1507 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1509 assert old_disk_size >= self.disk.size, \
1510 ("Retrieved disk size too small (got %s, should be at least %s)" %
1511 (old_disk_size, self.disk.size))
1513 old_disk_size = None
1515 # We know that (as far as we can test) operations across different
1516 # nodes will succeed, time to run it for real on the backing storage
1517 for node_uuid in self.instance.all_nodes:
1518 self.cfg.SetDiskID(self.disk, node_uuid)
1519 result = self.rpc.call_blockdev_grow(node_uuid,
1520 (self.disk, self.instance),
1521 self.delta, False, True,
1522 self.node_es_flags[node_uuid])
1523 result.Raise("Grow request failed to node %s" %
1524 self.cfg.GetNodeName(node_uuid))
1526 # And now execute it for logical storage, on the primary node
1527 node_uuid = self.instance.primary_node
1528 self.cfg.SetDiskID(self.disk, node_uuid)
1529 result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1530 self.delta, False, False,
1531 self.node_es_flags[node_uuid])
1532 result.Raise("Grow request failed to node %s" %
1533 self.cfg.GetNodeName(node_uuid))
1535 self.disk.RecordGrow(self.delta)
1536 self.cfg.Update(self.instance, feedback_fn)
1538 # Changes have been recorded, release node lock
1539 ReleaseLocks(self, locking.LEVEL_NODE)
1541 # Downgrade lock while waiting for sync
1542 self.glm.downgrade(locking.LEVEL_INSTANCE)
1544 assert wipe_disks ^ (old_disk_size is None)
1547 assert self.instance.disks[self.op.disk] == self.disk
1549 # Wipe newly added disk space
1550 WipeDisks(self, self.instance,
1551 disks=[(self.op.disk, self.disk, old_disk_size)])
1553 if self.op.wait_for_sync:
1554 disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1556 self.LogWarning("Disk syncing has not returned a good status; check"
1558 if not self.instance.disks_active:
1559 _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1560 elif not self.instance.disks_active:
1561 self.LogWarning("Not shutting down the disk even if the instance is"
1562 " not supposed to be running because no wait for"
1563 " sync mode was requested")
1565 assert self.owned_locks(locking.LEVEL_NODE_RES)
1566 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1569 class LUInstanceReplaceDisks(LogicalUnit):
1570 """Replace the disks of an instance.
1573 HPATH = "mirrors-replace"
1574 HTYPE = constants.HTYPE_INSTANCE
1577 def CheckArguments(self):
1581 if self.op.mode == constants.REPLACE_DISK_CHG:
1582 if self.op.remote_node is None and self.op.iallocator is None:
1583 raise errors.OpPrereqError("When changing the secondary either an"
1584 " iallocator script must be used or the"
1585 " new node given", errors.ECODE_INVAL)
1587 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1589 elif self.op.remote_node is not None or self.op.iallocator is not None:
1590 # Not replacing the secondary
1591 raise errors.OpPrereqError("The iallocator and new node options can"
1592 " only be used when changing the"
1593 " secondary node", errors.ECODE_INVAL)
1595 def ExpandNames(self):
1596 self._ExpandAndLockInstance()
1598 assert locking.LEVEL_NODE not in self.needed_locks
1599 assert locking.LEVEL_NODE_RES not in self.needed_locks
1600 assert locking.LEVEL_NODEGROUP not in self.needed_locks
1602 assert self.op.iallocator is None or self.op.remote_node is None, \
1603 "Conflicting options"
1605 if self.op.remote_node is not None:
1606 (self.op.remote_node_uuid, self.op.remote_node) = \
1607 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1608 self.op.remote_node)
1610 # Warning: do not remove the locking of the new secondary here
1611 # unless DRBD8Dev.AddChildren is changed to work in parallel;
1612 # currently it doesn't since parallel invocations of
1613 # FindUnusedMinor will conflict
1614 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1615 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1617 self.needed_locks[locking.LEVEL_NODE] = []
1618 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1620 if self.op.iallocator is not None:
1621 # iallocator will select a new node in the same group
1622 self.needed_locks[locking.LEVEL_NODEGROUP] = []
1623 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1625 self.needed_locks[locking.LEVEL_NODE_RES] = []
1627 self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1628 self.op.instance_name, self.op.mode,
1629 self.op.iallocator, self.op.remote_node_uuid,
1630 self.op.disks, self.op.early_release,
1631 self.op.ignore_ipolicy)
1633 self.tasklets = [self.replacer]
1635 def DeclareLocks(self, level):
1636 if level == locking.LEVEL_NODEGROUP:
1637 assert self.op.remote_node_uuid is None
1638 assert self.op.iallocator is not None
1639 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1641 self.share_locks[locking.LEVEL_NODEGROUP] = 1
1642 # Lock all groups used by instance optimistically; this requires going
1643 # via the node before it's locked, requiring verification later on
1644 self.needed_locks[locking.LEVEL_NODEGROUP] = \
1645 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1647 elif level == locking.LEVEL_NODE:
1648 if self.op.iallocator is not None:
1649 assert self.op.remote_node_uuid is None
1650 assert not self.needed_locks[locking.LEVEL_NODE]
1651 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1653 # Lock member nodes of all locked groups
1654 self.needed_locks[locking.LEVEL_NODE] = \
1656 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1657 for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1659 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1661 self._LockInstancesNodes()
1663 elif level == locking.LEVEL_NODE_RES:
1665 self.needed_locks[locking.LEVEL_NODE_RES] = \
1666 self.needed_locks[locking.LEVEL_NODE]
1668 def BuildHooksEnv(self):
1671 This runs on the master, the primary and all the secondaries.
1674 instance = self.replacer.instance
1676 "MODE": self.op.mode,
1677 "NEW_SECONDARY": self.op.remote_node,
1678 "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
1680 env.update(BuildInstanceHookEnvByObject(self, instance))
1683 def BuildHooksNodes(self):
1684 """Build hooks nodes.
1687 instance = self.replacer.instance
1689 self.cfg.GetMasterNode(),
1690 instance.primary_node,
1692 if self.op.remote_node_uuid is not None:
1693 nl.append(self.op.remote_node_uuid)
1696 def CheckPrereq(self):
1697 """Check prerequisites.
1700 assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1701 self.op.iallocator is None)
1703 # Verify if node group locks are still correct
1704 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1706 CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
1708 return LogicalUnit.CheckPrereq(self)
1711 class LUInstanceActivateDisks(NoHooksLU):
1712 """Bring up an instance's disks.
1717 def ExpandNames(self):
1718 self._ExpandAndLockInstance()
1719 self.needed_locks[locking.LEVEL_NODE] = []
1720 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1722 def DeclareLocks(self, level):
1723 if level == locking.LEVEL_NODE:
1724 self._LockInstancesNodes()
1726 def CheckPrereq(self):
1727 """Check prerequisites.
1729 This checks that the instance is in the cluster.
1732 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1733 assert self.instance is not None, \
1734 "Cannot retrieve locked instance %s" % self.op.instance_name
1735 CheckNodeOnline(self, self.instance.primary_node)
1737 def Exec(self, feedback_fn):
1738 """Activate the disks.
1741 disks_ok, disks_info = \
1742 AssembleInstanceDisks(self, self.instance,
1743 ignore_size=self.op.ignore_size)
1745 raise errors.OpExecError("Cannot activate block devices")
1747 if self.op.wait_for_sync:
1748 if not WaitForSync(self, self.instance):
1749 self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
1750 raise errors.OpExecError("Some disks of the instance are degraded!")
1755 class LUInstanceDeactivateDisks(NoHooksLU):
1756 """Shutdown an instance's disks.
1761 def ExpandNames(self):
1762 self._ExpandAndLockInstance()
1763 self.needed_locks[locking.LEVEL_NODE] = []
1764 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1766 def DeclareLocks(self, level):
1767 if level == locking.LEVEL_NODE:
1768 self._LockInstancesNodes()
1770 def CheckPrereq(self):
1771 """Check prerequisites.
1773 This checks that the instance is in the cluster.
1776 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1777 assert self.instance is not None, \
1778 "Cannot retrieve locked instance %s" % self.op.instance_name
1780 def Exec(self, feedback_fn):
1781 """Deactivate the disks
1785 ShutdownInstanceDisks(self, self.instance)
1787 _SafeShutdownInstanceDisks(self, self.instance)
1790 def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
1791 ldisk=False):
1792 """Check that mirrors are not degraded.
1794 @attention: The device has to be annotated already.
1796 The ldisk parameter, if True, will change the test from the
1797 is_degraded attribute (which represents overall non-ok status for
1798 the device(s)) to the ldisk (representing the local storage status).
1800 """
1801 lu.cfg.SetDiskID(dev, node_uuid)
1803 result = True
1805 if on_primary or dev.AssembleOnSecondary():
1806 rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
1807 msg = rstats.fail_msg
1808 if msg:
1809 lu.LogWarning("Can't find disk on node %s: %s",
1810 lu.cfg.GetNodeName(node_uuid), msg)
1811 result = False
1812 elif not rstats.payload:
1813 lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1814 result = False
1815 else:
1816 if ldisk:
1817 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1818 else:
1819 result = result and not rstats.payload.is_degraded
1821 if dev.children:
1822 for child in dev.children:
1823 result = result and _CheckDiskConsistencyInner(lu, instance, child,
1824 node_uuid, on_primary)
1826 return result
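# Editorial note: the two payload attributes checked above differ in scope.
# For a DRBD device returned by call_blockdev_find (assuming the usual
# BlockDevStatus payload):
#
#   rstats.payload.is_degraded    # overall mirror state (peer link + local disk)
#   rstats.payload.ldisk_status   # only the local backing disk (LDS_OKAY, ...)
#
# so ldisk=True tests just the local storage, which is what callers want
# before touching the peer, while the default tests full synchronisation.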
1829 def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
1830 """Wrapper around L{_CheckDiskConsistencyInner}.
1833 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1834 return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
1838 def _BlockdevFind(lu, node_uuid, dev, instance):
1839 """Wrapper around call_blockdev_find to annotate diskparams.
1841 @param lu: A reference to the lu object
1842 @param node_uuid: The node to call out
1843 @param dev: The device to find
1844 @param instance: The instance object the device belongs to
1845 @returns The result of the rpc call
1847 """
1848 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1849 return lu.rpc.call_blockdev_find(node_uuid, disk)
1852 def _GenerateUniqueNames(lu, exts):
1853 """Generate a suitable LV name.
1855 This will generate a logical volume name for the given instance.
1857 """
1858 results = []
1859 for val in exts:
1860 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1861 results.append("%s%s" % (new_id, val))
1863 return results
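# For illustration (values are made up), a call such as
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
# would return something like
#   ["3d3016f9-....disk0_data", "9f5f5f0a-....disk0_meta"]
# i.e. one freshly generated unique ID per requested suffix; these strings are
# later used as LV names inside the volume group.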
1865 class TLReplaceDisks(Tasklet):
1866 """Replaces disks for an instance.
1868 Note: Locking is not within the scope of this class.
1870 """
1871 def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1872 remote_node_uuid, disks, early_release, ignore_ipolicy):
1873 """Initializes this class.
1876 Tasklet.__init__(self, lu)
1879 self.instance_uuid = instance_uuid
1880 self.instance_name = instance_name
1882 self.iallocator_name = iallocator_name
1883 self.remote_node_uuid = remote_node_uuid
1885 self.early_release = early_release
1886 self.ignore_ipolicy = ignore_ipolicy
1889 self.instance = None
1890 self.new_node_uuid = None
1891 self.target_node_uuid = None
1892 self.other_node_uuid = None
1893 self.remote_node_info = None
1894 self.node_secondary_ip = None
1897 def _RunAllocator(lu, iallocator_name, instance_uuid,
1898 relocate_from_node_uuids):
1899 """Compute a new secondary node using an IAllocator.
1902 req = iallocator.IAReqRelocate(
1903 inst_uuid=instance_uuid,
1904 relocate_from_node_uuids=list(relocate_from_node_uuids))
1905 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1907 ial.Run(iallocator_name)
1910 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1911 " %s" % (iallocator_name, ial.info),
1914 remote_node_name = ial.result[0]
1915 remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1917 if remote_node is None:
1918 raise errors.OpPrereqError("Node %s not found in configuration" %
1919 remote_node_name, errors.ECODE_NOENT)
1921 lu.LogInfo("Selected new secondary for instance '%s': %s",
1922 instance_uuid, remote_node_name)
1924 return remote_node.uuid
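# Editorial note: the relocate request asks the iallocator plugin for exactly
# one replacement secondary; ial.result is a single-element list of node
# *names*, hence the translation back to a UUID above. Illustrative use (the
# plugin name "hail" is just an example):
#
#   new_secondary_uuid = TLReplaceDisks._RunAllocator(
#       lu, "hail", instance.uuid, instance.secondary_nodes)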
1926 def _FindFaultyDisks(self, node_uuid):
1927 """Wrapper for L{FindFaultyInstanceDisks}.
1930 return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1933 def _CheckDisksActivated(self, instance):
1934 """Checks if the instance disks are activated.
1936 @param instance: The instance to check disks
1937 @return: True if they are activated, False otherwise
1939 """
1940 node_uuids = instance.all_nodes
1942 for idx, dev in enumerate(instance.disks):
1943 for node_uuid in node_uuids:
1944 self.lu.LogInfo("Checking disk/%d on %s", idx,
1945 self.cfg.GetNodeName(node_uuid))
1946 self.cfg.SetDiskID(dev, node_uuid)
1948 result = _BlockdevFind(self, node_uuid, dev, instance)
1950 if result.offline:
1951 continue
1952 elif result.fail_msg or not result.payload:
1953 return False
1955 return True
1957 def CheckPrereq(self):
1958 """Check prerequisites.
1960 This checks that the instance is in the cluster.
1962 """
1963 self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
1964 assert self.instance is not None, \
1965 "Cannot retrieve locked instance %s" % self.instance_name
1967 if self.instance.disk_template != constants.DT_DRBD8:
1968 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1969 " instances", errors.ECODE_INVAL)
1971 if len(self.instance.secondary_nodes) != 1:
1972 raise errors.OpPrereqError("The instance has a strange layout,"
1973 " expected one secondary but found %d" %
1974 len(self.instance.secondary_nodes),
1975 errors.ECODE_FAULT)
1977 secondary_node_uuid = self.instance.secondary_nodes[0]
1979 if self.iallocator_name is None:
1980 remote_node_uuid = self.remote_node_uuid
1981 else:
1982 remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
1983 self.instance.uuid,
1984 self.instance.secondary_nodes)
1986 if remote_node_uuid is None:
1987 self.remote_node_info = None
1988 else:
1989 assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
1990 "Remote node '%s' is not locked" % remote_node_uuid
1992 self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
1993 assert self.remote_node_info is not None, \
1994 "Cannot retrieve locked node %s" % remote_node_uuid
1996 if remote_node_uuid == self.instance.primary_node:
1997 raise errors.OpPrereqError("The specified node is the primary node of"
1998 " the instance", errors.ECODE_INVAL)
2000 if remote_node_uuid == secondary_node_uuid:
2001 raise errors.OpPrereqError("The specified node is already the"
2002 " secondary node of the instance",
2005 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
2006 constants.REPLACE_DISK_CHG):
2007 raise errors.OpPrereqError("Cannot specify disks to be replaced",
2008 errors.ECODE_INVAL)
2010 if self.mode == constants.REPLACE_DISK_AUTO:
2011 if not self._CheckDisksActivated(self.instance):
2012 raise errors.OpPrereqError("Please run activate-disks on instance %s"
2013 " first" % self.instance_name,
2015 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2016 faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2018 if faulty_primary and faulty_secondary:
2019 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2020 " one node and can not be repaired"
2021 " automatically" % self.instance_name,
2025 self.disks = faulty_primary
2026 self.target_node_uuid = self.instance.primary_node
2027 self.other_node_uuid = secondary_node_uuid
2028 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2029 elif faulty_secondary:
2030 self.disks = faulty_secondary
2031 self.target_node_uuid = secondary_node_uuid
2032 self.other_node_uuid = self.instance.primary_node
2033 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2034 else:
2035 self.disks = []
2036 check_nodes = []
2038 else:
2039 # Non-automatic modes
2040 if self.mode == constants.REPLACE_DISK_PRI:
2041 self.target_node_uuid = self.instance.primary_node
2042 self.other_node_uuid = secondary_node_uuid
2043 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2045 elif self.mode == constants.REPLACE_DISK_SEC:
2046 self.target_node_uuid = secondary_node_uuid
2047 self.other_node_uuid = self.instance.primary_node
2048 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2050 elif self.mode == constants.REPLACE_DISK_CHG:
2051 self.new_node_uuid = remote_node_uuid
2052 self.other_node_uuid = self.instance.primary_node
2053 self.target_node_uuid = secondary_node_uuid
2054 check_nodes = [self.new_node_uuid, self.other_node_uuid]
2056 CheckNodeNotDrained(self.lu, remote_node_uuid)
2057 CheckNodeVmCapable(self.lu, remote_node_uuid)
2059 old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2060 assert old_node_info is not None
2061 if old_node_info.offline and not self.early_release:
2062 # doesn't make sense to delay the release
2063 self.early_release = True
2064 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2065 " early-release mode", secondary_node_uuid)
2068 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2071 # If not specified all disks should be replaced
2073 self.disks = range(len(self.instance.disks))
2075 # TODO: This is ugly, but right now we can't distinguish between internal
2076 # submitted opcode and external one. We should fix that.
2077 if self.remote_node_info:
2078 # We change the node, lets verify it still meets instance policy
2079 new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2080 cluster = self.cfg.GetClusterInfo()
2081 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2082 new_group_info)
2083 CheckTargetNodeIPolicy(self, ipolicy, self.instance,
2084 self.remote_node_info, self.cfg,
2085 ignore=self.ignore_ipolicy)
2087 for node_uuid in check_nodes:
2088 CheckNodeOnline(self.lu, node_uuid)
2090 touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2091 self.other_node_uuid,
2092 self.target_node_uuid]
2093 if node_uuid is not None)
2095 # Release unneeded node and node resource locks
2096 ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2097 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2098 ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2100 # Release any owned node group
2101 ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2103 # Check whether disks are valid
2104 for disk_idx in self.disks:
2105 self.instance.FindDisk(disk_idx)
2107 # Get secondary node IP addresses
2108 self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2109 in self.cfg.GetMultiNodeInfo(touched_nodes))
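# Editorial note: node_secondary_ip is a plain {node_uuid: secondary_ip}
# mapping restricted to the nodes still involved, e.g. (values illustrative):
#   {"0b9e...": "192.0.2.11", "7c1d...": "192.0.2.12"}
# It is later passed to the DRBD disconnect/attach RPCs, which need the
# replication (secondary) addresses of the peers.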
2111 def Exec(self, feedback_fn):
2112 """Execute disk replacement.
2114 This dispatches the disk replacement to the appropriate handler.
2116 """
2118 # Verify owned locks before starting operation
2119 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2120 assert set(owned_nodes) == set(self.node_secondary_ip), \
2121 ("Incorrect node locks, owning %s, expected %s" %
2122 (owned_nodes, self.node_secondary_ip.keys()))
2123 assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2124 self.lu.owned_locks(locking.LEVEL_NODE_RES))
2125 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2127 owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2128 assert list(owned_instances) == [self.instance_name], \
2129 "Instance '%s' not locked" % self.instance_name
2131 assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2132 "Should not own any node group lock at this point"
2135 feedback_fn("No disks need replacement for instance '%s'" %
2139 feedback_fn("Replacing disk(s) %s for instance '%s'" %
2140 (utils.CommaJoin(self.disks), self.instance.name))
2141 feedback_fn("Current primary node: %s" %
2142 self.cfg.GetNodeName(self.instance.primary_node))
2143 feedback_fn("Current seconary node: %s" %
2144 utils.CommaJoin(self.cfg.GetNodeNames(
2145 self.instance.secondary_nodes)))
2147 activate_disks = not self.instance.disks_active
2149 # Activate the instance disks if we're replacing them on a down instance
2150 if activate_disks:
2151 StartInstanceDisks(self.lu, self.instance, True)
2153 try:
2154 # Should we replace the secondary node?
2155 if self.new_node_uuid is not None:
2156 fn = self._ExecDrbd8Secondary
2157 else:
2158 fn = self._ExecDrbd8DiskOnly
2160 result = fn(feedback_fn)
2161 finally:
2162 # Deactivate the instance disks if we're replacing them on a
2163 # down instance
2164 if activate_disks:
2165 _SafeShutdownInstanceDisks(self.lu, self.instance)
2167 assert not self.lu.owned_locks(locking.LEVEL_NODE)
2169 if __debug__:
2170 # Verify owned locks
2171 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2172 nodes = frozenset(self.node_secondary_ip)
2173 assert ((self.early_release and not owned_nodes) or
2174 (not self.early_release and not (set(owned_nodes) - nodes))), \
2175 ("Not owning the correct locks, early_release=%s, owned=%r,"
2176 " nodes=%r" % (self.early_release, owned_nodes, nodes))
2180 def _CheckVolumeGroup(self, node_uuids):
2181 self.lu.LogInfo("Checking volume groups")
2183 vgname = self.cfg.GetVGName()
2185 # Make sure volume group exists on all involved nodes
2186 results = self.rpc.call_vg_list(node_uuids)
2187 if not results:
2188 raise errors.OpExecError("Can't list volume groups on the nodes")
2190 for node_uuid in node_uuids:
2191 res = results[node_uuid]
2192 res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2193 if vgname not in res.payload:
2194 raise errors.OpExecError("Volume group '%s' not found on node %s" %
2195 (vgname, self.cfg.GetNodeName(node_uuid)))
2197 def _CheckDisksExistence(self, node_uuids):
2198 # Check disk existence
2199 for idx, dev in enumerate(self.instance.disks):
2200 if idx not in self.disks:
2201 continue
2203 for node_uuid in node_uuids:
2204 self.lu.LogInfo("Checking disk/%d on %s", idx,
2205 self.cfg.GetNodeName(node_uuid))
2206 self.cfg.SetDiskID(dev, node_uuid)
2208 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2210 msg = result.fail_msg
2211 if msg or not result.payload:
2212 if not msg:
2213 msg = "disk not found"
2214 if not self._CheckDisksActivated(self.instance):
2215 extra_hint = ("\nDisks seem to be not properly activated. Try"
2216 " running activate-disks on the instance before"
2217 " using replace-disks.")
2218 else:
2219 extra_hint = ""
2220 raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2221 (idx, self.cfg.GetNodeName(node_uuid), msg,
2222 extra_hint))
2224 def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2225 for idx, dev in enumerate(self.instance.disks):
2226 if idx not in self.disks:
2227 continue
2229 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2230 (idx, self.cfg.GetNodeName(node_uuid)))
2232 if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2233 on_primary, ldisk=ldisk):
2234 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2235 " replace disks for instance %s" %
2236 (self.cfg.GetNodeName(node_uuid),
2237 self.instance.name))
2239 def _CreateNewStorage(self, node_uuid):
2240 """Create new storage on the primary or secondary node.
2242 This is only used for same-node replaces, not for changing the
2243 secondary node, hence we don't want to modify the existing disk.
2245 """
2246 iv_names = {}
2248 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2249 for idx, dev in enumerate(disks):
2250 if idx not in self.disks:
2251 continue
2253 self.lu.LogInfo("Adding storage on %s for disk/%d",
2254 self.cfg.GetNodeName(node_uuid), idx)
2256 self.cfg.SetDiskID(dev, node_uuid)
2258 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2259 names = _GenerateUniqueNames(self.lu, lv_names)
2261 (data_disk, meta_disk) = dev.children
2262 vg_data = data_disk.logical_id[0]
2263 lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
2264 logical_id=(vg_data, names[0]),
2265 params=data_disk.params)
2266 vg_meta = meta_disk.logical_id[0]
2267 lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
2268 size=constants.DRBD_META_SIZE,
2269 logical_id=(vg_meta, names[1]),
2270 params=meta_disk.params)
2272 new_lvs = [lv_data, lv_meta]
2273 old_lvs = [child.Copy() for child in dev.children]
2274 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2275 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2277 # we pass force_create=True to force the LVM creation
2278 for new_lv in new_lvs:
2279 try:
2280 _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2281 GetInstanceInfoText(self.instance), False,
2282 excl_stor)
2283 except errors.DeviceCreationError, e:
2284 raise errors.OpExecError("Can't create block device: %s" % e.message)
2286 return iv_names
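# Editorial note: the mapping returned above ties each DRBD disk to its old
# and new local LVs, roughly (illustrative):
#   iv_names == {"disk/0": (drbd_dev, [old_data_lv, old_meta_lv],
#                           [new_data_lv, new_meta_lv]), ...}
# _ExecDrbd8DiskOnly iterates over it to detach/rename/attach the LVs and
# finally to remove the old ones.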
2288 def _CheckDevices(self, node_uuid, iv_names):
2289 for name, (dev, _, _) in iv_names.iteritems():
2290 self.cfg.SetDiskID(dev, node_uuid)
2292 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2294 msg = result.fail_msg
2295 if msg or not result.payload:
2296 if not msg:
2297 msg = "disk not found"
2298 raise errors.OpExecError("Can't find DRBD device %s: %s" %
2299 (name, msg))
2301 if result.payload.is_degraded:
2302 raise errors.OpExecError("DRBD device %s is degraded!" % name)
2304 def _RemoveOldStorage(self, node_uuid, iv_names):
2305 for name, (_, old_lvs, _) in iv_names.iteritems():
2306 self.lu.LogInfo("Remove logical volumes for %s", name)
2308 for lv in old_lvs:
2309 self.cfg.SetDiskID(lv, node_uuid)
2311 msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
2312 if msg:
2313 self.lu.LogWarning("Can't remove old LV: %s", msg,
2314 hint="remove unused LVs manually")
2316 def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2317 """Replace a disk on the primary or secondary for DRBD 8.
2319 The algorithm for replace is quite complicated:
2321 1. for each disk to be replaced:
2323 1. create new LVs on the target node with unique names
2324 1. detach old LVs from the drbd device
2325 1. rename old LVs to name_replaced.<time_t>
2326 1. rename new LVs to old LVs
2327 1. attach the new LVs (with the old names now) to the drbd device
2329 1. wait for sync across all devices
2331 1. for each modified disk:
2333 1. remove old LVs (which have the name name_replaces.<time_t>)
2335 Failures are not very well handled.
2337 """
2339 steps_total = 6
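# Editorial sketch of the per-disk swap implemented below (pseudo-code only,
# using the names from this method):
#
#   for dev, old_lvs, new_lvs in iv_names.itervalues():
#       blockdev_removechildren(target_node, dev, old_lvs)   # detach old LVs
#       blockdev_rename(target_node, old_lvs -> *_replaced)  # free their names
#       blockdev_rename(target_node, new_lvs -> old names)   # new LVs step in
#       blockdev_addchildren(target_node, dev, new_lvs)      # re-attach to DRBD
#   WaitForSync(...)                 # resync the mirror onto the new LVs
#   _RemoveOldStorage(...)           # drop the *_replaced LVs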
2340 # Step: check device activation
2341 self.lu.LogStep(1, steps_total, "Check device existence")
2342 self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2343 self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2345 # Step: check other node consistency
2346 self.lu.LogStep(2, steps_total, "Check peer consistency")
2347 self._CheckDisksConsistency(
2348 self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2349 False)
2351 # Step: create new storage
2352 self.lu.LogStep(3, steps_total, "Allocate new storage")
2353 iv_names = self._CreateNewStorage(self.target_node_uuid)
2355 # Step: for each lv, detach+rename*2+attach
2356 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2357 for dev, old_lvs, new_lvs in iv_names.itervalues():
2358 self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2360 result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
2361 old_lvs)
2362 result.Raise("Can't detach drbd from local storage on node"
2363 " %s for device %s" %
2364 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2366 #cfg.Update(instance)
2368 # ok, we created the new LVs, so now we know we have the needed
2369 # storage; as such, we proceed on the target node to rename
2370 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2371 # using the assumption that logical_id == physical_id (which in
2372 # turn is the unique_id on that node)
2374 # FIXME(iustin): use a better name for the replaced LVs
2375 temp_suffix = int(time.time())
2376 ren_fn = lambda d, suff: (d.physical_id[0],
2377 d.physical_id[1] + "_replaced-%s" % suff)
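# For example (names illustrative): an old data LV identified as
#   ("xenvg", "3d3016f9-....disk0_data")
# is renamed to
#   ("xenvg", "3d3016f9-....disk0_data_replaced-1612345678")
# so that the freshly created LV can take over the original name just below.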
2379 # Build the rename list based on what LVs exist on the node
2380 rename_old_to_new = []
2381 for to_ren in old_lvs:
2382 result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
2383 if not result.fail_msg and result.payload:
2385 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2387 self.lu.LogInfo("Renaming the old LVs on the target node")
2388 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2389 rename_old_to_new)
2390 result.Raise("Can't rename old LVs on node %s" %
2391 self.cfg.GetNodeName(self.target_node_uuid))
2393 # Now we rename the new LVs to the old LVs
2394 self.lu.LogInfo("Renaming the new LVs on the target node")
2395 rename_new_to_old = [(new, old.physical_id)
2396 for old, new in zip(old_lvs, new_lvs)]
2397 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2398 rename_new_to_old)
2399 result.Raise("Can't rename new LVs on node %s" %
2400 self.cfg.GetNodeName(self.target_node_uuid))
2402 # Intermediate steps of in memory modifications
2403 for old, new in zip(old_lvs, new_lvs):
2404 new.logical_id = old.logical_id
2405 self.cfg.SetDiskID(new, self.target_node_uuid)
2407 # We need to modify old_lvs so that removal later removes the
2408 # right LVs, not the newly added ones; note that old_lvs is a
2410 for disk in old_lvs:
2411 disk.logical_id = ren_fn(disk, temp_suffix)
2412 self.cfg.SetDiskID(disk, self.target_node_uuid)
2414 # Now that the new lvs have the old name, we can add them to the device
2415 self.lu.LogInfo("Adding new mirror component on %s",
2416 self.cfg.GetNodeName(self.target_node_uuid))
2417 result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2418 (dev, self.instance), new_lvs)
2419 msg = result.fail_msg
2420 if msg:
2421 for new_lv in new_lvs:
2422 msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2423 new_lv).fail_msg
2424 if msg2:
2425 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2426 hint=("cleanup manually the unused logical"
2427 " volumes"))
2428 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2430 cstep = itertools.count(5)
2432 if self.early_release:
2433 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2434 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2435 # TODO: Check if releasing locks early still makes sense
2436 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2438 # Release all resource locks except those used by the instance
2439 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2440 keep=self.node_secondary_ip.keys())
2442 # Release all node locks while waiting for sync
2443 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2445 # TODO: Can the instance lock be downgraded here? Take the optional disk
2446 # shutdown in the caller into consideration.
2449 # This can fail as the old devices are degraded and _WaitForSync
2450 # does a combined result over all disks, so we don't check its return value
2451 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2452 WaitForSync(self.lu, self.instance)
2454 # Check all devices manually
2455 self._CheckDevices(self.instance.primary_node, iv_names)
2457 # Step: remove old storage
2458 if not self.early_release:
2459 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2460 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2462 def _ExecDrbd8Secondary(self, feedback_fn):
2463 """Replace the secondary node for DRBD 8.
2465 The algorithm for replace is quite complicated:
2466 - for all disks of the instance:
2467 - create new LVs on the new node with same names
2468 - shutdown the drbd device on the old secondary
2469 - disconnect the drbd network on the primary
2470 - create the drbd device on the new secondary
2471 - network attach the drbd on the primary, using an artifice:
2472 the drbd code for Attach() will connect to the network if it
2473 finds a device which is connected to the good local disks but
2474 not network enabled
2475 - wait for sync across all devices
2476 - remove all disks from the old secondary
2478 Failures are not very well handled.
2480 """
2482 steps_total = 6
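# Editorial sketch of the secondary change implemented below (pseudo-code
# only, using the names from this method):
#
#   create LVs and standalone DRBD minors on new_node         # steps 3-4
#   blockdev_shutdown(old_secondary, drbd_devices)            # drop old peer
#   drbd_disconnect_net([primary], ...)                       # go standalone
#   rewrite each disk's logical_id to point at new_node; cfg.Update(...)
#   drbd_attach_net([primary, new_node], ...)                 # reconnect
#   WaitForSync(...); _RemoveOldStorage(old_secondary, ...)   # cleanup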
2483 pnode = self.instance.primary_node
2485 # Step: check device activation
2486 self.lu.LogStep(1, steps_total, "Check device existence")
2487 self._CheckDisksExistence([self.instance.primary_node])
2488 self._CheckVolumeGroup([self.instance.primary_node])
2490 # Step: check other node consistency
2491 self.lu.LogStep(2, steps_total, "Check peer consistency")
2492 self._CheckDisksConsistency(self.instance.primary_node, True, True)
2494 # Step: create new storage
2495 self.lu.LogStep(3, steps_total, "Allocate new storage")
2496 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2497 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2498 self.new_node_uuid)
2499 for idx, dev in enumerate(disks):
2500 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2501 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2502 # we pass force_create=True to force LVM creation
2503 for new_lv in dev.children:
2504 try:
2505 _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2506 new_lv, True, GetInstanceInfoText(self.instance),
2507 False, excl_stor)
2508 except errors.DeviceCreationError, e:
2509 raise errors.OpExecError("Can't create block device: %s" % e.message)
2511 # Step 4: drbd minors and drbd setups changes
2512 # after this, we must manually remove the drbd minors on both the
2513 # error and the success paths
2514 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2515 minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2516 for _ in self.instance.disks],
2517 self.instance.uuid)
2518 logging.debug("Allocated minors %r", minors)
2520 iv_names = {}
2521 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2522 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2523 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2524 # create new devices on new_node; note that we create two IDs:
2525 # one without port, so the drbd will be activated without
2526 # networking information on the new node at this stage, and one
2527 # with network, for the latter activation in step 4
2528 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2529 if self.instance.primary_node == o_node1:
2530 p_minor = o_minor1
2531 else:
2532 assert self.instance.primary_node == o_node2, "Three-node instance?"
2533 p_minor = o_minor2
2535 new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2536 p_minor, new_minor, o_secret)
2537 new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2538 p_minor, new_minor, o_secret)
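# Editorial note: a DRBD8 logical_id is the 6-tuple
#   (nodeA_uuid, nodeB_uuid, port, minorA, minorB, secret)
# so, purely illustratively, with primary minor 1 and newly allocated minor 3:
#   new_alone_id = (pnode_uuid, new_node_uuid, None,  1, 3, secret)  # no port
#   new_net_id   = (pnode_uuid, new_node_uuid, 11043, 1, 3, secret)  # full id
# The "alone" variant lets the device be created without networking on the
# new node first; the networked variant is what ends up in the configuration
# and is used for the final attach.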
2540 iv_names[idx] = (dev, dev.children, new_net_id)
2541 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2542 new_net_id)
2543 new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
2544 logical_id=new_alone_id,
2545 children=dev.children,
2546 size=dev.size,
2547 params={})
2548 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2549 self.cfg)
2550 try:
2551 CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2552 anno_new_drbd,
2553 GetInstanceInfoText(self.instance), False,
2554 excl_stor)
2555 except errors.GenericError:
2556 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2557 raise
2559 # We have new devices, shutdown the drbd on the old secondary
2560 for idx, dev in enumerate(self.instance.disks):
2561 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2562 self.cfg.SetDiskID(dev, self.target_node_uuid)
2563 msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2564 (dev, self.instance)).fail_msg
2565 if msg:
2566 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2567 " node: %s" % (idx, msg),
2568 hint=("Please cleanup this device manually as"
2569 " soon as possible"))
2571 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2572 result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2573 self.instance.disks)[pnode]
2575 msg = result.fail_msg
2576 if msg:
2577 # detaches didn't succeed (unlikely)
2578 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2579 raise errors.OpExecError("Can't detach the disks from the network on"
2580 " old node: %s" % (msg,))
2582 # if we managed to detach at least one, we update all the disks of
2583 # the instance to point to the new secondary
2584 self.lu.LogInfo("Updating instance configuration")
2585 for dev, _, new_logical_id in iv_names.itervalues():
2586 dev.logical_id = new_logical_id
2587 self.cfg.SetDiskID(dev, self.instance.primary_node)
2589 self.cfg.Update(self.instance, feedback_fn)
2591 # Release all node locks (the configuration has been updated)
2592 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2594 # and now perform the drbd attach
2595 self.lu.LogInfo("Attaching primary drbds to new secondary"
2596 " (standalone => connected)")
2597 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2598 self.new_node_uuid],
2599 self.node_secondary_ip,
2600 (self.instance.disks, self.instance),
2601 self.instance.name,
2602 False)
2603 for to_node, to_result in result.items():
2604 msg = to_result.fail_msg
2605 if msg:
2606 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2607 self.cfg.GetNodeName(to_node), msg,
2608 hint=("please do a gnt-instance info to see the"
2609 " status of disks"))
2611 cstep = itertools.count(5)
2613 if self.early_release:
2614 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2615 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2616 # TODO: Check if releasing locks early still makes sense
2617 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2619 # Release all resource locks except those used by the instance
2620 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2621 keep=self.node_secondary_ip.keys())
2623 # TODO: Can the instance lock be downgraded here? Take the optional disk
2624 # shutdown in the caller into consideration.
2627 # This can fail as the old devices are degraded and _WaitForSync
2628 # does a combined result over all disks, so we don't check its return value
2629 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2630 WaitForSync(self.lu, self.instance)
2632 # Check all devices manually
2633 self._CheckDevices(self.instance.primary_node, iv_names)
2635 # Step: remove old storage
2636 if not self.early_release:
2637 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2638 self._RemoveOldStorage(self.target_node_uuid, iv_names)