4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Logical units dealing with storage of instances."""
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41 AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42 CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43 IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45 CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46 BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
48 import ganeti.masterd.instance
51 _DISK_TEMPLATE_NAME_PREFIX = {
52 constants.DT_PLAIN: "",
53 constants.DT_RBD: ".rbd",
54 constants.DT_EXT: ".ext",
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59 constants.DT_PLAIN: constants.LD_LV,
60 constants.DT_FILE: constants.LD_FILE,
61 constants.DT_SHARED_FILE: constants.LD_FILE,
62 constants.DT_BLOCK: constants.LD_BLOCKDEV,
63 constants.DT_RBD: constants.LD_RBD,
64 constants.DT_EXT: constants.LD_EXT,
68 def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
70 """Create a single block device on a given node.
72 This will not recurse over children of the device, so they must be
75 @param lu: the lu on whose behalf we execute
76 @param node: the node on which to create the device
77 @type instance: L{objects.Instance}
78 @param instance: the instance which owns the device
79 @type device: L{objects.Disk}
80 @param device: the device to create
81 @param info: the extra 'metadata' we should attach to the device
82 (this will be represented as a LVM tag)
83 @type force_open: boolean
84 @param force_open: this parameter will be passed to the
85 L{backend.BlockdevCreate} function where it specifies
86 whether we run on primary or not, and it affects both
87 the child assembly and the device's own Open() execution
88 @type excl_stor: boolean
89 @param excl_stor: Whether exclusive_storage is active for the node
92 lu.cfg.SetDiskID(device, node)
93 result = lu.rpc.call_blockdev_create(node, device, device.size,
94 instance.name, force_open, info,
96 result.Raise("Can't create block device %s on"
97 " node %s for instance %s" % (device, node, instance.name))
98 if device.physical_id is None:
99 device.physical_id = result.payload
102 def _CreateBlockDevInner(lu, node, instance, device, force_create,
103 info, force_open, excl_stor):
104 """Create a tree of block devices on a given node.
106 If this device type has to be created on secondaries, create it and
109 If not, just recurse to children keeping the same 'force' value.
111 @attention: The device has to be annotated already.
113 @param lu: the lu on whose behalf we execute
114 @param node: the node on which to create the device
115 @type instance: L{objects.Instance}
116 @param instance: the instance which owns the device
117 @type device: L{objects.Disk}
118 @param device: the device to create
119 @type force_create: boolean
120 @param force_create: whether to force creation of this device; this
121 will be changed to True whenever we find a device which has the
122 CreateOnSecondary() attribute
123 @param info: the extra 'metadata' we should attach to the device
124 (this will be represented as a LVM tag)
125 @type force_open: boolean
126 @param force_open: this parameter will be passed to the
127 L{backend.BlockdevCreate} function where it specifies
128 whether we run on primary or not, and it affects both
129 the child assembly and the device's own Open() execution
130 @type excl_stor: boolean
131 @param excl_stor: Whether exclusive_storage is active for the node
133 @return: list of created devices
137 if device.CreateOnSecondary():
141 for child in device.children:
142 devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143 info, force_open, excl_stor)
144 created_devices.extend(devs)
147 return created_devices
149 CreateSingleBlockDev(lu, node, instance, device, info, force_open,
151 # The device has been completely created, so there is no point in keeping
152 # its subdevices in the list. We just add the device itself instead.
153 created_devices = [(node, device)]
154 return created_devices
156 except errors.DeviceCreationError, e:
157 e.created_devices.extend(created_devices)
159 except errors.OpExecError, e:
160 raise errors.DeviceCreationError(str(e), created_devices)
163 def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164 """Whether exclusive_storage is in effect for the given node.
166 @type cfg: L{config.ConfigWriter}
167 @param cfg: The cluster configuration
168 @type nodename: string
169 @param nodename: The node
171 @return: The effective value of exclusive_storage
172 @raise errors.OpPrereqError: if no node exists with the given name
175 ni = cfg.GetNodeInfo(nodename)
177 raise errors.OpPrereqError("Invalid node name %s" % nodename,
179 return IsExclusiveStorageEnabledNode(cfg, ni)
182 def _CreateBlockDev(lu, node, instance, device, force_create, info,
184 """Wrapper around L{_CreateBlockDevInner}.
186 This method annotates the root device first.
189 (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190 excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191 return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192 force_open, excl_stor)
195 def _UndoCreateDisks(lu, disks_created):
196 """Undo the work performed by L{CreateDisks}.
198 This function is called in case of an error to undo the work of
201 @type lu: L{LogicalUnit}
202 @param lu: the logical unit on whose behalf we execute
203 @param disks_created: the result returned by L{CreateDisks}
206 for (node, disk) in disks_created:
207 lu.cfg.SetDiskID(disk, node)
208 result = lu.rpc.call_blockdev_remove(node, disk)
210 logging.warning("Failed to remove newly-created disk %s on node %s:"
211 " %s", disk, node, result.fail_msg)
214 def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
215 """Create all disks for an instance.
217 This abstracts away some work from AddInstance.
219 @type lu: L{LogicalUnit}
220 @param lu: the logical unit on whose behalf we execute
221 @type instance: L{objects.Instance}
222 @param instance: the instance whose disks we should create
224 @param to_skip: list of indices to skip
225 @type target_node: string
226 @param target_node: if passed, overrides the target node for creation
227 @type disks: list of {objects.Disk}
228 @param disks: the disks to create; if not specified, all the disks of the
230 @return: information about the created disks, to be used to call
232 @raise errors.OpPrereqError: in case of error
235 info = GetInstanceInfoText(instance)
236 if target_node is None:
237 pnode = instance.primary_node
238 all_nodes = instance.all_nodes
244 disks = instance.disks
246 if instance.disk_template in constants.DTS_FILEBASED:
247 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
248 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
250 result.Raise("Failed to create directory '%s' on"
251 " node %s" % (file_storage_dir, pnode))
254 for idx, device in enumerate(disks):
255 if to_skip and idx in to_skip:
257 logging.info("Creating disk %s for instance '%s'", idx, instance.name)
258 for node in all_nodes:
259 f_create = node == pnode
261 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
262 disks_created.append((node, device))
263 except errors.DeviceCreationError, e:
264 logging.warning("Creating disk %s for instance '%s' failed",
266 disks_created.extend(e.created_devices)
267 _UndoCreateDisks(lu, disks_created)
268 raise errors.OpExecError(e.message)
272 def ComputeDiskSizePerVG(disk_template, disks):
273 """Compute disk size requirements in the volume group
276 def _compute(disks, payload):
277 """Universal algorithm.
282 vgs[disk[constants.IDISK_VG]] = \
283 vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
287 # Required free disk space as a function of disk and swap space
289 constants.DT_DISKLESS: {},
290 constants.DT_PLAIN: _compute(disks, 0),
291 # 128 MiB are added for DRBD metadata for each disk
292 constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
293 constants.DT_FILE: {},
294 constants.DT_SHARED_FILE: {},
297 if disk_template not in req_size_dict:
298 raise errors.ProgrammerError("Disk template '%s' size requirement"
299 " is unknown" % disk_template)
301 return req_size_dict[disk_template]
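# Illustrative sketch (not part of the original module): for two plain LVM
# disks of 1024 MiB and 2048 MiB in volume group "xenvg", the mapping above
# would yield {"xenvg": 3072}; with DT_DRBD8 each disk additionally accounts
# for constants.DRBD_META_SIZE of metadata, e.g. {"xenvg": 3072 + 2 * 128}
# assuming the default metadata size of 128 MiB.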
304 def ComputeDisks(op, default_vg):
305 """Computes the instance disks.
307 @param op: The instance opcode
308 @param default_vg: The default_vg to assume
310 @return: The computed disks
314 for disk in op.disks:
315 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316 if mode not in constants.DISK_ACCESS_SET:
317 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318 mode, errors.ECODE_INVAL)
319 size = disk.get(constants.IDISK_SIZE, None)
321 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
324 except (TypeError, ValueError):
325 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
328 ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329 if ext_provider and op.disk_template != constants.DT_EXT:
330 raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331 " disk template, not %s" %
332 (constants.IDISK_PROVIDER, constants.DT_EXT,
333 op.disk_template), errors.ECODE_INVAL)
335 data_vg = disk.get(constants.IDISK_VG, default_vg)
336 name = disk.get(constants.IDISK_NAME, None)
337 if name is not None and name.lower() == constants.VALUE_NONE:
340 constants.IDISK_SIZE: size,
341 constants.IDISK_MODE: mode,
342 constants.IDISK_VG: data_vg,
343 constants.IDISK_NAME: name,
347 constants.IDISK_METAVG,
348 constants.IDISK_ADOPT,
349 constants.IDISK_SPINDLES,
352 new_disk[key] = disk[key]
354 # For extstorage, demand the `provider' option and add any
355 # additional parameters (ext-params) to the dict
356 if op.disk_template == constants.DT_EXT:
358 new_disk[constants.IDISK_PROVIDER] = ext_provider
360 if key not in constants.IDISK_PARAMS:
361 new_disk[key] = disk[key]
363 raise errors.OpPrereqError("Missing provider for template '%s'" %
364 constants.DT_EXT, errors.ECODE_INVAL)
366 disks.append(new_disk)
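# Illustrative sketch (assumption, not from the original source): an input
# disk specification like {constants.IDISK_SIZE: 1024} from the opcode is
# normalized here to something along the lines of
#   {constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR,
#    constants.IDISK_VG: default_vg, constants.IDISK_NAME: None}
# with the optional keys (metavg, adopt, spindles) copied over only when
# present in the request.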
371 def CheckRADOSFreeSpace():
372 """Compute disk size requirements inside the RADOS cluster.
375 # For the RADOS cluster we assume there is always enough space.
379 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
380 iv_name, p_minor, s_minor):
381 """Generate a drbd8 device complete with its children.
384 assert len(vgnames) == len(names) == 2
385 port = lu.cfg.AllocatePort()
386 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
388 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
389 logical_id=(vgnames[0], names[0]),
391 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
392 dev_meta = objects.Disk(dev_type=constants.LD_LV,
393 size=constants.DRBD_META_SIZE,
394 logical_id=(vgnames[1], names[1]),
396 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
397 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
398 logical_id=(primary, secondary, port,
401 children=[dev_data, dev_meta],
402 iv_name=iv_name, params={})
403 drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
407 def GenerateDiskTemplate(
408 lu, template_name, instance_name, primary_node, secondary_nodes,
409 disk_info, file_storage_dir, file_driver, base_index,
410 feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
411 _req_shr_file_storage=opcodes.RequireSharedFileStorage):
412 """Generate the entire disk layout for a given template type.
415 vgname = lu.cfg.GetVGName()
416 disk_count = len(disk_info)
419 if template_name == constants.DT_DISKLESS:
421 elif template_name == constants.DT_DRBD8:
422 if len(secondary_nodes) != 1:
423 raise errors.ProgrammerError("Wrong template configuration")
424 remote_node = secondary_nodes[0]
425 minors = lu.cfg.AllocateDRBDMinor(
426 [primary_node, remote_node] * len(disk_info), instance_name)
428 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
430 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
433 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
434 for i in range(disk_count)]):
435 names.append(lv_prefix + "_data")
436 names.append(lv_prefix + "_meta")
437 for idx, disk in enumerate(disk_info):
438 disk_index = idx + base_index
439 data_vg = disk.get(constants.IDISK_VG, vgname)
440 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
441 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
442 disk[constants.IDISK_SIZE],
444 names[idx * 2:idx * 2 + 2],
445 "disk/%d" % disk_index,
446 minors[idx * 2], minors[idx * 2 + 1])
447 disk_dev.mode = disk[constants.IDISK_MODE]
448 disk_dev.name = disk.get(constants.IDISK_NAME, None)
449 disks.append(disk_dev)
452 raise errors.ProgrammerError("Wrong template configuration")
454 if template_name == constants.DT_FILE:
456 elif template_name == constants.DT_SHARED_FILE:
457 _req_shr_file_storage()
459 name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
460 if name_prefix is None:
463 names = _GenerateUniqueNames(lu, ["%s.disk%s" %
464 (name_prefix, base_index + i)
465 for i in range(disk_count)])
467 if template_name == constants.DT_PLAIN:
469 def logical_id_fn(idx, _, disk):
470 vg = disk.get(constants.IDISK_VG, vgname)
471 return (vg, names[idx])
473 elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
475 lambda _, disk_index, disk: (file_driver,
476 "%s/disk%d" % (file_storage_dir,
478 elif template_name == constants.DT_BLOCK:
480 lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
481 disk[constants.IDISK_ADOPT])
482 elif template_name == constants.DT_RBD:
483 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
484 elif template_name == constants.DT_EXT:
485 def logical_id_fn(idx, _, disk):
486 provider = disk.get(constants.IDISK_PROVIDER, None)
488 raise errors.ProgrammerError("Disk template is %s, but '%s' is"
489 " not found" % (constants.DT_EXT,
490 constants.IDISK_PROVIDER))
491 return (provider, names[idx])
493 raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
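# Illustrative examples (assumption, not from the original source) of the
# logical_id tuples produced by the functions above:
#   DT_PLAIN        -> (vg_name, "<uuid>.disk0")
#   DT_FILE/SHARED  -> (file_driver, "<file_storage_dir>/disk0")
#   DT_BLOCK        -> (constants.BLOCKDEV_DRIVER_MANUAL, "/dev/sdb")
#   DT_RBD          -> ("rbd", "<uuid>.rbd.disk0")
#   DT_EXT          -> (provider_name, "<uuid>.ext.disk0")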
495 dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
497 for idx, disk in enumerate(disk_info):
499 # Only for the Ext template add disk_info to params
500 if template_name == constants.DT_EXT:
501 params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
503 if key not in constants.IDISK_PARAMS:
504 params[key] = disk[key]
505 disk_index = idx + base_index
506 size = disk[constants.IDISK_SIZE]
507 feedback_fn("* disk %s, size %s" %
508 (disk_index, utils.FormatUnit(size, "h")))
509 disk_dev = objects.Disk(dev_type=dev_type, size=size,
510 logical_id=logical_id_fn(idx, disk_index, disk),
511 iv_name="disk/%d" % disk_index,
512 mode=disk[constants.IDISK_MODE],
514 spindles=disk.get(constants.IDISK_SPINDLES))
515 disk_dev.name = disk.get(constants.IDISK_NAME, None)
516 disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
517 disks.append(disk_dev)
522 def CheckSpindlesExclusiveStorage(diskdict, es_flag):
523 """Check the presence of the spindle options with exclusive_storage.
526 @param diskdict: disk parameters
528 @param es_flag: the effective value of the exclusive_storage flag
529 @raise errors.OpPrereqError: when spindles are given and they should not be
532 if (not es_flag and constants.IDISK_SPINDLES in diskdict and
533 diskdict[constants.IDISK_SPINDLES] is not None):
534 raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
535 " when exclusive storage is not active",
539 class LUInstanceRecreateDisks(LogicalUnit):
540 """Recreate an instance's missing disks.
543 HPATH = "instance-recreate-disks"
544 HTYPE = constants.HTYPE_INSTANCE
547 _MODIFYABLE = compat.UniqueFrozenset([
548 constants.IDISK_SIZE,
549 constants.IDISK_MODE,
550 constants.IDISK_SPINDLES,
553 # New or changed disk parameters may have different semantics
554 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
555 constants.IDISK_ADOPT,
557 # TODO: Implement support changing VG while recreating
559 constants.IDISK_METAVG,
560 constants.IDISK_PROVIDER,
561 constants.IDISK_NAME,
564 def _RunAllocator(self):
565 """Run the allocator based on input opcode.
568 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
571 # The allocator should actually run in "relocate" mode, but current
572 # allocators don't support relocating all the nodes of an instance at
573 # the same time. As a workaround we use "allocate" mode, but this is
574 # suboptimal for two reasons:
575 # - The instance name passed to the allocator is present in the list of
576 # existing instances, so there could be a conflict within the
577 # internal structures of the allocator. This doesn't happen with the
578 # current allocators, but it's a liability.
579 # - The allocator counts the resources used by the instance twice: once
580 # because the instance exists already, and once because it tries to
581 # allocate a new instance.
582 # The allocator could choose some of the nodes on which the instance is
583 # running, but that's not a problem. If the instance nodes are broken,
584 # they should already be marked as drained or offline, and hence
585 # skipped by the allocator. If instance disks have been lost for other
586 # reasons, then recreating the disks on the same nodes should be fine.
587 disk_template = self.instance.disk_template
588 spindle_use = be_full[constants.BE_SPINDLE_USE]
589 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
590 disk_template=disk_template,
591 tags=list(self.instance.GetTags()),
594 vcpus=be_full[constants.BE_VCPUS],
595 memory=be_full[constants.BE_MAXMEM],
596 spindle_use=spindle_use,
597 disks=[{constants.IDISK_SIZE: d.size,
598 constants.IDISK_MODE: d.mode}
599 for d in self.instance.disks],
600 hypervisor=self.instance.hypervisor,
602 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
604 ial.Run(self.op.iallocator)
606 assert req.RequiredNodes() == len(self.instance.all_nodes)
609 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
610 " %s" % (self.op.iallocator, ial.info),
613 self.op.nodes = ial.result
614 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
615 self.op.instance_name, self.op.iallocator,
616 utils.CommaJoin(ial.result))
618 def CheckArguments(self):
619 if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
620 # Normalize and convert deprecated list of disk indices
621 self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
623 duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
625 raise errors.OpPrereqError("Some disks have been specified more than"
626 " once: %s" % utils.CommaJoin(duplicates),
629 # We don't want CheckIAllocatorOrNode selecting the default iallocator
630 # when neither iallocator nor nodes are specified
631 if self.op.iallocator or self.op.nodes:
632 CheckIAllocatorOrNode(self, "iallocator", "nodes")
634 for (idx, params) in self.op.disks:
635 utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
636 unsupported = frozenset(params.keys()) - self._MODIFYABLE
638 raise errors.OpPrereqError("Parameters for disk %s try to change"
639 " unmodifyable parameter(s): %s" %
640 (idx, utils.CommaJoin(unsupported)),
643 def ExpandNames(self):
644 self._ExpandAndLockInstance()
645 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
648 self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
649 self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
651 self.needed_locks[locking.LEVEL_NODE] = []
652 if self.op.iallocator:
653 # iallocator will select a new node in the same group
654 self.needed_locks[locking.LEVEL_NODEGROUP] = []
655 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
657 self.needed_locks[locking.LEVEL_NODE_RES] = []
659 def DeclareLocks(self, level):
660 if level == locking.LEVEL_NODEGROUP:
661 assert self.op.iallocator is not None
662 assert not self.op.nodes
663 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
664 self.share_locks[locking.LEVEL_NODEGROUP] = 1
665 # Lock the primary group used by the instance optimistically; this
666 # requires going via the node before it's locked, requiring
667 # verification later on
668 self.needed_locks[locking.LEVEL_NODEGROUP] = \
669 self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
671 elif level == locking.LEVEL_NODE:
672 # If an allocator is used, then we lock all the nodes in the current
673 # instance group, as we don't know yet which ones will be selected;
674 # if we replace the nodes without using an allocator, locks are
675 # already declared in ExpandNames; otherwise, we need to lock all the
676 # instance nodes for disk re-creation
677 if self.op.iallocator:
678 assert not self.op.nodes
679 assert not self.needed_locks[locking.LEVEL_NODE]
680 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
682 # Lock member nodes of the group of the primary node
683 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
684 self.needed_locks[locking.LEVEL_NODE].extend(
685 self.cfg.GetNodeGroup(group_uuid).members)
687 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
688 elif not self.op.nodes:
689 self._LockInstancesNodes(primary_only=False)
690 elif level == locking.LEVEL_NODE_RES:
692 self.needed_locks[locking.LEVEL_NODE_RES] = \
693 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
695 def BuildHooksEnv(self):
698 This runs on master, primary and secondary nodes of the instance.
701 return BuildInstanceHookEnvByObject(self, self.instance)
703 def BuildHooksNodes(self):
704 """Build hooks nodes.
707 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
710 def CheckPrereq(self):
711 """Check prerequisites.
713 This checks that the instance is in the cluster and is not running.
716 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
717 assert instance is not None, \
718 "Cannot retrieve locked instance %s" % self.op.instance_name
720 if len(self.op.nodes) != len(instance.all_nodes):
721 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
722 " %d replacement nodes were specified" %
723 (instance.name, len(instance.all_nodes),
726 assert instance.disk_template != constants.DT_DRBD8 or \
727 len(self.op.nodes) == 2
728 assert instance.disk_template != constants.DT_PLAIN or \
729 len(self.op.nodes) == 1
730 primary_node = self.op.nodes[0]
732 primary_node = instance.primary_node
733 if not self.op.iallocator:
734 CheckNodeOnline(self, primary_node)
736 if instance.disk_template == constants.DT_DISKLESS:
737 raise errors.OpPrereqError("Instance '%s' has no disks" %
738 self.op.instance_name, errors.ECODE_INVAL)
740 # Verify if node group locks are still correct
741 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
743 # Node group locks are acquired only for the primary node (and only
744 # when the allocator is used)
745 CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
748 # if we replace nodes *and* the old primary is offline, we don't
749 # check the instance state
750 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
751 if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
752 CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
753 msg="cannot recreate disks")
756 self.disks = dict(self.op.disks)
758 self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
760 maxidx = max(self.disks.keys())
761 if maxidx >= len(instance.disks):
762 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
765 if ((self.op.nodes or self.op.iallocator) and
766 sorted(self.disks.keys()) != range(len(instance.disks))):
767 raise errors.OpPrereqError("Can't recreate disks partially and"
768 " change the nodes at the same time",
771 self.instance = instance
773 if self.op.iallocator:
775 # Release unneeded node and node resource locks
776 ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
777 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
778 ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
780 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
783 nodes = self.op.nodes
785 nodes = instance.all_nodes
786 excl_stor = compat.any(
787 rpc.GetExclusiveStorageForNodeNames(self.cfg, nodes).values()
789 for new_params in self.disks.values():
790 CheckSpindlesExclusiveStorage(new_params, excl_stor)
792 def Exec(self, feedback_fn):
793 """Recreate the disks.
796 instance = self.instance
798 assert (self.owned_locks(locking.LEVEL_NODE) ==
799 self.owned_locks(locking.LEVEL_NODE_RES))
802 mods = [] # keeps track of needed changes
804 for idx, disk in enumerate(instance.disks):
806 changes = self.disks[idx]
808 # Disk should not be recreated
812 # update secondaries for disks, if needed
813 if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
814 # need to update the nodes and minors
815 assert len(self.op.nodes) == 2
816 assert len(disk.logical_id) == 6 # otherwise disk internals
818 (_, _, old_port, _, _, old_secret) = disk.logical_id
819 new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
820 new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
821 new_minors[0], new_minors[1], old_secret)
822 assert len(disk.logical_id) == len(new_id)
826 mods.append((idx, new_id, changes))
828 # now that we have passed all asserts above, we can apply the mods
829 # in a single run (to avoid partial changes)
830 for idx, new_id, changes in mods:
831 disk = instance.disks[idx]
832 if new_id is not None:
833 assert disk.dev_type == constants.LD_DRBD8
834 disk.logical_id = new_id
836 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
837 mode=changes.get(constants.IDISK_MODE, None),
838 spindles=changes.get(constants.IDISK_SPINDLES, None))
840 # change primary node, if needed
842 instance.primary_node = self.op.nodes[0]
843 self.LogWarning("Changing the instance's nodes, you will have to"
844 " remove any disks left on the older nodes manually")
847 self.cfg.Update(instance, feedback_fn)
849 # All touched nodes must be locked
850 mylocks = self.owned_locks(locking.LEVEL_NODE)
851 assert mylocks.issuperset(frozenset(instance.all_nodes))
852 new_disks = CreateDisks(self, instance, to_skip=to_skip)
854 # TODO: Release node locks before wiping, or explain why it's not possible
855 if self.cfg.GetClusterInfo().prealloc_wipe_disks:
856 wipedisks = [(idx, disk, 0)
857 for (idx, disk) in enumerate(instance.disks)
858 if idx not in to_skip]
859 WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
862 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
863 """Checks if nodes have enough free disk space in the specified VG.
865 This function checks if all given nodes have the needed amount of
866 free disk. In case any node has less disk or we cannot get the
867 information from the node, this function raises an OpPrereqError
870 @type lu: C{LogicalUnit}
871 @param lu: a logical unit from which we get configuration data
872 @type nodenames: C{list}
873 @param nodenames: the list of node names to check
875 @param vg: the volume group to check
876 @type requested: C{int}
877 @param requested: the amount of disk in MiB to check for
878 @raise errors.OpPrereqError: if the node doesn't have enough disk,
879 or we cannot check the node
882 es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
883 # FIXME: This maps everything to storage type 'lvm-vg' to maintain
884 # the current functionality. Refactor to make it more flexible.
885 nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)], None,
887 for node in nodenames:
888 info = nodeinfo[node]
889 info.Raise("Cannot get current information from node %s" % node,
890 prereq=True, ecode=errors.ECODE_ENVIRON)
891 (_, (vg_info, ), _) = info.payload
892 vg_free = vg_info.get("vg_free", None)
893 if not isinstance(vg_free, int):
894 raise errors.OpPrereqError("Can't compute free disk space on node"
895 " %s for vg %s, result was '%s'" %
896 (node, vg, vg_free), errors.ECODE_ENVIRON)
897 if requested > vg_free:
898 raise errors.OpPrereqError("Not enough disk space on target node %s"
899 " vg %s: required %d MiB, available %d MiB" %
900 (node, vg, requested, vg_free),
904 def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
905 """Checks if nodes have enough free disk space in all the VGs.
907 This function checks if all given nodes have the needed amount of
908 free disk. In case any node has less disk or we cannot get the
909 information from the node, this function raises an OpPrereqError
912 @type lu: C{LogicalUnit}
913 @param lu: a logical unit from which we get configuration data
914 @type nodenames: C{list}
915 @param nodenames: the list of node names to check
916 @type req_sizes: C{dict}
917 @param req_sizes: the hash of vg and corresponding amount of disk in
919 @raise errors.OpPrereqError: if the node doesn't have enough disk,
920 or we cannot check the node
923 for vg, req_size in req_sizes.items():
924 _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
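# Illustrative sketch (assumption, not from the original source): a caller
# would typically combine this with ComputeDiskSizePerVG, e.g.
#   req_sizes = ComputeDiskSizePerVG(op.disk_template, disks)
#   CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)
# so that each volume group is checked against its aggregated requirement.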
927 def _DiskSizeInBytesToMebibytes(lu, size):
928 """Converts a disk size in bytes to mebibytes.
930 Warns and rounds up if the size isn't an even multiple of 1 MiB.
933 (mib, remainder) = divmod(size, 1024 * 1024)
936 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
937 " to not overwrite existing data (%s bytes will not be"
938 " wiped)", (1024 * 1024) - remainder)
944 def _CalcEta(time_taken, written, total_size):
945 """Calculates the ETA based on size written and total size.
947 @param time_taken: The time taken so far
948 @param written: amount written so far
949 @param total_size: The total size of data to be written
950 @return: The remaining time in seconds
953 avg_time = time_taken / float(written)
954 return (total_size - written) * avg_time
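# Worked example (illustrative only): if 30 seconds were needed to write
# 100 MiB out of a 400 MiB total, avg_time is 0.3 s/MiB and the estimate is
# (400 - 100) * 0.3 = 90 seconds remaining.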
957 def WipeDisks(lu, instance, disks=None):
958 """Wipes instance disks.
960 @type lu: L{LogicalUnit}
961 @param lu: the logical unit on whose behalf we execute
962 @type instance: L{objects.Instance}
963 @param instance: the instance whose disks we should wipe
964 @type disks: None or list of tuple of (number, L{objects.Disk}, number)
965 @param disks: Disk details; tuple contains disk index, disk object and the
969 node = instance.primary_node
972 disks = [(idx, disk, 0)
973 for (idx, disk) in enumerate(instance.disks)]
975 for (_, device, _) in disks:
976 lu.cfg.SetDiskID(device, node)
978 logging.info("Pausing synchronization of disks of instance '%s'",
980 result = lu.rpc.call_blockdev_pause_resume_sync(node,
981 (map(compat.snd, disks),
984 result.Raise("Failed to pause disk synchronization on node '%s'" % node)
986 for idx, success in enumerate(result.payload):
988 logging.warning("Pausing synchronization of disk %s of instance '%s'"
989 " failed", idx, instance.name)
992 for (idx, device, offset) in disks:
993 # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
994 # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
996 int(min(constants.MAX_WIPE_CHUNK,
997 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
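# Illustrative example (assuming the defaults of MAX_WIPE_CHUNK = 1024 MiB
# and MIN_WIPE_CHUNK_PERCENT = 10): for a 4096 MiB disk the chunk size is
# min(1024, 409) = 409 MiB, while for a 100 GiB disk it is capped at
# 1024 MiB.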
1001 start_time = time.time()
1006 info_text = (" (from %s to %s)" %
1007 (utils.FormatUnit(offset, "h"),
1008 utils.FormatUnit(size, "h")))
1010 lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1012 logging.info("Wiping disk %d for instance %s on node %s using"
1013 " chunk size %s", idx, instance.name, node, wipe_chunk_size)
1015 while offset < size:
1016 wipe_size = min(wipe_chunk_size, size - offset)
1018 logging.debug("Wiping disk %d, offset %s, chunk %s",
1019 idx, offset, wipe_size)
1021 result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
1023 result.Raise("Could not wipe disk %d at offset %d for size %d" %
1024 (idx, offset, wipe_size))
1028 if now - last_output >= 60:
1029 eta = _CalcEta(now - start_time, offset, size)
1030 lu.LogInfo(" - done: %.1f%% ETA: %s",
1031 offset / float(size) * 100, utils.FormatSeconds(eta))
1034 logging.info("Resuming synchronization of disks for instance '%s'",
1037 result = lu.rpc.call_blockdev_pause_resume_sync(node,
1038 (map(compat.snd, disks),
1043 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1044 node, result.fail_msg)
1046 for idx, success in enumerate(result.payload):
1048 lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1049 " failed", idx, instance.name)
1052 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1053 """Wrapper for L{WipeDisks} that handles errors.
1055 @type lu: L{LogicalUnit}
1056 @param lu: the logical unit on whose behalf we execute
1057 @type instance: L{objects.Instance}
1058 @param instance: the instance whose disks we should wipe
1059 @param disks: see L{WipeDisks}
1060 @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1062 @raise errors.OpPrereqError: in case of failure
1066 WipeDisks(lu, instance, disks=disks)
1067 except errors.OpExecError:
1068 logging.warning("Wiping disks for instance '%s' failed",
1070 _UndoCreateDisks(lu, cleanup)
1074 def ExpandCheckDisks(instance, disks):
1075 """Return the instance disks selected by the disks list
1077 @type disks: list of L{objects.Disk} or None
1078 @param disks: selected disks
1079 @rtype: list of L{objects.Disk}
1080 @return: selected instance disks to act on
1084 return instance.disks
1086 if not set(disks).issubset(instance.disks):
1087 raise errors.ProgrammerError("Can only act on disks belonging to the"
1092 def WaitForSync(lu, instance, disks=None, oneshot=False):
1093 """Sleep and poll for an instance's disk to sync.
1096 if not instance.disks or disks is not None and not disks:
1099 disks = ExpandCheckDisks(instance, disks)
1102 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1104 node = instance.primary_node
1107 lu.cfg.SetDiskID(dev, node)
1109 # TODO: Convert to utils.Retry
1112 degr_retries = 10 # in seconds, as we sleep 1 second each time
1116 cumul_degraded = False
1117 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1118 msg = rstats.fail_msg
1120 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1123 raise errors.RemoteError("Can't contact node %s for mirror data,"
1124 " aborting." % node)
1127 rstats = rstats.payload
1129 for i, mstat in enumerate(rstats):
1131 lu.LogWarning("Can't compute data for node %s/%s",
1132 node, disks[i].iv_name)
1135 cumul_degraded = (cumul_degraded or
1136 (mstat.is_degraded and mstat.sync_percent is None))
1137 if mstat.sync_percent is not None:
1139 if mstat.estimated_time is not None:
1140 rem_time = ("%s remaining (estimated)" %
1141 utils.FormatSeconds(mstat.estimated_time))
1142 max_time = mstat.estimated_time
1144 rem_time = "no time estimate"
1145 lu.LogInfo("- device %s: %5.2f%% done, %s",
1146 disks[i].iv_name, mstat.sync_percent, rem_time)
1148 # if we're done but degraded, let's do a few small retries, to
1149 # make sure we see a stable and not transient situation; therefore
1150 # we force restart of the loop
1151 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1152 logging.info("Degraded disks found, %d retries left", degr_retries)
1160 time.sleep(min(60, max_time))
1163 lu.LogInfo("Instance %s's disks are in sync", instance.name)
1165 return not cumul_degraded
1168 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1169 """Shutdown block devices of an instance.
1171 This does the shutdown on all nodes of the instance.
1173 If ignore_primary is false, errors on the primary node are
1178 disks = ExpandCheckDisks(instance, disks)
1181 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1182 lu.cfg.SetDiskID(top_disk, node)
1183 result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1184 msg = result.fail_msg
1186 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1187 disk.iv_name, node, msg)
1188 if ((node == instance.primary_node and not ignore_primary) or
1189 (node != instance.primary_node and not result.offline)):
1194 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1195 """Shutdown block devices of an instance.
1197 This function checks if an instance is running before calling
1198 ShutdownInstanceDisks.
1201 CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1202 ShutdownInstanceDisks(lu, instance, disks=disks)
1205 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1207 """Prepare the block devices for an instance.
1209 This sets up the block devices on all nodes.
1211 @type lu: L{LogicalUnit}
1212 @param lu: the logical unit on whose behalf we execute
1213 @type instance: L{objects.Instance}
1214 @param instance: the instance whose disks we assemble
1215 @type disks: list of L{objects.Disk} or None
1216 @param disks: which disks to assemble (or all, if None)
1217 @type ignore_secondaries: boolean
1218 @param ignore_secondaries: if true, errors on secondary nodes
1219 won't result in an error return from the function
1220 @type ignore_size: boolean
1221 @param ignore_size: if true, the current known size of the disk
1222 will not be used during the disk activation, useful for cases
1223 when the size is wrong
1224 @return: False if the operation failed, otherwise a list of
1225 (host, instance_visible_name, node_visible_name)
1226 with the mapping from node devices to instance devices
1231 iname = instance.name
1232 disks = ExpandCheckDisks(instance, disks)
1234 # With the two-pass mechanism we try to reduce the window of
1235 # opportunity for the race condition of switching DRBD to primary
1236 # before handshaking has occurred, but we do not eliminate it
1238 # The proper fix would be to wait (with some limits) until the
1239 # connection has been made and drbd transitions from WFConnection
1240 # into any other network-connected state (Connected, SyncTarget,
1243 # 1st pass, assemble on all nodes in secondary mode
1244 for idx, inst_disk in enumerate(disks):
1245 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1247 node_disk = node_disk.Copy()
1248 node_disk.UnsetSize()
1249 lu.cfg.SetDiskID(node_disk, node)
1250 result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1252 msg = result.fail_msg
1254 is_offline_secondary = (node in instance.secondary_nodes and
1256 lu.LogWarning("Could not prepare block device %s on node %s"
1257 " (is_primary=False, pass=1): %s",
1258 inst_disk.iv_name, node, msg)
1259 if not (ignore_secondaries or is_offline_secondary):
1262 # FIXME: race condition on drbd migration to primary
1264 # 2nd pass, do only the primary node
1265 for idx, inst_disk in enumerate(disks):
1268 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1269 if node != instance.primary_node:
1272 node_disk = node_disk.Copy()
1273 node_disk.UnsetSize()
1274 lu.cfg.SetDiskID(node_disk, node)
1275 result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1277 msg = result.fail_msg
1279 lu.LogWarning("Could not prepare block device %s on node %s"
1280 " (is_primary=True, pass=2): %s",
1281 inst_disk.iv_name, node, msg)
1284 dev_path = result.payload
1286 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1288 # leave the disks configured for the primary node
1289 # this is a workaround that would be fixed better by
1290 # improving the logical/physical id handling
1292 lu.cfg.SetDiskID(disk, instance.primary_node)
1294 return disks_ok, device_info
1297 def StartInstanceDisks(lu, instance, force):
1298 """Start the disks of an instance.
1301 disks_ok, _ = AssembleInstanceDisks(lu, instance,
1302 ignore_secondaries=force)
1304 ShutdownInstanceDisks(lu, instance)
1305 if force is not None and not force:
1307 hint=("If the message above refers to a secondary node,"
1308 " you can retry the operation using '--force'"))
1309 raise errors.OpExecError("Disk consistency error")
1312 class LUInstanceGrowDisk(LogicalUnit):
1313 """Grow a disk of an instance.
1317 HTYPE = constants.HTYPE_INSTANCE
1320 def ExpandNames(self):
1321 self._ExpandAndLockInstance()
1322 self.needed_locks[locking.LEVEL_NODE] = []
1323 self.needed_locks[locking.LEVEL_NODE_RES] = []
1324 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1325 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1327 def DeclareLocks(self, level):
1328 if level == locking.LEVEL_NODE:
1329 self._LockInstancesNodes()
1330 elif level == locking.LEVEL_NODE_RES:
1332 self.needed_locks[locking.LEVEL_NODE_RES] = \
1333 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1335 def BuildHooksEnv(self):
1338 This runs on the master, the primary and all the secondaries.
1342 "DISK": self.op.disk,
1343 "AMOUNT": self.op.amount,
1344 "ABSOLUTE": self.op.absolute,
1346 env.update(BuildInstanceHookEnvByObject(self, self.instance))
1349 def BuildHooksNodes(self):
1350 """Build hooks nodes.
1353 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1356 def CheckPrereq(self):
1357 """Check prerequisites.
1359 This checks that the instance is in the cluster.
1362 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1363 assert instance is not None, \
1364 "Cannot retrieve locked instance %s" % self.op.instance_name
1365 nodenames = list(instance.all_nodes)
1366 for node in nodenames:
1367 CheckNodeOnline(self, node)
1369 self.instance = instance
1371 if instance.disk_template not in constants.DTS_GROWABLE:
1372 raise errors.OpPrereqError("Instance's disk layout does not support"
1373 " growing", errors.ECODE_INVAL)
1375 self.disk = instance.FindDisk(self.op.disk)
1377 if self.op.absolute:
1378 self.target = self.op.amount
1379 self.delta = self.target - self.disk.size
1381 raise errors.OpPrereqError("Requested size (%s) is smaller than "
1382 "current disk size (%s)" %
1383 (utils.FormatUnit(self.target, "h"),
1384 utils.FormatUnit(self.disk.size, "h")),
1387 self.delta = self.op.amount
1388 self.target = self.disk.size + self.delta
1390 raise errors.OpPrereqError("Requested increment (%s) is negative" %
1391 utils.FormatUnit(self.delta, "h"),
1394 self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1396 def _CheckDiskSpace(self, nodenames, req_vgspace):
1397 template = self.instance.disk_template
1398 if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1399 # TODO: check the free disk space for file, when that feature will be
1401 nodes = map(self.cfg.GetNodeInfo, nodenames)
1402 es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1405 # With exclusive storage we need to do something smarter than just looking
1406 # at free space; for now, let's simply abort the operation.
1407 raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1408 " is enabled", errors.ECODE_STATE)
1409 CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1411 def Exec(self, feedback_fn):
1412 """Execute disk grow.
1415 instance = self.instance
1418 assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1419 assert (self.owned_locks(locking.LEVEL_NODE) ==
1420 self.owned_locks(locking.LEVEL_NODE_RES))
1422 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1424 disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1426 raise errors.OpExecError("Cannot activate block device to grow")
1428 feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1429 (self.op.disk, instance.name,
1430 utils.FormatUnit(self.delta, "h"),
1431 utils.FormatUnit(self.target, "h")))
1433 # First run all grow ops in dry-run mode
1434 for node in instance.all_nodes:
1435 self.cfg.SetDiskID(disk, node)
1436 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1438 result.Raise("Dry-run grow request failed to node %s" % node)
1441 # Get disk size from primary node for wiping
1442 result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1443 result.Raise("Failed to retrieve disk size from node '%s'" %
1444 instance.primary_node)
1446 (disk_size_in_bytes, ) = result.payload
1448 if disk_size_in_bytes is None:
1449 raise errors.OpExecError("Failed to retrieve disk size from primary"
1450 " node '%s'" % instance.primary_node)
1452 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1454 assert old_disk_size >= disk.size, \
1455 ("Retrieved disk size too small (got %s, should be at least %s)" %
1456 (old_disk_size, disk.size))
1458 old_disk_size = None
1460 # We know that (as far as we can test) operations across different
1461 # nodes will succeed, time to run it for real on the backing storage
1462 for node in instance.all_nodes:
1463 self.cfg.SetDiskID(disk, node)
1464 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1466 result.Raise("Grow request failed to node %s" % node)
1468 # And now execute it for logical storage, on the primary node
1469 node = instance.primary_node
1470 self.cfg.SetDiskID(disk, node)
1471 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1473 result.Raise("Grow request failed to node %s" % node)
1475 disk.RecordGrow(self.delta)
1476 self.cfg.Update(instance, feedback_fn)
1478 # Changes have been recorded, release node lock
1479 ReleaseLocks(self, locking.LEVEL_NODE)
1481 # Downgrade lock while waiting for sync
1482 self.glm.downgrade(locking.LEVEL_INSTANCE)
1484 assert wipe_disks ^ (old_disk_size is None)
1487 assert instance.disks[self.op.disk] == disk
1489 # Wipe newly added disk space
1490 WipeDisks(self, instance,
1491 disks=[(self.op.disk, disk, old_disk_size)])
1493 if self.op.wait_for_sync:
1494 disk_abort = not WaitForSync(self, instance, disks=[disk])
1496 self.LogWarning("Disk syncing has not returned a good status; check"
1498 if instance.admin_state != constants.ADMINST_UP:
1499 _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1500 elif instance.admin_state != constants.ADMINST_UP:
1501 self.LogWarning("Not shutting down the disk even if the instance is"
1502 " not supposed to be running because no wait for"
1503 " sync mode was requested")
1505 assert self.owned_locks(locking.LEVEL_NODE_RES)
1506 assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1509 class LUInstanceReplaceDisks(LogicalUnit):
1510 """Replace the disks of an instance.
1513 HPATH = "mirrors-replace"
1514 HTYPE = constants.HTYPE_INSTANCE
1517 def CheckArguments(self):
1521 remote_node = self.op.remote_node
1522 ialloc = self.op.iallocator
1523 if self.op.mode == constants.REPLACE_DISK_CHG:
1524 if remote_node is None and ialloc is None:
1525 raise errors.OpPrereqError("When changing the secondary either an"
1526 " iallocator script must be used or the"
1527 " new node given", errors.ECODE_INVAL)
1529 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1531 elif remote_node is not None or ialloc is not None:
1532 # Not replacing the secondary
1533 raise errors.OpPrereqError("The iallocator and new node options can"
1534 " only be used when changing the"
1535 " secondary node", errors.ECODE_INVAL)
1537 def ExpandNames(self):
1538 self._ExpandAndLockInstance()
1540 assert locking.LEVEL_NODE not in self.needed_locks
1541 assert locking.LEVEL_NODE_RES not in self.needed_locks
1542 assert locking.LEVEL_NODEGROUP not in self.needed_locks
1544 assert self.op.iallocator is None or self.op.remote_node is None, \
1545 "Conflicting options"
1547 if self.op.remote_node is not None:
1548 self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1550 # Warning: do not remove the locking of the new secondary here
1551 # unless DRBD8Dev.AddChildren is changed to work in parallel;
1552 # currently it doesn't since parallel invocations of
1553 # FindUnusedMinor will conflict
1554 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1555 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1557 self.needed_locks[locking.LEVEL_NODE] = []
1558 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1560 if self.op.iallocator is not None:
1561 # iallocator will select a new node in the same group
1562 self.needed_locks[locking.LEVEL_NODEGROUP] = []
1563 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1565 self.needed_locks[locking.LEVEL_NODE_RES] = []
1567 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1568 self.op.iallocator, self.op.remote_node,
1569 self.op.disks, self.op.early_release,
1570 self.op.ignore_ipolicy)
1572 self.tasklets = [self.replacer]
1574 def DeclareLocks(self, level):
1575 if level == locking.LEVEL_NODEGROUP:
1576 assert self.op.remote_node is None
1577 assert self.op.iallocator is not None
1578 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1580 self.share_locks[locking.LEVEL_NODEGROUP] = 1
1581 # Lock all groups used by instance optimistically; this requires going
1582 # via the node before it's locked, requiring verification later on
1583 self.needed_locks[locking.LEVEL_NODEGROUP] = \
1584 self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1586 elif level == locking.LEVEL_NODE:
1587 if self.op.iallocator is not None:
1588 assert self.op.remote_node is None
1589 assert not self.needed_locks[locking.LEVEL_NODE]
1590 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1592 # Lock member nodes of all locked groups
1593 self.needed_locks[locking.LEVEL_NODE] = \
1595 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1596 for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1598 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1600 self._LockInstancesNodes()
1602 elif level == locking.LEVEL_NODE_RES:
1604 self.needed_locks[locking.LEVEL_NODE_RES] = \
1605 self.needed_locks[locking.LEVEL_NODE]
1607 def BuildHooksEnv(self):
1610 This runs on the master, the primary and all the secondaries.
1613 instance = self.replacer.instance
1615 "MODE": self.op.mode,
1616 "NEW_SECONDARY": self.op.remote_node,
1617 "OLD_SECONDARY": instance.secondary_nodes[0],
1619 env.update(BuildInstanceHookEnvByObject(self, instance))
1622 def BuildHooksNodes(self):
1623 """Build hooks nodes.
1626 instance = self.replacer.instance
1628 self.cfg.GetMasterNode(),
1629 instance.primary_node,
1631 if self.op.remote_node is not None:
1632 nl.append(self.op.remote_node)
1635 def CheckPrereq(self):
1636 """Check prerequisites.
1639 assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1640 self.op.iallocator is None)
1642 # Verify if node group locks are still correct
1643 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1645 CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1647 return LogicalUnit.CheckPrereq(self)
1650 class LUInstanceActivateDisks(NoHooksLU):
1651 """Bring up an instance's disks.
1656 def ExpandNames(self):
1657 self._ExpandAndLockInstance()
1658 self.needed_locks[locking.LEVEL_NODE] = []
1659 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1661 def DeclareLocks(self, level):
1662 if level == locking.LEVEL_NODE:
1663 self._LockInstancesNodes()
1665 def CheckPrereq(self):
1666 """Check prerequisites.
1668 This checks that the instance is in the cluster.
1671 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1672 assert self.instance is not None, \
1673 "Cannot retrieve locked instance %s" % self.op.instance_name
1674 CheckNodeOnline(self, self.instance.primary_node)
1676 def Exec(self, feedback_fn):
1677 """Activate the disks.
1680 disks_ok, disks_info = \
1681 AssembleInstanceDisks(self, self.instance,
1682 ignore_size=self.op.ignore_size)
1684 raise errors.OpExecError("Cannot activate block devices")
1686 if self.op.wait_for_sync:
1687 if not WaitForSync(self, self.instance):
1688 raise errors.OpExecError("Some disks of the instance are degraded!")
1693 class LUInstanceDeactivateDisks(NoHooksLU):
1694 """Shutdown an instance's disks.
1699 def ExpandNames(self):
1700 self._ExpandAndLockInstance()
1701 self.needed_locks[locking.LEVEL_NODE] = []
1702 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1704 def DeclareLocks(self, level):
1705 if level == locking.LEVEL_NODE:
1706 self._LockInstancesNodes()
1708 def CheckPrereq(self):
1709 """Check prerequisites.
1711 This checks that the instance is in the cluster.
1714 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1715 assert self.instance is not None, \
1716 "Cannot retrieve locked instance %s" % self.op.instance_name
1718 def Exec(self, feedback_fn):
1719 """Deactivate the disks
1722 instance = self.instance
1724 ShutdownInstanceDisks(self, instance)
1726 _SafeShutdownInstanceDisks(self, instance)
1729 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1731 """Check that mirrors are not degraded.
1733 @attention: The device has to be annotated already.
1735 The ldisk parameter, if True, will change the test from the
1736 is_degraded attribute (which represents overall non-ok status for
1737 the device(s)) to the ldisk (representing the local storage status).
1740 lu.cfg.SetDiskID(dev, node)
1744 if on_primary or dev.AssembleOnSecondary():
1745 rstats = lu.rpc.call_blockdev_find(node, dev)
1746 msg = rstats.fail_msg
1748 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1750 elif not rstats.payload:
1751 lu.LogWarning("Can't find disk on node %s", node)
1755 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1757 result = result and not rstats.payload.is_degraded
1760 for child in dev.children:
1761 result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1767 def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1768 """Wrapper around L{_CheckDiskConsistencyInner}.
1771 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1772 return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1776 def _BlockdevFind(lu, node, dev, instance):
1777 """Wrapper around call_blockdev_find to annotate diskparams.
1779 @param lu: A reference to the lu object
1780 @param node: The node to call out
1781 @param dev: The device to find
1782 @param instance: The instance object the device belongs to
1783 @return: The result of the RPC call
1786 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1787 return lu.rpc.call_blockdev_find(node, disk)
1790 def _GenerateUniqueNames(lu, exts):
1791 """Generate a suitable LV name.
1793 This will generate a logical volume name for the given instance.
1798 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1799 results.append("%s%s" % (new_id, val))
1803 class TLReplaceDisks(Tasklet):
1804 """Replaces disks for an instance.
1806 Note: Locking is not within the scope of this class.
1809 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1810 disks, early_release, ignore_ipolicy):
1811 """Initializes this class.
1814 Tasklet.__init__(self, lu)
1817 self.instance_name = instance_name
1819 self.iallocator_name = iallocator_name
1820 self.remote_node = remote_node
1822 self.early_release = early_release
1823 self.ignore_ipolicy = ignore_ipolicy
1826 self.instance = None
1827 self.new_node = None
1828 self.target_node = None
1829 self.other_node = None
1830 self.remote_node_info = None
1831 self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(name=instance_name,
                                   relocate_from=list(relocate_from))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name

  def _FindFaultyDisks(self, node_name):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_name, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    nodes = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    instance = self.instance
    secondary_node = instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       instance.name, instance.secondary_nodes)

    if remote_node is None:
      self.remote_node_info = None
    else:
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
             "Remote node '%s' is not locked" % remote_node

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        CheckNodeNotDrained(self.lu, remote_node)
        CheckNodeVmCapable(self.lu, remote_node)

        old_node_info = self.cfg.GetNodeInfo(secondary_node)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

    # If not specified, all disks should be replaced
    if not self.disks:
      self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internally
    # submitted opcodes and external ones. We should fix that.
    if self.remote_node_info:
      # We change the node, let's verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                             self.cfg, ignore=self.ignore_ipolicy)

    for node in check_nodes:
      CheckNodeOnline(self.lu, node)

    touched_nodes = frozenset(node_name for node_name in [self.new_node,
                                                          self.other_node,
                                                          self.target_node]
                              if node_name is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
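    # (these are the per-node replication addresses; the drbd disconnect and
    # attach RPCs in the execution path rely on them)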
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.instance.secondary_nodes))

    activate_disks = (self.instance.admin_state != constants.ADMINST_UP)

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)
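      # For disk 0, for instance, lv_names is [".disk0_data", ".disk0_meta"];
      # _GenerateUniqueNames prefixes each suffix with a cluster-unique ID,
      # yielding the names for the replacement data and metadata LVs.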

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
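      # iv_names maps the DRBD device's iv_name to a tuple of
      # (drbd device, old child LVs, new child LVs); _CheckDevices and
      # _RemoveOldStorage consume this mapping after the replacement.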
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)

    return iv_names

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = _BlockdevFind(self, node_name, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
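      # For example, an LV named "<uuid>.disk0_data" would be renamed to
      # "<uuid>.disk0_data_replaced-<temp_suffix>" within its volume group.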

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

    cstep = itertools.count(5)
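    # cstep hands out the remaining step numbers (5, 6, ...) so that both the
    # early-release path and the normal path report consistent progress.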

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
                             True, GetInstanceInfoText(self.instance), False,
                             excl_stor)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)
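      # A DRBD8 logical_id is the tuple (node_a, node_b, port, minor_a,
      # minor_b, secret); new_alone_id omits the port so the device first
      # comes up without networking, while new_net_id keeps the original
      # port for the later network attach.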

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)