cmdlib: Cleanup public/private functions
[ganeti-local] / lib / cmdlib / instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41   AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42   CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43   IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45   CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46   BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52   constants.DT_PLAIN: "",
53   constants.DT_RBD: ".rbd",
54   constants.DT_EXT: ".ext",
55   }
56
57
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59   constants.DT_PLAIN: constants.LD_LV,
60   constants.DT_FILE: constants.LD_FILE,
61   constants.DT_SHARED_FILE: constants.LD_FILE,
62   constants.DT_BLOCK: constants.LD_BLOCKDEV,
63   constants.DT_RBD: constants.LD_RBD,
64   constants.DT_EXT: constants.LD_EXT,
65   }
66
67
68 def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69                          excl_stor):
70   """Create a single block device on a given node.
71
72   This will not recurse over children of the device, so they must be
73   created in advance.
74
75   @param lu: the lu on whose behalf we execute
76   @param node: the node on which to create the device
77   @type instance: L{objects.Instance}
78   @param instance: the instance which owns the device
79   @type device: L{objects.Disk}
80   @param device: the device to create
81   @param info: the extra 'metadata' we should attach to the device
82       (this will be represented as a LVM tag)
83   @type force_open: boolean
84   @param force_open: this parameter will be passes to the
85       L{backend.BlockdevCreate} function where it specifies
86       whether we run on primary or not, and it affects both
87       the child assembly and the device own Open() execution
88   @type excl_stor: boolean
89   @param excl_stor: Whether exclusive_storage is active for the node
90
91   """
92   lu.cfg.SetDiskID(device, node)
93   result = lu.rpc.call_blockdev_create(node, device, device.size,
94                                        instance.name, force_open, info,
95                                        excl_stor)
96   result.Raise("Can't create block device %s on"
97                " node %s for instance %s" % (device, node, instance.name))
98   if device.physical_id is None:
99     device.physical_id = result.payload
100
101
102 def _CreateBlockDevInner(lu, node, instance, device, force_create,
103                          info, force_open, excl_stor):
104   """Create a tree of block devices on a given node.
105
106   If this device type has to be created on secondaries, create it and
107   all its children.
108
109   If not, just recurse to children keeping the same 'force' value.
110
111   @attention: The device has to be annotated already.
112
113   @param lu: the lu on whose behalf we execute
114   @param node: the node on which to create the device
115   @type instance: L{objects.Instance}
116   @param instance: the instance which owns the device
117   @type device: L{objects.Disk}
118   @param device: the device to create
119   @type force_create: boolean
120   @param force_create: whether to force creation of this device; this
121       will be change to True whenever we find a device which has
122       CreateOnSecondary() attribute
123   @param info: the extra 'metadata' we should attach to the device
124       (this will be represented as a LVM tag)
125   @type force_open: boolean
126   @param force_open: this parameter will be passes to the
127       L{backend.BlockdevCreate} function where it specifies
128       whether we run on primary or not, and it affects both
129       the child assembly and the device own Open() execution
130   @type excl_stor: boolean
131   @param excl_stor: Whether exclusive_storage is active for the node
132
133   @return: list of created devices
134   """
135   created_devices = []
136   try:
137     if device.CreateOnSecondary():
138       force_create = True
139
140     if device.children:
141       for child in device.children:
142         devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143                                     info, force_open, excl_stor)
144         created_devices.extend(devs)
145
146     if not force_create:
147       return created_devices
148
149     CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150                          excl_stor)
151     # The device has been completely created, so there is no point in keeping
152     # its subdevices in the list. We just add the device itself instead.
153     created_devices = [(node, device)]
154     return created_devices
155
156   except errors.DeviceCreationError, e:
157     e.created_devices.extend(created_devices)
158     raise e
159   except errors.OpExecError, e:
160     raise errors.DeviceCreationError(str(e), created_devices)
161
162
163 def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164   """Whether exclusive_storage is in effect for the given node.
165
166   @type cfg: L{config.ConfigWriter}
167   @param cfg: The cluster configuration
168   @type nodename: string
169   @param nodename: The node
170   @rtype: bool
171   @return: The effective value of exclusive_storage
172   @raise errors.OpPrereqError: if no node exists with the given name
173
174   """
175   ni = cfg.GetNodeInfo(nodename)
176   if ni is None:
177     raise errors.OpPrereqError("Invalid node name %s" % nodename,
178                                errors.ECODE_NOENT)
179   return IsExclusiveStorageEnabledNode(cfg, ni)
180
181
182 def CreateBlockDev(lu, node, instance, device, force_create, info,
183                     force_open):
184   """Wrapper around L{_CreateBlockDevInner}.
185
186   This method annotates the root device first.
187
188   """
189   (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190   excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191   return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192                               force_open, excl_stor)
193
194
195 def CreateDisks(lu, instance, to_skip=None, target_node=None):
196   """Create all disks for an instance.
197
198   This abstracts away some work from AddInstance.
199
200   @type lu: L{LogicalUnit}
201   @param lu: the logical unit on whose behalf we execute
202   @type instance: L{objects.Instance}
203   @param instance: the instance whose disks we should create
204   @type to_skip: list
205   @param to_skip: list of indices to skip
206   @type target_node: string
207   @param target_node: if passed, overrides the target node for creation
208   @rtype: boolean
209   @return: the success of the creation
210
211   """
212   info = GetInstanceInfoText(instance)
213   if target_node is None:
214     pnode = instance.primary_node
215     all_nodes = instance.all_nodes
216   else:
217     pnode = target_node
218     all_nodes = [pnode]
219
220   if instance.disk_template in constants.DTS_FILEBASED:
221     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
222     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
223
224     result.Raise("Failed to create directory '%s' on"
225                  " node %s" % (file_storage_dir, pnode))
226
227   disks_created = []
228   # Note: this needs to be kept in sync with adding of disks in
229   # LUInstanceSetParams
230   for idx, device in enumerate(instance.disks):
231     if to_skip and idx in to_skip:
232       continue
233     logging.info("Creating disk %s for instance '%s'", idx, instance.name)
234     #HARDCODE
235     for node in all_nodes:
236       f_create = node == pnode
237       try:
238         CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
239         disks_created.append((node, device))
240       except errors.OpExecError:
241         logging.warning("Creating disk %s for instance '%s' failed",
242                         idx, instance.name)
243       except errors.DeviceCreationError, e:
244         logging.warning("Creating disk %s for instance '%s' failed",
245                         idx, instance.name)
246         disks_created.extend(e.created_devices)
247         for (node, disk) in disks_created:
248           lu.cfg.SetDiskID(disk, node)
249           result = lu.rpc.call_blockdev_remove(node, disk)
250           if result.fail_msg:
251             logging.warning("Failed to remove newly-created disk %s on node %s:"
252                             " %s", device, node, result.fail_msg)
253         raise errors.OpExecError(e.message)
254
255
256 def ComputeDiskSizePerVG(disk_template, disks):
257   """Compute disk size requirements in the volume group
258
259   """
260   def _compute(disks, payload):
261     """Universal algorithm.
262
263     """
264     vgs = {}
265     for disk in disks:
266       vgs[disk[constants.IDISK_VG]] = \
267         vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
268
269     return vgs
270
271   # Required free disk space as a function of disk and swap space
272   req_size_dict = {
273     constants.DT_DISKLESS: {},
274     constants.DT_PLAIN: _compute(disks, 0),
275     # 128 MB are added for drbd metadata for each disk
276     constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
277     constants.DT_FILE: {},
278     constants.DT_SHARED_FILE: {},
279     }
280
281   if disk_template not in req_size_dict:
282     raise errors.ProgrammerError("Disk template '%s' size requirement"
283                                  " is unknown" % disk_template)
284
285   return req_size_dict[disk_template]
286
287
288 def ComputeDisks(op, default_vg):
289   """Computes the instance disks.
290
291   @param op: The instance opcode
292   @param default_vg: The default_vg to assume
293
294   @return: The computed disks
295
296   """
297   disks = []
298   for disk in op.disks:
299     mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
300     if mode not in constants.DISK_ACCESS_SET:
301       raise errors.OpPrereqError("Invalid disk access mode '%s'" %
302                                  mode, errors.ECODE_INVAL)
303     size = disk.get(constants.IDISK_SIZE, None)
304     if size is None:
305       raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
306     try:
307       size = int(size)
308     except (TypeError, ValueError):
309       raise errors.OpPrereqError("Invalid disk size '%s'" % size,
310                                  errors.ECODE_INVAL)
311
312     ext_provider = disk.get(constants.IDISK_PROVIDER, None)
313     if ext_provider and op.disk_template != constants.DT_EXT:
314       raise errors.OpPrereqError("The '%s' option is only valid for the %s"
315                                  " disk template, not %s" %
316                                  (constants.IDISK_PROVIDER, constants.DT_EXT,
317                                   op.disk_template), errors.ECODE_INVAL)
318
319     data_vg = disk.get(constants.IDISK_VG, default_vg)
320     name = disk.get(constants.IDISK_NAME, None)
321     if name is not None and name.lower() == constants.VALUE_NONE:
322       name = None
323     new_disk = {
324       constants.IDISK_SIZE: size,
325       constants.IDISK_MODE: mode,
326       constants.IDISK_VG: data_vg,
327       constants.IDISK_NAME: name,
328       }
329
330     if constants.IDISK_METAVG in disk:
331       new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
332     if constants.IDISK_ADOPT in disk:
333       new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
334
335     # For extstorage, demand the `provider' option and add any
336     # additional parameters (ext-params) to the dict
337     if op.disk_template == constants.DT_EXT:
338       if ext_provider:
339         new_disk[constants.IDISK_PROVIDER] = ext_provider
340         for key in disk:
341           if key not in constants.IDISK_PARAMS:
342             new_disk[key] = disk[key]
343       else:
344         raise errors.OpPrereqError("Missing provider for template '%s'" %
345                                    constants.DT_EXT, errors.ECODE_INVAL)
346
347     disks.append(new_disk)
348
349   return disks
350
351
352 def CheckRADOSFreeSpace():
353   """Compute disk size requirements inside the RADOS cluster.
354
355   """
356   # For the RADOS cluster we assume there is always enough space.
357   pass
358
359
360 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
361                          iv_name, p_minor, s_minor):
362   """Generate a drbd8 device complete with its children.
363
364   """
365   assert len(vgnames) == len(names) == 2
366   port = lu.cfg.AllocatePort()
367   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
368
369   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
370                           logical_id=(vgnames[0], names[0]),
371                           params={})
372   dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
373   dev_meta = objects.Disk(dev_type=constants.LD_LV,
374                           size=constants.DRBD_META_SIZE,
375                           logical_id=(vgnames[1], names[1]),
376                           params={})
377   dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
378   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
379                           logical_id=(primary, secondary, port,
380                                       p_minor, s_minor,
381                                       shared_secret),
382                           children=[dev_data, dev_meta],
383                           iv_name=iv_name, params={})
384   drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
385   return drbd_dev
386
387
388 def GenerateDiskTemplate(
389   lu, template_name, instance_name, primary_node, secondary_nodes,
390   disk_info, file_storage_dir, file_driver, base_index,
391   feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
392   _req_shr_file_storage=opcodes.RequireSharedFileStorage):
393   """Generate the entire disk layout for a given template type.
394
395   """
396   vgname = lu.cfg.GetVGName()
397   disk_count = len(disk_info)
398   disks = []
399
400   if template_name == constants.DT_DISKLESS:
401     pass
402   elif template_name == constants.DT_DRBD8:
403     if len(secondary_nodes) != 1:
404       raise errors.ProgrammerError("Wrong template configuration")
405     remote_node = secondary_nodes[0]
406     minors = lu.cfg.AllocateDRBDMinor(
407       [primary_node, remote_node] * len(disk_info), instance_name)
408
409     (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
410                                                        full_disk_params)
411     drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
412
413     names = []
414     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
415                                                for i in range(disk_count)]):
416       names.append(lv_prefix + "_data")
417       names.append(lv_prefix + "_meta")
418     for idx, disk in enumerate(disk_info):
419       disk_index = idx + base_index
420       data_vg = disk.get(constants.IDISK_VG, vgname)
421       meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
422       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
423                                       disk[constants.IDISK_SIZE],
424                                       [data_vg, meta_vg],
425                                       names[idx * 2:idx * 2 + 2],
426                                       "disk/%d" % disk_index,
427                                       minors[idx * 2], minors[idx * 2 + 1])
428       disk_dev.mode = disk[constants.IDISK_MODE]
429       disk_dev.name = disk.get(constants.IDISK_NAME, None)
430       disks.append(disk_dev)
431   else:
432     if secondary_nodes:
433       raise errors.ProgrammerError("Wrong template configuration")
434
435     if template_name == constants.DT_FILE:
436       _req_file_storage()
437     elif template_name == constants.DT_SHARED_FILE:
438       _req_shr_file_storage()
439
440     name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
441     if name_prefix is None:
442       names = None
443     else:
444       names = _GenerateUniqueNames(lu, ["%s.disk%s" %
445                                         (name_prefix, base_index + i)
446                                         for i in range(disk_count)])
447
448     if template_name == constants.DT_PLAIN:
449
450       def logical_id_fn(idx, _, disk):
451         vg = disk.get(constants.IDISK_VG, vgname)
452         return (vg, names[idx])
453
454     elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
455       logical_id_fn = \
456         lambda _, disk_index, disk: (file_driver,
457                                      "%s/disk%d" % (file_storage_dir,
458                                                     disk_index))
459     elif template_name == constants.DT_BLOCK:
460       logical_id_fn = \
461         lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
462                                        disk[constants.IDISK_ADOPT])
463     elif template_name == constants.DT_RBD:
464       logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
465     elif template_name == constants.DT_EXT:
466       def logical_id_fn(idx, _, disk):
467         provider = disk.get(constants.IDISK_PROVIDER, None)
468         if provider is None:
469           raise errors.ProgrammerError("Disk template is %s, but '%s' is"
470                                        " not found", constants.DT_EXT,
471                                        constants.IDISK_PROVIDER)
472         return (provider, names[idx])
473     else:
474       raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
475
476     dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
477
478     for idx, disk in enumerate(disk_info):
479       params = {}
480       # Only for the Ext template add disk_info to params
481       if template_name == constants.DT_EXT:
482         params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
483         for key in disk:
484           if key not in constants.IDISK_PARAMS:
485             params[key] = disk[key]
486       disk_index = idx + base_index
487       size = disk[constants.IDISK_SIZE]
488       feedback_fn("* disk %s, size %s" %
489                   (disk_index, utils.FormatUnit(size, "h")))
490       disk_dev = objects.Disk(dev_type=dev_type, size=size,
491                               logical_id=logical_id_fn(idx, disk_index, disk),
492                               iv_name="disk/%d" % disk_index,
493                               mode=disk[constants.IDISK_MODE],
494                               params=params)
495       disk_dev.name = disk.get(constants.IDISK_NAME, None)
496       disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
497       disks.append(disk_dev)
498
499   return disks
500
501
502 class LUInstanceRecreateDisks(LogicalUnit):
503   """Recreate an instance's missing disks.
504
505   """
506   HPATH = "instance-recreate-disks"
507   HTYPE = constants.HTYPE_INSTANCE
508   REQ_BGL = False
509
510   _MODIFYABLE = compat.UniqueFrozenset([
511     constants.IDISK_SIZE,
512     constants.IDISK_MODE,
513     ])
514
515   # New or changed disk parameters may have different semantics
516   assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
517     constants.IDISK_ADOPT,
518
519     # TODO: Implement support changing VG while recreating
520     constants.IDISK_VG,
521     constants.IDISK_METAVG,
522     constants.IDISK_PROVIDER,
523     constants.IDISK_NAME,
524     ]))
525
526   def _RunAllocator(self):
527     """Run the allocator based on input opcode.
528
529     """
530     be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
531
532     # FIXME
533     # The allocator should actually run in "relocate" mode, but current
534     # allocators don't support relocating all the nodes of an instance at
535     # the same time. As a workaround we use "allocate" mode, but this is
536     # suboptimal for two reasons:
537     # - The instance name passed to the allocator is present in the list of
538     #   existing instances, so there could be a conflict within the
539     #   internal structures of the allocator. This doesn't happen with the
540     #   current allocators, but it's a liability.
541     # - The allocator counts the resources used by the instance twice: once
542     #   because the instance exists already, and once because it tries to
543     #   allocate a new instance.
544     # The allocator could choose some of the nodes on which the instance is
545     # running, but that's not a problem. If the instance nodes are broken,
546     # they should be already be marked as drained or offline, and hence
547     # skipped by the allocator. If instance disks have been lost for other
548     # reasons, then recreating the disks on the same nodes should be fine.
549     disk_template = self.instance.disk_template
550     spindle_use = be_full[constants.BE_SPINDLE_USE]
551     req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
552                                         disk_template=disk_template,
553                                         tags=list(self.instance.GetTags()),
554                                         os=self.instance.os,
555                                         nics=[{}],
556                                         vcpus=be_full[constants.BE_VCPUS],
557                                         memory=be_full[constants.BE_MAXMEM],
558                                         spindle_use=spindle_use,
559                                         disks=[{constants.IDISK_SIZE: d.size,
560                                                 constants.IDISK_MODE: d.mode}
561                                                for d in self.instance.disks],
562                                         hypervisor=self.instance.hypervisor,
563                                         node_whitelist=None)
564     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
565
566     ial.Run(self.op.iallocator)
567
568     assert req.RequiredNodes() == len(self.instance.all_nodes)
569
570     if not ial.success:
571       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
572                                  " %s" % (self.op.iallocator, ial.info),
573                                  errors.ECODE_NORES)
574
575     self.op.nodes = ial.result
576     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
577                  self.op.instance_name, self.op.iallocator,
578                  utils.CommaJoin(ial.result))
579
580   def CheckArguments(self):
581     if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
582       # Normalize and convert deprecated list of disk indices
583       self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
584
585     duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
586     if duplicates:
587       raise errors.OpPrereqError("Some disks have been specified more than"
588                                  " once: %s" % utils.CommaJoin(duplicates),
589                                  errors.ECODE_INVAL)
590
591     # We don't want _CheckIAllocatorOrNode selecting the default iallocator
592     # when neither iallocator nor nodes are specified
593     if self.op.iallocator or self.op.nodes:
594       CheckIAllocatorOrNode(self, "iallocator", "nodes")
595
596     for (idx, params) in self.op.disks:
597       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
598       unsupported = frozenset(params.keys()) - self._MODIFYABLE
599       if unsupported:
600         raise errors.OpPrereqError("Parameters for disk %s try to change"
601                                    " unmodifyable parameter(s): %s" %
602                                    (idx, utils.CommaJoin(unsupported)),
603                                    errors.ECODE_INVAL)
604
605   def ExpandNames(self):
606     self._ExpandAndLockInstance()
607     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
608
609     if self.op.nodes:
610       self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
611       self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
612     else:
613       self.needed_locks[locking.LEVEL_NODE] = []
614       if self.op.iallocator:
615         # iallocator will select a new node in the same group
616         self.needed_locks[locking.LEVEL_NODEGROUP] = []
617         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
618
619     self.needed_locks[locking.LEVEL_NODE_RES] = []
620
621   def DeclareLocks(self, level):
622     if level == locking.LEVEL_NODEGROUP:
623       assert self.op.iallocator is not None
624       assert not self.op.nodes
625       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
626       self.share_locks[locking.LEVEL_NODEGROUP] = 1
627       # Lock the primary group used by the instance optimistically; this
628       # requires going via the node before it's locked, requiring
629       # verification later on
630       self.needed_locks[locking.LEVEL_NODEGROUP] = \
631         self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
632
633     elif level == locking.LEVEL_NODE:
634       # If an allocator is used, then we lock all the nodes in the current
635       # instance group, as we don't know yet which ones will be selected;
636       # if we replace the nodes without using an allocator, locks are
637       # already declared in ExpandNames; otherwise, we need to lock all the
638       # instance nodes for disk re-creation
639       if self.op.iallocator:
640         assert not self.op.nodes
641         assert not self.needed_locks[locking.LEVEL_NODE]
642         assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
643
644         # Lock member nodes of the group of the primary node
645         for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
646           self.needed_locks[locking.LEVEL_NODE].extend(
647             self.cfg.GetNodeGroup(group_uuid).members)
648
649         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
650       elif not self.op.nodes:
651         self._LockInstancesNodes(primary_only=False)
652     elif level == locking.LEVEL_NODE_RES:
653       # Copy node locks
654       self.needed_locks[locking.LEVEL_NODE_RES] = \
655         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
656
657   def BuildHooksEnv(self):
658     """Build hooks env.
659
660     This runs on master, primary and secondary nodes of the instance.
661
662     """
663     return BuildInstanceHookEnvByObject(self, self.instance)
664
665   def BuildHooksNodes(self):
666     """Build hooks nodes.
667
668     """
669     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
670     return (nl, nl)
671
672   def CheckPrereq(self):
673     """Check prerequisites.
674
675     This checks that the instance is in the cluster and is not running.
676
677     """
678     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
679     assert instance is not None, \
680       "Cannot retrieve locked instance %s" % self.op.instance_name
681     if self.op.nodes:
682       if len(self.op.nodes) != len(instance.all_nodes):
683         raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
684                                    " %d replacement nodes were specified" %
685                                    (instance.name, len(instance.all_nodes),
686                                     len(self.op.nodes)),
687                                    errors.ECODE_INVAL)
688       assert instance.disk_template != constants.DT_DRBD8 or \
689              len(self.op.nodes) == 2
690       assert instance.disk_template != constants.DT_PLAIN or \
691              len(self.op.nodes) == 1
692       primary_node = self.op.nodes[0]
693     else:
694       primary_node = instance.primary_node
695     if not self.op.iallocator:
696       CheckNodeOnline(self, primary_node)
697
698     if instance.disk_template == constants.DT_DISKLESS:
699       raise errors.OpPrereqError("Instance '%s' has no disks" %
700                                  self.op.instance_name, errors.ECODE_INVAL)
701
702     # Verify if node group locks are still correct
703     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
704     if owned_groups:
705       # Node group locks are acquired only for the primary node (and only
706       # when the allocator is used)
707       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
708                               primary_only=True)
709
710     # if we replace nodes *and* the old primary is offline, we don't
711     # check the instance state
712     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
713     if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
714       CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
715                          msg="cannot recreate disks")
716
717     if self.op.disks:
718       self.disks = dict(self.op.disks)
719     else:
720       self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
721
722     maxidx = max(self.disks.keys())
723     if maxidx >= len(instance.disks):
724       raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
725                                  errors.ECODE_INVAL)
726
727     if ((self.op.nodes or self.op.iallocator) and
728          sorted(self.disks.keys()) != range(len(instance.disks))):
729       raise errors.OpPrereqError("Can't recreate disks partially and"
730                                  " change the nodes at the same time",
731                                  errors.ECODE_INVAL)
732
733     self.instance = instance
734
735     if self.op.iallocator:
736       self._RunAllocator()
737       # Release unneeded node and node resource locks
738       ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
739       ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
740       ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
741
742     assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
743
744   def Exec(self, feedback_fn):
745     """Recreate the disks.
746
747     """
748     instance = self.instance
749
750     assert (self.owned_locks(locking.LEVEL_NODE) ==
751             self.owned_locks(locking.LEVEL_NODE_RES))
752
753     to_skip = []
754     mods = [] # keeps track of needed changes
755
756     for idx, disk in enumerate(instance.disks):
757       try:
758         changes = self.disks[idx]
759       except KeyError:
760         # Disk should not be recreated
761         to_skip.append(idx)
762         continue
763
764       # update secondaries for disks, if needed
765       if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
766         # need to update the nodes and minors
767         assert len(self.op.nodes) == 2
768         assert len(disk.logical_id) == 6 # otherwise disk internals
769                                          # have changed
770         (_, _, old_port, _, _, old_secret) = disk.logical_id
771         new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
772         new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
773                   new_minors[0], new_minors[1], old_secret)
774         assert len(disk.logical_id) == len(new_id)
775       else:
776         new_id = None
777
778       mods.append((idx, new_id, changes))
779
780     # now that we have passed all asserts above, we can apply the mods
781     # in a single run (to avoid partial changes)
782     for idx, new_id, changes in mods:
783       disk = instance.disks[idx]
784       if new_id is not None:
785         assert disk.dev_type == constants.LD_DRBD8
786         disk.logical_id = new_id
787       if changes:
788         disk.Update(size=changes.get(constants.IDISK_SIZE, None),
789                     mode=changes.get(constants.IDISK_MODE, None))
790
791     # change primary node, if needed
792     if self.op.nodes:
793       instance.primary_node = self.op.nodes[0]
794       self.LogWarning("Changing the instance's nodes, you will have to"
795                       " remove any disks left on the older nodes manually")
796
797     if self.op.nodes:
798       self.cfg.Update(instance, feedback_fn)
799
800     # All touched nodes must be locked
801     mylocks = self.owned_locks(locking.LEVEL_NODE)
802     assert mylocks.issuperset(frozenset(instance.all_nodes))
803     CreateDisks(self, instance, to_skip=to_skip)
804
805
806 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
807   """Checks if nodes have enough free disk space in the specified VG.
808
809   This function checks if all given nodes have the needed amount of
810   free disk. In case any node has less disk or we cannot get the
811   information from the node, this function raises an OpPrereqError
812   exception.
813
814   @type lu: C{LogicalUnit}
815   @param lu: a logical unit from which we get configuration data
816   @type nodenames: C{list}
817   @param nodenames: the list of node names to check
818   @type vg: C{str}
819   @param vg: the volume group to check
820   @type requested: C{int}
821   @param requested: the amount of disk in MiB to check for
822   @raise errors.OpPrereqError: if the node doesn't have enough disk,
823       or we cannot check the node
824
825   """
826   es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
827   nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
828   for node in nodenames:
829     info = nodeinfo[node]
830     info.Raise("Cannot get current information from node %s" % node,
831                prereq=True, ecode=errors.ECODE_ENVIRON)
832     (_, (vg_info, ), _) = info.payload
833     vg_free = vg_info.get("vg_free", None)
834     if not isinstance(vg_free, int):
835       raise errors.OpPrereqError("Can't compute free disk space on node"
836                                  " %s for vg %s, result was '%s'" %
837                                  (node, vg, vg_free), errors.ECODE_ENVIRON)
838     if requested > vg_free:
839       raise errors.OpPrereqError("Not enough disk space on target node %s"
840                                  " vg %s: required %d MiB, available %d MiB" %
841                                  (node, vg, requested, vg_free),
842                                  errors.ECODE_NORES)
843
844
845 def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
846   """Checks if nodes have enough free disk space in all the VGs.
847
848   This function checks if all given nodes have the needed amount of
849   free disk. In case any node has less disk or we cannot get the
850   information from the node, this function raises an OpPrereqError
851   exception.
852
853   @type lu: C{LogicalUnit}
854   @param lu: a logical unit from which we get configuration data
855   @type nodenames: C{list}
856   @param nodenames: the list of node names to check
857   @type req_sizes: C{dict}
858   @param req_sizes: the hash of vg and corresponding amount of disk in
859       MiB to check for
860   @raise errors.OpPrereqError: if the node doesn't have enough disk,
861       or we cannot check the node
862
863   """
864   for vg, req_size in req_sizes.items():
865     _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
866
867
868 def _DiskSizeInBytesToMebibytes(lu, size):
869   """Converts a disk size in bytes to mebibytes.
870
871   Warns and rounds up if the size isn't an even multiple of 1 MiB.
872
873   """
874   (mib, remainder) = divmod(size, 1024 * 1024)
875
876   if remainder != 0:
877     lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
878                   " to not overwrite existing data (%s bytes will not be"
879                   " wiped)", (1024 * 1024) - remainder)
880     mib += 1
881
882   return mib
883
884
885 def _CalcEta(time_taken, written, total_size):
886   """Calculates the ETA based on size written and total size.
887
888   @param time_taken: The time taken so far
889   @param written: amount written so far
890   @param total_size: The total size of data to be written
891   @return: The remaining time in seconds
892
893   """
894   avg_time = time_taken / float(written)
895   return (total_size - written) * avg_time
896
897
898 def WipeDisks(lu, instance, disks=None):
899   """Wipes instance disks.
900
901   @type lu: L{LogicalUnit}
902   @param lu: the logical unit on whose behalf we execute
903   @type instance: L{objects.Instance}
904   @param instance: the instance whose disks we should create
905   @type disks: None or list of tuple of (number, L{objects.Disk}, number)
906   @param disks: Disk details; tuple contains disk index, disk object and the
907     start offset
908
909   """
910   node = instance.primary_node
911
912   if disks is None:
913     disks = [(idx, disk, 0)
914              for (idx, disk) in enumerate(instance.disks)]
915
916   for (_, device, _) in disks:
917     lu.cfg.SetDiskID(device, node)
918
919   logging.info("Pausing synchronization of disks of instance '%s'",
920                instance.name)
921   result = lu.rpc.call_blockdev_pause_resume_sync(node,
922                                                   (map(compat.snd, disks),
923                                                    instance),
924                                                   True)
925   result.Raise("Failed to pause disk synchronization on node '%s'" % node)
926
927   for idx, success in enumerate(result.payload):
928     if not success:
929       logging.warn("Pausing synchronization of disk %s of instance '%s'"
930                    " failed", idx, instance.name)
931
932   try:
933     for (idx, device, offset) in disks:
934       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
935       # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
936       wipe_chunk_size = \
937         int(min(constants.MAX_WIPE_CHUNK,
938                 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
939
940       size = device.size
941       last_output = 0
942       start_time = time.time()
943
944       if offset == 0:
945         info_text = ""
946       else:
947         info_text = (" (from %s to %s)" %
948                      (utils.FormatUnit(offset, "h"),
949                       utils.FormatUnit(size, "h")))
950
951       lu.LogInfo("* Wiping disk %s%s", idx, info_text)
952
953       logging.info("Wiping disk %d for instance %s on node %s using"
954                    " chunk size %s", idx, instance.name, node, wipe_chunk_size)
955
956       while offset < size:
957         wipe_size = min(wipe_chunk_size, size - offset)
958
959         logging.debug("Wiping disk %d, offset %s, chunk %s",
960                       idx, offset, wipe_size)
961
962         result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
963                                            wipe_size)
964         result.Raise("Could not wipe disk %d at offset %d for size %d" %
965                      (idx, offset, wipe_size))
966
967         now = time.time()
968         offset += wipe_size
969         if now - last_output >= 60:
970           eta = _CalcEta(now - start_time, offset, size)
971           lu.LogInfo(" - done: %.1f%% ETA: %s",
972                      offset / float(size) * 100, utils.FormatSeconds(eta))
973           last_output = now
974   finally:
975     logging.info("Resuming synchronization of disks for instance '%s'",
976                  instance.name)
977
978     result = lu.rpc.call_blockdev_pause_resume_sync(node,
979                                                     (map(compat.snd, disks),
980                                                      instance),
981                                                     False)
982
983     if result.fail_msg:
984       lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
985                     node, result.fail_msg)
986     else:
987       for idx, success in enumerate(result.payload):
988         if not success:
989           lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
990                         " failed", idx, instance.name)
991
992
993 def ExpandCheckDisks(instance, disks):
994   """Return the instance disks selected by the disks list
995
996   @type disks: list of L{objects.Disk} or None
997   @param disks: selected disks
998   @rtype: list of L{objects.Disk}
999   @return: selected instance disks to act on
1000
1001   """
1002   if disks is None:
1003     return instance.disks
1004   else:
1005     if not set(disks).issubset(instance.disks):
1006       raise errors.ProgrammerError("Can only act on disks belonging to the"
1007                                    " target instance")
1008     return disks
1009
1010
1011 def WaitForSync(lu, instance, disks=None, oneshot=False):
1012   """Sleep and poll for an instance's disk to sync.
1013
1014   """
1015   if not instance.disks or disks is not None and not disks:
1016     return True
1017
1018   disks = ExpandCheckDisks(instance, disks)
1019
1020   if not oneshot:
1021     lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1022
1023   node = instance.primary_node
1024
1025   for dev in disks:
1026     lu.cfg.SetDiskID(dev, node)
1027
1028   # TODO: Convert to utils.Retry
1029
1030   retries = 0
1031   degr_retries = 10 # in seconds, as we sleep 1 second each time
1032   while True:
1033     max_time = 0
1034     done = True
1035     cumul_degraded = False
1036     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1037     msg = rstats.fail_msg
1038     if msg:
1039       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1040       retries += 1
1041       if retries >= 10:
1042         raise errors.RemoteError("Can't contact node %s for mirror data,"
1043                                  " aborting." % node)
1044       time.sleep(6)
1045       continue
1046     rstats = rstats.payload
1047     retries = 0
1048     for i, mstat in enumerate(rstats):
1049       if mstat is None:
1050         lu.LogWarning("Can't compute data for node %s/%s",
1051                       node, disks[i].iv_name)
1052         continue
1053
1054       cumul_degraded = (cumul_degraded or
1055                         (mstat.is_degraded and mstat.sync_percent is None))
1056       if mstat.sync_percent is not None:
1057         done = False
1058         if mstat.estimated_time is not None:
1059           rem_time = ("%s remaining (estimated)" %
1060                       utils.FormatSeconds(mstat.estimated_time))
1061           max_time = mstat.estimated_time
1062         else:
1063           rem_time = "no time estimate"
1064         lu.LogInfo("- device %s: %5.2f%% done, %s",
1065                    disks[i].iv_name, mstat.sync_percent, rem_time)
1066
1067     # if we're done but degraded, let's do a few small retries, to
1068     # make sure we see a stable and not transient situation; therefore
1069     # we force restart of the loop
1070     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1071       logging.info("Degraded disks found, %d retries left", degr_retries)
1072       degr_retries -= 1
1073       time.sleep(1)
1074       continue
1075
1076     if done or oneshot:
1077       break
1078
1079     time.sleep(min(60, max_time))
1080
1081   if done:
1082     lu.LogInfo("Instance %s's disks are in sync", instance.name)
1083
1084   return not cumul_degraded
1085
1086
1087 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1088   """Shutdown block devices of an instance.
1089
1090   This does the shutdown on all nodes of the instance.
1091
1092   If the ignore_primary is false, errors on the primary node are
1093   ignored.
1094
1095   """
1096   all_result = True
1097   disks = ExpandCheckDisks(instance, disks)
1098
1099   for disk in disks:
1100     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1101       lu.cfg.SetDiskID(top_disk, node)
1102       result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1103       msg = result.fail_msg
1104       if msg:
1105         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1106                       disk.iv_name, node, msg)
1107         if ((node == instance.primary_node and not ignore_primary) or
1108             (node != instance.primary_node and not result.offline)):
1109           all_result = False
1110   return all_result
1111
1112
1113 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1114   """Shutdown block devices of an instance.
1115
1116   This function checks if an instance is running, before calling
1117   _ShutdownInstanceDisks.
1118
1119   """
1120   CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1121   ShutdownInstanceDisks(lu, instance, disks=disks)
1122
1123
1124 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1125                            ignore_size=False):
1126   """Prepare the block devices for an instance.
1127
1128   This sets up the block devices on all nodes.
1129
1130   @type lu: L{LogicalUnit}
1131   @param lu: the logical unit on whose behalf we execute
1132   @type instance: L{objects.Instance}
1133   @param instance: the instance for whose disks we assemble
1134   @type disks: list of L{objects.Disk} or None
1135   @param disks: which disks to assemble (or all, if None)
1136   @type ignore_secondaries: boolean
1137   @param ignore_secondaries: if true, errors on secondary nodes
1138       won't result in an error return from the function
1139   @type ignore_size: boolean
1140   @param ignore_size: if true, the current known size of the disk
1141       will not be used during the disk activation, useful for cases
1142       when the size is wrong
1143   @return: False if the operation failed, otherwise a list of
1144       (host, instance_visible_name, node_visible_name)
1145       with the mapping from node devices to instance devices
1146
1147   """
1148   device_info = []
1149   disks_ok = True
1150   iname = instance.name
1151   disks = ExpandCheckDisks(instance, disks)
1152
1153   # With the two passes mechanism we try to reduce the window of
1154   # opportunity for the race condition of switching DRBD to primary
1155   # before handshaking occured, but we do not eliminate it
1156
1157   # The proper fix would be to wait (with some limits) until the
1158   # connection has been made and drbd transitions from WFConnection
1159   # into any other network-connected state (Connected, SyncTarget,
1160   # SyncSource, etc.)
1161
1162   # 1st pass, assemble on all nodes in secondary mode
1163   for idx, inst_disk in enumerate(disks):
1164     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1165       if ignore_size:
1166         node_disk = node_disk.Copy()
1167         node_disk.UnsetSize()
1168       lu.cfg.SetDiskID(node_disk, node)
1169       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1170                                              False, idx)
1171       msg = result.fail_msg
1172       if msg:
1173         is_offline_secondary = (node in instance.secondary_nodes and
1174                                 result.offline)
1175         lu.LogWarning("Could not prepare block device %s on node %s"
1176                       " (is_primary=False, pass=1): %s",
1177                       inst_disk.iv_name, node, msg)
1178         if not (ignore_secondaries or is_offline_secondary):
1179           disks_ok = False
1180
1181   # FIXME: race condition on drbd migration to primary
1182
1183   # 2nd pass, do only the primary node
1184   for idx, inst_disk in enumerate(disks):
1185     dev_path = None
1186
1187     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1188       if node != instance.primary_node:
1189         continue
1190       if ignore_size:
1191         node_disk = node_disk.Copy()
1192         node_disk.UnsetSize()
1193       lu.cfg.SetDiskID(node_disk, node)
1194       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1195                                              True, idx)
1196       msg = result.fail_msg
1197       if msg:
1198         lu.LogWarning("Could not prepare block device %s on node %s"
1199                       " (is_primary=True, pass=2): %s",
1200                       inst_disk.iv_name, node, msg)
1201         disks_ok = False
1202       else:
1203         dev_path = result.payload
1204
1205     device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1206
1207   # leave the disks configured for the primary node
1208   # this is a workaround that would be fixed better by
1209   # improving the logical/physical id handling
1210   for disk in disks:
1211     lu.cfg.SetDiskID(disk, instance.primary_node)
1212
1213   return disks_ok, device_info
1214
1215
1216 def StartInstanceDisks(lu, instance, force):
1217   """Start the disks of an instance.
1218
1219   """
1220   disks_ok, _ = AssembleInstanceDisks(lu, instance,
1221                                       ignore_secondaries=force)
1222   if not disks_ok:
1223     ShutdownInstanceDisks(lu, instance)
1224     if force is not None and not force:
1225       lu.LogWarning("",
1226                     hint=("If the message above refers to a secondary node,"
1227                           " you can retry the operation using '--force'"))
1228     raise errors.OpExecError("Disk consistency error")
1229
1230
1231 class LUInstanceGrowDisk(LogicalUnit):
1232   """Grow a disk of an instance.
1233
1234   """
1235   HPATH = "disk-grow"
1236   HTYPE = constants.HTYPE_INSTANCE
1237   REQ_BGL = False
1238
1239   def ExpandNames(self):
1240     self._ExpandAndLockInstance()
1241     self.needed_locks[locking.LEVEL_NODE] = []
1242     self.needed_locks[locking.LEVEL_NODE_RES] = []
1243     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1244     self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1245
1246   def DeclareLocks(self, level):
1247     if level == locking.LEVEL_NODE:
1248       self._LockInstancesNodes()
1249     elif level == locking.LEVEL_NODE_RES:
1250       # Copy node locks
1251       self.needed_locks[locking.LEVEL_NODE_RES] = \
1252         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1253
1254   def BuildHooksEnv(self):
1255     """Build hooks env.
1256
1257     This runs on the master, the primary and all the secondaries.
1258
1259     """
1260     env = {
1261       "DISK": self.op.disk,
1262       "AMOUNT": self.op.amount,
1263       "ABSOLUTE": self.op.absolute,
1264       }
1265     env.update(BuildInstanceHookEnvByObject(self, self.instance))
1266     return env
1267
1268   def BuildHooksNodes(self):
1269     """Build hooks nodes.
1270
1271     """
1272     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1273     return (nl, nl)
1274
1275   def CheckPrereq(self):
1276     """Check prerequisites.
1277
1278     This checks that the instance is in the cluster.
1279
1280     """
1281     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1282     assert instance is not None, \
1283       "Cannot retrieve locked instance %s" % self.op.instance_name
1284     nodenames = list(instance.all_nodes)
1285     for node in nodenames:
1286       CheckNodeOnline(self, node)
1287
1288     self.instance = instance
1289
1290     if instance.disk_template not in constants.DTS_GROWABLE:
1291       raise errors.OpPrereqError("Instance's disk layout does not support"
1292                                  " growing", errors.ECODE_INVAL)
1293
1294     self.disk = instance.FindDisk(self.op.disk)
1295
1296     if self.op.absolute:
1297       self.target = self.op.amount
1298       self.delta = self.target - self.disk.size
1299       if self.delta < 0:
1300         raise errors.OpPrereqError("Requested size (%s) is smaller than "
1301                                    "current disk size (%s)" %
1302                                    (utils.FormatUnit(self.target, "h"),
1303                                     utils.FormatUnit(self.disk.size, "h")),
1304                                    errors.ECODE_STATE)
1305     else:
1306       self.delta = self.op.amount
1307       self.target = self.disk.size + self.delta
1308       if self.delta < 0:
1309         raise errors.OpPrereqError("Requested increment (%s) is negative" %
1310                                    utils.FormatUnit(self.delta, "h"),
1311                                    errors.ECODE_INVAL)
1312
1313     self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1314
1315   def _CheckDiskSpace(self, nodenames, req_vgspace):
1316     template = self.instance.disk_template
1317     if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1318       # TODO: check the free disk space for file, when that feature will be
1319       # supported
1320       nodes = map(self.cfg.GetNodeInfo, nodenames)
1321       es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1322                         nodes)
1323       if es_nodes:
1324         # With exclusive storage we need to something smarter than just looking
1325         # at free space; for now, let's simply abort the operation.
1326         raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1327                                    " is enabled", errors.ECODE_STATE)
1328       CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1329
1330   def Exec(self, feedback_fn):
1331     """Execute disk grow.
1332
1333     """
1334     instance = self.instance
1335     disk = self.disk
1336
1337     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1338     assert (self.owned_locks(locking.LEVEL_NODE) ==
1339             self.owned_locks(locking.LEVEL_NODE_RES))
1340
1341     wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1342
1343     disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1344     if not disks_ok:
1345       raise errors.OpExecError("Cannot activate block device to grow")
1346
1347     feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1348                 (self.op.disk, instance.name,
1349                  utils.FormatUnit(self.delta, "h"),
1350                  utils.FormatUnit(self.target, "h")))
1351
1352     # First run all grow ops in dry-run mode
1353     for node in instance.all_nodes:
1354       self.cfg.SetDiskID(disk, node)
1355       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1356                                            True, True)
1357       result.Raise("Dry-run grow request failed to node %s" % node)
1358
1359     if wipe_disks:
1360       # Get disk size from primary node for wiping
1361       result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1362       result.Raise("Failed to retrieve disk size from node '%s'" %
1363                    instance.primary_node)
1364
1365       (disk_size_in_bytes, ) = result.payload
1366
1367       if disk_size_in_bytes is None:
1368         raise errors.OpExecError("Failed to retrieve disk size from primary"
1369                                  " node '%s'" % instance.primary_node)
1370
1371       old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1372
1373       assert old_disk_size >= disk.size, \
1374         ("Retrieved disk size too small (got %s, should be at least %s)" %
1375          (old_disk_size, disk.size))
1376     else:
1377       old_disk_size = None
1378
1379     # We know that (as far as we can test) operations across different
1380     # nodes will succeed, time to run it for real on the backing storage
1381     for node in instance.all_nodes:
1382       self.cfg.SetDiskID(disk, node)
1383       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1384                                            False, True)
1385       result.Raise("Grow request failed to node %s" % node)
1386
1387     # And now execute it for logical storage, on the primary node
1388     node = instance.primary_node
1389     self.cfg.SetDiskID(disk, node)
1390     result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1391                                          False, False)
1392     result.Raise("Grow request failed to node %s" % node)
1393
1394     disk.RecordGrow(self.delta)
1395     self.cfg.Update(instance, feedback_fn)
1396
1397     # Changes have been recorded, release node lock
1398     ReleaseLocks(self, locking.LEVEL_NODE)
1399
1400     # Downgrade lock while waiting for sync
1401     self.glm.downgrade(locking.LEVEL_INSTANCE)
1402
1403     assert wipe_disks ^ (old_disk_size is None)
1404
1405     if wipe_disks:
1406       assert instance.disks[self.op.disk] == disk
1407
1408       # Wipe newly added disk space
1409       WipeDisks(self, instance,
1410                 disks=[(self.op.disk, disk, old_disk_size)])
1411
1412     if self.op.wait_for_sync:
1413       disk_abort = not WaitForSync(self, instance, disks=[disk])
1414       if disk_abort:
1415         self.LogWarning("Disk syncing has not returned a good status; check"
1416                         " the instance")
1417       if instance.admin_state != constants.ADMINST_UP:
1418         _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1419     elif instance.admin_state != constants.ADMINST_UP:
1420       self.LogWarning("Not shutting down the disk even if the instance is"
1421                       " not supposed to be running because no wait for"
1422                       " sync mode was requested")
1423
1424     assert self.owned_locks(locking.LEVEL_NODE_RES)
1425     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1426
1427
1428 class LUInstanceReplaceDisks(LogicalUnit):
1429   """Replace the disks of an instance.
1430
1431   """
1432   HPATH = "mirrors-replace"
1433   HTYPE = constants.HTYPE_INSTANCE
1434   REQ_BGL = False
1435
1436   def CheckArguments(self):
1437     """Check arguments.
1438
1439     """
1440     remote_node = self.op.remote_node
1441     ialloc = self.op.iallocator
1442     if self.op.mode == constants.REPLACE_DISK_CHG:
1443       if remote_node is None and ialloc is None:
1444         raise errors.OpPrereqError("When changing the secondary either an"
1445                                    " iallocator script must be used or the"
1446                                    " new node given", errors.ECODE_INVAL)
1447       else:
1448         CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1449
1450     elif remote_node is not None or ialloc is not None:
1451       # Not replacing the secondary
1452       raise errors.OpPrereqError("The iallocator and new node options can"
1453                                  " only be used when changing the"
1454                                  " secondary node", errors.ECODE_INVAL)
1455
1456   def ExpandNames(self):
1457     self._ExpandAndLockInstance()
1458
1459     assert locking.LEVEL_NODE not in self.needed_locks
1460     assert locking.LEVEL_NODE_RES not in self.needed_locks
1461     assert locking.LEVEL_NODEGROUP not in self.needed_locks
1462
1463     assert self.op.iallocator is None or self.op.remote_node is None, \
1464       "Conflicting options"
1465
1466     if self.op.remote_node is not None:
1467       self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1468
1469       # Warning: do not remove the locking of the new secondary here
1470       # unless DRBD8.AddChildren is changed to work in parallel;
1471       # currently it doesn't since parallel invocations of
1472       # FindUnusedMinor will conflict
1473       self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1474       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1475     else:
1476       self.needed_locks[locking.LEVEL_NODE] = []
1477       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1478
1479       if self.op.iallocator is not None:
1480         # iallocator will select a new node in the same group
1481         self.needed_locks[locking.LEVEL_NODEGROUP] = []
1482         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1483
1484     self.needed_locks[locking.LEVEL_NODE_RES] = []
1485
1486     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1487                                    self.op.iallocator, self.op.remote_node,
1488                                    self.op.disks, self.op.early_release,
1489                                    self.op.ignore_ipolicy)
1490
1491     self.tasklets = [self.replacer]
1492
1493   def DeclareLocks(self, level):
1494     if level == locking.LEVEL_NODEGROUP:
1495       assert self.op.remote_node is None
1496       assert self.op.iallocator is not None
1497       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1498
1499       self.share_locks[locking.LEVEL_NODEGROUP] = 1
1500       # Lock all groups used by instance optimistically; this requires going
1501       # via the node before it's locked, requiring verification later on
1502       self.needed_locks[locking.LEVEL_NODEGROUP] = \
1503         self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1504
1505     elif level == locking.LEVEL_NODE:
1506       if self.op.iallocator is not None:
1507         assert self.op.remote_node is None
1508         assert not self.needed_locks[locking.LEVEL_NODE]
1509         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1510
1511         # Lock member nodes of all locked groups
1512         self.needed_locks[locking.LEVEL_NODE] = \
1513           [node_name
1514            for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1515            for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1516       else:
1517         assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1518
1519         self._LockInstancesNodes()
1520
1521     elif level == locking.LEVEL_NODE_RES:
1522       # Reuse node locks
1523       self.needed_locks[locking.LEVEL_NODE_RES] = \
1524         self.needed_locks[locking.LEVEL_NODE]
1525
1526   def BuildHooksEnv(self):
1527     """Build hooks env.
1528
1529     This runs on the master, the primary and all the secondaries.
1530
1531     """
1532     instance = self.replacer.instance
1533     env = {
1534       "MODE": self.op.mode,
1535       "NEW_SECONDARY": self.op.remote_node,
1536       "OLD_SECONDARY": instance.secondary_nodes[0],
1537       }
1538     env.update(BuildInstanceHookEnvByObject(self, instance))
1539     return env
1540
1541   def BuildHooksNodes(self):
1542     """Build hooks nodes.
1543
1544     """
1545     instance = self.replacer.instance
1546     nl = [
1547       self.cfg.GetMasterNode(),
1548       instance.primary_node,
1549       ]
1550     if self.op.remote_node is not None:
1551       nl.append(self.op.remote_node)
1552     return nl, nl
1553
1554   def CheckPrereq(self):
1555     """Check prerequisites.
1556
1557     """
1558     assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1559             self.op.iallocator is None)
1560
1561     # Verify if node group locks are still correct
1562     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1563     if owned_groups:
1564       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1565
1566     return LogicalUnit.CheckPrereq(self)
1567
1568
1569 class LUInstanceActivateDisks(NoHooksLU):
1570   """Bring up an instance's disks.
1571
1572   """
1573   REQ_BGL = False
1574
1575   def ExpandNames(self):
1576     self._ExpandAndLockInstance()
1577     self.needed_locks[locking.LEVEL_NODE] = []
1578     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1579
1580   def DeclareLocks(self, level):
1581     if level == locking.LEVEL_NODE:
1582       self._LockInstancesNodes()
1583
1584   def CheckPrereq(self):
1585     """Check prerequisites.
1586
1587     This checks that the instance is in the cluster.
1588
1589     """
1590     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1591     assert self.instance is not None, \
1592       "Cannot retrieve locked instance %s" % self.op.instance_name
1593     CheckNodeOnline(self, self.instance.primary_node)
1594
1595   def Exec(self, feedback_fn):
1596     """Activate the disks.
1597
1598     """
1599     disks_ok, disks_info = \
1600               AssembleInstanceDisks(self, self.instance,
1601                                     ignore_size=self.op.ignore_size)
1602     if not disks_ok:
1603       raise errors.OpExecError("Cannot activate block devices")
1604
1605     if self.op.wait_for_sync:
1606       if not WaitForSync(self, self.instance):
1607         raise errors.OpExecError("Some disks of the instance are degraded!")
1608
1609     return disks_info
1610
1611
1612 class LUInstanceDeactivateDisks(NoHooksLU):
1613   """Shutdown an instance's disks.
1614
1615   """
1616   REQ_BGL = False
1617
1618   def ExpandNames(self):
1619     self._ExpandAndLockInstance()
1620     self.needed_locks[locking.LEVEL_NODE] = []
1621     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1622
1623   def DeclareLocks(self, level):
1624     if level == locking.LEVEL_NODE:
1625       self._LockInstancesNodes()
1626
1627   def CheckPrereq(self):
1628     """Check prerequisites.
1629
1630     This checks that the instance is in the cluster.
1631
1632     """
1633     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1634     assert self.instance is not None, \
1635       "Cannot retrieve locked instance %s" % self.op.instance_name
1636
1637   def Exec(self, feedback_fn):
1638     """Deactivate the disks
1639
1640     """
1641     instance = self.instance
1642     if self.op.force:
1643       ShutdownInstanceDisks(self, instance)
1644     else:
1645       _SafeShutdownInstanceDisks(self, instance)
1646
1647
1648 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1649                                ldisk=False):
1650   """Check that mirrors are not degraded.
1651
1652   @attention: The device has to be annotated already.
1653
1654   The ldisk parameter, if True, will change the test from the
1655   is_degraded attribute (which represents overall non-ok status for
1656   the device(s)) to the ldisk (representing the local storage status).
1657
1658   """
1659   lu.cfg.SetDiskID(dev, node)
1660
1661   result = True
1662
1663   if on_primary or dev.AssembleOnSecondary():
1664     rstats = lu.rpc.call_blockdev_find(node, dev)
1665     msg = rstats.fail_msg
1666     if msg:
1667       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1668       result = False
1669     elif not rstats.payload:
1670       lu.LogWarning("Can't find disk on node %s", node)
1671       result = False
1672     else:
1673       if ldisk:
1674         result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1675       else:
1676         result = result and not rstats.payload.is_degraded
1677
1678   if dev.children:
1679     for child in dev.children:
1680       result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1681                                                      on_primary)
1682
1683   return result
1684
1685
1686 def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1687   """Wrapper around L{_CheckDiskConsistencyInner}.
1688
1689   """
1690   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1691   return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1692                                     ldisk=ldisk)
1693
1694
1695 def _BlockdevFind(lu, node, dev, instance):
1696   """Wrapper around call_blockdev_find to annotate diskparams.
1697
1698   @param lu: A reference to the lu object
1699   @param node: The node to call out
1700   @param dev: The device to find
1701   @param instance: The instance object the device belongs to
1702   @returns The result of the rpc call
1703
1704   """
1705   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1706   return lu.rpc.call_blockdev_find(node, disk)
1707
1708
1709 def _GenerateUniqueNames(lu, exts):
1710   """Generate a suitable LV name.
1711
1712   This will generate a logical volume name for the given instance.
1713
1714   """
1715   results = []
1716   for val in exts:
1717     new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1718     results.append("%s%s" % (new_id, val))
1719   return results
1720
1721
1722 class TLReplaceDisks(Tasklet):
1723   """Replaces disks for an instance.
1724
1725   Note: Locking is not within the scope of this class.
1726
1727   """
1728   def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1729                disks, early_release, ignore_ipolicy):
1730     """Initializes this class.
1731
1732     """
1733     Tasklet.__init__(self, lu)
1734
1735     # Parameters
1736     self.instance_name = instance_name
1737     self.mode = mode
1738     self.iallocator_name = iallocator_name
1739     self.remote_node = remote_node
1740     self.disks = disks
1741     self.early_release = early_release
1742     self.ignore_ipolicy = ignore_ipolicy
1743
1744     # Runtime data
1745     self.instance = None
1746     self.new_node = None
1747     self.target_node = None
1748     self.other_node = None
1749     self.remote_node_info = None
1750     self.node_secondary_ip = None
1751
1752   @staticmethod
1753   def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1754     """Compute a new secondary node using an IAllocator.
1755
1756     """
1757     req = iallocator.IAReqRelocate(name=instance_name,
1758                                    relocate_from=list(relocate_from))
1759     ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1760
1761     ial.Run(iallocator_name)
1762
1763     if not ial.success:
1764       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1765                                  " %s" % (iallocator_name, ial.info),
1766                                  errors.ECODE_NORES)
1767
1768     remote_node_name = ial.result[0]
1769
1770     lu.LogInfo("Selected new secondary for instance '%s': %s",
1771                instance_name, remote_node_name)
1772
1773     return remote_node_name
1774
1775   def _FindFaultyDisks(self, node_name):
1776     """Wrapper for L{FindFaultyInstanceDisks}.
1777
1778     """
1779     return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1780                                    node_name, True)
1781
1782   def _CheckDisksActivated(self, instance):
1783     """Checks if the instance disks are activated.
1784
1785     @param instance: The instance to check disks
1786     @return: True if they are activated, False otherwise
1787
1788     """
1789     nodes = instance.all_nodes
1790
1791     for idx, dev in enumerate(instance.disks):
1792       for node in nodes:
1793         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1794         self.cfg.SetDiskID(dev, node)
1795
1796         result = _BlockdevFind(self, node, dev, instance)
1797
1798         if result.offline:
1799           continue
1800         elif result.fail_msg or not result.payload:
1801           return False
1802
1803     return True
1804
1805   def CheckPrereq(self):
1806     """Check prerequisites.
1807
1808     This checks that the instance is in the cluster.
1809
1810     """
1811     self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1812     assert instance is not None, \
1813       "Cannot retrieve locked instance %s" % self.instance_name
1814
1815     if instance.disk_template != constants.DT_DRBD8:
1816       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1817                                  " instances", errors.ECODE_INVAL)
1818
1819     if len(instance.secondary_nodes) != 1:
1820       raise errors.OpPrereqError("The instance has a strange layout,"
1821                                  " expected one secondary but found %d" %
1822                                  len(instance.secondary_nodes),
1823                                  errors.ECODE_FAULT)
1824
1825     instance = self.instance
1826     secondary_node = instance.secondary_nodes[0]
1827
1828     if self.iallocator_name is None:
1829       remote_node = self.remote_node
1830     else:
1831       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1832                                        instance.name, instance.secondary_nodes)
1833
1834     if remote_node is None:
1835       self.remote_node_info = None
1836     else:
1837       assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1838              "Remote node '%s' is not locked" % remote_node
1839
1840       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1841       assert self.remote_node_info is not None, \
1842         "Cannot retrieve locked node %s" % remote_node
1843
1844     if remote_node == self.instance.primary_node:
1845       raise errors.OpPrereqError("The specified node is the primary node of"
1846                                  " the instance", errors.ECODE_INVAL)
1847
1848     if remote_node == secondary_node:
1849       raise errors.OpPrereqError("The specified node is already the"
1850                                  " secondary node of the instance",
1851                                  errors.ECODE_INVAL)
1852
1853     if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1854                                     constants.REPLACE_DISK_CHG):
1855       raise errors.OpPrereqError("Cannot specify disks to be replaced",
1856                                  errors.ECODE_INVAL)
1857
1858     if self.mode == constants.REPLACE_DISK_AUTO:
1859       if not self._CheckDisksActivated(instance):
1860         raise errors.OpPrereqError("Please run activate-disks on instance %s"
1861                                    " first" % self.instance_name,
1862                                    errors.ECODE_STATE)
1863       faulty_primary = self._FindFaultyDisks(instance.primary_node)
1864       faulty_secondary = self._FindFaultyDisks(secondary_node)
1865
1866       if faulty_primary and faulty_secondary:
1867         raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1868                                    " one node and can not be repaired"
1869                                    " automatically" % self.instance_name,
1870                                    errors.ECODE_STATE)
1871
1872       if faulty_primary:
1873         self.disks = faulty_primary
1874         self.target_node = instance.primary_node
1875         self.other_node = secondary_node
1876         check_nodes = [self.target_node, self.other_node]
1877       elif faulty_secondary:
1878         self.disks = faulty_secondary
1879         self.target_node = secondary_node
1880         self.other_node = instance.primary_node
1881         check_nodes = [self.target_node, self.other_node]
1882       else:
1883         self.disks = []
1884         check_nodes = []
1885
1886     else:
1887       # Non-automatic modes
1888       if self.mode == constants.REPLACE_DISK_PRI:
1889         self.target_node = instance.primary_node
1890         self.other_node = secondary_node
1891         check_nodes = [self.target_node, self.other_node]
1892
1893       elif self.mode == constants.REPLACE_DISK_SEC:
1894         self.target_node = secondary_node
1895         self.other_node = instance.primary_node
1896         check_nodes = [self.target_node, self.other_node]
1897
1898       elif self.mode == constants.REPLACE_DISK_CHG:
1899         self.new_node = remote_node
1900         self.other_node = instance.primary_node
1901         self.target_node = secondary_node
1902         check_nodes = [self.new_node, self.other_node]
1903
1904         CheckNodeNotDrained(self.lu, remote_node)
1905         CheckNodeVmCapable(self.lu, remote_node)
1906
1907         old_node_info = self.cfg.GetNodeInfo(secondary_node)
1908         assert old_node_info is not None
1909         if old_node_info.offline and not self.early_release:
1910           # doesn't make sense to delay the release
1911           self.early_release = True
1912           self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1913                           " early-release mode", secondary_node)
1914
1915       else:
1916         raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
1917                                      self.mode)
1918
1919       # If not specified all disks should be replaced
1920       if not self.disks:
1921         self.disks = range(len(self.instance.disks))
1922
1923     # TODO: This is ugly, but right now we can't distinguish between internal
1924     # submitted opcode and external one. We should fix that.
1925     if self.remote_node_info:
1926       # We change the node, lets verify it still meets instance policy
1927       new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
1928       cluster = self.cfg.GetClusterInfo()
1929       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1930                                                               new_group_info)
1931       CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
1932                              self.cfg, ignore=self.ignore_ipolicy)
1933
1934     for node in check_nodes:
1935       CheckNodeOnline(self.lu, node)
1936
1937     touched_nodes = frozenset(node_name for node_name in [self.new_node,
1938                                                           self.other_node,
1939                                                           self.target_node]
1940                               if node_name is not None)
1941
1942     # Release unneeded node and node resource locks
1943     ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
1944     ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
1945     ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
1946
1947     # Release any owned node group
1948     ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
1949
1950     # Check whether disks are valid
1951     for disk_idx in self.disks:
1952       instance.FindDisk(disk_idx)
1953
1954     # Get secondary node IP addresses
1955     self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
1956                                   in self.cfg.GetMultiNodeInfo(touched_nodes))
1957
1958   def Exec(self, feedback_fn):
1959     """Execute disk replacement.
1960
1961     This dispatches the disk replacement to the appropriate handler.
1962
1963     """
1964     if __debug__:
1965       # Verify owned locks before starting operation
1966       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
1967       assert set(owned_nodes) == set(self.node_secondary_ip), \
1968           ("Incorrect node locks, owning %s, expected %s" %
1969            (owned_nodes, self.node_secondary_ip.keys()))
1970       assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
1971               self.lu.owned_locks(locking.LEVEL_NODE_RES))
1972       assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1973
1974       owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
1975       assert list(owned_instances) == [self.instance_name], \
1976           "Instance '%s' not locked" % self.instance_name
1977
1978       assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
1979           "Should not own any node group lock at this point"
1980
1981     if not self.disks:
1982       feedback_fn("No disks need replacement for instance '%s'" %
1983                   self.instance.name)
1984       return
1985
1986     feedback_fn("Replacing disk(s) %s for instance '%s'" %
1987                 (utils.CommaJoin(self.disks), self.instance.name))
1988     feedback_fn("Current primary node: %s" % self.instance.primary_node)
1989     feedback_fn("Current seconary node: %s" %
1990                 utils.CommaJoin(self.instance.secondary_nodes))
1991
1992     activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
1993
1994     # Activate the instance disks if we're replacing them on a down instance
1995     if activate_disks:
1996       StartInstanceDisks(self.lu, self.instance, True)
1997
1998     try:
1999       # Should we replace the secondary node?
2000       if self.new_node is not None:
2001         fn = self._ExecDrbd8Secondary
2002       else:
2003         fn = self._ExecDrbd8DiskOnly
2004
2005       result = fn(feedback_fn)
2006     finally:
2007       # Deactivate the instance disks if we're replacing them on a
2008       # down instance
2009       if activate_disks:
2010         _SafeShutdownInstanceDisks(self.lu, self.instance)
2011
2012     assert not self.lu.owned_locks(locking.LEVEL_NODE)
2013
2014     if __debug__:
2015       # Verify owned locks
2016       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2017       nodes = frozenset(self.node_secondary_ip)
2018       assert ((self.early_release and not owned_nodes) or
2019               (not self.early_release and not (set(owned_nodes) - nodes))), \
2020         ("Not owning the correct locks, early_release=%s, owned=%r,"
2021          " nodes=%r" % (self.early_release, owned_nodes, nodes))
2022
2023     return result
2024
2025   def _CheckVolumeGroup(self, nodes):
2026     self.lu.LogInfo("Checking volume groups")
2027
2028     vgname = self.cfg.GetVGName()
2029
2030     # Make sure volume group exists on all involved nodes
2031     results = self.rpc.call_vg_list(nodes)
2032     if not results:
2033       raise errors.OpExecError("Can't list volume groups on the nodes")
2034
2035     for node in nodes:
2036       res = results[node]
2037       res.Raise("Error checking node %s" % node)
2038       if vgname not in res.payload:
2039         raise errors.OpExecError("Volume group '%s' not found on node %s" %
2040                                  (vgname, node))
2041
2042   def _CheckDisksExistence(self, nodes):
2043     # Check disk existence
2044     for idx, dev in enumerate(self.instance.disks):
2045       if idx not in self.disks:
2046         continue
2047
2048       for node in nodes:
2049         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2050         self.cfg.SetDiskID(dev, node)
2051
2052         result = _BlockdevFind(self, node, dev, self.instance)
2053
2054         msg = result.fail_msg
2055         if msg or not result.payload:
2056           if not msg:
2057             msg = "disk not found"
2058           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2059                                    (idx, node, msg))
2060
2061   def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2062     for idx, dev in enumerate(self.instance.disks):
2063       if idx not in self.disks:
2064         continue
2065
2066       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2067                       (idx, node_name))
2068
2069       if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2070                                   on_primary, ldisk=ldisk):
2071         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2072                                  " replace disks for instance %s" %
2073                                  (node_name, self.instance.name))
2074
2075   def _CreateNewStorage(self, node_name):
2076     """Create new storage on the primary or secondary node.
2077
2078     This is only used for same-node replaces, not for changing the
2079     secondary node, hence we don't want to modify the existing disk.
2080
2081     """
2082     iv_names = {}
2083
2084     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2085     for idx, dev in enumerate(disks):
2086       if idx not in self.disks:
2087         continue
2088
2089       self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2090
2091       self.cfg.SetDiskID(dev, node_name)
2092
2093       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2094       names = _GenerateUniqueNames(self.lu, lv_names)
2095
2096       (data_disk, meta_disk) = dev.children
2097       vg_data = data_disk.logical_id[0]
2098       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2099                              logical_id=(vg_data, names[0]),
2100                              params=data_disk.params)
2101       vg_meta = meta_disk.logical_id[0]
2102       lv_meta = objects.Disk(dev_type=constants.LD_LV,
2103                              size=constants.DRBD_META_SIZE,
2104                              logical_id=(vg_meta, names[1]),
2105                              params=meta_disk.params)
2106
2107       new_lvs = [lv_data, lv_meta]
2108       old_lvs = [child.Copy() for child in dev.children]
2109       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2110       excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2111
2112       # we pass force_create=True to force the LVM creation
2113       for new_lv in new_lvs:
2114         _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2115                              GetInstanceInfoText(self.instance), False,
2116                              excl_stor)
2117
2118     return iv_names
2119
2120   def _CheckDevices(self, node_name, iv_names):
2121     for name, (dev, _, _) in iv_names.iteritems():
2122       self.cfg.SetDiskID(dev, node_name)
2123
2124       result = _BlockdevFind(self, node_name, dev, self.instance)
2125
2126       msg = result.fail_msg
2127       if msg or not result.payload:
2128         if not msg:
2129           msg = "disk not found"
2130         raise errors.OpExecError("Can't find DRBD device %s: %s" %
2131                                  (name, msg))
2132
2133       if result.payload.is_degraded:
2134         raise errors.OpExecError("DRBD device %s is degraded!" % name)
2135
2136   def _RemoveOldStorage(self, node_name, iv_names):
2137     for name, (_, old_lvs, _) in iv_names.iteritems():
2138       self.lu.LogInfo("Remove logical volumes for %s", name)
2139
2140       for lv in old_lvs:
2141         self.cfg.SetDiskID(lv, node_name)
2142
2143         msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2144         if msg:
2145           self.lu.LogWarning("Can't remove old LV: %s", msg,
2146                              hint="remove unused LVs manually")
2147
2148   def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2149     """Replace a disk on the primary or secondary for DRBD 8.
2150
2151     The algorithm for replace is quite complicated:
2152
2153       1. for each disk to be replaced:
2154
2155         1. create new LVs on the target node with unique names
2156         1. detach old LVs from the drbd device
2157         1. rename old LVs to name_replaced.<time_t>
2158         1. rename new LVs to old LVs
2159         1. attach the new LVs (with the old names now) to the drbd device
2160
2161       1. wait for sync across all devices
2162
2163       1. for each modified disk:
2164
2165         1. remove old LVs (which have the name name_replaces.<time_t>)
2166
2167     Failures are not very well handled.
2168
2169     """
2170     steps_total = 6
2171
2172     # Step: check device activation
2173     self.lu.LogStep(1, steps_total, "Check device existence")
2174     self._CheckDisksExistence([self.other_node, self.target_node])
2175     self._CheckVolumeGroup([self.target_node, self.other_node])
2176
2177     # Step: check other node consistency
2178     self.lu.LogStep(2, steps_total, "Check peer consistency")
2179     self._CheckDisksConsistency(self.other_node,
2180                                 self.other_node == self.instance.primary_node,
2181                                 False)
2182
2183     # Step: create new storage
2184     self.lu.LogStep(3, steps_total, "Allocate new storage")
2185     iv_names = self._CreateNewStorage(self.target_node)
2186
2187     # Step: for each lv, detach+rename*2+attach
2188     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2189     for dev, old_lvs, new_lvs in iv_names.itervalues():
2190       self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2191
2192       result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2193                                                      old_lvs)
2194       result.Raise("Can't detach drbd from local storage on node"
2195                    " %s for device %s" % (self.target_node, dev.iv_name))
2196       #dev.children = []
2197       #cfg.Update(instance)
2198
2199       # ok, we created the new LVs, so now we know we have the needed
2200       # storage; as such, we proceed on the target node to rename
2201       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2202       # using the assumption that logical_id == physical_id (which in
2203       # turn is the unique_id on that node)
2204
2205       # FIXME(iustin): use a better name for the replaced LVs
2206       temp_suffix = int(time.time())
2207       ren_fn = lambda d, suff: (d.physical_id[0],
2208                                 d.physical_id[1] + "_replaced-%s" % suff)
2209
2210       # Build the rename list based on what LVs exist on the node
2211       rename_old_to_new = []
2212       for to_ren in old_lvs:
2213         result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2214         if not result.fail_msg and result.payload:
2215           # device exists
2216           rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2217
2218       self.lu.LogInfo("Renaming the old LVs on the target node")
2219       result = self.rpc.call_blockdev_rename(self.target_node,
2220                                              rename_old_to_new)
2221       result.Raise("Can't rename old LVs on node %s" % self.target_node)
2222
2223       # Now we rename the new LVs to the old LVs
2224       self.lu.LogInfo("Renaming the new LVs on the target node")
2225       rename_new_to_old = [(new, old.physical_id)
2226                            for old, new in zip(old_lvs, new_lvs)]
2227       result = self.rpc.call_blockdev_rename(self.target_node,
2228                                              rename_new_to_old)
2229       result.Raise("Can't rename new LVs on node %s" % self.target_node)
2230
2231       # Intermediate steps of in memory modifications
2232       for old, new in zip(old_lvs, new_lvs):
2233         new.logical_id = old.logical_id
2234         self.cfg.SetDiskID(new, self.target_node)
2235
2236       # We need to modify old_lvs so that removal later removes the
2237       # right LVs, not the newly added ones; note that old_lvs is a
2238       # copy here
2239       for disk in old_lvs:
2240         disk.logical_id = ren_fn(disk, temp_suffix)
2241         self.cfg.SetDiskID(disk, self.target_node)
2242
2243       # Now that the new lvs have the old name, we can add them to the device
2244       self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2245       result = self.rpc.call_blockdev_addchildren(self.target_node,
2246                                                   (dev, self.instance), new_lvs)
2247       msg = result.fail_msg
2248       if msg:
2249         for new_lv in new_lvs:
2250           msg2 = self.rpc.call_blockdev_remove(self.target_node,
2251                                                new_lv).fail_msg
2252           if msg2:
2253             self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2254                                hint=("cleanup manually the unused logical"
2255                                      "volumes"))
2256         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2257
2258     cstep = itertools.count(5)
2259
2260     if self.early_release:
2261       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2262       self._RemoveOldStorage(self.target_node, iv_names)
2263       # TODO: Check if releasing locks early still makes sense
2264       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2265     else:
2266       # Release all resource locks except those used by the instance
2267       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2268                    keep=self.node_secondary_ip.keys())
2269
2270     # Release all node locks while waiting for sync
2271     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2272
2273     # TODO: Can the instance lock be downgraded here? Take the optional disk
2274     # shutdown in the caller into consideration.
2275
2276     # Wait for sync
2277     # This can fail as the old devices are degraded and _WaitForSync
2278     # does a combined result over all disks, so we don't check its return value
2279     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2280     WaitForSync(self.lu, self.instance)
2281
2282     # Check all devices manually
2283     self._CheckDevices(self.instance.primary_node, iv_names)
2284
2285     # Step: remove old storage
2286     if not self.early_release:
2287       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2288       self._RemoveOldStorage(self.target_node, iv_names)
2289
2290   def _ExecDrbd8Secondary(self, feedback_fn):
2291     """Replace the secondary node for DRBD 8.
2292
2293     The algorithm for replace is quite complicated:
2294       - for all disks of the instance:
2295         - create new LVs on the new node with same names
2296         - shutdown the drbd device on the old secondary
2297         - disconnect the drbd network on the primary
2298         - create the drbd device on the new secondary
2299         - network attach the drbd on the primary, using an artifice:
2300           the drbd code for Attach() will connect to the network if it
2301           finds a device which is connected to the good local disks but
2302           not network enabled
2303       - wait for sync across all devices
2304       - remove all disks from the old secondary
2305
2306     Failures are not very well handled.
2307
2308     """
2309     steps_total = 6
2310
2311     pnode = self.instance.primary_node
2312
2313     # Step: check device activation
2314     self.lu.LogStep(1, steps_total, "Check device existence")
2315     self._CheckDisksExistence([self.instance.primary_node])
2316     self._CheckVolumeGroup([self.instance.primary_node])
2317
2318     # Step: check other node consistency
2319     self.lu.LogStep(2, steps_total, "Check peer consistency")
2320     self._CheckDisksConsistency(self.instance.primary_node, True, True)
2321
2322     # Step: create new storage
2323     self.lu.LogStep(3, steps_total, "Allocate new storage")
2324     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2325     excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2326     for idx, dev in enumerate(disks):
2327       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2328                       (self.new_node, idx))
2329       # we pass force_create=True to force LVM creation
2330       for new_lv in dev.children:
2331         _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2332                              True, GetInstanceInfoText(self.instance), False,
2333                              excl_stor)
2334
2335     # Step 4: dbrd minors and drbd setups changes
2336     # after this, we must manually remove the drbd minors on both the
2337     # error and the success paths
2338     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2339     minors = self.cfg.AllocateDRBDMinor([self.new_node
2340                                          for dev in self.instance.disks],
2341                                         self.instance.name)
2342     logging.debug("Allocated minors %r", minors)
2343
2344     iv_names = {}
2345     for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2346       self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2347                       (self.new_node, idx))
2348       # create new devices on new_node; note that we create two IDs:
2349       # one without port, so the drbd will be activated without
2350       # networking information on the new node at this stage, and one
2351       # with network, for the latter activation in step 4
2352       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2353       if self.instance.primary_node == o_node1:
2354         p_minor = o_minor1
2355       else:
2356         assert self.instance.primary_node == o_node2, "Three-node instance?"
2357         p_minor = o_minor2
2358
2359       new_alone_id = (self.instance.primary_node, self.new_node, None,
2360                       p_minor, new_minor, o_secret)
2361       new_net_id = (self.instance.primary_node, self.new_node, o_port,
2362                     p_minor, new_minor, o_secret)
2363
2364       iv_names[idx] = (dev, dev.children, new_net_id)
2365       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2366                     new_net_id)
2367       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2368                               logical_id=new_alone_id,
2369                               children=dev.children,
2370                               size=dev.size,
2371                               params={})
2372       (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2373                                             self.cfg)
2374       try:
2375         CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2376                              anno_new_drbd,
2377                              GetInstanceInfoText(self.instance), False,
2378                              excl_stor)
2379       except errors.GenericError:
2380         self.cfg.ReleaseDRBDMinors(self.instance.name)
2381         raise
2382
2383     # We have new devices, shutdown the drbd on the old secondary
2384     for idx, dev in enumerate(self.instance.disks):
2385       self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2386       self.cfg.SetDiskID(dev, self.target_node)
2387       msg = self.rpc.call_blockdev_shutdown(self.target_node,
2388                                             (dev, self.instance)).fail_msg
2389       if msg:
2390         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2391                            "node: %s" % (idx, msg),
2392                            hint=("Please cleanup this device manually as"
2393                                  " soon as possible"))
2394
2395     self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2396     result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2397                                                self.instance.disks)[pnode]
2398
2399     msg = result.fail_msg
2400     if msg:
2401       # detaches didn't succeed (unlikely)
2402       self.cfg.ReleaseDRBDMinors(self.instance.name)
2403       raise errors.OpExecError("Can't detach the disks from the network on"
2404                                " old node: %s" % (msg,))
2405
2406     # if we managed to detach at least one, we update all the disks of
2407     # the instance to point to the new secondary
2408     self.lu.LogInfo("Updating instance configuration")
2409     for dev, _, new_logical_id in iv_names.itervalues():
2410       dev.logical_id = new_logical_id
2411       self.cfg.SetDiskID(dev, self.instance.primary_node)
2412
2413     self.cfg.Update(self.instance, feedback_fn)
2414
2415     # Release all node locks (the configuration has been updated)
2416     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2417
2418     # and now perform the drbd attach
2419     self.lu.LogInfo("Attaching primary drbds to new secondary"
2420                     " (standalone => connected)")
2421     result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2422                                             self.new_node],
2423                                            self.node_secondary_ip,
2424                                            (self.instance.disks, self.instance),
2425                                            self.instance.name,
2426                                            False)
2427     for to_node, to_result in result.items():
2428       msg = to_result.fail_msg
2429       if msg:
2430         self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2431                            to_node, msg,
2432                            hint=("please do a gnt-instance info to see the"
2433                                  " status of disks"))
2434
2435     cstep = itertools.count(5)
2436
2437     if self.early_release:
2438       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2439       self._RemoveOldStorage(self.target_node, iv_names)
2440       # TODO: Check if releasing locks early still makes sense
2441       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2442     else:
2443       # Release all resource locks except those used by the instance
2444       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2445                    keep=self.node_secondary_ip.keys())
2446
2447     # TODO: Can the instance lock be downgraded here? Take the optional disk
2448     # shutdown in the caller into consideration.
2449
2450     # Wait for sync
2451     # This can fail as the old devices are degraded and _WaitForSync
2452     # does a combined result over all disks, so we don't check its return value
2453     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2454     WaitForSync(self.lu, self.instance)
2455
2456     # Check all devices manually
2457     self._CheckDevices(self.instance.primary_node, iv_names)
2458
2459     # Step: remove old storage
2460     if not self.early_release:
2461       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2462       self._RemoveOldStorage(self.target_node, iv_names)