4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Logical units dealing with storage of instances."""
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41 AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42 CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43 IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45 CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46 BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
48 import ganeti.masterd.instance
51 _DISK_TEMPLATE_NAME_PREFIX = {
52 constants.DT_PLAIN: "",
53 constants.DT_RBD: ".rbd",
54 constants.DT_EXT: ".ext",
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59 constants.DT_PLAIN: constants.LD_LV,
60 constants.DT_FILE: constants.LD_FILE,
61 constants.DT_SHARED_FILE: constants.LD_FILE,
62 constants.DT_BLOCK: constants.LD_BLOCKDEV,
63 constants.DT_RBD: constants.LD_RBD,
64 constants.DT_EXT: constants.LD_EXT,
68 def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
70 """Create a single block device on a given node.
72 This will not recurse over children of the device, so they must be
75 @param lu: the lu on whose behalf we execute
76 @param node: the node on which to create the device
77 @type instance: L{objects.Instance}
78 @param instance: the instance which owns the device
79 @type device: L{objects.Disk}
80 @param device: the device to create
81 @param info: the extra 'metadata' we should attach to the device
82 (this will be represented as a LVM tag)
83 @type force_open: boolean
84 @param force_open: this parameter will be passes to the
85 L{backend.BlockdevCreate} function where it specifies
86 whether we run on primary or not, and it affects both
87 the child assembly and the device own Open() execution
88 @type excl_stor: boolean
89 @param excl_stor: Whether exclusive_storage is active for the node
92 lu.cfg.SetDiskID(device, node)
93 result = lu.rpc.call_blockdev_create(node, device, device.size,
94 instance.name, force_open, info,
96 result.Raise("Can't create block device %s on"
97 " node %s for instance %s" % (device, node, instance.name))
98 if device.physical_id is None:
99 device.physical_id = result.payload
102 def _CreateBlockDevInner(lu, node, instance, device, force_create,
103 info, force_open, excl_stor):
104 """Create a tree of block devices on a given node.
106 If this device type has to be created on secondaries, create it and
109 If not, just recurse to children keeping the same 'force' value.
111 @attention: The device has to be annotated already.
113 @param lu: the lu on whose behalf we execute
114 @param node: the node on which to create the device
115 @type instance: L{objects.Instance}
116 @param instance: the instance which owns the device
117 @type device: L{objects.Disk}
118 @param device: the device to create
119 @type force_create: boolean
120 @param force_create: whether to force creation of this device; this
121 will be change to True whenever we find a device which has
122 CreateOnSecondary() attribute
123 @param info: the extra 'metadata' we should attach to the device
124 (this will be represented as a LVM tag)
125 @type force_open: boolean
126 @param force_open: this parameter will be passes to the
127 L{backend.BlockdevCreate} function where it specifies
128 whether we run on primary or not, and it affects both
129 the child assembly and the device own Open() execution
130 @type excl_stor: boolean
131 @param excl_stor: Whether exclusive_storage is active for the node
133 @return: list of created devices
137 if device.CreateOnSecondary():
141 for child in device.children:
142 devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143 info, force_open, excl_stor)
144 created_devices.extend(devs)
147 return created_devices
149 CreateSingleBlockDev(lu, node, instance, device, info, force_open,
151 # The device has been completely created, so there is no point in keeping
152 # its subdevices in the list. We just add the device itself instead.
153 created_devices = [(node, device)]
154 return created_devices
156 except errors.DeviceCreationError, e:
157 e.created_devices.extend(created_devices)
159 except errors.OpExecError, e:
160 raise errors.DeviceCreationError(str(e), created_devices)
163 def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164 """Whether exclusive_storage is in effect for the given node.
166 @type cfg: L{config.ConfigWriter}
167 @param cfg: The cluster configuration
168 @type nodename: string
169 @param nodename: The node
171 @return: The effective value of exclusive_storage
172 @raise errors.OpPrereqError: if no node exists with the given name
175 ni = cfg.GetNodeInfo(nodename)
177 raise errors.OpPrereqError("Invalid node name %s" % nodename,
179 return IsExclusiveStorageEnabledNode(cfg, ni)
182 def _CreateBlockDev(lu, node, instance, device, force_create, info,
184 """Wrapper around L{_CreateBlockDevInner}.
186 This method annotates the root device first.
189 (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190 excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191 return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192 force_open, excl_stor)
195 def _UndoCreateDisks(lu, disks_created):
196 """Undo the work performed by L{CreateDisks}.
198 This function is called in case of an error to undo the work of
201 @type lu: L{LogicalUnit}
202 @param lu: the logical unit on whose behalf we execute
203 @param disks_created: the result returned by L{CreateDisks}
206 for (node, disk) in disks_created:
207 lu.cfg.SetDiskID(disk, node)
208 result = lu.rpc.call_blockdev_remove(node, disk)
210 logging.warning("Failed to remove newly-created disk %s on node %s:"
211 " %s", disk, node, result.fail_msg)
214 def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
215 """Create all disks for an instance.
217 This abstracts away some work from AddInstance.
219 @type lu: L{LogicalUnit}
220 @param lu: the logical unit on whose behalf we execute
221 @type instance: L{objects.Instance}
222 @param instance: the instance whose disks we should create
224 @param to_skip: list of indices to skip
225 @type target_node: string
226 @param target_node: if passed, overrides the target node for creation
227 @type disks: list of {objects.Disk}
228 @param disks: the disks to create; if not specified, all the disks of the
230 @return: information about the created disks, to be used to call
232 @raise errors.OpPrereqError: in case of error
235 info = GetInstanceInfoText(instance)
236 if target_node is None:
237 pnode = instance.primary_node
238 all_nodes = instance.all_nodes
244 disks = instance.disks
246 if instance.disk_template in constants.DTS_FILEBASED:
247 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
248 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
250 result.Raise("Failed to create directory '%s' on"
251 " node %s" % (file_storage_dir, pnode))
254 for idx, device in enumerate(disks):
255 if to_skip and idx in to_skip:
257 logging.info("Creating disk %s for instance '%s'", idx, instance.name)
258 for node in all_nodes:
259 f_create = node == pnode
261 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
262 disks_created.append((node, device))
263 except errors.DeviceCreationError, e:
264 logging.warning("Creating disk %s for instance '%s' failed",
266 disks_created.extend(e.created_devices)
267 _UndoCreateDisks(lu, disks_created)
268 raise errors.OpExecError(e.message)
272 def ComputeDiskSizePerVG(disk_template, disks):
273 """Compute disk size requirements in the volume group
276 def _compute(disks, payload):
277 """Universal algorithm.
282 vgs[disk[constants.IDISK_VG]] = \
283 vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
287 # Required free disk space as a function of disk and swap space
289 constants.DT_DISKLESS: {},
290 constants.DT_PLAIN: _compute(disks, 0),
291 # 128 MB are added for drbd metadata for each disk
292 constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
293 constants.DT_FILE: {},
294 constants.DT_SHARED_FILE: {},
297 if disk_template not in req_size_dict:
298 raise errors.ProgrammerError("Disk template '%s' size requirement"
299 " is unknown" % disk_template)
301 return req_size_dict[disk_template]
304 def ComputeDisks(op, default_vg):
305 """Computes the instance disks.
307 @param op: The instance opcode
308 @param default_vg: The default_vg to assume
310 @return: The computed disks
314 for disk in op.disks:
315 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316 if mode not in constants.DISK_ACCESS_SET:
317 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318 mode, errors.ECODE_INVAL)
319 size = disk.get(constants.IDISK_SIZE, None)
321 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
324 except (TypeError, ValueError):
325 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
328 ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329 if ext_provider and op.disk_template != constants.DT_EXT:
330 raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331 " disk template, not %s" %
332 (constants.IDISK_PROVIDER, constants.DT_EXT,
333 op.disk_template), errors.ECODE_INVAL)
335 data_vg = disk.get(constants.IDISK_VG, default_vg)
336 name = disk.get(constants.IDISK_NAME, None)
337 if name is not None and name.lower() == constants.VALUE_NONE:
340 constants.IDISK_SIZE: size,
341 constants.IDISK_MODE: mode,
342 constants.IDISK_VG: data_vg,
343 constants.IDISK_NAME: name,
347 constants.IDISK_METAVG,
348 constants.IDISK_ADOPT,
349 constants.IDISK_SPINDLES,
352 new_disk[key] = disk[key]
354 # For extstorage, demand the `provider' option and add any
355 # additional parameters (ext-params) to the dict
356 if op.disk_template == constants.DT_EXT:
358 new_disk[constants.IDISK_PROVIDER] = ext_provider
360 if key not in constants.IDISK_PARAMS:
361 new_disk[key] = disk[key]
363 raise errors.OpPrereqError("Missing provider for template '%s'" %
364 constants.DT_EXT, errors.ECODE_INVAL)
366 disks.append(new_disk)
371 def CheckRADOSFreeSpace():
372 """Compute disk size requirements inside the RADOS cluster.
375 # For the RADOS cluster we assume there is always enough space.
379 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
380 iv_name, p_minor, s_minor):
381 """Generate a drbd8 device complete with its children.
384 assert len(vgnames) == len(names) == 2
385 port = lu.cfg.AllocatePort()
386 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
388 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
389 logical_id=(vgnames[0], names[0]),
391 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
392 dev_meta = objects.Disk(dev_type=constants.LD_LV,
393 size=constants.DRBD_META_SIZE,
394 logical_id=(vgnames[1], names[1]),
396 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
397 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
398 logical_id=(primary, secondary, port,
401 children=[dev_data, dev_meta],
402 iv_name=iv_name, params={})
403 drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
407 def GenerateDiskTemplate(
408 lu, template_name, instance_name, primary_node, secondary_nodes,
409 disk_info, file_storage_dir, file_driver, base_index,
410 feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
411 _req_shr_file_storage=opcodes.RequireSharedFileStorage):
412 """Generate the entire disk layout for a given template type.
415 vgname = lu.cfg.GetVGName()
416 disk_count = len(disk_info)
419 if template_name == constants.DT_DISKLESS:
421 elif template_name == constants.DT_DRBD8:
422 if len(secondary_nodes) != 1:
423 raise errors.ProgrammerError("Wrong template configuration")
424 remote_node = secondary_nodes[0]
425 minors = lu.cfg.AllocateDRBDMinor(
426 [primary_node, remote_node] * len(disk_info), instance_name)
428 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
430 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
433 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
434 for i in range(disk_count)]):
435 names.append(lv_prefix + "_data")
436 names.append(lv_prefix + "_meta")
437 for idx, disk in enumerate(disk_info):
438 disk_index = idx + base_index
439 data_vg = disk.get(constants.IDISK_VG, vgname)
440 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
441 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
442 disk[constants.IDISK_SIZE],
444 names[idx * 2:idx * 2 + 2],
445 "disk/%d" % disk_index,
446 minors[idx * 2], minors[idx * 2 + 1])
447 disk_dev.mode = disk[constants.IDISK_MODE]
448 disk_dev.name = disk.get(constants.IDISK_NAME, None)
449 disks.append(disk_dev)
452 raise errors.ProgrammerError("Wrong template configuration")
454 if template_name == constants.DT_FILE:
456 elif template_name == constants.DT_SHARED_FILE:
457 _req_shr_file_storage()
459 name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
460 if name_prefix is None:
463 names = _GenerateUniqueNames(lu, ["%s.disk%s" %
464 (name_prefix, base_index + i)
465 for i in range(disk_count)])
467 if template_name == constants.DT_PLAIN:
469 def logical_id_fn(idx, _, disk):
470 vg = disk.get(constants.IDISK_VG, vgname)
471 return (vg, names[idx])
473 elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
475 lambda _, disk_index, disk: (file_driver,
476 "%s/disk%d" % (file_storage_dir,
478 elif template_name == constants.DT_BLOCK:
480 lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
481 disk[constants.IDISK_ADOPT])
482 elif template_name == constants.DT_RBD:
483 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
484 elif template_name == constants.DT_EXT:
485 def logical_id_fn(idx, _, disk):
486 provider = disk.get(constants.IDISK_PROVIDER, None)
488 raise errors.ProgrammerError("Disk template is %s, but '%s' is"
489 " not found", constants.DT_EXT,
490 constants.IDISK_PROVIDER)
491 return (provider, names[idx])
493 raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
495 dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
497 for idx, disk in enumerate(disk_info):
499 # Only for the Ext template add disk_info to params
500 if template_name == constants.DT_EXT:
501 params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
503 if key not in constants.IDISK_PARAMS:
504 params[key] = disk[key]
505 disk_index = idx + base_index
506 size = disk[constants.IDISK_SIZE]
507 feedback_fn("* disk %s, size %s" %
508 (disk_index, utils.FormatUnit(size, "h")))
509 disk_dev = objects.Disk(dev_type=dev_type, size=size,
510 logical_id=logical_id_fn(idx, disk_index, disk),
511 iv_name="disk/%d" % disk_index,
512 mode=disk[constants.IDISK_MODE],
514 spindles=disk.get(constants.IDISK_SPINDLES))
515 disk_dev.name = disk.get(constants.IDISK_NAME, None)
516 disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
517 disks.append(disk_dev)
522 def CheckSpindlesExclusiveStorage(diskdict, es_flag):
523 """Check the presence of the spindle options with exclusive_storage.
526 @param diskdict: disk parameters
528 @param es_flag: the effective value of the exlusive_storage flag
529 @raise errors.OpPrereqError when spindles are given and they should not
532 if (not es_flag and constants.IDISK_SPINDLES in diskdict and
533 diskdict[constants.IDISK_SPINDLES] is not None):
534 raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
535 " when exclusive storage is not active",
539 class LUInstanceRecreateDisks(LogicalUnit):
540 """Recreate an instance's missing disks.
543 HPATH = "instance-recreate-disks"
544 HTYPE = constants.HTYPE_INSTANCE
547 _MODIFYABLE = compat.UniqueFrozenset([
548 constants.IDISK_SIZE,
549 constants.IDISK_MODE,
550 constants.IDISK_SPINDLES,
553 # New or changed disk parameters may have different semantics
554 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
555 constants.IDISK_ADOPT,
557 # TODO: Implement support changing VG while recreating
559 constants.IDISK_METAVG,
560 constants.IDISK_PROVIDER,
561 constants.IDISK_NAME,
564 def _RunAllocator(self):
565 """Run the allocator based on input opcode.
568 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
571 # The allocator should actually run in "relocate" mode, but current
572 # allocators don't support relocating all the nodes of an instance at
573 # the same time. As a workaround we use "allocate" mode, but this is
574 # suboptimal for two reasons:
575 # - The instance name passed to the allocator is present in the list of
576 # existing instances, so there could be a conflict within the
577 # internal structures of the allocator. This doesn't happen with the
578 # current allocators, but it's a liability.
579 # - The allocator counts the resources used by the instance twice: once
580 # because the instance exists already, and once because it tries to
581 # allocate a new instance.
582 # The allocator could choose some of the nodes on which the instance is
583 # running, but that's not a problem. If the instance nodes are broken,
584 # they should be already be marked as drained or offline, and hence
585 # skipped by the allocator. If instance disks have been lost for other
586 # reasons, then recreating the disks on the same nodes should be fine.
587 disk_template = self.instance.disk_template
588 spindle_use = be_full[constants.BE_SPINDLE_USE]
589 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
590 disk_template=disk_template,
591 tags=list(self.instance.GetTags()),
594 vcpus=be_full[constants.BE_VCPUS],
595 memory=be_full[constants.BE_MAXMEM],
596 spindle_use=spindle_use,
597 disks=[{constants.IDISK_SIZE: d.size,
598 constants.IDISK_MODE: d.mode}
599 for d in self.instance.disks],
600 hypervisor=self.instance.hypervisor,
602 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
604 ial.Run(self.op.iallocator)
606 assert req.RequiredNodes() == len(self.instance.all_nodes)
609 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
610 " %s" % (self.op.iallocator, ial.info),
613 self.op.nodes = ial.result
614 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
615 self.op.instance_name, self.op.iallocator,
616 utils.CommaJoin(ial.result))
618 def CheckArguments(self):
619 if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
620 # Normalize and convert deprecated list of disk indices
621 self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
623 duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
625 raise errors.OpPrereqError("Some disks have been specified more than"
626 " once: %s" % utils.CommaJoin(duplicates),
629 # We don't want _CheckIAllocatorOrNode selecting the default iallocator
630 # when neither iallocator nor nodes are specified
631 if self.op.iallocator or self.op.nodes:
632 CheckIAllocatorOrNode(self, "iallocator", "nodes")
634 for (idx, params) in self.op.disks:
635 utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
636 unsupported = frozenset(params.keys()) - self._MODIFYABLE
638 raise errors.OpPrereqError("Parameters for disk %s try to change"
639 " unmodifyable parameter(s): %s" %
640 (idx, utils.CommaJoin(unsupported)),
643 def ExpandNames(self):
644 self._ExpandAndLockInstance()
645 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
648 self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
649 self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
651 self.needed_locks[locking.LEVEL_NODE] = []
652 if self.op.iallocator:
653 # iallocator will select a new node in the same group
654 self.needed_locks[locking.LEVEL_NODEGROUP] = []
655 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
657 self.needed_locks[locking.LEVEL_NODE_RES] = []
659 def DeclareLocks(self, level):
660 if level == locking.LEVEL_NODEGROUP:
661 assert self.op.iallocator is not None
662 assert not self.op.nodes
663 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
664 self.share_locks[locking.LEVEL_NODEGROUP] = 1
665 # Lock the primary group used by the instance optimistically; this
666 # requires going via the node before it's locked, requiring
667 # verification later on
668 self.needed_locks[locking.LEVEL_NODEGROUP] = \
669 self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
671 elif level == locking.LEVEL_NODE:
672 # If an allocator is used, then we lock all the nodes in the current
673 # instance group, as we don't know yet which ones will be selected;
674 # if we replace the nodes without using an allocator, locks are
675 # already declared in ExpandNames; otherwise, we need to lock all the
676 # instance nodes for disk re-creation
677 if self.op.iallocator:
678 assert not self.op.nodes
679 assert not self.needed_locks[locking.LEVEL_NODE]
680 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
682 # Lock member nodes of the group of the primary node
683 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
684 self.needed_locks[locking.LEVEL_NODE].extend(
685 self.cfg.GetNodeGroup(group_uuid).members)
687 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
688 elif not self.op.nodes:
689 self._LockInstancesNodes(primary_only=False)
690 elif level == locking.LEVEL_NODE_RES:
692 self.needed_locks[locking.LEVEL_NODE_RES] = \
693 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
695 def BuildHooksEnv(self):
698 This runs on master, primary and secondary nodes of the instance.
701 return BuildInstanceHookEnvByObject(self, self.instance)
703 def BuildHooksNodes(self):
704 """Build hooks nodes.
707 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
710 def CheckPrereq(self):
711 """Check prerequisites.
713 This checks that the instance is in the cluster and is not running.
716 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
717 assert instance is not None, \
718 "Cannot retrieve locked instance %s" % self.op.instance_name
720 if len(self.op.nodes) != len(instance.all_nodes):
721 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
722 " %d replacement nodes were specified" %
723 (instance.name, len(instance.all_nodes),
726 assert instance.disk_template != constants.DT_DRBD8 or \
727 len(self.op.nodes) == 2
728 assert instance.disk_template != constants.DT_PLAIN or \
729 len(self.op.nodes) == 1
730 primary_node = self.op.nodes[0]
732 primary_node = instance.primary_node
733 if not self.op.iallocator:
734 CheckNodeOnline(self, primary_node)
736 if instance.disk_template == constants.DT_DISKLESS:
737 raise errors.OpPrereqError("Instance '%s' has no disks" %
738 self.op.instance_name, errors.ECODE_INVAL)
740 # Verify if node group locks are still correct
741 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
743 # Node group locks are acquired only for the primary node (and only
744 # when the allocator is used)
745 CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
748 # if we replace nodes *and* the old primary is offline, we don't
749 # check the instance state
750 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
751 if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
752 CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
753 msg="cannot recreate disks")
756 self.disks = dict(self.op.disks)
758 self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
760 maxidx = max(self.disks.keys())
761 if maxidx >= len(instance.disks):
762 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
765 if ((self.op.nodes or self.op.iallocator) and
766 sorted(self.disks.keys()) != range(len(instance.disks))):
767 raise errors.OpPrereqError("Can't recreate disks partially and"
768 " change the nodes at the same time",
771 self.instance = instance
773 if self.op.iallocator:
775 # Release unneeded node and node resource locks
776 ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
777 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
778 ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
780 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
783 nodes = self.op.nodes
785 nodes = instance.all_nodes
786 excl_stor = compat.any(
787 rpc.GetExclusiveStorageForNodeNames(self.cfg, nodes).values()
789 for new_params in self.disks.values():
790 CheckSpindlesExclusiveStorage(new_params, excl_stor)
792 def Exec(self, feedback_fn):
793 """Recreate the disks.
796 instance = self.instance
798 assert (self.owned_locks(locking.LEVEL_NODE) ==
799 self.owned_locks(locking.LEVEL_NODE_RES))
802 mods = [] # keeps track of needed changes
804 for idx, disk in enumerate(instance.disks):
806 changes = self.disks[idx]
808 # Disk should not be recreated
812 # update secondaries for disks, if needed
813 if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
814 # need to update the nodes and minors
815 assert len(self.op.nodes) == 2
816 assert len(disk.logical_id) == 6 # otherwise disk internals
818 (_, _, old_port, _, _, old_secret) = disk.logical_id
819 new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
820 new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
821 new_minors[0], new_minors[1], old_secret)
822 assert len(disk.logical_id) == len(new_id)
826 mods.append((idx, new_id, changes))
828 # now that we have passed all asserts above, we can apply the mods
829 # in a single run (to avoid partial changes)
830 for idx, new_id, changes in mods:
831 disk = instance.disks[idx]
832 if new_id is not None:
833 assert disk.dev_type == constants.LD_DRBD8
834 disk.logical_id = new_id
836 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
837 mode=changes.get(constants.IDISK_MODE, None),
838 spindles=changes.get(constants.IDISK_SPINDLES, None))
840 # change primary node, if needed
842 instance.primary_node = self.op.nodes[0]
843 self.LogWarning("Changing the instance's nodes, you will have to"
844 " remove any disks left on the older nodes manually")
847 self.cfg.Update(instance, feedback_fn)
849 # All touched nodes must be locked
850 mylocks = self.owned_locks(locking.LEVEL_NODE)
851 assert mylocks.issuperset(frozenset(instance.all_nodes))
852 new_disks = CreateDisks(self, instance, to_skip=to_skip)
854 # TODO: Release node locks before wiping, or explain why it's not possible
855 if self.cfg.GetClusterInfo().prealloc_wipe_disks:
856 wipedisks = [(idx, disk, 0)
857 for (idx, disk) in enumerate(instance.disks)
858 if idx not in to_skip]
859 WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
862 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
863 """Checks if nodes have enough free disk space in the specified VG.
865 This function checks if all given nodes have the needed amount of
866 free disk. In case any node has less disk or we cannot get the
867 information from the node, this function raises an OpPrereqError
870 @type lu: C{LogicalUnit}
871 @param lu: a logical unit from which we get configuration data
872 @type nodenames: C{list}
873 @param nodenames: the list of node names to check
875 @param vg: the volume group to check
876 @type requested: C{int}
877 @param requested: the amount of disk in MiB to check for
878 @raise errors.OpPrereqError: if the node doesn't have enough disk,
879 or we cannot check the node
882 es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
883 # FIXME: This maps everything to storage type 'lvm-vg' to maintain
884 # the current functionality. Refactor to make it more flexible.
885 nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)], None,
887 for node in nodenames:
888 info = nodeinfo[node]
889 info.Raise("Cannot get current information from node %s" % node,
890 prereq=True, ecode=errors.ECODE_ENVIRON)
891 (_, (vg_info, ), _) = info.payload
892 vg_free = vg_info.get("vg_free", None)
893 if not isinstance(vg_free, int):
894 raise errors.OpPrereqError("Can't compute free disk space on node"
895 " %s for vg %s, result was '%s'" %
896 (node, vg, vg_free), errors.ECODE_ENVIRON)
897 if requested > vg_free:
898 raise errors.OpPrereqError("Not enough disk space on target node %s"
899 " vg %s: required %d MiB, available %d MiB" %
900 (node, vg, requested, vg_free),
904 def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
905 """Checks if nodes have enough free disk space in all the VGs.
907 This function checks if all given nodes have the needed amount of
908 free disk. In case any node has less disk or we cannot get the
909 information from the node, this function raises an OpPrereqError
912 @type lu: C{LogicalUnit}
913 @param lu: a logical unit from which we get configuration data
914 @type nodenames: C{list}
915 @param nodenames: the list of node names to check
916 @type req_sizes: C{dict}
917 @param req_sizes: the hash of vg and corresponding amount of disk in
919 @raise errors.OpPrereqError: if the node doesn't have enough disk,
920 or we cannot check the node
923 for vg, req_size in req_sizes.items():
924 _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
927 def _DiskSizeInBytesToMebibytes(lu, size):
928 """Converts a disk size in bytes to mebibytes.
930 Warns and rounds up if the size isn't an even multiple of 1 MiB.
933 (mib, remainder) = divmod(size, 1024 * 1024)
936 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
937 " to not overwrite existing data (%s bytes will not be"
938 " wiped)", (1024 * 1024) - remainder)
944 def _CalcEta(time_taken, written, total_size):
945 """Calculates the ETA based on size written and total size.
947 @param time_taken: The time taken so far
948 @param written: amount written so far
949 @param total_size: The total size of data to be written
950 @return: The remaining time in seconds
953 avg_time = time_taken / float(written)
954 return (total_size - written) * avg_time
957 def WipeDisks(lu, instance, disks=None):
958 """Wipes instance disks.
960 @type lu: L{LogicalUnit}
961 @param lu: the logical unit on whose behalf we execute
962 @type instance: L{objects.Instance}
963 @param instance: the instance whose disks we should create
964 @type disks: None or list of tuple of (number, L{objects.Disk}, number)
965 @param disks: Disk details; tuple contains disk index, disk object and the
969 node = instance.primary_node
972 disks = [(idx, disk, 0)
973 for (idx, disk) in enumerate(instance.disks)]
975 for (_, device, _) in disks:
976 lu.cfg.SetDiskID(device, node)
978 logging.info("Pausing synchronization of disks of instance '%s'",
980 result = lu.rpc.call_blockdev_pause_resume_sync(node,
981 (map(compat.snd, disks),
984 result.Raise("Failed to pause disk synchronization on node '%s'" % node)
986 for idx, success in enumerate(result.payload):
988 logging.warn("Pausing synchronization of disk %s of instance '%s'"
989 " failed", idx, instance.name)
992 for (idx, device, offset) in disks:
993 # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
994 # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
996 int(min(constants.MAX_WIPE_CHUNK,
997 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1001 start_time = time.time()
1006 info_text = (" (from %s to %s)" %
1007 (utils.FormatUnit(offset, "h"),
1008 utils.FormatUnit(size, "h")))
1010 lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1012 logging.info("Wiping disk %d for instance %s on node %s using"
1013 " chunk size %s", idx, instance.name, node, wipe_chunk_size)
1015 while offset < size:
1016 wipe_size = min(wipe_chunk_size, size - offset)
1018 logging.debug("Wiping disk %d, offset %s, chunk %s",
1019 idx, offset, wipe_size)
1021 result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
1023 result.Raise("Could not wipe disk %d at offset %d for size %d" %
1024 (idx, offset, wipe_size))
1028 if now - last_output >= 60:
1029 eta = _CalcEta(now - start_time, offset, size)
1030 lu.LogInfo(" - done: %.1f%% ETA: %s",
1031 offset / float(size) * 100, utils.FormatSeconds(eta))
1034 logging.info("Resuming synchronization of disks for instance '%s'",
1037 result = lu.rpc.call_blockdev_pause_resume_sync(node,
1038 (map(compat.snd, disks),
1043 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1044 node, result.fail_msg)
1046 for idx, success in enumerate(result.payload):
1048 lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1049 " failed", idx, instance.name)
1052 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1053 """Wrapper for L{WipeDisks} that handles errors.
1055 @type lu: L{LogicalUnit}
1056 @param lu: the logical unit on whose behalf we execute
1057 @type instance: L{objects.Instance}
1058 @param instance: the instance whose disks we should wipe
1059 @param disks: see L{WipeDisks}
1060 @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1062 @raise errors.OpPrereqError: in case of failure
1066 WipeDisks(lu, instance, disks=disks)
1067 except errors.OpExecError:
1068 logging.warning("Wiping disks for instance '%s' failed",
1070 _UndoCreateDisks(lu, cleanup)
1074 def ExpandCheckDisks(instance, disks):
1075 """Return the instance disks selected by the disks list
1077 @type disks: list of L{objects.Disk} or None
1078 @param disks: selected disks
1079 @rtype: list of L{objects.Disk}
1080 @return: selected instance disks to act on
1084 return instance.disks
1086 if not set(disks).issubset(instance.disks):
1087 raise errors.ProgrammerError("Can only act on disks belonging to the"
1088 " target instance: expected a subset of %r,"
1089 " got %r" % (instance.disks, disks))
1093 def WaitForSync(lu, instance, disks=None, oneshot=False):
1094 """Sleep and poll for an instance's disk to sync.
1097 if not instance.disks or disks is not None and not disks:
1100 disks = ExpandCheckDisks(instance, disks)
1103 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1105 node = instance.primary_node
1108 lu.cfg.SetDiskID(dev, node)
1110 # TODO: Convert to utils.Retry
1113 degr_retries = 10 # in seconds, as we sleep 1 second each time
1117 cumul_degraded = False
1118 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1119 msg = rstats.fail_msg
1121 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1124 raise errors.RemoteError("Can't contact node %s for mirror data,"
1125 " aborting." % node)
1128 rstats = rstats.payload
1130 for i, mstat in enumerate(rstats):
1132 lu.LogWarning("Can't compute data for node %s/%s",
1133 node, disks[i].iv_name)
1136 cumul_degraded = (cumul_degraded or
1137 (mstat.is_degraded and mstat.sync_percent is None))
1138 if mstat.sync_percent is not None:
1140 if mstat.estimated_time is not None:
1141 rem_time = ("%s remaining (estimated)" %
1142 utils.FormatSeconds(mstat.estimated_time))
1143 max_time = mstat.estimated_time
1145 rem_time = "no time estimate"
1146 lu.LogInfo("- device %s: %5.2f%% done, %s",
1147 disks[i].iv_name, mstat.sync_percent, rem_time)
1149 # if we're done but degraded, let's do a few small retries, to
1150 # make sure we see a stable and not transient situation; therefore
1151 # we force restart of the loop
1152 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1153 logging.info("Degraded disks found, %d retries left", degr_retries)
1161 time.sleep(min(60, max_time))
1164 lu.LogInfo("Instance %s's disks are in sync", instance.name)
1166 return not cumul_degraded
1169 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1170 """Shutdown block devices of an instance.
1172 This does the shutdown on all nodes of the instance.
1174 If the ignore_primary is false, errors on the primary node are
1179 disks = ExpandCheckDisks(instance, disks)
1182 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1183 lu.cfg.SetDiskID(top_disk, node)
1184 result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1185 msg = result.fail_msg
1187 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1188 disk.iv_name, node, msg)
1189 if ((node == instance.primary_node and not ignore_primary) or
1190 (node != instance.primary_node and not result.offline)):
1195 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1196 """Shutdown block devices of an instance.
1198 This function checks if an instance is running, before calling
1199 _ShutdownInstanceDisks.
1202 CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1203 ShutdownInstanceDisks(lu, instance, disks=disks)
1206 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1208 """Prepare the block devices for an instance.
1210 This sets up the block devices on all nodes.
1212 @type lu: L{LogicalUnit}
1213 @param lu: the logical unit on whose behalf we execute
1214 @type instance: L{objects.Instance}
1215 @param instance: the instance for whose disks we assemble
1216 @type disks: list of L{objects.Disk} or None
1217 @param disks: which disks to assemble (or all, if None)
1218 @type ignore_secondaries: boolean
1219 @param ignore_secondaries: if true, errors on secondary nodes
1220 won't result in an error return from the function
1221 @type ignore_size: boolean
1222 @param ignore_size: if true, the current known size of the disk
1223 will not be used during the disk activation, useful for cases
1224 when the size is wrong
1225 @return: False if the operation failed, otherwise a list of
1226 (host, instance_visible_name, node_visible_name)
1227 with the mapping from node devices to instance devices
1232 iname = instance.name
1233 disks = ExpandCheckDisks(instance, disks)
1235 # With the two passes mechanism we try to reduce the window of
1236 # opportunity for the race condition of switching DRBD to primary
1237 # before handshaking occured, but we do not eliminate it
1239 # The proper fix would be to wait (with some limits) until the
1240 # connection has been made and drbd transitions from WFConnection
1241 # into any other network-connected state (Connected, SyncTarget,
1244 # 1st pass, assemble on all nodes in secondary mode
1245 for idx, inst_disk in enumerate(disks):
1246 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1248 node_disk = node_disk.Copy()
1249 node_disk.UnsetSize()
1250 lu.cfg.SetDiskID(node_disk, node)
1251 result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1253 msg = result.fail_msg
1255 is_offline_secondary = (node in instance.secondary_nodes and
1257 lu.LogWarning("Could not prepare block device %s on node %s"
1258 " (is_primary=False, pass=1): %s",
1259 inst_disk.iv_name, node, msg)
1260 if not (ignore_secondaries or is_offline_secondary):
1263 # FIXME: race condition on drbd migration to primary
1265 # 2nd pass, do only the primary node
1266 for idx, inst_disk in enumerate(disks):
1269 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1270 if node != instance.primary_node:
1273 node_disk = node_disk.Copy()
1274 node_disk.UnsetSize()
1275 lu.cfg.SetDiskID(node_disk, node)
1276 result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1278 msg = result.fail_msg
1280 lu.LogWarning("Could not prepare block device %s on node %s"
1281 " (is_primary=True, pass=2): %s",
1282 inst_disk.iv_name, node, msg)
1285 dev_path = result.payload
1287 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1289 # leave the disks configured for the primary node
1290 # this is a workaround that would be fixed better by
1291 # improving the logical/physical id handling
1293 lu.cfg.SetDiskID(disk, instance.primary_node)
1295 return disks_ok, device_info
1298 def StartInstanceDisks(lu, instance, force):
1299 """Start the disks of an instance.
1302 disks_ok, _ = AssembleInstanceDisks(lu, instance,
1303 ignore_secondaries=force)
1305 ShutdownInstanceDisks(lu, instance)
1306 if force is not None and not force:
1308 hint=("If the message above refers to a secondary node,"
1309 " you can retry the operation using '--force'"))
1310 raise errors.OpExecError("Disk consistency error")
1313 class LUInstanceGrowDisk(LogicalUnit):
1314 """Grow a disk of an instance.
1318 HTYPE = constants.HTYPE_INSTANCE
1321 def ExpandNames(self):
1322 self._ExpandAndLockInstance()
1323 self.needed_locks[locking.LEVEL_NODE] = []
1324 self.needed_locks[locking.LEVEL_NODE_RES] = []
1325 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1326 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1328 def DeclareLocks(self, level):
1329 if level == locking.LEVEL_NODE:
1330 self._LockInstancesNodes()
1331 elif level == locking.LEVEL_NODE_RES:
1333 self.needed_locks[locking.LEVEL_NODE_RES] = \
1334 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1336 def BuildHooksEnv(self):
1339 This runs on the master, the primary and all the secondaries.
1343 "DISK": self.op.disk,
1344 "AMOUNT": self.op.amount,
1345 "ABSOLUTE": self.op.absolute,
1347 env.update(BuildInstanceHookEnvByObject(self, self.instance))
1350 def BuildHooksNodes(self):
1351 """Build hooks nodes.
1354 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1357 def CheckPrereq(self):
1358 """Check prerequisites.
1360 This checks that the instance is in the cluster.
1363 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1364 assert instance is not None, \
1365 "Cannot retrieve locked instance %s" % self.op.instance_name
1366 nodenames = list(instance.all_nodes)
1367 for node in nodenames:
1368 CheckNodeOnline(self, node)
1370 self.instance = instance
1372 if instance.disk_template not in constants.DTS_GROWABLE:
1373 raise errors.OpPrereqError("Instance's disk layout does not support"
1374 " growing", errors.ECODE_INVAL)
1376 self.disk = instance.FindDisk(self.op.disk)
1378 if self.op.absolute:
1379 self.target = self.op.amount
1380 self.delta = self.target - self.disk.size
1382 raise errors.OpPrereqError("Requested size (%s) is smaller than "
1383 "current disk size (%s)" %
1384 (utils.FormatUnit(self.target, "h"),
1385 utils.FormatUnit(self.disk.size, "h")),
1388 self.delta = self.op.amount
1389 self.target = self.disk.size + self.delta
1391 raise errors.OpPrereqError("Requested increment (%s) is negative" %
1392 utils.FormatUnit(self.delta, "h"),
1395 self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1397 def _CheckDiskSpace(self, nodenames, req_vgspace):
1398 template = self.instance.disk_template
1399 if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1400 # TODO: check the free disk space for file, when that feature will be
1402 nodes = map(self.cfg.GetNodeInfo, nodenames)
1403 es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1406 # With exclusive storage we need to something smarter than just looking
1407 # at free space; for now, let's simply abort the operation.
1408 raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1409 " is enabled", errors.ECODE_STATE)
1410 CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1412 def Exec(self, feedback_fn):
1413 """Execute disk grow.
1416 instance = self.instance
1419 assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1420 assert (self.owned_locks(locking.LEVEL_NODE) ==
1421 self.owned_locks(locking.LEVEL_NODE_RES))
1423 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1425 disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1427 raise errors.OpExecError("Cannot activate block device to grow")
1429 feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1430 (self.op.disk, instance.name,
1431 utils.FormatUnit(self.delta, "h"),
1432 utils.FormatUnit(self.target, "h")))
1434 # First run all grow ops in dry-run mode
1435 for node in instance.all_nodes:
1436 self.cfg.SetDiskID(disk, node)
1437 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1439 result.Raise("Dry-run grow request failed to node %s" % node)
1442 # Get disk size from primary node for wiping
1443 result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1444 result.Raise("Failed to retrieve disk size from node '%s'" %
1445 instance.primary_node)
1447 (disk_size_in_bytes, ) = result.payload
1449 if disk_size_in_bytes is None:
1450 raise errors.OpExecError("Failed to retrieve disk size from primary"
1451 " node '%s'" % instance.primary_node)
1453 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1455 assert old_disk_size >= disk.size, \
1456 ("Retrieved disk size too small (got %s, should be at least %s)" %
1457 (old_disk_size, disk.size))
1459 old_disk_size = None
1461 # We know that (as far as we can test) operations across different
1462 # nodes will succeed, time to run it for real on the backing storage
1463 for node in instance.all_nodes:
1464 self.cfg.SetDiskID(disk, node)
1465 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1467 result.Raise("Grow request failed to node %s" % node)
1469 # And now execute it for logical storage, on the primary node
1470 node = instance.primary_node
1471 self.cfg.SetDiskID(disk, node)
1472 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1474 result.Raise("Grow request failed to node %s" % node)
1476 disk.RecordGrow(self.delta)
1477 self.cfg.Update(instance, feedback_fn)
1479 # Changes have been recorded, release node lock
1480 ReleaseLocks(self, locking.LEVEL_NODE)
1482 # Downgrade lock while waiting for sync
1483 self.glm.downgrade(locking.LEVEL_INSTANCE)
1485 assert wipe_disks ^ (old_disk_size is None)
1488 assert instance.disks[self.op.disk] == disk
1490 # Wipe newly added disk space
1491 WipeDisks(self, instance,
1492 disks=[(self.op.disk, disk, old_disk_size)])
1494 if self.op.wait_for_sync:
1495 disk_abort = not WaitForSync(self, instance, disks=[disk])
1497 self.LogWarning("Disk syncing has not returned a good status; check"
1499 if instance.admin_state != constants.ADMINST_UP:
1500 _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1501 elif instance.admin_state != constants.ADMINST_UP:
1502 self.LogWarning("Not shutting down the disk even if the instance is"
1503 " not supposed to be running because no wait for"
1504 " sync mode was requested")
1506 assert self.owned_locks(locking.LEVEL_NODE_RES)
1507 assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1510 class LUInstanceReplaceDisks(LogicalUnit):
1511 """Replace the disks of an instance.
1514 HPATH = "mirrors-replace"
1515 HTYPE = constants.HTYPE_INSTANCE
1518 def CheckArguments(self):
1522 remote_node = self.op.remote_node
1523 ialloc = self.op.iallocator
1524 if self.op.mode == constants.REPLACE_DISK_CHG:
1525 if remote_node is None and ialloc is None:
1526 raise errors.OpPrereqError("When changing the secondary either an"
1527 " iallocator script must be used or the"
1528 " new node given", errors.ECODE_INVAL)
1530 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1532 elif remote_node is not None or ialloc is not None:
1533 # Not replacing the secondary
1534 raise errors.OpPrereqError("The iallocator and new node options can"
1535 " only be used when changing the"
1536 " secondary node", errors.ECODE_INVAL)
1538 def ExpandNames(self):
1539 self._ExpandAndLockInstance()
1541 assert locking.LEVEL_NODE not in self.needed_locks
1542 assert locking.LEVEL_NODE_RES not in self.needed_locks
1543 assert locking.LEVEL_NODEGROUP not in self.needed_locks
1545 assert self.op.iallocator is None or self.op.remote_node is None, \
1546 "Conflicting options"
1548 if self.op.remote_node is not None:
1549 self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1551 # Warning: do not remove the locking of the new secondary here
1552 # unless DRBD8Dev.AddChildren is changed to work in parallel;
1553 # currently it doesn't since parallel invocations of
1554 # FindUnusedMinor will conflict
1555 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1556 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1558 self.needed_locks[locking.LEVEL_NODE] = []
1559 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1561 if self.op.iallocator is not None:
1562 # iallocator will select a new node in the same group
1563 self.needed_locks[locking.LEVEL_NODEGROUP] = []
1564 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1566 self.needed_locks[locking.LEVEL_NODE_RES] = []
1568 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1569 self.op.iallocator, self.op.remote_node,
1570 self.op.disks, self.op.early_release,
1571 self.op.ignore_ipolicy)
1573 self.tasklets = [self.replacer]
1575 def DeclareLocks(self, level):
1576 if level == locking.LEVEL_NODEGROUP:
1577 assert self.op.remote_node is None
1578 assert self.op.iallocator is not None
1579 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1581 self.share_locks[locking.LEVEL_NODEGROUP] = 1
1582 # Lock all groups used by instance optimistically; this requires going
1583 # via the node before it's locked, requiring verification later on
1584 self.needed_locks[locking.LEVEL_NODEGROUP] = \
1585 self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1587 elif level == locking.LEVEL_NODE:
1588 if self.op.iallocator is not None:
1589 assert self.op.remote_node is None
1590 assert not self.needed_locks[locking.LEVEL_NODE]
1591 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1593 # Lock member nodes of all locked groups
1594 self.needed_locks[locking.LEVEL_NODE] = \
1596 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1597 for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1599 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1601 self._LockInstancesNodes()
1603 elif level == locking.LEVEL_NODE_RES:
1605 self.needed_locks[locking.LEVEL_NODE_RES] = \
1606 self.needed_locks[locking.LEVEL_NODE]
1608 def BuildHooksEnv(self):
1611 This runs on the master, the primary and all the secondaries.
1614 instance = self.replacer.instance
1616 "MODE": self.op.mode,
1617 "NEW_SECONDARY": self.op.remote_node,
1618 "OLD_SECONDARY": instance.secondary_nodes[0],
1620 env.update(BuildInstanceHookEnvByObject(self, instance))
1623 def BuildHooksNodes(self):
1624 """Build hooks nodes.
1627 instance = self.replacer.instance
1629 self.cfg.GetMasterNode(),
1630 instance.primary_node,
1632 if self.op.remote_node is not None:
1633 nl.append(self.op.remote_node)
1636 def CheckPrereq(self):
1637 """Check prerequisites.
1640 assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1641 self.op.iallocator is None)
1643 # Verify if node group locks are still correct
1644 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1646 CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1648 return LogicalUnit.CheckPrereq(self)
1651 class LUInstanceActivateDisks(NoHooksLU):
1652 """Bring up an instance's disks.
1657 def ExpandNames(self):
1658 self._ExpandAndLockInstance()
1659 self.needed_locks[locking.LEVEL_NODE] = []
1660 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1662 def DeclareLocks(self, level):
1663 if level == locking.LEVEL_NODE:
1664 self._LockInstancesNodes()
1666 def CheckPrereq(self):
1667 """Check prerequisites.
1669 This checks that the instance is in the cluster.
1672 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1673 assert self.instance is not None, \
1674 "Cannot retrieve locked instance %s" % self.op.instance_name
1675 CheckNodeOnline(self, self.instance.primary_node)
1677 def Exec(self, feedback_fn):
1678 """Activate the disks.
1681 disks_ok, disks_info = \
1682 AssembleInstanceDisks(self, self.instance,
1683 ignore_size=self.op.ignore_size)
1685 raise errors.OpExecError("Cannot activate block devices")
1687 if self.op.wait_for_sync:
1688 if not WaitForSync(self, self.instance):
1689 raise errors.OpExecError("Some disks of the instance are degraded!")
1694 class LUInstanceDeactivateDisks(NoHooksLU):
1695 """Shutdown an instance's disks.
1700 def ExpandNames(self):
1701 self._ExpandAndLockInstance()
1702 self.needed_locks[locking.LEVEL_NODE] = []
1703 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1705 def DeclareLocks(self, level):
1706 if level == locking.LEVEL_NODE:
1707 self._LockInstancesNodes()
1709 def CheckPrereq(self):
1710 """Check prerequisites.
1712 This checks that the instance is in the cluster.
1715 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1716 assert self.instance is not None, \
1717 "Cannot retrieve locked instance %s" % self.op.instance_name
1719 def Exec(self, feedback_fn):
1720 """Deactivate the disks
1723 instance = self.instance
1725 ShutdownInstanceDisks(self, instance)
1727 _SafeShutdownInstanceDisks(self, instance)
1730 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1732 """Check that mirrors are not degraded.
1734 @attention: The device has to be annotated already.
1736 The ldisk parameter, if True, will change the test from the
1737 is_degraded attribute (which represents overall non-ok status for
1738 the device(s)) to the ldisk (representing the local storage status).
1741 lu.cfg.SetDiskID(dev, node)
1745 if on_primary or dev.AssembleOnSecondary():
1746 rstats = lu.rpc.call_blockdev_find(node, dev)
1747 msg = rstats.fail_msg
1749 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1751 elif not rstats.payload:
1752 lu.LogWarning("Can't find disk on node %s", node)
1756 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1758 result = result and not rstats.payload.is_degraded
1761 for child in dev.children:
1762 result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1768 def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1769 """Wrapper around L{_CheckDiskConsistencyInner}.
1772 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1773 return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1777 def _BlockdevFind(lu, node, dev, instance):
1778 """Wrapper around call_blockdev_find to annotate diskparams.
1780 @param lu: A reference to the lu object
1781 @param node: The node to call out
1782 @param dev: The device to find
1783 @param instance: The instance object the device belongs to
1784 @returns The result of the rpc call
1787 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1788 return lu.rpc.call_blockdev_find(node, disk)
1791 def _GenerateUniqueNames(lu, exts):
1792 """Generate a suitable LV name.
1794 This will generate a logical volume name for the given instance.
1799 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1800 results.append("%s%s" % (new_id, val))
1804 class TLReplaceDisks(Tasklet):
1805 """Replaces disks for an instance.
1807 Note: Locking is not within the scope of this class.
1810 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1811 disks, early_release, ignore_ipolicy):
1812 """Initializes this class.
1815 Tasklet.__init__(self, lu)
1818 self.instance_name = instance_name
1820 self.iallocator_name = iallocator_name
1821 self.remote_node = remote_node
1823 self.early_release = early_release
1824 self.ignore_ipolicy = ignore_ipolicy
1827 self.instance = None
1828 self.new_node = None
1829 self.target_node = None
1830 self.other_node = None
1831 self.remote_node_info = None
1832 self.node_secondary_ip = None

  @staticmethod
  def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
    """Compute a new secondary node using an IAllocator.

    """
    req = iallocator.IAReqRelocate(name=instance_name,
                                   relocate_from=list(relocate_from))
    ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)

    ial.Run(iallocator_name)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (iallocator_name, ial.info),
                                 errors.ECODE_NORES)

    remote_node_name = ial.result[0]

    lu.LogInfo("Selected new secondary for instance '%s': %s",
               instance_name, remote_node_name)

    return remote_node_name
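
  # Sketch of the relocation request built above (node and allocator names
  # are made up; IAReqRelocate, IAllocator and Run are the interfaces used
  # in this method):
  #
  #   req = iallocator.IAReqRelocate(name="inst1.example.com",
  #                                  relocate_from=["node2.example.com"])
  #   ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
  #   ial.Run("hail")
  #   # on success, ial.result is a list holding the chosen node name(s)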

  def _FindFaultyDisks(self, node_name):
    """Wrapper for L{FindFaultyInstanceDisks}.

    """
    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
                                   node_name, True)

  def _CheckDisksActivated(self, instance):
    """Checks if the instance disks are activated.

    @param instance: The instance to check disks
    @return: True if they are activated, False otherwise

    """
    nodes = instance.all_nodes

    for idx, dev in enumerate(instance.disks):
      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, instance)

        if result.offline:
          continue
        elif result.fail_msg or not result.payload:
          return False

    return True

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    instance = self.instance
    secondary_node = instance.secondary_nodes[0]

    if self.iallocator_name is None:
      remote_node = self.remote_node
    else:
      remote_node = self._RunAllocator(self.lu, self.iallocator_name,
                                       instance.name, instance.secondary_nodes)

    if remote_node is None:
      self.remote_node_info = None
    else:
      assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
        "Remote node '%s' is not locked" % remote_node

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node

    if remote_node == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node == secondary_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)

      faulty_primary = self._FindFaultyDisks(instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]
      else:
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node = instance.primary_node
        self.other_node = secondary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node = secondary_node
        self.other_node = instance.primary_node
        check_nodes = [self.target_node, self.other_node]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node = remote_node
        self.other_node = instance.primary_node
        self.target_node = secondary_node
        check_nodes = [self.new_node, self.other_node]

        CheckNodeNotDrained(self.lu, remote_node)
        CheckNodeVmCapable(self.lu, remote_node)

        old_node_info = self.cfg.GetNodeInfo(secondary_node)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

    # If not specified all disks should be replaced
    if not self.disks:
      self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internal
    # submitted opcode and external one. We should fix that.
    if self.remote_node_info:
      # We change the node, lets verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
                             self.cfg, ignore=self.ignore_ipolicy)

    for node in check_nodes:
      CheckNodeOnline(self.lu, node)

    touched_nodes = frozenset(node_name for node_name in [self.new_node,
                                                          self.other_node,
                                                          self.target_node]
                              if node_name is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
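
  # Summary of the node roles computed by CheckPrereq, per replacement mode:
  #
  #   REPLACE_DISK_PRI:  target_node=primary,       other_node=secondary
  #   REPLACE_DISK_SEC:  target_node=secondary,     other_node=primary
  #   REPLACE_DISK_CHG:  target_node=old secondary, other_node=primary,
  #                      new_node=remote node
  #   REPLACE_DISK_AUTO: target_node=node with faulty disks, other_node=peer
  #
  # target_node is where storage gets (re)created; new_node is only set when
  # the secondary node itself is being replaced.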

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
          ("Incorrect node locks, owning %s, expected %s" %
           (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
          "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
          "Should not own any node group lock at this point"

    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" % self.instance.primary_node)
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.instance.secondary_nodes))

    activate_disks = (self.instance.admin_state != constants.ADMINST_UP)

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result

  def _CheckVolumeGroup(self, nodes):
    self.lu.LogInfo("Checking volume groups")

    vgname = self.cfg.GetVGName()

    # Make sure volume group exists on all involved nodes
    results = self.rpc.call_vg_list(nodes)
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")

    for node in nodes:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if vgname not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on node %s" %
                                 (vgname, node))

  def _CheckDisksExistence(self, nodes):
    # Check disk existence
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      for node in nodes:
        self.lu.LogInfo("Checking disk/%d on %s", idx, node)
        self.cfg.SetDiskID(dev, node)

        result = _BlockdevFind(self, node, dev, self.instance)

        msg = result.fail_msg
        if msg or not result.payload:
          if not msg:
            msg = "disk not found"
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

  def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
    for idx, dev in enumerate(self.instance.disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                      (idx, node_name))

      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
                                  on_primary, ldisk=ldisk):
        raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                 " replace disks for instance %s" %
                                 (node_name, self.instance.name))

  def _CreateNewStorage(self, node_name):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    """
    iv_names = {}

    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)

      self.cfg.SetDiskID(dev, node_name)

      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.LD_LV,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)

    return iv_names
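
  # The mapping returned above has the shape (names illustrative):
  #
  #   iv_names["disk/0"] = (drbd_dev, [old_data_lv, old_meta_lv],
  #                         [new_data_lv, new_meta_lv])
  #
  # i.e. per replaced disk: the DRBD device, copies of its old LV children
  # (kept so they can be removed later) and the freshly created replacements.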

  def _CheckDevices(self, node_name, iv_names):
    for name, (dev, _, _) in iv_names.iteritems():
      self.cfg.SetDiskID(dev, node_name)

      result = _BlockdevFind(self, node_name, dev, self.instance)

      msg = result.fail_msg
      if msg or not result.payload:
        if not msg:
          msg = "disk not found"
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))

      if result.payload.is_degraded:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

  def _RemoveOldStorage(self, node_name, iv_names):
    for name, (_, old_lvs, _) in iv_names.iteritems():
      self.lu.LogInfo("Remove logical volumes for %s", name)

      for lv in old_lvs:
        self.cfg.SetDiskID(lv, node_name)

        msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
        if msg:
          self.lu.LogWarning("Can't remove old LV: %s", msg,
                             hint="remove unused LVs manually")

  def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
    """Replace a disk on the primary or secondary for DRBD 8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
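
    # To make the rename dance concrete (LV names illustrative): if the old
    # data LV is xenvg/aaaa-disk0_data and its replacement was created as
    # xenvg/bbbb-disk0_data, step 4 performs:
    #
    #   detach xenvg/aaaa-disk0_data from the drbd device
    #   rename xenvg/aaaa-disk0_data -> xenvg/aaaa-disk0_data_replaced-<time_t>
    #   rename xenvg/bbbb-disk0_data -> xenvg/aaaa-disk0_data
    #   attach xenvg/aaaa-disk0_data (now backed by the new LV)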

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.other_node, self.target_node])
    self._CheckVolumeGroup([self.target_node, self.other_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.other_node,
                                self.other_node == self.instance.primary_node,
                                False)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    iv_names = self._CreateNewStorage(self.target_node)

    # Step: for each lv, detach+rename*2+attach
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)

      result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
                                                     old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (self.target_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)

      # Build the rename list based on what LVs exist on the node
      rename_old_to_new = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(self.target_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))

      self.lu.LogInfo("Renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_old_to_new)
      result.Raise("Can't rename old LVs on node %s" % self.target_node)

      # Now we rename the new LVs to the old LVs
      self.lu.LogInfo("Renaming the new LVs on the target node")
      rename_new_to_old = [(new, old.physical_id)
                           for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(self.target_node,
                                             rename_new_to_old)
      result.Raise("Can't rename new LVs on node %s" % self.target_node)

      # Intermediate steps of in memory modifications
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        self.cfg.SetDiskID(new, self.target_node)

      # We need to modify old_lvs so that removal later removes the
      # right LVs, not the newly added ones; note that old_lvs is a
      # copy here
      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        self.cfg.SetDiskID(disk, self.target_node)

      # Now that the new lvs have the old name, we can add them to the device
      self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
      result = self.rpc.call_blockdev_addchildren(self.target_node,
                                                  (dev, self.instance), new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(self.target_node,
                                               new_lv).fail_msg
          if msg2:
            self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
                               hint=("cleanup manually the unused logical"
                                     " volumes"))
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
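
    # At this point each replaced disk mirrors onto its new LVs; all that is
    # left is waiting for the DRBD resync and deleting the renamed old LVs,
    # either right away (early release) or after the sync has finished.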

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # Release all node locks while waiting for sync
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)

  def _ExecDrbd8Secondary(self, feedback_fn):
    """Replace the secondary node for DRBD 8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6

    pnode = self.instance.primary_node

    # Step: check device activation
    self.lu.LogStep(1, steps_total, "Check device existence")
    self._CheckDisksExistence([self.instance.primary_node])
    self._CheckVolumeGroup([self.instance.primary_node])

    # Step: check other node consistency
    self.lu.LogStep(2, steps_total, "Check peer consistency")
    self._CheckDisksConsistency(self.instance.primary_node, True, True)

    # Step: create new storage
    self.lu.LogStep(3, steps_total, "Allocate new storage")
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
    for idx, dev in enumerate(disks):
      self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                      (self.new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
                             True, GetInstanceInfoText(self.instance), False,
                             excl_stor)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    self.lu.LogStep(4, steps_total, "Changing drbd configuration")
    minors = self.cfg.AllocateDRBDMinor([self.new_node
                                         for dev in self.instance.disks],
                                        self.instance.name)
    logging.debug("Allocated minors %r", minors)

    iv_names = {}
    for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
      self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
                      (self.new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if self.instance.primary_node == o_node1:
        p_minor = o_minor1
      else:
        assert self.instance.primary_node == o_node2, "Three-node instance?"
        p_minor = o_minor2

      new_alone_id = (self.instance.primary_node, self.new_node, None,
                      p_minor, new_minor, o_secret)
      new_net_id = (self.instance.primary_node, self.new_node, o_port,
                    p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children,
                              size=dev.size,
                              params={})
      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
                                            self.cfg)
      try:
        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
                             anno_new_drbd,
                             GetInstanceInfoText(self.instance), False,
                             excl_stor)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(self.instance.name)
        raise
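
    # Illustrative logical_id transition for one disk (nodes, port, minors
    # and the secret are made up):
    #
    #   old:       (node1, node2, 11000, 0, 0, "secret")  # old secondary
    #   new_alone: (node1, node3, None,  0, 1, "secret")  # no port yet
    #   new_net:   (node1, node3, 11000, 0, 1, "secret")  # final attach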

    # We have new devices, shutdown the drbd on the old secondary
    for idx, dev in enumerate(self.instance.disks):
      self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
      self.cfg.SetDiskID(dev, self.target_node)
      msg = self.rpc.call_blockdev_shutdown(self.target_node,
                                            (dev, self.instance)).fail_msg
      if msg:
        self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
                           " node: %s" % (idx, msg),
                           hint=("Please cleanup this device manually as"
                                 " soon as possible"))

    self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
                                               self.instance.disks)[pnode]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(self.instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    self.lu.LogInfo("Updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      self.cfg.SetDiskID(dev, self.instance.primary_node)

    self.cfg.Update(self.instance, feedback_fn)

    # Release all node locks (the configuration has been updated)
    ReleaseLocks(self.lu, locking.LEVEL_NODE)

    # and now perform the drbd attach
    self.lu.LogInfo("Attaching primary drbds to new secondary"
                    " (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
                                            self.new_node],
                                           self.node_secondary_ip,
                                           (self.instance.disks, self.instance),
                                           self.instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
                           to_node, msg,
                           hint=("please do a gnt-instance info to see the"
                                 " status of disks"))

    cstep = itertools.count(5)

    if self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)
      # TODO: Check if releasing locks early still makes sense
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
    else:
      # Release all resource locks except those used by the instance
      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
                   keep=self.node_secondary_ip.keys())

    # TODO: Can the instance lock be downgraded here? Take the optional disk
    # shutdown in the caller into consideration.

    # Wait for sync
    # This can fail as the old devices are degraded and WaitForSync
    # does a combined result over all disks, so we don't check its return value
    self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
    WaitForSync(self.lu, self.instance)

    # Check all devices manually
    self._CheckDevices(self.instance.primary_node, iv_names)

    # Step: remove old storage
    if not self.early_release:
      self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
      self._RemoveOldStorage(self.target_node, iv_names)