4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Logical units dealing with storage of instances."""
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41 AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42 CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43 IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45 CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46 BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
48 import ganeti.masterd.instance
#: Per-template suffix inserted when generating unique disk names
#: (see GenerateDiskTemplate); templates absent from this map take a
#: different naming path.
_DISK_TEMPLATE_NAME_PREFIX = {
  constants.DT_PLAIN: "",
  constants.DT_RBD: ".rbd",
  constants.DT_EXT: ".ext",
  }

#: Maps a disk template to the logical-device type backing it.
_DISK_TEMPLATE_DEVICE_TYPE = {
  constants.DT_PLAIN: constants.LD_LV,
  constants.DT_FILE: constants.LD_FILE,
  constants.DT_SHARED_FILE: constants.LD_FILE,
  constants.DT_BLOCK: constants.LD_BLOCKDEV,
  constants.DT_RBD: constants.LD_RBD,
  constants.DT_EXT: constants.LD_EXT,
  }
def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info,
                                       excl_stor)
  result.Raise("Can't create block device %s on"
               " node %s for instance %s" % (device, node, instance.name))
  # Record the backend-reported physical location, but only the first
  # time the device is created.
  if device.physical_id is None:
    device.physical_id = result.payload
def _CreateBlockDevInner(lu, node, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @return: list of created devices

  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    # Children are created first, so they exist by the time the parent
    # device is assembled.
    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
                                    info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node, device)]
    return created_devices

  except errors.DeviceCreationError as e:
    # Record what has been created so far so callers can clean up.
    e.created_devices.extend(created_devices)
    raise

  except errors.OpExecError as e:
    raise errors.DeviceCreationError(str(e), created_devices)
def IsExclusiveStorageEnabledNodeName(cfg, nodename):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type nodename: string
  @param nodename: The node
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  ni = cfg.GetNodeInfo(nodename)
  if ni is None:
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, ni)
def _CreateBlockDev(lu, node, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  This method annotates the root device first.

  """
  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
  # The exclusive_storage flag is looked up per target node.
  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
  return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
                              force_open, excl_stor)
def _UndoCreateDisks(lu, disks_created):
  """Undo the work performed by L{CreateDisks}.

  This function is called in case of an error to undo the work of
  L{CreateDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @param disks_created: the result returned by L{CreateDisks}

  """
  for (node, disk) in disks_created:
    lu.cfg.SetDiskID(disk, node)
    result = lu.rpc.call_blockdev_remove(node, disk)
    # Cleanup is best-effort: removal failures are logged, not raised.
    if result.fail_msg:
      logging.warning("Failed to remove newly-created disk %s on node %s:"
                      " %s", disk, node, result.fail_msg)
def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node: string
  @param target_node: if passed, overrides the target node for creation
  @type disks: list of {objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node is None:
    pnode = instance.primary_node
    all_nodes = instance.all_nodes
  else:
    pnode = target_node
    all_nodes = [pnode]

  if disks is None:
    disks = instance.disks

  if instance.disk_template in constants.DTS_FILEBASED:
    # File-based disks need their containing directory created first.
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir, pnode))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node in all_nodes:
      # Only the primary node gets a fully-opened device.
      f_create = node == pnode
      try:
        _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
        disks_created.append((node, device))
      except errors.DeviceCreationError as e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        # Undo everything created so far (including partial creations
        # recorded in the exception itself), then re-raise.
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created)
        raise errors.OpExecError(e.message)

  return disks_created
def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  @param disk_template: the disk template of the instance
  @param disks: list of disk definitions (dicts containing at least the
      size and, for LVM-backed templates, the volume group)

  @return: dict mapping volume-group name to required space in MiB

  @raise errors.ProgrammerError: if the disk template is unknown

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      # Accumulate per-VG totals, keyed by the disk's actual VG name.
      # (Previously the lookup used the literal constant as the key, so
      # sizes of multiple disks in the same VG were never summed.)
      vg_name = disk[constants.IDISK_VG]
      vgs[vg_name] = vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  @raise errors.OpPrereqError: if a disk definition is invalid

  """
  disks = []
  for disk in op.disks:
    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 mode, errors.ECODE_INVAL)
    size = disk.get(constants.IDISK_SIZE, None)
    if size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                 errors.ECODE_INVAL)

    # The 'provider' option is only meaningful for the ExtStorage template.
    ext_provider = disk.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    data_vg = disk.get(constants.IDISK_VG, default_vg)
    name = disk.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      name = None
    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: mode,
      constants.IDISK_VG: data_vg,
      constants.IDISK_NAME: name,
      }

    if constants.IDISK_METAVG in disk:
      new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
    if constants.IDISK_ADOPT in disk:
      new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if ext_provider:
        new_disk[constants.IDISK_PROVIDER] = ext_provider
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            new_disk[key] = disk[key]
      else:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)

    disks.append(new_disk)

  return disks
def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  @rtype: None
  @return: nothing; this is a deliberate no-op placeholder

  """
  # For the RADOS cluster we assume there is always enough space.
  pass
def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  Builds the data LV, the metadata LV and the DRBD device tying them
  together; each device gets a freshly generated UUID.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgnames[0], names[0]),
                          params={})
  dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  dev_meta = objects.Disk(dev_type=constants.LD_LV,
                          size=constants.DRBD_META_SIZE,
                          logical_id=(vgnames[1], names[1]),
                          params={})
  dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name, params={})
  drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_dev
def GenerateDiskTemplate(
  lu, template_name, instance_name, primary_node, secondary_nodes,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
  _req_shr_file_storage=opcodes.RequireSharedFileStorage):
  """Generate the entire disk layout for a given template type.

  @return: list of L{objects.Disk} objects describing the instance's disks
  @raise errors.ProgrammerError: if the template/node configuration is
      inconsistent or the template is unknown

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    # DRBD needs exactly one secondary and two minors per disk (one for
    # each side of the mirror).
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    if template_name == constants.DT_FILE:
      _req_file_storage()
    elif template_name == constants.DT_SHARED_FILE:
      _req_shr_file_storage()

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    # Each branch defines how a disk's logical_id is derived from its
    # index and definition.
    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      logical_id_fn = \
        lambda _, disk_index, disk: (file_driver,
                                     "%s/disk%d" % (file_storage_dir,
                                                    disk_index))
    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          # Note: fixed to actually interpolate the arguments; previously
          # they were passed as extra exception args and never formatted.
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found" %
                                       (constants.DT_EXT,
                                        constants.IDISK_PROVIDER))
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" %
                                   template_name)

    dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params)
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE

  # Disk parameters a caller may change when recreating a disk
  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    Stores the selected nodes in C{self.op.nodes}.

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should be already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=[{constants.IDISK_SIZE: d.size,
                                                constants.IDISK_MODE: d.mode}
                                               for d in self.instance.disks],
                                        hypervisor=self.instance.hypervisor,
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),

    self.op.nodes = ial.result
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(ial.result))

  def CheckArguments(self):
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

      # Explicit target nodes: lock exactly those
      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Mirror the node locks onto the node-resource level
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
      if len(self.op.nodes) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.nodes) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.nodes) == 1
      primary_node = self.op.nodes[0]
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

      self.disks = dict(self.op.disks)
      # No explicit selection: recreate all disks with no changes
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,

    if ((self.op.nodes or self.op.iallocator) and
         sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",

    self.instance = instance

    if self.op.iallocator:

      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

      assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    """
    instance = self.instance

    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(instance.disks):
        changes = self.disks[idx]
        # Disk should not be recreated

      # update secondaries for disks, if needed
      if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.nodes) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
        new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.LD_DRBD8
        disk.logical_id = new_id
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None))

    # change primary node, if needed
      instance.primary_node = self.op.nodes[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    self.cfg.Update(instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(instance.all_nodes))
    new_disks = CreateDisks(self, instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
  nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
  for node in nodenames:
    info = nodeinfo[node]
    info.Raise("Cannot get current information from node %s" % node,
               prereq=True, ecode=errors.ECODE_ENVIRON)
    (_, (vg_info, ), _) = info.payload
    vg_free = vg_info.get("vg_free", None)
    # A missing or non-numeric value means the node could not report
    # the VG state, which we treat as an environment error.
    if not isinstance(vg_free, int):
      raise errors.OpPrereqError("Can't compute free disk space on node"
                                 " %s for vg %s, result was '%s'" %
                                 (node, vg, vg_free), errors.ECODE_ENVIRON)
    if requested > vg_free:
      raise errors.OpPrereqError("Not enough disk space on target node %s"
                                 " vg %s: required %d MiB, available %d MiB" %
                                 (node, vg, requested, vg_free),
                                 errors.ECODE_NORES)
def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  This function checks if all given nodes have the needed amount of
  free disk. In case any node has less disk or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type nodenames: C{list}
  @param nodenames: the list of node names to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  # Delegate the per-VG check; any failure surfaces as OpPrereqError.
  for vg_name in req_sizes:
    _CheckNodesFreeDiskOnVG(lu, nodenames, vg_name, req_sizes[vg_name])
891 def _DiskSizeInBytesToMebibytes(lu, size):
892 """Converts a disk size in bytes to mebibytes.
894 Warns and rounds up if the size isn't an even multiple of 1 MiB.
897 (mib, remainder) = divmod(size, 1024 * 1024)
900 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
901 " to not overwrite existing data (%s bytes will not be"
902 " wiped)", (1024 * 1024) - remainder)
908 def _CalcEta(time_taken, written, total_size):
909 """Calculates the ETA based on size written and total size.
911 @param time_taken: The time taken so far
912 @param written: amount written so far
913 @param total_size: The total size of data to be written
914 @return: The remaining time in seconds
917 avg_time = time_taken / float(written)
918 return (total_size - written) * avg_time
def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
      start offset

  """
  node = instance.primary_node

  if disks is None:
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  for (_, device, _) in disks:
    lu.cfg.SetDiskID(device, node)

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node)

  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
                                           wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        # Emit progress at most once a minute
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    # Always try to resume synchronization, even if the wipe failed
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
    case of error
  @raise errors.OpPrereqError: in case of failure

  """
  # NOTE(review): the "try:" header and the tail of the error path (log
  # argument, re-raise) appear to be elided in this copy.
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
    # Roll back the disk creations recorded in "cleanup".
    _UndoCreateDisks(lu, cleanup)
def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  # NOTE(review): the "disks is None" guard and the final "return disks"
  # appear to be elided in this copy.
    return instance.disks
  # Every requested disk must belong to this instance; anything else is a
  # programming error on the caller's side.
  if not set(disks).issubset(instance.disks):
    raise errors.ProgrammerError("Can only act on disks belonging to the"
                                 " target instance: expected a subset of %r,"
                                 " got %r" % (instance.disks, disks))
def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  @type disks: list of L{objects.Disk} or None
  @param disks: disks to wait for (all instance disks when None)
  @return: True when no disk stayed degraded, False otherwise

  """
  # NOTE(review): the "while True:" polling loop header and several other
  # statements appear to be elided in this copy.
  # Nothing to wait for with no disks or an explicitly empty selection.
  if not instance.disks or disks is not None and not disks:

  disks = ExpandCheckDisks(instance, disks)

    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  node = instance.primary_node

    lu.cfg.SetDiskID(dev, node)

  # TODO: Convert to utils.Retry

  degr_retries = 10  # in seconds, as we sleep 1 second each time
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
    msg = rstats.fail_msg
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
    rstats = rstats.payload
    for i, mstat in enumerate(rstats):
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, disks[i].iv_name)

      # Degraded with no resync percentage reported counts towards the
      # cumulative degraded state.
      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
          rem_time = "no time estimate"
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)

    time.sleep(min(60, max_time))

    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If the ignore_primary is false, errors on the primary node are
  reflected in the result (see the node check below).

  """
  # NOTE(review): the "disks is None" guard, the outer per-disk loop and
  # the result accumulation/return appear to be elided in this copy.
    # only mark instance disks as inactive if all disks are affected
    lu.cfg.MarkInstanceDisksInactive(instance.name)

  disks = ExpandCheckDisks(instance, disks)

    # Shut down the whole device tree on every node it lives on.
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
      msg = result.fail_msg
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        # Primary failures count unless ignored; secondary failures count
        # unless the node is offline.
        if ((node == instance.primary_node and not ignore_primary) or
            (node != instance.primary_node and not result.offline)):
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance, refusing to act on a
  running instance.

  L{CheckInstanceState} raises if the instance is not fully stopped;
  only then is the actual work delegated to L{ShutdownInstanceDisks}.

  """
  # Fail early (via an OpPrereqError from the state check) when the
  # instance is still running.
  state_msg = "cannot shutdown disks"
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg=state_msg)
  ShutdownInstanceDisks(lu, instance, disks=disks)
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: False if the operation failed, otherwise a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  # NOTE(review): the "ignore_size=False):" signature continuation and a
  # number of statements (result accumulators, branch headers, RPC call
  # continuations) appear to be elided in this copy.
  iname = instance.name

    # only mark instance disks as active if all disks are affected
    lu.cfg.MarkInstanceDisksActive(iname)

  disks = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
        # Drop the recorded size so activation does not enforce it.
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
      msg = result.fail_msg
        is_offline_secondary = (node in instance.secondary_nodes and
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, node, msg)
        if not (ignore_secondaries or is_offline_secondary):

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):

    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:

        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
      msg = result.fail_msg
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, node, msg)
        dev_path, _ = result.payload

    device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
    lu.cfg.SetDiskID(disk, instance.primary_node)

    lu.cfg.MarkInstanceDisksInactive(iname)

  return disks_ok, device_info
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  @type force: bool
  @param force: assemble even when secondary-node errors occur (passed
      through as C{ignore_secondaries})

  """
  # NOTE(review): the "if not disks_ok:" header and the LogWarning call
  # carrying the hint appear to be elided in this copy.
  disks_ok, _ = AssembleInstanceDisks(lu, instance,
                                      ignore_secondaries=force)
    ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
                    hint=("If the message above refers to a secondary node,"
                          " you can retry the operation using '--force'"))
    raise errors.OpExecError("Disk consistency error")
class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  # NOTE(review): several statements of this class appear to have been
  # elided in this copy (e.g. HPATH/REQ_BGL, "env = {" and return lines,
  # a few if/else headers); restore from upstream before use.
  HTYPE = constants.HTYPE_INSTANCE

  def ExpandNames(self):
    """Acquire the instance lock; node locks are recalculated later."""
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Compute per-level locks once the instance's nodes are known."""
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Mirror the node locks at the node-resource level
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
    env.update(BuildInstanceHookEnvByObject(self, self.instance))

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      CheckNodeOnline(self, node)
    self.instance = instance

    if instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = instance.FindDisk(self.op.disk)

    if self.op.absolute:
      # "amount" is the new total size; shrinking is refused.
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
      # "amount" is the increment; the target is current size plus delta.
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),

    self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, nodenames, req_vgspace):
    """Verify the given nodes have enough free VG space for the grow."""
    template = self.instance.disk_template
    if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      nodes = map(self.cfg.GetNodeInfo, nodenames)
      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
        # With exclusive storage we need to something smarter than just looking
        # at free space; for now, let's simply abort the operation.
        raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
                                   " is enabled", errors.ECODE_STATE)
      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance

    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    # Cluster-level flag: wipe newly allocated space after growing.
    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
      result.Raise("Dry-run grow request failed to node %s" % node)

      # Get disk size from primary node for wiping
      self.cfg.SetDiskID(disk, instance.primary_node)
      result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   instance.primary_node)

      (disk_size_in_bytes, ) = result.payload

      if disk_size_in_bytes is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % instance.primary_node)

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, disk.size))
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
      result.Raise("Grow request failed to node %s" % node)

    # And now execute it for logical storage, on the primary node
    node = instance.primary_node
    self.cfg.SetDiskID(disk, node)
    result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
    result.Raise("Grow request failed to node %s" % node)

    disk.RecordGrow(self.delta)
    self.cfg.Update(instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    # Exactly one of wipe_disks / "no old size" must hold.
    assert wipe_disks ^ (old_disk_size is None)

      assert instance.disks[self.op.disk] == disk

      # Wipe newly added disk space
      WipeDisks(self, instance,
                disks=[(self.op.disk, disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, instance, disks=[disk])
        self.LogWarning("Disk syncing has not returned a good status; check"
        if not instance.disks_active:
          _SafeShutdownInstanceDisks(self, instance, disks=[disk])
    elif not instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  # NOTE(review): some statements of this class appear to have been elided
  # in this copy (e.g. REQ_BGL, "else:" headers, "env = {" and return
  # lines); restore from upstream before use.
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE

  def CheckArguments(self):
    """Validate the replacement mode against node/iallocator options."""
    remote_node = self.op.remote_node
    ialloc = self.op.iallocator
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if remote_node is None and ialloc is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif remote_node is not None or ialloc is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    """Lock the instance and set up node/group lock computation."""
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    # The tasklet carries out the actual replacement in Exec().
    self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    """Compute the node/group locks for the given level."""
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_name)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_name in self.cfg.GetNodeGroup(group_uuid).members]
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Mirror the node-level locks at the resource level
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": instance.secondary_nodes[0],
    env.update(BuildInstanceHookEnvByObject(self, instance))

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
      self.cfg.GetMasterNode(),
      instance.primary_node,
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Group locks are only taken in the iallocator case.
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)

    return LogicalUnit.CheckPrereq(self)
class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  # NOTE(review): a few lines (e.g. REQ_BGL, "if not disks_ok:", the final
  # "return disks_info") appear to be elided in this copy.
  def ExpandNames(self):
    """Lock the instance; node locks are computed later."""
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = \
      AssembleInstanceDisks(self, self.instance,
                            ignore_size=self.op.ignore_size)
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        # Mark disks inactive again so a later activation retries cleanly.
        self.cfg.MarkInstanceDisksInactive(self.instance.name)
        raise errors.OpExecError("Some disks of the instance are degraded!")
class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  # NOTE(review): REQ_BGL and the "if self.op.force:"/"else:" branch
  # headers in Exec appear to be elided in this copy.
  def ExpandNames(self):
    """Lock the instance; node locks are computed later."""
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
      # Forced shutdown ignores the instance's run state.
      ShutdownInstanceDisks(self, instance)
      # Unforced shutdown requires the instance to be down.
      _SafeShutdownInstanceDisks(self, instance)
def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  # NOTE(review): the "ldisk=False):" signature continuation, the "result"
  # initialization and some branch headers appear to be elided in this copy.
  lu.cfg.SetDiskID(dev, node)

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.fail_msg
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
        result = result and not rstats.payload.is_degraded

    # Recurse into child devices (e.g. the volumes below a mirror).
    for child in dev.children:
      result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  Annotates the disk with the instance's disk parameters before
  delegating the actual consistency check.

  """
  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  # NOTE(review): the closing argument of this call appears to be elided
  # in this copy.
  return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
def _BlockdevFind(lu, node, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  # Annotate the device with the instance's disk parameters first, then
  # issue the RPC against the annotated copy.
  annotated = AnnotateDiskParams(instance, [dev], lu.cfg)
  (disk,) = annotated
  return lu.rpc.call_blockdev_find(node, disk)
1770 def _GenerateUniqueNames(lu, exts):
1771 """Generate a suitable LV name.
1773 This will generate a logical volume name for the given instance.
1778 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1779 results.append("%s%s" % (new_id, val))
1783 class TLReplaceDisks(Tasklet):
1784 """Replaces disks for an instance.
1786 Note: Locking is not within the scope of this class.
1789 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1790 disks, early_release, ignore_ipolicy):
1791 """Initializes this class.
1794 Tasklet.__init__(self, lu)
1797 self.instance_name = instance_name
1799 self.iallocator_name = iallocator_name
1800 self.remote_node = remote_node
1802 self.early_release = early_release
1803 self.ignore_ipolicy = ignore_ipolicy
1806 self.instance = None
1807 self.new_node = None
1808 self.target_node = None
1809 self.other_node = None
1810 self.remote_node_info = None
1811 self.node_secondary_ip = None
1814 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1815 """Compute a new secondary node using an IAllocator.
1818 req = iallocator.IAReqRelocate(name=instance_name,
1819 relocate_from=list(relocate_from))
1820 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1822 ial.Run(iallocator_name)
1825 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1826 " %s" % (iallocator_name, ial.info),
1829 remote_node_name = ial.result[0]
1831 lu.LogInfo("Selected new secondary for instance '%s': %s",
1832 instance_name, remote_node_name)
1834 return remote_node_name
1836 def _FindFaultyDisks(self, node_name):
1837 """Wrapper for L{FindFaultyInstanceDisks}.
1840 return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1843 def _CheckDisksActivated(self, instance):
1844 """Checks if the instance disks are activated.
1846 @param instance: The instance to check disks
1847 @return: True if they are activated, False otherwise
1850 nodes = instance.all_nodes
1852 for idx, dev in enumerate(instance.disks):
1854 self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1855 self.cfg.SetDiskID(dev, node)
1857 result = _BlockdevFind(self, node, dev, instance)
1861 elif result.fail_msg or not result.payload:
1866 def CheckPrereq(self):
1867 """Check prerequisites.
1869 This checks that the instance is in the cluster.
1872 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1873 assert instance is not None, \
1874 "Cannot retrieve locked instance %s" % self.instance_name
1876 if instance.disk_template != constants.DT_DRBD8:
1877 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1878 " instances", errors.ECODE_INVAL)
1880 if len(instance.secondary_nodes) != 1:
1881 raise errors.OpPrereqError("The instance has a strange layout,"
1882 " expected one secondary but found %d" %
1883 len(instance.secondary_nodes),
1886 instance = self.instance
1887 secondary_node = instance.secondary_nodes[0]
1889 if self.iallocator_name is None:
1890 remote_node = self.remote_node
1892 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1893 instance.name, instance.secondary_nodes)
1895 if remote_node is None:
1896 self.remote_node_info = None
1898 assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1899 "Remote node '%s' is not locked" % remote_node
1901 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1902 assert self.remote_node_info is not None, \
1903 "Cannot retrieve locked node %s" % remote_node
1905 if remote_node == self.instance.primary_node:
1906 raise errors.OpPrereqError("The specified node is the primary node of"
1907 " the instance", errors.ECODE_INVAL)
1909 if remote_node == secondary_node:
1910 raise errors.OpPrereqError("The specified node is already the"
1911 " secondary node of the instance",
1914 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1915 constants.REPLACE_DISK_CHG):
1916 raise errors.OpPrereqError("Cannot specify disks to be replaced",
1919 if self.mode == constants.REPLACE_DISK_AUTO:
1920 if not self._CheckDisksActivated(instance):
1921 raise errors.OpPrereqError("Please run activate-disks on instance %s"
1922 " first" % self.instance_name,
1924 faulty_primary = self._FindFaultyDisks(instance.primary_node)
1925 faulty_secondary = self._FindFaultyDisks(secondary_node)
1927 if faulty_primary and faulty_secondary:
1928 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1929 " one node and can not be repaired"
1930 " automatically" % self.instance_name,
1934 self.disks = faulty_primary
1935 self.target_node = instance.primary_node
1936 self.other_node = secondary_node
1937 check_nodes = [self.target_node, self.other_node]
1938 elif faulty_secondary:
1939 self.disks = faulty_secondary
1940 self.target_node = secondary_node
1941 self.other_node = instance.primary_node
1942 check_nodes = [self.target_node, self.other_node]
1948 # Non-automatic modes
1949 if self.mode == constants.REPLACE_DISK_PRI:
1950 self.target_node = instance.primary_node
1951 self.other_node = secondary_node
1952 check_nodes = [self.target_node, self.other_node]
1954 elif self.mode == constants.REPLACE_DISK_SEC:
1955 self.target_node = secondary_node
1956 self.other_node = instance.primary_node
1957 check_nodes = [self.target_node, self.other_node]
1959 elif self.mode == constants.REPLACE_DISK_CHG:
1960 self.new_node = remote_node
1961 self.other_node = instance.primary_node
1962 self.target_node = secondary_node
1963 check_nodes = [self.new_node, self.other_node]
1965 CheckNodeNotDrained(self.lu, remote_node)
1966 CheckNodeVmCapable(self.lu, remote_node)
1968 old_node_info = self.cfg.GetNodeInfo(secondary_node)
1969 assert old_node_info is not None
1970 if old_node_info.offline and not self.early_release:
1971 # doesn't make sense to delay the release
1972 self.early_release = True
1973 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1974 " early-release mode", secondary_node)
1977 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
1980 # If not specified all disks should be replaced
1982 self.disks = range(len(self.instance.disks))
1984 # TODO: This is ugly, but right now we can't distinguish between internal
1985 # submitted opcode and external one. We should fix that.
1986 if self.remote_node_info:
1987 # We change the node, lets verify it still meets instance policy
1988 new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
1989 cluster = self.cfg.GetClusterInfo()
1990 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1992 CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
1993 self.cfg, ignore=self.ignore_ipolicy)
1995 for node in check_nodes:
1996 CheckNodeOnline(self.lu, node)
1998 touched_nodes = frozenset(node_name for node_name in [self.new_node,
2001 if node_name is not None)
2003 # Release unneeded node and node resource locks
2004 ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2005 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2006 ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2008 # Release any owned node group
2009 ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2011 # Check whether disks are valid
2012 for disk_idx in self.disks:
2013 instance.FindDisk(disk_idx)
2015 # Get secondary node IP addresses
2016 self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
2017 in self.cfg.GetMultiNodeInfo(touched_nodes))
2019 def Exec(self, feedback_fn):
2020 """Execute disk replacement.
2022 This dispatches the disk replacement to the appropriate handler.
# NOTE(review): this listing elides some original lines (the embedded line
# numbers skip), so several guards/branches are not visible here; comments
# below describe only the code that is visible.
# Sanity-check lock ownership before doing anything: we must hold exactly
# the node locks for the nodes we collected secondary IPs for, the same
# set of node-resource locks, and no node-allocation or node-group lock.
2026 # Verify owned locks before starting operation
2027 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2028 assert set(owned_nodes) == set(self.node_secondary_ip), \
2029 ("Incorrect node locks, owning %s, expected %s" %
2030 (owned_nodes, self.node_secondary_ip.keys()))
2031 assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2032 self.lu.owned_locks(locking.LEVEL_NODE_RES))
2033 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2035 owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2036 assert list(owned_instances) == [self.instance_name], \
2037 "Instance '%s' not locked" % self.instance_name
2039 assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2040 "Should not own any node group lock at this point"
2043 feedback_fn("No disks need replacement for instance '%s'" %
2047 feedback_fn("Replacing disk(s) %s for instance '%s'" %
2048 (utils.CommaJoin(self.disks), self.instance.name))
2049 feedback_fn("Current primary node: %s" % self.instance.primary_node)
# NOTE(review): typo in the user-visible message below -- "seconary"
# should read "secondary" (a doc-only edit cannot change runtime strings;
# fix in a code change).
2050 feedback_fn("Current seconary node: %s" %
2051 utils.CommaJoin(self.instance.secondary_nodes))
# Remember whether we had to bring the disks up ourselves, so the
# matching shutdown below only happens for an instance that was down.
2053 activate_disks = not self.instance.disks_active
2055 # Activate the instance disks if we're replacing them on a down instance
2057 StartInstanceDisks(self.lu, self.instance, True)
# Dispatch: a set new_node means "replace the secondary node", otherwise
# we replace disks in place on the existing nodes.
2060 # Should we replace the secondary node?
2061 if self.new_node is not None:
2062 fn = self._ExecDrbd8Secondary
2064 fn = self._ExecDrbd8DiskOnly
2066 result = fn(feedback_fn)
2068 # Deactivate the instance disks if we're replacing them on a
2071 _SafeShutdownInstanceDisks(self.lu, self.instance)
2073 assert not self.lu.owned_locks(locking.LEVEL_NODE)
2076 # Verify owned locks
# With early_release all node-resource locks must already be released;
# otherwise we may still hold at most the locks for our own nodes.
2077 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2078 nodes = frozenset(self.node_secondary_ip)
2079 assert ((self.early_release and not owned_nodes) or
2080 (not self.early_release and not (set(owned_nodes) - nodes))), \
2081 ("Not owning the correct locks, early_release=%s, owned=%r,"
2082 " nodes=%r" % (self.early_release, owned_nodes, nodes))
2086 def _CheckVolumeGroup(self, nodes):
# Verify that the cluster's volume group exists on every node in
# C{nodes}; raises OpExecError on RPC failure or a missing VG.
2087 self.lu.LogInfo("Checking volume groups")
2089 vgname = self.cfg.GetVGName()
2091 # Make sure volume group exists on all involved nodes
2092 results = self.rpc.call_vg_list(nodes)
2094 raise errors.OpExecError("Can't list volume groups on the nodes")
# Per-node check: surface the node's own RPC error first, then make
# sure the expected VG name is among the reported groups.
2098 res.Raise("Error checking node %s" % node)
2099 if vgname not in res.payload:
2100 raise errors.OpExecError("Volume group '%s' not found on node %s" %
2103 def _CheckDisksExistence(self, nodes):
# Verify that every disk selected for replacement (self.disks) can be
# found via blockdev_find on each of the given nodes; disks not being
# replaced are skipped.
2104 # Check disk existence
2105 for idx, dev in enumerate(self.instance.disks):
2106 if idx not in self.disks:
2110 self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2111 self.cfg.SetDiskID(dev, node)
2113 result = _BlockdevFind(self, node, dev, self.instance)
2115 msg = result.fail_msg
2116 if msg or not result.payload:
2118 msg = "disk not found"
# If the instance's disks are not fully activated, give the operator
# a hint to run activate-disks before retrying replace-disks.
2119 if not self._CheckDisksActivated(self.instance):
2120 extra_hint = ("\nDisks seem to be not properly activated. Try"
2121 " running activate-disks on the instance before"
2122 " using replace-disks.")
2125 raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2126 (idx, node, msg, extra_hint))
2128 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
# Verify that the disks being replaced are consistent on C{node_name};
# C{on_primary} tells CheckDiskConsistency which side we are checking,
# and C{ldisk} requests a local-disk (ldisk) level check.
2129 for idx, dev in enumerate(self.instance.disks):
2130 if idx not in self.disks:
2133 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
# Degraded storage on the peer makes replacement unsafe: abort.
2136 if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2137 on_primary, ldisk=ldisk):
2138 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2139 " replace disks for instance %s" %
2140 (node_name, self.instance.name))
2142 def _CreateNewStorage(self, node_name):
2143 """Create new storage on the primary or secondary node.
2145 This is only used for same-node replaces, not for changing the
2146 secondary node, hence we don't want to modify the existing disk.
# Builds an iv_names mapping (used by the caller in _ExecDrbd8DiskOnly:
# dev.iv_name -> (dev, old_lvs, new_lvs)) while creating fresh
# data/meta LV pairs for every disk selected for replacement.
2151 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2152 for idx, dev in enumerate(disks):
2153 if idx not in self.disks:
2156 self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2158 self.cfg.SetDiskID(dev, node_name)
# Generate cluster-unique names for the new data and meta LVs.
2160 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2161 names = _GenerateUniqueNames(self.lu, lv_names)
# A DRBD disk has exactly two children: the data LV and the meta LV;
# the new LVs inherit size/params but live in the same VGs.
2163 (data_disk, meta_disk) = dev.children
2164 vg_data = data_disk.logical_id[0]
2165 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2166 logical_id=(vg_data, names[0]),
2167 params=data_disk.params)
2168 vg_meta = meta_disk.logical_id[0]
2169 lv_meta = objects.Disk(dev_type=constants.LD_LV,
2170 size=constants.DRBD_META_SIZE,
2171 logical_id=(vg_meta, names[1]),
2172 params=meta_disk.params)
2174 new_lvs = [lv_data, lv_meta]
# Copy the old children so later removal targets the originals even
# after the in-memory objects are renamed.
2175 old_lvs = [child.Copy() for child in dev.children]
2176 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2177 excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2179 # we pass force_create=True to force the LVM creation
2180 for new_lv in new_lvs:
2182 _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2183 GetInstanceInfoText(self.instance), False,
# Creation failures are converted into an OpExecError for the caller.
2185 except errors.DeviceCreationError, e:
2186 raise errors.OpExecError("Can't create block device: %s" % e.message)
2190 def _CheckDevices(self, node_name, iv_names):
# Final per-device verification on C{node_name}: each DRBD device in
# C{iv_names} must be findable and must not report degraded status.
2191 for name, (dev, _, _) in iv_names.iteritems():
2192 self.cfg.SetDiskID(dev, node_name)
2194 result = _BlockdevFind(self, node_name, dev, self.instance)
2196 msg = result.fail_msg
2197 if msg or not result.payload:
2199 msg = "disk not found"
2200 raise errors.OpExecError("Can't find DRBD device %s: %s" %
# A degraded DRBD at this point means the sync did not complete cleanly.
2203 if result.payload.is_degraded:
2204 raise errors.OpExecError("DRBD device %s is degraded!" % name)
2206 def _RemoveOldStorage(self, node_name, iv_names):
# Best-effort removal of the replaced (old) LVs on C{node_name};
# failures are only warned about, with a hint to clean up manually.
2207 for name, (_, old_lvs, _) in iv_names.iteritems():
2208 self.lu.LogInfo("Remove logical volumes for %s", name)
2211 self.cfg.SetDiskID(lv, node_name)
2213 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2215 self.lu.LogWarning("Can't remove old LV: %s", msg,
2216 hint="remove unused LVs manually")
2218 def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2219 """Replace a disk on the primary or secondary for DRBD 8.
2221 The algorithm for replace is quite complicated:
2223 1. for each disk to be replaced:
2225 1. create new LVs on the target node with unique names
2226 1. detach old LVs from the drbd device
2227 1. rename old LVs to name_replaced.<time_t>
2228 1. rename new LVs to old LVs
2229 1. attach the new LVs (with the old names now) to the drbd device
2231 1. wait for sync across all devices
2233 1. for each modified disk:
2235 1. remove old LVs (which have the name name_replaces.<time_t>)
2237 Failures are not very well handled.
2242 # Step: check device activation
2243 self.lu.LogStep(1, steps_total, "Check device existence")
2244 self._CheckDisksExistence([self.other_node, self.target_node])
2245 self._CheckVolumeGroup([self.target_node, self.other_node])
2247 # Step: check other node consistency
2248 self.lu.LogStep(2, steps_total, "Check peer consistency")
2249 self._CheckDisksConsistency(self.other_node,
2250 self.other_node == self.instance.primary_node,
2253 # Step: create new storage
2254 self.lu.LogStep(3, steps_total, "Allocate new storage")
2255 iv_names = self._CreateNewStorage(self.target_node)
2257 # Step: for each lv, detach+rename*2+attach
2258 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2259 for dev, old_lvs, new_lvs in iv_names.itervalues():
2260 self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2262 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
# A failed detach is fatal: we cannot safely continue the swap.
2264 result.Raise("Can't detach drbd from local storage on node"
2265 " %s for device %s" % (self.target_node, dev.iv_name))
2267 #cfg.Update(instance)
2269 # ok, we created the new LVs, so now we know we have the needed
2270 # storage; as such, we proceed on the target node to rename
2271 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2272 # using the assumption that logical_id == physical_id (which in
2273 # turn is the unique_id on that node)
2275 # FIXME(iustin): use a better name for the replaced LVs
2276 temp_suffix = int(time.time())
# ren_fn maps an LV to its "_replaced-<time_t>" name in the same VG.
2277 ren_fn = lambda d, suff: (d.physical_id[0],
2278 d.physical_id[1] + "_replaced-%s" % suff)
2280 # Build the rename list based on what LVs exist on the node
# Only rename old LVs that blockdev_find confirms are present, so a
# partially-detached disk does not break the rename RPC.
2281 rename_old_to_new = []
2282 for to_ren in old_lvs:
2283 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2284 if not result.fail_msg and result.payload:
2286 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2288 self.lu.LogInfo("Renaming the old LVs on the target node")
2289 result = self.rpc.call_blockdev_rename(self.target_node,
2291 result.Raise("Can't rename old LVs on node %s" % self.target_node)
2293 # Now we rename the new LVs to the old LVs
2294 self.lu.LogInfo("Renaming the new LVs on the target node")
2295 rename_new_to_old = [(new, old.physical_id)
2296 for old, new in zip(old_lvs, new_lvs)]
2297 result = self.rpc.call_blockdev_rename(self.target_node,
2299 result.Raise("Can't rename new LVs on node %s" % self.target_node)
2301 # Intermediate steps of in memory modifications
# Mirror the on-node renames in the in-memory disk objects: the new
# LVs take over the old logical ids ...
2302 for old, new in zip(old_lvs, new_lvs):
2303 new.logical_id = old.logical_id
2304 self.cfg.SetDiskID(new, self.target_node)
2306 # We need to modify old_lvs so that removal later removes the
2307 # right LVs, not the newly added ones; note that old_lvs is a
# ... and the old LV objects get their "_replaced-" names so that
# _RemoveOldStorage later deletes the right volumes.
2309 for disk in old_lvs:
2310 disk.logical_id = ren_fn(disk, temp_suffix)
2311 self.cfg.SetDiskID(disk, self.target_node)
2313 # Now that the new lvs have the old name, we can add them to the device
2314 self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2315 result = self.rpc.call_blockdev_addchildren(self.target_node,
2316 (dev, self.instance), new_lvs)
2317 msg = result.fail_msg
# Rollback path: attach failed, so try to remove the just-created LVs;
# removal failures are only warned about.
2319 for new_lv in new_lvs:
2320 msg2 = self.rpc.call_blockdev_remove(self.target_node,
2323 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2324 hint=("cleanup manually the unused logical"
2326 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
# Step counter continues from 5 for the remaining (conditional) steps.
2328 cstep = itertools.count(5)
2330 if self.early_release:
2331 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2332 self._RemoveOldStorage(self.target_node, iv_names)
2333 # TODO: Check if releasing locks early still makes sense
2334 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2336 # Release all resource locks except those used by the instance
2337 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2338 keep=self.node_secondary_ip.keys())
2340 # Release all node locks while waiting for sync
2341 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2343 # TODO: Can the instance lock be downgraded here? Take the optional disk
2344 # shutdown in the caller into consideration.
2347 # This can fail as the old devices are degraded and _WaitForSync
2348 # does a combined result over all disks, so we don't check its return value
2349 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2350 WaitForSync(self.lu, self.instance)
2352 # Check all devices manually
2353 self._CheckDevices(self.instance.primary_node, iv_names)
2355 # Step: remove old storage
2356 if not self.early_release:
2357 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2358 self._RemoveOldStorage(self.target_node, iv_names)
2360 def _ExecDrbd8Secondary(self, feedback_fn):
2361 """Replace the secondary node for DRBD 8.
2363 The algorithm for replace is quite complicated:
2364 - for all disks of the instance:
2365 - create new LVs on the new node with same names
2366 - shutdown the drbd device on the old secondary
2367 - disconnect the drbd network on the primary
2368 - create the drbd device on the new secondary
2369 - network attach the drbd on the primary, using an artifice:
2370 the drbd code for Attach() will connect to the network if it
2371 finds a device which is connected to the good local disks but
2373 - wait for sync across all devices
2374 - remove all disks from the old secondary
2376 Failures are not very well handled.
# NOTE(review): this listing elides some original lines (the embedded
# line numbers skip); comments describe only the visible code.
2381 pnode = self.instance.primary_node
2383 # Step: check device activation
2384 self.lu.LogStep(1, steps_total, "Check device existence")
2385 self._CheckDisksExistence([self.instance.primary_node])
2386 self._CheckVolumeGroup([self.instance.primary_node])
2388 # Step: check other node consistency
2389 self.lu.LogStep(2, steps_total, "Check peer consistency")
2390 self._CheckDisksConsistency(self.instance.primary_node, True, True)
2392 # Step: create new storage
2393 self.lu.LogStep(3, steps_total, "Allocate new storage")
2394 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2395 excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2396 for idx, dev in enumerate(disks):
2397 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2398 (self.new_node, idx))
2399 # we pass force_create=True to force LVM creation
2400 for new_lv in dev.children:
2402 _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2403 True, GetInstanceInfoText(self.instance), False,
2405 except errors.DeviceCreationError, e:
2406 raise errors.OpExecError("Can't create block device: %s" % e.message)
2408 # Step 4: drbd minors and drbd setup changes
2409 # after this, we must manually remove the drbd minors on both the
2410 # error and the success paths
2411 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
# Reserve one DRBD minor on the new node per instance disk.
2412 minors = self.cfg.AllocateDRBDMinor([self.new_node
2413 for dev in self.instance.disks],
2415 logging.debug("Allocated minors %r", minors)
2418 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2419 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2420 (self.new_node, idx))
2421 # create new devices on new_node; note that we create two IDs:
2422 # one without port, so the drbd will be activated without
2423 # networking information on the new node at this stage, and one
2424 # with network, for the latter activation in step 4
2425 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
# Determine which side of the old logical id is the primary so the
# primary's minor (p_minor) is carried over into the new ids.
2426 if self.instance.primary_node == o_node1:
2429 assert self.instance.primary_node == o_node2, "Three-node instance?"
2432 new_alone_id = (self.instance.primary_node, self.new_node, None,
2433 p_minor, new_minor, o_secret)
2434 new_net_id = (self.instance.primary_node, self.new_node, o_port,
2435 p_minor, new_minor, o_secret)
# iv_names here maps disk index -> (dev, old children, new net id);
# the networked id is applied after the primary detaches (below).
2437 iv_names[idx] = (dev, dev.children, new_net_id)
2438 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2440 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2441 logical_id=new_alone_id,
2442 children=dev.children,
2445 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2448 CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2450 GetInstanceInfoText(self.instance), False,
# On failure, give back the reserved minors before propagating.
2452 except errors.GenericError:
2453 self.cfg.ReleaseDRBDMinors(self.instance.name)
2456 # We have new devices, shutdown the drbd on the old secondary
2457 for idx, dev in enumerate(self.instance.disks):
2458 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2459 self.cfg.SetDiskID(dev, self.target_node)
2460 msg = self.rpc.call_blockdev_shutdown(self.target_node,
2461 (dev, self.instance)).fail_msg
# NOTE(review): the two adjacent string literals below concatenate
# without a space, producing "...for disk/N on oldnode: ..." in the
# warning; a doc-only edit cannot change runtime strings.
2463 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2464 "node: %s" % (idx, msg),
2465 hint=("Please cleanup this device manually as"
2466 " soon as possible"))
2468 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2469 result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2470 self.instance.disks)[pnode]
2472 msg = result.fail_msg
2474 # detaches didn't succeed (unlikely)
2475 self.cfg.ReleaseDRBDMinors(self.instance.name)
2476 raise errors.OpExecError("Can't detach the disks from the network on"
2477 " old node: %s" % (msg,))
2479 # if we managed to detach at least one, we update all the disks of
2480 # the instance to point to the new secondary
2481 self.lu.LogInfo("Updating instance configuration")
2482 for dev, _, new_logical_id in iv_names.itervalues():
2483 dev.logical_id = new_logical_id
2484 self.cfg.SetDiskID(dev, self.instance.primary_node)
2486 self.cfg.Update(self.instance, feedback_fn)
2488 # Release all node locks (the configuration has been updated)
2489 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2491 # and now perform the drbd attach
2492 self.lu.LogInfo("Attaching primary drbds to new secondary"
2493 " (standalone => connected)")
2494 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2496 self.node_secondary_ip,
2497 (self.instance.disks, self.instance),
# Attach failures are non-fatal: warn and let the operator inspect
# disk status via gnt-instance info.
2500 for to_node, to_result in result.items():
2501 msg = to_result.fail_msg
2503 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2505 hint=("please do a gnt-instance info to see the"
2506 " status of disks"))
# Step counter continues from 5 for the remaining (conditional) steps.
2508 cstep = itertools.count(5)
2510 if self.early_release:
2511 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2512 self._RemoveOldStorage(self.target_node, iv_names)
2513 # TODO: Check if releasing locks early still makes sense
2514 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2516 # Release all resource locks except those used by the instance
2517 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2518 keep=self.node_secondary_ip.keys())
2520 # TODO: Can the instance lock be downgraded here? Take the optional disk
2521 # shutdown in the caller into consideration.
2524 # This can fail as the old devices are degraded and _WaitForSync
2525 # does a combined result over all disks, so we don't check its return value
2526 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2527 WaitForSync(self.lu, self.instance)
2529 # Check all devices manually
2530 self._CheckDevices(self.instance.primary_node, iv_names)
2532 # Step: remove old storage
2533 if not self.early_release:
2534 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2535 self._RemoveOldStorage(self.target_node, iv_names)