QA for spindles in creating disks
[ganeti-local] / lib / cmdlib / instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41   AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42   CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43   IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45   CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46   BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52   constants.DT_PLAIN: "",
53   constants.DT_RBD: ".rbd",
54   constants.DT_EXT: ".ext",
55   }
56
57
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59   constants.DT_PLAIN: constants.LD_LV,
60   constants.DT_FILE: constants.LD_FILE,
61   constants.DT_SHARED_FILE: constants.LD_FILE,
62   constants.DT_BLOCK: constants.LD_BLOCKDEV,
63   constants.DT_RBD: constants.LD_RBD,
64   constants.DT_EXT: constants.LD_EXT,
65   }
66
67
68 def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69                          excl_stor):
70   """Create a single block device on a given node.
71
72   This will not recurse over children of the device, so they must be
73   created in advance.
74
75   @param lu: the lu on whose behalf we execute
76   @param node: the node on which to create the device
77   @type instance: L{objects.Instance}
78   @param instance: the instance which owns the device
79   @type device: L{objects.Disk}
80   @param device: the device to create
81   @param info: the extra 'metadata' we should attach to the device
82       (this will be represented as a LVM tag)
83   @type force_open: boolean
84   @param force_open: this parameter will be passes to the
85       L{backend.BlockdevCreate} function where it specifies
86       whether we run on primary or not, and it affects both
87       the child assembly and the device own Open() execution
88   @type excl_stor: boolean
89   @param excl_stor: Whether exclusive_storage is active for the node
90
91   """
92   lu.cfg.SetDiskID(device, node)
93   result = lu.rpc.call_blockdev_create(node, device, device.size,
94                                        instance.name, force_open, info,
95                                        excl_stor)
96   result.Raise("Can't create block device %s on"
97                " node %s for instance %s" % (device, node, instance.name))
98   if device.physical_id is None:
99     device.physical_id = result.payload
100
101
102 def _CreateBlockDevInner(lu, node, instance, device, force_create,
103                          info, force_open, excl_stor):
104   """Create a tree of block devices on a given node.
105
106   If this device type has to be created on secondaries, create it and
107   all its children.
108
109   If not, just recurse to children keeping the same 'force' value.
110
111   @attention: The device has to be annotated already.
112
113   @param lu: the lu on whose behalf we execute
114   @param node: the node on which to create the device
115   @type instance: L{objects.Instance}
116   @param instance: the instance which owns the device
117   @type device: L{objects.Disk}
118   @param device: the device to create
119   @type force_create: boolean
120   @param force_create: whether to force creation of this device; this
121       will be change to True whenever we find a device which has
122       CreateOnSecondary() attribute
123   @param info: the extra 'metadata' we should attach to the device
124       (this will be represented as a LVM tag)
125   @type force_open: boolean
126   @param force_open: this parameter will be passes to the
127       L{backend.BlockdevCreate} function where it specifies
128       whether we run on primary or not, and it affects both
129       the child assembly and the device own Open() execution
130   @type excl_stor: boolean
131   @param excl_stor: Whether exclusive_storage is active for the node
132
133   @return: list of created devices
134   """
135   created_devices = []
136   try:
137     if device.CreateOnSecondary():
138       force_create = True
139
140     if device.children:
141       for child in device.children:
142         devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143                                     info, force_open, excl_stor)
144         created_devices.extend(devs)
145
146     if not force_create:
147       return created_devices
148
149     CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150                          excl_stor)
151     # The device has been completely created, so there is no point in keeping
152     # its subdevices in the list. We just add the device itself instead.
153     created_devices = [(node, device)]
154     return created_devices
155
156   except errors.DeviceCreationError, e:
157     e.created_devices.extend(created_devices)
158     raise e
159   except errors.OpExecError, e:
160     raise errors.DeviceCreationError(str(e), created_devices)
161
162
163 def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164   """Whether exclusive_storage is in effect for the given node.
165
166   @type cfg: L{config.ConfigWriter}
167   @param cfg: The cluster configuration
168   @type nodename: string
169   @param nodename: The node
170   @rtype: bool
171   @return: The effective value of exclusive_storage
172   @raise errors.OpPrereqError: if no node exists with the given name
173
174   """
175   ni = cfg.GetNodeInfo(nodename)
176   if ni is None:
177     raise errors.OpPrereqError("Invalid node name %s" % nodename,
178                                errors.ECODE_NOENT)
179   return IsExclusiveStorageEnabledNode(cfg, ni)
180
181
182 def _CreateBlockDev(lu, node, instance, device, force_create, info,
183                     force_open):
184   """Wrapper around L{_CreateBlockDevInner}.
185
186   This method annotates the root device first.
187
188   """
189   (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190   excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191   return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192                               force_open, excl_stor)
193
194
195 def _UndoCreateDisks(lu, disks_created):
196   """Undo the work performed by L{CreateDisks}.
197
198   This function is called in case of an error to undo the work of
199   L{CreateDisks}.
200
201   @type lu: L{LogicalUnit}
202   @param lu: the logical unit on whose behalf we execute
203   @param disks_created: the result returned by L{CreateDisks}
204
205   """
206   for (node, disk) in disks_created:
207     lu.cfg.SetDiskID(disk, node)
208     result = lu.rpc.call_blockdev_remove(node, disk)
209     if result.fail_msg:
210       logging.warning("Failed to remove newly-created disk %s on node %s:"
211                       " %s", disk, node, result.fail_msg)
212
213
214 def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
215   """Create all disks for an instance.
216
217   This abstracts away some work from AddInstance.
218
219   @type lu: L{LogicalUnit}
220   @param lu: the logical unit on whose behalf we execute
221   @type instance: L{objects.Instance}
222   @param instance: the instance whose disks we should create
223   @type to_skip: list
224   @param to_skip: list of indices to skip
225   @type target_node: string
226   @param target_node: if passed, overrides the target node for creation
227   @type disks: list of {objects.Disk}
228   @param disks: the disks to create; if not specified, all the disks of the
229       instance are created
230   @return: information about the created disks, to be used to call
231       L{_UndoCreateDisks}
232   @raise errors.OpPrereqError: in case of error
233
234   """
235   info = GetInstanceInfoText(instance)
236   if target_node is None:
237     pnode = instance.primary_node
238     all_nodes = instance.all_nodes
239   else:
240     pnode = target_node
241     all_nodes = [pnode]
242
243   if disks is None:
244     disks = instance.disks
245
246   if instance.disk_template in constants.DTS_FILEBASED:
247     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
248     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
249
250     result.Raise("Failed to create directory '%s' on"
251                  " node %s" % (file_storage_dir, pnode))
252
253   disks_created = []
254   for idx, device in enumerate(disks):
255     if to_skip and idx in to_skip:
256       continue
257     logging.info("Creating disk %s for instance '%s'", idx, instance.name)
258     for node in all_nodes:
259       f_create = node == pnode
260       try:
261         _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
262         disks_created.append((node, device))
263       except errors.DeviceCreationError, e:
264         logging.warning("Creating disk %s for instance '%s' failed",
265                         idx, instance.name)
266         disks_created.extend(e.created_devices)
267         _UndoCreateDisks(lu, disks_created)
268         raise errors.OpExecError(e.message)
269   return disks_created
270
271
272 def ComputeDiskSizePerVG(disk_template, disks):
273   """Compute disk size requirements in the volume group
274
275   """
276   def _compute(disks, payload):
277     """Universal algorithm.
278
279     """
280     vgs = {}
281     for disk in disks:
282       vgs[disk[constants.IDISK_VG]] = \
283         vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
284
285     return vgs
286
287   # Required free disk space as a function of disk and swap space
288   req_size_dict = {
289     constants.DT_DISKLESS: {},
290     constants.DT_PLAIN: _compute(disks, 0),
291     # 128 MB are added for drbd metadata for each disk
292     constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
293     constants.DT_FILE: {},
294     constants.DT_SHARED_FILE: {},
295     }
296
297   if disk_template not in req_size_dict:
298     raise errors.ProgrammerError("Disk template '%s' size requirement"
299                                  " is unknown" % disk_template)
300
301   return req_size_dict[disk_template]
302
303
304 def ComputeDisks(op, default_vg):
305   """Computes the instance disks.
306
307   @param op: The instance opcode
308   @param default_vg: The default_vg to assume
309
310   @return: The computed disks
311
312   """
313   disks = []
314   for disk in op.disks:
315     mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316     if mode not in constants.DISK_ACCESS_SET:
317       raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318                                  mode, errors.ECODE_INVAL)
319     size = disk.get(constants.IDISK_SIZE, None)
320     if size is None:
321       raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
322     try:
323       size = int(size)
324     except (TypeError, ValueError):
325       raise errors.OpPrereqError("Invalid disk size '%s'" % size,
326                                  errors.ECODE_INVAL)
327
328     ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329     if ext_provider and op.disk_template != constants.DT_EXT:
330       raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331                                  " disk template, not %s" %
332                                  (constants.IDISK_PROVIDER, constants.DT_EXT,
333                                   op.disk_template), errors.ECODE_INVAL)
334
335     data_vg = disk.get(constants.IDISK_VG, default_vg)
336     name = disk.get(constants.IDISK_NAME, None)
337     if name is not None and name.lower() == constants.VALUE_NONE:
338       name = None
339     new_disk = {
340       constants.IDISK_SIZE: size,
341       constants.IDISK_MODE: mode,
342       constants.IDISK_VG: data_vg,
343       constants.IDISK_NAME: name,
344       }
345
346     for key in [
347       constants.IDISK_METAVG,
348       constants.IDISK_ADOPT,
349       constants.IDISK_SPINDLES,
350       ]:
351       if key in disk:
352         new_disk[key] = disk[key]
353
354     # For extstorage, demand the `provider' option and add any
355     # additional parameters (ext-params) to the dict
356     if op.disk_template == constants.DT_EXT:
357       if ext_provider:
358         new_disk[constants.IDISK_PROVIDER] = ext_provider
359         for key in disk:
360           if key not in constants.IDISK_PARAMS:
361             new_disk[key] = disk[key]
362       else:
363         raise errors.OpPrereqError("Missing provider for template '%s'" %
364                                    constants.DT_EXT, errors.ECODE_INVAL)
365
366     disks.append(new_disk)
367
368   return disks
369
370
371 def CheckRADOSFreeSpace():
372   """Compute disk size requirements inside the RADOS cluster.
373
374   """
375   # For the RADOS cluster we assume there is always enough space.
376   pass
377
378
379 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
380                          iv_name, p_minor, s_minor):
381   """Generate a drbd8 device complete with its children.
382
383   """
384   assert len(vgnames) == len(names) == 2
385   port = lu.cfg.AllocatePort()
386   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
387
388   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
389                           logical_id=(vgnames[0], names[0]),
390                           params={})
391   dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
392   dev_meta = objects.Disk(dev_type=constants.LD_LV,
393                           size=constants.DRBD_META_SIZE,
394                           logical_id=(vgnames[1], names[1]),
395                           params={})
396   dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
397   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
398                           logical_id=(primary, secondary, port,
399                                       p_minor, s_minor,
400                                       shared_secret),
401                           children=[dev_data, dev_meta],
402                           iv_name=iv_name, params={})
403   drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
404   return drbd_dev
405
406
407 def GenerateDiskTemplate(
408   lu, template_name, instance_name, primary_node, secondary_nodes,
409   disk_info, file_storage_dir, file_driver, base_index,
410   feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
411   _req_shr_file_storage=opcodes.RequireSharedFileStorage):
412   """Generate the entire disk layout for a given template type.
413
414   """
415   vgname = lu.cfg.GetVGName()
416   disk_count = len(disk_info)
417   disks = []
418
419   if template_name == constants.DT_DISKLESS:
420     pass
421   elif template_name == constants.DT_DRBD8:
422     if len(secondary_nodes) != 1:
423       raise errors.ProgrammerError("Wrong template configuration")
424     remote_node = secondary_nodes[0]
425     minors = lu.cfg.AllocateDRBDMinor(
426       [primary_node, remote_node] * len(disk_info), instance_name)
427
428     (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
429                                                        full_disk_params)
430     drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
431
432     names = []
433     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
434                                                for i in range(disk_count)]):
435       names.append(lv_prefix + "_data")
436       names.append(lv_prefix + "_meta")
437     for idx, disk in enumerate(disk_info):
438       disk_index = idx + base_index
439       data_vg = disk.get(constants.IDISK_VG, vgname)
440       meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
441       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
442                                       disk[constants.IDISK_SIZE],
443                                       [data_vg, meta_vg],
444                                       names[idx * 2:idx * 2 + 2],
445                                       "disk/%d" % disk_index,
446                                       minors[idx * 2], minors[idx * 2 + 1])
447       disk_dev.mode = disk[constants.IDISK_MODE]
448       disk_dev.name = disk.get(constants.IDISK_NAME, None)
449       disks.append(disk_dev)
450   else:
451     if secondary_nodes:
452       raise errors.ProgrammerError("Wrong template configuration")
453
454     if template_name == constants.DT_FILE:
455       _req_file_storage()
456     elif template_name == constants.DT_SHARED_FILE:
457       _req_shr_file_storage()
458
459     name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
460     if name_prefix is None:
461       names = None
462     else:
463       names = _GenerateUniqueNames(lu, ["%s.disk%s" %
464                                         (name_prefix, base_index + i)
465                                         for i in range(disk_count)])
466
467     if template_name == constants.DT_PLAIN:
468
469       def logical_id_fn(idx, _, disk):
470         vg = disk.get(constants.IDISK_VG, vgname)
471         return (vg, names[idx])
472
473     elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
474       logical_id_fn = \
475         lambda _, disk_index, disk: (file_driver,
476                                      "%s/disk%d" % (file_storage_dir,
477                                                     disk_index))
478     elif template_name == constants.DT_BLOCK:
479       logical_id_fn = \
480         lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
481                                        disk[constants.IDISK_ADOPT])
482     elif template_name == constants.DT_RBD:
483       logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
484     elif template_name == constants.DT_EXT:
485       def logical_id_fn(idx, _, disk):
486         provider = disk.get(constants.IDISK_PROVIDER, None)
487         if provider is None:
488           raise errors.ProgrammerError("Disk template is %s, but '%s' is"
489                                        " not found", constants.DT_EXT,
490                                        constants.IDISK_PROVIDER)
491         return (provider, names[idx])
492     else:
493       raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
494
495     dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
496
497     for idx, disk in enumerate(disk_info):
498       params = {}
499       # Only for the Ext template add disk_info to params
500       if template_name == constants.DT_EXT:
501         params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
502         for key in disk:
503           if key not in constants.IDISK_PARAMS:
504             params[key] = disk[key]
505       disk_index = idx + base_index
506       size = disk[constants.IDISK_SIZE]
507       feedback_fn("* disk %s, size %s" %
508                   (disk_index, utils.FormatUnit(size, "h")))
509       disk_dev = objects.Disk(dev_type=dev_type, size=size,
510                               logical_id=logical_id_fn(idx, disk_index, disk),
511                               iv_name="disk/%d" % disk_index,
512                               mode=disk[constants.IDISK_MODE],
513                               params=params,
514                               spindles=disk.get(constants.IDISK_SPINDLES))
515       disk_dev.name = disk.get(constants.IDISK_NAME, None)
516       disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
517       disks.append(disk_dev)
518
519   return disks
520
521
522 def CheckSpindlesExclusiveStorage(diskdict, es_flag):
523   """Check the presence of the spindle options with exclusive_storage.
524
525   @type diskdict: dict
526   @param diskdict: disk parameters
527   @type es_flag: bool
528   @param es_flag: the effective value of the exlusive_storage flag
529   @raise errors.OpPrereqError when spindles are given and they should not
530
531   """
532   if (not es_flag and constants.IDISK_SPINDLES in diskdict and
533       diskdict[constants.IDISK_SPINDLES] is not None):
534     raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
535                                " when exclusive storage is not active",
536                                errors.ECODE_INVAL)
537
538
539 class LUInstanceRecreateDisks(LogicalUnit):
540   """Recreate an instance's missing disks.
541
542   """
543   HPATH = "instance-recreate-disks"
544   HTYPE = constants.HTYPE_INSTANCE
545   REQ_BGL = False
546
547   _MODIFYABLE = compat.UniqueFrozenset([
548     constants.IDISK_SIZE,
549     constants.IDISK_MODE,
550     constants.IDISK_SPINDLES,
551     ])
552
553   # New or changed disk parameters may have different semantics
554   assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
555     constants.IDISK_ADOPT,
556
557     # TODO: Implement support changing VG while recreating
558     constants.IDISK_VG,
559     constants.IDISK_METAVG,
560     constants.IDISK_PROVIDER,
561     constants.IDISK_NAME,
562     ]))
563
564   def _RunAllocator(self):
565     """Run the allocator based on input opcode.
566
567     """
568     be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
569
570     # FIXME
571     # The allocator should actually run in "relocate" mode, but current
572     # allocators don't support relocating all the nodes of an instance at
573     # the same time. As a workaround we use "allocate" mode, but this is
574     # suboptimal for two reasons:
575     # - The instance name passed to the allocator is present in the list of
576     #   existing instances, so there could be a conflict within the
577     #   internal structures of the allocator. This doesn't happen with the
578     #   current allocators, but it's a liability.
579     # - The allocator counts the resources used by the instance twice: once
580     #   because the instance exists already, and once because it tries to
581     #   allocate a new instance.
582     # The allocator could choose some of the nodes on which the instance is
583     # running, but that's not a problem. If the instance nodes are broken,
584     # they should be already be marked as drained or offline, and hence
585     # skipped by the allocator. If instance disks have been lost for other
586     # reasons, then recreating the disks on the same nodes should be fine.
587     disk_template = self.instance.disk_template
588     spindle_use = be_full[constants.BE_SPINDLE_USE]
589     req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
590                                         disk_template=disk_template,
591                                         tags=list(self.instance.GetTags()),
592                                         os=self.instance.os,
593                                         nics=[{}],
594                                         vcpus=be_full[constants.BE_VCPUS],
595                                         memory=be_full[constants.BE_MAXMEM],
596                                         spindle_use=spindle_use,
597                                         disks=[{constants.IDISK_SIZE: d.size,
598                                                 constants.IDISK_MODE: d.mode}
599                                                for d in self.instance.disks],
600                                         hypervisor=self.instance.hypervisor,
601                                         node_whitelist=None)
602     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
603
604     ial.Run(self.op.iallocator)
605
606     assert req.RequiredNodes() == len(self.instance.all_nodes)
607
608     if not ial.success:
609       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
610                                  " %s" % (self.op.iallocator, ial.info),
611                                  errors.ECODE_NORES)
612
613     self.op.nodes = ial.result
614     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
615                  self.op.instance_name, self.op.iallocator,
616                  utils.CommaJoin(ial.result))
617
618   def CheckArguments(self):
619     if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
620       # Normalize and convert deprecated list of disk indices
621       self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
622
623     duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
624     if duplicates:
625       raise errors.OpPrereqError("Some disks have been specified more than"
626                                  " once: %s" % utils.CommaJoin(duplicates),
627                                  errors.ECODE_INVAL)
628
629     # We don't want _CheckIAllocatorOrNode selecting the default iallocator
630     # when neither iallocator nor nodes are specified
631     if self.op.iallocator or self.op.nodes:
632       CheckIAllocatorOrNode(self, "iallocator", "nodes")
633
634     for (idx, params) in self.op.disks:
635       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
636       unsupported = frozenset(params.keys()) - self._MODIFYABLE
637       if unsupported:
638         raise errors.OpPrereqError("Parameters for disk %s try to change"
639                                    " unmodifyable parameter(s): %s" %
640                                    (idx, utils.CommaJoin(unsupported)),
641                                    errors.ECODE_INVAL)
642
643   def ExpandNames(self):
644     self._ExpandAndLockInstance()
645     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
646
647     if self.op.nodes:
648       self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
649       self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
650     else:
651       self.needed_locks[locking.LEVEL_NODE] = []
652       if self.op.iallocator:
653         # iallocator will select a new node in the same group
654         self.needed_locks[locking.LEVEL_NODEGROUP] = []
655         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
656
657     self.needed_locks[locking.LEVEL_NODE_RES] = []
658
659   def DeclareLocks(self, level):
660     if level == locking.LEVEL_NODEGROUP:
661       assert self.op.iallocator is not None
662       assert not self.op.nodes
663       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
664       self.share_locks[locking.LEVEL_NODEGROUP] = 1
665       # Lock the primary group used by the instance optimistically; this
666       # requires going via the node before it's locked, requiring
667       # verification later on
668       self.needed_locks[locking.LEVEL_NODEGROUP] = \
669         self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
670
671     elif level == locking.LEVEL_NODE:
672       # If an allocator is used, then we lock all the nodes in the current
673       # instance group, as we don't know yet which ones will be selected;
674       # if we replace the nodes without using an allocator, locks are
675       # already declared in ExpandNames; otherwise, we need to lock all the
676       # instance nodes for disk re-creation
677       if self.op.iallocator:
678         assert not self.op.nodes
679         assert not self.needed_locks[locking.LEVEL_NODE]
680         assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
681
682         # Lock member nodes of the group of the primary node
683         for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
684           self.needed_locks[locking.LEVEL_NODE].extend(
685             self.cfg.GetNodeGroup(group_uuid).members)
686
687         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
688       elif not self.op.nodes:
689         self._LockInstancesNodes(primary_only=False)
690     elif level == locking.LEVEL_NODE_RES:
691       # Copy node locks
692       self.needed_locks[locking.LEVEL_NODE_RES] = \
693         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
694
695   def BuildHooksEnv(self):
696     """Build hooks env.
697
698     This runs on master, primary and secondary nodes of the instance.
699
700     """
701     return BuildInstanceHookEnvByObject(self, self.instance)
702
703   def BuildHooksNodes(self):
704     """Build hooks nodes.
705
706     """
707     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
708     return (nl, nl)
709
710   def CheckPrereq(self):
711     """Check prerequisites.
712
713     This checks that the instance is in the cluster and is not running.
714
715     """
716     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
717     assert instance is not None, \
718       "Cannot retrieve locked instance %s" % self.op.instance_name
719     if self.op.nodes:
720       if len(self.op.nodes) != len(instance.all_nodes):
721         raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
722                                    " %d replacement nodes were specified" %
723                                    (instance.name, len(instance.all_nodes),
724                                     len(self.op.nodes)),
725                                    errors.ECODE_INVAL)
726       assert instance.disk_template != constants.DT_DRBD8 or \
727              len(self.op.nodes) == 2
728       assert instance.disk_template != constants.DT_PLAIN or \
729              len(self.op.nodes) == 1
730       primary_node = self.op.nodes[0]
731     else:
732       primary_node = instance.primary_node
733     if not self.op.iallocator:
734       CheckNodeOnline(self, primary_node)
735
736     if instance.disk_template == constants.DT_DISKLESS:
737       raise errors.OpPrereqError("Instance '%s' has no disks" %
738                                  self.op.instance_name, errors.ECODE_INVAL)
739
740     # Verify if node group locks are still correct
741     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
742     if owned_groups:
743       # Node group locks are acquired only for the primary node (and only
744       # when the allocator is used)
745       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
746                               primary_only=True)
747
748     # if we replace nodes *and* the old primary is offline, we don't
749     # check the instance state
750     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
751     if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
752       CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
753                          msg="cannot recreate disks")
754
755     if self.op.disks:
756       self.disks = dict(self.op.disks)
757     else:
758       self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
759
760     maxidx = max(self.disks.keys())
761     if maxidx >= len(instance.disks):
762       raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
763                                  errors.ECODE_INVAL)
764
765     if ((self.op.nodes or self.op.iallocator) and
766          sorted(self.disks.keys()) != range(len(instance.disks))):
767       raise errors.OpPrereqError("Can't recreate disks partially and"
768                                  " change the nodes at the same time",
769                                  errors.ECODE_INVAL)
770
771     self.instance = instance
772
773     if self.op.iallocator:
774       self._RunAllocator()
775       # Release unneeded node and node resource locks
776       ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
777       ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
778       ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
779
780     assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
781
782     if self.op.nodes:
783       nodes = self.op.nodes
784     else:
785       nodes = instance.all_nodes
786     excl_stor = compat.any(
787       rpc.GetExclusiveStorageForNodeNames(self.cfg, nodes).values()
788       )
789     for new_params in self.disks.values():
790       CheckSpindlesExclusiveStorage(new_params, excl_stor)
791
792   def Exec(self, feedback_fn):
793     """Recreate the disks.
794
795     """
796     instance = self.instance
797
798     assert (self.owned_locks(locking.LEVEL_NODE) ==
799             self.owned_locks(locking.LEVEL_NODE_RES))
800
801     to_skip = []
802     mods = [] # keeps track of needed changes
803
804     for idx, disk in enumerate(instance.disks):
805       try:
806         changes = self.disks[idx]
807       except KeyError:
808         # Disk should not be recreated
809         to_skip.append(idx)
810         continue
811
812       # update secondaries for disks, if needed
813       if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
814         # need to update the nodes and minors
815         assert len(self.op.nodes) == 2
816         assert len(disk.logical_id) == 6 # otherwise disk internals
817                                          # have changed
818         (_, _, old_port, _, _, old_secret) = disk.logical_id
819         new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
820         new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
821                   new_minors[0], new_minors[1], old_secret)
822         assert len(disk.logical_id) == len(new_id)
823       else:
824         new_id = None
825
826       mods.append((idx, new_id, changes))
827
828     # now that we have passed all asserts above, we can apply the mods
829     # in a single run (to avoid partial changes)
830     for idx, new_id, changes in mods:
831       disk = instance.disks[idx]
832       if new_id is not None:
833         assert disk.dev_type == constants.LD_DRBD8
834         disk.logical_id = new_id
835       if changes:
836         disk.Update(size=changes.get(constants.IDISK_SIZE, None),
837                     mode=changes.get(constants.IDISK_MODE, None),
838                     spindles=changes.get(constants.IDISK_SPINDLES, None))
839
840     # change primary node, if needed
841     if self.op.nodes:
842       instance.primary_node = self.op.nodes[0]
843       self.LogWarning("Changing the instance's nodes, you will have to"
844                       " remove any disks left on the older nodes manually")
845
846     if self.op.nodes:
847       self.cfg.Update(instance, feedback_fn)
848
849     # All touched nodes must be locked
850     mylocks = self.owned_locks(locking.LEVEL_NODE)
851     assert mylocks.issuperset(frozenset(instance.all_nodes))
852     new_disks = CreateDisks(self, instance, to_skip=to_skip)
853
854     # TODO: Release node locks before wiping, or explain why it's not possible
855     if self.cfg.GetClusterInfo().prealloc_wipe_disks:
856       wipedisks = [(idx, disk, 0)
857                    for (idx, disk) in enumerate(instance.disks)
858                    if idx not in to_skip]
859       WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
860
861
862 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
863   """Checks if nodes have enough free disk space in the specified VG.
864
865   This function checks if all given nodes have the needed amount of
866   free disk. In case any node has less disk or we cannot get the
867   information from the node, this function raises an OpPrereqError
868   exception.
869
870   @type lu: C{LogicalUnit}
871   @param lu: a logical unit from which we get configuration data
872   @type nodenames: C{list}
873   @param nodenames: the list of node names to check
874   @type vg: C{str}
875   @param vg: the volume group to check
876   @type requested: C{int}
877   @param requested: the amount of disk in MiB to check for
878   @raise errors.OpPrereqError: if the node doesn't have enough disk,
879       or we cannot check the node
880
881   """
882   es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
883   # FIXME: This maps everything to storage type 'lvm-vg' to maintain
884   # the current functionality. Refactor to make it more flexible.
885   nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)], None,
886                                    es_flags)
887   for node in nodenames:
888     info = nodeinfo[node]
889     info.Raise("Cannot get current information from node %s" % node,
890                prereq=True, ecode=errors.ECODE_ENVIRON)
891     (_, (vg_info, ), _) = info.payload
892     vg_free = vg_info.get("vg_free", None)
893     if not isinstance(vg_free, int):
894       raise errors.OpPrereqError("Can't compute free disk space on node"
895                                  " %s for vg %s, result was '%s'" %
896                                  (node, vg, vg_free), errors.ECODE_ENVIRON)
897     if requested > vg_free:
898       raise errors.OpPrereqError("Not enough disk space on target node %s"
899                                  " vg %s: required %d MiB, available %d MiB" %
900                                  (node, vg, requested, vg_free),
901                                  errors.ECODE_NORES)
902
903
904 def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
905   """Checks if nodes have enough free disk space in all the VGs.
906
907   This function checks if all given nodes have the needed amount of
908   free disk. In case any node has less disk or we cannot get the
909   information from the node, this function raises an OpPrereqError
910   exception.
911
912   @type lu: C{LogicalUnit}
913   @param lu: a logical unit from which we get configuration data
914   @type nodenames: C{list}
915   @param nodenames: the list of node names to check
916   @type req_sizes: C{dict}
917   @param req_sizes: the hash of vg and corresponding amount of disk in
918       MiB to check for
919   @raise errors.OpPrereqError: if the node doesn't have enough disk,
920       or we cannot check the node
921
922   """
923   for vg, req_size in req_sizes.items():
924     _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
925
926
927 def _DiskSizeInBytesToMebibytes(lu, size):
928   """Converts a disk size in bytes to mebibytes.
929
930   Warns and rounds up if the size isn't an even multiple of 1 MiB.
931
932   """
933   (mib, remainder) = divmod(size, 1024 * 1024)
934
935   if remainder != 0:
936     lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
937                   " to not overwrite existing data (%s bytes will not be"
938                   " wiped)", (1024 * 1024) - remainder)
939     mib += 1
940
941   return mib
942
943
944 def _CalcEta(time_taken, written, total_size):
945   """Calculates the ETA based on size written and total size.
946
947   @param time_taken: The time taken so far
948   @param written: amount written so far
949   @param total_size: The total size of data to be written
950   @return: The remaining time in seconds
951
952   """
953   avg_time = time_taken / float(written)
954   return (total_size - written) * avg_time
955
956
957 def WipeDisks(lu, instance, disks=None):
958   """Wipes instance disks.
959
960   @type lu: L{LogicalUnit}
961   @param lu: the logical unit on whose behalf we execute
962   @type instance: L{objects.Instance}
963   @param instance: the instance whose disks we should create
964   @type disks: None or list of tuple of (number, L{objects.Disk}, number)
965   @param disks: Disk details; tuple contains disk index, disk object and the
966     start offset
967
968   """
969   node = instance.primary_node
970
971   if disks is None:
972     disks = [(idx, disk, 0)
973              for (idx, disk) in enumerate(instance.disks)]
974
975   for (_, device, _) in disks:
976     lu.cfg.SetDiskID(device, node)
977
978   logging.info("Pausing synchronization of disks of instance '%s'",
979                instance.name)
980   result = lu.rpc.call_blockdev_pause_resume_sync(node,
981                                                   (map(compat.snd, disks),
982                                                    instance),
983                                                   True)
984   result.Raise("Failed to pause disk synchronization on node '%s'" % node)
985
986   for idx, success in enumerate(result.payload):
987     if not success:
988       logging.warn("Pausing synchronization of disk %s of instance '%s'"
989                    " failed", idx, instance.name)
990
991   try:
992     for (idx, device, offset) in disks:
993       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
994       # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
995       wipe_chunk_size = \
996         int(min(constants.MAX_WIPE_CHUNK,
997                 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
998
999       size = device.size
1000       last_output = 0
1001       start_time = time.time()
1002
1003       if offset == 0:
1004         info_text = ""
1005       else:
1006         info_text = (" (from %s to %s)" %
1007                      (utils.FormatUnit(offset, "h"),
1008                       utils.FormatUnit(size, "h")))
1009
1010       lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1011
1012       logging.info("Wiping disk %d for instance %s on node %s using"
1013                    " chunk size %s", idx, instance.name, node, wipe_chunk_size)
1014
1015       while offset < size:
1016         wipe_size = min(wipe_chunk_size, size - offset)
1017
1018         logging.debug("Wiping disk %d, offset %s, chunk %s",
1019                       idx, offset, wipe_size)
1020
1021         result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
1022                                            wipe_size)
1023         result.Raise("Could not wipe disk %d at offset %d for size %d" %
1024                      (idx, offset, wipe_size))
1025
1026         now = time.time()
1027         offset += wipe_size
1028         if now - last_output >= 60:
1029           eta = _CalcEta(now - start_time, offset, size)
1030           lu.LogInfo(" - done: %.1f%% ETA: %s",
1031                      offset / float(size) * 100, utils.FormatSeconds(eta))
1032           last_output = now
1033   finally:
1034     logging.info("Resuming synchronization of disks for instance '%s'",
1035                  instance.name)
1036
1037     result = lu.rpc.call_blockdev_pause_resume_sync(node,
1038                                                     (map(compat.snd, disks),
1039                                                      instance),
1040                                                     False)
1041
1042     if result.fail_msg:
1043       lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1044                     node, result.fail_msg)
1045     else:
1046       for idx, success in enumerate(result.payload):
1047         if not success:
1048           lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1049                         " failed", idx, instance.name)
1050
1051
1052 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1053   """Wrapper for L{WipeDisks} that handles errors.
1054
1055   @type lu: L{LogicalUnit}
1056   @param lu: the logical unit on whose behalf we execute
1057   @type instance: L{objects.Instance}
1058   @param instance: the instance whose disks we should wipe
1059   @param disks: see L{WipeDisks}
1060   @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1061       case of error
1062   @raise errors.OpPrereqError: in case of failure
1063
1064   """
1065   try:
1066     WipeDisks(lu, instance, disks=disks)
1067   except errors.OpExecError:
1068     logging.warning("Wiping disks for instance '%s' failed",
1069                     instance.name)
1070     _UndoCreateDisks(lu, cleanup)
1071     raise
1072
1073
1074 def ExpandCheckDisks(instance, disks):
1075   """Return the instance disks selected by the disks list
1076
1077   @type disks: list of L{objects.Disk} or None
1078   @param disks: selected disks
1079   @rtype: list of L{objects.Disk}
1080   @return: selected instance disks to act on
1081
1082   """
1083   if disks is None:
1084     return instance.disks
1085   else:
1086     if not set(disks).issubset(instance.disks):
1087       raise errors.ProgrammerError("Can only act on disks belonging to the"
1088                                    " target instance")
1089     return disks
1090
1091
1092 def WaitForSync(lu, instance, disks=None, oneshot=False):
1093   """Sleep and poll for an instance's disk to sync.
1094
1095   """
1096   if not instance.disks or disks is not None and not disks:
1097     return True
1098
1099   disks = ExpandCheckDisks(instance, disks)
1100
1101   if not oneshot:
1102     lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1103
1104   node = instance.primary_node
1105
1106   for dev in disks:
1107     lu.cfg.SetDiskID(dev, node)
1108
1109   # TODO: Convert to utils.Retry
1110
1111   retries = 0
1112   degr_retries = 10 # in seconds, as we sleep 1 second each time
1113   while True:
1114     max_time = 0
1115     done = True
1116     cumul_degraded = False
1117     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1118     msg = rstats.fail_msg
1119     if msg:
1120       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1121       retries += 1
1122       if retries >= 10:
1123         raise errors.RemoteError("Can't contact node %s for mirror data,"
1124                                  " aborting." % node)
1125       time.sleep(6)
1126       continue
1127     rstats = rstats.payload
1128     retries = 0
1129     for i, mstat in enumerate(rstats):
1130       if mstat is None:
1131         lu.LogWarning("Can't compute data for node %s/%s",
1132                       node, disks[i].iv_name)
1133         continue
1134
1135       cumul_degraded = (cumul_degraded or
1136                         (mstat.is_degraded and mstat.sync_percent is None))
1137       if mstat.sync_percent is not None:
1138         done = False
1139         if mstat.estimated_time is not None:
1140           rem_time = ("%s remaining (estimated)" %
1141                       utils.FormatSeconds(mstat.estimated_time))
1142           max_time = mstat.estimated_time
1143         else:
1144           rem_time = "no time estimate"
1145         lu.LogInfo("- device %s: %5.2f%% done, %s",
1146                    disks[i].iv_name, mstat.sync_percent, rem_time)
1147
1148     # if we're done but degraded, let's do a few small retries, to
1149     # make sure we see a stable and not transient situation; therefore
1150     # we force restart of the loop
1151     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1152       logging.info("Degraded disks found, %d retries left", degr_retries)
1153       degr_retries -= 1
1154       time.sleep(1)
1155       continue
1156
1157     if done or oneshot:
1158       break
1159
1160     time.sleep(min(60, max_time))
1161
1162   if done:
1163     lu.LogInfo("Instance %s's disks are in sync", instance.name)
1164
1165   return not cumul_degraded
1166
1167
1168 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1169   """Shutdown block devices of an instance.
1170
1171   This does the shutdown on all nodes of the instance.
1172
1173   If the ignore_primary is false, errors on the primary node are
1174   ignored.
1175
1176   """
1177   all_result = True
1178   disks = ExpandCheckDisks(instance, disks)
1179
1180   for disk in disks:
1181     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1182       lu.cfg.SetDiskID(top_disk, node)
1183       result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1184       msg = result.fail_msg
1185       if msg:
1186         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1187                       disk.iv_name, node, msg)
1188         if ((node == instance.primary_node and not ignore_primary) or
1189             (node != instance.primary_node and not result.offline)):
1190           all_result = False
1191   return all_result
1192
1193
1194 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1195   """Shutdown block devices of an instance.
1196
1197   This function checks if an instance is running, before calling
1198   _ShutdownInstanceDisks.
1199
1200   """
1201   CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1202   ShutdownInstanceDisks(lu, instance, disks=disks)
1203
1204
1205 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1206                            ignore_size=False):
1207   """Prepare the block devices for an instance.
1208
1209   This sets up the block devices on all nodes.
1210
1211   @type lu: L{LogicalUnit}
1212   @param lu: the logical unit on whose behalf we execute
1213   @type instance: L{objects.Instance}
1214   @param instance: the instance for whose disks we assemble
1215   @type disks: list of L{objects.Disk} or None
1216   @param disks: which disks to assemble (or all, if None)
1217   @type ignore_secondaries: boolean
1218   @param ignore_secondaries: if true, errors on secondary nodes
1219       won't result in an error return from the function
1220   @type ignore_size: boolean
1221   @param ignore_size: if true, the current known size of the disk
1222       will not be used during the disk activation, useful for cases
1223       when the size is wrong
1224   @return: False if the operation failed, otherwise a list of
1225       (host, instance_visible_name, node_visible_name)
1226       with the mapping from node devices to instance devices
1227
1228   """
1229   device_info = []
1230   disks_ok = True
1231   iname = instance.name
1232   disks = ExpandCheckDisks(instance, disks)
1233
1234   # With the two passes mechanism we try to reduce the window of
1235   # opportunity for the race condition of switching DRBD to primary
1236   # before handshaking occured, but we do not eliminate it
1237
1238   # The proper fix would be to wait (with some limits) until the
1239   # connection has been made and drbd transitions from WFConnection
1240   # into any other network-connected state (Connected, SyncTarget,
1241   # SyncSource, etc.)
1242
1243   # 1st pass, assemble on all nodes in secondary mode
1244   for idx, inst_disk in enumerate(disks):
1245     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1246       if ignore_size:
1247         node_disk = node_disk.Copy()
1248         node_disk.UnsetSize()
1249       lu.cfg.SetDiskID(node_disk, node)
1250       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1251                                              False, idx)
1252       msg = result.fail_msg
1253       if msg:
1254         is_offline_secondary = (node in instance.secondary_nodes and
1255                                 result.offline)
1256         lu.LogWarning("Could not prepare block device %s on node %s"
1257                       " (is_primary=False, pass=1): %s",
1258                       inst_disk.iv_name, node, msg)
1259         if not (ignore_secondaries or is_offline_secondary):
1260           disks_ok = False
1261
1262   # FIXME: race condition on drbd migration to primary
1263
1264   # 2nd pass, do only the primary node
1265   for idx, inst_disk in enumerate(disks):
1266     dev_path = None
1267
1268     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1269       if node != instance.primary_node:
1270         continue
1271       if ignore_size:
1272         node_disk = node_disk.Copy()
1273         node_disk.UnsetSize()
1274       lu.cfg.SetDiskID(node_disk, node)
1275       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1276                                              True, idx)
1277       msg = result.fail_msg
1278       if msg:
1279         lu.LogWarning("Could not prepare block device %s on node %s"
1280                       " (is_primary=True, pass=2): %s",
1281                       inst_disk.iv_name, node, msg)
1282         disks_ok = False
1283       else:
1284         dev_path = result.payload
1285
1286     device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1287
1288   # leave the disks configured for the primary node
1289   # this is a workaround that would be fixed better by
1290   # improving the logical/physical id handling
1291   for disk in disks:
1292     lu.cfg.SetDiskID(disk, instance.primary_node)
1293
1294   return disks_ok, device_info
1295
1296
1297 def StartInstanceDisks(lu, instance, force):
1298   """Start the disks of an instance.
1299
1300   """
1301   disks_ok, _ = AssembleInstanceDisks(lu, instance,
1302                                       ignore_secondaries=force)
1303   if not disks_ok:
1304     ShutdownInstanceDisks(lu, instance)
1305     if force is not None and not force:
1306       lu.LogWarning("",
1307                     hint=("If the message above refers to a secondary node,"
1308                           " you can retry the operation using '--force'"))
1309     raise errors.OpExecError("Disk consistency error")
1310
1311
1312 class LUInstanceGrowDisk(LogicalUnit):
1313   """Grow a disk of an instance.
1314
1315   """
1316   HPATH = "disk-grow"
1317   HTYPE = constants.HTYPE_INSTANCE
1318   REQ_BGL = False
1319
1320   def ExpandNames(self):
1321     self._ExpandAndLockInstance()
1322     self.needed_locks[locking.LEVEL_NODE] = []
1323     self.needed_locks[locking.LEVEL_NODE_RES] = []
1324     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1325     self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1326
1327   def DeclareLocks(self, level):
1328     if level == locking.LEVEL_NODE:
1329       self._LockInstancesNodes()
1330     elif level == locking.LEVEL_NODE_RES:
1331       # Copy node locks
1332       self.needed_locks[locking.LEVEL_NODE_RES] = \
1333         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1334
1335   def BuildHooksEnv(self):
1336     """Build hooks env.
1337
1338     This runs on the master, the primary and all the secondaries.
1339
1340     """
1341     env = {
1342       "DISK": self.op.disk,
1343       "AMOUNT": self.op.amount,
1344       "ABSOLUTE": self.op.absolute,
1345       }
1346     env.update(BuildInstanceHookEnvByObject(self, self.instance))
1347     return env
1348
1349   def BuildHooksNodes(self):
1350     """Build hooks nodes.
1351
1352     """
1353     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1354     return (nl, nl)
1355
1356   def CheckPrereq(self):
1357     """Check prerequisites.
1358
1359     This checks that the instance is in the cluster.
1360
1361     """
1362     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1363     assert instance is not None, \
1364       "Cannot retrieve locked instance %s" % self.op.instance_name
1365     nodenames = list(instance.all_nodes)
1366     for node in nodenames:
1367       CheckNodeOnline(self, node)
1368
1369     self.instance = instance
1370
1371     if instance.disk_template not in constants.DTS_GROWABLE:
1372       raise errors.OpPrereqError("Instance's disk layout does not support"
1373                                  " growing", errors.ECODE_INVAL)
1374
1375     self.disk = instance.FindDisk(self.op.disk)
1376
1377     if self.op.absolute:
1378       self.target = self.op.amount
1379       self.delta = self.target - self.disk.size
1380       if self.delta < 0:
1381         raise errors.OpPrereqError("Requested size (%s) is smaller than "
1382                                    "current disk size (%s)" %
1383                                    (utils.FormatUnit(self.target, "h"),
1384                                     utils.FormatUnit(self.disk.size, "h")),
1385                                    errors.ECODE_STATE)
1386     else:
1387       self.delta = self.op.amount
1388       self.target = self.disk.size + self.delta
1389       if self.delta < 0:
1390         raise errors.OpPrereqError("Requested increment (%s) is negative" %
1391                                    utils.FormatUnit(self.delta, "h"),
1392                                    errors.ECODE_INVAL)
1393
1394     self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1395
1396   def _CheckDiskSpace(self, nodenames, req_vgspace):
1397     template = self.instance.disk_template
1398     if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1399       # TODO: check the free disk space for file, when that feature will be
1400       # supported
1401       nodes = map(self.cfg.GetNodeInfo, nodenames)
1402       es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1403                         nodes)
1404       if es_nodes:
1405         # With exclusive storage we need to something smarter than just looking
1406         # at free space; for now, let's simply abort the operation.
1407         raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1408                                    " is enabled", errors.ECODE_STATE)
1409       CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1410
1411   def Exec(self, feedback_fn):
1412     """Execute disk grow.
1413
1414     """
1415     instance = self.instance
1416     disk = self.disk
1417
1418     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1419     assert (self.owned_locks(locking.LEVEL_NODE) ==
1420             self.owned_locks(locking.LEVEL_NODE_RES))
1421
1422     wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1423
1424     disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1425     if not disks_ok:
1426       raise errors.OpExecError("Cannot activate block device to grow")
1427
1428     feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1429                 (self.op.disk, instance.name,
1430                  utils.FormatUnit(self.delta, "h"),
1431                  utils.FormatUnit(self.target, "h")))
1432
1433     # First run all grow ops in dry-run mode
1434     for node in instance.all_nodes:
1435       self.cfg.SetDiskID(disk, node)
1436       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1437                                            True, True)
1438       result.Raise("Dry-run grow request failed to node %s" % node)
1439
1440     if wipe_disks:
1441       # Get disk size from primary node for wiping
1442       result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1443       result.Raise("Failed to retrieve disk size from node '%s'" %
1444                    instance.primary_node)
1445
1446       (disk_size_in_bytes, ) = result.payload
1447
1448       if disk_size_in_bytes is None:
1449         raise errors.OpExecError("Failed to retrieve disk size from primary"
1450                                  " node '%s'" % instance.primary_node)
1451
1452       old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1453
1454       assert old_disk_size >= disk.size, \
1455         ("Retrieved disk size too small (got %s, should be at least %s)" %
1456          (old_disk_size, disk.size))
1457     else:
1458       old_disk_size = None
1459
1460     # We know that (as far as we can test) operations across different
1461     # nodes will succeed, time to run it for real on the backing storage
1462     for node in instance.all_nodes:
1463       self.cfg.SetDiskID(disk, node)
1464       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1465                                            False, True)
1466       result.Raise("Grow request failed to node %s" % node)
1467
1468     # And now execute it for logical storage, on the primary node
1469     node = instance.primary_node
1470     self.cfg.SetDiskID(disk, node)
1471     result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1472                                          False, False)
1473     result.Raise("Grow request failed to node %s" % node)
1474
1475     disk.RecordGrow(self.delta)
1476     self.cfg.Update(instance, feedback_fn)
1477
1478     # Changes have been recorded, release node lock
1479     ReleaseLocks(self, locking.LEVEL_NODE)
1480
1481     # Downgrade lock while waiting for sync
1482     self.glm.downgrade(locking.LEVEL_INSTANCE)
1483
1484     assert wipe_disks ^ (old_disk_size is None)
1485
1486     if wipe_disks:
1487       assert instance.disks[self.op.disk] == disk
1488
1489       # Wipe newly added disk space
1490       WipeDisks(self, instance,
1491                 disks=[(self.op.disk, disk, old_disk_size)])
1492
1493     if self.op.wait_for_sync:
1494       disk_abort = not WaitForSync(self, instance, disks=[disk])
1495       if disk_abort:
1496         self.LogWarning("Disk syncing has not returned a good status; check"
1497                         " the instance")
1498       if instance.admin_state != constants.ADMINST_UP:
1499         _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1500     elif instance.admin_state != constants.ADMINST_UP:
1501       self.LogWarning("Not shutting down the disk even if the instance is"
1502                       " not supposed to be running because no wait for"
1503                       " sync mode was requested")
1504
1505     assert self.owned_locks(locking.LEVEL_NODE_RES)
1506     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1507
1508
1509 class LUInstanceReplaceDisks(LogicalUnit):
1510   """Replace the disks of an instance.
1511
1512   """
1513   HPATH = "mirrors-replace"
1514   HTYPE = constants.HTYPE_INSTANCE
1515   REQ_BGL = False
1516
1517   def CheckArguments(self):
1518     """Check arguments.
1519
1520     """
1521     remote_node = self.op.remote_node
1522     ialloc = self.op.iallocator
1523     if self.op.mode == constants.REPLACE_DISK_CHG:
1524       if remote_node is None and ialloc is None:
1525         raise errors.OpPrereqError("When changing the secondary either an"
1526                                    " iallocator script must be used or the"
1527                                    " new node given", errors.ECODE_INVAL)
1528       else:
1529         CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1530
1531     elif remote_node is not None or ialloc is not None:
1532       # Not replacing the secondary
1533       raise errors.OpPrereqError("The iallocator and new node options can"
1534                                  " only be used when changing the"
1535                                  " secondary node", errors.ECODE_INVAL)
1536
1537   def ExpandNames(self):
1538     self._ExpandAndLockInstance()
1539
1540     assert locking.LEVEL_NODE not in self.needed_locks
1541     assert locking.LEVEL_NODE_RES not in self.needed_locks
1542     assert locking.LEVEL_NODEGROUP not in self.needed_locks
1543
1544     assert self.op.iallocator is None or self.op.remote_node is None, \
1545       "Conflicting options"
1546
1547     if self.op.remote_node is not None:
1548       self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1549
1550       # Warning: do not remove the locking of the new secondary here
1551       # unless DRBD8Dev.AddChildren is changed to work in parallel;
1552       # currently it doesn't since parallel invocations of
1553       # FindUnusedMinor will conflict
1554       self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1555       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1556     else:
1557       self.needed_locks[locking.LEVEL_NODE] = []
1558       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1559
1560       if self.op.iallocator is not None:
1561         # iallocator will select a new node in the same group
1562         self.needed_locks[locking.LEVEL_NODEGROUP] = []
1563         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1564
1565     self.needed_locks[locking.LEVEL_NODE_RES] = []
1566
1567     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1568                                    self.op.iallocator, self.op.remote_node,
1569                                    self.op.disks, self.op.early_release,
1570                                    self.op.ignore_ipolicy)
1571
1572     self.tasklets = [self.replacer]
1573
1574   def DeclareLocks(self, level):
1575     if level == locking.LEVEL_NODEGROUP:
1576       assert self.op.remote_node is None
1577       assert self.op.iallocator is not None
1578       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1579
1580       self.share_locks[locking.LEVEL_NODEGROUP] = 1
1581       # Lock all groups used by instance optimistically; this requires going
1582       # via the node before it's locked, requiring verification later on
1583       self.needed_locks[locking.LEVEL_NODEGROUP] = \
1584         self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1585
1586     elif level == locking.LEVEL_NODE:
1587       if self.op.iallocator is not None:
1588         assert self.op.remote_node is None
1589         assert not self.needed_locks[locking.LEVEL_NODE]
1590         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1591
1592         # Lock member nodes of all locked groups
1593         self.needed_locks[locking.LEVEL_NODE] = \
1594           [node_name
1595            for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1596            for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1597       else:
1598         assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1599
1600         self._LockInstancesNodes()
1601
1602     elif level == locking.LEVEL_NODE_RES:
1603       # Reuse node locks
1604       self.needed_locks[locking.LEVEL_NODE_RES] = \
1605         self.needed_locks[locking.LEVEL_NODE]
1606
1607   def BuildHooksEnv(self):
1608     """Build hooks env.
1609
1610     This runs on the master, the primary and all the secondaries.
1611
1612     """
1613     instance = self.replacer.instance
1614     env = {
1615       "MODE": self.op.mode,
1616       "NEW_SECONDARY": self.op.remote_node,
1617       "OLD_SECONDARY": instance.secondary_nodes[0],
1618       }
1619     env.update(BuildInstanceHookEnvByObject(self, instance))
1620     return env
1621
1622   def BuildHooksNodes(self):
1623     """Build hooks nodes.
1624
1625     """
1626     instance = self.replacer.instance
1627     nl = [
1628       self.cfg.GetMasterNode(),
1629       instance.primary_node,
1630       ]
1631     if self.op.remote_node is not None:
1632       nl.append(self.op.remote_node)
1633     return nl, nl
1634
1635   def CheckPrereq(self):
1636     """Check prerequisites.
1637
1638     """
1639     assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1640             self.op.iallocator is None)
1641
1642     # Verify if node group locks are still correct
1643     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1644     if owned_groups:
1645       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1646
1647     return LogicalUnit.CheckPrereq(self)
1648
1649
1650 class LUInstanceActivateDisks(NoHooksLU):
1651   """Bring up an instance's disks.
1652
1653   """
1654   REQ_BGL = False
1655
1656   def ExpandNames(self):
1657     self._ExpandAndLockInstance()
1658     self.needed_locks[locking.LEVEL_NODE] = []
1659     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1660
1661   def DeclareLocks(self, level):
1662     if level == locking.LEVEL_NODE:
1663       self._LockInstancesNodes()
1664
1665   def CheckPrereq(self):
1666     """Check prerequisites.
1667
1668     This checks that the instance is in the cluster.
1669
1670     """
1671     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1672     assert self.instance is not None, \
1673       "Cannot retrieve locked instance %s" % self.op.instance_name
1674     CheckNodeOnline(self, self.instance.primary_node)
1675
1676   def Exec(self, feedback_fn):
1677     """Activate the disks.
1678
1679     """
1680     disks_ok, disks_info = \
1681               AssembleInstanceDisks(self, self.instance,
1682                                     ignore_size=self.op.ignore_size)
1683     if not disks_ok:
1684       raise errors.OpExecError("Cannot activate block devices")
1685
1686     if self.op.wait_for_sync:
1687       if not WaitForSync(self, self.instance):
1688         raise errors.OpExecError("Some disks of the instance are degraded!")
1689
1690     return disks_info
1691
1692
1693 class LUInstanceDeactivateDisks(NoHooksLU):
1694   """Shutdown an instance's disks.
1695
1696   """
1697   REQ_BGL = False
1698
1699   def ExpandNames(self):
1700     self._ExpandAndLockInstance()
1701     self.needed_locks[locking.LEVEL_NODE] = []
1702     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1703
1704   def DeclareLocks(self, level):
1705     if level == locking.LEVEL_NODE:
1706       self._LockInstancesNodes()
1707
1708   def CheckPrereq(self):
1709     """Check prerequisites.
1710
1711     This checks that the instance is in the cluster.
1712
1713     """
1714     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1715     assert self.instance is not None, \
1716       "Cannot retrieve locked instance %s" % self.op.instance_name
1717
1718   def Exec(self, feedback_fn):
1719     """Deactivate the disks
1720
1721     """
1722     instance = self.instance
1723     if self.op.force:
1724       ShutdownInstanceDisks(self, instance)
1725     else:
1726       _SafeShutdownInstanceDisks(self, instance)
1727
1728
1729 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1730                                ldisk=False):
1731   """Check that mirrors are not degraded.
1732
1733   @attention: The device has to be annotated already.
1734
1735   The ldisk parameter, if True, will change the test from the
1736   is_degraded attribute (which represents overall non-ok status for
1737   the device(s)) to the ldisk (representing the local storage status).
1738
1739   """
1740   lu.cfg.SetDiskID(dev, node)
1741
1742   result = True
1743
1744   if on_primary or dev.AssembleOnSecondary():
1745     rstats = lu.rpc.call_blockdev_find(node, dev)
1746     msg = rstats.fail_msg
1747     if msg:
1748       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1749       result = False
1750     elif not rstats.payload:
1751       lu.LogWarning("Can't find disk on node %s", node)
1752       result = False
1753     else:
1754       if ldisk:
1755         result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1756       else:
1757         result = result and not rstats.payload.is_degraded
1758
1759   if dev.children:
1760     for child in dev.children:
1761       result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1762                                                      on_primary)
1763
1764   return result
1765
1766
1767 def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1768   """Wrapper around L{_CheckDiskConsistencyInner}.
1769
1770   """
1771   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1772   return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1773                                     ldisk=ldisk)
1774
1775
1776 def _BlockdevFind(lu, node, dev, instance):
1777   """Wrapper around call_blockdev_find to annotate diskparams.
1778
1779   @param lu: A reference to the lu object
1780   @param node: The node to call out
1781   @param dev: The device to find
1782   @param instance: The instance object the device belongs to
1783   @returns The result of the rpc call
1784
1785   """
1786   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1787   return lu.rpc.call_blockdev_find(node, disk)
1788
1789
1790 def _GenerateUniqueNames(lu, exts):
1791   """Generate a suitable LV name.
1792
1793   This will generate a logical volume name for the given instance.
1794
1795   """
1796   results = []
1797   for val in exts:
1798     new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1799     results.append("%s%s" % (new_id, val))
1800   return results
1801
1802
1803 class TLReplaceDisks(Tasklet):
1804   """Replaces disks for an instance.
1805
1806   Note: Locking is not within the scope of this class.
1807
1808   """
1809   def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1810                disks, early_release, ignore_ipolicy):
1811     """Initializes this class.
1812
1813     """
1814     Tasklet.__init__(self, lu)
1815
1816     # Parameters
1817     self.instance_name = instance_name
1818     self.mode = mode
1819     self.iallocator_name = iallocator_name
1820     self.remote_node = remote_node
1821     self.disks = disks
1822     self.early_release = early_release
1823     self.ignore_ipolicy = ignore_ipolicy
1824
1825     # Runtime data
1826     self.instance = None
1827     self.new_node = None
1828     self.target_node = None
1829     self.other_node = None
1830     self.remote_node_info = None
1831     self.node_secondary_ip = None
1832
1833   @staticmethod
1834   def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1835     """Compute a new secondary node using an IAllocator.
1836
1837     """
1838     req = iallocator.IAReqRelocate(name=instance_name,
1839                                    relocate_from=list(relocate_from))
1840     ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1841
1842     ial.Run(iallocator_name)
1843
1844     if not ial.success:
1845       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1846                                  " %s" % (iallocator_name, ial.info),
1847                                  errors.ECODE_NORES)
1848
1849     remote_node_name = ial.result[0]
1850
1851     lu.LogInfo("Selected new secondary for instance '%s': %s",
1852                instance_name, remote_node_name)
1853
1854     return remote_node_name
1855
1856   def _FindFaultyDisks(self, node_name):
1857     """Wrapper for L{FindFaultyInstanceDisks}.
1858
1859     """
1860     return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1861                                    node_name, True)
1862
1863   def _CheckDisksActivated(self, instance):
1864     """Checks if the instance disks are activated.
1865
1866     @param instance: The instance to check disks
1867     @return: True if they are activated, False otherwise
1868
1869     """
1870     nodes = instance.all_nodes
1871
1872     for idx, dev in enumerate(instance.disks):
1873       for node in nodes:
1874         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1875         self.cfg.SetDiskID(dev, node)
1876
1877         result = _BlockdevFind(self, node, dev, instance)
1878
1879         if result.offline:
1880           continue
1881         elif result.fail_msg or not result.payload:
1882           return False
1883
1884     return True
1885
1886   def CheckPrereq(self):
1887     """Check prerequisites.
1888
1889     This checks that the instance is in the cluster.
1890
1891     """
1892     self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1893     assert instance is not None, \
1894       "Cannot retrieve locked instance %s" % self.instance_name
1895
1896     if instance.disk_template != constants.DT_DRBD8:
1897       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1898                                  " instances", errors.ECODE_INVAL)
1899
1900     if len(instance.secondary_nodes) != 1:
1901       raise errors.OpPrereqError("The instance has a strange layout,"
1902                                  " expected one secondary but found %d" %
1903                                  len(instance.secondary_nodes),
1904                                  errors.ECODE_FAULT)
1905
1906     instance = self.instance
1907     secondary_node = instance.secondary_nodes[0]
1908
1909     if self.iallocator_name is None:
1910       remote_node = self.remote_node
1911     else:
1912       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1913                                        instance.name, instance.secondary_nodes)
1914
1915     if remote_node is None:
1916       self.remote_node_info = None
1917     else:
1918       assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1919              "Remote node '%s' is not locked" % remote_node
1920
1921       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1922       assert self.remote_node_info is not None, \
1923         "Cannot retrieve locked node %s" % remote_node
1924
1925     if remote_node == self.instance.primary_node:
1926       raise errors.OpPrereqError("The specified node is the primary node of"
1927                                  " the instance", errors.ECODE_INVAL)
1928
1929     if remote_node == secondary_node:
1930       raise errors.OpPrereqError("The specified node is already the"
1931                                  " secondary node of the instance",
1932                                  errors.ECODE_INVAL)
1933
1934     if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1935                                     constants.REPLACE_DISK_CHG):
1936       raise errors.OpPrereqError("Cannot specify disks to be replaced",
1937                                  errors.ECODE_INVAL)
1938
1939     if self.mode == constants.REPLACE_DISK_AUTO:
1940       if not self._CheckDisksActivated(instance):
1941         raise errors.OpPrereqError("Please run activate-disks on instance %s"
1942                                    " first" % self.instance_name,
1943                                    errors.ECODE_STATE)
1944       faulty_primary = self._FindFaultyDisks(instance.primary_node)
1945       faulty_secondary = self._FindFaultyDisks(secondary_node)
1946
1947       if faulty_primary and faulty_secondary:
1948         raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1949                                    " one node and can not be repaired"
1950                                    " automatically" % self.instance_name,
1951                                    errors.ECODE_STATE)
1952
1953       if faulty_primary:
1954         self.disks = faulty_primary
1955         self.target_node = instance.primary_node
1956         self.other_node = secondary_node
1957         check_nodes = [self.target_node, self.other_node]
1958       elif faulty_secondary:
1959         self.disks = faulty_secondary
1960         self.target_node = secondary_node
1961         self.other_node = instance.primary_node
1962         check_nodes = [self.target_node, self.other_node]
1963       else:
1964         self.disks = []
1965         check_nodes = []
1966
1967     else:
1968       # Non-automatic modes
1969       if self.mode == constants.REPLACE_DISK_PRI:
1970         self.target_node = instance.primary_node
1971         self.other_node = secondary_node
1972         check_nodes = [self.target_node, self.other_node]
1973
1974       elif self.mode == constants.REPLACE_DISK_SEC:
1975         self.target_node = secondary_node
1976         self.other_node = instance.primary_node
1977         check_nodes = [self.target_node, self.other_node]
1978
1979       elif self.mode == constants.REPLACE_DISK_CHG:
1980         self.new_node = remote_node
1981         self.other_node = instance.primary_node
1982         self.target_node = secondary_node
1983         check_nodes = [self.new_node, self.other_node]
1984
1985         CheckNodeNotDrained(self.lu, remote_node)
1986         CheckNodeVmCapable(self.lu, remote_node)
1987
1988         old_node_info = self.cfg.GetNodeInfo(secondary_node)
1989         assert old_node_info is not None
1990         if old_node_info.offline and not self.early_release:
1991           # doesn't make sense to delay the release
1992           self.early_release = True
1993           self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1994                           " early-release mode", secondary_node)
1995
1996       else:
1997         raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
1998                                      self.mode)
1999
2000       # If not specified all disks should be replaced
2001       if not self.disks:
2002         self.disks = range(len(self.instance.disks))
2003
2004     # TODO: This is ugly, but right now we can't distinguish between internal
2005     # submitted opcode and external one. We should fix that.
2006     if self.remote_node_info:
2007       # We change the node, lets verify it still meets instance policy
2008       new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2009       cluster = self.cfg.GetClusterInfo()
2010       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2011                                                               new_group_info)
2012       CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
2013                              self.cfg, ignore=self.ignore_ipolicy)
2014
2015     for node in check_nodes:
2016       CheckNodeOnline(self.lu, node)
2017
2018     touched_nodes = frozenset(node_name for node_name in [self.new_node,
2019                                                           self.other_node,
2020                                                           self.target_node]
2021                               if node_name is not None)
2022
2023     # Release unneeded node and node resource locks
2024     ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2025     ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2026     ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2027
2028     # Release any owned node group
2029     ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2030
2031     # Check whether disks are valid
2032     for disk_idx in self.disks:
2033       instance.FindDisk(disk_idx)
2034
2035     # Get secondary node IP addresses
2036     self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
2037                                   in self.cfg.GetMultiNodeInfo(touched_nodes))
2038
2039   def Exec(self, feedback_fn):
2040     """Execute disk replacement.
2041
2042     This dispatches the disk replacement to the appropriate handler.
2043
2044     """
2045     if __debug__:
2046       # Verify owned locks before starting operation
2047       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2048       assert set(owned_nodes) == set(self.node_secondary_ip), \
2049           ("Incorrect node locks, owning %s, expected %s" %
2050            (owned_nodes, self.node_secondary_ip.keys()))
2051       assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2052               self.lu.owned_locks(locking.LEVEL_NODE_RES))
2053       assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2054
2055       owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2056       assert list(owned_instances) == [self.instance_name], \
2057           "Instance '%s' not locked" % self.instance_name
2058
2059       assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2060           "Should not own any node group lock at this point"
2061
2062     if not self.disks:
2063       feedback_fn("No disks need replacement for instance '%s'" %
2064                   self.instance.name)
2065       return
2066
2067     feedback_fn("Replacing disk(s) %s for instance '%s'" %
2068                 (utils.CommaJoin(self.disks), self.instance.name))
2069     feedback_fn("Current primary node: %s" % self.instance.primary_node)
2070     feedback_fn("Current seconary node: %s" %
2071                 utils.CommaJoin(self.instance.secondary_nodes))
2072
2073     activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
2074
2075     # Activate the instance disks if we're replacing them on a down instance
2076     if activate_disks:
2077       StartInstanceDisks(self.lu, self.instance, True)
2078
2079     try:
2080       # Should we replace the secondary node?
2081       if self.new_node is not None:
2082         fn = self._ExecDrbd8Secondary
2083       else:
2084         fn = self._ExecDrbd8DiskOnly
2085
2086       result = fn(feedback_fn)
2087     finally:
2088       # Deactivate the instance disks if we're replacing them on a
2089       # down instance
2090       if activate_disks:
2091         _SafeShutdownInstanceDisks(self.lu, self.instance)
2092
2093     assert not self.lu.owned_locks(locking.LEVEL_NODE)
2094
2095     if __debug__:
2096       # Verify owned locks
2097       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2098       nodes = frozenset(self.node_secondary_ip)
2099       assert ((self.early_release and not owned_nodes) or
2100               (not self.early_release and not (set(owned_nodes) - nodes))), \
2101         ("Not owning the correct locks, early_release=%s, owned=%r,"
2102          " nodes=%r" % (self.early_release, owned_nodes, nodes))
2103
2104     return result
2105
2106   def _CheckVolumeGroup(self, nodes):
2107     self.lu.LogInfo("Checking volume groups")
2108
2109     vgname = self.cfg.GetVGName()
2110
2111     # Make sure volume group exists on all involved nodes
2112     results = self.rpc.call_vg_list(nodes)
2113     if not results:
2114       raise errors.OpExecError("Can't list volume groups on the nodes")
2115
2116     for node in nodes:
2117       res = results[node]
2118       res.Raise("Error checking node %s" % node)
2119       if vgname not in res.payload:
2120         raise errors.OpExecError("Volume group '%s' not found on node %s" %
2121                                  (vgname, node))
2122
2123   def _CheckDisksExistence(self, nodes):
2124     # Check disk existence
2125     for idx, dev in enumerate(self.instance.disks):
2126       if idx not in self.disks:
2127         continue
2128
2129       for node in nodes:
2130         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2131         self.cfg.SetDiskID(dev, node)
2132
2133         result = _BlockdevFind(self, node, dev, self.instance)
2134
2135         msg = result.fail_msg
2136         if msg or not result.payload:
2137           if not msg:
2138             msg = "disk not found"
2139           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2140                                    (idx, node, msg))
2141
2142   def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2143     for idx, dev in enumerate(self.instance.disks):
2144       if idx not in self.disks:
2145         continue
2146
2147       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2148                       (idx, node_name))
2149
2150       if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2151                                   on_primary, ldisk=ldisk):
2152         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2153                                  " replace disks for instance %s" %
2154                                  (node_name, self.instance.name))
2155
2156   def _CreateNewStorage(self, node_name):
2157     """Create new storage on the primary or secondary node.
2158
2159     This is only used for same-node replaces, not for changing the
2160     secondary node, hence we don't want to modify the existing disk.
2161
2162     """
2163     iv_names = {}
2164
2165     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2166     for idx, dev in enumerate(disks):
2167       if idx not in self.disks:
2168         continue
2169
2170       self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2171
2172       self.cfg.SetDiskID(dev, node_name)
2173
2174       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2175       names = _GenerateUniqueNames(self.lu, lv_names)
2176
2177       (data_disk, meta_disk) = dev.children
2178       vg_data = data_disk.logical_id[0]
2179       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2180                              logical_id=(vg_data, names[0]),
2181                              params=data_disk.params)
2182       vg_meta = meta_disk.logical_id[0]
2183       lv_meta = objects.Disk(dev_type=constants.LD_LV,
2184                              size=constants.DRBD_META_SIZE,
2185                              logical_id=(vg_meta, names[1]),
2186                              params=meta_disk.params)
2187
2188       new_lvs = [lv_data, lv_meta]
2189       old_lvs = [child.Copy() for child in dev.children]
2190       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2191       excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2192
2193       # we pass force_create=True to force the LVM creation
2194       for new_lv in new_lvs:
2195         _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2196                              GetInstanceInfoText(self.instance), False,
2197                              excl_stor)
2198
2199     return iv_names
2200
2201   def _CheckDevices(self, node_name, iv_names):
2202     for name, (dev, _, _) in iv_names.iteritems():
2203       self.cfg.SetDiskID(dev, node_name)
2204
2205       result = _BlockdevFind(self, node_name, dev, self.instance)
2206
2207       msg = result.fail_msg
2208       if msg or not result.payload:
2209         if not msg:
2210           msg = "disk not found"
2211         raise errors.OpExecError("Can't find DRBD device %s: %s" %
2212                                  (name, msg))
2213
2214       if result.payload.is_degraded:
2215         raise errors.OpExecError("DRBD device %s is degraded!" % name)
2216
2217   def _RemoveOldStorage(self, node_name, iv_names):
2218     for name, (_, old_lvs, _) in iv_names.iteritems():
2219       self.lu.LogInfo("Remove logical volumes for %s", name)
2220
2221       for lv in old_lvs:
2222         self.cfg.SetDiskID(lv, node_name)
2223
2224         msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2225         if msg:
2226           self.lu.LogWarning("Can't remove old LV: %s", msg,
2227                              hint="remove unused LVs manually")
2228
2229   def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2230     """Replace a disk on the primary or secondary for DRBD 8.
2231
2232     The algorithm for replace is quite complicated:
2233
2234       1. for each disk to be replaced:
2235
2236         1. create new LVs on the target node with unique names
2237         1. detach old LVs from the drbd device
2238         1. rename old LVs to name_replaced.<time_t>
2239         1. rename new LVs to old LVs
2240         1. attach the new LVs (with the old names now) to the drbd device
2241
2242       1. wait for sync across all devices
2243
2244       1. for each modified disk:
2245
2246         1. remove old LVs (which have the name name_replaces.<time_t>)
2247
2248     Failures are not very well handled.
2249
2250     """
2251     steps_total = 6
2252
2253     # Step: check device activation
2254     self.lu.LogStep(1, steps_total, "Check device existence")
2255     self._CheckDisksExistence([self.other_node, self.target_node])
2256     self._CheckVolumeGroup([self.target_node, self.other_node])
2257
2258     # Step: check other node consistency
2259     self.lu.LogStep(2, steps_total, "Check peer consistency")
2260     self._CheckDisksConsistency(self.other_node,
2261                                 self.other_node == self.instance.primary_node,
2262                                 False)
2263
2264     # Step: create new storage
2265     self.lu.LogStep(3, steps_total, "Allocate new storage")
2266     iv_names = self._CreateNewStorage(self.target_node)
2267
2268     # Step: for each lv, detach+rename*2+attach
2269     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2270     for dev, old_lvs, new_lvs in iv_names.itervalues():
2271       self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2272
2273       result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2274                                                      old_lvs)
2275       result.Raise("Can't detach drbd from local storage on node"
2276                    " %s for device %s" % (self.target_node, dev.iv_name))
2277       #dev.children = []
2278       #cfg.Update(instance)
2279
2280       # ok, we created the new LVs, so now we know we have the needed
2281       # storage; as such, we proceed on the target node to rename
2282       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2283       # using the assumption that logical_id == physical_id (which in
2284       # turn is the unique_id on that node)
2285
2286       # FIXME(iustin): use a better name for the replaced LVs
2287       temp_suffix = int(time.time())
2288       ren_fn = lambda d, suff: (d.physical_id[0],
2289                                 d.physical_id[1] + "_replaced-%s" % suff)
2290
2291       # Build the rename list based on what LVs exist on the node
2292       rename_old_to_new = []
2293       for to_ren in old_lvs:
2294         result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2295         if not result.fail_msg and result.payload:
2296           # device exists
2297           rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2298
2299       self.lu.LogInfo("Renaming the old LVs on the target node")
2300       result = self.rpc.call_blockdev_rename(self.target_node,
2301                                              rename_old_to_new)
2302       result.Raise("Can't rename old LVs on node %s" % self.target_node)
2303
2304       # Now we rename the new LVs to the old LVs
2305       self.lu.LogInfo("Renaming the new LVs on the target node")
2306       rename_new_to_old = [(new, old.physical_id)
2307                            for old, new in zip(old_lvs, new_lvs)]
2308       result = self.rpc.call_blockdev_rename(self.target_node,
2309                                              rename_new_to_old)
2310       result.Raise("Can't rename new LVs on node %s" % self.target_node)
2311
2312       # Intermediate steps of in memory modifications
2313       for old, new in zip(old_lvs, new_lvs):
2314         new.logical_id = old.logical_id
2315         self.cfg.SetDiskID(new, self.target_node)
2316
2317       # We need to modify old_lvs so that removal later removes the
2318       # right LVs, not the newly added ones; note that old_lvs is a
2319       # copy here
2320       for disk in old_lvs:
2321         disk.logical_id = ren_fn(disk, temp_suffix)
2322         self.cfg.SetDiskID(disk, self.target_node)
2323
2324       # Now that the new lvs have the old name, we can add them to the device
2325       self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2326       result = self.rpc.call_blockdev_addchildren(self.target_node,
2327                                                   (dev, self.instance), new_lvs)
2328       msg = result.fail_msg
2329       if msg:
2330         for new_lv in new_lvs:
2331           msg2 = self.rpc.call_blockdev_remove(self.target_node,
2332                                                new_lv).fail_msg
2333           if msg2:
2334             self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2335                                hint=("cleanup manually the unused logical"
2336                                      "volumes"))
2337         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2338
2339     cstep = itertools.count(5)
2340
2341     if self.early_release:
2342       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2343       self._RemoveOldStorage(self.target_node, iv_names)
2344       # TODO: Check if releasing locks early still makes sense
2345       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2346     else:
2347       # Release all resource locks except those used by the instance
2348       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2349                    keep=self.node_secondary_ip.keys())
2350
2351     # Release all node locks while waiting for sync
2352     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2353
2354     # TODO: Can the instance lock be downgraded here? Take the optional disk
2355     # shutdown in the caller into consideration.
2356
2357     # Wait for sync
2358     # This can fail as the old devices are degraded and _WaitForSync
2359     # does a combined result over all disks, so we don't check its return value
2360     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2361     WaitForSync(self.lu, self.instance)
2362
2363     # Check all devices manually
2364     self._CheckDevices(self.instance.primary_node, iv_names)
2365
2366     # Step: remove old storage
2367     if not self.early_release:
2368       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2369       self._RemoveOldStorage(self.target_node, iv_names)
2370
2371   def _ExecDrbd8Secondary(self, feedback_fn):
2372     """Replace the secondary node for DRBD 8.
2373
2374     The algorithm for replace is quite complicated:
2375       - for all disks of the instance:
2376         - create new LVs on the new node with same names
2377         - shutdown the drbd device on the old secondary
2378         - disconnect the drbd network on the primary
2379         - create the drbd device on the new secondary
2380         - network attach the drbd on the primary, using an artifice:
2381           the drbd code for Attach() will connect to the network if it
2382           finds a device which is connected to the good local disks but
2383           not network enabled
2384       - wait for sync across all devices
2385       - remove all disks from the old secondary
2386
2387     Failures are not very well handled.
2388
2389     """
2390     steps_total = 6
2391
2392     pnode = self.instance.primary_node
2393
2394     # Step: check device activation
2395     self.lu.LogStep(1, steps_total, "Check device existence")
2396     self._CheckDisksExistence([self.instance.primary_node])
2397     self._CheckVolumeGroup([self.instance.primary_node])
2398
2399     # Step: check other node consistency
2400     self.lu.LogStep(2, steps_total, "Check peer consistency")
2401     self._CheckDisksConsistency(self.instance.primary_node, True, True)
2402
2403     # Step: create new storage
2404     self.lu.LogStep(3, steps_total, "Allocate new storage")
2405     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2406     excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2407     for idx, dev in enumerate(disks):
2408       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2409                       (self.new_node, idx))
2410       # we pass force_create=True to force LVM creation
2411       for new_lv in dev.children:
2412         _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2413                              True, GetInstanceInfoText(self.instance), False,
2414                              excl_stor)
2415
2416     # Step 4: dbrd minors and drbd setups changes
2417     # after this, we must manually remove the drbd minors on both the
2418     # error and the success paths
2419     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2420     minors = self.cfg.AllocateDRBDMinor([self.new_node
2421                                          for dev in self.instance.disks],
2422                                         self.instance.name)
2423     logging.debug("Allocated minors %r", minors)
2424
2425     iv_names = {}
2426     for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2427       self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2428                       (self.new_node, idx))
2429       # create new devices on new_node; note that we create two IDs:
2430       # one without port, so the drbd will be activated without
2431       # networking information on the new node at this stage, and one
2432       # with network, for the latter activation in step 4
2433       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2434       if self.instance.primary_node == o_node1:
2435         p_minor = o_minor1
2436       else:
2437         assert self.instance.primary_node == o_node2, "Three-node instance?"
2438         p_minor = o_minor2
2439
2440       new_alone_id = (self.instance.primary_node, self.new_node, None,
2441                       p_minor, new_minor, o_secret)
2442       new_net_id = (self.instance.primary_node, self.new_node, o_port,
2443                     p_minor, new_minor, o_secret)
2444
2445       iv_names[idx] = (dev, dev.children, new_net_id)
2446       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2447                     new_net_id)
2448       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2449                               logical_id=new_alone_id,
2450                               children=dev.children,
2451                               size=dev.size,
2452                               params={})
2453       (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2454                                             self.cfg)
2455       try:
2456         CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2457                              anno_new_drbd,
2458                              GetInstanceInfoText(self.instance), False,
2459                              excl_stor)
2460       except errors.GenericError:
2461         self.cfg.ReleaseDRBDMinors(self.instance.name)
2462         raise
2463
2464     # We have new devices, shutdown the drbd on the old secondary
2465     for idx, dev in enumerate(self.instance.disks):
2466       self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2467       self.cfg.SetDiskID(dev, self.target_node)
2468       msg = self.rpc.call_blockdev_shutdown(self.target_node,
2469                                             (dev, self.instance)).fail_msg
2470       if msg:
2471         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2472                            "node: %s" % (idx, msg),
2473                            hint=("Please cleanup this device manually as"
2474                                  " soon as possible"))
2475
2476     self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2477     result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2478                                                self.instance.disks)[pnode]
2479
2480     msg = result.fail_msg
2481     if msg:
2482       # detaches didn't succeed (unlikely)
2483       self.cfg.ReleaseDRBDMinors(self.instance.name)
2484       raise errors.OpExecError("Can't detach the disks from the network on"
2485                                " old node: %s" % (msg,))
2486
2487     # if we managed to detach at least one, we update all the disks of
2488     # the instance to point to the new secondary
2489     self.lu.LogInfo("Updating instance configuration")
2490     for dev, _, new_logical_id in iv_names.itervalues():
2491       dev.logical_id = new_logical_id
2492       self.cfg.SetDiskID(dev, self.instance.primary_node)
2493
2494     self.cfg.Update(self.instance, feedback_fn)
2495
2496     # Release all node locks (the configuration has been updated)
2497     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2498
2499     # and now perform the drbd attach
2500     self.lu.LogInfo("Attaching primary drbds to new secondary"
2501                     " (standalone => connected)")
2502     result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2503                                             self.new_node],
2504                                            self.node_secondary_ip,
2505                                            (self.instance.disks, self.instance),
2506                                            self.instance.name,
2507                                            False)
2508     for to_node, to_result in result.items():
2509       msg = to_result.fail_msg
2510       if msg:
2511         self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2512                            to_node, msg,
2513                            hint=("please do a gnt-instance info to see the"
2514                                  " status of disks"))
2515
2516     cstep = itertools.count(5)
2517
2518     if self.early_release:
2519       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2520       self._RemoveOldStorage(self.target_node, iv_names)
2521       # TODO: Check if releasing locks early still makes sense
2522       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2523     else:
2524       # Release all resource locks except those used by the instance
2525       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2526                    keep=self.node_secondary_ip.keys())
2527
2528     # TODO: Can the instance lock be downgraded here? Take the optional disk
2529     # shutdown in the caller into consideration.
2530
2531     # Wait for sync
2532     # This can fail as the old devices are degraded and _WaitForSync
2533     # does a combined result over all disks, so we don't check its return value
2534     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2535     WaitForSync(self.lu, self.instance)
2536
2537     # Check all devices manually
2538     self._CheckDevices(self.instance.primary_node, iv_names)
2539
2540     # Step: remove old storage
2541     if not self.early_release:
2542       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2543       self._RemoveOldStorage(self.target_node, iv_names)