(grnet) Remove deprecated idx slot from NIC/Disk objects
[ganeti-local] / lib / cmdlib / instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41   AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42   CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43   IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45   CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46   BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52   constants.DT_PLAIN: "",
53   constants.DT_RBD: ".rbd",
54   constants.DT_EXT: ".ext",
55   }
56
57
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59   constants.DT_PLAIN: constants.LD_LV,
60   constants.DT_FILE: constants.LD_FILE,
61   constants.DT_SHARED_FILE: constants.LD_FILE,
62   constants.DT_BLOCK: constants.LD_BLOCKDEV,
63   constants.DT_RBD: constants.LD_RBD,
64   constants.DT_EXT: constants.LD_EXT,
65   }
66
67
68 def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69                          excl_stor):
70   """Create a single block device on a given node.
71
72   This will not recurse over children of the device, so they must be
73   created in advance.
74
75   @param lu: the lu on whose behalf we execute
76   @param node: the node on which to create the device
77   @type instance: L{objects.Instance}
78   @param instance: the instance which owns the device
79   @type device: L{objects.Disk}
80   @param device: the device to create
81   @param info: the extra 'metadata' we should attach to the device
82       (this will be represented as a LVM tag)
83   @type force_open: boolean
84   @param force_open: this parameter will be passes to the
85       L{backend.BlockdevCreate} function where it specifies
86       whether we run on primary or not, and it affects both
87       the child assembly and the device own Open() execution
88   @type excl_stor: boolean
89   @param excl_stor: Whether exclusive_storage is active for the node
90
91   """
92   lu.cfg.SetDiskID(device, node)
93   result = lu.rpc.call_blockdev_create(node, device, device.size,
94                                        instance.name, force_open, info,
95                                        excl_stor)
96   result.Raise("Can't create block device %s on"
97                " node %s for instance %s" % (device, node, instance.name))
98   if device.physical_id is None:
99     device.physical_id = result.payload
100
101
102 def _CreateBlockDevInner(lu, node, instance, device, force_create,
103                          info, force_open, excl_stor):
104   """Create a tree of block devices on a given node.
105
106   If this device type has to be created on secondaries, create it and
107   all its children.
108
109   If not, just recurse to children keeping the same 'force' value.
110
111   @attention: The device has to be annotated already.
112
113   @param lu: the lu on whose behalf we execute
114   @param node: the node on which to create the device
115   @type instance: L{objects.Instance}
116   @param instance: the instance which owns the device
117   @type device: L{objects.Disk}
118   @param device: the device to create
119   @type force_create: boolean
120   @param force_create: whether to force creation of this device; this
121       will be change to True whenever we find a device which has
122       CreateOnSecondary() attribute
123   @param info: the extra 'metadata' we should attach to the device
124       (this will be represented as a LVM tag)
125   @type force_open: boolean
126   @param force_open: this parameter will be passes to the
127       L{backend.BlockdevCreate} function where it specifies
128       whether we run on primary or not, and it affects both
129       the child assembly and the device own Open() execution
130   @type excl_stor: boolean
131   @param excl_stor: Whether exclusive_storage is active for the node
132
133   @return: list of created devices
134   """
135   created_devices = []
136   try:
137     if device.CreateOnSecondary():
138       force_create = True
139
140     if device.children:
141       for child in device.children:
142         devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143                                     info, force_open, excl_stor)
144         created_devices.extend(devs)
145
146     if not force_create:
147       return created_devices
148
149     CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150                          excl_stor)
151     # The device has been completely created, so there is no point in keeping
152     # its subdevices in the list. We just add the device itself instead.
153     created_devices = [(node, device)]
154     return created_devices
155
156   except errors.DeviceCreationError, e:
157     e.created_devices.extend(created_devices)
158     raise e
159   except errors.OpExecError, e:
160     raise errors.DeviceCreationError(str(e), created_devices)
161
162
163 def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164   """Whether exclusive_storage is in effect for the given node.
165
166   @type cfg: L{config.ConfigWriter}
167   @param cfg: The cluster configuration
168   @type nodename: string
169   @param nodename: The node
170   @rtype: bool
171   @return: The effective value of exclusive_storage
172   @raise errors.OpPrereqError: if no node exists with the given name
173
174   """
175   ni = cfg.GetNodeInfo(nodename)
176   if ni is None:
177     raise errors.OpPrereqError("Invalid node name %s" % nodename,
178                                errors.ECODE_NOENT)
179   return IsExclusiveStorageEnabledNode(cfg, ni)
180
181
182 def _CreateBlockDev(lu, node, instance, device, force_create, info,
183                     force_open):
184   """Wrapper around L{_CreateBlockDevInner}.
185
186   This method annotates the root device first.
187
188   """
189   (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190   excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191   return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192                               force_open, excl_stor)
193
194
195 def _UndoCreateDisks(lu, disks_created):
196   """Undo the work performed by L{CreateDisks}.
197
198   This function is called in case of an error to undo the work of
199   L{CreateDisks}.
200
201   @type lu: L{LogicalUnit}
202   @param lu: the logical unit on whose behalf we execute
203   @param disks_created: the result returned by L{CreateDisks}
204
205   """
206   for (node, disk) in disks_created:
207     lu.cfg.SetDiskID(disk, node)
208     result = lu.rpc.call_blockdev_remove(node, disk)
209     if result.fail_msg:
210       logging.warning("Failed to remove newly-created disk %s on node %s:"
211                       " %s", disk, node, result.fail_msg)
212
213
214 def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
215   """Create all disks for an instance.
216
217   This abstracts away some work from AddInstance.
218
219   @type lu: L{LogicalUnit}
220   @param lu: the logical unit on whose behalf we execute
221   @type instance: L{objects.Instance}
222   @param instance: the instance whose disks we should create
223   @type to_skip: list
224   @param to_skip: list of indices to skip
225   @type target_node: string
226   @param target_node: if passed, overrides the target node for creation
227   @type disks: list of {objects.Disk}
228   @param disks: the disks to create; if not specified, all the disks of the
229       instance are created
230   @return: information about the created disks, to be used to call
231       L{_UndoCreateDisks}
232   @raise errors.OpPrereqError: in case of error
233
234   """
235   info = GetInstanceInfoText(instance)
236   if target_node is None:
237     pnode = instance.primary_node
238     all_nodes = instance.all_nodes
239   else:
240     pnode = target_node
241     all_nodes = [pnode]
242
243   if disks is None:
244     disks = instance.disks
245
246   if instance.disk_template in constants.DTS_FILEBASED:
247     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
248     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
249
250     result.Raise("Failed to create directory '%s' on"
251                  " node %s" % (file_storage_dir, pnode))
252
253   disks_created = []
254   for idx, device in enumerate(disks):
255     if to_skip and idx in to_skip:
256       continue
257     logging.info("Creating disk %s for instance '%s'", idx, instance.name)
258     for node in all_nodes:
259       f_create = node == pnode
260       try:
261         _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
262         disks_created.append((node, device))
263       except errors.DeviceCreationError, e:
264         logging.warning("Creating disk %s for instance '%s' failed",
265                         idx, instance.name)
266         disks_created.extend(e.created_devices)
267         _UndoCreateDisks(lu, disks_created)
268         raise errors.OpExecError(e.message)
269   return disks_created
270
271
272 def ComputeDiskSizePerVG(disk_template, disks):
273   """Compute disk size requirements in the volume group
274
275   """
276   def _compute(disks, payload):
277     """Universal algorithm.
278
279     """
280     vgs = {}
281     for disk in disks:
282       vgs[disk[constants.IDISK_VG]] = \
283         vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
284
285     return vgs
286
287   # Required free disk space as a function of disk and swap space
288   req_size_dict = {
289     constants.DT_DISKLESS: {},
290     constants.DT_PLAIN: _compute(disks, 0),
291     # 128 MB are added for drbd metadata for each disk
292     constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
293     constants.DT_FILE: {},
294     constants.DT_SHARED_FILE: {},
295     }
296
297   if disk_template not in req_size_dict:
298     raise errors.ProgrammerError("Disk template '%s' size requirement"
299                                  " is unknown" % disk_template)
300
301   return req_size_dict[disk_template]
302
303
304 def ComputeDisks(op, default_vg):
305   """Computes the instance disks.
306
307   @param op: The instance opcode
308   @param default_vg: The default_vg to assume
309
310   @return: The computed disks
311
312   """
313   disks = []
314   for disk in op.disks:
315     mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316     if mode not in constants.DISK_ACCESS_SET:
317       raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318                                  mode, errors.ECODE_INVAL)
319     size = disk.get(constants.IDISK_SIZE, None)
320     if size is None:
321       raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
322     try:
323       size = int(size)
324     except (TypeError, ValueError):
325       raise errors.OpPrereqError("Invalid disk size '%s'" % size,
326                                  errors.ECODE_INVAL)
327
328     ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329     if ext_provider and op.disk_template != constants.DT_EXT:
330       raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331                                  " disk template, not %s" %
332                                  (constants.IDISK_PROVIDER, constants.DT_EXT,
333                                   op.disk_template), errors.ECODE_INVAL)
334
335     data_vg = disk.get(constants.IDISK_VG, default_vg)
336     name = disk.get(constants.IDISK_NAME, None)
337     if name is not None and name.lower() == constants.VALUE_NONE:
338       name = None
339     new_disk = {
340       constants.IDISK_SIZE: size,
341       constants.IDISK_MODE: mode,
342       constants.IDISK_VG: data_vg,
343       constants.IDISK_NAME: name,
344       }
345
346     if constants.IDISK_METAVG in disk:
347       new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
348     if constants.IDISK_ADOPT in disk:
349       new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
350
351     # For extstorage, demand the `provider' option and add any
352     # additional parameters (ext-params) to the dict
353     if op.disk_template == constants.DT_EXT:
354       if ext_provider:
355         new_disk[constants.IDISK_PROVIDER] = ext_provider
356         for key in disk:
357           if key not in constants.IDISK_PARAMS:
358             new_disk[key] = disk[key]
359       else:
360         raise errors.OpPrereqError("Missing provider for template '%s'" %
361                                    constants.DT_EXT, errors.ECODE_INVAL)
362
363     disks.append(new_disk)
364
365   return disks
366
367
368 def CheckRADOSFreeSpace():
369   """Compute disk size requirements inside the RADOS cluster.
370
371   """
372   # For the RADOS cluster we assume there is always enough space.
373   pass
374
375
376 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
377                          iv_name, p_minor, s_minor):
378   """Generate a drbd8 device complete with its children.
379
380   """
381   assert len(vgnames) == len(names) == 2
382   port = lu.cfg.AllocatePort()
383   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
384
385   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
386                           logical_id=(vgnames[0], names[0]),
387                           params={})
388   dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
389   dev_meta = objects.Disk(dev_type=constants.LD_LV,
390                           size=constants.DRBD_META_SIZE,
391                           logical_id=(vgnames[1], names[1]),
392                           params={})
393   dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
394   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
395                           logical_id=(primary, secondary, port,
396                                       p_minor, s_minor,
397                                       shared_secret),
398                           children=[dev_data, dev_meta],
399                           iv_name=iv_name, params={})
400   drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
401   return drbd_dev
402
403
404 def GenerateDiskTemplate(
405   lu, template_name, instance_name, primary_node, secondary_nodes,
406   disk_info, file_storage_dir, file_driver, base_index,
407   feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
408   _req_shr_file_storage=opcodes.RequireSharedFileStorage):
409   """Generate the entire disk layout for a given template type.
410
411   """
412   vgname = lu.cfg.GetVGName()
413   disk_count = len(disk_info)
414   disks = []
415
416   if template_name == constants.DT_DISKLESS:
417     pass
418   elif template_name == constants.DT_DRBD8:
419     if len(secondary_nodes) != 1:
420       raise errors.ProgrammerError("Wrong template configuration")
421     remote_node = secondary_nodes[0]
422     minors = lu.cfg.AllocateDRBDMinor(
423       [primary_node, remote_node] * len(disk_info), instance_name)
424
425     (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
426                                                        full_disk_params)
427     drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
428
429     names = []
430     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
431                                                for i in range(disk_count)]):
432       names.append(lv_prefix + "_data")
433       names.append(lv_prefix + "_meta")
434     for idx, disk in enumerate(disk_info):
435       disk_index = idx + base_index
436       data_vg = disk.get(constants.IDISK_VG, vgname)
437       meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
438       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
439                                       disk[constants.IDISK_SIZE],
440                                       [data_vg, meta_vg],
441                                       names[idx * 2:idx * 2 + 2],
442                                       "disk/%d" % disk_index,
443                                       minors[idx * 2], minors[idx * 2 + 1])
444       disk_dev.mode = disk[constants.IDISK_MODE]
445       disk_dev.name = disk.get(constants.IDISK_NAME, None)
446       disks.append(disk_dev)
447   else:
448     if secondary_nodes:
449       raise errors.ProgrammerError("Wrong template configuration")
450
451     if template_name == constants.DT_FILE:
452       _req_file_storage()
453     elif template_name == constants.DT_SHARED_FILE:
454       _req_shr_file_storage()
455
456     name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
457     if name_prefix is None:
458       names = None
459     else:
460       names = _GenerateUniqueNames(lu, ["%s.disk%s" %
461                                         (name_prefix, base_index + i)
462                                         for i in range(disk_count)])
463
464     if template_name == constants.DT_PLAIN:
465
466       def logical_id_fn(idx, _, disk):
467         vg = disk.get(constants.IDISK_VG, vgname)
468         return (vg, names[idx])
469
470     elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
471       logical_id_fn = \
472         lambda _, disk_index, disk: (file_driver,
473                                      "%s/disk%d" % (file_storage_dir,
474                                                     disk_index))
475     elif template_name == constants.DT_BLOCK:
476       logical_id_fn = \
477         lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
478                                        disk[constants.IDISK_ADOPT])
479     elif template_name == constants.DT_RBD:
480       logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
481     elif template_name == constants.DT_EXT:
482       def logical_id_fn(idx, _, disk):
483         provider = disk.get(constants.IDISK_PROVIDER, None)
484         if provider is None:
485           raise errors.ProgrammerError("Disk template is %s, but '%s' is"
486                                        " not found", constants.DT_EXT,
487                                        constants.IDISK_PROVIDER)
488         return (provider, names[idx])
489     else:
490       raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
491
492     dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
493
494     for idx, disk in enumerate(disk_info):
495       params = {}
496       # Only for the Ext template add disk_info to params
497       if template_name == constants.DT_EXT:
498         params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
499         for key in disk:
500           if key not in constants.IDISK_PARAMS:
501             params[key] = disk[key]
502       disk_index = idx + base_index
503       size = disk[constants.IDISK_SIZE]
504       feedback_fn("* disk %s, size %s" %
505                   (disk_index, utils.FormatUnit(size, "h")))
506       disk_dev = objects.Disk(dev_type=dev_type, size=size,
507                               logical_id=logical_id_fn(idx, disk_index, disk),
508                               iv_name="disk/%d" % disk_index,
509                               mode=disk[constants.IDISK_MODE],
510                               params=params)
511       disk_dev.name = disk.get(constants.IDISK_NAME, None)
512       disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
513       disks.append(disk_dev)
514
515   return disks
516
517
518 class LUInstanceRecreateDisks(LogicalUnit):
519   """Recreate an instance's missing disks.
520
521   """
522   HPATH = "instance-recreate-disks"
523   HTYPE = constants.HTYPE_INSTANCE
524   REQ_BGL = False
525
526   _MODIFYABLE = compat.UniqueFrozenset([
527     constants.IDISK_SIZE,
528     constants.IDISK_MODE,
529     ])
530
531   # New or changed disk parameters may have different semantics
532   assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
533     constants.IDISK_ADOPT,
534
535     # TODO: Implement support changing VG while recreating
536     constants.IDISK_VG,
537     constants.IDISK_METAVG,
538     constants.IDISK_PROVIDER,
539     constants.IDISK_NAME,
540     ]))
541
542   def _RunAllocator(self):
543     """Run the allocator based on input opcode.
544
545     """
546     be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
547
548     # FIXME
549     # The allocator should actually run in "relocate" mode, but current
550     # allocators don't support relocating all the nodes of an instance at
551     # the same time. As a workaround we use "allocate" mode, but this is
552     # suboptimal for two reasons:
553     # - The instance name passed to the allocator is present in the list of
554     #   existing instances, so there could be a conflict within the
555     #   internal structures of the allocator. This doesn't happen with the
556     #   current allocators, but it's a liability.
557     # - The allocator counts the resources used by the instance twice: once
558     #   because the instance exists already, and once because it tries to
559     #   allocate a new instance.
560     # The allocator could choose some of the nodes on which the instance is
561     # running, but that's not a problem. If the instance nodes are broken,
562     # they should be already be marked as drained or offline, and hence
563     # skipped by the allocator. If instance disks have been lost for other
564     # reasons, then recreating the disks on the same nodes should be fine.
565     disk_template = self.instance.disk_template
566     spindle_use = be_full[constants.BE_SPINDLE_USE]
567     req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
568                                         disk_template=disk_template,
569                                         tags=list(self.instance.GetTags()),
570                                         os=self.instance.os,
571                                         nics=[{}],
572                                         vcpus=be_full[constants.BE_VCPUS],
573                                         memory=be_full[constants.BE_MAXMEM],
574                                         spindle_use=spindle_use,
575                                         disks=[{constants.IDISK_SIZE: d.size,
576                                                 constants.IDISK_MODE: d.mode}
577                                                for d in self.instance.disks],
578                                         hypervisor=self.instance.hypervisor,
579                                         node_whitelist=None)
580     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
581
582     ial.Run(self.op.iallocator)
583
584     assert req.RequiredNodes() == len(self.instance.all_nodes)
585
586     if not ial.success:
587       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
588                                  " %s" % (self.op.iallocator, ial.info),
589                                  errors.ECODE_NORES)
590
591     self.op.nodes = ial.result
592     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
593                  self.op.instance_name, self.op.iallocator,
594                  utils.CommaJoin(ial.result))
595
596   def CheckArguments(self):
597     if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
598       # Normalize and convert deprecated list of disk indices
599       self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
600
601     duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
602     if duplicates:
603       raise errors.OpPrereqError("Some disks have been specified more than"
604                                  " once: %s" % utils.CommaJoin(duplicates),
605                                  errors.ECODE_INVAL)
606
607     # We don't want _CheckIAllocatorOrNode selecting the default iallocator
608     # when neither iallocator nor nodes are specified
609     if self.op.iallocator or self.op.nodes:
610       CheckIAllocatorOrNode(self, "iallocator", "nodes")
611
612     for (idx, params) in self.op.disks:
613       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
614       unsupported = frozenset(params.keys()) - self._MODIFYABLE
615       if unsupported:
616         raise errors.OpPrereqError("Parameters for disk %s try to change"
617                                    " unmodifyable parameter(s): %s" %
618                                    (idx, utils.CommaJoin(unsupported)),
619                                    errors.ECODE_INVAL)
620
621   def ExpandNames(self):
622     self._ExpandAndLockInstance()
623     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
624
625     if self.op.nodes:
626       self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
627       self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
628     else:
629       self.needed_locks[locking.LEVEL_NODE] = []
630       if self.op.iallocator:
631         # iallocator will select a new node in the same group
632         self.needed_locks[locking.LEVEL_NODEGROUP] = []
633         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
634
635     self.needed_locks[locking.LEVEL_NODE_RES] = []
636
637   def DeclareLocks(self, level):
638     if level == locking.LEVEL_NODEGROUP:
639       assert self.op.iallocator is not None
640       assert not self.op.nodes
641       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
642       self.share_locks[locking.LEVEL_NODEGROUP] = 1
643       # Lock the primary group used by the instance optimistically; this
644       # requires going via the node before it's locked, requiring
645       # verification later on
646       self.needed_locks[locking.LEVEL_NODEGROUP] = \
647         self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
648
649     elif level == locking.LEVEL_NODE:
650       # If an allocator is used, then we lock all the nodes in the current
651       # instance group, as we don't know yet which ones will be selected;
652       # if we replace the nodes without using an allocator, locks are
653       # already declared in ExpandNames; otherwise, we need to lock all the
654       # instance nodes for disk re-creation
655       if self.op.iallocator:
656         assert not self.op.nodes
657         assert not self.needed_locks[locking.LEVEL_NODE]
658         assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
659
660         # Lock member nodes of the group of the primary node
661         for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
662           self.needed_locks[locking.LEVEL_NODE].extend(
663             self.cfg.GetNodeGroup(group_uuid).members)
664
665         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
666       elif not self.op.nodes:
667         self._LockInstancesNodes(primary_only=False)
668     elif level == locking.LEVEL_NODE_RES:
669       # Copy node locks
670       self.needed_locks[locking.LEVEL_NODE_RES] = \
671         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
672
673   def BuildHooksEnv(self):
674     """Build hooks env.
675
676     This runs on master, primary and secondary nodes of the instance.
677
678     """
679     return BuildInstanceHookEnvByObject(self, self.instance)
680
681   def BuildHooksNodes(self):
682     """Build hooks nodes.
683
684     """
685     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
686     return (nl, nl)
687
688   def CheckPrereq(self):
689     """Check prerequisites.
690
691     This checks that the instance is in the cluster and is not running.
692
693     """
694     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
695     assert instance is not None, \
696       "Cannot retrieve locked instance %s" % self.op.instance_name
697     if self.op.nodes:
698       if len(self.op.nodes) != len(instance.all_nodes):
699         raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
700                                    " %d replacement nodes were specified" %
701                                    (instance.name, len(instance.all_nodes),
702                                     len(self.op.nodes)),
703                                    errors.ECODE_INVAL)
704       assert instance.disk_template != constants.DT_DRBD8 or \
705              len(self.op.nodes) == 2
706       assert instance.disk_template != constants.DT_PLAIN or \
707              len(self.op.nodes) == 1
708       primary_node = self.op.nodes[0]
709     else:
710       primary_node = instance.primary_node
711     if not self.op.iallocator:
712       CheckNodeOnline(self, primary_node)
713
714     if instance.disk_template == constants.DT_DISKLESS:
715       raise errors.OpPrereqError("Instance '%s' has no disks" %
716                                  self.op.instance_name, errors.ECODE_INVAL)
717
718     # Verify if node group locks are still correct
719     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
720     if owned_groups:
721       # Node group locks are acquired only for the primary node (and only
722       # when the allocator is used)
723       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
724                               primary_only=True)
725
726     # if we replace nodes *and* the old primary is offline, we don't
727     # check the instance state
728     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
729     if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
730       CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
731                          msg="cannot recreate disks")
732
733     if self.op.disks:
734       self.disks = dict(self.op.disks)
735     else:
736       self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
737
738     maxidx = max(self.disks.keys())
739     if maxidx >= len(instance.disks):
740       raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
741                                  errors.ECODE_INVAL)
742
743     if ((self.op.nodes or self.op.iallocator) and
744          sorted(self.disks.keys()) != range(len(instance.disks))):
745       raise errors.OpPrereqError("Can't recreate disks partially and"
746                                  " change the nodes at the same time",
747                                  errors.ECODE_INVAL)
748
749     self.instance = instance
750
751     if self.op.iallocator:
752       self._RunAllocator()
753       # Release unneeded node and node resource locks
754       ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
755       ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
756       ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
757
758     assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
759
760   def Exec(self, feedback_fn):
761     """Recreate the disks.
762
763     """
764     instance = self.instance
765
766     assert (self.owned_locks(locking.LEVEL_NODE) ==
767             self.owned_locks(locking.LEVEL_NODE_RES))
768
769     to_skip = []
770     mods = [] # keeps track of needed changes
771
772     for idx, disk in enumerate(instance.disks):
773       try:
774         changes = self.disks[idx]
775       except KeyError:
776         # Disk should not be recreated
777         to_skip.append(idx)
778         continue
779
780       # update secondaries for disks, if needed
781       if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
782         # need to update the nodes and minors
783         assert len(self.op.nodes) == 2
784         assert len(disk.logical_id) == 6 # otherwise disk internals
785                                          # have changed
786         (_, _, old_port, _, _, old_secret) = disk.logical_id
787         new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
788         new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
789                   new_minors[0], new_minors[1], old_secret)
790         assert len(disk.logical_id) == len(new_id)
791       else:
792         new_id = None
793
794       mods.append((idx, new_id, changes))
795
796     # now that we have passed all asserts above, we can apply the mods
797     # in a single run (to avoid partial changes)
798     for idx, new_id, changes in mods:
799       disk = instance.disks[idx]
800       if new_id is not None:
801         assert disk.dev_type == constants.LD_DRBD8
802         disk.logical_id = new_id
803       if changes:
804         disk.Update(size=changes.get(constants.IDISK_SIZE, None),
805                     mode=changes.get(constants.IDISK_MODE, None))
806
807     # change primary node, if needed
808     if self.op.nodes:
809       instance.primary_node = self.op.nodes[0]
810       self.LogWarning("Changing the instance's nodes, you will have to"
811                       " remove any disks left on the older nodes manually")
812
813     if self.op.nodes:
814       self.cfg.Update(instance, feedback_fn)
815
816     # All touched nodes must be locked
817     mylocks = self.owned_locks(locking.LEVEL_NODE)
818     assert mylocks.issuperset(frozenset(instance.all_nodes))
819     new_disks = CreateDisks(self, instance, to_skip=to_skip)
820
821     # TODO: Release node locks before wiping, or explain why it's not possible
822     if self.cfg.GetClusterInfo().prealloc_wipe_disks:
823       wipedisks = [(idx, disk, 0)
824                    for (idx, disk) in enumerate(instance.disks)
825                    if idx not in to_skip]
826       WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
827
828
829 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
830   """Checks if nodes have enough free disk space in the specified VG.
831
832   This function checks if all given nodes have the needed amount of
833   free disk. In case any node has less disk or we cannot get the
834   information from the node, this function raises an OpPrereqError
835   exception.
836
837   @type lu: C{LogicalUnit}
838   @param lu: a logical unit from which we get configuration data
839   @type nodenames: C{list}
840   @param nodenames: the list of node names to check
841   @type vg: C{str}
842   @param vg: the volume group to check
843   @type requested: C{int}
844   @param requested: the amount of disk in MiB to check for
845   @raise errors.OpPrereqError: if the node doesn't have enough disk,
846       or we cannot check the node
847
848   """
849   es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
850   nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
851   for node in nodenames:
852     info = nodeinfo[node]
853     info.Raise("Cannot get current information from node %s" % node,
854                prereq=True, ecode=errors.ECODE_ENVIRON)
855     (_, (vg_info, ), _) = info.payload
856     vg_free = vg_info.get("vg_free", None)
857     if not isinstance(vg_free, int):
858       raise errors.OpPrereqError("Can't compute free disk space on node"
859                                  " %s for vg %s, result was '%s'" %
860                                  (node, vg, vg_free), errors.ECODE_ENVIRON)
861     if requested > vg_free:
862       raise errors.OpPrereqError("Not enough disk space on target node %s"
863                                  " vg %s: required %d MiB, available %d MiB" %
864                                  (node, vg, requested, vg_free),
865                                  errors.ECODE_NORES)
866
867
868 def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
869   """Checks if nodes have enough free disk space in all the VGs.
870
871   This function checks if all given nodes have the needed amount of
872   free disk. In case any node has less disk or we cannot get the
873   information from the node, this function raises an OpPrereqError
874   exception.
875
876   @type lu: C{LogicalUnit}
877   @param lu: a logical unit from which we get configuration data
878   @type nodenames: C{list}
879   @param nodenames: the list of node names to check
880   @type req_sizes: C{dict}
881   @param req_sizes: the hash of vg and corresponding amount of disk in
882       MiB to check for
883   @raise errors.OpPrereqError: if the node doesn't have enough disk,
884       or we cannot check the node
885
886   """
887   for vg, req_size in req_sizes.items():
888     _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
889
890
891 def _DiskSizeInBytesToMebibytes(lu, size):
892   """Converts a disk size in bytes to mebibytes.
893
894   Warns and rounds up if the size isn't an even multiple of 1 MiB.
895
896   """
897   (mib, remainder) = divmod(size, 1024 * 1024)
898
899   if remainder != 0:
900     lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
901                   " to not overwrite existing data (%s bytes will not be"
902                   " wiped)", (1024 * 1024) - remainder)
903     mib += 1
904
905   return mib
906
907
908 def _CalcEta(time_taken, written, total_size):
909   """Calculates the ETA based on size written and total size.
910
911   @param time_taken: The time taken so far
912   @param written: amount written so far
913   @param total_size: The total size of data to be written
914   @return: The remaining time in seconds
915
916   """
917   avg_time = time_taken / float(written)
918   return (total_size - written) * avg_time
919
920
921 def WipeDisks(lu, instance, disks=None):
922   """Wipes instance disks.
923
924   @type lu: L{LogicalUnit}
925   @param lu: the logical unit on whose behalf we execute
926   @type instance: L{objects.Instance}
927   @param instance: the instance whose disks we should create
928   @type disks: None or list of tuple of (number, L{objects.Disk}, number)
929   @param disks: Disk details; tuple contains disk index, disk object and the
930     start offset
931
932   """
933   node = instance.primary_node
934
935   if disks is None:
936     disks = [(idx, disk, 0)
937              for (idx, disk) in enumerate(instance.disks)]
938
939   for (_, device, _) in disks:
940     lu.cfg.SetDiskID(device, node)
941
942   logging.info("Pausing synchronization of disks of instance '%s'",
943                instance.name)
944   result = lu.rpc.call_blockdev_pause_resume_sync(node,
945                                                   (map(compat.snd, disks),
946                                                    instance),
947                                                   True)
948   result.Raise("Failed to pause disk synchronization on node '%s'" % node)
949
950   for idx, success in enumerate(result.payload):
951     if not success:
952       logging.warn("Pausing synchronization of disk %s of instance '%s'"
953                    " failed", idx, instance.name)
954
955   try:
956     for (idx, device, offset) in disks:
957       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
958       # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
959       wipe_chunk_size = \
960         int(min(constants.MAX_WIPE_CHUNK,
961                 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
962
963       size = device.size
964       last_output = 0
965       start_time = time.time()
966
967       if offset == 0:
968         info_text = ""
969       else:
970         info_text = (" (from %s to %s)" %
971                      (utils.FormatUnit(offset, "h"),
972                       utils.FormatUnit(size, "h")))
973
974       lu.LogInfo("* Wiping disk %s%s", idx, info_text)
975
976       logging.info("Wiping disk %d for instance %s on node %s using"
977                    " chunk size %s", idx, instance.name, node, wipe_chunk_size)
978
979       while offset < size:
980         wipe_size = min(wipe_chunk_size, size - offset)
981
982         logging.debug("Wiping disk %d, offset %s, chunk %s",
983                       idx, offset, wipe_size)
984
985         result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
986                                            wipe_size)
987         result.Raise("Could not wipe disk %d at offset %d for size %d" %
988                      (idx, offset, wipe_size))
989
990         now = time.time()
991         offset += wipe_size
992         if now - last_output >= 60:
993           eta = _CalcEta(now - start_time, offset, size)
994           lu.LogInfo(" - done: %.1f%% ETA: %s",
995                      offset / float(size) * 100, utils.FormatSeconds(eta))
996           last_output = now
997   finally:
998     logging.info("Resuming synchronization of disks for instance '%s'",
999                  instance.name)
1000
1001     result = lu.rpc.call_blockdev_pause_resume_sync(node,
1002                                                     (map(compat.snd, disks),
1003                                                      instance),
1004                                                     False)
1005
1006     if result.fail_msg:
1007       lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1008                     node, result.fail_msg)
1009     else:
1010       for idx, success in enumerate(result.payload):
1011         if not success:
1012           lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1013                         " failed", idx, instance.name)
1014
1015
1016 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1017   """Wrapper for L{WipeDisks} that handles errors.
1018
1019   @type lu: L{LogicalUnit}
1020   @param lu: the logical unit on whose behalf we execute
1021   @type instance: L{objects.Instance}
1022   @param instance: the instance whose disks we should wipe
1023   @param disks: see L{WipeDisks}
1024   @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1025       case of error
1026   @raise errors.OpPrereqError: in case of failure
1027
1028   """
1029   try:
1030     WipeDisks(lu, instance, disks=disks)
1031   except errors.OpExecError:
1032     logging.warning("Wiping disks for instance '%s' failed",
1033                     instance.name)
1034     _UndoCreateDisks(lu, cleanup)
1035     raise
1036
1037
1038 def ExpandCheckDisks(instance, disks):
1039   """Return the instance disks selected by the disks list
1040
1041   @type disks: list of L{objects.Disk} or None
1042   @param disks: selected disks
1043   @rtype: list of L{objects.Disk}
1044   @return: selected instance disks to act on
1045
1046   """
1047   if disks is None:
1048     return instance.disks
1049   else:
1050     if not set(disks).issubset(instance.disks):
1051       raise errors.ProgrammerError("Can only act on disks belonging to the"
1052                                    " target instance: expected a subset of %r,"
1053                                    " got %r" % (instance.disks, disks))
1054     return disks
1055
1056
1057 def WaitForSync(lu, instance, disks=None, oneshot=False):
1058   """Sleep and poll for an instance's disk to sync.
1059
1060   """
1061   if not instance.disks or disks is not None and not disks:
1062     return True
1063
1064   disks = ExpandCheckDisks(instance, disks)
1065
1066   if not oneshot:
1067     lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1068
1069   node = instance.primary_node
1070
1071   for dev in disks:
1072     lu.cfg.SetDiskID(dev, node)
1073
1074   # TODO: Convert to utils.Retry
1075
1076   retries = 0
1077   degr_retries = 10 # in seconds, as we sleep 1 second each time
1078   while True:
1079     max_time = 0
1080     done = True
1081     cumul_degraded = False
1082     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1083     msg = rstats.fail_msg
1084     if msg:
1085       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1086       retries += 1
1087       if retries >= 10:
1088         raise errors.RemoteError("Can't contact node %s for mirror data,"
1089                                  " aborting." % node)
1090       time.sleep(6)
1091       continue
1092     rstats = rstats.payload
1093     retries = 0
1094     for i, mstat in enumerate(rstats):
1095       if mstat is None:
1096         lu.LogWarning("Can't compute data for node %s/%s",
1097                       node, disks[i].iv_name)
1098         continue
1099
1100       cumul_degraded = (cumul_degraded or
1101                         (mstat.is_degraded and mstat.sync_percent is None))
1102       if mstat.sync_percent is not None:
1103         done = False
1104         if mstat.estimated_time is not None:
1105           rem_time = ("%s remaining (estimated)" %
1106                       utils.FormatSeconds(mstat.estimated_time))
1107           max_time = mstat.estimated_time
1108         else:
1109           rem_time = "no time estimate"
1110         lu.LogInfo("- device %s: %5.2f%% done, %s",
1111                    disks[i].iv_name, mstat.sync_percent, rem_time)
1112
1113     # if we're done but degraded, let's do a few small retries, to
1114     # make sure we see a stable and not transient situation; therefore
1115     # we force restart of the loop
1116     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1117       logging.info("Degraded disks found, %d retries left", degr_retries)
1118       degr_retries -= 1
1119       time.sleep(1)
1120       continue
1121
1122     if done or oneshot:
1123       break
1124
1125     time.sleep(min(60, max_time))
1126
1127   if done:
1128     lu.LogInfo("Instance %s's disks are in sync", instance.name)
1129
1130   return not cumul_degraded
1131
1132
1133 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1134   """Shutdown block devices of an instance.
1135
1136   This does the shutdown on all nodes of the instance.
1137
1138   If the ignore_primary is false, errors on the primary node are
1139   ignored.
1140
1141   """
1142   all_result = True
1143
1144   if disks is None:
1145     # only mark instance disks as inactive if all disks are affected
1146     lu.cfg.MarkInstanceDisksInactive(instance.name)
1147
1148   disks = ExpandCheckDisks(instance, disks)
1149
1150   for disk in disks:
1151     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1152       lu.cfg.SetDiskID(top_disk, node)
1153       result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1154       msg = result.fail_msg
1155       if msg:
1156         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1157                       disk.iv_name, node, msg)
1158         if ((node == instance.primary_node and not ignore_primary) or
1159             (node != instance.primary_node and not result.offline)):
1160           all_result = False
1161   return all_result
1162
1163
1164 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1165   """Shutdown block devices of an instance.
1166
1167   This function checks if an instance is running, before calling
1168   _ShutdownInstanceDisks.
1169
1170   """
1171   CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1172   ShutdownInstanceDisks(lu, instance, disks=disks)
1173
1174
1175 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1176                           ignore_size=False):
1177   """Prepare the block devices for an instance.
1178
1179   This sets up the block devices on all nodes.
1180
1181   @type lu: L{LogicalUnit}
1182   @param lu: the logical unit on whose behalf we execute
1183   @type instance: L{objects.Instance}
1184   @param instance: the instance for whose disks we assemble
1185   @type disks: list of L{objects.Disk} or None
1186   @param disks: which disks to assemble (or all, if None)
1187   @type ignore_secondaries: boolean
1188   @param ignore_secondaries: if true, errors on secondary nodes
1189       won't result in an error return from the function
1190   @type ignore_size: boolean
1191   @param ignore_size: if true, the current known size of the disk
1192       will not be used during the disk activation, useful for cases
1193       when the size is wrong
1194   @return: False if the operation failed, otherwise a list of
1195       (host, instance_visible_name, node_visible_name)
1196       with the mapping from node devices to instance devices
1197
1198   """
1199   device_info = []
1200   disks_ok = True
1201   iname = instance.name
1202
1203   if disks is None:
1204     # only mark instance disks as active if all disks are affected
1205     lu.cfg.MarkInstanceDisksActive(iname)
1206
1207   disks = ExpandCheckDisks(instance, disks)
1208
1209   # With the two passes mechanism we try to reduce the window of
1210   # opportunity for the race condition of switching DRBD to primary
1211   # before handshaking occured, but we do not eliminate it
1212
1213   # The proper fix would be to wait (with some limits) until the
1214   # connection has been made and drbd transitions from WFConnection
1215   # into any other network-connected state (Connected, SyncTarget,
1216   # SyncSource, etc.)
1217
1218   # 1st pass, assemble on all nodes in secondary mode
1219   for idx, inst_disk in enumerate(disks):
1220     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1221       if ignore_size:
1222         node_disk = node_disk.Copy()
1223         node_disk.UnsetSize()
1224       lu.cfg.SetDiskID(node_disk, node)
1225       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1226                                              False, idx)
1227       msg = result.fail_msg
1228       if msg:
1229         is_offline_secondary = (node in instance.secondary_nodes and
1230                                 result.offline)
1231         lu.LogWarning("Could not prepare block device %s on node %s"
1232                       " (is_primary=False, pass=1): %s",
1233                       inst_disk.iv_name, node, msg)
1234         if not (ignore_secondaries or is_offline_secondary):
1235           disks_ok = False
1236
1237   # FIXME: race condition on drbd migration to primary
1238
1239   # 2nd pass, do only the primary node
1240   for idx, inst_disk in enumerate(disks):
1241     dev_path = None
1242
1243     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1244       if node != instance.primary_node:
1245         continue
1246       if ignore_size:
1247         node_disk = node_disk.Copy()
1248         node_disk.UnsetSize()
1249       lu.cfg.SetDiskID(node_disk, node)
1250       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1251                                              True, idx)
1252       msg = result.fail_msg
1253       if msg:
1254         lu.LogWarning("Could not prepare block device %s on node %s"
1255                       " (is_primary=True, pass=2): %s",
1256                       inst_disk.iv_name, node, msg)
1257         disks_ok = False
1258       else:
1259         dev_path, _ = result.payload
1260
1261     device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1262
1263   # leave the disks configured for the primary node
1264   # this is a workaround that would be fixed better by
1265   # improving the logical/physical id handling
1266   for disk in disks:
1267     lu.cfg.SetDiskID(disk, instance.primary_node)
1268
1269   if not disks_ok:
1270     lu.cfg.MarkInstanceDisksInactive(iname)
1271
1272   return disks_ok, device_info
1273
1274
1275 def StartInstanceDisks(lu, instance, force):
1276   """Start the disks of an instance.
1277
1278   """
1279   disks_ok, _ = AssembleInstanceDisks(lu, instance,
1280                                       ignore_secondaries=force)
1281   if not disks_ok:
1282     ShutdownInstanceDisks(lu, instance)
1283     if force is not None and not force:
1284       lu.LogWarning("",
1285                     hint=("If the message above refers to a secondary node,"
1286                           " you can retry the operation using '--force'"))
1287     raise errors.OpExecError("Disk consistency error")
1288
1289
1290 class LUInstanceGrowDisk(LogicalUnit):
1291   """Grow a disk of an instance.
1292
1293   """
1294   HPATH = "disk-grow"
1295   HTYPE = constants.HTYPE_INSTANCE
1296   REQ_BGL = False
1297
1298   def ExpandNames(self):
1299     self._ExpandAndLockInstance()
1300     self.needed_locks[locking.LEVEL_NODE] = []
1301     self.needed_locks[locking.LEVEL_NODE_RES] = []
1302     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1303     self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1304
1305   def DeclareLocks(self, level):
1306     if level == locking.LEVEL_NODE:
1307       self._LockInstancesNodes()
1308     elif level == locking.LEVEL_NODE_RES:
1309       # Copy node locks
1310       self.needed_locks[locking.LEVEL_NODE_RES] = \
1311         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1312
1313   def BuildHooksEnv(self):
1314     """Build hooks env.
1315
1316     This runs on the master, the primary and all the secondaries.
1317
1318     """
1319     env = {
1320       "DISK": self.op.disk,
1321       "AMOUNT": self.op.amount,
1322       "ABSOLUTE": self.op.absolute,
1323       }
1324     env.update(BuildInstanceHookEnvByObject(self, self.instance))
1325     return env
1326
1327   def BuildHooksNodes(self):
1328     """Build hooks nodes.
1329
1330     """
1331     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1332     return (nl, nl)
1333
1334   def CheckPrereq(self):
1335     """Check prerequisites.
1336
1337     This checks that the instance is in the cluster.
1338
1339     """
1340     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1341     assert instance is not None, \
1342       "Cannot retrieve locked instance %s" % self.op.instance_name
1343     nodenames = list(instance.all_nodes)
1344     for node in nodenames:
1345       CheckNodeOnline(self, node)
1346
1347     self.instance = instance
1348
1349     if instance.disk_template not in constants.DTS_GROWABLE:
1350       raise errors.OpPrereqError("Instance's disk layout does not support"
1351                                  " growing", errors.ECODE_INVAL)
1352
1353     self.disk = instance.FindDisk(self.op.disk)
1354
1355     if self.op.absolute:
1356       self.target = self.op.amount
1357       self.delta = self.target - self.disk.size
1358       if self.delta < 0:
1359         raise errors.OpPrereqError("Requested size (%s) is smaller than "
1360                                    "current disk size (%s)" %
1361                                    (utils.FormatUnit(self.target, "h"),
1362                                     utils.FormatUnit(self.disk.size, "h")),
1363                                    errors.ECODE_STATE)
1364     else:
1365       self.delta = self.op.amount
1366       self.target = self.disk.size + self.delta
1367       if self.delta < 0:
1368         raise errors.OpPrereqError("Requested increment (%s) is negative" %
1369                                    utils.FormatUnit(self.delta, "h"),
1370                                    errors.ECODE_INVAL)
1371
1372     self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1373
1374   def _CheckDiskSpace(self, nodenames, req_vgspace):
1375     template = self.instance.disk_template
1376     if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1377       # TODO: check the free disk space for file, when that feature will be
1378       # supported
1379       nodes = map(self.cfg.GetNodeInfo, nodenames)
1380       es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1381                         nodes)
1382       if es_nodes:
1383         # With exclusive storage we need to something smarter than just looking
1384         # at free space; for now, let's simply abort the operation.
1385         raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1386                                    " is enabled", errors.ECODE_STATE)
1387       CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1388
1389   def Exec(self, feedback_fn):
1390     """Execute disk grow.
1391
1392     """
1393     instance = self.instance
1394     disk = self.disk
1395
1396     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1397     assert (self.owned_locks(locking.LEVEL_NODE) ==
1398             self.owned_locks(locking.LEVEL_NODE_RES))
1399
1400     wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1401
1402     disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1403     if not disks_ok:
1404       raise errors.OpExecError("Cannot activate block device to grow")
1405
1406     feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1407                 (self.op.disk, instance.name,
1408                  utils.FormatUnit(self.delta, "h"),
1409                  utils.FormatUnit(self.target, "h")))
1410
1411     # First run all grow ops in dry-run mode
1412     for node in instance.all_nodes:
1413       self.cfg.SetDiskID(disk, node)
1414       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1415                                            True, True)
1416       result.Raise("Dry-run grow request failed to node %s" % node)
1417
1418     if wipe_disks:
1419       # Get disk size from primary node for wiping
1420       self.cfg.SetDiskID(disk, instance.primary_node)
1421       result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1422       result.Raise("Failed to retrieve disk size from node '%s'" %
1423                    instance.primary_node)
1424
1425       (disk_size_in_bytes, ) = result.payload
1426
1427       if disk_size_in_bytes is None:
1428         raise errors.OpExecError("Failed to retrieve disk size from primary"
1429                                  " node '%s'" % instance.primary_node)
1430
1431       old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1432
1433       assert old_disk_size >= disk.size, \
1434         ("Retrieved disk size too small (got %s, should be at least %s)" %
1435          (old_disk_size, disk.size))
1436     else:
1437       old_disk_size = None
1438
1439     # We know that (as far as we can test) operations across different
1440     # nodes will succeed, time to run it for real on the backing storage
1441     for node in instance.all_nodes:
1442       self.cfg.SetDiskID(disk, node)
1443       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1444                                            False, True)
1445       result.Raise("Grow request failed to node %s" % node)
1446
1447     # And now execute it for logical storage, on the primary node
1448     node = instance.primary_node
1449     self.cfg.SetDiskID(disk, node)
1450     result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1451                                          False, False)
1452     result.Raise("Grow request failed to node %s" % node)
1453
1454     disk.RecordGrow(self.delta)
1455     self.cfg.Update(instance, feedback_fn)
1456
1457     # Changes have been recorded, release node lock
1458     ReleaseLocks(self, locking.LEVEL_NODE)
1459
1460     # Downgrade lock while waiting for sync
1461     self.glm.downgrade(locking.LEVEL_INSTANCE)
1462
1463     assert wipe_disks ^ (old_disk_size is None)
1464
1465     if wipe_disks:
1466       assert instance.disks[self.op.disk] == disk
1467
1468       # Wipe newly added disk space
1469       WipeDisks(self, instance,
1470                 disks=[(self.op.disk, disk, old_disk_size)])
1471
1472     if self.op.wait_for_sync:
1473       disk_abort = not WaitForSync(self, instance, disks=[disk])
1474       if disk_abort:
1475         self.LogWarning("Disk syncing has not returned a good status; check"
1476                         " the instance")
1477       if not instance.disks_active:
1478         _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1479     elif not instance.disks_active:
1480       self.LogWarning("Not shutting down the disk even if the instance is"
1481                       " not supposed to be running because no wait for"
1482                       " sync mode was requested")
1483
1484     assert self.owned_locks(locking.LEVEL_NODE_RES)
1485     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1486
1487
1488 class LUInstanceReplaceDisks(LogicalUnit):
1489   """Replace the disks of an instance.
1490
1491   """
1492   HPATH = "mirrors-replace"
1493   HTYPE = constants.HTYPE_INSTANCE
1494   REQ_BGL = False
1495
1496   def CheckArguments(self):
1497     """Check arguments.
1498
1499     """
1500     remote_node = self.op.remote_node
1501     ialloc = self.op.iallocator
1502     if self.op.mode == constants.REPLACE_DISK_CHG:
1503       if remote_node is None and ialloc is None:
1504         raise errors.OpPrereqError("When changing the secondary either an"
1505                                    " iallocator script must be used or the"
1506                                    " new node given", errors.ECODE_INVAL)
1507       else:
1508         CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1509
1510     elif remote_node is not None or ialloc is not None:
1511       # Not replacing the secondary
1512       raise errors.OpPrereqError("The iallocator and new node options can"
1513                                  " only be used when changing the"
1514                                  " secondary node", errors.ECODE_INVAL)
1515
1516   def ExpandNames(self):
1517     self._ExpandAndLockInstance()
1518
1519     assert locking.LEVEL_NODE not in self.needed_locks
1520     assert locking.LEVEL_NODE_RES not in self.needed_locks
1521     assert locking.LEVEL_NODEGROUP not in self.needed_locks
1522
1523     assert self.op.iallocator is None or self.op.remote_node is None, \
1524       "Conflicting options"
1525
1526     if self.op.remote_node is not None:
1527       self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1528
1529       # Warning: do not remove the locking of the new secondary here
1530       # unless DRBD8.AddChildren is changed to work in parallel;
1531       # currently it doesn't since parallel invocations of
1532       # FindUnusedMinor will conflict
1533       self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1534       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1535     else:
1536       self.needed_locks[locking.LEVEL_NODE] = []
1537       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1538
1539       if self.op.iallocator is not None:
1540         # iallocator will select a new node in the same group
1541         self.needed_locks[locking.LEVEL_NODEGROUP] = []
1542         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1543
1544     self.needed_locks[locking.LEVEL_NODE_RES] = []
1545
1546     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1547                                    self.op.iallocator, self.op.remote_node,
1548                                    self.op.disks, self.op.early_release,
1549                                    self.op.ignore_ipolicy)
1550
1551     self.tasklets = [self.replacer]
1552
1553   def DeclareLocks(self, level):
1554     if level == locking.LEVEL_NODEGROUP:
1555       assert self.op.remote_node is None
1556       assert self.op.iallocator is not None
1557       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1558
1559       self.share_locks[locking.LEVEL_NODEGROUP] = 1
1560       # Lock all groups used by instance optimistically; this requires going
1561       # via the node before it's locked, requiring verification later on
1562       self.needed_locks[locking.LEVEL_NODEGROUP] = \
1563         self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1564
1565     elif level == locking.LEVEL_NODE:
1566       if self.op.iallocator is not None:
1567         assert self.op.remote_node is None
1568         assert not self.needed_locks[locking.LEVEL_NODE]
1569         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1570
1571         # Lock member nodes of all locked groups
1572         self.needed_locks[locking.LEVEL_NODE] = \
1573           [node_name
1574            for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1575            for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1576       else:
1577         assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1578
1579         self._LockInstancesNodes()
1580
1581     elif level == locking.LEVEL_NODE_RES:
1582       # Reuse node locks
1583       self.needed_locks[locking.LEVEL_NODE_RES] = \
1584         self.needed_locks[locking.LEVEL_NODE]
1585
1586   def BuildHooksEnv(self):
1587     """Build hooks env.
1588
1589     This runs on the master, the primary and all the secondaries.
1590
1591     """
1592     instance = self.replacer.instance
1593     env = {
1594       "MODE": self.op.mode,
1595       "NEW_SECONDARY": self.op.remote_node,
1596       "OLD_SECONDARY": instance.secondary_nodes[0],
1597       }
1598     env.update(BuildInstanceHookEnvByObject(self, instance))
1599     return env
1600
1601   def BuildHooksNodes(self):
1602     """Build hooks nodes.
1603
1604     """
1605     instance = self.replacer.instance
1606     nl = [
1607       self.cfg.GetMasterNode(),
1608       instance.primary_node,
1609       ]
1610     if self.op.remote_node is not None:
1611       nl.append(self.op.remote_node)
1612     return nl, nl
1613
1614   def CheckPrereq(self):
1615     """Check prerequisites.
1616
1617     """
1618     assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1619             self.op.iallocator is None)
1620
1621     # Verify if node group locks are still correct
1622     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1623     if owned_groups:
1624       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1625
1626     return LogicalUnit.CheckPrereq(self)
1627
1628
1629 class LUInstanceActivateDisks(NoHooksLU):
1630   """Bring up an instance's disks.
1631
1632   """
1633   REQ_BGL = False
1634
1635   def ExpandNames(self):
1636     self._ExpandAndLockInstance()
1637     self.needed_locks[locking.LEVEL_NODE] = []
1638     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1639
1640   def DeclareLocks(self, level):
1641     if level == locking.LEVEL_NODE:
1642       self._LockInstancesNodes()
1643
1644   def CheckPrereq(self):
1645     """Check prerequisites.
1646
1647     This checks that the instance is in the cluster.
1648
1649     """
1650     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1651     assert self.instance is not None, \
1652       "Cannot retrieve locked instance %s" % self.op.instance_name
1653     CheckNodeOnline(self, self.instance.primary_node)
1654
1655   def Exec(self, feedback_fn):
1656     """Activate the disks.
1657
1658     """
1659     disks_ok, disks_info = \
1660               AssembleInstanceDisks(self, self.instance,
1661                                     ignore_size=self.op.ignore_size)
1662     if not disks_ok:
1663       raise errors.OpExecError("Cannot activate block devices")
1664
1665     if self.op.wait_for_sync:
1666       if not WaitForSync(self, self.instance):
1667         self.cfg.MarkInstanceDisksInactive(self.instance.name)
1668         raise errors.OpExecError("Some disks of the instance are degraded!")
1669
1670     return disks_info
1671
1672
1673 class LUInstanceDeactivateDisks(NoHooksLU):
1674   """Shutdown an instance's disks.
1675
1676   """
1677   REQ_BGL = False
1678
1679   def ExpandNames(self):
1680     self._ExpandAndLockInstance()
1681     self.needed_locks[locking.LEVEL_NODE] = []
1682     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1683
1684   def DeclareLocks(self, level):
1685     if level == locking.LEVEL_NODE:
1686       self._LockInstancesNodes()
1687
1688   def CheckPrereq(self):
1689     """Check prerequisites.
1690
1691     This checks that the instance is in the cluster.
1692
1693     """
1694     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1695     assert self.instance is not None, \
1696       "Cannot retrieve locked instance %s" % self.op.instance_name
1697
1698   def Exec(self, feedback_fn):
1699     """Deactivate the disks
1700
1701     """
1702     instance = self.instance
1703     if self.op.force:
1704       ShutdownInstanceDisks(self, instance)
1705     else:
1706       _SafeShutdownInstanceDisks(self, instance)
1707
1708
1709 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1710                                ldisk=False):
1711   """Check that mirrors are not degraded.
1712
1713   @attention: The device has to be annotated already.
1714
1715   The ldisk parameter, if True, will change the test from the
1716   is_degraded attribute (which represents overall non-ok status for
1717   the device(s)) to the ldisk (representing the local storage status).
1718
1719   """
1720   lu.cfg.SetDiskID(dev, node)
1721
1722   result = True
1723
1724   if on_primary or dev.AssembleOnSecondary():
1725     rstats = lu.rpc.call_blockdev_find(node, dev)
1726     msg = rstats.fail_msg
1727     if msg:
1728       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1729       result = False
1730     elif not rstats.payload:
1731       lu.LogWarning("Can't find disk on node %s", node)
1732       result = False
1733     else:
1734       if ldisk:
1735         result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1736       else:
1737         result = result and not rstats.payload.is_degraded
1738
1739   if dev.children:
1740     for child in dev.children:
1741       result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1742                                                      on_primary)
1743
1744   return result
1745
1746
1747 def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1748   """Wrapper around L{_CheckDiskConsistencyInner}.
1749
1750   """
1751   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1752   return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1753                                     ldisk=ldisk)
1754
1755
1756 def _BlockdevFind(lu, node, dev, instance):
1757   """Wrapper around call_blockdev_find to annotate diskparams.
1758
1759   @param lu: A reference to the lu object
1760   @param node: The node to call out
1761   @param dev: The device to find
1762   @param instance: The instance object the device belongs to
1763   @returns The result of the rpc call
1764
1765   """
1766   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1767   return lu.rpc.call_blockdev_find(node, disk)
1768
1769
1770 def _GenerateUniqueNames(lu, exts):
1771   """Generate a suitable LV name.
1772
1773   This will generate a logical volume name for the given instance.
1774
1775   """
1776   results = []
1777   for val in exts:
1778     new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1779     results.append("%s%s" % (new_id, val))
1780   return results
1781
1782
1783 class TLReplaceDisks(Tasklet):
1784   """Replaces disks for an instance.
1785
1786   Note: Locking is not within the scope of this class.
1787
1788   """
1789   def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1790                disks, early_release, ignore_ipolicy):
1791     """Initializes this class.
1792
1793     """
1794     Tasklet.__init__(self, lu)
1795
1796     # Parameters
1797     self.instance_name = instance_name
1798     self.mode = mode
1799     self.iallocator_name = iallocator_name
1800     self.remote_node = remote_node
1801     self.disks = disks
1802     self.early_release = early_release
1803     self.ignore_ipolicy = ignore_ipolicy
1804
1805     # Runtime data
1806     self.instance = None
1807     self.new_node = None
1808     self.target_node = None
1809     self.other_node = None
1810     self.remote_node_info = None
1811     self.node_secondary_ip = None
1812
1813   @staticmethod
1814   def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1815     """Compute a new secondary node using an IAllocator.
1816
1817     """
1818     req = iallocator.IAReqRelocate(name=instance_name,
1819                                    relocate_from=list(relocate_from))
1820     ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1821
1822     ial.Run(iallocator_name)
1823
1824     if not ial.success:
1825       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1826                                  " %s" % (iallocator_name, ial.info),
1827                                  errors.ECODE_NORES)
1828
1829     remote_node_name = ial.result[0]
1830
1831     lu.LogInfo("Selected new secondary for instance '%s': %s",
1832                instance_name, remote_node_name)
1833
1834     return remote_node_name
1835
1836   def _FindFaultyDisks(self, node_name):
1837     """Wrapper for L{FindFaultyInstanceDisks}.
1838
1839     """
1840     return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1841                                    node_name, True)
1842
1843   def _CheckDisksActivated(self, instance):
1844     """Checks if the instance disks are activated.
1845
1846     @param instance: The instance to check disks
1847     @return: True if they are activated, False otherwise
1848
1849     """
1850     nodes = instance.all_nodes
1851
1852     for idx, dev in enumerate(instance.disks):
1853       for node in nodes:
1854         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1855         self.cfg.SetDiskID(dev, node)
1856
1857         result = _BlockdevFind(self, node, dev, instance)
1858
1859         if result.offline:
1860           continue
1861         elif result.fail_msg or not result.payload:
1862           return False
1863
1864     return True
1865
1866   def CheckPrereq(self):
1867     """Check prerequisites.
1868
1869     This checks that the instance is in the cluster.
1870
1871     """
1872     self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1873     assert instance is not None, \
1874       "Cannot retrieve locked instance %s" % self.instance_name
1875
1876     if instance.disk_template != constants.DT_DRBD8:
1877       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1878                                  " instances", errors.ECODE_INVAL)
1879
1880     if len(instance.secondary_nodes) != 1:
1881       raise errors.OpPrereqError("The instance has a strange layout,"
1882                                  " expected one secondary but found %d" %
1883                                  len(instance.secondary_nodes),
1884                                  errors.ECODE_FAULT)
1885
1886     instance = self.instance
1887     secondary_node = instance.secondary_nodes[0]
1888
1889     if self.iallocator_name is None:
1890       remote_node = self.remote_node
1891     else:
1892       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1893                                        instance.name, instance.secondary_nodes)
1894
1895     if remote_node is None:
1896       self.remote_node_info = None
1897     else:
1898       assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1899              "Remote node '%s' is not locked" % remote_node
1900
1901       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1902       assert self.remote_node_info is not None, \
1903         "Cannot retrieve locked node %s" % remote_node
1904
1905     if remote_node == self.instance.primary_node:
1906       raise errors.OpPrereqError("The specified node is the primary node of"
1907                                  " the instance", errors.ECODE_INVAL)
1908
1909     if remote_node == secondary_node:
1910       raise errors.OpPrereqError("The specified node is already the"
1911                                  " secondary node of the instance",
1912                                  errors.ECODE_INVAL)
1913
1914     if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1915                                     constants.REPLACE_DISK_CHG):
1916       raise errors.OpPrereqError("Cannot specify disks to be replaced",
1917                                  errors.ECODE_INVAL)
1918
1919     if self.mode == constants.REPLACE_DISK_AUTO:
1920       if not self._CheckDisksActivated(instance):
1921         raise errors.OpPrereqError("Please run activate-disks on instance %s"
1922                                    " first" % self.instance_name,
1923                                    errors.ECODE_STATE)
1924       faulty_primary = self._FindFaultyDisks(instance.primary_node)
1925       faulty_secondary = self._FindFaultyDisks(secondary_node)
1926
1927       if faulty_primary and faulty_secondary:
1928         raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1929                                    " one node and can not be repaired"
1930                                    " automatically" % self.instance_name,
1931                                    errors.ECODE_STATE)
1932
1933       if faulty_primary:
1934         self.disks = faulty_primary
1935         self.target_node = instance.primary_node
1936         self.other_node = secondary_node
1937         check_nodes = [self.target_node, self.other_node]
1938       elif faulty_secondary:
1939         self.disks = faulty_secondary
1940         self.target_node = secondary_node
1941         self.other_node = instance.primary_node
1942         check_nodes = [self.target_node, self.other_node]
1943       else:
1944         self.disks = []
1945         check_nodes = []
1946
1947     else:
1948       # Non-automatic modes
1949       if self.mode == constants.REPLACE_DISK_PRI:
1950         self.target_node = instance.primary_node
1951         self.other_node = secondary_node
1952         check_nodes = [self.target_node, self.other_node]
1953
1954       elif self.mode == constants.REPLACE_DISK_SEC:
1955         self.target_node = secondary_node
1956         self.other_node = instance.primary_node
1957         check_nodes = [self.target_node, self.other_node]
1958
1959       elif self.mode == constants.REPLACE_DISK_CHG:
1960         self.new_node = remote_node
1961         self.other_node = instance.primary_node
1962         self.target_node = secondary_node
1963         check_nodes = [self.new_node, self.other_node]
1964
1965         CheckNodeNotDrained(self.lu, remote_node)
1966         CheckNodeVmCapable(self.lu, remote_node)
1967
1968         old_node_info = self.cfg.GetNodeInfo(secondary_node)
1969         assert old_node_info is not None
1970         if old_node_info.offline and not self.early_release:
1971           # doesn't make sense to delay the release
1972           self.early_release = True
1973           self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1974                           " early-release mode", secondary_node)
1975
1976       else:
1977         raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
1978                                      self.mode)
1979
1980       # If not specified all disks should be replaced
1981       if not self.disks:
1982         self.disks = range(len(self.instance.disks))
1983
1984     # TODO: This is ugly, but right now we can't distinguish between internal
1985     # submitted opcode and external one. We should fix that.
1986     if self.remote_node_info:
1987       # We change the node, lets verify it still meets instance policy
1988       new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
1989       cluster = self.cfg.GetClusterInfo()
1990       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1991                                                               new_group_info)
1992       CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
1993                              self.cfg, ignore=self.ignore_ipolicy)
1994
1995     for node in check_nodes:
1996       CheckNodeOnline(self.lu, node)
1997
1998     touched_nodes = frozenset(node_name for node_name in [self.new_node,
1999                                                           self.other_node,
2000                                                           self.target_node]
2001                               if node_name is not None)
2002
2003     # Release unneeded node and node resource locks
2004     ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2005     ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2006     ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2007
2008     # Release any owned node group
2009     ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2010
2011     # Check whether disks are valid
2012     for disk_idx in self.disks:
2013       instance.FindDisk(disk_idx)
2014
2015     # Get secondary node IP addresses
2016     self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
2017                                   in self.cfg.GetMultiNodeInfo(touched_nodes))
2018
2019   def Exec(self, feedback_fn):
2020     """Execute disk replacement.
2021
2022     This dispatches the disk replacement to the appropriate handler.
2023
2024     """
2025     if __debug__:
2026       # Verify owned locks before starting operation
2027       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2028       assert set(owned_nodes) == set(self.node_secondary_ip), \
2029           ("Incorrect node locks, owning %s, expected %s" %
2030            (owned_nodes, self.node_secondary_ip.keys()))
2031       assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2032               self.lu.owned_locks(locking.LEVEL_NODE_RES))
2033       assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2034
2035       owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2036       assert list(owned_instances) == [self.instance_name], \
2037           "Instance '%s' not locked" % self.instance_name
2038
2039       assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2040           "Should not own any node group lock at this point"
2041
2042     if not self.disks:
2043       feedback_fn("No disks need replacement for instance '%s'" %
2044                   self.instance.name)
2045       return
2046
2047     feedback_fn("Replacing disk(s) %s for instance '%s'" %
2048                 (utils.CommaJoin(self.disks), self.instance.name))
2049     feedback_fn("Current primary node: %s" % self.instance.primary_node)
2050     feedback_fn("Current seconary node: %s" %
2051                 utils.CommaJoin(self.instance.secondary_nodes))
2052
2053     activate_disks = not self.instance.disks_active
2054
2055     # Activate the instance disks if we're replacing them on a down instance
2056     if activate_disks:
2057       StartInstanceDisks(self.lu, self.instance, True)
2058
2059     try:
2060       # Should we replace the secondary node?
2061       if self.new_node is not None:
2062         fn = self._ExecDrbd8Secondary
2063       else:
2064         fn = self._ExecDrbd8DiskOnly
2065
2066       result = fn(feedback_fn)
2067     finally:
2068       # Deactivate the instance disks if we're replacing them on a
2069       # down instance
2070       if activate_disks:
2071         _SafeShutdownInstanceDisks(self.lu, self.instance)
2072
2073     assert not self.lu.owned_locks(locking.LEVEL_NODE)
2074
2075     if __debug__:
2076       # Verify owned locks
2077       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2078       nodes = frozenset(self.node_secondary_ip)
2079       assert ((self.early_release and not owned_nodes) or
2080               (not self.early_release and not (set(owned_nodes) - nodes))), \
2081         ("Not owning the correct locks, early_release=%s, owned=%r,"
2082          " nodes=%r" % (self.early_release, owned_nodes, nodes))
2083
2084     return result
2085
2086   def _CheckVolumeGroup(self, nodes):
2087     self.lu.LogInfo("Checking volume groups")
2088
2089     vgname = self.cfg.GetVGName()
2090
2091     # Make sure volume group exists on all involved nodes
2092     results = self.rpc.call_vg_list(nodes)
2093     if not results:
2094       raise errors.OpExecError("Can't list volume groups on the nodes")
2095
2096     for node in nodes:
2097       res = results[node]
2098       res.Raise("Error checking node %s" % node)
2099       if vgname not in res.payload:
2100         raise errors.OpExecError("Volume group '%s' not found on node %s" %
2101                                  (vgname, node))
2102
2103   def _CheckDisksExistence(self, nodes):
2104     # Check disk existence
2105     for idx, dev in enumerate(self.instance.disks):
2106       if idx not in self.disks:
2107         continue
2108
2109       for node in nodes:
2110         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2111         self.cfg.SetDiskID(dev, node)
2112
2113         result = _BlockdevFind(self, node, dev, self.instance)
2114
2115         msg = result.fail_msg
2116         if msg or not result.payload:
2117           if not msg:
2118             msg = "disk not found"
2119           if not self._CheckDisksActivated(self.instance):
2120             extra_hint = ("\nDisks seem to be not properly activated. Try"
2121                           " running activate-disks on the instance before"
2122                           " using replace-disks.")
2123           else:
2124             extra_hint = ""
2125           raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2126                                    (idx, node, msg, extra_hint))
2127
2128   def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2129     for idx, dev in enumerate(self.instance.disks):
2130       if idx not in self.disks:
2131         continue
2132
2133       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2134                       (idx, node_name))
2135
2136       if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2137                                   on_primary, ldisk=ldisk):
2138         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2139                                  " replace disks for instance %s" %
2140                                  (node_name, self.instance.name))
2141
2142   def _CreateNewStorage(self, node_name):
2143     """Create new storage on the primary or secondary node.
2144
2145     This is only used for same-node replaces, not for changing the
2146     secondary node, hence we don't want to modify the existing disk.
2147
2148     """
2149     iv_names = {}
2150
2151     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2152     for idx, dev in enumerate(disks):
2153       if idx not in self.disks:
2154         continue
2155
2156       self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2157
2158       self.cfg.SetDiskID(dev, node_name)
2159
2160       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2161       names = _GenerateUniqueNames(self.lu, lv_names)
2162
2163       (data_disk, meta_disk) = dev.children
2164       vg_data = data_disk.logical_id[0]
2165       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2166                              logical_id=(vg_data, names[0]),
2167                              params=data_disk.params)
2168       vg_meta = meta_disk.logical_id[0]
2169       lv_meta = objects.Disk(dev_type=constants.LD_LV,
2170                              size=constants.DRBD_META_SIZE,
2171                              logical_id=(vg_meta, names[1]),
2172                              params=meta_disk.params)
2173
2174       new_lvs = [lv_data, lv_meta]
2175       old_lvs = [child.Copy() for child in dev.children]
2176       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2177       excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2178
2179       # we pass force_create=True to force the LVM creation
2180       for new_lv in new_lvs:
2181         try:
2182           _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2183                                GetInstanceInfoText(self.instance), False,
2184                                excl_stor)
2185         except errors.DeviceCreationError, e:
2186           raise errors.OpExecError("Can't create block device: %s" % e.message)
2187
2188     return iv_names
2189
2190   def _CheckDevices(self, node_name, iv_names):
2191     for name, (dev, _, _) in iv_names.iteritems():
2192       self.cfg.SetDiskID(dev, node_name)
2193
2194       result = _BlockdevFind(self, node_name, dev, self.instance)
2195
2196       msg = result.fail_msg
2197       if msg or not result.payload:
2198         if not msg:
2199           msg = "disk not found"
2200         raise errors.OpExecError("Can't find DRBD device %s: %s" %
2201                                  (name, msg))
2202
2203       if result.payload.is_degraded:
2204         raise errors.OpExecError("DRBD device %s is degraded!" % name)
2205
2206   def _RemoveOldStorage(self, node_name, iv_names):
2207     for name, (_, old_lvs, _) in iv_names.iteritems():
2208       self.lu.LogInfo("Remove logical volumes for %s", name)
2209
2210       for lv in old_lvs:
2211         self.cfg.SetDiskID(lv, node_name)
2212
2213         msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2214         if msg:
2215           self.lu.LogWarning("Can't remove old LV: %s", msg,
2216                              hint="remove unused LVs manually")
2217
2218   def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2219     """Replace a disk on the primary or secondary for DRBD 8.
2220
2221     The algorithm for replace is quite complicated:
2222
2223       1. for each disk to be replaced:
2224
2225         1. create new LVs on the target node with unique names
2226         1. detach old LVs from the drbd device
2227         1. rename old LVs to name_replaced.<time_t>
2228         1. rename new LVs to old LVs
2229         1. attach the new LVs (with the old names now) to the drbd device
2230
2231       1. wait for sync across all devices
2232
2233       1. for each modified disk:
2234
2235         1. remove old LVs (which have the name name_replaces.<time_t>)
2236
2237     Failures are not very well handled.
2238
2239     """
2240     steps_total = 6
2241
2242     # Step: check device activation
2243     self.lu.LogStep(1, steps_total, "Check device existence")
2244     self._CheckDisksExistence([self.other_node, self.target_node])
2245     self._CheckVolumeGroup([self.target_node, self.other_node])
2246
2247     # Step: check other node consistency
2248     self.lu.LogStep(2, steps_total, "Check peer consistency")
2249     self._CheckDisksConsistency(self.other_node,
2250                                 self.other_node == self.instance.primary_node,
2251                                 False)
2252
2253     # Step: create new storage
2254     self.lu.LogStep(3, steps_total, "Allocate new storage")
2255     iv_names = self._CreateNewStorage(self.target_node)
2256
2257     # Step: for each lv, detach+rename*2+attach
2258     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2259     for dev, old_lvs, new_lvs in iv_names.itervalues():
2260       self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2261
2262       result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2263                                                      old_lvs)
2264       result.Raise("Can't detach drbd from local storage on node"
2265                    " %s for device %s" % (self.target_node, dev.iv_name))
2266       #dev.children = []
2267       #cfg.Update(instance)
2268
2269       # ok, we created the new LVs, so now we know we have the needed
2270       # storage; as such, we proceed on the target node to rename
2271       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2272       # using the assumption that logical_id == physical_id (which in
2273       # turn is the unique_id on that node)
2274
2275       # FIXME(iustin): use a better name for the replaced LVs
2276       temp_suffix = int(time.time())
2277       ren_fn = lambda d, suff: (d.physical_id[0],
2278                                 d.physical_id[1] + "_replaced-%s" % suff)
2279
2280       # Build the rename list based on what LVs exist on the node
2281       rename_old_to_new = []
2282       for to_ren in old_lvs:
2283         result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2284         if not result.fail_msg and result.payload:
2285           # device exists
2286           rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2287
2288       self.lu.LogInfo("Renaming the old LVs on the target node")
2289       result = self.rpc.call_blockdev_rename(self.target_node,
2290                                              rename_old_to_new)
2291       result.Raise("Can't rename old LVs on node %s" % self.target_node)
2292
2293       # Now we rename the new LVs to the old LVs
2294       self.lu.LogInfo("Renaming the new LVs on the target node")
2295       rename_new_to_old = [(new, old.physical_id)
2296                            for old, new in zip(old_lvs, new_lvs)]
2297       result = self.rpc.call_blockdev_rename(self.target_node,
2298                                              rename_new_to_old)
2299       result.Raise("Can't rename new LVs on node %s" % self.target_node)
2300
2301       # Intermediate steps of in memory modifications
2302       for old, new in zip(old_lvs, new_lvs):
2303         new.logical_id = old.logical_id
2304         self.cfg.SetDiskID(new, self.target_node)
2305
2306       # We need to modify old_lvs so that removal later removes the
2307       # right LVs, not the newly added ones; note that old_lvs is a
2308       # copy here
2309       for disk in old_lvs:
2310         disk.logical_id = ren_fn(disk, temp_suffix)
2311         self.cfg.SetDiskID(disk, self.target_node)
2312
2313       # Now that the new lvs have the old name, we can add them to the device
2314       self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2315       result = self.rpc.call_blockdev_addchildren(self.target_node,
2316                                                   (dev, self.instance), new_lvs)
2317       msg = result.fail_msg
2318       if msg:
2319         for new_lv in new_lvs:
2320           msg2 = self.rpc.call_blockdev_remove(self.target_node,
2321                                                new_lv).fail_msg
2322           if msg2:
2323             self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2324                                hint=("cleanup manually the unused logical"
2325                                      "volumes"))
2326         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2327
2328     cstep = itertools.count(5)
2329
2330     if self.early_release:
2331       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2332       self._RemoveOldStorage(self.target_node, iv_names)
2333       # TODO: Check if releasing locks early still makes sense
2334       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2335     else:
2336       # Release all resource locks except those used by the instance
2337       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2338                    keep=self.node_secondary_ip.keys())
2339
2340     # Release all node locks while waiting for sync
2341     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2342
2343     # TODO: Can the instance lock be downgraded here? Take the optional disk
2344     # shutdown in the caller into consideration.
2345
2346     # Wait for sync
2347     # This can fail as the old devices are degraded and _WaitForSync
2348     # does a combined result over all disks, so we don't check its return value
2349     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2350     WaitForSync(self.lu, self.instance)
2351
2352     # Check all devices manually
2353     self._CheckDevices(self.instance.primary_node, iv_names)
2354
2355     # Step: remove old storage
2356     if not self.early_release:
2357       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2358       self._RemoveOldStorage(self.target_node, iv_names)
2359
2360   def _ExecDrbd8Secondary(self, feedback_fn):
2361     """Replace the secondary node for DRBD 8.
2362
2363     The algorithm for replace is quite complicated:
2364       - for all disks of the instance:
2365         - create new LVs on the new node with same names
2366         - shutdown the drbd device on the old secondary
2367         - disconnect the drbd network on the primary
2368         - create the drbd device on the new secondary
2369         - network attach the drbd on the primary, using an artifice:
2370           the drbd code for Attach() will connect to the network if it
2371           finds a device which is connected to the good local disks but
2372           not network enabled
2373       - wait for sync across all devices
2374       - remove all disks from the old secondary
2375
2376     Failures are not very well handled.
2377
2378     """
2379     steps_total = 6
2380
2381     pnode = self.instance.primary_node
2382
2383     # Step: check device activation
2384     self.lu.LogStep(1, steps_total, "Check device existence")
2385     self._CheckDisksExistence([self.instance.primary_node])
2386     self._CheckVolumeGroup([self.instance.primary_node])
2387
2388     # Step: check other node consistency
2389     self.lu.LogStep(2, steps_total, "Check peer consistency")
2390     self._CheckDisksConsistency(self.instance.primary_node, True, True)
2391
2392     # Step: create new storage
2393     self.lu.LogStep(3, steps_total, "Allocate new storage")
2394     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2395     excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2396     for idx, dev in enumerate(disks):
2397       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2398                       (self.new_node, idx))
2399       # we pass force_create=True to force LVM creation
2400       for new_lv in dev.children:
2401         try:
2402           _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2403                                True, GetInstanceInfoText(self.instance), False,
2404                                excl_stor)
2405         except errors.DeviceCreationError, e:
2406           raise errors.OpExecError("Can't create block device: %s" % e.message)
2407
2408     # Step 4: dbrd minors and drbd setups changes
2409     # after this, we must manually remove the drbd minors on both the
2410     # error and the success paths
2411     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2412     minors = self.cfg.AllocateDRBDMinor([self.new_node
2413                                          for dev in self.instance.disks],
2414                                         self.instance.name)
2415     logging.debug("Allocated minors %r", minors)
2416
2417     iv_names = {}
2418     for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2419       self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2420                       (self.new_node, idx))
2421       # create new devices on new_node; note that we create two IDs:
2422       # one without port, so the drbd will be activated without
2423       # networking information on the new node at this stage, and one
2424       # with network, for the latter activation in step 4
2425       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2426       if self.instance.primary_node == o_node1:
2427         p_minor = o_minor1
2428       else:
2429         assert self.instance.primary_node == o_node2, "Three-node instance?"
2430         p_minor = o_minor2
2431
2432       new_alone_id = (self.instance.primary_node, self.new_node, None,
2433                       p_minor, new_minor, o_secret)
2434       new_net_id = (self.instance.primary_node, self.new_node, o_port,
2435                     p_minor, new_minor, o_secret)
2436
2437       iv_names[idx] = (dev, dev.children, new_net_id)
2438       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2439                     new_net_id)
2440       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2441                               logical_id=new_alone_id,
2442                               children=dev.children,
2443                               size=dev.size,
2444                               params={})
2445       (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2446                                             self.cfg)
2447       try:
2448         CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2449                              anno_new_drbd,
2450                              GetInstanceInfoText(self.instance), False,
2451                              excl_stor)
2452       except errors.GenericError:
2453         self.cfg.ReleaseDRBDMinors(self.instance.name)
2454         raise
2455
2456     # We have new devices, shutdown the drbd on the old secondary
2457     for idx, dev in enumerate(self.instance.disks):
2458       self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2459       self.cfg.SetDiskID(dev, self.target_node)
2460       msg = self.rpc.call_blockdev_shutdown(self.target_node,
2461                                             (dev, self.instance)).fail_msg
2462       if msg:
2463         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2464                            "node: %s" % (idx, msg),
2465                            hint=("Please cleanup this device manually as"
2466                                  " soon as possible"))
2467
2468     self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2469     result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2470                                                self.instance.disks)[pnode]
2471
2472     msg = result.fail_msg
2473     if msg:
2474       # detaches didn't succeed (unlikely)
2475       self.cfg.ReleaseDRBDMinors(self.instance.name)
2476       raise errors.OpExecError("Can't detach the disks from the network on"
2477                                " old node: %s" % (msg,))
2478
2479     # if we managed to detach at least one, we update all the disks of
2480     # the instance to point to the new secondary
2481     self.lu.LogInfo("Updating instance configuration")
2482     for dev, _, new_logical_id in iv_names.itervalues():
2483       dev.logical_id = new_logical_id
2484       self.cfg.SetDiskID(dev, self.instance.primary_node)
2485
2486     self.cfg.Update(self.instance, feedback_fn)
2487
2488     # Release all node locks (the configuration has been updated)
2489     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2490
2491     # and now perform the drbd attach
2492     self.lu.LogInfo("Attaching primary drbds to new secondary"
2493                     " (standalone => connected)")
2494     result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2495                                             self.new_node],
2496                                            self.node_secondary_ip,
2497                                            (self.instance.disks, self.instance),
2498                                            self.instance.name,
2499                                            False)
2500     for to_node, to_result in result.items():
2501       msg = to_result.fail_msg
2502       if msg:
2503         self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2504                            to_node, msg,
2505                            hint=("please do a gnt-instance info to see the"
2506                                  " status of disks"))
2507
2508     cstep = itertools.count(5)
2509
2510     if self.early_release:
2511       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2512       self._RemoveOldStorage(self.target_node, iv_names)
2513       # TODO: Check if releasing locks early still makes sense
2514       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2515     else:
2516       # Release all resource locks except those used by the instance
2517       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2518                    keep=self.node_secondary_ip.keys())
2519
2520     # TODO: Can the instance lock be downgraded here? Take the optional disk
2521     # shutdown in the caller into consideration.
2522
2523     # Wait for sync
2524     # This can fail as the old devices are degraded and _WaitForSync
2525     # does a combined result over all disks, so we don't check its return value
2526     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2527     WaitForSync(self.lu, self.instance)
2528
2529     # Check all devices manually
2530     self._CheckDevices(self.instance.primary_node, iv_names)
2531
2532     # Step: remove old storage
2533     if not self.early_release:
2534       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2535       self._RemoveOldStorage(self.target_node, iv_names)