4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Logical units dealing with storage of instances."""
25 import logging
26 import os
27 import time
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41 AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42 CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43 IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45 CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46 BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
48 import ganeti.masterd.instance
51 _DISK_TEMPLATE_NAME_PREFIX = {
52 constants.DT_PLAIN: "",
53 constants.DT_RBD: ".rbd",
54 constants.DT_EXT: ".ext",
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59 constants.DT_PLAIN: constants.LD_LV,
60 constants.DT_FILE: constants.LD_FILE,
61 constants.DT_SHARED_FILE: constants.LD_FILE,
62 constants.DT_BLOCK: constants.LD_BLOCKDEV,
63 constants.DT_RBD: constants.LD_RBD,
64 constants.DT_EXT: constants.LD_EXT,
68 def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
70 """Create a single block device on a given node.
72 This will not recurse over children of the device, so they must be
73 created in advance.
75 @param lu: the lu on whose behalf we execute
76 @param node: the node on which to create the device
77 @type instance: L{objects.Instance}
78 @param instance: the instance which owns the device
79 @type device: L{objects.Disk}
80 @param device: the device to create
81 @param info: the extra 'metadata' we should attach to the device
82 (this will be represented as a LVM tag)
83 @type force_open: boolean
84 @param force_open: this parameter will be passed to the
85 L{backend.BlockdevCreate} function where it specifies
86 whether we run on primary or not, and it affects both
87 the child assembly and the device's own Open() execution
88 @type excl_stor: boolean
89 @param excl_stor: Whether exclusive_storage is active for the node
92 lu.cfg.SetDiskID(device, node)
93 result = lu.rpc.call_blockdev_create(node, device, device.size,
94 instance.name, force_open, info,
96 result.Raise("Can't create block device %s on"
97 " node %s for instance %s" % (device, node, instance.name))
98 if device.physical_id is None:
99 device.physical_id = result.payload
102 def _CreateBlockDevInner(lu, node, instance, device, force_create,
103 info, force_open, excl_stor):
104 """Create a tree of block devices on a given node.
106 If this device type has to be created on secondaries, create it and
107 all its children.
109 If not, just recurse to children keeping the same 'force' value.
111 @attention: The device has to be annotated already.
113 @param lu: the lu on whose behalf we execute
114 @param node: the node on which to create the device
115 @type instance: L{objects.Instance}
116 @param instance: the instance which owns the device
117 @type device: L{objects.Disk}
118 @param device: the device to create
119 @type force_create: boolean
120 @param force_create: whether to force creation of this device; this
121 will be changed to True whenever we find a device which has the
122 CreateOnSecondary() attribute
123 @param info: the extra 'metadata' we should attach to the device
124 (this will be represented as a LVM tag)
125 @type force_open: boolean
126 @param force_open: this parameter will be passed to the
127 L{backend.BlockdevCreate} function where it specifies
128 whether we run on primary or not, and it affects both
129 the child assembly and the device's own Open() execution
130 @type excl_stor: boolean
131 @param excl_stor: Whether exclusive_storage is active for the node
133 @return: list of created devices
135 created_devices = []
136 try:
137 if device.CreateOnSecondary():
138 force_create = True
140 if device.children:
141 for child in device.children:
142 devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143 info, force_open, excl_stor)
144 created_devices.extend(devs)
146 if not force_create:
147 return created_devices
149 CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150 excl_stor)
151 # The device has been completely created, so there is no point in keeping
152 # its subdevices in the list. We just add the device itself instead.
153 created_devices = [(node, device)]
154 return created_devices
156 except errors.DeviceCreationError, e:
157 e.created_devices.extend(created_devices)
158 raise
159 except errors.OpExecError, e:
160 raise errors.DeviceCreationError(str(e), created_devices)
163 def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164 """Whether exclusive_storage is in effect for the given node.
166 @type cfg: L{config.ConfigWriter}
167 @param cfg: The cluster configuration
168 @type nodename: string
169 @param nodename: The node
171 @return: The effective value of exclusive_storage
172 @raise errors.OpPrereqError: if no node exists with the given name
175 ni = cfg.GetNodeInfo(nodename)
176 if ni is None:
177 raise errors.OpPrereqError("Invalid node name %s" % nodename,
178 errors.ECODE_NOENT)
179 return IsExclusiveStorageEnabledNode(cfg, ni)
182 def CreateBlockDev(lu, node, instance, device, force_create, info,
184 """Wrapper around L{_CreateBlockDevInner}.
186 This method annotates the root device first.
189 (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190 excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191 return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192 force_open, excl_stor)
195 def CreateDisks(lu, instance, to_skip=None, target_node=None):
196 """Create all disks for an instance.
198 This abstracts away some work from AddInstance.
200 @type lu: L{LogicalUnit}
201 @param lu: the logical unit on whose behalf we execute
202 @type instance: L{objects.Instance}
203 @param instance: the instance whose disks we should create
205 @param to_skip: list of indices to skip
206 @type target_node: string
207 @param target_node: if passed, overrides the target node for creation
209 @return: the success of the creation
212 info = GetInstanceInfoText(instance)
213 if target_node is None:
214 pnode = instance.primary_node
215 all_nodes = instance.all_nodes
216 else:
217 pnode = target_node
218 all_nodes = [pnode]
220 if instance.disk_template in constants.DTS_FILEBASED:
221 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
222 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
224 result.Raise("Failed to create directory '%s' on"
225 " node %s" % (file_storage_dir, pnode))
228 # Note: this needs to be kept in sync with adding of disks in
229 # LUInstanceSetParams
230 for idx, device in enumerate(instance.disks):
231 if to_skip and idx in to_skip:
232 continue
233 logging.info("Creating disk %s for instance '%s'", idx, instance.name)
235 for node in all_nodes:
236 f_create = node == pnode
237 try:
238 CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
239 disks_created.append((node, device))
240 except errors.OpExecError:
241 logging.warning("Creating disk %s for instance '%s' failed",
242 idx, instance.name)
243 except errors.DeviceCreationError, e:
244 logging.warning("Creating disk %s for instance '%s' failed",
245 idx, instance.name)
246 disks_created.extend(e.created_devices)
247 for (node, disk) in disks_created:
248 lu.cfg.SetDiskID(disk, node)
249 result = lu.rpc.call_blockdev_remove(node, disk)
250 if result.fail_msg:
251 logging.warning("Failed to remove newly-created disk %s on node %s:"
252 " %s", device, node, result.fail_msg)
253 raise errors.OpExecError(e.message)
256 def ComputeDiskSizePerVG(disk_template, disks):
257 """Compute disk size requirements in the volume group
260 def _compute(disks, payload):
261 """Universal algorithm.
266 vgs[disk[constants.IDISK_VG]] = \
267 vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
271 # Required free disk space as a function of disk and swap space
273 constants.DT_DISKLESS: {},
274 constants.DT_PLAIN: _compute(disks, 0),
275 # 128 MB are added for drbd metadata for each disk
276 constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
277 constants.DT_FILE: {},
278 constants.DT_SHARED_FILE: {},
281 if disk_template not in req_size_dict:
282 raise errors.ProgrammerError("Disk template '%s' size requirement"
283 " is unknown" % disk_template)
285 return req_size_dict[disk_template]
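# Illustrative sketch, not part of the original module: with two hypothetical
# DRBD disks in the same volume group, the per-VG requirement is the sum of
# the disk sizes plus one DRBD_META_SIZE per disk:
#
#   disks = [{constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 1024},
#            {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 2048}]
#   ComputeDiskSizePerVG(constants.DT_DRBD8, disks)
#   # => {"xenvg": 1024 + 2048 + 2 * constants.DRBD_META_SIZE}
#
# The VG name "xenvg" and the sizes are made-up example values.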
288 def ComputeDisks(op, default_vg):
289 """Computes the instance disks.
291 @param op: The instance opcode
292 @param default_vg: The default_vg to assume
294 @return: The computed disks
298 for disk in op.disks:
299 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
300 if mode not in constants.DISK_ACCESS_SET:
301 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
302 mode, errors.ECODE_INVAL)
303 size = disk.get(constants.IDISK_SIZE, None)
304 if size is None:
305 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
306 try:
307 size = int(size)
308 except (TypeError, ValueError):
309 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
310 errors.ECODE_INVAL)
312 ext_provider = disk.get(constants.IDISK_PROVIDER, None)
313 if ext_provider and op.disk_template != constants.DT_EXT:
314 raise errors.OpPrereqError("The '%s' option is only valid for the %s"
315 " disk template, not %s" %
316 (constants.IDISK_PROVIDER, constants.DT_EXT,
317 op.disk_template), errors.ECODE_INVAL)
319 data_vg = disk.get(constants.IDISK_VG, default_vg)
320 name = disk.get(constants.IDISK_NAME, None)
321 if name is not None and name.lower() == constants.VALUE_NONE:
322 name = None
323 new_disk = {
324 constants.IDISK_SIZE: size,
325 constants.IDISK_MODE: mode,
326 constants.IDISK_VG: data_vg,
327 constants.IDISK_NAME: name,
330 if constants.IDISK_METAVG in disk:
331 new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
332 if constants.IDISK_ADOPT in disk:
333 new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
335 # For extstorage, demand the `provider' option and add any
336 # additional parameters (ext-params) to the dict
337 if op.disk_template == constants.DT_EXT:
338 if ext_provider:
339 new_disk[constants.IDISK_PROVIDER] = ext_provider
340 for key in disk:
341 if key not in constants.IDISK_PARAMS:
342 new_disk[key] = disk[key]
343 else:
344 raise errors.OpPrereqError("Missing provider for template '%s'" %
345 constants.DT_EXT, errors.ECODE_INVAL)
347 disks.append(new_disk)
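# Illustrative sketch, not part of the original module: a minimal opcode disk
# specification and the dict ComputeDisks would derive from it, assuming the
# usual string values of the IDISK_* constants ("size", "mode", "vg", "name"):
#
#   op.disks = [{constants.IDISK_SIZE: 1024}]   # 1 GiB, everything else default
#   ComputeDisks(op, "xenvg")
#   # => [{"size": 1024, "mode": constants.DISK_RDWR,
#   #      "vg": "xenvg", "name": None}]
#
# "xenvg" is just an example default VG name.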
352 def CheckRADOSFreeSpace():
353 """Compute disk size requirements inside the RADOS cluster.
356 # For the RADOS cluster we assume there is always enough space.
360 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
361 iv_name, p_minor, s_minor):
362 """Generate a drbd8 device complete with its children.
365 assert len(vgnames) == len(names) == 2
366 port = lu.cfg.AllocatePort()
367 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
369 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
370 logical_id=(vgnames[0], names[0]),
372 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
373 dev_meta = objects.Disk(dev_type=constants.LD_LV,
374 size=constants.DRBD_META_SIZE,
375 logical_id=(vgnames[1], names[1]),
377 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
378 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
379 logical_id=(primary, secondary, port,
382 children=[dev_data, dev_meta],
383 iv_name=iv_name, params={})
384 drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
388 def GenerateDiskTemplate(
389 lu, template_name, instance_name, primary_node, secondary_nodes,
390 disk_info, file_storage_dir, file_driver, base_index,
391 feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
392 _req_shr_file_storage=opcodes.RequireSharedFileStorage):
393 """Generate the entire disk layout for a given template type.
396 vgname = lu.cfg.GetVGName()
397 disk_count = len(disk_info)
400 if template_name == constants.DT_DISKLESS:
402 elif template_name == constants.DT_DRBD8:
403 if len(secondary_nodes) != 1:
404 raise errors.ProgrammerError("Wrong template configuration")
405 remote_node = secondary_nodes[0]
406 minors = lu.cfg.AllocateDRBDMinor(
407 [primary_node, remote_node] * len(disk_info), instance_name)
409 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
411 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
414 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
415 for i in range(disk_count)]):
416 names.append(lv_prefix + "_data")
417 names.append(lv_prefix + "_meta")
418 for idx, disk in enumerate(disk_info):
419 disk_index = idx + base_index
420 data_vg = disk.get(constants.IDISK_VG, vgname)
421 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
422 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
423 disk[constants.IDISK_SIZE],
425 names[idx * 2:idx * 2 + 2],
426 "disk/%d" % disk_index,
427 minors[idx * 2], minors[idx * 2 + 1])
428 disk_dev.mode = disk[constants.IDISK_MODE]
429 disk_dev.name = disk.get(constants.IDISK_NAME, None)
430 disks.append(disk_dev)
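# Illustrative sketch, not part of the original module: with two DRBD disks
# the flat lists built above interleave data/meta names and primary/secondary
# minors, which is why each disk slices two consecutive entries:
#
#   names  = ["inst.disk0_data", "inst.disk0_meta",
#             "inst.disk1_data", "inst.disk1_meta"]
#   minors = [p0, s0, p1, s1]
#   # disk idx=1 uses names[2:4] and minors[2], minors[3]
#
# The concrete name strings and minor numbers are hypothetical.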
433 raise errors.ProgrammerError("Wrong template configuration")
435 if template_name == constants.DT_FILE:
437 elif template_name == constants.DT_SHARED_FILE:
438 _req_shr_file_storage()
440 name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
441 if name_prefix is None:
444 names = _GenerateUniqueNames(lu, ["%s.disk%s" %
445 (name_prefix, base_index + i)
446 for i in range(disk_count)])
448 if template_name == constants.DT_PLAIN:
450 def logical_id_fn(idx, _, disk):
451 vg = disk.get(constants.IDISK_VG, vgname)
452 return (vg, names[idx])
454 elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
456 lambda _, disk_index, disk: (file_driver,
457 "%s/disk%d" % (file_storage_dir,
459 elif template_name == constants.DT_BLOCK:
461 lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
462 disk[constants.IDISK_ADOPT])
463 elif template_name == constants.DT_RBD:
464 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
465 elif template_name == constants.DT_EXT:
466 def logical_id_fn(idx, _, disk):
467 provider = disk.get(constants.IDISK_PROVIDER, None)
469 raise errors.ProgrammerError("Disk template is %s, but '%s' is"
470 " not found" % (constants.DT_EXT,
471 constants.IDISK_PROVIDER))
472 return (provider, names[idx])
474 raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
476 dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
478 for idx, disk in enumerate(disk_info):
479 params = {}
480 # Only for the Ext template, add disk_info to params
481 if template_name == constants.DT_EXT:
482 params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
483 for key in disk:
484 if key not in constants.IDISK_PARAMS:
485 params[key] = disk[key]
486 disk_index = idx + base_index
487 size = disk[constants.IDISK_SIZE]
488 feedback_fn("* disk %s, size %s" %
489 (disk_index, utils.FormatUnit(size, "h")))
490 disk_dev = objects.Disk(dev_type=dev_type, size=size,
491 logical_id=logical_id_fn(idx, disk_index, disk),
492 iv_name="disk/%d" % disk_index,
493 mode=disk[constants.IDISK_MODE],
495 disk_dev.name = disk.get(constants.IDISK_NAME, None)
496 disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
497 disks.append(disk_dev)
502 class LUInstanceRecreateDisks(LogicalUnit):
503 """Recreate an instance's missing disks.
506 HPATH = "instance-recreate-disks"
507 HTYPE = constants.HTYPE_INSTANCE
510 _MODIFYABLE = compat.UniqueFrozenset([
511 constants.IDISK_SIZE,
512 constants.IDISK_MODE,
515 # New or changed disk parameters may have different semantics
516 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
517 constants.IDISK_ADOPT,
519 # TODO: Implement support changing VG while recreating
521 constants.IDISK_METAVG,
522 constants.IDISK_PROVIDER,
523 constants.IDISK_NAME,
526 def _RunAllocator(self):
527 """Run the allocator based on input opcode.
530 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
533 # The allocator should actually run in "relocate" mode, but current
534 # allocators don't support relocating all the nodes of an instance at
535 # the same time. As a workaround we use "allocate" mode, but this is
536 # suboptimal for two reasons:
537 # - The instance name passed to the allocator is present in the list of
538 # existing instances, so there could be a conflict within the
539 # internal structures of the allocator. This doesn't happen with the
540 # current allocators, but it's a liability.
541 # - The allocator counts the resources used by the instance twice: once
542 # because the instance exists already, and once because it tries to
543 # allocate a new instance.
544 # The allocator could choose some of the nodes on which the instance is
545 # running, but that's not a problem. If the instance nodes are broken,
546 # they should already be marked as drained or offline, and hence
547 # skipped by the allocator. If instance disks have been lost for other
548 # reasons, then recreating the disks on the same nodes should be fine.
549 disk_template = self.instance.disk_template
550 spindle_use = be_full[constants.BE_SPINDLE_USE]
551 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
552 disk_template=disk_template,
553 tags=list(self.instance.GetTags()),
556 vcpus=be_full[constants.BE_VCPUS],
557 memory=be_full[constants.BE_MAXMEM],
558 spindle_use=spindle_use,
559 disks=[{constants.IDISK_SIZE: d.size,
560 constants.IDISK_MODE: d.mode}
561 for d in self.instance.disks],
562 hypervisor=self.instance.hypervisor,
564 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
566 ial.Run(self.op.iallocator)
568 assert req.RequiredNodes() == len(self.instance.all_nodes)
571 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
572 " %s" % (self.op.iallocator, ial.info),
575 self.op.nodes = ial.result
576 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
577 self.op.instance_name, self.op.iallocator,
578 utils.CommaJoin(ial.result))
580 def CheckArguments(self):
581 if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
582 # Normalize and convert deprecated list of disk indices
583 self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
585 duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
587 raise errors.OpPrereqError("Some disks have been specified more than"
588 " once: %s" % utils.CommaJoin(duplicates),
591 # We don't want _CheckIAllocatorOrNode selecting the default iallocator
592 # when neither iallocator nor nodes are specified
593 if self.op.iallocator or self.op.nodes:
594 CheckIAllocatorOrNode(self, "iallocator", "nodes")
596 for (idx, params) in self.op.disks:
597 utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
598 unsupported = frozenset(params.keys()) - self._MODIFYABLE
600 raise errors.OpPrereqError("Parameters for disk %s try to change"
601 " unmodifyable parameter(s): %s" %
602 (idx, utils.CommaJoin(unsupported)),
605 def ExpandNames(self):
606 self._ExpandAndLockInstance()
607 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
610 self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
611 self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
613 self.needed_locks[locking.LEVEL_NODE] = []
614 if self.op.iallocator:
615 # iallocator will select a new node in the same group
616 self.needed_locks[locking.LEVEL_NODEGROUP] = []
617 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
619 self.needed_locks[locking.LEVEL_NODE_RES] = []
621 def DeclareLocks(self, level):
622 if level == locking.LEVEL_NODEGROUP:
623 assert self.op.iallocator is not None
624 assert not self.op.nodes
625 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
626 self.share_locks[locking.LEVEL_NODEGROUP] = 1
627 # Lock the primary group used by the instance optimistically; this
628 # requires going via the node before it's locked, requiring
629 # verification later on
630 self.needed_locks[locking.LEVEL_NODEGROUP] = \
631 self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
633 elif level == locking.LEVEL_NODE:
634 # If an allocator is used, then we lock all the nodes in the current
635 # instance group, as we don't know yet which ones will be selected;
636 # if we replace the nodes without using an allocator, locks are
637 # already declared in ExpandNames; otherwise, we need to lock all the
638 # instance nodes for disk re-creation
639 if self.op.iallocator:
640 assert not self.op.nodes
641 assert not self.needed_locks[locking.LEVEL_NODE]
642 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
644 # Lock member nodes of the group of the primary node
645 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
646 self.needed_locks[locking.LEVEL_NODE].extend(
647 self.cfg.GetNodeGroup(group_uuid).members)
649 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
650 elif not self.op.nodes:
651 self._LockInstancesNodes(primary_only=False)
652 elif level == locking.LEVEL_NODE_RES:
654 self.needed_locks[locking.LEVEL_NODE_RES] = \
655 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
657 def BuildHooksEnv(self):
660 This runs on master, primary and secondary nodes of the instance.
663 return BuildInstanceHookEnvByObject(self, self.instance)
665 def BuildHooksNodes(self):
666 """Build hooks nodes.
669 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
672 def CheckPrereq(self):
673 """Check prerequisites.
675 This checks that the instance is in the cluster and is not running.
678 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
679 assert instance is not None, \
680 "Cannot retrieve locked instance %s" % self.op.instance_name
682 if len(self.op.nodes) != len(instance.all_nodes):
683 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
684 " %d replacement nodes were specified" %
685 (instance.name, len(instance.all_nodes),
688 assert instance.disk_template != constants.DT_DRBD8 or \
689 len(self.op.nodes) == 2
690 assert instance.disk_template != constants.DT_PLAIN or \
691 len(self.op.nodes) == 1
692 primary_node = self.op.nodes[0]
694 primary_node = instance.primary_node
695 if not self.op.iallocator:
696 CheckNodeOnline(self, primary_node)
698 if instance.disk_template == constants.DT_DISKLESS:
699 raise errors.OpPrereqError("Instance '%s' has no disks" %
700 self.op.instance_name, errors.ECODE_INVAL)
702 # Verify if node group locks are still correct
703 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
705 # Node group locks are acquired only for the primary node (and only
706 # when the allocator is used)
707 CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
710 # if we replace nodes *and* the old primary is offline, we don't
711 # check the instance state
712 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
713 if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
714 CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
715 msg="cannot recreate disks")
718 self.disks = dict(self.op.disks)
720 self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
722 maxidx = max(self.disks.keys())
723 if maxidx >= len(instance.disks):
724 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
727 if ((self.op.nodes or self.op.iallocator) and
728 sorted(self.disks.keys()) != range(len(instance.disks))):
729 raise errors.OpPrereqError("Can't recreate disks partially and"
730 " change the nodes at the same time",
733 self.instance = instance
735 if self.op.iallocator:
737 # Release unneeded node and node resource locks
738 ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
739 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
740 ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
742 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
744 def Exec(self, feedback_fn):
745 """Recreate the disks.
748 instance = self.instance
750 assert (self.owned_locks(locking.LEVEL_NODE) ==
751 self.owned_locks(locking.LEVEL_NODE_RES))
754 mods = [] # keeps track of needed changes
756 for idx, disk in enumerate(instance.disks):
758 changes = self.disks[idx]
760 # Disk should not be recreated
764 # update secondaries for disks, if needed
765 if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
766 # need to update the nodes and minors
767 assert len(self.op.nodes) == 2
768 assert len(disk.logical_id) == 6 # otherwise disk internals
770 (_, _, old_port, _, _, old_secret) = disk.logical_id
771 new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
772 new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
773 new_minors[0], new_minors[1], old_secret)
774 assert len(disk.logical_id) == len(new_id)
778 mods.append((idx, new_id, changes))
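# Illustrative sketch, not part of the original module: a DRBD8 logical_id is
# the 6-tuple (node_a, node_b, port, minor_a, minor_b, secret); recreating on
# new nodes keeps port and secret but swaps nodes and minors, e.g.
#
#   old: ("node1", "node2", 11000, 0, 1, "secret")
#   new: ("node3", "node4", 11000, 2, 3, "secret")
#
# Node names, port and minor numbers here are made-up example values.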
780 # now that we have passed all asserts above, we can apply the mods
781 # in a single run (to avoid partial changes)
782 for idx, new_id, changes in mods:
783 disk = instance.disks[idx]
784 if new_id is not None:
785 assert disk.dev_type == constants.LD_DRBD8
786 disk.logical_id = new_id
788 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
789 mode=changes.get(constants.IDISK_MODE, None))
791 # change primary node, if needed
793 instance.primary_node = self.op.nodes[0]
794 self.LogWarning("Changing the instance's nodes, you will have to"
795 " remove any disks left on the older nodes manually")
798 self.cfg.Update(instance, feedback_fn)
800 # All touched nodes must be locked
801 mylocks = self.owned_locks(locking.LEVEL_NODE)
802 assert mylocks.issuperset(frozenset(instance.all_nodes))
803 CreateDisks(self, instance, to_skip=to_skip)
806 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
807 """Checks if nodes have enough free disk space in the specified VG.
809 This function checks if all given nodes have the needed amount of
810 free disk. In case any node has less disk or we cannot get the
811 information from the node, this function raises an OpPrereqError
814 @type lu: C{LogicalUnit}
815 @param lu: a logical unit from which we get configuration data
816 @type nodenames: C{list}
817 @param nodenames: the list of node names to check
819 @param vg: the volume group to check
820 @type requested: C{int}
821 @param requested: the amount of disk in MiB to check for
822 @raise errors.OpPrereqError: if the node doesn't have enough disk,
823 or we cannot check the node
826 es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
827 nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
828 for node in nodenames:
829 info = nodeinfo[node]
830 info.Raise("Cannot get current information from node %s" % node,
831 prereq=True, ecode=errors.ECODE_ENVIRON)
832 (_, (vg_info, ), _) = info.payload
833 vg_free = vg_info.get("vg_free", None)
834 if not isinstance(vg_free, int):
835 raise errors.OpPrereqError("Can't compute free disk space on node"
836 " %s for vg %s, result was '%s'" %
837 (node, vg, vg_free), errors.ECODE_ENVIRON)
838 if requested > vg_free:
839 raise errors.OpPrereqError("Not enough disk space on target node %s"
840 " vg %s: required %d MiB, available %d MiB" %
841 (node, vg, requested, vg_free),
845 def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
846 """Checks if nodes have enough free disk space in all the VGs.
848 This function checks if all given nodes have the needed amount of
849 free disk. In case any node has less disk or we cannot get the
850 information from the node, this function raises an OpPrereqError
853 @type lu: C{LogicalUnit}
854 @param lu: a logical unit from which we get configuration data
855 @type nodenames: C{list}
856 @param nodenames: the list of node names to check
857 @type req_sizes: C{dict}
858 @param req_sizes: the hash of vg and corresponding amount of disk in
860 @raise errors.OpPrereqError: if the node doesn't have enough disk,
861 or we cannot check the node
864 for vg, req_size in req_sizes.items():
865 _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
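# Illustrative sketch, not part of the original module: req_sizes is typically
# the dict returned by ComputeDiskSizePerVG, so a call could look like
#
#   CheckNodesFreeDiskPerVG(self, ["node1", "node2"],
#                           {"xenvg": 3072, "ssdvg": 512})
#
# which checks for 3072 MiB of free space in "xenvg" and 512 MiB in "ssdvg"
# on both nodes; node and VG names are hypothetical.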
868 def _DiskSizeInBytesToMebibytes(lu, size):
869 """Converts a disk size in bytes to mebibytes.
871 Warns and rounds up if the size isn't an even multiple of 1 MiB.
874 (mib, remainder) = divmod(size, 1024 * 1024)
876 if remainder != 0:
877 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
878 " to not overwrite existing data (%s bytes will not be"
879 " wiped)", (1024 * 1024) - remainder)
880 mib += 1
882 return mib
885 def _CalcEta(time_taken, written, total_size):
886 """Calculates the ETA based on size written and total size.
888 @param time_taken: The time taken so far
889 @param written: amount written so far
890 @param total_size: The total size of data to be written
891 @return: The remaining time in seconds
894 avg_time = time_taken / float(written)
895 return (total_size - written) * avg_time
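# Illustrative sketch, not part of the original module: with made-up numbers,
# 1024 MiB written in 120 seconds out of 4096 MiB total gives
#
#   avg_time = 120 / float(1024)    # ~0.117 s per MiB
#   (4096 - 1024) * avg_time        # => 360.0 seconds remaining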
898 def WipeDisks(lu, instance, disks=None):
899 """Wipes instance disks.
901 @type lu: L{LogicalUnit}
902 @param lu: the logical unit on whose behalf we execute
903 @type instance: L{objects.Instance}
904 @param instance: the instance whose disks we should create
905 @type disks: None or list of tuple of (number, L{objects.Disk}, number)
906 @param disks: Disk details; tuple contains disk index, disk object and the
907 start offset
910 node = instance.primary_node
913 disks = [(idx, disk, 0)
914 for (idx, disk) in enumerate(instance.disks)]
916 for (_, device, _) in disks:
917 lu.cfg.SetDiskID(device, node)
919 logging.info("Pausing synchronization of disks of instance '%s'",
921 result = lu.rpc.call_blockdev_pause_resume_sync(node,
922 (map(compat.snd, disks),
925 result.Raise("Failed to pause disk synchronization on node '%s'" % node)
927 for idx, success in enumerate(result.payload):
929 logging.warn("Pausing synchronization of disk %s of instance '%s'"
930 " failed", idx, instance.name)
933 for (idx, device, offset) in disks:
934 # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
935 # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
937 int(min(constants.MAX_WIPE_CHUNK,
938 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
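# Illustrative sketch, not part of the original module, assuming the usual
# constant values MIN_WIPE_CHUNK_PERCENT = 10 and MAX_WIPE_CHUNK = 1024:
# a 51200 MiB (50 GiB) disk would yield 10% = 5120 MiB, capped at the
# 1024 MiB maximum chunk, while a 4096 MiB disk would use 409 MiB chunks.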
942 start_time = time.time()
947 info_text = (" (from %s to %s)" %
948 (utils.FormatUnit(offset, "h"),
949 utils.FormatUnit(size, "h")))
951 lu.LogInfo("* Wiping disk %s%s", idx, info_text)
953 logging.info("Wiping disk %d for instance %s on node %s using"
954 " chunk size %s", idx, instance.name, node, wipe_chunk_size)
957 wipe_size = min(wipe_chunk_size, size - offset)
959 logging.debug("Wiping disk %d, offset %s, chunk %s",
960 idx, offset, wipe_size)
962 result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
964 result.Raise("Could not wipe disk %d at offset %d for size %d" %
965 (idx, offset, wipe_size))
969 if now - last_output >= 60:
970 eta = _CalcEta(now - start_time, offset, size)
971 lu.LogInfo(" - done: %.1f%% ETA: %s",
972 offset / float(size) * 100, utils.FormatSeconds(eta))
975 logging.info("Resuming synchronization of disks for instance '%s'",
978 result = lu.rpc.call_blockdev_pause_resume_sync(node,
979 (map(compat.snd, disks),
984 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
985 node, result.fail_msg)
987 for idx, success in enumerate(result.payload):
989 lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
990 " failed", idx, instance.name)
993 def ExpandCheckDisks(instance, disks):
994 """Return the instance disks selected by the disks list
996 @type disks: list of L{objects.Disk} or None
997 @param disks: selected disks
998 @rtype: list of L{objects.Disk}
999 @return: selected instance disks to act on
1003 return instance.disks
1005 if not set(disks).issubset(instance.disks):
1006 raise errors.ProgrammerError("Can only act on disks belonging to the"
1007 " given instance")
1009 return disks
1011 def WaitForSync(lu, instance, disks=None, oneshot=False):
1012 """Sleep and poll for an instance's disk to sync.
1015 if not instance.disks or disks is not None and not disks:
1018 disks = ExpandCheckDisks(instance, disks)
1021 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1023 node = instance.primary_node
1026 lu.cfg.SetDiskID(dev, node)
1028 # TODO: Convert to utils.Retry
1031 degr_retries = 10 # in seconds, as we sleep 1 second each time
1035 cumul_degraded = False
1036 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1037 msg = rstats.fail_msg
1039 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1042 raise errors.RemoteError("Can't contact node %s for mirror data,"
1043 " aborting." % node)
1046 rstats = rstats.payload
1048 for i, mstat in enumerate(rstats):
1050 lu.LogWarning("Can't compute data for node %s/%s",
1051 node, disks[i].iv_name)
1054 cumul_degraded = (cumul_degraded or
1055 (mstat.is_degraded and mstat.sync_percent is None))
1056 if mstat.sync_percent is not None:
1058 if mstat.estimated_time is not None:
1059 rem_time = ("%s remaining (estimated)" %
1060 utils.FormatSeconds(mstat.estimated_time))
1061 max_time = mstat.estimated_time
1063 rem_time = "no time estimate"
1064 lu.LogInfo("- device %s: %5.2f%% done, %s",
1065 disks[i].iv_name, mstat.sync_percent, rem_time)
1067 # if we're done but degraded, let's do a few small retries, to
1068 # make sure we see a stable and not transient situation; therefore
1069 # we force restart of the loop
1070 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1071 logging.info("Degraded disks found, %d retries left", degr_retries)
1079 time.sleep(min(60, max_time))
1082 lu.LogInfo("Instance %s's disks are in sync", instance.name)
1084 return not cumul_degraded
1087 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1088 """Shutdown block devices of an instance.
1090 This does the shutdown on all nodes of the instance.
1092 If ignore_primary is false, errors on the primary node are
1093 ignored.
1097 disks = ExpandCheckDisks(instance, disks)
1100 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1101 lu.cfg.SetDiskID(top_disk, node)
1102 result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1103 msg = result.fail_msg
1105 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1106 disk.iv_name, node, msg)
1107 if ((node == instance.primary_node and not ignore_primary) or
1108 (node != instance.primary_node and not result.offline)):
1113 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1114 """Shutdown block devices of an instance.
1116 This function checks if an instance is running, before calling
1117 ShutdownInstanceDisks.
1120 CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1121 ShutdownInstanceDisks(lu, instance, disks=disks)
1124 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1126 """Prepare the block devices for an instance.
1128 This sets up the block devices on all nodes.
1130 @type lu: L{LogicalUnit}
1131 @param lu: the logical unit on whose behalf we execute
1132 @type instance: L{objects.Instance}
1133 @param instance: the instance for whose disks we assemble
1134 @type disks: list of L{objects.Disk} or None
1135 @param disks: which disks to assemble (or all, if None)
1136 @type ignore_secondaries: boolean
1137 @param ignore_secondaries: if true, errors on secondary nodes
1138 won't result in an error return from the function
1139 @type ignore_size: boolean
1140 @param ignore_size: if true, the current known size of the disk
1141 will not be used during the disk activation, useful for cases
1142 when the size is wrong
1143 @return: False if the operation failed, otherwise a list of
1144 (host, instance_visible_name, node_visible_name)
1145 with the mapping from node devices to instance devices
1150 iname = instance.name
1151 disks = ExpandCheckDisks(instance, disks)
1153 # With the two passes mechanism we try to reduce the window of
1154 # opportunity for the race condition of switching DRBD to primary
1155 # before handshaking occurred, but we do not eliminate it
1157 # The proper fix would be to wait (with some limits) until the
1158 # connection has been made and drbd transitions from WFConnection
1159 # into any other network-connected state (Connected, SyncTarget,
1162 # 1st pass, assemble on all nodes in secondary mode
1163 for idx, inst_disk in enumerate(disks):
1164 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1166 node_disk = node_disk.Copy()
1167 node_disk.UnsetSize()
1168 lu.cfg.SetDiskID(node_disk, node)
1169 result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1171 msg = result.fail_msg
1173 is_offline_secondary = (node in instance.secondary_nodes and
1175 lu.LogWarning("Could not prepare block device %s on node %s"
1176 " (is_primary=False, pass=1): %s",
1177 inst_disk.iv_name, node, msg)
1178 if not (ignore_secondaries or is_offline_secondary):
1181 # FIXME: race condition on drbd migration to primary
1183 # 2nd pass, do only the primary node
1184 for idx, inst_disk in enumerate(disks):
1187 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1188 if node != instance.primary_node:
1191 node_disk = node_disk.Copy()
1192 node_disk.UnsetSize()
1193 lu.cfg.SetDiskID(node_disk, node)
1194 result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1196 msg = result.fail_msg
1198 lu.LogWarning("Could not prepare block device %s on node %s"
1199 " (is_primary=True, pass=2): %s",
1200 inst_disk.iv_name, node, msg)
1203 dev_path = result.payload
1205 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1207 # leave the disks configured for the primary node
1208 # this is a workaround that would be fixed better by
1209 # improving the logical/physical id handling
1211 lu.cfg.SetDiskID(disk, instance.primary_node)
1213 return disks_ok, device_info
1216 def StartInstanceDisks(lu, instance, force):
1217 """Start the disks of an instance.
1220 disks_ok, _ = AssembleInstanceDisks(lu, instance,
1221 ignore_secondaries=force)
1223 ShutdownInstanceDisks(lu, instance)
1224 if force is not None and not force:
1226 hint=("If the message above refers to a secondary node,"
1227 " you can retry the operation using '--force'"))
1228 raise errors.OpExecError("Disk consistency error")
1231 class LUInstanceGrowDisk(LogicalUnit):
1232 """Grow a disk of an instance.
1236 HTYPE = constants.HTYPE_INSTANCE
1239 def ExpandNames(self):
1240 self._ExpandAndLockInstance()
1241 self.needed_locks[locking.LEVEL_NODE] = []
1242 self.needed_locks[locking.LEVEL_NODE_RES] = []
1243 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1244 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1246 def DeclareLocks(self, level):
1247 if level == locking.LEVEL_NODE:
1248 self._LockInstancesNodes()
1249 elif level == locking.LEVEL_NODE_RES:
1251 self.needed_locks[locking.LEVEL_NODE_RES] = \
1252 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1254 def BuildHooksEnv(self):
1257 This runs on the master, the primary and all the secondaries.
1261 "DISK": self.op.disk,
1262 "AMOUNT": self.op.amount,
1263 "ABSOLUTE": self.op.absolute,
1265 env.update(BuildInstanceHookEnvByObject(self, self.instance))
1268 def BuildHooksNodes(self):
1269 """Build hooks nodes.
1272 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1275 def CheckPrereq(self):
1276 """Check prerequisites.
1278 This checks that the instance is in the cluster.
1281 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1282 assert instance is not None, \
1283 "Cannot retrieve locked instance %s" % self.op.instance_name
1284 nodenames = list(instance.all_nodes)
1285 for node in nodenames:
1286 CheckNodeOnline(self, node)
1288 self.instance = instance
1290 if instance.disk_template not in constants.DTS_GROWABLE:
1291 raise errors.OpPrereqError("Instance's disk layout does not support"
1292 " growing", errors.ECODE_INVAL)
1294 self.disk = instance.FindDisk(self.op.disk)
1296 if self.op.absolute:
1297 self.target = self.op.amount
1298 self.delta = self.target - self.disk.size
1300 raise errors.OpPrereqError("Requested size (%s) is smaller than "
1301 "current disk size (%s)" %
1302 (utils.FormatUnit(self.target, "h"),
1303 utils.FormatUnit(self.disk.size, "h")),
1306 self.delta = self.op.amount
1307 self.target = self.disk.size + self.delta
1309 raise errors.OpPrereqError("Requested increment (%s) is negative" %
1310 utils.FormatUnit(self.delta, "h"),
1313 self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
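# Illustrative sketch, not part of the original module: with made-up sizes,
# growing a 10240 MiB disk with op.absolute=True and op.amount=12288 gives
# target = 12288 and delta = 12288 - 10240 = 2048, whereas op.absolute=False
# and op.amount=2048 gives delta = 2048 and target = 10240 + 2048 = 12288.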
1315 def _CheckDiskSpace(self, nodenames, req_vgspace):
1316 template = self.instance.disk_template
1317 if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1318 # TODO: check the free disk space for file, when that feature will be
1319 # implemented
1320 nodes = map(self.cfg.GetNodeInfo, nodenames)
1321 es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1324 # With exclusive storage we need to do something smarter than just looking
1325 # at free space; for now, let's simply abort the operation.
1326 raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1327 " is enabled", errors.ECODE_STATE)
1328 CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1330 def Exec(self, feedback_fn):
1331 """Execute disk grow.
1334 instance = self.instance
1337 assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1338 assert (self.owned_locks(locking.LEVEL_NODE) ==
1339 self.owned_locks(locking.LEVEL_NODE_RES))
1341 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1343 disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1345 raise errors.OpExecError("Cannot activate block device to grow")
1347 feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1348 (self.op.disk, instance.name,
1349 utils.FormatUnit(self.delta, "h"),
1350 utils.FormatUnit(self.target, "h")))
1352 # First run all grow ops in dry-run mode
1353 for node in instance.all_nodes:
1354 self.cfg.SetDiskID(disk, node)
1355 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1357 result.Raise("Dry-run grow request failed to node %s" % node)
1360 # Get disk size from primary node for wiping
1361 result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1362 result.Raise("Failed to retrieve disk size from node '%s'" %
1363 instance.primary_node)
1365 (disk_size_in_bytes, ) = result.payload
1367 if disk_size_in_bytes is None:
1368 raise errors.OpExecError("Failed to retrieve disk size from primary"
1369 " node '%s'" % instance.primary_node)
1371 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1373 assert old_disk_size >= disk.size, \
1374 ("Retrieved disk size too small (got %s, should be at least %s)" %
1375 (old_disk_size, disk.size))
1377 old_disk_size = None
1379 # We know that (as far as we can test) operations across different
1380 # nodes will succeed, time to run it for real on the backing storage
1381 for node in instance.all_nodes:
1382 self.cfg.SetDiskID(disk, node)
1383 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1385 result.Raise("Grow request failed to node %s" % node)
1387 # And now execute it for logical storage, on the primary node
1388 node = instance.primary_node
1389 self.cfg.SetDiskID(disk, node)
1390 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1392 result.Raise("Grow request failed to node %s" % node)
1394 disk.RecordGrow(self.delta)
1395 self.cfg.Update(instance, feedback_fn)
1397 # Changes have been recorded, release node lock
1398 ReleaseLocks(self, locking.LEVEL_NODE)
1400 # Downgrade lock while waiting for sync
1401 self.glm.downgrade(locking.LEVEL_INSTANCE)
1403 assert wipe_disks ^ (old_disk_size is None)
1406 assert instance.disks[self.op.disk] == disk
1408 # Wipe newly added disk space
1409 WipeDisks(self, instance,
1410 disks=[(self.op.disk, disk, old_disk_size)])
1412 if self.op.wait_for_sync:
1413 disk_abort = not WaitForSync(self, instance, disks=[disk])
1415 self.LogWarning("Disk syncing has not returned a good status; check"
1416 " the instance")
1417 if instance.admin_state != constants.ADMINST_UP:
1418 _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1419 elif instance.admin_state != constants.ADMINST_UP:
1420 self.LogWarning("Not shutting down the disk even if the instance is"
1421 " not supposed to be running because no wait for"
1422 " sync mode was requested")
1424 assert self.owned_locks(locking.LEVEL_NODE_RES)
1425 assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1428 class LUInstanceReplaceDisks(LogicalUnit):
1429 """Replace the disks of an instance.
1432 HPATH = "mirrors-replace"
1433 HTYPE = constants.HTYPE_INSTANCE
1436 def CheckArguments(self):
1440 remote_node = self.op.remote_node
1441 ialloc = self.op.iallocator
1442 if self.op.mode == constants.REPLACE_DISK_CHG:
1443 if remote_node is None and ialloc is None:
1444 raise errors.OpPrereqError("When changing the secondary either an"
1445 " iallocator script must be used or the"
1446 " new node given", errors.ECODE_INVAL)
1448 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1450 elif remote_node is not None or ialloc is not None:
1451 # Not replacing the secondary
1452 raise errors.OpPrereqError("The iallocator and new node options can"
1453 " only be used when changing the"
1454 " secondary node", errors.ECODE_INVAL)
1456 def ExpandNames(self):
1457 self._ExpandAndLockInstance()
1459 assert locking.LEVEL_NODE not in self.needed_locks
1460 assert locking.LEVEL_NODE_RES not in self.needed_locks
1461 assert locking.LEVEL_NODEGROUP not in self.needed_locks
1463 assert self.op.iallocator is None or self.op.remote_node is None, \
1464 "Conflicting options"
1466 if self.op.remote_node is not None:
1467 self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1469 # Warning: do not remove the locking of the new secondary here
1470 # unless DRBD8.AddChildren is changed to work in parallel;
1471 # currently it doesn't since parallel invocations of
1472 # FindUnusedMinor will conflict
1473 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1474 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1476 self.needed_locks[locking.LEVEL_NODE] = []
1477 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1479 if self.op.iallocator is not None:
1480 # iallocator will select a new node in the same group
1481 self.needed_locks[locking.LEVEL_NODEGROUP] = []
1482 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1484 self.needed_locks[locking.LEVEL_NODE_RES] = []
1486 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1487 self.op.iallocator, self.op.remote_node,
1488 self.op.disks, self.op.early_release,
1489 self.op.ignore_ipolicy)
1491 self.tasklets = [self.replacer]
1493 def DeclareLocks(self, level):
1494 if level == locking.LEVEL_NODEGROUP:
1495 assert self.op.remote_node is None
1496 assert self.op.iallocator is not None
1497 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1499 self.share_locks[locking.LEVEL_NODEGROUP] = 1
1500 # Lock all groups used by instance optimistically; this requires going
1501 # via the node before it's locked, requiring verification later on
1502 self.needed_locks[locking.LEVEL_NODEGROUP] = \
1503 self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1505 elif level == locking.LEVEL_NODE:
1506 if self.op.iallocator is not None:
1507 assert self.op.remote_node is None
1508 assert not self.needed_locks[locking.LEVEL_NODE]
1509 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1511 # Lock member nodes of all locked groups
1512 self.needed_locks[locking.LEVEL_NODE] = \
1514 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1515 for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1517 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1519 self._LockInstancesNodes()
1521 elif level == locking.LEVEL_NODE_RES:
1523 self.needed_locks[locking.LEVEL_NODE_RES] = \
1524 self.needed_locks[locking.LEVEL_NODE]
1526 def BuildHooksEnv(self):
1529 This runs on the master, the primary and all the secondaries.
1532 instance = self.replacer.instance
1534 "MODE": self.op.mode,
1535 "NEW_SECONDARY": self.op.remote_node,
1536 "OLD_SECONDARY": instance.secondary_nodes[0],
1538 env.update(BuildInstanceHookEnvByObject(self, instance))
1541 def BuildHooksNodes(self):
1542 """Build hooks nodes.
1545 instance = self.replacer.instance
1547 self.cfg.GetMasterNode(),
1548 instance.primary_node,
1550 if self.op.remote_node is not None:
1551 nl.append(self.op.remote_node)
1554 def CheckPrereq(self):
1555 """Check prerequisites.
1558 assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1559 self.op.iallocator is None)
1561 # Verify if node group locks are still correct
1562 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1564 CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1566 return LogicalUnit.CheckPrereq(self)
1569 class LUInstanceActivateDisks(NoHooksLU):
1570 """Bring up an instance's disks.
1575 def ExpandNames(self):
1576 self._ExpandAndLockInstance()
1577 self.needed_locks[locking.LEVEL_NODE] = []
1578 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1580 def DeclareLocks(self, level):
1581 if level == locking.LEVEL_NODE:
1582 self._LockInstancesNodes()
1584 def CheckPrereq(self):
1585 """Check prerequisites.
1587 This checks that the instance is in the cluster.
1590 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1591 assert self.instance is not None, \
1592 "Cannot retrieve locked instance %s" % self.op.instance_name
1593 CheckNodeOnline(self, self.instance.primary_node)
1595 def Exec(self, feedback_fn):
1596 """Activate the disks.
1599 disks_ok, disks_info = \
1600 AssembleInstanceDisks(self, self.instance,
1601 ignore_size=self.op.ignore_size)
1603 raise errors.OpExecError("Cannot activate block devices")
1605 if self.op.wait_for_sync:
1606 if not WaitForSync(self, self.instance):
1607 raise errors.OpExecError("Some disks of the instance are degraded!")
1612 class LUInstanceDeactivateDisks(NoHooksLU):
1613 """Shutdown an instance's disks.
1618 def ExpandNames(self):
1619 self._ExpandAndLockInstance()
1620 self.needed_locks[locking.LEVEL_NODE] = []
1621 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1623 def DeclareLocks(self, level):
1624 if level == locking.LEVEL_NODE:
1625 self._LockInstancesNodes()
1627 def CheckPrereq(self):
1628 """Check prerequisites.
1630 This checks that the instance is in the cluster.
1633 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1634 assert self.instance is not None, \
1635 "Cannot retrieve locked instance %s" % self.op.instance_name
1637 def Exec(self, feedback_fn):
1638 """Deactivate the disks
1641 instance = self.instance
1643 ShutdownInstanceDisks(self, instance)
1645 _SafeShutdownInstanceDisks(self, instance)
1648 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1650 """Check that mirrors are not degraded.
1652 @attention: The device has to be annotated already.
1654 The ldisk parameter, if True, will change the test from the
1655 is_degraded attribute (which represents overall non-ok status for
1656 the device(s)) to the ldisk (representing the local storage status).
1659 lu.cfg.SetDiskID(dev, node)
1663 if on_primary or dev.AssembleOnSecondary():
1664 rstats = lu.rpc.call_blockdev_find(node, dev)
1665 msg = rstats.fail_msg
1667 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1669 elif not rstats.payload:
1670 lu.LogWarning("Can't find disk on node %s", node)
1674 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1676 result = result and not rstats.payload.is_degraded
1679 for child in dev.children:
1680 result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1686 def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1687 """Wrapper around L{_CheckDiskConsistencyInner}.
1690 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1691 return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1695 def _BlockdevFind(lu, node, dev, instance):
1696 """Wrapper around call_blockdev_find to annotate diskparams.
1698 @param lu: A reference to the lu object
1699 @param node: The node to call out to
1700 @param dev: The device to find
1701 @param instance: The instance object the device belongs to
1702 @returns The result of the rpc call
1705 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1706 return lu.rpc.call_blockdev_find(node, disk)
1709 def _GenerateUniqueNames(lu, exts):
1710 """Generate a suitable LV name.
1712 This will generate a logical volume name for the given instance.
1717 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1718 results.append("%s%s" % (new_id, val))
1722 class TLReplaceDisks(Tasklet):
1723 """Replaces disks for an instance.
1725 Note: Locking is not within the scope of this class.
1728 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1729 disks, early_release, ignore_ipolicy):
1730 """Initializes this class.
1733 Tasklet.__init__(self, lu)
1736 self.instance_name = instance_name
1738 self.iallocator_name = iallocator_name
1739 self.remote_node = remote_node
1741 self.early_release = early_release
1742 self.ignore_ipolicy = ignore_ipolicy
1745 self.instance = None
1746 self.new_node = None
1747 self.target_node = None
1748 self.other_node = None
1749 self.remote_node_info = None
1750 self.node_secondary_ip = None
1753 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1754 """Compute a new secondary node using an IAllocator.
1757 req = iallocator.IAReqRelocate(name=instance_name,
1758 relocate_from=list(relocate_from))
1759 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1761 ial.Run(iallocator_name)
1764 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1765 " %s" % (iallocator_name, ial.info),
1768 remote_node_name = ial.result[0]
1770 lu.LogInfo("Selected new secondary for instance '%s': %s",
1771 instance_name, remote_node_name)
1773 return remote_node_name
1775 def _FindFaultyDisks(self, node_name):
1776 """Wrapper for L{FindFaultyInstanceDisks}.
1779 return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1782 def _CheckDisksActivated(self, instance):
1783 """Checks if the instance disks are activated.
1785 @param instance: The instance to check disks
1786 @return: True if they are activated, False otherwise
1789 nodes = instance.all_nodes
1791 for idx, dev in enumerate(instance.disks):
1793 self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1794 self.cfg.SetDiskID(dev, node)
1796 result = _BlockdevFind(self, node, dev, instance)
1800 elif result.fail_msg or not result.payload:
1805 def CheckPrereq(self):
1806 """Check prerequisites.
1808 This checks that the instance is in the cluster.
1811 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1812 assert instance is not None, \
1813 "Cannot retrieve locked instance %s" % self.instance_name
1815 if instance.disk_template != constants.DT_DRBD8:
1816 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1817 " instances", errors.ECODE_INVAL)
1819 if len(instance.secondary_nodes) != 1:
1820 raise errors.OpPrereqError("The instance has a strange layout,"
1821 " expected one secondary but found %d" %
1822 len(instance.secondary_nodes),
1825 instance = self.instance
1826 secondary_node = instance.secondary_nodes[0]
1828 if self.iallocator_name is None:
1829 remote_node = self.remote_node
1831 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1832 instance.name, instance.secondary_nodes)
1834 if remote_node is None:
1835 self.remote_node_info = None
1837 assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1838 "Remote node '%s' is not locked" % remote_node
1840 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1841 assert self.remote_node_info is not None, \
1842 "Cannot retrieve locked node %s" % remote_node
1844 if remote_node == self.instance.primary_node:
1845 raise errors.OpPrereqError("The specified node is the primary node of"
1846 " the instance", errors.ECODE_INVAL)
1848 if remote_node == secondary_node:
1849 raise errors.OpPrereqError("The specified node is already the"
1850 " secondary node of the instance",
1853 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1854 constants.REPLACE_DISK_CHG):
1855 raise errors.OpPrereqError("Cannot specify disks to be replaced",
1858 if self.mode == constants.REPLACE_DISK_AUTO:
1859 if not self._CheckDisksActivated(instance):
1860 raise errors.OpPrereqError("Please run activate-disks on instance %s"
1861 " first" % self.instance_name,
1863 faulty_primary = self._FindFaultyDisks(instance.primary_node)
1864 faulty_secondary = self._FindFaultyDisks(secondary_node)
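# In automatic mode the faulty side is repaired in place: the node that
# holds the faulty disks becomes the target node, only those disks are
# replaced, and the operation refuses to continue if both sides report
# faulty disks (see the branches below).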
1866 if faulty_primary and faulty_secondary:
1867 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1868 " one node and can not be repaired"
1869 " automatically" % self.instance_name,
1873 self.disks = faulty_primary
1874 self.target_node = instance.primary_node
1875 self.other_node = secondary_node
1876 check_nodes = [self.target_node, self.other_node]
1877 elif faulty_secondary:
1878 self.disks = faulty_secondary
1879 self.target_node = secondary_node
1880 self.other_node = instance.primary_node
1881 check_nodes = [self.target_node, self.other_node]
1887 # Non-automatic modes
1888 if self.mode == constants.REPLACE_DISK_PRI:
1889 self.target_node = instance.primary_node
1890 self.other_node = secondary_node
1891 check_nodes = [self.target_node, self.other_node]
1893 elif self.mode == constants.REPLACE_DISK_SEC:
1894 self.target_node = secondary_node
1895 self.other_node = instance.primary_node
1896 check_nodes = [self.target_node, self.other_node]
1898 elif self.mode == constants.REPLACE_DISK_CHG:
1899 self.new_node = remote_node
1900 self.other_node = instance.primary_node
1901 self.target_node = secondary_node
1902 check_nodes = [self.new_node, self.other_node]
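# When changing the secondary, target_node is the old secondary (whose DRBD
# devices and LVs are torn down later), other_node is the primary, and
# new_node is the replacement secondary chosen explicitly or via the
# iallocator.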
1904 CheckNodeNotDrained(self.lu, remote_node)
1905 CheckNodeVmCapable(self.lu, remote_node)
1907 old_node_info = self.cfg.GetNodeInfo(secondary_node)
1908 assert old_node_info is not None
1909 if old_node_info.offline and not self.early_release:
1910 # doesn't make sense to delay the release
1911 self.early_release = True
1912 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1913 " early-release mode", secondary_node)
else:
1916 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
self.mode)
1919 # If no disks were specified, all disks should be replaced
if not self.disks:
1921 self.disks = range(len(self.instance.disks))
1923 # TODO: This is ugly, but right now we can't distinguish between
1924 # internally submitted opcodes and external ones. We should fix that.
1925 if self.remote_node_info:
1926 # We change the node; let's verify it still meets the instance policy
1927 new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
1928 cluster = self.cfg.GetClusterInfo()
1929 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1931 CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
1932 self.cfg, ignore=self.ignore_ipolicy)
1934 for node in check_nodes:
1935 CheckNodeOnline(self.lu, node)
1937 touched_nodes = frozenset(node_name for node_name in [self.new_node,
self.other_node,
self.target_node]
1940 if node_name is not None)
1942 # Release unneeded node and node resource locks
1943 ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
1944 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
1945 ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
1947 # Release any owned node group
1948 ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
1950 # Check whether disks are valid
1951 for disk_idx in self.disks:
1952 instance.FindDisk(disk_idx)
1954 # Get secondary node IP addresses
1955 self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
1956 in self.cfg.GetMultiNodeInfo(touched_nodes))
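# The secondary IPs collected here are the DRBD endpoints later handed to
# call_drbd_disconnect_net/call_drbd_attach_net when the replication network
# is reconfigured.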
1958 def Exec(self, feedback_fn):
1959 """Execute disk replacement.
1961 This dispatches the disk replacement to the appropriate handler.
1965 # Verify owned locks before starting operation
1966 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
1967 assert set(owned_nodes) == set(self.node_secondary_ip), \
1968 ("Incorrect node locks, owning %s, expected %s" %
1969 (owned_nodes, self.node_secondary_ip.keys()))
1970 assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
1971 self.lu.owned_locks(locking.LEVEL_NODE_RES))
1972 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1974 owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
1975 assert list(owned_instances) == [self.instance_name], \
1976 "Instance '%s' not locked" % self.instance_name
1978 assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
1979 "Should not own any node group lock at this point"
1982 feedback_fn("No disks need replacement for instance '%s'" %
1986 feedback_fn("Replacing disk(s) %s for instance '%s'" %
1987 (utils.CommaJoin(self.disks), self.instance.name))
1988 feedback_fn("Current primary node: %s" % self.instance.primary_node)
1989 feedback_fn("Current seconary node: %s" %
1990 utils.CommaJoin(self.instance.secondary_nodes))
1992 activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
1994 # Activate the instance disks if we're replacing them on a down instance
1996 StartInstanceDisks(self.lu, self.instance, True)
1999 # Should we replace the secondary node?
2000 if self.new_node is not None:
2001 fn = self._ExecDrbd8Secondary
else:
2003 fn = self._ExecDrbd8DiskOnly
2005 result = fn(feedback_fn)
2007 # Deactivate the instance disks if we're replacing them on a
2010 _SafeShutdownInstanceDisks(self.lu, self.instance)
2012 assert not self.lu.owned_locks(locking.LEVEL_NODE)
2015 # Verify owned locks
2016 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2017 nodes = frozenset(self.node_secondary_ip)
2018 assert ((self.early_release and not owned_nodes) or
2019 (not self.early_release and not (set(owned_nodes) - nodes))), \
2020 ("Not owning the correct locks, early_release=%s, owned=%r,"
2021 " nodes=%r" % (self.early_release, owned_nodes, nodes))
2025 def _CheckVolumeGroup(self, nodes):
2026 self.lu.LogInfo("Checking volume groups")
2028 vgname = self.cfg.GetVGName()
2030 # Make sure volume group exists on all involved nodes
2031 results = self.rpc.call_vg_list(nodes)
if not results:
2033 raise errors.OpExecError("Can't list volume groups on the nodes")
for node in nodes:
res = results[node]
2037 res.Raise("Error checking node %s" % node)
2038 if vgname not in res.payload:
2039 raise errors.OpExecError("Volume group '%s' not found on node %s" %
2042 def _CheckDisksExistence(self, nodes):
2043 # Check disk existence
2044 for idx, dev in enumerate(self.instance.disks):
2045 if idx not in self.disks:
2049 self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2050 self.cfg.SetDiskID(dev, node)
2052 result = _BlockdevFind(self, node, dev, self.instance)
2054 msg = result.fail_msg
2055 if msg or not result.payload:
2057 msg = "disk not found"
2058 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2061 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2062 for idx, dev in enumerate(self.instance.disks):
2063 if idx not in self.disks:
2066 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2069 if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2070 on_primary, ldisk=ldisk):
2071 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2072 " replace disks for instance %s" %
2073 (node_name, self.instance.name))
2075 def _CreateNewStorage(self, node_name):
2076 """Create new storage on the primary or secondary node.
2078 This is only used for same-node replaces, not for changing the
2079 secondary node, hence we don't want to modify the existing disk.
2084 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
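# For every disk selected for replacement, create a fresh data/meta LV pair
# under temporary unique names and remember (disk, old LVs, new LVs) in
# iv_names; the rename/attach and cleanup steps in _ExecDrbd8DiskOnly work
# from that mapping.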
2085 for idx, dev in enumerate(disks):
2086 if idx not in self.disks:
2089 self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2091 self.cfg.SetDiskID(dev, node_name)
2093 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2094 names = _GenerateUniqueNames(self.lu, lv_names)
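# lv_names above are only suffixes; _GenerateUniqueNames is assumed to turn
# them into cluster-unique LV names (for disk 0 roughly of the form
# "<unique-id>.disk0_data" and "<unique-id>.disk0_meta"), so the new LVs
# never clash with the ones they replace.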
2096 (data_disk, meta_disk) = dev.children
2097 vg_data = data_disk.logical_id[0]
2098 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2099 logical_id=(vg_data, names[0]),
2100 params=data_disk.params)
2101 vg_meta = meta_disk.logical_id[0]
2102 lv_meta = objects.Disk(dev_type=constants.LD_LV,
2103 size=constants.DRBD_META_SIZE,
2104 logical_id=(vg_meta, names[1]),
2105 params=meta_disk.params)
2107 new_lvs = [lv_data, lv_meta]
2108 old_lvs = [child.Copy() for child in dev.children]
2109 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2110 excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2112 # we pass force_create=True to force the LVM creation
2113 for new_lv in new_lvs:
2114 _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2115 GetInstanceInfoText(self.instance), False,
2120 def _CheckDevices(self, node_name, iv_names):
2121 for name, (dev, _, _) in iv_names.iteritems():
2122 self.cfg.SetDiskID(dev, node_name)
2124 result = _BlockdevFind(self, node_name, dev, self.instance)
2126 msg = result.fail_msg
2127 if msg or not result.payload:
2129 msg = "disk not found"
2130 raise errors.OpExecError("Can't find DRBD device %s: %s" %
2133 if result.payload.is_degraded:
2134 raise errors.OpExecError("DRBD device %s is degraded!" % name)
2136 def _RemoveOldStorage(self, node_name, iv_names):
2137 for name, (_, old_lvs, _) in iv_names.iteritems():
2138 self.lu.LogInfo("Remove logical volumes for %s", name)
2141 self.cfg.SetDiskID(lv, node_name)
2143 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2145 self.lu.LogWarning("Can't remove old LV: %s", msg,
2146 hint="remove unused LVs manually")
2148 def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2149 """Replace a disk on the primary or secondary for DRBD 8.
2151 The algorithm for replace is quite complicated:
2153 1. for each disk to be replaced:
2155 1. create new LVs on the target node with unique names
2156 1. detach old LVs from the drbd device
2157 1. rename old LVs to name_replaced.<time_t>
2158 1. rename new LVs to old LVs
2159 1. attach the new LVs (with the old names now) to the drbd device
2161 1. wait for sync across all devices
2163 1. for each modified disk:
2165 1. remove old LVs (which have the name name_replaced.<time_t>)
2167 Failures are not very well handled.
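# Sketch of the rename dance below for one disk, with hypothetical LV names:
#   old data LV "xenvg/<id>.disk0_data"
#     -> renamed to "xenvg/<id>.disk0_data_replaced-<time_t>"
#   new data LV "xenvg/<new-id>.disk0_data"
#     -> renamed to the old name "xenvg/<id>.disk0_data"
# so the DRBD device ends up re-attached to LVs carrying the original names,
# while the replaced LVs keep a timestamped name until they are removed.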
2172 # Step: check device activation
2173 self.lu.LogStep(1, steps_total, "Check device existence")
2174 self._CheckDisksExistence([self.other_node, self.target_node])
2175 self._CheckVolumeGroup([self.target_node, self.other_node])
2177 # Step: check other node consistency
2178 self.lu.LogStep(2, steps_total, "Check peer consistency")
2179 self._CheckDisksConsistency(self.other_node,
2180 self.other_node == self.instance.primary_node,
2183 # Step: create new storage
2184 self.lu.LogStep(3, steps_total, "Allocate new storage")
2185 iv_names = self._CreateNewStorage(self.target_node)
2187 # Step: for each lv, detach+rename*2+attach
2188 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2189 for dev, old_lvs, new_lvs in iv_names.itervalues():
2190 self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2192 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2194 result.Raise("Can't detach drbd from local storage on node"
2195 " %s for device %s" % (self.target_node, dev.iv_name))
2197 #cfg.Update(instance)
2199 # ok, we created the new LVs, so now we know we have the needed
2200 # storage; as such, we proceed on the target node to rename
2201 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2202 # using the assumption that logical_id == physical_id (which in
2203 # turn is the unique_id on that node)
2205 # FIXME(iustin): use a better name for the replaced LVs
2206 temp_suffix = int(time.time())
2207 ren_fn = lambda d, suff: (d.physical_id[0],
2208 d.physical_id[1] + "_replaced-%s" % suff)
2210 # Build the rename list based on what LVs exist on the node
2211 rename_old_to_new = []
2212 for to_ren in old_lvs:
2213 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2214 if not result.fail_msg and result.payload:
2216 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2218 self.lu.LogInfo("Renaming the old LVs on the target node")
2219 result = self.rpc.call_blockdev_rename(self.target_node,
2221 result.Raise("Can't rename old LVs on node %s" % self.target_node)
2223 # Now we rename the new LVs to the old LVs
2224 self.lu.LogInfo("Renaming the new LVs on the target node")
2225 rename_new_to_old = [(new, old.physical_id)
2226 for old, new in zip(old_lvs, new_lvs)]
2227 result = self.rpc.call_blockdev_rename(self.target_node,
2229 result.Raise("Can't rename new LVs on node %s" % self.target_node)
2231 # Intermediate steps of in memory modifications
2232 for old, new in zip(old_lvs, new_lvs):
2233 new.logical_id = old.logical_id
2234 self.cfg.SetDiskID(new, self.target_node)
2236 # We need to modify old_lvs so that removal later removes the
2237 # right LVs, not the newly added ones; note that old_lvs is a
2239 for disk in old_lvs:
2240 disk.logical_id = ren_fn(disk, temp_suffix)
2241 self.cfg.SetDiskID(disk, self.target_node)
2243 # Now that the new lvs have the old name, we can add them to the device
2244 self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2245 result = self.rpc.call_blockdev_addchildren(self.target_node,
2246 (dev, self.instance), new_lvs)
2247 msg = result.fail_msg
if msg:
2249 for new_lv in new_lvs:
2250 msg2 = self.rpc.call_blockdev_remove(self.target_node,
2253 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2254 hint=("cleanup manually the unused logical"
2256 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2258 cstep = itertools.count(5)
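# Steps 5 and 6 share this counter: with early_release the old storage is
# removed and the resource locks are dropped before waiting for the DRBD
# sync, otherwise the old LVs are only removed after the sync has finished.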
2260 if self.early_release:
2261 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2262 self._RemoveOldStorage(self.target_node, iv_names)
2263 # TODO: Check if releasing locks early still makes sense
2264 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2266 # Release all resource locks except those used by the instance
2267 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2268 keep=self.node_secondary_ip.keys())
2270 # Release all node locks while waiting for sync
2271 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2273 # TODO: Can the instance lock be downgraded here? Take the optional disk
2274 # shutdown in the caller into consideration.
2277 # This can fail as the old devices are degraded and WaitForSync
2278 # returns a combined result over all disks, so we don't check its return value
2279 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2280 WaitForSync(self.lu, self.instance)
2282 # Check all devices manually
2283 self._CheckDevices(self.instance.primary_node, iv_names)
2285 # Step: remove old storage
2286 if not self.early_release:
2287 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2288 self._RemoveOldStorage(self.target_node, iv_names)
2290 def _ExecDrbd8Secondary(self, feedback_fn):
2291 """Replace the secondary node for DRBD 8.
2293 The algorithm for replace is quite complicated:
2294 - for all disks of the instance:
2295 - create new LVs on the new node with same names
2296 - shutdown the drbd device on the old secondary
2297 - disconnect the drbd network on the primary
2298 - create the drbd device on the new secondary
2299 - network attach the drbd on the primary, using an artifice:
2300 the drbd code for Attach() will connect to the network if it
2301 finds a device which is connected to the good local disks but not network enabled
2303 - wait for sync across all devices
2304 - remove all disks from the old secondary
2306 Failures are not very well handled.
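# In short: the new LVs and an unconnected DRBD device are set up on the new
# node first, the primary's DRBD devices are put into standalone mode, the
# instance configuration is switched to the new secondary, and the final
# attach_net call lets DRBD reconnect and resync onto the empty volumes on
# the new node.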
2311 pnode = self.instance.primary_node
2313 # Step: check device activation
2314 self.lu.LogStep(1, steps_total, "Check device existence")
2315 self._CheckDisksExistence([self.instance.primary_node])
2316 self._CheckVolumeGroup([self.instance.primary_node])
2318 # Step: check other node consistency
2319 self.lu.LogStep(2, steps_total, "Check peer consistency")
2320 self._CheckDisksConsistency(self.instance.primary_node, True, True)
2322 # Step: create new storage
2323 self.lu.LogStep(3, steps_total, "Allocate new storage")
2324 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2325 excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2326 for idx, dev in enumerate(disks):
2327 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2328 (self.new_node, idx))
2329 # we pass force_create=True to force LVM creation
2330 for new_lv in dev.children:
2331 _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2332 True, GetInstanceInfoText(self.instance), False,
2335 # Step 4: drbd minors and drbd setup changes
2336 # after this, we must manually remove the drbd minors on both the
2337 # error and the success paths
2338 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2339 minors = self.cfg.AllocateDRBDMinor([self.new_node
2340 for dev in self.instance.disks],
2342 logging.debug("Allocated minors %r", minors)
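# One DRBD minor per instance disk is reserved on the new node; the error
# paths below have to give them back via ReleaseDRBDMinors.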
2345 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2346 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2347 (self.new_node, idx))
2348 # create new devices on new_node; note that we create two IDs:
2349 # one without port, so the drbd will be activated without
2350 # networking information on the new node at this stage, and one
2351 # with network, for the latter activation in step 4
2352 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2353 if self.instance.primary_node == o_node1:
p_minor = o_minor1
else:
2356 assert self.instance.primary_node == o_node2, "Three-node instance?"
p_minor = o_minor2
2359 new_alone_id = (self.instance.primary_node, self.new_node, None,
2360 p_minor, new_minor, o_secret)
2361 new_net_id = (self.instance.primary_node, self.new_node, o_port,
2362 p_minor, new_minor, o_secret)
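# The DRBD logical_id layout is (node_A, node_B, port, minor_A, minor_B,
# secret); e.g. with hypothetical values new_alone_id could be
# ("node1", "node3", None, 0, 11, o_secret), and new_net_id is the same
# tuple with the original port instead of None.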
2364 iv_names[idx] = (dev, dev.children, new_net_id)
2365 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2367 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2368 logical_id=new_alone_id,
2369 children=dev.children,
2372 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2375 CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2377 GetInstanceInfoText(self.instance), False,
2379 except errors.GenericError:
2380 self.cfg.ReleaseDRBDMinors(self.instance.name)
2383 # We have new devices, shutdown the drbd on the old secondary
2384 for idx, dev in enumerate(self.instance.disks):
2385 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2386 self.cfg.SetDiskID(dev, self.target_node)
2387 msg = self.rpc.call_blockdev_shutdown(self.target_node,
2388 (dev, self.instance)).fail_msg
2390 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2391 "node: %s" % (idx, msg),
2392 hint=("Please cleanup this device manually as"
2393 " soon as possible"))
2395 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2396 result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2397 self.instance.disks)[pnode]
2399 msg = result.fail_msg
2401 # detaches didn't succeed (unlikely)
2402 self.cfg.ReleaseDRBDMinors(self.instance.name)
2403 raise errors.OpExecError("Can't detach the disks from the network on"
2404 " old node: %s" % (msg,))
2406 # if we managed to detach at least one, we update all the disks of
2407 # the instance to point to the new secondary
2408 self.lu.LogInfo("Updating instance configuration")
2409 for dev, _, new_logical_id in iv_names.itervalues():
2410 dev.logical_id = new_logical_id
2411 self.cfg.SetDiskID(dev, self.instance.primary_node)
2413 self.cfg.Update(self.instance, feedback_fn)
2415 # Release all node locks (the configuration has been updated)
2416 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2418 # and now perform the drbd attach
2419 self.lu.LogInfo("Attaching primary drbds to new secondary"
2420 " (standalone => connected)")
2421 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2423 self.node_secondary_ip,
2424 (self.instance.disks, self.instance),
2427 for to_node, to_result in result.items():
2428 msg = to_result.fail_msg
2430 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2432 hint=("please do a gnt-instance info to see the"
2433 " status of disks"))
2435 cstep = itertools.count(5)
2437 if self.early_release:
2438 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2439 self._RemoveOldStorage(self.target_node, iv_names)
2440 # TODO: Check if releasing locks early still makes sense
2441 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2443 # Release all resource locks except those used by the instance
2444 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2445 keep=self.node_secondary_ip.keys())
2447 # TODO: Can the instance lock be downgraded here? Take the optional disk
2448 # shutdown in the caller into consideration.
2451 # This can fail as the old devices are degraded and WaitForSync
2452 # returns a combined result over all disks, so we don't check its return value
2453 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2454 WaitForSync(self.lu, self.instance)
2456 # Check all devices manually
2457 self._CheckDevices(self.instance.primary_node, iv_names)
2459 # Step: remove old storage
2460 if not self.early_release:
2461 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2462 self._RemoveOldStorage(self.target_node, iv_names)