Merge branch 'stable-2.8' into master
[ganeti-local] / lib / cmdlib / instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41   AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42   CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43   IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45   CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46   BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52   constants.DT_PLAIN: "",
53   constants.DT_RBD: ".rbd",
54   constants.DT_EXT: ".ext",
55   }
56
57
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59   constants.DT_PLAIN: constants.LD_LV,
60   constants.DT_FILE: constants.LD_FILE,
61   constants.DT_SHARED_FILE: constants.LD_FILE,
62   constants.DT_BLOCK: constants.LD_BLOCKDEV,
63   constants.DT_RBD: constants.LD_RBD,
64   constants.DT_EXT: constants.LD_EXT,
65   }
66
67
68 def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69                          excl_stor):
70   """Create a single block device on a given node.
71
72   This will not recurse over children of the device, so they must be
73   created in advance.
74
75   @param lu: the lu on whose behalf we execute
76   @param node: the node on which to create the device
77   @type instance: L{objects.Instance}
78   @param instance: the instance which owns the device
79   @type device: L{objects.Disk}
80   @param device: the device to create
81   @param info: the extra 'metadata' we should attach to the device
82       (this will be represented as a LVM tag)
83   @type force_open: boolean
84   @param force_open: this parameter will be passes to the
85       L{backend.BlockdevCreate} function where it specifies
86       whether we run on primary or not, and it affects both
87       the child assembly and the device own Open() execution
88   @type excl_stor: boolean
89   @param excl_stor: Whether exclusive_storage is active for the node
90
91   """
92   lu.cfg.SetDiskID(device, node)
93   result = lu.rpc.call_blockdev_create(node, device, device.size,
94                                        instance.name, force_open, info,
95                                        excl_stor)
96   result.Raise("Can't create block device %s on"
97                " node %s for instance %s" % (device, node, instance.name))
98   if device.physical_id is None:
99     device.physical_id = result.payload
100
101
102 def _CreateBlockDevInner(lu, node, instance, device, force_create,
103                          info, force_open, excl_stor):
104   """Create a tree of block devices on a given node.
105
106   If this device type has to be created on secondaries, create it and
107   all its children.
108
109   If not, just recurse to children keeping the same 'force' value.
110
111   @attention: The device has to be annotated already.
112
113   @param lu: the lu on whose behalf we execute
114   @param node: the node on which to create the device
115   @type instance: L{objects.Instance}
116   @param instance: the instance which owns the device
117   @type device: L{objects.Disk}
118   @param device: the device to create
119   @type force_create: boolean
120   @param force_create: whether to force creation of this device; this
121       will be change to True whenever we find a device which has
122       CreateOnSecondary() attribute
123   @param info: the extra 'metadata' we should attach to the device
124       (this will be represented as a LVM tag)
125   @type force_open: boolean
126   @param force_open: this parameter will be passes to the
127       L{backend.BlockdevCreate} function where it specifies
128       whether we run on primary or not, and it affects both
129       the child assembly and the device own Open() execution
130   @type excl_stor: boolean
131   @param excl_stor: Whether exclusive_storage is active for the node
132
133   @return: list of created devices
134   """
135   created_devices = []
136   try:
137     if device.CreateOnSecondary():
138       force_create = True
139
140     if device.children:
141       for child in device.children:
142         devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143                                     info, force_open, excl_stor)
144         created_devices.extend(devs)
145
146     if not force_create:
147       return created_devices
148
149     CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150                          excl_stor)
151     # The device has been completely created, so there is no point in keeping
152     # its subdevices in the list. We just add the device itself instead.
153     created_devices = [(node, device)]
154     return created_devices
155
156   except errors.DeviceCreationError, e:
157     e.created_devices.extend(created_devices)
158     raise e
159   except errors.OpExecError, e:
160     raise errors.DeviceCreationError(str(e), created_devices)
161
162
163 def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164   """Whether exclusive_storage is in effect for the given node.
165
166   @type cfg: L{config.ConfigWriter}
167   @param cfg: The cluster configuration
168   @type nodename: string
169   @param nodename: The node
170   @rtype: bool
171   @return: The effective value of exclusive_storage
172   @raise errors.OpPrereqError: if no node exists with the given name
173
174   """
175   ni = cfg.GetNodeInfo(nodename)
176   if ni is None:
177     raise errors.OpPrereqError("Invalid node name %s" % nodename,
178                                errors.ECODE_NOENT)
179   return IsExclusiveStorageEnabledNode(cfg, ni)
180
181
182 def _CreateBlockDev(lu, node, instance, device, force_create, info,
183                     force_open):
184   """Wrapper around L{_CreateBlockDevInner}.
185
186   This method annotates the root device first.
187
188   """
189   (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190   excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191   return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192                               force_open, excl_stor)
193
194
195 def _UndoCreateDisks(lu, disks_created):
196   """Undo the work performed by L{CreateDisks}.
197
198   This function is called in case of an error to undo the work of
199   L{CreateDisks}.
200
201   @type lu: L{LogicalUnit}
202   @param lu: the logical unit on whose behalf we execute
203   @param disks_created: the result returned by L{CreateDisks}
204
205   """
206   for (node, disk) in disks_created:
207     lu.cfg.SetDiskID(disk, node)
208     result = lu.rpc.call_blockdev_remove(node, disk)
209     if result.fail_msg:
210       logging.warning("Failed to remove newly-created disk %s on node %s:"
211                       " %s", disk, node, result.fail_msg)
212
213
214 def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
215   """Create all disks for an instance.
216
217   This abstracts away some work from AddInstance.
218
219   @type lu: L{LogicalUnit}
220   @param lu: the logical unit on whose behalf we execute
221   @type instance: L{objects.Instance}
222   @param instance: the instance whose disks we should create
223   @type to_skip: list
224   @param to_skip: list of indices to skip
225   @type target_node: string
226   @param target_node: if passed, overrides the target node for creation
227   @type disks: list of {objects.Disk}
228   @param disks: the disks to create; if not specified, all the disks of the
229       instance are created
230   @return: information about the created disks, to be used to call
231       L{_UndoCreateDisks}
232   @raise errors.OpPrereqError: in case of error
233
234   """
235   info = GetInstanceInfoText(instance)
236   if target_node is None:
237     pnode = instance.primary_node
238     all_nodes = instance.all_nodes
239   else:
240     pnode = target_node
241     all_nodes = [pnode]
242
243   if disks is None:
244     disks = instance.disks
245
246   if instance.disk_template in constants.DTS_FILEBASED:
247     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
248     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
249
250     result.Raise("Failed to create directory '%s' on"
251                  " node %s" % (file_storage_dir, pnode))
252
253   disks_created = []
254   for idx, device in enumerate(disks):
255     if to_skip and idx in to_skip:
256       continue
257     logging.info("Creating disk %s for instance '%s'", idx, instance.name)
258     for node in all_nodes:
259       f_create = node == pnode
260       try:
261         _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
262         disks_created.append((node, device))
263       except errors.DeviceCreationError, e:
264         logging.warning("Creating disk %s for instance '%s' failed",
265                         idx, instance.name)
266         disks_created.extend(e.created_devices)
267         _UndoCreateDisks(lu, disks_created)
268         raise errors.OpExecError(e.message)
269   return disks_created
270
271
272 def ComputeDiskSizePerVG(disk_template, disks):
273   """Compute disk size requirements in the volume group
274
275   """
276   def _compute(disks, payload):
277     """Universal algorithm.
278
279     """
280     vgs = {}
281     for disk in disks:
282       vgs[disk[constants.IDISK_VG]] = \
283         vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
284
285     return vgs
286
287   # Required free disk space as a function of disk and swap space
288   req_size_dict = {
289     constants.DT_DISKLESS: {},
290     constants.DT_PLAIN: _compute(disks, 0),
291     # 128 MB are added for drbd metadata for each disk
292     constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
293     constants.DT_FILE: {},
294     constants.DT_SHARED_FILE: {},
295     }
296
297   if disk_template not in req_size_dict:
298     raise errors.ProgrammerError("Disk template '%s' size requirement"
299                                  " is unknown" % disk_template)
300
301   return req_size_dict[disk_template]
302
303
304 def ComputeDisks(op, default_vg):
305   """Computes the instance disks.
306
307   @param op: The instance opcode
308   @param default_vg: The default_vg to assume
309
310   @return: The computed disks
311
312   """
313   disks = []
314   for disk in op.disks:
315     mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316     if mode not in constants.DISK_ACCESS_SET:
317       raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318                                  mode, errors.ECODE_INVAL)
319     size = disk.get(constants.IDISK_SIZE, None)
320     if size is None:
321       raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
322     try:
323       size = int(size)
324     except (TypeError, ValueError):
325       raise errors.OpPrereqError("Invalid disk size '%s'" % size,
326                                  errors.ECODE_INVAL)
327
328     ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329     if ext_provider and op.disk_template != constants.DT_EXT:
330       raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331                                  " disk template, not %s" %
332                                  (constants.IDISK_PROVIDER, constants.DT_EXT,
333                                   op.disk_template), errors.ECODE_INVAL)
334
335     data_vg = disk.get(constants.IDISK_VG, default_vg)
336     name = disk.get(constants.IDISK_NAME, None)
337     if name is not None and name.lower() == constants.VALUE_NONE:
338       name = None
339     new_disk = {
340       constants.IDISK_SIZE: size,
341       constants.IDISK_MODE: mode,
342       constants.IDISK_VG: data_vg,
343       constants.IDISK_NAME: name,
344       }
345
346     if constants.IDISK_METAVG in disk:
347       new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
348     if constants.IDISK_ADOPT in disk:
349       new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
350
351     # For extstorage, demand the `provider' option and add any
352     # additional parameters (ext-params) to the dict
353     if op.disk_template == constants.DT_EXT:
354       if ext_provider:
355         new_disk[constants.IDISK_PROVIDER] = ext_provider
356         for key in disk:
357           if key not in constants.IDISK_PARAMS:
358             new_disk[key] = disk[key]
359       else:
360         raise errors.OpPrereqError("Missing provider for template '%s'" %
361                                    constants.DT_EXT, errors.ECODE_INVAL)
362
363     disks.append(new_disk)
364
365   return disks
366
367
368 def CheckRADOSFreeSpace():
369   """Compute disk size requirements inside the RADOS cluster.
370
371   """
372   # For the RADOS cluster we assume there is always enough space.
373   pass
374
375
376 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
377                          iv_name, p_minor, s_minor):
378   """Generate a drbd8 device complete with its children.
379
380   """
381   assert len(vgnames) == len(names) == 2
382   port = lu.cfg.AllocatePort()
383   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
384
385   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
386                           logical_id=(vgnames[0], names[0]),
387                           params={})
388   dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
389   dev_meta = objects.Disk(dev_type=constants.LD_LV,
390                           size=constants.DRBD_META_SIZE,
391                           logical_id=(vgnames[1], names[1]),
392                           params={})
393   dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
394   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
395                           logical_id=(primary, secondary, port,
396                                       p_minor, s_minor,
397                                       shared_secret),
398                           children=[dev_data, dev_meta],
399                           iv_name=iv_name, params={})
400   drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
401   return drbd_dev
402
403
404 def GenerateDiskTemplate(
405   lu, template_name, instance_name, primary_node, secondary_nodes,
406   disk_info, file_storage_dir, file_driver, base_index,
407   feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
408   _req_shr_file_storage=opcodes.RequireSharedFileStorage):
409   """Generate the entire disk layout for a given template type.
410
411   """
412   vgname = lu.cfg.GetVGName()
413   disk_count = len(disk_info)
414   disks = []
415
416   if template_name == constants.DT_DISKLESS:
417     pass
418   elif template_name == constants.DT_DRBD8:
419     if len(secondary_nodes) != 1:
420       raise errors.ProgrammerError("Wrong template configuration")
421     remote_node = secondary_nodes[0]
422     minors = lu.cfg.AllocateDRBDMinor(
423       [primary_node, remote_node] * len(disk_info), instance_name)
424
425     (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
426                                                        full_disk_params)
427     drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
428
429     names = []
430     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
431                                                for i in range(disk_count)]):
432       names.append(lv_prefix + "_data")
433       names.append(lv_prefix + "_meta")
434     for idx, disk in enumerate(disk_info):
435       disk_index = idx + base_index
436       data_vg = disk.get(constants.IDISK_VG, vgname)
437       meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
438       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
439                                       disk[constants.IDISK_SIZE],
440                                       [data_vg, meta_vg],
441                                       names[idx * 2:idx * 2 + 2],
442                                       "disk/%d" % disk_index,
443                                       minors[idx * 2], minors[idx * 2 + 1])
444       disk_dev.mode = disk[constants.IDISK_MODE]
445       disk_dev.name = disk.get(constants.IDISK_NAME, None)
446       disks.append(disk_dev)
447   else:
448     if secondary_nodes:
449       raise errors.ProgrammerError("Wrong template configuration")
450
451     if template_name == constants.DT_FILE:
452       _req_file_storage()
453     elif template_name == constants.DT_SHARED_FILE:
454       _req_shr_file_storage()
455
456     name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
457     if name_prefix is None:
458       names = None
459     else:
460       names = _GenerateUniqueNames(lu, ["%s.disk%s" %
461                                         (name_prefix, base_index + i)
462                                         for i in range(disk_count)])
463
464     if template_name == constants.DT_PLAIN:
465
466       def logical_id_fn(idx, _, disk):
467         vg = disk.get(constants.IDISK_VG, vgname)
468         return (vg, names[idx])
469
470     elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
471       logical_id_fn = \
472         lambda _, disk_index, disk: (file_driver,
473                                      "%s/disk%d" % (file_storage_dir,
474                                                     disk_index))
475     elif template_name == constants.DT_BLOCK:
476       logical_id_fn = \
477         lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
478                                        disk[constants.IDISK_ADOPT])
479     elif template_name == constants.DT_RBD:
480       logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
481     elif template_name == constants.DT_EXT:
482       def logical_id_fn(idx, _, disk):
483         provider = disk.get(constants.IDISK_PROVIDER, None)
484         if provider is None:
485           raise errors.ProgrammerError("Disk template is %s, but '%s' is"
486                                        " not found", constants.DT_EXT,
487                                        constants.IDISK_PROVIDER)
488         return (provider, names[idx])
489     else:
490       raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
491
492     dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
493
494     for idx, disk in enumerate(disk_info):
495       params = {}
496       # Only for the Ext template add disk_info to params
497       if template_name == constants.DT_EXT:
498         params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
499         for key in disk:
500           if key not in constants.IDISK_PARAMS:
501             params[key] = disk[key]
502       disk_index = idx + base_index
503       size = disk[constants.IDISK_SIZE]
504       feedback_fn("* disk %s, size %s" %
505                   (disk_index, utils.FormatUnit(size, "h")))
506       disk_dev = objects.Disk(dev_type=dev_type, size=size,
507                               logical_id=logical_id_fn(idx, disk_index, disk),
508                               iv_name="disk/%d" % disk_index,
509                               mode=disk[constants.IDISK_MODE],
510                               params=params)
511       disk_dev.name = disk.get(constants.IDISK_NAME, None)
512       disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
513       disks.append(disk_dev)
514
515   return disks
516
517
518 class LUInstanceRecreateDisks(LogicalUnit):
519   """Recreate an instance's missing disks.
520
521   """
522   HPATH = "instance-recreate-disks"
523   HTYPE = constants.HTYPE_INSTANCE
524   REQ_BGL = False
525
526   _MODIFYABLE = compat.UniqueFrozenset([
527     constants.IDISK_SIZE,
528     constants.IDISK_MODE,
529     ])
530
531   # New or changed disk parameters may have different semantics
532   assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
533     constants.IDISK_ADOPT,
534
535     # TODO: Implement support changing VG while recreating
536     constants.IDISK_VG,
537     constants.IDISK_METAVG,
538     constants.IDISK_PROVIDER,
539     constants.IDISK_NAME,
540     ]))
541
542   def _RunAllocator(self):
543     """Run the allocator based on input opcode.
544
545     """
546     be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
547
548     # FIXME
549     # The allocator should actually run in "relocate" mode, but current
550     # allocators don't support relocating all the nodes of an instance at
551     # the same time. As a workaround we use "allocate" mode, but this is
552     # suboptimal for two reasons:
553     # - The instance name passed to the allocator is present in the list of
554     #   existing instances, so there could be a conflict within the
555     #   internal structures of the allocator. This doesn't happen with the
556     #   current allocators, but it's a liability.
557     # - The allocator counts the resources used by the instance twice: once
558     #   because the instance exists already, and once because it tries to
559     #   allocate a new instance.
560     # The allocator could choose some of the nodes on which the instance is
561     # running, but that's not a problem. If the instance nodes are broken,
562     # they should be already be marked as drained or offline, and hence
563     # skipped by the allocator. If instance disks have been lost for other
564     # reasons, then recreating the disks on the same nodes should be fine.
565     disk_template = self.instance.disk_template
566     spindle_use = be_full[constants.BE_SPINDLE_USE]
567     req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
568                                         disk_template=disk_template,
569                                         tags=list(self.instance.GetTags()),
570                                         os=self.instance.os,
571                                         nics=[{}],
572                                         vcpus=be_full[constants.BE_VCPUS],
573                                         memory=be_full[constants.BE_MAXMEM],
574                                         spindle_use=spindle_use,
575                                         disks=[{constants.IDISK_SIZE: d.size,
576                                                 constants.IDISK_MODE: d.mode}
577                                                for d in self.instance.disks],
578                                         hypervisor=self.instance.hypervisor,
579                                         node_whitelist=None)
580     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
581
582     ial.Run(self.op.iallocator)
583
584     assert req.RequiredNodes() == len(self.instance.all_nodes)
585
586     if not ial.success:
587       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
588                                  " %s" % (self.op.iallocator, ial.info),
589                                  errors.ECODE_NORES)
590
591     self.op.nodes = ial.result
592     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
593                  self.op.instance_name, self.op.iallocator,
594                  utils.CommaJoin(ial.result))
595
596   def CheckArguments(self):
597     if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
598       # Normalize and convert deprecated list of disk indices
599       self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
600
601     duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
602     if duplicates:
603       raise errors.OpPrereqError("Some disks have been specified more than"
604                                  " once: %s" % utils.CommaJoin(duplicates),
605                                  errors.ECODE_INVAL)
606
607     # We don't want _CheckIAllocatorOrNode selecting the default iallocator
608     # when neither iallocator nor nodes are specified
609     if self.op.iallocator or self.op.nodes:
610       CheckIAllocatorOrNode(self, "iallocator", "nodes")
611
612     for (idx, params) in self.op.disks:
613       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
614       unsupported = frozenset(params.keys()) - self._MODIFYABLE
615       if unsupported:
616         raise errors.OpPrereqError("Parameters for disk %s try to change"
617                                    " unmodifyable parameter(s): %s" %
618                                    (idx, utils.CommaJoin(unsupported)),
619                                    errors.ECODE_INVAL)
620
621   def ExpandNames(self):
622     self._ExpandAndLockInstance()
623     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
624
625     if self.op.nodes:
626       self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
627       self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
628     else:
629       self.needed_locks[locking.LEVEL_NODE] = []
630       if self.op.iallocator:
631         # iallocator will select a new node in the same group
632         self.needed_locks[locking.LEVEL_NODEGROUP] = []
633         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
634
635     self.needed_locks[locking.LEVEL_NODE_RES] = []
636
637   def DeclareLocks(self, level):
638     if level == locking.LEVEL_NODEGROUP:
639       assert self.op.iallocator is not None
640       assert not self.op.nodes
641       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
642       self.share_locks[locking.LEVEL_NODEGROUP] = 1
643       # Lock the primary group used by the instance optimistically; this
644       # requires going via the node before it's locked, requiring
645       # verification later on
646       self.needed_locks[locking.LEVEL_NODEGROUP] = \
647         self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
648
649     elif level == locking.LEVEL_NODE:
650       # If an allocator is used, then we lock all the nodes in the current
651       # instance group, as we don't know yet which ones will be selected;
652       # if we replace the nodes without using an allocator, locks are
653       # already declared in ExpandNames; otherwise, we need to lock all the
654       # instance nodes for disk re-creation
655       if self.op.iallocator:
656         assert not self.op.nodes
657         assert not self.needed_locks[locking.LEVEL_NODE]
658         assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
659
660         # Lock member nodes of the group of the primary node
661         for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
662           self.needed_locks[locking.LEVEL_NODE].extend(
663             self.cfg.GetNodeGroup(group_uuid).members)
664
665         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
666       elif not self.op.nodes:
667         self._LockInstancesNodes(primary_only=False)
668     elif level == locking.LEVEL_NODE_RES:
669       # Copy node locks
670       self.needed_locks[locking.LEVEL_NODE_RES] = \
671         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
672
673   def BuildHooksEnv(self):
674     """Build hooks env.
675
676     This runs on master, primary and secondary nodes of the instance.
677
678     """
679     return BuildInstanceHookEnvByObject(self, self.instance)
680
681   def BuildHooksNodes(self):
682     """Build hooks nodes.
683
684     """
685     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
686     return (nl, nl)
687
688   def CheckPrereq(self):
689     """Check prerequisites.
690
691     This checks that the instance is in the cluster and is not running.
692
693     """
694     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
695     assert instance is not None, \
696       "Cannot retrieve locked instance %s" % self.op.instance_name
697     if self.op.nodes:
698       if len(self.op.nodes) != len(instance.all_nodes):
699         raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
700                                    " %d replacement nodes were specified" %
701                                    (instance.name, len(instance.all_nodes),
702                                     len(self.op.nodes)),
703                                    errors.ECODE_INVAL)
704       assert instance.disk_template != constants.DT_DRBD8 or \
705              len(self.op.nodes) == 2
706       assert instance.disk_template != constants.DT_PLAIN or \
707              len(self.op.nodes) == 1
708       primary_node = self.op.nodes[0]
709     else:
710       primary_node = instance.primary_node
711     if not self.op.iallocator:
712       CheckNodeOnline(self, primary_node)
713
714     if instance.disk_template == constants.DT_DISKLESS:
715       raise errors.OpPrereqError("Instance '%s' has no disks" %
716                                  self.op.instance_name, errors.ECODE_INVAL)
717
718     # Verify if node group locks are still correct
719     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
720     if owned_groups:
721       # Node group locks are acquired only for the primary node (and only
722       # when the allocator is used)
723       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
724                               primary_only=True)
725
726     # if we replace nodes *and* the old primary is offline, we don't
727     # check the instance state
728     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
729     if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
730       CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
731                          msg="cannot recreate disks")
732
733     if self.op.disks:
734       self.disks = dict(self.op.disks)
735     else:
736       self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
737
738     maxidx = max(self.disks.keys())
739     if maxidx >= len(instance.disks):
740       raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
741                                  errors.ECODE_INVAL)
742
743     if ((self.op.nodes or self.op.iallocator) and
744          sorted(self.disks.keys()) != range(len(instance.disks))):
745       raise errors.OpPrereqError("Can't recreate disks partially and"
746                                  " change the nodes at the same time",
747                                  errors.ECODE_INVAL)
748
749     self.instance = instance
750
751     if self.op.iallocator:
752       self._RunAllocator()
753       # Release unneeded node and node resource locks
754       ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
755       ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
756       ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
757
758     assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
759
760   def Exec(self, feedback_fn):
761     """Recreate the disks.
762
763     """
764     instance = self.instance
765
766     assert (self.owned_locks(locking.LEVEL_NODE) ==
767             self.owned_locks(locking.LEVEL_NODE_RES))
768
769     to_skip = []
770     mods = [] # keeps track of needed changes
771
772     for idx, disk in enumerate(instance.disks):
773       try:
774         changes = self.disks[idx]
775       except KeyError:
776         # Disk should not be recreated
777         to_skip.append(idx)
778         continue
779
780       # update secondaries for disks, if needed
781       if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
782         # need to update the nodes and minors
783         assert len(self.op.nodes) == 2
784         assert len(disk.logical_id) == 6 # otherwise disk internals
785                                          # have changed
786         (_, _, old_port, _, _, old_secret) = disk.logical_id
787         new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
788         new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
789                   new_minors[0], new_minors[1], old_secret)
790         assert len(disk.logical_id) == len(new_id)
791       else:
792         new_id = None
793
794       mods.append((idx, new_id, changes))
795
796     # now that we have passed all asserts above, we can apply the mods
797     # in a single run (to avoid partial changes)
798     for idx, new_id, changes in mods:
799       disk = instance.disks[idx]
800       if new_id is not None:
801         assert disk.dev_type == constants.LD_DRBD8
802         disk.logical_id = new_id
803       if changes:
804         disk.Update(size=changes.get(constants.IDISK_SIZE, None),
805                     mode=changes.get(constants.IDISK_MODE, None))
806
807     # change primary node, if needed
808     if self.op.nodes:
809       instance.primary_node = self.op.nodes[0]
810       self.LogWarning("Changing the instance's nodes, you will have to"
811                       " remove any disks left on the older nodes manually")
812
813     if self.op.nodes:
814       self.cfg.Update(instance, feedback_fn)
815
816     # All touched nodes must be locked
817     mylocks = self.owned_locks(locking.LEVEL_NODE)
818     assert mylocks.issuperset(frozenset(instance.all_nodes))
819     new_disks = CreateDisks(self, instance, to_skip=to_skip)
820
821     # TODO: Release node locks before wiping, or explain why it's not possible
822     if self.cfg.GetClusterInfo().prealloc_wipe_disks:
823       wipedisks = [(idx, disk, 0)
824                    for (idx, disk) in enumerate(instance.disks)
825                    if idx not in to_skip]
826       WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
827
828
829 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
830   """Checks if nodes have enough free disk space in the specified VG.
831
832   This function checks if all given nodes have the needed amount of
833   free disk. In case any node has less disk or we cannot get the
834   information from the node, this function raises an OpPrereqError
835   exception.
836
837   @type lu: C{LogicalUnit}
838   @param lu: a logical unit from which we get configuration data
839   @type nodenames: C{list}
840   @param nodenames: the list of node names to check
841   @type vg: C{str}
842   @param vg: the volume group to check
843   @type requested: C{int}
844   @param requested: the amount of disk in MiB to check for
845   @raise errors.OpPrereqError: if the node doesn't have enough disk,
846       or we cannot check the node
847
848   """
849   es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
850   nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
851   for node in nodenames:
852     info = nodeinfo[node]
853     info.Raise("Cannot get current information from node %s" % node,
854                prereq=True, ecode=errors.ECODE_ENVIRON)
855     (_, (vg_info, ), _) = info.payload
856     vg_free = vg_info.get("vg_free", None)
857     if not isinstance(vg_free, int):
858       raise errors.OpPrereqError("Can't compute free disk space on node"
859                                  " %s for vg %s, result was '%s'" %
860                                  (node, vg, vg_free), errors.ECODE_ENVIRON)
861     if requested > vg_free:
862       raise errors.OpPrereqError("Not enough disk space on target node %s"
863                                  " vg %s: required %d MiB, available %d MiB" %
864                                  (node, vg, requested, vg_free),
865                                  errors.ECODE_NORES)
866
867
868 def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
869   """Checks if nodes have enough free disk space in all the VGs.
870
871   This function checks if all given nodes have the needed amount of
872   free disk. In case any node has less disk or we cannot get the
873   information from the node, this function raises an OpPrereqError
874   exception.
875
876   @type lu: C{LogicalUnit}
877   @param lu: a logical unit from which we get configuration data
878   @type nodenames: C{list}
879   @param nodenames: the list of node names to check
880   @type req_sizes: C{dict}
881   @param req_sizes: the hash of vg and corresponding amount of disk in
882       MiB to check for
883   @raise errors.OpPrereqError: if the node doesn't have enough disk,
884       or we cannot check the node
885
886   """
887   for vg, req_size in req_sizes.items():
888     _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
889
890
891 def _DiskSizeInBytesToMebibytes(lu, size):
892   """Converts a disk size in bytes to mebibytes.
893
894   Warns and rounds up if the size isn't an even multiple of 1 MiB.
895
896   """
897   (mib, remainder) = divmod(size, 1024 * 1024)
898
899   if remainder != 0:
900     lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
901                   " to not overwrite existing data (%s bytes will not be"
902                   " wiped)", (1024 * 1024) - remainder)
903     mib += 1
904
905   return mib
906
907
908 def _CalcEta(time_taken, written, total_size):
909   """Calculates the ETA based on size written and total size.
910
911   @param time_taken: The time taken so far
912   @param written: amount written so far
913   @param total_size: The total size of data to be written
914   @return: The remaining time in seconds
915
916   """
917   avg_time = time_taken / float(written)
918   return (total_size - written) * avg_time
919
920
921 def WipeDisks(lu, instance, disks=None):
922   """Wipes instance disks.
923
924   @type lu: L{LogicalUnit}
925   @param lu: the logical unit on whose behalf we execute
926   @type instance: L{objects.Instance}
927   @param instance: the instance whose disks we should create
928   @type disks: None or list of tuple of (number, L{objects.Disk}, number)
929   @param disks: Disk details; tuple contains disk index, disk object and the
930     start offset
931
932   """
933   node = instance.primary_node
934
935   if disks is None:
936     disks = [(idx, disk, 0)
937              for (idx, disk) in enumerate(instance.disks)]
938
939   for (_, device, _) in disks:
940     lu.cfg.SetDiskID(device, node)
941
942   logging.info("Pausing synchronization of disks of instance '%s'",
943                instance.name)
944   result = lu.rpc.call_blockdev_pause_resume_sync(node,
945                                                   (map(compat.snd, disks),
946                                                    instance),
947                                                   True)
948   result.Raise("Failed to pause disk synchronization on node '%s'" % node)
949
950   for idx, success in enumerate(result.payload):
951     if not success:
952       logging.warn("Pausing synchronization of disk %s of instance '%s'"
953                    " failed", idx, instance.name)
954
955   try:
956     for (idx, device, offset) in disks:
957       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
958       # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
959       wipe_chunk_size = \
960         int(min(constants.MAX_WIPE_CHUNK,
961                 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
962
963       size = device.size
964       last_output = 0
965       start_time = time.time()
966
967       if offset == 0:
968         info_text = ""
969       else:
970         info_text = (" (from %s to %s)" %
971                      (utils.FormatUnit(offset, "h"),
972                       utils.FormatUnit(size, "h")))
973
974       lu.LogInfo("* Wiping disk %s%s", idx, info_text)
975
976       logging.info("Wiping disk %d for instance %s on node %s using"
977                    " chunk size %s", idx, instance.name, node, wipe_chunk_size)
978
979       while offset < size:
980         wipe_size = min(wipe_chunk_size, size - offset)
981
982         logging.debug("Wiping disk %d, offset %s, chunk %s",
983                       idx, offset, wipe_size)
984
985         result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
986                                            wipe_size)
987         result.Raise("Could not wipe disk %d at offset %d for size %d" %
988                      (idx, offset, wipe_size))
989
990         now = time.time()
991         offset += wipe_size
992         if now - last_output >= 60:
993           eta = _CalcEta(now - start_time, offset, size)
994           lu.LogInfo(" - done: %.1f%% ETA: %s",
995                      offset / float(size) * 100, utils.FormatSeconds(eta))
996           last_output = now
997   finally:
998     logging.info("Resuming synchronization of disks for instance '%s'",
999                  instance.name)
1000
1001     result = lu.rpc.call_blockdev_pause_resume_sync(node,
1002                                                     (map(compat.snd, disks),
1003                                                      instance),
1004                                                     False)
1005
1006     if result.fail_msg:
1007       lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1008                     node, result.fail_msg)
1009     else:
1010       for idx, success in enumerate(result.payload):
1011         if not success:
1012           lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1013                         " failed", idx, instance.name)
1014
1015
1016 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1017   """Wrapper for L{WipeDisks} that handles errors.
1018
1019   @type lu: L{LogicalUnit}
1020   @param lu: the logical unit on whose behalf we execute
1021   @type instance: L{objects.Instance}
1022   @param instance: the instance whose disks we should wipe
1023   @param disks: see L{WipeDisks}
1024   @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1025       case of error
1026   @raise errors.OpPrereqError: in case of failure
1027
1028   """
1029   try:
1030     WipeDisks(lu, instance, disks=disks)
1031   except errors.OpExecError:
1032     logging.warning("Wiping disks for instance '%s' failed",
1033                     instance.name)
1034     _UndoCreateDisks(lu, cleanup)
1035     raise
1036
1037
1038 def ExpandCheckDisks(instance, disks):
1039   """Return the instance disks selected by the disks list
1040
1041   @type disks: list of L{objects.Disk} or None
1042   @param disks: selected disks
1043   @rtype: list of L{objects.Disk}
1044   @return: selected instance disks to act on
1045
1046   """
1047   if disks is None:
1048     return instance.disks
1049   else:
1050     if not set(disks).issubset(instance.disks):
1051       raise errors.ProgrammerError("Can only act on disks belonging to the"
1052                                    " target instance")
1053     return disks
1054
1055
1056 def WaitForSync(lu, instance, disks=None, oneshot=False):
1057   """Sleep and poll for an instance's disk to sync.
1058
1059   """
1060   if not instance.disks or disks is not None and not disks:
1061     return True
1062
1063   disks = ExpandCheckDisks(instance, disks)
1064
1065   if not oneshot:
1066     lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1067
1068   node = instance.primary_node
1069
1070   for dev in disks:
1071     lu.cfg.SetDiskID(dev, node)
1072
1073   # TODO: Convert to utils.Retry
1074
1075   retries = 0
1076   degr_retries = 10 # in seconds, as we sleep 1 second each time
1077   while True:
1078     max_time = 0
1079     done = True
1080     cumul_degraded = False
1081     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1082     msg = rstats.fail_msg
1083     if msg:
1084       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1085       retries += 1
1086       if retries >= 10:
1087         raise errors.RemoteError("Can't contact node %s for mirror data,"
1088                                  " aborting." % node)
1089       time.sleep(6)
1090       continue
1091     rstats = rstats.payload
1092     retries = 0
1093     for i, mstat in enumerate(rstats):
1094       if mstat is None:
1095         lu.LogWarning("Can't compute data for node %s/%s",
1096                       node, disks[i].iv_name)
1097         continue
1098
1099       cumul_degraded = (cumul_degraded or
1100                         (mstat.is_degraded and mstat.sync_percent is None))
1101       if mstat.sync_percent is not None:
1102         done = False
1103         if mstat.estimated_time is not None:
1104           rem_time = ("%s remaining (estimated)" %
1105                       utils.FormatSeconds(mstat.estimated_time))
1106           max_time = mstat.estimated_time
1107         else:
1108           rem_time = "no time estimate"
1109         lu.LogInfo("- device %s: %5.2f%% done, %s",
1110                    disks[i].iv_name, mstat.sync_percent, rem_time)
1111
1112     # if we're done but degraded, let's do a few small retries, to
1113     # make sure we see a stable and not transient situation; therefore
1114     # we force restart of the loop
1115     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1116       logging.info("Degraded disks found, %d retries left", degr_retries)
1117       degr_retries -= 1
1118       time.sleep(1)
1119       continue
1120
1121     if done or oneshot:
1122       break
1123
1124     time.sleep(min(60, max_time))
1125
1126   if done:
1127     lu.LogInfo("Instance %s's disks are in sync", instance.name)
1128
1129   return not cumul_degraded
1130
1131
1132 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1133   """Shutdown block devices of an instance.
1134
1135   This does the shutdown on all nodes of the instance.
1136
1137   If the ignore_primary is false, errors on the primary node are
1138   ignored.
1139
1140   """
1141   all_result = True
1142   disks = ExpandCheckDisks(instance, disks)
1143
1144   for disk in disks:
1145     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1146       lu.cfg.SetDiskID(top_disk, node)
1147       result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1148       msg = result.fail_msg
1149       if msg:
1150         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1151                       disk.iv_name, node, msg)
1152         if ((node == instance.primary_node and not ignore_primary) or
1153             (node != instance.primary_node and not result.offline)):
1154           all_result = False
1155   return all_result
1156
1157
1158 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1159   """Shutdown block devices of an instance.
1160
1161   This function checks if an instance is running, before calling
1162   _ShutdownInstanceDisks.
1163
1164   """
1165   CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1166   ShutdownInstanceDisks(lu, instance, disks=disks)
1167
1168
1169 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1170                            ignore_size=False):
1171   """Prepare the block devices for an instance.
1172
1173   This sets up the block devices on all nodes.
1174
1175   @type lu: L{LogicalUnit}
1176   @param lu: the logical unit on whose behalf we execute
1177   @type instance: L{objects.Instance}
1178   @param instance: the instance for whose disks we assemble
1179   @type disks: list of L{objects.Disk} or None
1180   @param disks: which disks to assemble (or all, if None)
1181   @type ignore_secondaries: boolean
1182   @param ignore_secondaries: if true, errors on secondary nodes
1183       won't result in an error return from the function
1184   @type ignore_size: boolean
1185   @param ignore_size: if true, the current known size of the disk
1186       will not be used during the disk activation, useful for cases
1187       when the size is wrong
1188   @return: False if the operation failed, otherwise a list of
1189       (host, instance_visible_name, node_visible_name)
1190       with the mapping from node devices to instance devices
1191
1192   """
1193   device_info = []
1194   disks_ok = True
1195   iname = instance.name
1196   disks = ExpandCheckDisks(instance, disks)
1197
1198   # With the two passes mechanism we try to reduce the window of
1199   # opportunity for the race condition of switching DRBD to primary
1200   # before handshaking occured, but we do not eliminate it
1201
1202   # The proper fix would be to wait (with some limits) until the
1203   # connection has been made and drbd transitions from WFConnection
1204   # into any other network-connected state (Connected, SyncTarget,
1205   # SyncSource, etc.)
1206
1207   # 1st pass, assemble on all nodes in secondary mode
1208   for idx, inst_disk in enumerate(disks):
1209     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1210       if ignore_size:
1211         node_disk = node_disk.Copy()
1212         node_disk.UnsetSize()
1213       lu.cfg.SetDiskID(node_disk, node)
1214       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1215                                              False, idx)
1216       msg = result.fail_msg
1217       if msg:
1218         is_offline_secondary = (node in instance.secondary_nodes and
1219                                 result.offline)
1220         lu.LogWarning("Could not prepare block device %s on node %s"
1221                       " (is_primary=False, pass=1): %s",
1222                       inst_disk.iv_name, node, msg)
1223         if not (ignore_secondaries or is_offline_secondary):
1224           disks_ok = False
1225
1226   # FIXME: race condition on drbd migration to primary
1227
1228   # 2nd pass, do only the primary node
1229   for idx, inst_disk in enumerate(disks):
1230     dev_path = None
1231
1232     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1233       if node != instance.primary_node:
1234         continue
1235       if ignore_size:
1236         node_disk = node_disk.Copy()
1237         node_disk.UnsetSize()
1238       lu.cfg.SetDiskID(node_disk, node)
1239       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1240                                              True, idx)
1241       msg = result.fail_msg
1242       if msg:
1243         lu.LogWarning("Could not prepare block device %s on node %s"
1244                       " (is_primary=True, pass=2): %s",
1245                       inst_disk.iv_name, node, msg)
1246         disks_ok = False
1247       else:
1248         dev_path = result.payload
1249
1250     device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1251
1252   # leave the disks configured for the primary node
1253   # this is a workaround that would be fixed better by
1254   # improving the logical/physical id handling
1255   for disk in disks:
1256     lu.cfg.SetDiskID(disk, instance.primary_node)
1257
1258   return disks_ok, device_info
1259
1260
1261 def StartInstanceDisks(lu, instance, force):
1262   """Start the disks of an instance.
1263
1264   """
1265   disks_ok, _ = AssembleInstanceDisks(lu, instance,
1266                                       ignore_secondaries=force)
1267   if not disks_ok:
1268     ShutdownInstanceDisks(lu, instance)
1269     if force is not None and not force:
1270       lu.LogWarning("",
1271                     hint=("If the message above refers to a secondary node,"
1272                           " you can retry the operation using '--force'"))
1273     raise errors.OpExecError("Disk consistency error")
1274
1275
1276 class LUInstanceGrowDisk(LogicalUnit):
1277   """Grow a disk of an instance.
1278
1279   """
1280   HPATH = "disk-grow"
1281   HTYPE = constants.HTYPE_INSTANCE
1282   REQ_BGL = False
1283
1284   def ExpandNames(self):
1285     self._ExpandAndLockInstance()
1286     self.needed_locks[locking.LEVEL_NODE] = []
1287     self.needed_locks[locking.LEVEL_NODE_RES] = []
1288     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1289     self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1290
1291   def DeclareLocks(self, level):
1292     if level == locking.LEVEL_NODE:
1293       self._LockInstancesNodes()
1294     elif level == locking.LEVEL_NODE_RES:
1295       # Copy node locks
1296       self.needed_locks[locking.LEVEL_NODE_RES] = \
1297         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1298
1299   def BuildHooksEnv(self):
1300     """Build hooks env.
1301
1302     This runs on the master, the primary and all the secondaries.
1303
1304     """
1305     env = {
1306       "DISK": self.op.disk,
1307       "AMOUNT": self.op.amount,
1308       "ABSOLUTE": self.op.absolute,
1309       }
1310     env.update(BuildInstanceHookEnvByObject(self, self.instance))
1311     return env
1312
1313   def BuildHooksNodes(self):
1314     """Build hooks nodes.
1315
1316     """
1317     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1318     return (nl, nl)
1319
1320   def CheckPrereq(self):
1321     """Check prerequisites.
1322
1323     This checks that the instance is in the cluster.
1324
1325     """
1326     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1327     assert instance is not None, \
1328       "Cannot retrieve locked instance %s" % self.op.instance_name
1329     nodenames = list(instance.all_nodes)
1330     for node in nodenames:
1331       CheckNodeOnline(self, node)
1332
1333     self.instance = instance
1334
1335     if instance.disk_template not in constants.DTS_GROWABLE:
1336       raise errors.OpPrereqError("Instance's disk layout does not support"
1337                                  " growing", errors.ECODE_INVAL)
1338
1339     self.disk = instance.FindDisk(self.op.disk)
1340
1341     if self.op.absolute:
1342       self.target = self.op.amount
1343       self.delta = self.target - self.disk.size
1344       if self.delta < 0:
1345         raise errors.OpPrereqError("Requested size (%s) is smaller than "
1346                                    "current disk size (%s)" %
1347                                    (utils.FormatUnit(self.target, "h"),
1348                                     utils.FormatUnit(self.disk.size, "h")),
1349                                    errors.ECODE_STATE)
1350     else:
1351       self.delta = self.op.amount
1352       self.target = self.disk.size + self.delta
1353       if self.delta < 0:
1354         raise errors.OpPrereqError("Requested increment (%s) is negative" %
1355                                    utils.FormatUnit(self.delta, "h"),
1356                                    errors.ECODE_INVAL)
1357
1358     self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1359
1360   def _CheckDiskSpace(self, nodenames, req_vgspace):
1361     template = self.instance.disk_template
1362     if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1363       # TODO: check the free disk space for file, when that feature will be
1364       # supported
1365       nodes = map(self.cfg.GetNodeInfo, nodenames)
1366       es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1367                         nodes)
1368       if es_nodes:
1369         # With exclusive storage we need to something smarter than just looking
1370         # at free space; for now, let's simply abort the operation.
1371         raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1372                                    " is enabled", errors.ECODE_STATE)
1373       CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1374
1375   def Exec(self, feedback_fn):
1376     """Execute disk grow.
1377
1378     """
1379     instance = self.instance
1380     disk = self.disk
1381
1382     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1383     assert (self.owned_locks(locking.LEVEL_NODE) ==
1384             self.owned_locks(locking.LEVEL_NODE_RES))
1385
1386     wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1387
1388     disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1389     if not disks_ok:
1390       raise errors.OpExecError("Cannot activate block device to grow")
1391
1392     feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1393                 (self.op.disk, instance.name,
1394                  utils.FormatUnit(self.delta, "h"),
1395                  utils.FormatUnit(self.target, "h")))
1396
1397     # First run all grow ops in dry-run mode
1398     for node in instance.all_nodes:
1399       self.cfg.SetDiskID(disk, node)
1400       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1401                                            True, True)
1402       result.Raise("Dry-run grow request failed to node %s" % node)
1403
1404     if wipe_disks:
1405       # Get disk size from primary node for wiping
1406       result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1407       result.Raise("Failed to retrieve disk size from node '%s'" %
1408                    instance.primary_node)
1409
1410       (disk_size_in_bytes, ) = result.payload
1411
1412       if disk_size_in_bytes is None:
1413         raise errors.OpExecError("Failed to retrieve disk size from primary"
1414                                  " node '%s'" % instance.primary_node)
1415
1416       old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1417
1418       assert old_disk_size >= disk.size, \
1419         ("Retrieved disk size too small (got %s, should be at least %s)" %
1420          (old_disk_size, disk.size))
1421     else:
1422       old_disk_size = None
1423
1424     # We know that (as far as we can test) operations across different
1425     # nodes will succeed, time to run it for real on the backing storage
1426     for node in instance.all_nodes:
1427       self.cfg.SetDiskID(disk, node)
1428       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1429                                            False, True)
1430       result.Raise("Grow request failed to node %s" % node)
1431
1432     # And now execute it for logical storage, on the primary node
1433     node = instance.primary_node
1434     self.cfg.SetDiskID(disk, node)
1435     result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1436                                          False, False)
1437     result.Raise("Grow request failed to node %s" % node)
1438
1439     disk.RecordGrow(self.delta)
1440     self.cfg.Update(instance, feedback_fn)
1441
1442     # Changes have been recorded, release node lock
1443     ReleaseLocks(self, locking.LEVEL_NODE)
1444
1445     # Downgrade lock while waiting for sync
1446     self.glm.downgrade(locking.LEVEL_INSTANCE)
1447
1448     assert wipe_disks ^ (old_disk_size is None)
1449
1450     if wipe_disks:
1451       assert instance.disks[self.op.disk] == disk
1452
1453       # Wipe newly added disk space
1454       WipeDisks(self, instance,
1455                 disks=[(self.op.disk, disk, old_disk_size)])
1456
1457     if self.op.wait_for_sync:
1458       disk_abort = not WaitForSync(self, instance, disks=[disk])
1459       if disk_abort:
1460         self.LogWarning("Disk syncing has not returned a good status; check"
1461                         " the instance")
1462       if instance.admin_state != constants.ADMINST_UP:
1463         _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1464     elif instance.admin_state != constants.ADMINST_UP:
1465       self.LogWarning("Not shutting down the disk even if the instance is"
1466                       " not supposed to be running because no wait for"
1467                       " sync mode was requested")
1468
1469     assert self.owned_locks(locking.LEVEL_NODE_RES)
1470     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1471
1472
1473 class LUInstanceReplaceDisks(LogicalUnit):
1474   """Replace the disks of an instance.
1475
1476   """
1477   HPATH = "mirrors-replace"
1478   HTYPE = constants.HTYPE_INSTANCE
1479   REQ_BGL = False
1480
1481   def CheckArguments(self):
1482     """Check arguments.
1483
1484     """
1485     remote_node = self.op.remote_node
1486     ialloc = self.op.iallocator
1487     if self.op.mode == constants.REPLACE_DISK_CHG:
1488       if remote_node is None and ialloc is None:
1489         raise errors.OpPrereqError("When changing the secondary either an"
1490                                    " iallocator script must be used or the"
1491                                    " new node given", errors.ECODE_INVAL)
1492       else:
1493         CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1494
1495     elif remote_node is not None or ialloc is not None:
1496       # Not replacing the secondary
1497       raise errors.OpPrereqError("The iallocator and new node options can"
1498                                  " only be used when changing the"
1499                                  " secondary node", errors.ECODE_INVAL)
1500
1501   def ExpandNames(self):
1502     self._ExpandAndLockInstance()
1503
1504     assert locking.LEVEL_NODE not in self.needed_locks
1505     assert locking.LEVEL_NODE_RES not in self.needed_locks
1506     assert locking.LEVEL_NODEGROUP not in self.needed_locks
1507
1508     assert self.op.iallocator is None or self.op.remote_node is None, \
1509       "Conflicting options"
1510
1511     if self.op.remote_node is not None:
1512       self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1513
1514       # Warning: do not remove the locking of the new secondary here
1515       # unless DRBD8Dev.AddChildren is changed to work in parallel;
1516       # currently it doesn't since parallel invocations of
1517       # FindUnusedMinor will conflict
1518       self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1519       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1520     else:
1521       self.needed_locks[locking.LEVEL_NODE] = []
1522       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1523
1524       if self.op.iallocator is not None:
1525         # iallocator will select a new node in the same group
1526         self.needed_locks[locking.LEVEL_NODEGROUP] = []
1527         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1528
1529     self.needed_locks[locking.LEVEL_NODE_RES] = []
1530
1531     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1532                                    self.op.iallocator, self.op.remote_node,
1533                                    self.op.disks, self.op.early_release,
1534                                    self.op.ignore_ipolicy)
1535
1536     self.tasklets = [self.replacer]
1537
1538   def DeclareLocks(self, level):
1539     if level == locking.LEVEL_NODEGROUP:
1540       assert self.op.remote_node is None
1541       assert self.op.iallocator is not None
1542       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1543
1544       self.share_locks[locking.LEVEL_NODEGROUP] = 1
1545       # Lock all groups used by instance optimistically; this requires going
1546       # via the node before it's locked, requiring verification later on
1547       self.needed_locks[locking.LEVEL_NODEGROUP] = \
1548         self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1549
1550     elif level == locking.LEVEL_NODE:
1551       if self.op.iallocator is not None:
1552         assert self.op.remote_node is None
1553         assert not self.needed_locks[locking.LEVEL_NODE]
1554         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1555
1556         # Lock member nodes of all locked groups
1557         self.needed_locks[locking.LEVEL_NODE] = \
1558           [node_name
1559            for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1560            for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1561       else:
1562         assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1563
1564         self._LockInstancesNodes()
1565
1566     elif level == locking.LEVEL_NODE_RES:
1567       # Reuse node locks
1568       self.needed_locks[locking.LEVEL_NODE_RES] = \
1569         self.needed_locks[locking.LEVEL_NODE]
1570
1571   def BuildHooksEnv(self):
1572     """Build hooks env.
1573
1574     This runs on the master, the primary and all the secondaries.
1575
1576     """
1577     instance = self.replacer.instance
1578     env = {
1579       "MODE": self.op.mode,
1580       "NEW_SECONDARY": self.op.remote_node,
1581       "OLD_SECONDARY": instance.secondary_nodes[0],
1582       }
1583     env.update(BuildInstanceHookEnvByObject(self, instance))
1584     return env
1585
1586   def BuildHooksNodes(self):
1587     """Build hooks nodes.
1588
1589     """
1590     instance = self.replacer.instance
1591     nl = [
1592       self.cfg.GetMasterNode(),
1593       instance.primary_node,
1594       ]
1595     if self.op.remote_node is not None:
1596       nl.append(self.op.remote_node)
1597     return nl, nl
1598
1599   def CheckPrereq(self):
1600     """Check prerequisites.
1601
1602     """
1603     assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1604             self.op.iallocator is None)
1605
1606     # Verify if node group locks are still correct
1607     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1608     if owned_groups:
1609       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1610
1611     return LogicalUnit.CheckPrereq(self)
1612
1613
1614 class LUInstanceActivateDisks(NoHooksLU):
1615   """Bring up an instance's disks.
1616
1617   """
1618   REQ_BGL = False
1619
1620   def ExpandNames(self):
1621     self._ExpandAndLockInstance()
1622     self.needed_locks[locking.LEVEL_NODE] = []
1623     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1624
1625   def DeclareLocks(self, level):
1626     if level == locking.LEVEL_NODE:
1627       self._LockInstancesNodes()
1628
1629   def CheckPrereq(self):
1630     """Check prerequisites.
1631
1632     This checks that the instance is in the cluster.
1633
1634     """
1635     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1636     assert self.instance is not None, \
1637       "Cannot retrieve locked instance %s" % self.op.instance_name
1638     CheckNodeOnline(self, self.instance.primary_node)
1639
1640   def Exec(self, feedback_fn):
1641     """Activate the disks.
1642
1643     """
1644     disks_ok, disks_info = \
1645               AssembleInstanceDisks(self, self.instance,
1646                                     ignore_size=self.op.ignore_size)
1647     if not disks_ok:
1648       raise errors.OpExecError("Cannot activate block devices")
1649
1650     if self.op.wait_for_sync:
1651       if not WaitForSync(self, self.instance):
1652         raise errors.OpExecError("Some disks of the instance are degraded!")
1653
1654     return disks_info
1655
1656
1657 class LUInstanceDeactivateDisks(NoHooksLU):
1658   """Shutdown an instance's disks.
1659
1660   """
1661   REQ_BGL = False
1662
1663   def ExpandNames(self):
1664     self._ExpandAndLockInstance()
1665     self.needed_locks[locking.LEVEL_NODE] = []
1666     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1667
1668   def DeclareLocks(self, level):
1669     if level == locking.LEVEL_NODE:
1670       self._LockInstancesNodes()
1671
1672   def CheckPrereq(self):
1673     """Check prerequisites.
1674
1675     This checks that the instance is in the cluster.
1676
1677     """
1678     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1679     assert self.instance is not None, \
1680       "Cannot retrieve locked instance %s" % self.op.instance_name
1681
1682   def Exec(self, feedback_fn):
1683     """Deactivate the disks
1684
1685     """
1686     instance = self.instance
1687     if self.op.force:
1688       ShutdownInstanceDisks(self, instance)
1689     else:
1690       _SafeShutdownInstanceDisks(self, instance)
1691
1692
1693 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1694                                ldisk=False):
1695   """Check that mirrors are not degraded.
1696
1697   @attention: The device has to be annotated already.
1698
1699   The ldisk parameter, if True, will change the test from the
1700   is_degraded attribute (which represents overall non-ok status for
1701   the device(s)) to the ldisk (representing the local storage status).
1702
1703   """
1704   lu.cfg.SetDiskID(dev, node)
1705
1706   result = True
1707
1708   if on_primary or dev.AssembleOnSecondary():
1709     rstats = lu.rpc.call_blockdev_find(node, dev)
1710     msg = rstats.fail_msg
1711     if msg:
1712       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1713       result = False
1714     elif not rstats.payload:
1715       lu.LogWarning("Can't find disk on node %s", node)
1716       result = False
1717     else:
1718       if ldisk:
1719         result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1720       else:
1721         result = result and not rstats.payload.is_degraded
1722
1723   if dev.children:
1724     for child in dev.children:
1725       result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1726                                                      on_primary)
1727
1728   return result
1729
1730
1731 def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1732   """Wrapper around L{_CheckDiskConsistencyInner}.
1733
1734   """
1735   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1736   return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1737                                     ldisk=ldisk)
1738
1739
1740 def _BlockdevFind(lu, node, dev, instance):
1741   """Wrapper around call_blockdev_find to annotate diskparams.
1742
1743   @param lu: A reference to the lu object
1744   @param node: The node to call out
1745   @param dev: The device to find
1746   @param instance: The instance object the device belongs to
1747   @returns The result of the rpc call
1748
1749   """
1750   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1751   return lu.rpc.call_blockdev_find(node, disk)
1752
1753
1754 def _GenerateUniqueNames(lu, exts):
1755   """Generate a suitable LV name.
1756
1757   This will generate a logical volume name for the given instance.
1758
1759   """
1760   results = []
1761   for val in exts:
1762     new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1763     results.append("%s%s" % (new_id, val))
1764   return results
1765
1766
1767 class TLReplaceDisks(Tasklet):
1768   """Replaces disks for an instance.
1769
1770   Note: Locking is not within the scope of this class.
1771
1772   """
1773   def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1774                disks, early_release, ignore_ipolicy):
1775     """Initializes this class.
1776
1777     """
1778     Tasklet.__init__(self, lu)
1779
1780     # Parameters
1781     self.instance_name = instance_name
1782     self.mode = mode
1783     self.iallocator_name = iallocator_name
1784     self.remote_node = remote_node
1785     self.disks = disks
1786     self.early_release = early_release
1787     self.ignore_ipolicy = ignore_ipolicy
1788
1789     # Runtime data
1790     self.instance = None
1791     self.new_node = None
1792     self.target_node = None
1793     self.other_node = None
1794     self.remote_node_info = None
1795     self.node_secondary_ip = None
1796
1797   @staticmethod
1798   def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1799     """Compute a new secondary node using an IAllocator.
1800
1801     """
1802     req = iallocator.IAReqRelocate(name=instance_name,
1803                                    relocate_from=list(relocate_from))
1804     ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1805
1806     ial.Run(iallocator_name)
1807
1808     if not ial.success:
1809       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1810                                  " %s" % (iallocator_name, ial.info),
1811                                  errors.ECODE_NORES)
1812
1813     remote_node_name = ial.result[0]
1814
1815     lu.LogInfo("Selected new secondary for instance '%s': %s",
1816                instance_name, remote_node_name)
1817
1818     return remote_node_name
1819
1820   def _FindFaultyDisks(self, node_name):
1821     """Wrapper for L{FindFaultyInstanceDisks}.
1822
1823     """
1824     return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1825                                    node_name, True)
1826
1827   def _CheckDisksActivated(self, instance):
1828     """Checks if the instance disks are activated.
1829
1830     @param instance: The instance to check disks
1831     @return: True if they are activated, False otherwise
1832
1833     """
1834     nodes = instance.all_nodes
1835
1836     for idx, dev in enumerate(instance.disks):
1837       for node in nodes:
1838         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1839         self.cfg.SetDiskID(dev, node)
1840
1841         result = _BlockdevFind(self, node, dev, instance)
1842
1843         if result.offline:
1844           continue
1845         elif result.fail_msg or not result.payload:
1846           return False
1847
1848     return True
1849
1850   def CheckPrereq(self):
1851     """Check prerequisites.
1852
1853     This checks that the instance is in the cluster.
1854
1855     """
1856     self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1857     assert instance is not None, \
1858       "Cannot retrieve locked instance %s" % self.instance_name
1859
1860     if instance.disk_template != constants.DT_DRBD8:
1861       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1862                                  " instances", errors.ECODE_INVAL)
1863
1864     if len(instance.secondary_nodes) != 1:
1865       raise errors.OpPrereqError("The instance has a strange layout,"
1866                                  " expected one secondary but found %d" %
1867                                  len(instance.secondary_nodes),
1868                                  errors.ECODE_FAULT)
1869
1870     instance = self.instance
1871     secondary_node = instance.secondary_nodes[0]
1872
1873     if self.iallocator_name is None:
1874       remote_node = self.remote_node
1875     else:
1876       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1877                                        instance.name, instance.secondary_nodes)
1878
1879     if remote_node is None:
1880       self.remote_node_info = None
1881     else:
1882       assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1883              "Remote node '%s' is not locked" % remote_node
1884
1885       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1886       assert self.remote_node_info is not None, \
1887         "Cannot retrieve locked node %s" % remote_node
1888
1889     if remote_node == self.instance.primary_node:
1890       raise errors.OpPrereqError("The specified node is the primary node of"
1891                                  " the instance", errors.ECODE_INVAL)
1892
1893     if remote_node == secondary_node:
1894       raise errors.OpPrereqError("The specified node is already the"
1895                                  " secondary node of the instance",
1896                                  errors.ECODE_INVAL)
1897
1898     if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1899                                     constants.REPLACE_DISK_CHG):
1900       raise errors.OpPrereqError("Cannot specify disks to be replaced",
1901                                  errors.ECODE_INVAL)
1902
1903     if self.mode == constants.REPLACE_DISK_AUTO:
1904       if not self._CheckDisksActivated(instance):
1905         raise errors.OpPrereqError("Please run activate-disks on instance %s"
1906                                    " first" % self.instance_name,
1907                                    errors.ECODE_STATE)
1908       faulty_primary = self._FindFaultyDisks(instance.primary_node)
1909       faulty_secondary = self._FindFaultyDisks(secondary_node)
1910
1911       if faulty_primary and faulty_secondary:
1912         raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1913                                    " one node and can not be repaired"
1914                                    " automatically" % self.instance_name,
1915                                    errors.ECODE_STATE)
1916
1917       if faulty_primary:
1918         self.disks = faulty_primary
1919         self.target_node = instance.primary_node
1920         self.other_node = secondary_node
1921         check_nodes = [self.target_node, self.other_node]
1922       elif faulty_secondary:
1923         self.disks = faulty_secondary
1924         self.target_node = secondary_node
1925         self.other_node = instance.primary_node
1926         check_nodes = [self.target_node, self.other_node]
1927       else:
1928         self.disks = []
1929         check_nodes = []
1930
1931     else:
1932       # Non-automatic modes
1933       if self.mode == constants.REPLACE_DISK_PRI:
1934         self.target_node = instance.primary_node
1935         self.other_node = secondary_node
1936         check_nodes = [self.target_node, self.other_node]
1937
1938       elif self.mode == constants.REPLACE_DISK_SEC:
1939         self.target_node = secondary_node
1940         self.other_node = instance.primary_node
1941         check_nodes = [self.target_node, self.other_node]
1942
1943       elif self.mode == constants.REPLACE_DISK_CHG:
1944         self.new_node = remote_node
1945         self.other_node = instance.primary_node
1946         self.target_node = secondary_node
1947         check_nodes = [self.new_node, self.other_node]
1948
1949         CheckNodeNotDrained(self.lu, remote_node)
1950         CheckNodeVmCapable(self.lu, remote_node)
1951
1952         old_node_info = self.cfg.GetNodeInfo(secondary_node)
1953         assert old_node_info is not None
1954         if old_node_info.offline and not self.early_release:
1955           # doesn't make sense to delay the release
1956           self.early_release = True
1957           self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1958                           " early-release mode", secondary_node)
1959
1960       else:
1961         raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
1962                                      self.mode)
1963
1964       # If not specified all disks should be replaced
1965       if not self.disks:
1966         self.disks = range(len(self.instance.disks))
1967
1968     # TODO: This is ugly, but right now we can't distinguish between internal
1969     # submitted opcode and external one. We should fix that.
1970     if self.remote_node_info:
1971       # We change the node, lets verify it still meets instance policy
1972       new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
1973       cluster = self.cfg.GetClusterInfo()
1974       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1975                                                               new_group_info)
1976       CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
1977                              self.cfg, ignore=self.ignore_ipolicy)
1978
1979     for node in check_nodes:
1980       CheckNodeOnline(self.lu, node)
1981
1982     touched_nodes = frozenset(node_name for node_name in [self.new_node,
1983                                                           self.other_node,
1984                                                           self.target_node]
1985                               if node_name is not None)
1986
1987     # Release unneeded node and node resource locks
1988     ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
1989     ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
1990     ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
1991
1992     # Release any owned node group
1993     ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
1994
1995     # Check whether disks are valid
1996     for disk_idx in self.disks:
1997       instance.FindDisk(disk_idx)
1998
1999     # Get secondary node IP addresses
2000     self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
2001                                   in self.cfg.GetMultiNodeInfo(touched_nodes))
2002
2003   def Exec(self, feedback_fn):
2004     """Execute disk replacement.
2005
2006     This dispatches the disk replacement to the appropriate handler.
2007
2008     """
2009     if __debug__:
2010       # Verify owned locks before starting operation
2011       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2012       assert set(owned_nodes) == set(self.node_secondary_ip), \
2013           ("Incorrect node locks, owning %s, expected %s" %
2014            (owned_nodes, self.node_secondary_ip.keys()))
2015       assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2016               self.lu.owned_locks(locking.LEVEL_NODE_RES))
2017       assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2018
2019       owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2020       assert list(owned_instances) == [self.instance_name], \
2021           "Instance '%s' not locked" % self.instance_name
2022
2023       assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2024           "Should not own any node group lock at this point"
2025
2026     if not self.disks:
2027       feedback_fn("No disks need replacement for instance '%s'" %
2028                   self.instance.name)
2029       return
2030
2031     feedback_fn("Replacing disk(s) %s for instance '%s'" %
2032                 (utils.CommaJoin(self.disks), self.instance.name))
2033     feedback_fn("Current primary node: %s" % self.instance.primary_node)
2034     feedback_fn("Current seconary node: %s" %
2035                 utils.CommaJoin(self.instance.secondary_nodes))
2036
2037     activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
2038
2039     # Activate the instance disks if we're replacing them on a down instance
2040     if activate_disks:
2041       StartInstanceDisks(self.lu, self.instance, True)
2042
2043     try:
2044       # Should we replace the secondary node?
2045       if self.new_node is not None:
2046         fn = self._ExecDrbd8Secondary
2047       else:
2048         fn = self._ExecDrbd8DiskOnly
2049
2050       result = fn(feedback_fn)
2051     finally:
2052       # Deactivate the instance disks if we're replacing them on a
2053       # down instance
2054       if activate_disks:
2055         _SafeShutdownInstanceDisks(self.lu, self.instance)
2056
2057     assert not self.lu.owned_locks(locking.LEVEL_NODE)
2058
2059     if __debug__:
2060       # Verify owned locks
2061       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2062       nodes = frozenset(self.node_secondary_ip)
2063       assert ((self.early_release and not owned_nodes) or
2064               (not self.early_release and not (set(owned_nodes) - nodes))), \
2065         ("Not owning the correct locks, early_release=%s, owned=%r,"
2066          " nodes=%r" % (self.early_release, owned_nodes, nodes))
2067
2068     return result
2069
2070   def _CheckVolumeGroup(self, nodes):
2071     self.lu.LogInfo("Checking volume groups")
2072
2073     vgname = self.cfg.GetVGName()
2074
2075     # Make sure volume group exists on all involved nodes
2076     results = self.rpc.call_vg_list(nodes)
2077     if not results:
2078       raise errors.OpExecError("Can't list volume groups on the nodes")
2079
2080     for node in nodes:
2081       res = results[node]
2082       res.Raise("Error checking node %s" % node)
2083       if vgname not in res.payload:
2084         raise errors.OpExecError("Volume group '%s' not found on node %s" %
2085                                  (vgname, node))
2086
2087   def _CheckDisksExistence(self, nodes):
2088     # Check disk existence
2089     for idx, dev in enumerate(self.instance.disks):
2090       if idx not in self.disks:
2091         continue
2092
2093       for node in nodes:
2094         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2095         self.cfg.SetDiskID(dev, node)
2096
2097         result = _BlockdevFind(self, node, dev, self.instance)
2098
2099         msg = result.fail_msg
2100         if msg or not result.payload:
2101           if not msg:
2102             msg = "disk not found"
2103           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2104                                    (idx, node, msg))
2105
2106   def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2107     for idx, dev in enumerate(self.instance.disks):
2108       if idx not in self.disks:
2109         continue
2110
2111       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2112                       (idx, node_name))
2113
2114       if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2115                                   on_primary, ldisk=ldisk):
2116         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2117                                  " replace disks for instance %s" %
2118                                  (node_name, self.instance.name))
2119
2120   def _CreateNewStorage(self, node_name):
2121     """Create new storage on the primary or secondary node.
2122
2123     This is only used for same-node replaces, not for changing the
2124     secondary node, hence we don't want to modify the existing disk.
2125
2126     """
2127     iv_names = {}
2128
2129     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2130     for idx, dev in enumerate(disks):
2131       if idx not in self.disks:
2132         continue
2133
2134       self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2135
2136       self.cfg.SetDiskID(dev, node_name)
2137
2138       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2139       names = _GenerateUniqueNames(self.lu, lv_names)
2140
2141       (data_disk, meta_disk) = dev.children
2142       vg_data = data_disk.logical_id[0]
2143       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2144                              logical_id=(vg_data, names[0]),
2145                              params=data_disk.params)
2146       vg_meta = meta_disk.logical_id[0]
2147       lv_meta = objects.Disk(dev_type=constants.LD_LV,
2148                              size=constants.DRBD_META_SIZE,
2149                              logical_id=(vg_meta, names[1]),
2150                              params=meta_disk.params)
2151
2152       new_lvs = [lv_data, lv_meta]
2153       old_lvs = [child.Copy() for child in dev.children]
2154       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2155       excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2156
2157       # we pass force_create=True to force the LVM creation
2158       for new_lv in new_lvs:
2159         _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2160                              GetInstanceInfoText(self.instance), False,
2161                              excl_stor)
2162
2163     return iv_names
2164
2165   def _CheckDevices(self, node_name, iv_names):
2166     for name, (dev, _, _) in iv_names.iteritems():
2167       self.cfg.SetDiskID(dev, node_name)
2168
2169       result = _BlockdevFind(self, node_name, dev, self.instance)
2170
2171       msg = result.fail_msg
2172       if msg or not result.payload:
2173         if not msg:
2174           msg = "disk not found"
2175         raise errors.OpExecError("Can't find DRBD device %s: %s" %
2176                                  (name, msg))
2177
2178       if result.payload.is_degraded:
2179         raise errors.OpExecError("DRBD device %s is degraded!" % name)
2180
2181   def _RemoveOldStorage(self, node_name, iv_names):
2182     for name, (_, old_lvs, _) in iv_names.iteritems():
2183       self.lu.LogInfo("Remove logical volumes for %s", name)
2184
2185       for lv in old_lvs:
2186         self.cfg.SetDiskID(lv, node_name)
2187
2188         msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2189         if msg:
2190           self.lu.LogWarning("Can't remove old LV: %s", msg,
2191                              hint="remove unused LVs manually")
2192
2193   def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2194     """Replace a disk on the primary or secondary for DRBD 8.
2195
2196     The algorithm for replace is quite complicated:
2197
2198       1. for each disk to be replaced:
2199
2200         1. create new LVs on the target node with unique names
2201         1. detach old LVs from the drbd device
2202         1. rename old LVs to name_replaced.<time_t>
2203         1. rename new LVs to old LVs
2204         1. attach the new LVs (with the old names now) to the drbd device
2205
2206       1. wait for sync across all devices
2207
2208       1. for each modified disk:
2209
2210         1. remove old LVs (which have the name name_replaces.<time_t>)
2211
2212     Failures are not very well handled.
2213
2214     """
2215     steps_total = 6
2216
2217     # Step: check device activation
2218     self.lu.LogStep(1, steps_total, "Check device existence")
2219     self._CheckDisksExistence([self.other_node, self.target_node])
2220     self._CheckVolumeGroup([self.target_node, self.other_node])
2221
2222     # Step: check other node consistency
2223     self.lu.LogStep(2, steps_total, "Check peer consistency")
2224     self._CheckDisksConsistency(self.other_node,
2225                                 self.other_node == self.instance.primary_node,
2226                                 False)
2227
2228     # Step: create new storage
2229     self.lu.LogStep(3, steps_total, "Allocate new storage")
2230     iv_names = self._CreateNewStorage(self.target_node)
2231
2232     # Step: for each lv, detach+rename*2+attach
2233     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2234     for dev, old_lvs, new_lvs in iv_names.itervalues():
2235       self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2236
2237       result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2238                                                      old_lvs)
2239       result.Raise("Can't detach drbd from local storage on node"
2240                    " %s for device %s" % (self.target_node, dev.iv_name))
2241       #dev.children = []
2242       #cfg.Update(instance)
2243
2244       # ok, we created the new LVs, so now we know we have the needed
2245       # storage; as such, we proceed on the target node to rename
2246       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2247       # using the assumption that logical_id == physical_id (which in
2248       # turn is the unique_id on that node)
2249
2250       # FIXME(iustin): use a better name for the replaced LVs
2251       temp_suffix = int(time.time())
2252       ren_fn = lambda d, suff: (d.physical_id[0],
2253                                 d.physical_id[1] + "_replaced-%s" % suff)
2254
2255       # Build the rename list based on what LVs exist on the node
2256       rename_old_to_new = []
2257       for to_ren in old_lvs:
2258         result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2259         if not result.fail_msg and result.payload:
2260           # device exists
2261           rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2262
2263       self.lu.LogInfo("Renaming the old LVs on the target node")
2264       result = self.rpc.call_blockdev_rename(self.target_node,
2265                                              rename_old_to_new)
2266       result.Raise("Can't rename old LVs on node %s" % self.target_node)
2267
2268       # Now we rename the new LVs to the old LVs
2269       self.lu.LogInfo("Renaming the new LVs on the target node")
2270       rename_new_to_old = [(new, old.physical_id)
2271                            for old, new in zip(old_lvs, new_lvs)]
2272       result = self.rpc.call_blockdev_rename(self.target_node,
2273                                              rename_new_to_old)
2274       result.Raise("Can't rename new LVs on node %s" % self.target_node)
2275
2276       # Intermediate steps of in memory modifications
2277       for old, new in zip(old_lvs, new_lvs):
2278         new.logical_id = old.logical_id
2279         self.cfg.SetDiskID(new, self.target_node)
2280
2281       # We need to modify old_lvs so that removal later removes the
2282       # right LVs, not the newly added ones; note that old_lvs is a
2283       # copy here
2284       for disk in old_lvs:
2285         disk.logical_id = ren_fn(disk, temp_suffix)
2286         self.cfg.SetDiskID(disk, self.target_node)
2287
2288       # Now that the new lvs have the old name, we can add them to the device
2289       self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2290       result = self.rpc.call_blockdev_addchildren(self.target_node,
2291                                                   (dev, self.instance), new_lvs)
2292       msg = result.fail_msg
2293       if msg:
2294         for new_lv in new_lvs:
2295           msg2 = self.rpc.call_blockdev_remove(self.target_node,
2296                                                new_lv).fail_msg
2297           if msg2:
2298             self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2299                                hint=("cleanup manually the unused logical"
2300                                      "volumes"))
2301         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2302
2303     cstep = itertools.count(5)
2304
2305     if self.early_release:
2306       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2307       self._RemoveOldStorage(self.target_node, iv_names)
2308       # TODO: Check if releasing locks early still makes sense
2309       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2310     else:
2311       # Release all resource locks except those used by the instance
2312       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2313                    keep=self.node_secondary_ip.keys())
2314
2315     # Release all node locks while waiting for sync
2316     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2317
2318     # TODO: Can the instance lock be downgraded here? Take the optional disk
2319     # shutdown in the caller into consideration.
2320
2321     # Wait for sync
2322     # This can fail as the old devices are degraded and _WaitForSync
2323     # does a combined result over all disks, so we don't check its return value
2324     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2325     WaitForSync(self.lu, self.instance)
2326
2327     # Check all devices manually
2328     self._CheckDevices(self.instance.primary_node, iv_names)
2329
2330     # Step: remove old storage
2331     if not self.early_release:
2332       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2333       self._RemoveOldStorage(self.target_node, iv_names)
2334
2335   def _ExecDrbd8Secondary(self, feedback_fn):
2336     """Replace the secondary node for DRBD 8.
2337
2338     The algorithm for replace is quite complicated:
2339       - for all disks of the instance:
2340         - create new LVs on the new node with same names
2341         - shutdown the drbd device on the old secondary
2342         - disconnect the drbd network on the primary
2343         - create the drbd device on the new secondary
2344         - network attach the drbd on the primary, using an artifice:
2345           the drbd code for Attach() will connect to the network if it
2346           finds a device which is connected to the good local disks but
2347           not network enabled
2348       - wait for sync across all devices
2349       - remove all disks from the old secondary
2350
2351     Failures are not very well handled.
2352
2353     """
2354     steps_total = 6
2355
2356     pnode = self.instance.primary_node
2357
2358     # Step: check device activation
2359     self.lu.LogStep(1, steps_total, "Check device existence")
2360     self._CheckDisksExistence([self.instance.primary_node])
2361     self._CheckVolumeGroup([self.instance.primary_node])
2362
2363     # Step: check other node consistency
2364     self.lu.LogStep(2, steps_total, "Check peer consistency")
2365     self._CheckDisksConsistency(self.instance.primary_node, True, True)
2366
2367     # Step: create new storage
2368     self.lu.LogStep(3, steps_total, "Allocate new storage")
2369     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2370     excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2371     for idx, dev in enumerate(disks):
2372       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2373                       (self.new_node, idx))
2374       # we pass force_create=True to force LVM creation
2375       for new_lv in dev.children:
2376         _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2377                              True, GetInstanceInfoText(self.instance), False,
2378                              excl_stor)
2379
2380     # Step 4: dbrd minors and drbd setups changes
2381     # after this, we must manually remove the drbd minors on both the
2382     # error and the success paths
2383     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2384     minors = self.cfg.AllocateDRBDMinor([self.new_node
2385                                          for dev in self.instance.disks],
2386                                         self.instance.name)
2387     logging.debug("Allocated minors %r", minors)
2388
2389     iv_names = {}
2390     for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2391       self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2392                       (self.new_node, idx))
2393       # create new devices on new_node; note that we create two IDs:
2394       # one without port, so the drbd will be activated without
2395       # networking information on the new node at this stage, and one
2396       # with network, for the latter activation in step 4
2397       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2398       if self.instance.primary_node == o_node1:
2399         p_minor = o_minor1
2400       else:
2401         assert self.instance.primary_node == o_node2, "Three-node instance?"
2402         p_minor = o_minor2
2403
2404       new_alone_id = (self.instance.primary_node, self.new_node, None,
2405                       p_minor, new_minor, o_secret)
2406       new_net_id = (self.instance.primary_node, self.new_node, o_port,
2407                     p_minor, new_minor, o_secret)
2408
2409       iv_names[idx] = (dev, dev.children, new_net_id)
2410       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2411                     new_net_id)
2412       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2413                               logical_id=new_alone_id,
2414                               children=dev.children,
2415                               size=dev.size,
2416                               params={})
2417       (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2418                                             self.cfg)
2419       try:
2420         CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2421                              anno_new_drbd,
2422                              GetInstanceInfoText(self.instance), False,
2423                              excl_stor)
2424       except errors.GenericError:
2425         self.cfg.ReleaseDRBDMinors(self.instance.name)
2426         raise
2427
2428     # We have new devices, shutdown the drbd on the old secondary
2429     for idx, dev in enumerate(self.instance.disks):
2430       self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2431       self.cfg.SetDiskID(dev, self.target_node)
2432       msg = self.rpc.call_blockdev_shutdown(self.target_node,
2433                                             (dev, self.instance)).fail_msg
2434       if msg:
2435         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2436                            "node: %s" % (idx, msg),
2437                            hint=("Please cleanup this device manually as"
2438                                  " soon as possible"))
2439
2440     self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2441     result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2442                                                self.instance.disks)[pnode]
2443
2444     msg = result.fail_msg
2445     if msg:
2446       # detaches didn't succeed (unlikely)
2447       self.cfg.ReleaseDRBDMinors(self.instance.name)
2448       raise errors.OpExecError("Can't detach the disks from the network on"
2449                                " old node: %s" % (msg,))
2450
2451     # if we managed to detach at least one, we update all the disks of
2452     # the instance to point to the new secondary
2453     self.lu.LogInfo("Updating instance configuration")
2454     for dev, _, new_logical_id in iv_names.itervalues():
2455       dev.logical_id = new_logical_id
2456       self.cfg.SetDiskID(dev, self.instance.primary_node)
2457
2458     self.cfg.Update(self.instance, feedback_fn)
2459
2460     # Release all node locks (the configuration has been updated)
2461     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2462
2463     # and now perform the drbd attach
2464     self.lu.LogInfo("Attaching primary drbds to new secondary"
2465                     " (standalone => connected)")
2466     result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2467                                             self.new_node],
2468                                            self.node_secondary_ip,
2469                                            (self.instance.disks, self.instance),
2470                                            self.instance.name,
2471                                            False)
2472     for to_node, to_result in result.items():
2473       msg = to_result.fail_msg
2474       if msg:
2475         self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2476                            to_node, msg,
2477                            hint=("please do a gnt-instance info to see the"
2478                                  " status of disks"))
2479
2480     cstep = itertools.count(5)
2481
2482     if self.early_release:
2483       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2484       self._RemoveOldStorage(self.target_node, iv_names)
2485       # TODO: Check if releasing locks early still makes sense
2486       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2487     else:
2488       # Release all resource locks except those used by the instance
2489       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2490                    keep=self.node_secondary_ip.keys())
2491
2492     # TODO: Can the instance lock be downgraded here? Take the optional disk
2493     # shutdown in the caller into consideration.
2494
2495     # Wait for sync
2496     # This can fail as the old devices are degraded and _WaitForSync
2497     # does a combined result over all disks, so we don't check its return value
2498     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2499     WaitForSync(self.lu, self.instance)
2500
2501     # Check all devices manually
2502     self._CheckDevices(self.instance.primary_node, iv_names)
2503
2504     # Step: remove old storage
2505     if not self.early_release:
2506       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2507       self._RemoveOldStorage(self.target_node, iv_names)