+ def _ExecD8DiskOnly(self, feedback_fn):
+ """Replace a disk on the primary or secondary for dbrd8.
+
+ The algorithm for replace is quite complicated:
+ - for each disk to be replaced:
+ - create new LVs on the target node with unique names
+ - detach old LVs from the drbd device
+ - rename old LVs to name_replaced-<time_t>
+ - rename new LVs to old LVs
+ - attach the new LVs (with the old names now) to the drbd device
+ - wait for sync across all devices
+ - for each modified disk:
+ - remove old LVs (which have the name name_replaced-<time_t>)
+
+ Failures are not very well handled.
+
+ """
+ steps_total = 6
+ warning, info = (self.proc.LogWarning, self.proc.LogInfo)
+ instance = self.instance
+ iv_names = {}
+ vgname = self.cfg.GetVGName()
+ # start of work
+ cfg = self.cfg
+ tgt_node = self.tgt_node
+ oth_node = self.oth_node
+
+ # Step: check device activation
+ self.proc.LogStep(1, steps_total, "check device existence")
+ info("checking volume groups")
+ my_vg = cfg.GetVGName()
+ results = rpc.call_vg_list([oth_node, tgt_node])
+ if not results:
+ raise errors.OpExecError("Can't list volume groups on the nodes")
+ for node in oth_node, tgt_node:
+ res = results.get(node, False)
+ if not res or my_vg not in res:
+ raise errors.OpExecError("Volume group '%s' not found on %s" %
+ (my_vg, node))
+ for dev in instance.disks:
+ if dev.iv_name not in self.op.disks:
+ continue
+ for node in tgt_node, oth_node:
+ info("checking %s on %s" % (dev.iv_name, node))
+ cfg.SetDiskID(dev, node)
+ if not rpc.call_blockdev_find(node, dev):
+ raise errors.OpExecError("Can't find device %s on node %s" %
+ (dev.iv_name, node))
+
+ # Step: check other node consistency
+ self.proc.LogStep(2, steps_total, "check peer consistency")
+ for dev in instance.disks:
+ if dev.iv_name not in self.op.disks:
+ continue
+ info("checking %s consistency on %s" % (dev.iv_name, oth_node))
+ if not _CheckDiskConsistency(self.cfg, dev, oth_node,
+ oth_node == instance.primary_node):
+ raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
+ " to replace disks on this node (%s)" %
+ (oth_node, tgt_node))
+
+ # Step: create new storage
+ self.proc.LogStep(3, steps_total, "allocate new storage")
+ for dev in instance.disks:
+ if dev.iv_name not in self.op.disks:
+ continue
+ size = dev.size
+ cfg.SetDiskID(dev, tgt_node)
+ lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
+ names = _GenerateUniqueNames(cfg, lv_names)
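+ # names are unique LV names, e.g. "<unique-id>.sda_data" and
+ # "<unique-id>.sda_meta"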
+ lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
+ logical_id=(vgname, names[0]))
+ lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
+ logical_id=(vgname, names[1]))
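+ # the second, fixed-size (128 MB) volume holds the drbd8 metadata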
+ new_lvs = [lv_data, lv_meta]
+ old_lvs = dev.children
+ iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
+ info("creating new local storage on %s for %s" %
+ (tgt_node, dev.iv_name))
+ # since we *always* want to create this LV, we use the
+ # _Create...OnPrimary (which forces the creation), even if we
+ # are talking about the secondary node
+ for new_lv in new_lvs:
+ if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
+ _GetInstanceInfoText(instance)):
+ raise errors.OpExecError("Failed to create new LV named '%s' on"
+ " node '%s'" %
+ (new_lv.logical_id[1], tgt_node))
+
+ # Step: for each lv, detach+rename*2+attach
+ self.proc.LogStep(4, steps_total, "change drbd configuration")
+ for dev, old_lvs, new_lvs in iv_names.itervalues():
+ info("detaching %s drbd from local storage" % dev.iv_name)
+ if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
+ raise errors.OpExecError("Can't detach drbd from local storage on node"
+ " %s for device %s" % (tgt_node, dev.iv_name))
+ #dev.children = []
+ #cfg.Update(instance)
+
+ # ok, we created the new LVs, so now we know we have the needed
+ # storage; as such, we proceed on the target node to rename
+ # old_lv to old_lv_replaced-<time_t>, and new_lv to old_lv; note that
+ # we rename LVs using the assumption that logical_id == physical_id
+ # (which in turn is the unique_id on that node)
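+ # (e.g. ("xenvg", "uuid.sda_data") becomes
+ # ("xenvg", "uuid.sda_data_replaced-<time_t>"))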
+
+ # FIXME(iustin): use a better name for the replaced LVs
+ temp_suffix = int(time.time())
+ ren_fn = lambda d, suff: (d.physical_id[0],
+ d.physical_id[1] + "_replaced-%s" % suff)
+ # build the rename list based on what LVs exist on the node
+ rlist = []
+ for to_ren in old_lvs:
+ find_res = rpc.call_blockdev_find(tgt_node, to_ren)
+ if find_res is not None: # device exists
+ rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
+
+ info("renaming the old LVs on the target node")
+ if not rpc.call_blockdev_rename(tgt_node, rlist):
+ raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
+ # now we rename the new LVs to the old LVs
+ info("renaming the new LVs on the target node")
+ rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
+ if not rpc.call_blockdev_rename(tgt_node, rlist):
+ raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
+
+ for old, new in zip(old_lvs, new_lvs):
+ new.logical_id = old.logical_id
+ cfg.SetDiskID(new, tgt_node)
+
+ for disk in old_lvs:
+ disk.logical_id = ren_fn(disk, temp_suffix)
+ cfg.SetDiskID(disk, tgt_node)
+
+ # now that the new lvs have the old name, we can add them to the device
+ info("adding new mirror component on %s" % tgt_node)
+ if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
+ for new_lv in new_lvs:
+ if not rpc.call_blockdev_remove(tgt_node, new_lv):
+ warning("Can't rollback device %s", hint="manually cleanup unused"
+ " logical volumes")
+ raise errors.OpExecError("Can't add local storage to drbd")
+
+ dev.children = new_lvs
+ cfg.Update(instance)
+
+ # Step: wait for sync
+
+ # this can fail as the old devices are degraded and _WaitForSync
+ # does a combined result over all disks, so we don't check its
+ # return value
+ self.proc.LogStep(5, steps_total, "sync devices")
+ _WaitForSync(cfg, instance, self.proc, unlock=True)
+
+ # so check manually all the devices
+ for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
+ cfg.SetDiskID(dev, instance.primary_node)
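+ # field 5 of the blockdev_find result is the is_degraded flag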
+ is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
+ if is_degr:
+ raise errors.OpExecError("DRBD device %s is degraded!" % name)
+
+ # Step: remove old storage
+ self.proc.LogStep(6, steps_total, "remove old storage")
+ for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
+ info("remove logical volumes for %s" % name)
+ for lv in old_lvs:
+ cfg.SetDiskID(lv, tgt_node)
+ if not rpc.call_blockdev_remove(tgt_node, lv):
+ warning("Can't remove old LV", hint="manually remove unused LVs")
+
+ def _ExecD8Secondary(self, feedback_fn):
+ """Replace the secondary node for drbd8.
+
+ The algorithm for replace is quite complicated:
+ - for all disks of the instance:
+ - create new LVs on the new node with the same names
+ - shutdown the drbd device on the old secondary
+ - disconnect the drbd network on the primary
+ - create the drbd device on the new secondary
+ - network attach the drbd on the primary, using an artifice:
+ the drbd code for Attach() will connect to the network if it
+ finds a device which is connected to the correct local disks but
+ not network enabled
+ - wait for sync across all devices
+ - remove all disks from the old secondary
+
+ Failures are not very well handled.
+
+ """
+ steps_total = 6
+ warning, info = (self.proc.LogWarning, self.proc.LogInfo)
+ instance = self.instance
+ iv_names = {}
+ vgname = self.cfg.GetVGName()
+ # start of work
+ cfg = self.cfg
+ old_node = self.tgt_node
+ new_node = self.new_node
+ pri_node = instance.primary_node
+
+ # Step: check device activation
+ self.proc.LogStep(1, steps_total, "check device existence")
+ info("checking volume groups")
+ my_vg = cfg.GetVGName()
+ results = rpc.call_vg_list([pri_node, new_node])
+ if not results:
+ raise errors.OpExecError("Can't list volume groups on the nodes")
+ for node in pri_node, new_node:
+ res = results.get(node, False)
+ if not res or my_vg not in res:
+ raise errors.OpExecError("Volume group '%s' not found on %s" %
+ (my_vg, node))
+ for dev in instance.disks:
+ if dev.iv_name not in self.op.disks:
+ continue
+ info("checking %s on %s" % (dev.iv_name, pri_node))
+ cfg.SetDiskID(dev, pri_node)
+ if not rpc.call_blockdev_find(pri_node, dev):
+ raise errors.OpExecError("Can't find device %s on node %s" %
+ (dev.iv_name, pri_node))
+
+ # Step: check other node consistency
+ self.proc.LogStep(2, steps_total, "check peer consistency")
+ for dev in instance.disks:
+ if dev.iv_name not in self.op.disks:
+ continue
+ info("checking %s consistency on %s" % (dev.iv_name, pri_node))
+ if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
+ raise errors.OpExecError("Primary node (%s) has degraded storage,"
+ " unsafe to replace the secondary" %
+ pri_node)
+
+ # Step: create new storage
+ self.proc.LogStep(3, steps_total, "allocate new storage")
+ for dev in instance.disks:
+ info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
+ # since we *always* want to create this LV, we use the
+ # _Create...OnPrimary (which forces the creation), even if we
+ # are talking about the secondary node
+ for new_lv in dev.children:
+ if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
+ _GetInstanceInfoText(instance)):
+ raise errors.OpExecError("Failed to create new LV named '%s' on"
+ " node '%s'" %
+ (new_lv.logical_id[1], new_node))
+
+ iv_names[dev.iv_name] = (dev, dev.children)
+
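+ # Step: create the new drbd devices on the new secondary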
+ self.proc.LogStep(4, steps_total, "change drbd configuration")
+ for dev in instance.disks:
+ info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
+ # create new devices on new_node
+ new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
+ logical_id=(pri_node, new_node,
+ dev.logical_id[2]),
+ children=dev.children)
+ if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
+ new_drbd, False,
+ _GetInstanceInfoText(instance)):
+ raise errors.OpExecError("Failed to create new DRBD on"
+ " node '%s'" % new_node)
+
+ for dev in instance.disks:
+ # we have new devices, shutdown the drbd on the old secondary
+ info("shutting down drbd for %s on old node" % dev.iv_name)
+ cfg.SetDiskID(dev, old_node)
+ if not rpc.call_blockdev_shutdown(old_node, dev):
+ warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
+ hint="Please cleanup this device manually as soon as possible")
+
+ info("detaching primary drbds from the network (=> standalone)")
+ done = 0
+ for dev in instance.disks:
+ cfg.SetDiskID(dev, pri_node)
+ # set the physical (unique in bdev terms) id to None, meaning
+ # detach from network
+ dev.physical_id = (None,) * len(dev.physical_id)
+ # and 'find' the device, which will 'fix' it to match the
+ # standalone state
+ if rpc.call_blockdev_find(pri_node, dev):
+ done += 1
+ else:
+ warning("Failed to detach drbd %s from network, unusual case" %
+ dev.iv_name)
+
+ if not done:
+ # no detaches succeeded (very unlikely)
+ raise errors.OpExecError("Can't detach at least one DRBD from old node")
+
+ # if we managed to detach at least one, we update all the disks of
+ # the instance to point to the new secondary
+ info("updating instance configuration")
+ for dev in instance.disks:
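+ # swap in the new node pair; logical_id[2:] (the drbd port) stays the same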
+ dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
+ cfg.SetDiskID(dev, pri_node)
+ cfg.Update(instance)
+
+ # and now perform the drbd attach
+ info("attaching primary drbds to new secondary (standalone => connected)")
+ failures = []
+ for dev in instance.disks:
+ info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
+ # since the attach is smart, it's enough to 'find' the device,
+ # it will automatically activate the network, if the physical_id
+ # is correct
+ cfg.SetDiskID(dev, pri_node)
+ if not rpc.call_blockdev_find(pri_node, dev):
+ warning("can't attach drbd %s to new secondary!" % dev.iv_name,
+ "please do a gnt-instance info to see the status of disks")
+
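+ # Step: wait for sync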
+ # this can fail as the old devices are degraded and _WaitForSync
+ # does a combined result over all disks, so we don't check its
+ # return value
+ self.proc.LogStep(5, steps_total, "sync devices")
+ _WaitForSync(cfg, instance, self.proc, unlock=True)
+
+ # so check manually all the devices
+ for name, (dev, old_lvs) in iv_names.iteritems():
+ cfg.SetDiskID(dev, pri_node)
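+ # field 5 of the blockdev_find result is the is_degraded flag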
+ is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
+ if is_degr:
+ raise errors.OpExecError("DRBD device %s is degraded!" % name)
+
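+ # Step: remove old storage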
+ self.proc.LogStep(6, steps_total, "remove old storage")
+ for name, (dev, old_lvs) in iv_names.iteritems():
+ info("remove logical volumes for %s" % name)
+ for lv in old_lvs:
+ cfg.SetDiskID(lv, old_node)
+ if not rpc.call_blockdev_remove(old_node, lv):
+ warning("Can't remove LV on old secondary",
+ hint="Cleanup stale volumes by hand")
+
+ def Exec(self, feedback_fn):
+ """Execute disk replacement.
+
+ This dispatches the disk replacement to the appropriate handler.
+
+ """
+ instance = self.instance
+ if instance.disk_template == constants.DT_REMOTE_RAID1:
+ fn = self._ExecRR1
+ elif instance.disk_template == constants.DT_DRBD8:
+ if self.op.remote_node is None:
+ fn = self._ExecD8DiskOnly
+ else:
+ fn = self._ExecD8Secondary
+ else:
+ raise errors.ProgrammerError("Unhandled disk replacement case")
+ return fn(feedback_fn)
+