Merge branch 'stable-2.8'
[ganeti-local] / lib / cmdlib / instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41   AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42   CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43   IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45   CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46   BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52   constants.DT_PLAIN: "",
53   constants.DT_RBD: ".rbd",
54   constants.DT_EXT: ".ext",
55   }
56
57
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59   constants.DT_PLAIN: constants.LD_LV,
60   constants.DT_FILE: constants.LD_FILE,
61   constants.DT_SHARED_FILE: constants.LD_FILE,
62   constants.DT_BLOCK: constants.LD_BLOCKDEV,
63   constants.DT_RBD: constants.LD_RBD,
64   constants.DT_EXT: constants.LD_EXT,
65   }
66
67
68 def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69                          excl_stor):
70   """Create a single block device on a given node.
71
72   This will not recurse over children of the device, so they must be
73   created in advance.
74
75   @param lu: the lu on whose behalf we execute
76   @param node: the node on which to create the device
77   @type instance: L{objects.Instance}
78   @param instance: the instance which owns the device
79   @type device: L{objects.Disk}
80   @param device: the device to create
81   @param info: the extra 'metadata' we should attach to the device
82       (this will be represented as a LVM tag)
83   @type force_open: boolean
84   @param force_open: this parameter will be passes to the
85       L{backend.BlockdevCreate} function where it specifies
86       whether we run on primary or not, and it affects both
87       the child assembly and the device own Open() execution
88   @type excl_stor: boolean
89   @param excl_stor: Whether exclusive_storage is active for the node
90
91   """
92   lu.cfg.SetDiskID(device, node)
93   result = lu.rpc.call_blockdev_create(node, device, device.size,
94                                        instance.name, force_open, info,
95                                        excl_stor)
96   result.Raise("Can't create block device %s on"
97                " node %s for instance %s" % (device, node, instance.name))
98   if device.physical_id is None:
99     device.physical_id = result.payload
100
101
102 def _CreateBlockDevInner(lu, node, instance, device, force_create,
103                          info, force_open, excl_stor):
104   """Create a tree of block devices on a given node.
105
106   If this device type has to be created on secondaries, create it and
107   all its children.
108
109   If not, just recurse to children keeping the same 'force' value.
110
111   @attention: The device has to be annotated already.
112
113   @param lu: the lu on whose behalf we execute
114   @param node: the node on which to create the device
115   @type instance: L{objects.Instance}
116   @param instance: the instance which owns the device
117   @type device: L{objects.Disk}
118   @param device: the device to create
119   @type force_create: boolean
120   @param force_create: whether to force creation of this device; this
121       will be change to True whenever we find a device which has
122       CreateOnSecondary() attribute
123   @param info: the extra 'metadata' we should attach to the device
124       (this will be represented as a LVM tag)
125   @type force_open: boolean
126   @param force_open: this parameter will be passes to the
127       L{backend.BlockdevCreate} function where it specifies
128       whether we run on primary or not, and it affects both
129       the child assembly and the device own Open() execution
130   @type excl_stor: boolean
131   @param excl_stor: Whether exclusive_storage is active for the node
132
133   @return: list of created devices
134   """
135   created_devices = []
136   try:
137     if device.CreateOnSecondary():
138       force_create = True
139
140     if device.children:
141       for child in device.children:
142         devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143                                     info, force_open, excl_stor)
144         created_devices.extend(devs)
145
146     if not force_create:
147       return created_devices
148
149     CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150                          excl_stor)
151     # The device has been completely created, so there is no point in keeping
152     # its subdevices in the list. We just add the device itself instead.
153     created_devices = [(node, device)]
154     return created_devices
155
156   except errors.DeviceCreationError, e:
157     e.created_devices.extend(created_devices)
158     raise e
159   except errors.OpExecError, e:
160     raise errors.DeviceCreationError(str(e), created_devices)
161
162
163 def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164   """Whether exclusive_storage is in effect for the given node.
165
166   @type cfg: L{config.ConfigWriter}
167   @param cfg: The cluster configuration
168   @type nodename: string
169   @param nodename: The node
170   @rtype: bool
171   @return: The effective value of exclusive_storage
172   @raise errors.OpPrereqError: if no node exists with the given name
173
174   """
175   ni = cfg.GetNodeInfo(nodename)
176   if ni is None:
177     raise errors.OpPrereqError("Invalid node name %s" % nodename,
178                                errors.ECODE_NOENT)
179   return IsExclusiveStorageEnabledNode(cfg, ni)
180
181
182 def _CreateBlockDev(lu, node, instance, device, force_create, info,
183                     force_open):
184   """Wrapper around L{_CreateBlockDevInner}.
185
186   This method annotates the root device first.
187
188   """
189   (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190   excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191   return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192                               force_open, excl_stor)
193
194
195 def _UndoCreateDisks(lu, disks_created):
196   """Undo the work performed by L{CreateDisks}.
197
198   This function is called in case of an error to undo the work of
199   L{CreateDisks}.
200
201   @type lu: L{LogicalUnit}
202   @param lu: the logical unit on whose behalf we execute
203   @param disks_created: the result returned by L{CreateDisks}
204
205   """
206   for (node, disk) in disks_created:
207     lu.cfg.SetDiskID(disk, node)
208     result = lu.rpc.call_blockdev_remove(node, disk)
209     if result.fail_msg:
210       logging.warning("Failed to remove newly-created disk %s on node %s:"
211                       " %s", disk, node, result.fail_msg)
212
213
214 def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
215   """Create all disks for an instance.
216
217   This abstracts away some work from AddInstance.
218
219   @type lu: L{LogicalUnit}
220   @param lu: the logical unit on whose behalf we execute
221   @type instance: L{objects.Instance}
222   @param instance: the instance whose disks we should create
223   @type to_skip: list
224   @param to_skip: list of indices to skip
225   @type target_node: string
226   @param target_node: if passed, overrides the target node for creation
227   @type disks: list of {objects.Disk}
228   @param disks: the disks to create; if not specified, all the disks of the
229       instance are created
230   @return: information about the created disks, to be used to call
231       L{_UndoCreateDisks}
232   @raise errors.OpPrereqError: in case of error
233
234   """
235   info = GetInstanceInfoText(instance)
236   if target_node is None:
237     pnode = instance.primary_node
238     all_nodes = instance.all_nodes
239   else:
240     pnode = target_node
241     all_nodes = [pnode]
242
243   if disks is None:
244     disks = instance.disks
245
246   if instance.disk_template in constants.DTS_FILEBASED:
247     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
248     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
249
250     result.Raise("Failed to create directory '%s' on"
251                  " node %s" % (file_storage_dir, pnode))
252
253   disks_created = []
254   for idx, device in enumerate(disks):
255     if to_skip and idx in to_skip:
256       continue
257     logging.info("Creating disk %s for instance '%s'", idx, instance.name)
258     for node in all_nodes:
259       f_create = node == pnode
260       try:
261         _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
262         disks_created.append((node, device))
263       except errors.DeviceCreationError, e:
264         logging.warning("Creating disk %s for instance '%s' failed",
265                         idx, instance.name)
266         disks_created.extend(e.created_devices)
267         _UndoCreateDisks(lu, disks_created)
268         raise errors.OpExecError(e.message)
269   return disks_created
270
271
272 def ComputeDiskSizePerVG(disk_template, disks):
273   """Compute disk size requirements in the volume group
274
275   """
276   def _compute(disks, payload):
277     """Universal algorithm.
278
279     """
280     vgs = {}
281     for disk in disks:
282       vgs[disk[constants.IDISK_VG]] = \
283         vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
284
285     return vgs
286
287   # Required free disk space as a function of disk and swap space
288   req_size_dict = {
289     constants.DT_DISKLESS: {},
290     constants.DT_PLAIN: _compute(disks, 0),
291     # 128 MB are added for drbd metadata for each disk
292     constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
293     constants.DT_FILE: {},
294     constants.DT_SHARED_FILE: {},
295     }
296
297   if disk_template not in req_size_dict:
298     raise errors.ProgrammerError("Disk template '%s' size requirement"
299                                  " is unknown" % disk_template)
300
301   return req_size_dict[disk_template]
302
303
304 def ComputeDisks(op, default_vg):
305   """Computes the instance disks.
306
307   @param op: The instance opcode
308   @param default_vg: The default_vg to assume
309
310   @return: The computed disks
311
312   """
313   disks = []
314   for disk in op.disks:
315     mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316     if mode not in constants.DISK_ACCESS_SET:
317       raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318                                  mode, errors.ECODE_INVAL)
319     size = disk.get(constants.IDISK_SIZE, None)
320     if size is None:
321       raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
322     try:
323       size = int(size)
324     except (TypeError, ValueError):
325       raise errors.OpPrereqError("Invalid disk size '%s'" % size,
326                                  errors.ECODE_INVAL)
327
328     ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329     if ext_provider and op.disk_template != constants.DT_EXT:
330       raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331                                  " disk template, not %s" %
332                                  (constants.IDISK_PROVIDER, constants.DT_EXT,
333                                   op.disk_template), errors.ECODE_INVAL)
334
335     data_vg = disk.get(constants.IDISK_VG, default_vg)
336     name = disk.get(constants.IDISK_NAME, None)
337     if name is not None and name.lower() == constants.VALUE_NONE:
338       name = None
339     new_disk = {
340       constants.IDISK_SIZE: size,
341       constants.IDISK_MODE: mode,
342       constants.IDISK_VG: data_vg,
343       constants.IDISK_NAME: name,
344       }
345
346     for key in [
347       constants.IDISK_METAVG,
348       constants.IDISK_ADOPT,
349       constants.IDISK_SPINDLES,
350       ]:
351       if key in disk:
352         new_disk[key] = disk[key]
353
354     # For extstorage, demand the `provider' option and add any
355     # additional parameters (ext-params) to the dict
356     if op.disk_template == constants.DT_EXT:
357       if ext_provider:
358         new_disk[constants.IDISK_PROVIDER] = ext_provider
359         for key in disk:
360           if key not in constants.IDISK_PARAMS:
361             new_disk[key] = disk[key]
362       else:
363         raise errors.OpPrereqError("Missing provider for template '%s'" %
364                                    constants.DT_EXT, errors.ECODE_INVAL)
365
366     disks.append(new_disk)
367
368   return disks
369
370
371 def CheckRADOSFreeSpace():
372   """Compute disk size requirements inside the RADOS cluster.
373
374   """
375   # For the RADOS cluster we assume there is always enough space.
376   pass
377
378
379 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
380                          iv_name, p_minor, s_minor):
381   """Generate a drbd8 device complete with its children.
382
383   """
384   assert len(vgnames) == len(names) == 2
385   port = lu.cfg.AllocatePort()
386   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
387
388   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
389                           logical_id=(vgnames[0], names[0]),
390                           params={})
391   dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
392   dev_meta = objects.Disk(dev_type=constants.LD_LV,
393                           size=constants.DRBD_META_SIZE,
394                           logical_id=(vgnames[1], names[1]),
395                           params={})
396   dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
397   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
398                           logical_id=(primary, secondary, port,
399                                       p_minor, s_minor,
400                                       shared_secret),
401                           children=[dev_data, dev_meta],
402                           iv_name=iv_name, params={})
403   drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
404   return drbd_dev
405
406
407 def GenerateDiskTemplate(
408   lu, template_name, instance_name, primary_node, secondary_nodes,
409   disk_info, file_storage_dir, file_driver, base_index,
410   feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
411   _req_shr_file_storage=opcodes.RequireSharedFileStorage):
412   """Generate the entire disk layout for a given template type.
413
414   """
415   vgname = lu.cfg.GetVGName()
416   disk_count = len(disk_info)
417   disks = []
418
419   if template_name == constants.DT_DISKLESS:
420     pass
421   elif template_name == constants.DT_DRBD8:
422     if len(secondary_nodes) != 1:
423       raise errors.ProgrammerError("Wrong template configuration")
424     remote_node = secondary_nodes[0]
425     minors = lu.cfg.AllocateDRBDMinor(
426       [primary_node, remote_node] * len(disk_info), instance_name)
427
428     (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
429                                                        full_disk_params)
430     drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
431
432     names = []
433     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
434                                                for i in range(disk_count)]):
435       names.append(lv_prefix + "_data")
436       names.append(lv_prefix + "_meta")
437     for idx, disk in enumerate(disk_info):
438       disk_index = idx + base_index
439       data_vg = disk.get(constants.IDISK_VG, vgname)
440       meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
441       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
442                                       disk[constants.IDISK_SIZE],
443                                       [data_vg, meta_vg],
444                                       names[idx * 2:idx * 2 + 2],
445                                       "disk/%d" % disk_index,
446                                       minors[idx * 2], minors[idx * 2 + 1])
447       disk_dev.mode = disk[constants.IDISK_MODE]
448       disk_dev.name = disk.get(constants.IDISK_NAME, None)
449       disks.append(disk_dev)
450   else:
451     if secondary_nodes:
452       raise errors.ProgrammerError("Wrong template configuration")
453
454     if template_name == constants.DT_FILE:
455       _req_file_storage()
456     elif template_name == constants.DT_SHARED_FILE:
457       _req_shr_file_storage()
458
459     name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
460     if name_prefix is None:
461       names = None
462     else:
463       names = _GenerateUniqueNames(lu, ["%s.disk%s" %
464                                         (name_prefix, base_index + i)
465                                         for i in range(disk_count)])
466
467     if template_name == constants.DT_PLAIN:
468
469       def logical_id_fn(idx, _, disk):
470         vg = disk.get(constants.IDISK_VG, vgname)
471         return (vg, names[idx])
472
473     elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
474       logical_id_fn = \
475         lambda _, disk_index, disk: (file_driver,
476                                      "%s/disk%d" % (file_storage_dir,
477                                                     disk_index))
478     elif template_name == constants.DT_BLOCK:
479       logical_id_fn = \
480         lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
481                                        disk[constants.IDISK_ADOPT])
482     elif template_name == constants.DT_RBD:
483       logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
484     elif template_name == constants.DT_EXT:
485       def logical_id_fn(idx, _, disk):
486         provider = disk.get(constants.IDISK_PROVIDER, None)
487         if provider is None:
488           raise errors.ProgrammerError("Disk template is %s, but '%s' is"
489                                        " not found", constants.DT_EXT,
490                                        constants.IDISK_PROVIDER)
491         return (provider, names[idx])
492     else:
493       raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
494
495     dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
496
497     for idx, disk in enumerate(disk_info):
498       params = {}
499       # Only for the Ext template add disk_info to params
500       if template_name == constants.DT_EXT:
501         params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
502         for key in disk:
503           if key not in constants.IDISK_PARAMS:
504             params[key] = disk[key]
505       disk_index = idx + base_index
506       size = disk[constants.IDISK_SIZE]
507       feedback_fn("* disk %s, size %s" %
508                   (disk_index, utils.FormatUnit(size, "h")))
509       disk_dev = objects.Disk(dev_type=dev_type, size=size,
510                               logical_id=logical_id_fn(idx, disk_index, disk),
511                               iv_name="disk/%d" % disk_index,
512                               mode=disk[constants.IDISK_MODE],
513                               params=params,
514                               spindles=disk.get(constants.IDISK_SPINDLES))
515       disk_dev.name = disk.get(constants.IDISK_NAME, None)
516       disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
517       disks.append(disk_dev)
518
519   return disks
520
521
522 def CheckSpindlesExclusiveStorage(diskdict, es_flag):
523   """Check the presence of the spindle options with exclusive_storage.
524
525   @type diskdict: dict
526   @param diskdict: disk parameters
527   @type es_flag: bool
528   @param es_flag: the effective value of the exlusive_storage flag
529   @raise errors.OpPrereqError when spindles are given and they should not
530
531   """
532   if (not es_flag and constants.IDISK_SPINDLES in diskdict and
533       diskdict[constants.IDISK_SPINDLES] is not None):
534     raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
535                                " when exclusive storage is not active",
536                                errors.ECODE_INVAL)
537
538
539 class LUInstanceRecreateDisks(LogicalUnit):
540   """Recreate an instance's missing disks.
541
542   """
543   HPATH = "instance-recreate-disks"
544   HTYPE = constants.HTYPE_INSTANCE
545   REQ_BGL = False
546
547   _MODIFYABLE = compat.UniqueFrozenset([
548     constants.IDISK_SIZE,
549     constants.IDISK_MODE,
550     constants.IDISK_SPINDLES,
551     ])
552
553   # New or changed disk parameters may have different semantics
554   assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
555     constants.IDISK_ADOPT,
556
557     # TODO: Implement support changing VG while recreating
558     constants.IDISK_VG,
559     constants.IDISK_METAVG,
560     constants.IDISK_PROVIDER,
561     constants.IDISK_NAME,
562     ]))
563
564   def _RunAllocator(self):
565     """Run the allocator based on input opcode.
566
567     """
568     be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
569
570     # FIXME
571     # The allocator should actually run in "relocate" mode, but current
572     # allocators don't support relocating all the nodes of an instance at
573     # the same time. As a workaround we use "allocate" mode, but this is
574     # suboptimal for two reasons:
575     # - The instance name passed to the allocator is present in the list of
576     #   existing instances, so there could be a conflict within the
577     #   internal structures of the allocator. This doesn't happen with the
578     #   current allocators, but it's a liability.
579     # - The allocator counts the resources used by the instance twice: once
580     #   because the instance exists already, and once because it tries to
581     #   allocate a new instance.
582     # The allocator could choose some of the nodes on which the instance is
583     # running, but that's not a problem. If the instance nodes are broken,
584     # they should be already be marked as drained or offline, and hence
585     # skipped by the allocator. If instance disks have been lost for other
586     # reasons, then recreating the disks on the same nodes should be fine.
587     disk_template = self.instance.disk_template
588     spindle_use = be_full[constants.BE_SPINDLE_USE]
589     req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
590                                         disk_template=disk_template,
591                                         tags=list(self.instance.GetTags()),
592                                         os=self.instance.os,
593                                         nics=[{}],
594                                         vcpus=be_full[constants.BE_VCPUS],
595                                         memory=be_full[constants.BE_MAXMEM],
596                                         spindle_use=spindle_use,
597                                         disks=[{constants.IDISK_SIZE: d.size,
598                                                 constants.IDISK_MODE: d.mode}
599                                                for d in self.instance.disks],
600                                         hypervisor=self.instance.hypervisor,
601                                         node_whitelist=None)
602     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
603
604     ial.Run(self.op.iallocator)
605
606     assert req.RequiredNodes() == len(self.instance.all_nodes)
607
608     if not ial.success:
609       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
610                                  " %s" % (self.op.iallocator, ial.info),
611                                  errors.ECODE_NORES)
612
613     self.op.nodes = ial.result
614     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
615                  self.op.instance_name, self.op.iallocator,
616                  utils.CommaJoin(ial.result))
617
618   def CheckArguments(self):
619     if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
620       # Normalize and convert deprecated list of disk indices
621       self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
622
623     duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
624     if duplicates:
625       raise errors.OpPrereqError("Some disks have been specified more than"
626                                  " once: %s" % utils.CommaJoin(duplicates),
627                                  errors.ECODE_INVAL)
628
629     # We don't want _CheckIAllocatorOrNode selecting the default iallocator
630     # when neither iallocator nor nodes are specified
631     if self.op.iallocator or self.op.nodes:
632       CheckIAllocatorOrNode(self, "iallocator", "nodes")
633
634     for (idx, params) in self.op.disks:
635       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
636       unsupported = frozenset(params.keys()) - self._MODIFYABLE
637       if unsupported:
638         raise errors.OpPrereqError("Parameters for disk %s try to change"
639                                    " unmodifyable parameter(s): %s" %
640                                    (idx, utils.CommaJoin(unsupported)),
641                                    errors.ECODE_INVAL)
642
643   def ExpandNames(self):
644     self._ExpandAndLockInstance()
645     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
646
647     if self.op.nodes:
648       self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
649       self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
650     else:
651       self.needed_locks[locking.LEVEL_NODE] = []
652       if self.op.iallocator:
653         # iallocator will select a new node in the same group
654         self.needed_locks[locking.LEVEL_NODEGROUP] = []
655         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
656
657     self.needed_locks[locking.LEVEL_NODE_RES] = []
658
659   def DeclareLocks(self, level):
660     if level == locking.LEVEL_NODEGROUP:
661       assert self.op.iallocator is not None
662       assert not self.op.nodes
663       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
664       self.share_locks[locking.LEVEL_NODEGROUP] = 1
665       # Lock the primary group used by the instance optimistically; this
666       # requires going via the node before it's locked, requiring
667       # verification later on
668       self.needed_locks[locking.LEVEL_NODEGROUP] = \
669         self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
670
671     elif level == locking.LEVEL_NODE:
672       # If an allocator is used, then we lock all the nodes in the current
673       # instance group, as we don't know yet which ones will be selected;
674       # if we replace the nodes without using an allocator, locks are
675       # already declared in ExpandNames; otherwise, we need to lock all the
676       # instance nodes for disk re-creation
677       if self.op.iallocator:
678         assert not self.op.nodes
679         assert not self.needed_locks[locking.LEVEL_NODE]
680         assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
681
682         # Lock member nodes of the group of the primary node
683         for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
684           self.needed_locks[locking.LEVEL_NODE].extend(
685             self.cfg.GetNodeGroup(group_uuid).members)
686
687         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
688       elif not self.op.nodes:
689         self._LockInstancesNodes(primary_only=False)
690     elif level == locking.LEVEL_NODE_RES:
691       # Copy node locks
692       self.needed_locks[locking.LEVEL_NODE_RES] = \
693         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
694
695   def BuildHooksEnv(self):
696     """Build hooks env.
697
698     This runs on master, primary and secondary nodes of the instance.
699
700     """
701     return BuildInstanceHookEnvByObject(self, self.instance)
702
703   def BuildHooksNodes(self):
704     """Build hooks nodes.
705
706     """
707     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
708     return (nl, nl)
709
710   def CheckPrereq(self):
711     """Check prerequisites.
712
713     This checks that the instance is in the cluster and is not running.
714
715     """
716     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
717     assert instance is not None, \
718       "Cannot retrieve locked instance %s" % self.op.instance_name
719     if self.op.nodes:
720       if len(self.op.nodes) != len(instance.all_nodes):
721         raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
722                                    " %d replacement nodes were specified" %
723                                    (instance.name, len(instance.all_nodes),
724                                     len(self.op.nodes)),
725                                    errors.ECODE_INVAL)
726       assert instance.disk_template != constants.DT_DRBD8 or \
727              len(self.op.nodes) == 2
728       assert instance.disk_template != constants.DT_PLAIN or \
729              len(self.op.nodes) == 1
730       primary_node = self.op.nodes[0]
731     else:
732       primary_node = instance.primary_node
733     if not self.op.iallocator:
734       CheckNodeOnline(self, primary_node)
735
736     if instance.disk_template == constants.DT_DISKLESS:
737       raise errors.OpPrereqError("Instance '%s' has no disks" %
738                                  self.op.instance_name, errors.ECODE_INVAL)
739
740     # Verify if node group locks are still correct
741     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
742     if owned_groups:
743       # Node group locks are acquired only for the primary node (and only
744       # when the allocator is used)
745       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
746                               primary_only=True)
747
748     # if we replace nodes *and* the old primary is offline, we don't
749     # check the instance state
750     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
751     if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
752       CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
753                          msg="cannot recreate disks")
754
755     if self.op.disks:
756       self.disks = dict(self.op.disks)
757     else:
758       self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
759
760     maxidx = max(self.disks.keys())
761     if maxidx >= len(instance.disks):
762       raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
763                                  errors.ECODE_INVAL)
764
765     if ((self.op.nodes or self.op.iallocator) and
766          sorted(self.disks.keys()) != range(len(instance.disks))):
767       raise errors.OpPrereqError("Can't recreate disks partially and"
768                                  " change the nodes at the same time",
769                                  errors.ECODE_INVAL)
770
771     self.instance = instance
772
773     if self.op.iallocator:
774       self._RunAllocator()
775       # Release unneeded node and node resource locks
776       ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
777       ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
778       ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
779
780     assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
781
782     if self.op.nodes:
783       nodes = self.op.nodes
784     else:
785       nodes = instance.all_nodes
786     excl_stor = compat.any(
787       rpc.GetExclusiveStorageForNodeNames(self.cfg, nodes).values()
788       )
789     for new_params in self.disks.values():
790       CheckSpindlesExclusiveStorage(new_params, excl_stor)
791
792   def Exec(self, feedback_fn):
793     """Recreate the disks.
794
795     """
796     instance = self.instance
797
798     assert (self.owned_locks(locking.LEVEL_NODE) ==
799             self.owned_locks(locking.LEVEL_NODE_RES))
800
801     to_skip = []
802     mods = [] # keeps track of needed changes
803
804     for idx, disk in enumerate(instance.disks):
805       try:
806         changes = self.disks[idx]
807       except KeyError:
808         # Disk should not be recreated
809         to_skip.append(idx)
810         continue
811
812       # update secondaries for disks, if needed
813       if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
814         # need to update the nodes and minors
815         assert len(self.op.nodes) == 2
816         assert len(disk.logical_id) == 6 # otherwise disk internals
817                                          # have changed
818         (_, _, old_port, _, _, old_secret) = disk.logical_id
819         new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
820         new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
821                   new_minors[0], new_minors[1], old_secret)
822         assert len(disk.logical_id) == len(new_id)
823       else:
824         new_id = None
825
826       mods.append((idx, new_id, changes))
827
828     # now that we have passed all asserts above, we can apply the mods
829     # in a single run (to avoid partial changes)
830     for idx, new_id, changes in mods:
831       disk = instance.disks[idx]
832       if new_id is not None:
833         assert disk.dev_type == constants.LD_DRBD8
834         disk.logical_id = new_id
835       if changes:
836         disk.Update(size=changes.get(constants.IDISK_SIZE, None),
837                     mode=changes.get(constants.IDISK_MODE, None),
838                     spindles=changes.get(constants.IDISK_SPINDLES, None))
839
840     # change primary node, if needed
841     if self.op.nodes:
842       instance.primary_node = self.op.nodes[0]
843       self.LogWarning("Changing the instance's nodes, you will have to"
844                       " remove any disks left on the older nodes manually")
845
846     if self.op.nodes:
847       self.cfg.Update(instance, feedback_fn)
848
849     # All touched nodes must be locked
850     mylocks = self.owned_locks(locking.LEVEL_NODE)
851     assert mylocks.issuperset(frozenset(instance.all_nodes))
852     new_disks = CreateDisks(self, instance, to_skip=to_skip)
853
854     # TODO: Release node locks before wiping, or explain why it's not possible
855     if self.cfg.GetClusterInfo().prealloc_wipe_disks:
856       wipedisks = [(idx, disk, 0)
857                    for (idx, disk) in enumerate(instance.disks)
858                    if idx not in to_skip]
859       WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
860
861
862 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
863   """Checks if nodes have enough free disk space in the specified VG.
864
865   This function checks if all given nodes have the needed amount of
866   free disk. In case any node has less disk or we cannot get the
867   information from the node, this function raises an OpPrereqError
868   exception.
869
870   @type lu: C{LogicalUnit}
871   @param lu: a logical unit from which we get configuration data
872   @type nodenames: C{list}
873   @param nodenames: the list of node names to check
874   @type vg: C{str}
875   @param vg: the volume group to check
876   @type requested: C{int}
877   @param requested: the amount of disk in MiB to check for
878   @raise errors.OpPrereqError: if the node doesn't have enough disk,
879       or we cannot check the node
880
881   """
882   es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
883   # FIXME: This maps everything to storage type 'lvm-vg' to maintain
884   # the current functionality. Refactor to make it more flexible.
885   nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)], None,
886                                    es_flags)
887   for node in nodenames:
888     info = nodeinfo[node]
889     info.Raise("Cannot get current information from node %s" % node,
890                prereq=True, ecode=errors.ECODE_ENVIRON)
891     (_, (vg_info, ), _) = info.payload
892     vg_free = vg_info.get("vg_free", None)
893     if not isinstance(vg_free, int):
894       raise errors.OpPrereqError("Can't compute free disk space on node"
895                                  " %s for vg %s, result was '%s'" %
896                                  (node, vg, vg_free), errors.ECODE_ENVIRON)
897     if requested > vg_free:
898       raise errors.OpPrereqError("Not enough disk space on target node %s"
899                                  " vg %s: required %d MiB, available %d MiB" %
900                                  (node, vg, requested, vg_free),
901                                  errors.ECODE_NORES)
902
903
904 def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
905   """Checks if nodes have enough free disk space in all the VGs.
906
907   This function checks if all given nodes have the needed amount of
908   free disk. In case any node has less disk or we cannot get the
909   information from the node, this function raises an OpPrereqError
910   exception.
911
912   @type lu: C{LogicalUnit}
913   @param lu: a logical unit from which we get configuration data
914   @type nodenames: C{list}
915   @param nodenames: the list of node names to check
916   @type req_sizes: C{dict}
917   @param req_sizes: the hash of vg and corresponding amount of disk in
918       MiB to check for
919   @raise errors.OpPrereqError: if the node doesn't have enough disk,
920       or we cannot check the node
921
922   """
923   for vg, req_size in req_sizes.items():
924     _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
925
926
927 def _DiskSizeInBytesToMebibytes(lu, size):
928   """Converts a disk size in bytes to mebibytes.
929
930   Warns and rounds up if the size isn't an even multiple of 1 MiB.
931
932   """
933   (mib, remainder) = divmod(size, 1024 * 1024)
934
935   if remainder != 0:
936     lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
937                   " to not overwrite existing data (%s bytes will not be"
938                   " wiped)", (1024 * 1024) - remainder)
939     mib += 1
940
941   return mib
942
943
944 def _CalcEta(time_taken, written, total_size):
945   """Calculates the ETA based on size written and total size.
946
947   @param time_taken: The time taken so far
948   @param written: amount written so far
949   @param total_size: The total size of data to be written
950   @return: The remaining time in seconds
951
952   """
953   avg_time = time_taken / float(written)
954   return (total_size - written) * avg_time
955
956
957 def WipeDisks(lu, instance, disks=None):
958   """Wipes instance disks.
959
960   @type lu: L{LogicalUnit}
961   @param lu: the logical unit on whose behalf we execute
962   @type instance: L{objects.Instance}
963   @param instance: the instance whose disks we should create
964   @type disks: None or list of tuple of (number, L{objects.Disk}, number)
965   @param disks: Disk details; tuple contains disk index, disk object and the
966     start offset
967
968   """
969   node = instance.primary_node
970
971   if disks is None:
972     disks = [(idx, disk, 0)
973              for (idx, disk) in enumerate(instance.disks)]
974
975   for (_, device, _) in disks:
976     lu.cfg.SetDiskID(device, node)
977
978   logging.info("Pausing synchronization of disks of instance '%s'",
979                instance.name)
980   result = lu.rpc.call_blockdev_pause_resume_sync(node,
981                                                   (map(compat.snd, disks),
982                                                    instance),
983                                                   True)
984   result.Raise("Failed to pause disk synchronization on node '%s'" % node)
985
986   for idx, success in enumerate(result.payload):
987     if not success:
988       logging.warn("Pausing synchronization of disk %s of instance '%s'"
989                    " failed", idx, instance.name)
990
991   try:
992     for (idx, device, offset) in disks:
993       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
994       # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
995       wipe_chunk_size = \
996         int(min(constants.MAX_WIPE_CHUNK,
997                 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
998
999       size = device.size
1000       last_output = 0
1001       start_time = time.time()
1002
1003       if offset == 0:
1004         info_text = ""
1005       else:
1006         info_text = (" (from %s to %s)" %
1007                      (utils.FormatUnit(offset, "h"),
1008                       utils.FormatUnit(size, "h")))
1009
1010       lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1011
1012       logging.info("Wiping disk %d for instance %s on node %s using"
1013                    " chunk size %s", idx, instance.name, node, wipe_chunk_size)
1014
1015       while offset < size:
1016         wipe_size = min(wipe_chunk_size, size - offset)
1017
1018         logging.debug("Wiping disk %d, offset %s, chunk %s",
1019                       idx, offset, wipe_size)
1020
1021         result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
1022                                            wipe_size)
1023         result.Raise("Could not wipe disk %d at offset %d for size %d" %
1024                      (idx, offset, wipe_size))
1025
1026         now = time.time()
1027         offset += wipe_size
1028         if now - last_output >= 60:
1029           eta = _CalcEta(now - start_time, offset, size)
1030           lu.LogInfo(" - done: %.1f%% ETA: %s",
1031                      offset / float(size) * 100, utils.FormatSeconds(eta))
1032           last_output = now
1033   finally:
1034     logging.info("Resuming synchronization of disks for instance '%s'",
1035                  instance.name)
1036
1037     result = lu.rpc.call_blockdev_pause_resume_sync(node,
1038                                                     (map(compat.snd, disks),
1039                                                      instance),
1040                                                     False)
1041
1042     if result.fail_msg:
1043       lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1044                     node, result.fail_msg)
1045     else:
1046       for idx, success in enumerate(result.payload):
1047         if not success:
1048           lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1049                         " failed", idx, instance.name)
1050
1051
1052 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1053   """Wrapper for L{WipeDisks} that handles errors.
1054
1055   @type lu: L{LogicalUnit}
1056   @param lu: the logical unit on whose behalf we execute
1057   @type instance: L{objects.Instance}
1058   @param instance: the instance whose disks we should wipe
1059   @param disks: see L{WipeDisks}
1060   @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1061       case of error
1062   @raise errors.OpPrereqError: in case of failure
1063
1064   """
1065   try:
1066     WipeDisks(lu, instance, disks=disks)
1067   except errors.OpExecError:
1068     logging.warning("Wiping disks for instance '%s' failed",
1069                     instance.name)
1070     _UndoCreateDisks(lu, cleanup)
1071     raise
1072
1073
1074 def ExpandCheckDisks(instance, disks):
1075   """Return the instance disks selected by the disks list
1076
1077   @type disks: list of L{objects.Disk} or None
1078   @param disks: selected disks
1079   @rtype: list of L{objects.Disk}
1080   @return: selected instance disks to act on
1081
1082   """
1083   if disks is None:
1084     return instance.disks
1085   else:
1086     if not set(disks).issubset(instance.disks):
1087       raise errors.ProgrammerError("Can only act on disks belonging to the"
1088                                    " target instance: expected a subset of %r,"
1089                                    " got %r" % (instance.disks, disks))
1090     return disks
1091
1092
1093 def WaitForSync(lu, instance, disks=None, oneshot=False):
1094   """Sleep and poll for an instance's disk to sync.
1095
1096   """
1097   if not instance.disks or disks is not None and not disks:
1098     return True
1099
1100   disks = ExpandCheckDisks(instance, disks)
1101
1102   if not oneshot:
1103     lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1104
1105   node = instance.primary_node
1106
1107   for dev in disks:
1108     lu.cfg.SetDiskID(dev, node)
1109
1110   # TODO: Convert to utils.Retry
1111
1112   retries = 0
1113   degr_retries = 10 # in seconds, as we sleep 1 second each time
1114   while True:
1115     max_time = 0
1116     done = True
1117     cumul_degraded = False
1118     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1119     msg = rstats.fail_msg
1120     if msg:
1121       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1122       retries += 1
1123       if retries >= 10:
1124         raise errors.RemoteError("Can't contact node %s for mirror data,"
1125                                  " aborting." % node)
1126       time.sleep(6)
1127       continue
1128     rstats = rstats.payload
1129     retries = 0
1130     for i, mstat in enumerate(rstats):
1131       if mstat is None:
1132         lu.LogWarning("Can't compute data for node %s/%s",
1133                       node, disks[i].iv_name)
1134         continue
1135
1136       cumul_degraded = (cumul_degraded or
1137                         (mstat.is_degraded and mstat.sync_percent is None))
1138       if mstat.sync_percent is not None:
1139         done = False
1140         if mstat.estimated_time is not None:
1141           rem_time = ("%s remaining (estimated)" %
1142                       utils.FormatSeconds(mstat.estimated_time))
1143           max_time = mstat.estimated_time
1144         else:
1145           rem_time = "no time estimate"
1146         lu.LogInfo("- device %s: %5.2f%% done, %s",
1147                    disks[i].iv_name, mstat.sync_percent, rem_time)
1148
1149     # if we're done but degraded, let's do a few small retries, to
1150     # make sure we see a stable and not transient situation; therefore
1151     # we force restart of the loop
1152     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1153       logging.info("Degraded disks found, %d retries left", degr_retries)
1154       degr_retries -= 1
1155       time.sleep(1)
1156       continue
1157
1158     if done or oneshot:
1159       break
1160
1161     time.sleep(min(60, max_time))
1162
1163   if done:
1164     lu.LogInfo("Instance %s's disks are in sync", instance.name)
1165
1166   return not cumul_degraded
1167
1168
1169 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1170   """Shutdown block devices of an instance.
1171
1172   This does the shutdown on all nodes of the instance.
1173
1174   If the ignore_primary is false, errors on the primary node are
1175   ignored.
1176
1177   """
1178   all_result = True
1179   disks = ExpandCheckDisks(instance, disks)
1180
1181   for disk in disks:
1182     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1183       lu.cfg.SetDiskID(top_disk, node)
1184       result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1185       msg = result.fail_msg
1186       if msg:
1187         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1188                       disk.iv_name, node, msg)
1189         if ((node == instance.primary_node and not ignore_primary) or
1190             (node != instance.primary_node and not result.offline)):
1191           all_result = False
1192   return all_result
1193
1194
1195 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1196   """Shutdown block devices of an instance.
1197
1198   This function checks if an instance is running, before calling
1199   _ShutdownInstanceDisks.
1200
1201   """
1202   CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1203   ShutdownInstanceDisks(lu, instance, disks=disks)
1204
1205
1206 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1207                            ignore_size=False):
1208   """Prepare the block devices for an instance.
1209
1210   This sets up the block devices on all nodes.
1211
1212   @type lu: L{LogicalUnit}
1213   @param lu: the logical unit on whose behalf we execute
1214   @type instance: L{objects.Instance}
1215   @param instance: the instance for whose disks we assemble
1216   @type disks: list of L{objects.Disk} or None
1217   @param disks: which disks to assemble (or all, if None)
1218   @type ignore_secondaries: boolean
1219   @param ignore_secondaries: if true, errors on secondary nodes
1220       won't result in an error return from the function
1221   @type ignore_size: boolean
1222   @param ignore_size: if true, the current known size of the disk
1223       will not be used during the disk activation, useful for cases
1224       when the size is wrong
1225   @return: False if the operation failed, otherwise a list of
1226       (host, instance_visible_name, node_visible_name)
1227       with the mapping from node devices to instance devices
1228
1229   """
1230   device_info = []
1231   disks_ok = True
1232   iname = instance.name
1233   disks = ExpandCheckDisks(instance, disks)
1234
1235   # With the two passes mechanism we try to reduce the window of
1236   # opportunity for the race condition of switching DRBD to primary
1237   # before handshaking occured, but we do not eliminate it
1238
1239   # The proper fix would be to wait (with some limits) until the
1240   # connection has been made and drbd transitions from WFConnection
1241   # into any other network-connected state (Connected, SyncTarget,
1242   # SyncSource, etc.)
1243
1244   # 1st pass, assemble on all nodes in secondary mode
1245   for idx, inst_disk in enumerate(disks):
1246     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1247       if ignore_size:
1248         node_disk = node_disk.Copy()
1249         node_disk.UnsetSize()
1250       lu.cfg.SetDiskID(node_disk, node)
1251       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1252                                              False, idx)
1253       msg = result.fail_msg
1254       if msg:
1255         is_offline_secondary = (node in instance.secondary_nodes and
1256                                 result.offline)
1257         lu.LogWarning("Could not prepare block device %s on node %s"
1258                       " (is_primary=False, pass=1): %s",
1259                       inst_disk.iv_name, node, msg)
1260         if not (ignore_secondaries or is_offline_secondary):
1261           disks_ok = False
1262
1263   # FIXME: race condition on drbd migration to primary
1264
1265   # 2nd pass, do only the primary node
1266   for idx, inst_disk in enumerate(disks):
1267     dev_path = None
1268
1269     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1270       if node != instance.primary_node:
1271         continue
1272       if ignore_size:
1273         node_disk = node_disk.Copy()
1274         node_disk.UnsetSize()
1275       lu.cfg.SetDiskID(node_disk, node)
1276       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1277                                              True, idx)
1278       msg = result.fail_msg
1279       if msg:
1280         lu.LogWarning("Could not prepare block device %s on node %s"
1281                       " (is_primary=True, pass=2): %s",
1282                       inst_disk.iv_name, node, msg)
1283         disks_ok = False
1284       else:
1285         dev_path = result.payload
1286
1287     device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1288
1289   # leave the disks configured for the primary node
1290   # this is a workaround that would be fixed better by
1291   # improving the logical/physical id handling
1292   for disk in disks:
1293     lu.cfg.SetDiskID(disk, instance.primary_node)
1294
1295   return disks_ok, device_info
1296
1297
1298 def StartInstanceDisks(lu, instance, force):
1299   """Start the disks of an instance.
1300
1301   """
1302   disks_ok, _ = AssembleInstanceDisks(lu, instance,
1303                                       ignore_secondaries=force)
1304   if not disks_ok:
1305     ShutdownInstanceDisks(lu, instance)
1306     if force is not None and not force:
1307       lu.LogWarning("",
1308                     hint=("If the message above refers to a secondary node,"
1309                           " you can retry the operation using '--force'"))
1310     raise errors.OpExecError("Disk consistency error")
1311
1312
1313 class LUInstanceGrowDisk(LogicalUnit):
1314   """Grow a disk of an instance.
1315
1316   """
1317   HPATH = "disk-grow"
1318   HTYPE = constants.HTYPE_INSTANCE
1319   REQ_BGL = False
1320
1321   def ExpandNames(self):
1322     self._ExpandAndLockInstance()
1323     self.needed_locks[locking.LEVEL_NODE] = []
1324     self.needed_locks[locking.LEVEL_NODE_RES] = []
1325     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1326     self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1327
1328   def DeclareLocks(self, level):
1329     if level == locking.LEVEL_NODE:
1330       self._LockInstancesNodes()
1331     elif level == locking.LEVEL_NODE_RES:
1332       # Copy node locks
1333       self.needed_locks[locking.LEVEL_NODE_RES] = \
1334         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1335
1336   def BuildHooksEnv(self):
1337     """Build hooks env.
1338
1339     This runs on the master, the primary and all the secondaries.
1340
1341     """
1342     env = {
1343       "DISK": self.op.disk,
1344       "AMOUNT": self.op.amount,
1345       "ABSOLUTE": self.op.absolute,
1346       }
1347     env.update(BuildInstanceHookEnvByObject(self, self.instance))
1348     return env
1349
1350   def BuildHooksNodes(self):
1351     """Build hooks nodes.
1352
1353     """
1354     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1355     return (nl, nl)
1356
1357   def CheckPrereq(self):
1358     """Check prerequisites.
1359
1360     This checks that the instance is in the cluster.
1361
1362     """
1363     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1364     assert instance is not None, \
1365       "Cannot retrieve locked instance %s" % self.op.instance_name
1366     nodenames = list(instance.all_nodes)
1367     for node in nodenames:
1368       CheckNodeOnline(self, node)
1369
1370     self.instance = instance
1371
1372     if instance.disk_template not in constants.DTS_GROWABLE:
1373       raise errors.OpPrereqError("Instance's disk layout does not support"
1374                                  " growing", errors.ECODE_INVAL)
1375
1376     self.disk = instance.FindDisk(self.op.disk)
1377
1378     if self.op.absolute:
1379       self.target = self.op.amount
1380       self.delta = self.target - self.disk.size
1381       if self.delta < 0:
1382         raise errors.OpPrereqError("Requested size (%s) is smaller than "
1383                                    "current disk size (%s)" %
1384                                    (utils.FormatUnit(self.target, "h"),
1385                                     utils.FormatUnit(self.disk.size, "h")),
1386                                    errors.ECODE_STATE)
1387     else:
1388       self.delta = self.op.amount
1389       self.target = self.disk.size + self.delta
1390       if self.delta < 0:
1391         raise errors.OpPrereqError("Requested increment (%s) is negative" %
1392                                    utils.FormatUnit(self.delta, "h"),
1393                                    errors.ECODE_INVAL)
1394
1395     self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1396
1397   def _CheckDiskSpace(self, nodenames, req_vgspace):
1398     template = self.instance.disk_template
1399     if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1400       # TODO: check the free disk space for file, when that feature will be
1401       # supported
1402       nodes = map(self.cfg.GetNodeInfo, nodenames)
1403       es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1404                         nodes)
1405       if es_nodes:
1406         # With exclusive storage we need to something smarter than just looking
1407         # at free space; for now, let's simply abort the operation.
1408         raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1409                                    " is enabled", errors.ECODE_STATE)
1410       CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1411
1412   def Exec(self, feedback_fn):
1413     """Execute disk grow.
1414
1415     """
1416     instance = self.instance
1417     disk = self.disk
1418
1419     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1420     assert (self.owned_locks(locking.LEVEL_NODE) ==
1421             self.owned_locks(locking.LEVEL_NODE_RES))
1422
1423     wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1424
1425     disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1426     if not disks_ok:
1427       raise errors.OpExecError("Cannot activate block device to grow")
1428
1429     feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1430                 (self.op.disk, instance.name,
1431                  utils.FormatUnit(self.delta, "h"),
1432                  utils.FormatUnit(self.target, "h")))
1433
1434     # First run all grow ops in dry-run mode
1435     for node in instance.all_nodes:
1436       self.cfg.SetDiskID(disk, node)
1437       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1438                                            True, True)
1439       result.Raise("Dry-run grow request failed to node %s" % node)
1440
1441     if wipe_disks:
1442       # Get disk size from primary node for wiping
1443       result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1444       result.Raise("Failed to retrieve disk size from node '%s'" %
1445                    instance.primary_node)
1446
1447       (disk_size_in_bytes, ) = result.payload
1448
1449       if disk_size_in_bytes is None:
1450         raise errors.OpExecError("Failed to retrieve disk size from primary"
1451                                  " node '%s'" % instance.primary_node)
1452
1453       old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1454
1455       assert old_disk_size >= disk.size, \
1456         ("Retrieved disk size too small (got %s, should be at least %s)" %
1457          (old_disk_size, disk.size))
1458     else:
1459       old_disk_size = None
1460
1461     # We know that (as far as we can test) operations across different
1462     # nodes will succeed, time to run it for real on the backing storage
1463     for node in instance.all_nodes:
1464       self.cfg.SetDiskID(disk, node)
1465       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1466                                            False, True)
1467       result.Raise("Grow request failed to node %s" % node)
1468
1469     # And now execute it for logical storage, on the primary node
1470     node = instance.primary_node
1471     self.cfg.SetDiskID(disk, node)
1472     result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1473                                          False, False)
1474     result.Raise("Grow request failed to node %s" % node)
1475
1476     disk.RecordGrow(self.delta)
1477     self.cfg.Update(instance, feedback_fn)
1478
1479     # Changes have been recorded, release node lock
1480     ReleaseLocks(self, locking.LEVEL_NODE)
1481
1482     # Downgrade lock while waiting for sync
1483     self.glm.downgrade(locking.LEVEL_INSTANCE)
1484
1485     assert wipe_disks ^ (old_disk_size is None)
1486
1487     if wipe_disks:
1488       assert instance.disks[self.op.disk] == disk
1489
1490       # Wipe newly added disk space
1491       WipeDisks(self, instance,
1492                 disks=[(self.op.disk, disk, old_disk_size)])
1493
1494     if self.op.wait_for_sync:
1495       disk_abort = not WaitForSync(self, instance, disks=[disk])
1496       if disk_abort:
1497         self.LogWarning("Disk syncing has not returned a good status; check"
1498                         " the instance")
1499       if instance.admin_state != constants.ADMINST_UP:
1500         _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1501     elif instance.admin_state != constants.ADMINST_UP:
1502       self.LogWarning("Not shutting down the disk even if the instance is"
1503                       " not supposed to be running because no wait for"
1504                       " sync mode was requested")
1505
1506     assert self.owned_locks(locking.LEVEL_NODE_RES)
1507     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1508
1509
1510 class LUInstanceReplaceDisks(LogicalUnit):
1511   """Replace the disks of an instance.
1512
1513   """
1514   HPATH = "mirrors-replace"
1515   HTYPE = constants.HTYPE_INSTANCE
1516   REQ_BGL = False
1517
1518   def CheckArguments(self):
1519     """Check arguments.
1520
1521     """
1522     remote_node = self.op.remote_node
1523     ialloc = self.op.iallocator
1524     if self.op.mode == constants.REPLACE_DISK_CHG:
1525       if remote_node is None and ialloc is None:
1526         raise errors.OpPrereqError("When changing the secondary either an"
1527                                    " iallocator script must be used or the"
1528                                    " new node given", errors.ECODE_INVAL)
1529       else:
1530         CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1531
1532     elif remote_node is not None or ialloc is not None:
1533       # Not replacing the secondary
1534       raise errors.OpPrereqError("The iallocator and new node options can"
1535                                  " only be used when changing the"
1536                                  " secondary node", errors.ECODE_INVAL)
1537
1538   def ExpandNames(self):
1539     self._ExpandAndLockInstance()
1540
1541     assert locking.LEVEL_NODE not in self.needed_locks
1542     assert locking.LEVEL_NODE_RES not in self.needed_locks
1543     assert locking.LEVEL_NODEGROUP not in self.needed_locks
1544
1545     assert self.op.iallocator is None or self.op.remote_node is None, \
1546       "Conflicting options"
1547
1548     if self.op.remote_node is not None:
1549       self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1550
1551       # Warning: do not remove the locking of the new secondary here
1552       # unless DRBD8Dev.AddChildren is changed to work in parallel;
1553       # currently it doesn't since parallel invocations of
1554       # FindUnusedMinor will conflict
1555       self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1556       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1557     else:
1558       self.needed_locks[locking.LEVEL_NODE] = []
1559       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1560
1561       if self.op.iallocator is not None:
1562         # iallocator will select a new node in the same group
1563         self.needed_locks[locking.LEVEL_NODEGROUP] = []
1564         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1565
1566     self.needed_locks[locking.LEVEL_NODE_RES] = []
1567
1568     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1569                                    self.op.iallocator, self.op.remote_node,
1570                                    self.op.disks, self.op.early_release,
1571                                    self.op.ignore_ipolicy)
1572
1573     self.tasklets = [self.replacer]
1574
1575   def DeclareLocks(self, level):
1576     if level == locking.LEVEL_NODEGROUP:
1577       assert self.op.remote_node is None
1578       assert self.op.iallocator is not None
1579       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1580
1581       self.share_locks[locking.LEVEL_NODEGROUP] = 1
1582       # Lock all groups used by instance optimistically; this requires going
1583       # via the node before it's locked, requiring verification later on
1584       self.needed_locks[locking.LEVEL_NODEGROUP] = \
1585         self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1586
1587     elif level == locking.LEVEL_NODE:
1588       if self.op.iallocator is not None:
1589         assert self.op.remote_node is None
1590         assert not self.needed_locks[locking.LEVEL_NODE]
1591         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1592
1593         # Lock member nodes of all locked groups
1594         self.needed_locks[locking.LEVEL_NODE] = \
1595           [node_name
1596            for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1597            for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1598       else:
1599         assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1600
1601         self._LockInstancesNodes()
1602
1603     elif level == locking.LEVEL_NODE_RES:
1604       # Reuse node locks
1605       self.needed_locks[locking.LEVEL_NODE_RES] = \
1606         self.needed_locks[locking.LEVEL_NODE]
1607
1608   def BuildHooksEnv(self):
1609     """Build hooks env.
1610
1611     This runs on the master, the primary and all the secondaries.
1612
1613     """
1614     instance = self.replacer.instance
1615     env = {
1616       "MODE": self.op.mode,
1617       "NEW_SECONDARY": self.op.remote_node,
1618       "OLD_SECONDARY": instance.secondary_nodes[0],
1619       }
1620     env.update(BuildInstanceHookEnvByObject(self, instance))
1621     return env
1622
1623   def BuildHooksNodes(self):
1624     """Build hooks nodes.
1625
1626     """
1627     instance = self.replacer.instance
1628     nl = [
1629       self.cfg.GetMasterNode(),
1630       instance.primary_node,
1631       ]
1632     if self.op.remote_node is not None:
1633       nl.append(self.op.remote_node)
1634     return nl, nl
1635
1636   def CheckPrereq(self):
1637     """Check prerequisites.
1638
1639     """
1640     assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1641             self.op.iallocator is None)
1642
1643     # Verify if node group locks are still correct
1644     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1645     if owned_groups:
1646       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1647
1648     return LogicalUnit.CheckPrereq(self)
1649
1650
1651 class LUInstanceActivateDisks(NoHooksLU):
1652   """Bring up an instance's disks.
1653
1654   """
1655   REQ_BGL = False
1656
1657   def ExpandNames(self):
1658     self._ExpandAndLockInstance()
1659     self.needed_locks[locking.LEVEL_NODE] = []
1660     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1661
1662   def DeclareLocks(self, level):
1663     if level == locking.LEVEL_NODE:
1664       self._LockInstancesNodes()
1665
1666   def CheckPrereq(self):
1667     """Check prerequisites.
1668
1669     This checks that the instance is in the cluster.
1670
1671     """
1672     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1673     assert self.instance is not None, \
1674       "Cannot retrieve locked instance %s" % self.op.instance_name
1675     CheckNodeOnline(self, self.instance.primary_node)
1676
1677   def Exec(self, feedback_fn):
1678     """Activate the disks.
1679
1680     """
1681     disks_ok, disks_info = \
1682               AssembleInstanceDisks(self, self.instance,
1683                                     ignore_size=self.op.ignore_size)
1684     if not disks_ok:
1685       raise errors.OpExecError("Cannot activate block devices")
1686
1687     if self.op.wait_for_sync:
1688       if not WaitForSync(self, self.instance):
1689         raise errors.OpExecError("Some disks of the instance are degraded!")
1690
1691     return disks_info
1692
1693
1694 class LUInstanceDeactivateDisks(NoHooksLU):
1695   """Shutdown an instance's disks.
1696
1697   """
1698   REQ_BGL = False
1699
1700   def ExpandNames(self):
1701     self._ExpandAndLockInstance()
1702     self.needed_locks[locking.LEVEL_NODE] = []
1703     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1704
1705   def DeclareLocks(self, level):
1706     if level == locking.LEVEL_NODE:
1707       self._LockInstancesNodes()
1708
1709   def CheckPrereq(self):
1710     """Check prerequisites.
1711
1712     This checks that the instance is in the cluster.
1713
1714     """
1715     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1716     assert self.instance is not None, \
1717       "Cannot retrieve locked instance %s" % self.op.instance_name
1718
1719   def Exec(self, feedback_fn):
1720     """Deactivate the disks
1721
1722     """
1723     instance = self.instance
1724     if self.op.force:
1725       ShutdownInstanceDisks(self, instance)
1726     else:
1727       _SafeShutdownInstanceDisks(self, instance)
1728
1729
1730 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1731                                ldisk=False):
1732   """Check that mirrors are not degraded.
1733
1734   @attention: The device has to be annotated already.
1735
1736   The ldisk parameter, if True, will change the test from the
1737   is_degraded attribute (which represents overall non-ok status for
1738   the device(s)) to the ldisk (representing the local storage status).
1739
1740   """
1741   lu.cfg.SetDiskID(dev, node)
1742
1743   result = True
1744
1745   if on_primary or dev.AssembleOnSecondary():
1746     rstats = lu.rpc.call_blockdev_find(node, dev)
1747     msg = rstats.fail_msg
1748     if msg:
1749       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1750       result = False
1751     elif not rstats.payload:
1752       lu.LogWarning("Can't find disk on node %s", node)
1753       result = False
1754     else:
1755       if ldisk:
1756         result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1757       else:
1758         result = result and not rstats.payload.is_degraded
1759
1760   if dev.children:
1761     for child in dev.children:
1762       result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1763                                                      on_primary)
1764
1765   return result
1766
1767
1768 def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1769   """Wrapper around L{_CheckDiskConsistencyInner}.
1770
1771   """
1772   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1773   return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1774                                     ldisk=ldisk)
1775
1776
1777 def _BlockdevFind(lu, node, dev, instance):
1778   """Wrapper around call_blockdev_find to annotate diskparams.
1779
1780   @param lu: A reference to the lu object
1781   @param node: The node to call out
1782   @param dev: The device to find
1783   @param instance: The instance object the device belongs to
1784   @returns The result of the rpc call
1785
1786   """
1787   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1788   return lu.rpc.call_blockdev_find(node, disk)
1789
1790
1791 def _GenerateUniqueNames(lu, exts):
1792   """Generate a suitable LV name.
1793
1794   This will generate a logical volume name for the given instance.
1795
1796   """
1797   results = []
1798   for val in exts:
1799     new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1800     results.append("%s%s" % (new_id, val))
1801   return results
1802
1803
1804 class TLReplaceDisks(Tasklet):
1805   """Replaces disks for an instance.
1806
1807   Note: Locking is not within the scope of this class.
1808
1809   """
1810   def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1811                disks, early_release, ignore_ipolicy):
1812     """Initializes this class.
1813
1814     """
1815     Tasklet.__init__(self, lu)
1816
1817     # Parameters
1818     self.instance_name = instance_name
1819     self.mode = mode
1820     self.iallocator_name = iallocator_name
1821     self.remote_node = remote_node
1822     self.disks = disks
1823     self.early_release = early_release
1824     self.ignore_ipolicy = ignore_ipolicy
1825
1826     # Runtime data
1827     self.instance = None
1828     self.new_node = None
1829     self.target_node = None
1830     self.other_node = None
1831     self.remote_node_info = None
1832     self.node_secondary_ip = None
1833
1834   @staticmethod
1835   def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1836     """Compute a new secondary node using an IAllocator.
1837
1838     """
1839     req = iallocator.IAReqRelocate(name=instance_name,
1840                                    relocate_from=list(relocate_from))
1841     ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1842
1843     ial.Run(iallocator_name)
1844
1845     if not ial.success:
1846       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1847                                  " %s" % (iallocator_name, ial.info),
1848                                  errors.ECODE_NORES)
1849
1850     remote_node_name = ial.result[0]
1851
1852     lu.LogInfo("Selected new secondary for instance '%s': %s",
1853                instance_name, remote_node_name)
1854
1855     return remote_node_name
1856
1857   def _FindFaultyDisks(self, node_name):
1858     """Wrapper for L{FindFaultyInstanceDisks}.
1859
1860     """
1861     return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1862                                    node_name, True)
1863
1864   def _CheckDisksActivated(self, instance):
1865     """Checks if the instance disks are activated.
1866
1867     @param instance: The instance to check disks
1868     @return: True if they are activated, False otherwise
1869
1870     """
1871     nodes = instance.all_nodes
1872
1873     for idx, dev in enumerate(instance.disks):
1874       for node in nodes:
1875         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1876         self.cfg.SetDiskID(dev, node)
1877
1878         result = _BlockdevFind(self, node, dev, instance)
1879
1880         if result.offline:
1881           continue
1882         elif result.fail_msg or not result.payload:
1883           return False
1884
1885     return True
1886
1887   def CheckPrereq(self):
1888     """Check prerequisites.
1889
1890     This checks that the instance is in the cluster.
1891
1892     """
1893     self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1894     assert instance is not None, \
1895       "Cannot retrieve locked instance %s" % self.instance_name
1896
1897     if instance.disk_template != constants.DT_DRBD8:
1898       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1899                                  " instances", errors.ECODE_INVAL)
1900
1901     if len(instance.secondary_nodes) != 1:
1902       raise errors.OpPrereqError("The instance has a strange layout,"
1903                                  " expected one secondary but found %d" %
1904                                  len(instance.secondary_nodes),
1905                                  errors.ECODE_FAULT)
1906
1907     instance = self.instance
1908     secondary_node = instance.secondary_nodes[0]
1909
1910     if self.iallocator_name is None:
1911       remote_node = self.remote_node
1912     else:
1913       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1914                                        instance.name, instance.secondary_nodes)
1915
1916     if remote_node is None:
1917       self.remote_node_info = None
1918     else:
1919       assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1920              "Remote node '%s' is not locked" % remote_node
1921
1922       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1923       assert self.remote_node_info is not None, \
1924         "Cannot retrieve locked node %s" % remote_node
1925
1926     if remote_node == self.instance.primary_node:
1927       raise errors.OpPrereqError("The specified node is the primary node of"
1928                                  " the instance", errors.ECODE_INVAL)
1929
1930     if remote_node == secondary_node:
1931       raise errors.OpPrereqError("The specified node is already the"
1932                                  " secondary node of the instance",
1933                                  errors.ECODE_INVAL)
1934
1935     if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1936                                     constants.REPLACE_DISK_CHG):
1937       raise errors.OpPrereqError("Cannot specify disks to be replaced",
1938                                  errors.ECODE_INVAL)
1939
1940     if self.mode == constants.REPLACE_DISK_AUTO:
1941       if not self._CheckDisksActivated(instance):
1942         raise errors.OpPrereqError("Please run activate-disks on instance %s"
1943                                    " first" % self.instance_name,
1944                                    errors.ECODE_STATE)
1945       faulty_primary = self._FindFaultyDisks(instance.primary_node)
1946       faulty_secondary = self._FindFaultyDisks(secondary_node)
1947
1948       if faulty_primary and faulty_secondary:
1949         raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1950                                    " one node and can not be repaired"
1951                                    " automatically" % self.instance_name,
1952                                    errors.ECODE_STATE)
1953
1954       if faulty_primary:
1955         self.disks = faulty_primary
1956         self.target_node = instance.primary_node
1957         self.other_node = secondary_node
1958         check_nodes = [self.target_node, self.other_node]
1959       elif faulty_secondary:
1960         self.disks = faulty_secondary
1961         self.target_node = secondary_node
1962         self.other_node = instance.primary_node
1963         check_nodes = [self.target_node, self.other_node]
1964       else:
1965         self.disks = []
1966         check_nodes = []
1967
1968     else:
1969       # Non-automatic modes
1970       if self.mode == constants.REPLACE_DISK_PRI:
1971         self.target_node = instance.primary_node
1972         self.other_node = secondary_node
1973         check_nodes = [self.target_node, self.other_node]
1974
1975       elif self.mode == constants.REPLACE_DISK_SEC:
1976         self.target_node = secondary_node
1977         self.other_node = instance.primary_node
1978         check_nodes = [self.target_node, self.other_node]
1979
1980       elif self.mode == constants.REPLACE_DISK_CHG:
1981         self.new_node = remote_node
1982         self.other_node = instance.primary_node
1983         self.target_node = secondary_node
1984         check_nodes = [self.new_node, self.other_node]
1985
1986         CheckNodeNotDrained(self.lu, remote_node)
1987         CheckNodeVmCapable(self.lu, remote_node)
1988
1989         old_node_info = self.cfg.GetNodeInfo(secondary_node)
1990         assert old_node_info is not None
1991         if old_node_info.offline and not self.early_release:
1992           # doesn't make sense to delay the release
1993           self.early_release = True
1994           self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1995                           " early-release mode", secondary_node)
1996
1997       else:
1998         raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
1999                                      self.mode)
2000
2001       # If not specified all disks should be replaced
2002       if not self.disks:
2003         self.disks = range(len(self.instance.disks))
2004
2005     # TODO: This is ugly, but right now we can't distinguish between internal
2006     # submitted opcode and external one. We should fix that.
2007     if self.remote_node_info:
2008       # We change the node, lets verify it still meets instance policy
2009       new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2010       cluster = self.cfg.GetClusterInfo()
2011       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2012                                                               new_group_info)
2013       CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
2014                              self.cfg, ignore=self.ignore_ipolicy)
2015
2016     for node in check_nodes:
2017       CheckNodeOnline(self.lu, node)
2018
2019     touched_nodes = frozenset(node_name for node_name in [self.new_node,
2020                                                           self.other_node,
2021                                                           self.target_node]
2022                               if node_name is not None)
2023
2024     # Release unneeded node and node resource locks
2025     ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2026     ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2027     ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2028
2029     # Release any owned node group
2030     ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2031
2032     # Check whether disks are valid
2033     for disk_idx in self.disks:
2034       instance.FindDisk(disk_idx)
2035
2036     # Get secondary node IP addresses
2037     self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
2038                                   in self.cfg.GetMultiNodeInfo(touched_nodes))
2039
2040   def Exec(self, feedback_fn):
2041     """Execute disk replacement.
2042
2043     This dispatches the disk replacement to the appropriate handler.
2044
2045     """
2046     if __debug__:
2047       # Verify owned locks before starting operation
2048       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2049       assert set(owned_nodes) == set(self.node_secondary_ip), \
2050           ("Incorrect node locks, owning %s, expected %s" %
2051            (owned_nodes, self.node_secondary_ip.keys()))
2052       assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2053               self.lu.owned_locks(locking.LEVEL_NODE_RES))
2054       assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2055
2056       owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2057       assert list(owned_instances) == [self.instance_name], \
2058           "Instance '%s' not locked" % self.instance_name
2059
2060       assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2061           "Should not own any node group lock at this point"
2062
2063     if not self.disks:
2064       feedback_fn("No disks need replacement for instance '%s'" %
2065                   self.instance.name)
2066       return
2067
2068     feedback_fn("Replacing disk(s) %s for instance '%s'" %
2069                 (utils.CommaJoin(self.disks), self.instance.name))
2070     feedback_fn("Current primary node: %s" % self.instance.primary_node)
2071     feedback_fn("Current seconary node: %s" %
2072                 utils.CommaJoin(self.instance.secondary_nodes))
2073
2074     activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
2075
2076     # Activate the instance disks if we're replacing them on a down instance
2077     if activate_disks:
2078       StartInstanceDisks(self.lu, self.instance, True)
2079
2080     try:
2081       # Should we replace the secondary node?
2082       if self.new_node is not None:
2083         fn = self._ExecDrbd8Secondary
2084       else:
2085         fn = self._ExecDrbd8DiskOnly
2086
2087       result = fn(feedback_fn)
2088     finally:
2089       # Deactivate the instance disks if we're replacing them on a
2090       # down instance
2091       if activate_disks:
2092         _SafeShutdownInstanceDisks(self.lu, self.instance)
2093
2094     assert not self.lu.owned_locks(locking.LEVEL_NODE)
2095
2096     if __debug__:
2097       # Verify owned locks
2098       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2099       nodes = frozenset(self.node_secondary_ip)
2100       assert ((self.early_release and not owned_nodes) or
2101               (not self.early_release and not (set(owned_nodes) - nodes))), \
2102         ("Not owning the correct locks, early_release=%s, owned=%r,"
2103          " nodes=%r" % (self.early_release, owned_nodes, nodes))
2104
2105     return result
2106
2107   def _CheckVolumeGroup(self, nodes):
2108     self.lu.LogInfo("Checking volume groups")
2109
2110     vgname = self.cfg.GetVGName()
2111
2112     # Make sure volume group exists on all involved nodes
2113     results = self.rpc.call_vg_list(nodes)
2114     if not results:
2115       raise errors.OpExecError("Can't list volume groups on the nodes")
2116
2117     for node in nodes:
2118       res = results[node]
2119       res.Raise("Error checking node %s" % node)
2120       if vgname not in res.payload:
2121         raise errors.OpExecError("Volume group '%s' not found on node %s" %
2122                                  (vgname, node))
2123
2124   def _CheckDisksExistence(self, nodes):
2125     # Check disk existence
2126     for idx, dev in enumerate(self.instance.disks):
2127       if idx not in self.disks:
2128         continue
2129
2130       for node in nodes:
2131         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2132         self.cfg.SetDiskID(dev, node)
2133
2134         result = _BlockdevFind(self, node, dev, self.instance)
2135
2136         msg = result.fail_msg
2137         if msg or not result.payload:
2138           if not msg:
2139             msg = "disk not found"
2140           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2141                                    (idx, node, msg))
2142
2143   def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2144     for idx, dev in enumerate(self.instance.disks):
2145       if idx not in self.disks:
2146         continue
2147
2148       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2149                       (idx, node_name))
2150
2151       if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2152                                   on_primary, ldisk=ldisk):
2153         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2154                                  " replace disks for instance %s" %
2155                                  (node_name, self.instance.name))
2156
2157   def _CreateNewStorage(self, node_name):
2158     """Create new storage on the primary or secondary node.
2159
2160     This is only used for same-node replaces, not for changing the
2161     secondary node, hence we don't want to modify the existing disk.
2162
2163     """
2164     iv_names = {}
2165
2166     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2167     for idx, dev in enumerate(disks):
2168       if idx not in self.disks:
2169         continue
2170
2171       self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2172
2173       self.cfg.SetDiskID(dev, node_name)
2174
2175       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2176       names = _GenerateUniqueNames(self.lu, lv_names)
2177
2178       (data_disk, meta_disk) = dev.children
2179       vg_data = data_disk.logical_id[0]
2180       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2181                              logical_id=(vg_data, names[0]),
2182                              params=data_disk.params)
2183       vg_meta = meta_disk.logical_id[0]
2184       lv_meta = objects.Disk(dev_type=constants.LD_LV,
2185                              size=constants.DRBD_META_SIZE,
2186                              logical_id=(vg_meta, names[1]),
2187                              params=meta_disk.params)
2188
2189       new_lvs = [lv_data, lv_meta]
2190       old_lvs = [child.Copy() for child in dev.children]
2191       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2192       excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2193
2194       # we pass force_create=True to force the LVM creation
2195       for new_lv in new_lvs:
2196         _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2197                              GetInstanceInfoText(self.instance), False,
2198                              excl_stor)
2199
2200     return iv_names
2201
2202   def _CheckDevices(self, node_name, iv_names):
2203     for name, (dev, _, _) in iv_names.iteritems():
2204       self.cfg.SetDiskID(dev, node_name)
2205
2206       result = _BlockdevFind(self, node_name, dev, self.instance)
2207
2208       msg = result.fail_msg
2209       if msg or not result.payload:
2210         if not msg:
2211           msg = "disk not found"
2212         raise errors.OpExecError("Can't find DRBD device %s: %s" %
2213                                  (name, msg))
2214
2215       if result.payload.is_degraded:
2216         raise errors.OpExecError("DRBD device %s is degraded!" % name)
2217
2218   def _RemoveOldStorage(self, node_name, iv_names):
2219     for name, (_, old_lvs, _) in iv_names.iteritems():
2220       self.lu.LogInfo("Remove logical volumes for %s", name)
2221
2222       for lv in old_lvs:
2223         self.cfg.SetDiskID(lv, node_name)
2224
2225         msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2226         if msg:
2227           self.lu.LogWarning("Can't remove old LV: %s", msg,
2228                              hint="remove unused LVs manually")
2229
2230   def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2231     """Replace a disk on the primary or secondary for DRBD 8.
2232
2233     The algorithm for replace is quite complicated:
2234
2235       1. for each disk to be replaced:
2236
2237         1. create new LVs on the target node with unique names
2238         1. detach old LVs from the drbd device
2239         1. rename old LVs to name_replaced.<time_t>
2240         1. rename new LVs to old LVs
2241         1. attach the new LVs (with the old names now) to the drbd device
2242
2243       1. wait for sync across all devices
2244
2245       1. for each modified disk:
2246
2247         1. remove old LVs (which have the name name_replaces.<time_t>)
2248
2249     Failures are not very well handled.
2250
2251     """
2252     steps_total = 6
2253
2254     # Step: check device activation
2255     self.lu.LogStep(1, steps_total, "Check device existence")
2256     self._CheckDisksExistence([self.other_node, self.target_node])
2257     self._CheckVolumeGroup([self.target_node, self.other_node])
2258
2259     # Step: check other node consistency
2260     self.lu.LogStep(2, steps_total, "Check peer consistency")
2261     self._CheckDisksConsistency(self.other_node,
2262                                 self.other_node == self.instance.primary_node,
2263                                 False)
2264
2265     # Step: create new storage
2266     self.lu.LogStep(3, steps_total, "Allocate new storage")
2267     iv_names = self._CreateNewStorage(self.target_node)
2268
2269     # Step: for each lv, detach+rename*2+attach
2270     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2271     for dev, old_lvs, new_lvs in iv_names.itervalues():
2272       self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2273
2274       result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2275                                                      old_lvs)
2276       result.Raise("Can't detach drbd from local storage on node"
2277                    " %s for device %s" % (self.target_node, dev.iv_name))
2278       #dev.children = []
2279       #cfg.Update(instance)
2280
2281       # ok, we created the new LVs, so now we know we have the needed
2282       # storage; as such, we proceed on the target node to rename
2283       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2284       # using the assumption that logical_id == physical_id (which in
2285       # turn is the unique_id on that node)
2286
2287       # FIXME(iustin): use a better name for the replaced LVs
2288       temp_suffix = int(time.time())
2289       ren_fn = lambda d, suff: (d.physical_id[0],
2290                                 d.physical_id[1] + "_replaced-%s" % suff)
2291
2292       # Build the rename list based on what LVs exist on the node
2293       rename_old_to_new = []
2294       for to_ren in old_lvs:
2295         result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2296         if not result.fail_msg and result.payload:
2297           # device exists
2298           rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2299
2300       self.lu.LogInfo("Renaming the old LVs on the target node")
2301       result = self.rpc.call_blockdev_rename(self.target_node,
2302                                              rename_old_to_new)
2303       result.Raise("Can't rename old LVs on node %s" % self.target_node)
2304
2305       # Now we rename the new LVs to the old LVs
2306       self.lu.LogInfo("Renaming the new LVs on the target node")
2307       rename_new_to_old = [(new, old.physical_id)
2308                            for old, new in zip(old_lvs, new_lvs)]
2309       result = self.rpc.call_blockdev_rename(self.target_node,
2310                                              rename_new_to_old)
2311       result.Raise("Can't rename new LVs on node %s" % self.target_node)
2312
2313       # Intermediate steps of in memory modifications
2314       for old, new in zip(old_lvs, new_lvs):
2315         new.logical_id = old.logical_id
2316         self.cfg.SetDiskID(new, self.target_node)
2317
2318       # We need to modify old_lvs so that removal later removes the
2319       # right LVs, not the newly added ones; note that old_lvs is a
2320       # copy here
2321       for disk in old_lvs:
2322         disk.logical_id = ren_fn(disk, temp_suffix)
2323         self.cfg.SetDiskID(disk, self.target_node)
2324
2325       # Now that the new lvs have the old name, we can add them to the device
2326       self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2327       result = self.rpc.call_blockdev_addchildren(self.target_node,
2328                                                   (dev, self.instance), new_lvs)
2329       msg = result.fail_msg
2330       if msg:
2331         for new_lv in new_lvs:
2332           msg2 = self.rpc.call_blockdev_remove(self.target_node,
2333                                                new_lv).fail_msg
2334           if msg2:
2335             self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2336                                hint=("cleanup manually the unused logical"
2337                                      "volumes"))
2338         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2339
2340     cstep = itertools.count(5)
2341
2342     if self.early_release:
2343       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2344       self._RemoveOldStorage(self.target_node, iv_names)
2345       # TODO: Check if releasing locks early still makes sense
2346       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2347     else:
2348       # Release all resource locks except those used by the instance
2349       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2350                    keep=self.node_secondary_ip.keys())
2351
2352     # Release all node locks while waiting for sync
2353     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2354
2355     # TODO: Can the instance lock be downgraded here? Take the optional disk
2356     # shutdown in the caller into consideration.
2357
2358     # Wait for sync
2359     # This can fail as the old devices are degraded and _WaitForSync
2360     # does a combined result over all disks, so we don't check its return value
2361     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2362     WaitForSync(self.lu, self.instance)
2363
2364     # Check all devices manually
2365     self._CheckDevices(self.instance.primary_node, iv_names)
2366
2367     # Step: remove old storage
2368     if not self.early_release:
2369       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2370       self._RemoveOldStorage(self.target_node, iv_names)
2371
2372   def _ExecDrbd8Secondary(self, feedback_fn):
2373     """Replace the secondary node for DRBD 8.
2374
2375     The algorithm for replace is quite complicated:
2376       - for all disks of the instance:
2377         - create new LVs on the new node with same names
2378         - shutdown the drbd device on the old secondary
2379         - disconnect the drbd network on the primary
2380         - create the drbd device on the new secondary
2381         - network attach the drbd on the primary, using an artifice:
2382           the drbd code for Attach() will connect to the network if it
2383           finds a device which is connected to the good local disks but
2384           not network enabled
2385       - wait for sync across all devices
2386       - remove all disks from the old secondary
2387
2388     Failures are not very well handled.
2389
2390     """
2391     steps_total = 6
2392
2393     pnode = self.instance.primary_node
2394
2395     # Step: check device activation
2396     self.lu.LogStep(1, steps_total, "Check device existence")
2397     self._CheckDisksExistence([self.instance.primary_node])
2398     self._CheckVolumeGroup([self.instance.primary_node])
2399
2400     # Step: check other node consistency
2401     self.lu.LogStep(2, steps_total, "Check peer consistency")
2402     self._CheckDisksConsistency(self.instance.primary_node, True, True)
2403
2404     # Step: create new storage
2405     self.lu.LogStep(3, steps_total, "Allocate new storage")
2406     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2407     excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2408     for idx, dev in enumerate(disks):
2409       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2410                       (self.new_node, idx))
2411       # we pass force_create=True to force LVM creation
2412       for new_lv in dev.children:
2413         _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2414                              True, GetInstanceInfoText(self.instance), False,
2415                              excl_stor)
2416
2417     # Step 4: dbrd minors and drbd setups changes
2418     # after this, we must manually remove the drbd minors on both the
2419     # error and the success paths
2420     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2421     minors = self.cfg.AllocateDRBDMinor([self.new_node
2422                                          for dev in self.instance.disks],
2423                                         self.instance.name)
2424     logging.debug("Allocated minors %r", minors)
2425
2426     iv_names = {}
2427     for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2428       self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2429                       (self.new_node, idx))
2430       # create new devices on new_node; note that we create two IDs:
2431       # one without port, so the drbd will be activated without
2432       # networking information on the new node at this stage, and one
2433       # with network, for the latter activation in step 4
2434       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2435       if self.instance.primary_node == o_node1:
2436         p_minor = o_minor1
2437       else:
2438         assert self.instance.primary_node == o_node2, "Three-node instance?"
2439         p_minor = o_minor2
2440
2441       new_alone_id = (self.instance.primary_node, self.new_node, None,
2442                       p_minor, new_minor, o_secret)
2443       new_net_id = (self.instance.primary_node, self.new_node, o_port,
2444                     p_minor, new_minor, o_secret)
2445
2446       iv_names[idx] = (dev, dev.children, new_net_id)
2447       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2448                     new_net_id)
2449       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2450                               logical_id=new_alone_id,
2451                               children=dev.children,
2452                               size=dev.size,
2453                               params={})
2454       (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2455                                             self.cfg)
2456       try:
2457         CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2458                              anno_new_drbd,
2459                              GetInstanceInfoText(self.instance), False,
2460                              excl_stor)
2461       except errors.GenericError:
2462         self.cfg.ReleaseDRBDMinors(self.instance.name)
2463         raise
2464
2465     # We have new devices, shutdown the drbd on the old secondary
2466     for idx, dev in enumerate(self.instance.disks):
2467       self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2468       self.cfg.SetDiskID(dev, self.target_node)
2469       msg = self.rpc.call_blockdev_shutdown(self.target_node,
2470                                             (dev, self.instance)).fail_msg
2471       if msg:
2472         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2473                            "node: %s" % (idx, msg),
2474                            hint=("Please cleanup this device manually as"
2475                                  " soon as possible"))
2476
2477     self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2478     result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2479                                                self.instance.disks)[pnode]
2480
2481     msg = result.fail_msg
2482     if msg:
2483       # detaches didn't succeed (unlikely)
2484       self.cfg.ReleaseDRBDMinors(self.instance.name)
2485       raise errors.OpExecError("Can't detach the disks from the network on"
2486                                " old node: %s" % (msg,))
2487
2488     # if we managed to detach at least one, we update all the disks of
2489     # the instance to point to the new secondary
2490     self.lu.LogInfo("Updating instance configuration")
2491     for dev, _, new_logical_id in iv_names.itervalues():
2492       dev.logical_id = new_logical_id
2493       self.cfg.SetDiskID(dev, self.instance.primary_node)
2494
2495     self.cfg.Update(self.instance, feedback_fn)
2496
2497     # Release all node locks (the configuration has been updated)
2498     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2499
2500     # and now perform the drbd attach
2501     self.lu.LogInfo("Attaching primary drbds to new secondary"
2502                     " (standalone => connected)")
2503     result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2504                                             self.new_node],
2505                                            self.node_secondary_ip,
2506                                            (self.instance.disks, self.instance),
2507                                            self.instance.name,
2508                                            False)
2509     for to_node, to_result in result.items():
2510       msg = to_result.fail_msg
2511       if msg:
2512         self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2513                            to_node, msg,
2514                            hint=("please do a gnt-instance info to see the"
2515                                  " status of disks"))
2516
2517     cstep = itertools.count(5)
2518
2519     if self.early_release:
2520       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2521       self._RemoveOldStorage(self.target_node, iv_names)
2522       # TODO: Check if releasing locks early still makes sense
2523       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2524     else:
2525       # Release all resource locks except those used by the instance
2526       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2527                    keep=self.node_secondary_ip.keys())
2528
2529     # TODO: Can the instance lock be downgraded here? Take the optional disk
2530     # shutdown in the caller into consideration.
2531
2532     # Wait for sync
2533     # This can fail as the old devices are degraded and _WaitForSync
2534     # does a combined result over all disks, so we don't check its return value
2535     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2536     WaitForSync(self.lu, self.instance)
2537
2538     # Check all devices manually
2539     self._CheckDevices(self.instance.primary_node, iv_names)
2540
2541     # Step: remove old storage
2542     if not self.early_release:
2543       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2544       self._RemoveOldStorage(self.target_node, iv_names)