New RPC to get size and spindles of disks
[ganeti-local] / lib / cmdlib / instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41   AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42   CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43   IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45   CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46   BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52   constants.DT_PLAIN: "",
53   constants.DT_RBD: ".rbd",
54   constants.DT_EXT: ".ext",
55   }
56
57
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59   constants.DT_PLAIN: constants.LD_LV,
60   constants.DT_FILE: constants.LD_FILE,
61   constants.DT_SHARED_FILE: constants.LD_FILE,
62   constants.DT_BLOCK: constants.LD_BLOCKDEV,
63   constants.DT_RBD: constants.LD_RBD,
64   constants.DT_EXT: constants.LD_EXT,
65   }
66
67
68 def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69                          excl_stor):
70   """Create a single block device on a given node.
71
72   This will not recurse over children of the device, so they must be
73   created in advance.
74
75   @param lu: the lu on whose behalf we execute
76   @param node: the node on which to create the device
77   @type instance: L{objects.Instance}
78   @param instance: the instance which owns the device
79   @type device: L{objects.Disk}
80   @param device: the device to create
81   @param info: the extra 'metadata' we should attach to the device
82       (this will be represented as a LVM tag)
83   @type force_open: boolean
84   @param force_open: this parameter will be passes to the
85       L{backend.BlockdevCreate} function where it specifies
86       whether we run on primary or not, and it affects both
87       the child assembly and the device own Open() execution
88   @type excl_stor: boolean
89   @param excl_stor: Whether exclusive_storage is active for the node
90
91   """
92   lu.cfg.SetDiskID(device, node)
93   result = lu.rpc.call_blockdev_create(node, device, device.size,
94                                        instance.name, force_open, info,
95                                        excl_stor)
96   result.Raise("Can't create block device %s on"
97                " node %s for instance %s" % (device, node, instance.name))
98   if device.physical_id is None:
99     device.physical_id = result.payload
100
101
102 def _CreateBlockDevInner(lu, node, instance, device, force_create,
103                          info, force_open, excl_stor):
104   """Create a tree of block devices on a given node.
105
106   If this device type has to be created on secondaries, create it and
107   all its children.
108
109   If not, just recurse to children keeping the same 'force' value.
110
111   @attention: The device has to be annotated already.
112
113   @param lu: the lu on whose behalf we execute
114   @param node: the node on which to create the device
115   @type instance: L{objects.Instance}
116   @param instance: the instance which owns the device
117   @type device: L{objects.Disk}
118   @param device: the device to create
119   @type force_create: boolean
120   @param force_create: whether to force creation of this device; this
121       will be change to True whenever we find a device which has
122       CreateOnSecondary() attribute
123   @param info: the extra 'metadata' we should attach to the device
124       (this will be represented as a LVM tag)
125   @type force_open: boolean
126   @param force_open: this parameter will be passes to the
127       L{backend.BlockdevCreate} function where it specifies
128       whether we run on primary or not, and it affects both
129       the child assembly and the device own Open() execution
130   @type excl_stor: boolean
131   @param excl_stor: Whether exclusive_storage is active for the node
132
133   @return: list of created devices
134   """
135   created_devices = []
136   try:
137     if device.CreateOnSecondary():
138       force_create = True
139
140     if device.children:
141       for child in device.children:
142         devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143                                     info, force_open, excl_stor)
144         created_devices.extend(devs)
145
146     if not force_create:
147       return created_devices
148
149     CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150                          excl_stor)
151     # The device has been completely created, so there is no point in keeping
152     # its subdevices in the list. We just add the device itself instead.
153     created_devices = [(node, device)]
154     return created_devices
155
156   except errors.DeviceCreationError, e:
157     e.created_devices.extend(created_devices)
158     raise e
159   except errors.OpExecError, e:
160     raise errors.DeviceCreationError(str(e), created_devices)
161
162
163 def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164   """Whether exclusive_storage is in effect for the given node.
165
166   @type cfg: L{config.ConfigWriter}
167   @param cfg: The cluster configuration
168   @type nodename: string
169   @param nodename: The node
170   @rtype: bool
171   @return: The effective value of exclusive_storage
172   @raise errors.OpPrereqError: if no node exists with the given name
173
174   """
175   ni = cfg.GetNodeInfo(nodename)
176   if ni is None:
177     raise errors.OpPrereqError("Invalid node name %s" % nodename,
178                                errors.ECODE_NOENT)
179   return IsExclusiveStorageEnabledNode(cfg, ni)
180
181
182 def _CreateBlockDev(lu, node, instance, device, force_create, info,
183                     force_open):
184   """Wrapper around L{_CreateBlockDevInner}.
185
186   This method annotates the root device first.
187
188   """
189   (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190   excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191   return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192                               force_open, excl_stor)
193
194
195 def _UndoCreateDisks(lu, disks_created):
196   """Undo the work performed by L{CreateDisks}.
197
198   This function is called in case of an error to undo the work of
199   L{CreateDisks}.
200
201   @type lu: L{LogicalUnit}
202   @param lu: the logical unit on whose behalf we execute
203   @param disks_created: the result returned by L{CreateDisks}
204
205   """
206   for (node, disk) in disks_created:
207     lu.cfg.SetDiskID(disk, node)
208     result = lu.rpc.call_blockdev_remove(node, disk)
209     if result.fail_msg:
210       logging.warning("Failed to remove newly-created disk %s on node %s:"
211                       " %s", disk, node, result.fail_msg)
212
213
214 def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
215   """Create all disks for an instance.
216
217   This abstracts away some work from AddInstance.
218
219   @type lu: L{LogicalUnit}
220   @param lu: the logical unit on whose behalf we execute
221   @type instance: L{objects.Instance}
222   @param instance: the instance whose disks we should create
223   @type to_skip: list
224   @param to_skip: list of indices to skip
225   @type target_node: string
226   @param target_node: if passed, overrides the target node for creation
227   @type disks: list of {objects.Disk}
228   @param disks: the disks to create; if not specified, all the disks of the
229       instance are created
230   @return: information about the created disks, to be used to call
231       L{_UndoCreateDisks}
232   @raise errors.OpPrereqError: in case of error
233
234   """
235   info = GetInstanceInfoText(instance)
236   if target_node is None:
237     pnode = instance.primary_node
238     all_nodes = instance.all_nodes
239   else:
240     pnode = target_node
241     all_nodes = [pnode]
242
243   if disks is None:
244     disks = instance.disks
245
246   if instance.disk_template in constants.DTS_FILEBASED:
247     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
248     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
249
250     result.Raise("Failed to create directory '%s' on"
251                  " node %s" % (file_storage_dir, pnode))
252
253   disks_created = []
254   for idx, device in enumerate(disks):
255     if to_skip and idx in to_skip:
256       continue
257     logging.info("Creating disk %s for instance '%s'", idx, instance.name)
258     for node in all_nodes:
259       f_create = node == pnode
260       try:
261         _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
262         disks_created.append((node, device))
263       except errors.DeviceCreationError, e:
264         logging.warning("Creating disk %s for instance '%s' failed",
265                         idx, instance.name)
266         disks_created.extend(e.created_devices)
267         _UndoCreateDisks(lu, disks_created)
268         raise errors.OpExecError(e.message)
269   return disks_created
270
271
272 def ComputeDiskSizePerVG(disk_template, disks):
273   """Compute disk size requirements in the volume group
274
275   """
276   def _compute(disks, payload):
277     """Universal algorithm.
278
279     """
280     vgs = {}
281     for disk in disks:
282       vgs[disk[constants.IDISK_VG]] = \
283         vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
284
285     return vgs
286
287   # Required free disk space as a function of disk and swap space
288   req_size_dict = {
289     constants.DT_DISKLESS: {},
290     constants.DT_PLAIN: _compute(disks, 0),
291     # 128 MB are added for drbd metadata for each disk
292     constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
293     constants.DT_FILE: {},
294     constants.DT_SHARED_FILE: {},
295     }
296
297   if disk_template not in req_size_dict:
298     raise errors.ProgrammerError("Disk template '%s' size requirement"
299                                  " is unknown" % disk_template)
300
301   return req_size_dict[disk_template]
302
303
304 def ComputeDisks(op, default_vg):
305   """Computes the instance disks.
306
307   @param op: The instance opcode
308   @param default_vg: The default_vg to assume
309
310   @return: The computed disks
311
312   """
313   disks = []
314   for disk in op.disks:
315     mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316     if mode not in constants.DISK_ACCESS_SET:
317       raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318                                  mode, errors.ECODE_INVAL)
319     size = disk.get(constants.IDISK_SIZE, None)
320     if size is None:
321       raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
322     try:
323       size = int(size)
324     except (TypeError, ValueError):
325       raise errors.OpPrereqError("Invalid disk size '%s'" % size,
326                                  errors.ECODE_INVAL)
327
328     ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329     if ext_provider and op.disk_template != constants.DT_EXT:
330       raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331                                  " disk template, not %s" %
332                                  (constants.IDISK_PROVIDER, constants.DT_EXT,
333                                   op.disk_template), errors.ECODE_INVAL)
334
335     data_vg = disk.get(constants.IDISK_VG, default_vg)
336     name = disk.get(constants.IDISK_NAME, None)
337     if name is not None and name.lower() == constants.VALUE_NONE:
338       name = None
339     new_disk = {
340       constants.IDISK_SIZE: size,
341       constants.IDISK_MODE: mode,
342       constants.IDISK_VG: data_vg,
343       constants.IDISK_NAME: name,
344       }
345
346     for key in [
347       constants.IDISK_METAVG,
348       constants.IDISK_ADOPT,
349       constants.IDISK_SPINDLES,
350       ]:
351       if key in disk:
352         new_disk[key] = disk[key]
353
354     # For extstorage, demand the `provider' option and add any
355     # additional parameters (ext-params) to the dict
356     if op.disk_template == constants.DT_EXT:
357       if ext_provider:
358         new_disk[constants.IDISK_PROVIDER] = ext_provider
359         for key in disk:
360           if key not in constants.IDISK_PARAMS:
361             new_disk[key] = disk[key]
362       else:
363         raise errors.OpPrereqError("Missing provider for template '%s'" %
364                                    constants.DT_EXT, errors.ECODE_INVAL)
365
366     disks.append(new_disk)
367
368   return disks
369
370
371 def CheckRADOSFreeSpace():
372   """Compute disk size requirements inside the RADOS cluster.
373
374   """
375   # For the RADOS cluster we assume there is always enough space.
376   pass
377
378
379 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
380                          iv_name, p_minor, s_minor):
381   """Generate a drbd8 device complete with its children.
382
383   """
384   assert len(vgnames) == len(names) == 2
385   port = lu.cfg.AllocatePort()
386   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
387
388   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
389                           logical_id=(vgnames[0], names[0]),
390                           params={})
391   dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
392   dev_meta = objects.Disk(dev_type=constants.LD_LV,
393                           size=constants.DRBD_META_SIZE,
394                           logical_id=(vgnames[1], names[1]),
395                           params={})
396   dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
397   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
398                           logical_id=(primary, secondary, port,
399                                       p_minor, s_minor,
400                                       shared_secret),
401                           children=[dev_data, dev_meta],
402                           iv_name=iv_name, params={})
403   drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
404   return drbd_dev
405
406
407 def GenerateDiskTemplate(
408   lu, template_name, instance_name, primary_node, secondary_nodes,
409   disk_info, file_storage_dir, file_driver, base_index,
410   feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
411   _req_shr_file_storage=opcodes.RequireSharedFileStorage):
412   """Generate the entire disk layout for a given template type.
413
414   """
415   vgname = lu.cfg.GetVGName()
416   disk_count = len(disk_info)
417   disks = []
418
419   if template_name == constants.DT_DISKLESS:
420     pass
421   elif template_name == constants.DT_DRBD8:
422     if len(secondary_nodes) != 1:
423       raise errors.ProgrammerError("Wrong template configuration")
424     remote_node = secondary_nodes[0]
425     minors = lu.cfg.AllocateDRBDMinor(
426       [primary_node, remote_node] * len(disk_info), instance_name)
427
428     (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
429                                                        full_disk_params)
430     drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
431
432     names = []
433     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
434                                                for i in range(disk_count)]):
435       names.append(lv_prefix + "_data")
436       names.append(lv_prefix + "_meta")
437     for idx, disk in enumerate(disk_info):
438       disk_index = idx + base_index
439       data_vg = disk.get(constants.IDISK_VG, vgname)
440       meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
441       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
442                                       disk[constants.IDISK_SIZE],
443                                       [data_vg, meta_vg],
444                                       names[idx * 2:idx * 2 + 2],
445                                       "disk/%d" % disk_index,
446                                       minors[idx * 2], minors[idx * 2 + 1])
447       disk_dev.mode = disk[constants.IDISK_MODE]
448       disk_dev.name = disk.get(constants.IDISK_NAME, None)
449       disks.append(disk_dev)
450   else:
451     if secondary_nodes:
452       raise errors.ProgrammerError("Wrong template configuration")
453
454     if template_name == constants.DT_FILE:
455       _req_file_storage()
456     elif template_name == constants.DT_SHARED_FILE:
457       _req_shr_file_storage()
458
459     name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
460     if name_prefix is None:
461       names = None
462     else:
463       names = _GenerateUniqueNames(lu, ["%s.disk%s" %
464                                         (name_prefix, base_index + i)
465                                         for i in range(disk_count)])
466
467     if template_name == constants.DT_PLAIN:
468
469       def logical_id_fn(idx, _, disk):
470         vg = disk.get(constants.IDISK_VG, vgname)
471         return (vg, names[idx])
472
473     elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
474       logical_id_fn = \
475         lambda _, disk_index, disk: (file_driver,
476                                      "%s/disk%d" % (file_storage_dir,
477                                                     disk_index))
478     elif template_name == constants.DT_BLOCK:
479       logical_id_fn = \
480         lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
481                                        disk[constants.IDISK_ADOPT])
482     elif template_name == constants.DT_RBD:
483       logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
484     elif template_name == constants.DT_EXT:
485       def logical_id_fn(idx, _, disk):
486         provider = disk.get(constants.IDISK_PROVIDER, None)
487         if provider is None:
488           raise errors.ProgrammerError("Disk template is %s, but '%s' is"
489                                        " not found", constants.DT_EXT,
490                                        constants.IDISK_PROVIDER)
491         return (provider, names[idx])
492     else:
493       raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
494
495     dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
496
497     for idx, disk in enumerate(disk_info):
498       params = {}
499       # Only for the Ext template add disk_info to params
500       if template_name == constants.DT_EXT:
501         params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
502         for key in disk:
503           if key not in constants.IDISK_PARAMS:
504             params[key] = disk[key]
505       disk_index = idx + base_index
506       size = disk[constants.IDISK_SIZE]
507       feedback_fn("* disk %s, size %s" %
508                   (disk_index, utils.FormatUnit(size, "h")))
509       disk_dev = objects.Disk(dev_type=dev_type, size=size,
510                               logical_id=logical_id_fn(idx, disk_index, disk),
511                               iv_name="disk/%d" % disk_index,
512                               mode=disk[constants.IDISK_MODE],
513                               params=params,
514                               spindles=disk.get(constants.IDISK_SPINDLES))
515       disk_dev.name = disk.get(constants.IDISK_NAME, None)
516       disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
517       disks.append(disk_dev)
518
519   return disks
520
521
522 def CheckSpindlesExclusiveStorage(diskdict, es_flag):
523   """Check the presence of the spindle options with exclusive_storage.
524
525   @type diskdict: dict
526   @param diskdict: disk parameters
527   @type es_flag: bool
528   @param es_flag: the effective value of the exlusive_storage flag
529   @raise errors.OpPrereqError when spindles are given and they should not
530
531   """
532   if (not es_flag and constants.IDISK_SPINDLES in diskdict and
533       diskdict[constants.IDISK_SPINDLES] is not None):
534     raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
535                                " when exclusive storage is not active",
536                                errors.ECODE_INVAL)
537
538
539 class LUInstanceRecreateDisks(LogicalUnit):
540   """Recreate an instance's missing disks.
541
542   """
543   HPATH = "instance-recreate-disks"
544   HTYPE = constants.HTYPE_INSTANCE
545   REQ_BGL = False
546
547   _MODIFYABLE = compat.UniqueFrozenset([
548     constants.IDISK_SIZE,
549     constants.IDISK_MODE,
550     constants.IDISK_SPINDLES,
551     ])
552
553   # New or changed disk parameters may have different semantics
554   assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
555     constants.IDISK_ADOPT,
556
557     # TODO: Implement support changing VG while recreating
558     constants.IDISK_VG,
559     constants.IDISK_METAVG,
560     constants.IDISK_PROVIDER,
561     constants.IDISK_NAME,
562     ]))
563
564   def _RunAllocator(self):
565     """Run the allocator based on input opcode.
566
567     """
568     be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
569
570     # FIXME
571     # The allocator should actually run in "relocate" mode, but current
572     # allocators don't support relocating all the nodes of an instance at
573     # the same time. As a workaround we use "allocate" mode, but this is
574     # suboptimal for two reasons:
575     # - The instance name passed to the allocator is present in the list of
576     #   existing instances, so there could be a conflict within the
577     #   internal structures of the allocator. This doesn't happen with the
578     #   current allocators, but it's a liability.
579     # - The allocator counts the resources used by the instance twice: once
580     #   because the instance exists already, and once because it tries to
581     #   allocate a new instance.
582     # The allocator could choose some of the nodes on which the instance is
583     # running, but that's not a problem. If the instance nodes are broken,
584     # they should be already be marked as drained or offline, and hence
585     # skipped by the allocator. If instance disks have been lost for other
586     # reasons, then recreating the disks on the same nodes should be fine.
587     disk_template = self.instance.disk_template
588     spindle_use = be_full[constants.BE_SPINDLE_USE]
589     req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
590                                         disk_template=disk_template,
591                                         tags=list(self.instance.GetTags()),
592                                         os=self.instance.os,
593                                         nics=[{}],
594                                         vcpus=be_full[constants.BE_VCPUS],
595                                         memory=be_full[constants.BE_MAXMEM],
596                                         spindle_use=spindle_use,
597                                         disks=[{constants.IDISK_SIZE: d.size,
598                                                 constants.IDISK_MODE: d.mode}
599                                                for d in self.instance.disks],
600                                         hypervisor=self.instance.hypervisor,
601                                         node_whitelist=None)
602     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
603
604     ial.Run(self.op.iallocator)
605
606     assert req.RequiredNodes() == len(self.instance.all_nodes)
607
608     if not ial.success:
609       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
610                                  " %s" % (self.op.iallocator, ial.info),
611                                  errors.ECODE_NORES)
612
613     self.op.nodes = ial.result
614     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
615                  self.op.instance_name, self.op.iallocator,
616                  utils.CommaJoin(ial.result))
617
618   def CheckArguments(self):
619     if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
620       # Normalize and convert deprecated list of disk indices
621       self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
622
623     duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
624     if duplicates:
625       raise errors.OpPrereqError("Some disks have been specified more than"
626                                  " once: %s" % utils.CommaJoin(duplicates),
627                                  errors.ECODE_INVAL)
628
629     # We don't want _CheckIAllocatorOrNode selecting the default iallocator
630     # when neither iallocator nor nodes are specified
631     if self.op.iallocator or self.op.nodes:
632       CheckIAllocatorOrNode(self, "iallocator", "nodes")
633
634     for (idx, params) in self.op.disks:
635       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
636       unsupported = frozenset(params.keys()) - self._MODIFYABLE
637       if unsupported:
638         raise errors.OpPrereqError("Parameters for disk %s try to change"
639                                    " unmodifyable parameter(s): %s" %
640                                    (idx, utils.CommaJoin(unsupported)),
641                                    errors.ECODE_INVAL)
642
643   def ExpandNames(self):
644     self._ExpandAndLockInstance()
645     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
646
647     if self.op.nodes:
648       self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
649       self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
650     else:
651       self.needed_locks[locking.LEVEL_NODE] = []
652       if self.op.iallocator:
653         # iallocator will select a new node in the same group
654         self.needed_locks[locking.LEVEL_NODEGROUP] = []
655         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
656
657     self.needed_locks[locking.LEVEL_NODE_RES] = []
658
659   def DeclareLocks(self, level):
660     if level == locking.LEVEL_NODEGROUP:
661       assert self.op.iallocator is not None
662       assert not self.op.nodes
663       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
664       self.share_locks[locking.LEVEL_NODEGROUP] = 1
665       # Lock the primary group used by the instance optimistically; this
666       # requires going via the node before it's locked, requiring
667       # verification later on
668       self.needed_locks[locking.LEVEL_NODEGROUP] = \
669         self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
670
671     elif level == locking.LEVEL_NODE:
672       # If an allocator is used, then we lock all the nodes in the current
673       # instance group, as we don't know yet which ones will be selected;
674       # if we replace the nodes without using an allocator, locks are
675       # already declared in ExpandNames; otherwise, we need to lock all the
676       # instance nodes for disk re-creation
677       if self.op.iallocator:
678         assert not self.op.nodes
679         assert not self.needed_locks[locking.LEVEL_NODE]
680         assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
681
682         # Lock member nodes of the group of the primary node
683         for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
684           self.needed_locks[locking.LEVEL_NODE].extend(
685             self.cfg.GetNodeGroup(group_uuid).members)
686
687         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
688       elif not self.op.nodes:
689         self._LockInstancesNodes(primary_only=False)
690     elif level == locking.LEVEL_NODE_RES:
691       # Copy node locks
692       self.needed_locks[locking.LEVEL_NODE_RES] = \
693         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
694
695   def BuildHooksEnv(self):
696     """Build hooks env.
697
698     This runs on master, primary and secondary nodes of the instance.
699
700     """
701     return BuildInstanceHookEnvByObject(self, self.instance)
702
703   def BuildHooksNodes(self):
704     """Build hooks nodes.
705
706     """
707     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
708     return (nl, nl)
709
710   def CheckPrereq(self):
711     """Check prerequisites.
712
713     This checks that the instance is in the cluster and is not running.
714
715     """
716     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
717     assert instance is not None, \
718       "Cannot retrieve locked instance %s" % self.op.instance_name
719     if self.op.nodes:
720       if len(self.op.nodes) != len(instance.all_nodes):
721         raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
722                                    " %d replacement nodes were specified" %
723                                    (instance.name, len(instance.all_nodes),
724                                     len(self.op.nodes)),
725                                    errors.ECODE_INVAL)
726       assert instance.disk_template != constants.DT_DRBD8 or \
727              len(self.op.nodes) == 2
728       assert instance.disk_template != constants.DT_PLAIN or \
729              len(self.op.nodes) == 1
730       primary_node = self.op.nodes[0]
731     else:
732       primary_node = instance.primary_node
733     if not self.op.iallocator:
734       CheckNodeOnline(self, primary_node)
735
736     if instance.disk_template == constants.DT_DISKLESS:
737       raise errors.OpPrereqError("Instance '%s' has no disks" %
738                                  self.op.instance_name, errors.ECODE_INVAL)
739
740     # Verify if node group locks are still correct
741     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
742     if owned_groups:
743       # Node group locks are acquired only for the primary node (and only
744       # when the allocator is used)
745       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
746                               primary_only=True)
747
748     # if we replace nodes *and* the old primary is offline, we don't
749     # check the instance state
750     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
751     if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
752       CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
753                          msg="cannot recreate disks")
754
755     if self.op.disks:
756       self.disks = dict(self.op.disks)
757     else:
758       self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
759
760     maxidx = max(self.disks.keys())
761     if maxidx >= len(instance.disks):
762       raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
763                                  errors.ECODE_INVAL)
764
765     if ((self.op.nodes or self.op.iallocator) and
766          sorted(self.disks.keys()) != range(len(instance.disks))):
767       raise errors.OpPrereqError("Can't recreate disks partially and"
768                                  " change the nodes at the same time",
769                                  errors.ECODE_INVAL)
770
771     self.instance = instance
772
773     if self.op.iallocator:
774       self._RunAllocator()
775       # Release unneeded node and node resource locks
776       ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
777       ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
778       ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
779
780     assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
781
782     if self.op.nodes:
783       nodes = self.op.nodes
784     else:
785       nodes = instance.all_nodes
786     excl_stor = compat.any(
787       rpc.GetExclusiveStorageForNodeNames(self.cfg, nodes).values()
788       )
789     for new_params in self.disks.values():
790       CheckSpindlesExclusiveStorage(new_params, excl_stor)
791
792   def Exec(self, feedback_fn):
793     """Recreate the disks.
794
795     """
796     instance = self.instance
797
798     assert (self.owned_locks(locking.LEVEL_NODE) ==
799             self.owned_locks(locking.LEVEL_NODE_RES))
800
801     to_skip = []
802     mods = [] # keeps track of needed changes
803
804     for idx, disk in enumerate(instance.disks):
805       try:
806         changes = self.disks[idx]
807       except KeyError:
808         # Disk should not be recreated
809         to_skip.append(idx)
810         continue
811
812       # update secondaries for disks, if needed
813       if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
814         # need to update the nodes and minors
815         assert len(self.op.nodes) == 2
816         assert len(disk.logical_id) == 6 # otherwise disk internals
817                                          # have changed
818         (_, _, old_port, _, _, old_secret) = disk.logical_id
819         new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
820         new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
821                   new_minors[0], new_minors[1], old_secret)
822         assert len(disk.logical_id) == len(new_id)
823       else:
824         new_id = None
825
826       mods.append((idx, new_id, changes))
827
828     # now that we have passed all asserts above, we can apply the mods
829     # in a single run (to avoid partial changes)
830     for idx, new_id, changes in mods:
831       disk = instance.disks[idx]
832       if new_id is not None:
833         assert disk.dev_type == constants.LD_DRBD8
834         disk.logical_id = new_id
835       if changes:
836         disk.Update(size=changes.get(constants.IDISK_SIZE, None),
837                     mode=changes.get(constants.IDISK_MODE, None),
838                     spindles=changes.get(constants.IDISK_SPINDLES, None))
839
840     # change primary node, if needed
841     if self.op.nodes:
842       instance.primary_node = self.op.nodes[0]
843       self.LogWarning("Changing the instance's nodes, you will have to"
844                       " remove any disks left on the older nodes manually")
845
846     if self.op.nodes:
847       self.cfg.Update(instance, feedback_fn)
848
849     # All touched nodes must be locked
850     mylocks = self.owned_locks(locking.LEVEL_NODE)
851     assert mylocks.issuperset(frozenset(instance.all_nodes))
852     new_disks = CreateDisks(self, instance, to_skip=to_skip)
853
854     # TODO: Release node locks before wiping, or explain why it's not possible
855     if self.cfg.GetClusterInfo().prealloc_wipe_disks:
856       wipedisks = [(idx, disk, 0)
857                    for (idx, disk) in enumerate(instance.disks)
858                    if idx not in to_skip]
859       WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
860
861
862 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
863   """Checks if nodes have enough free disk space in the specified VG.
864
865   This function checks if all given nodes have the needed amount of
866   free disk. In case any node has less disk or we cannot get the
867   information from the node, this function raises an OpPrereqError
868   exception.
869
870   @type lu: C{LogicalUnit}
871   @param lu: a logical unit from which we get configuration data
872   @type nodenames: C{list}
873   @param nodenames: the list of node names to check
874   @type vg: C{str}
875   @param vg: the volume group to check
876   @type requested: C{int}
877   @param requested: the amount of disk in MiB to check for
878   @raise errors.OpPrereqError: if the node doesn't have enough disk,
879       or we cannot check the node
880
881   """
882   es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
883   # FIXME: This maps everything to storage type 'lvm-vg' to maintain
884   # the current functionality. Refactor to make it more flexible.
885   nodeinfo = lu.rpc.call_node_info(nodenames, [(constants.ST_LVM_VG, vg)], None,
886                                    es_flags)
887   for node in nodenames:
888     info = nodeinfo[node]
889     info.Raise("Cannot get current information from node %s" % node,
890                prereq=True, ecode=errors.ECODE_ENVIRON)
891     (_, (vg_info, ), _) = info.payload
892     vg_free = vg_info.get("vg_free", None)
893     if not isinstance(vg_free, int):
894       raise errors.OpPrereqError("Can't compute free disk space on node"
895                                  " %s for vg %s, result was '%s'" %
896                                  (node, vg, vg_free), errors.ECODE_ENVIRON)
897     if requested > vg_free:
898       raise errors.OpPrereqError("Not enough disk space on target node %s"
899                                  " vg %s: required %d MiB, available %d MiB" %
900                                  (node, vg, requested, vg_free),
901                                  errors.ECODE_NORES)
902
903
904 def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
905   """Checks if nodes have enough free disk space in all the VGs.
906
907   This function checks if all given nodes have the needed amount of
908   free disk. In case any node has less disk or we cannot get the
909   information from the node, this function raises an OpPrereqError
910   exception.
911
912   @type lu: C{LogicalUnit}
913   @param lu: a logical unit from which we get configuration data
914   @type nodenames: C{list}
915   @param nodenames: the list of node names to check
916   @type req_sizes: C{dict}
917   @param req_sizes: the hash of vg and corresponding amount of disk in
918       MiB to check for
919   @raise errors.OpPrereqError: if the node doesn't have enough disk,
920       or we cannot check the node
921
922   """
923   for vg, req_size in req_sizes.items():
924     _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
925
926
927 def _DiskSizeInBytesToMebibytes(lu, size):
928   """Converts a disk size in bytes to mebibytes.
929
930   Warns and rounds up if the size isn't an even multiple of 1 MiB.
931
932   """
933   (mib, remainder) = divmod(size, 1024 * 1024)
934
935   if remainder != 0:
936     lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
937                   " to not overwrite existing data (%s bytes will not be"
938                   " wiped)", (1024 * 1024) - remainder)
939     mib += 1
940
941   return mib
942
943
944 def _CalcEta(time_taken, written, total_size):
945   """Calculates the ETA based on size written and total size.
946
947   @param time_taken: The time taken so far
948   @param written: amount written so far
949   @param total_size: The total size of data to be written
950   @return: The remaining time in seconds
951
952   """
953   avg_time = time_taken / float(written)
954   return (total_size - written) * avg_time
955
956
957 def WipeDisks(lu, instance, disks=None):
958   """Wipes instance disks.
959
960   @type lu: L{LogicalUnit}
961   @param lu: the logical unit on whose behalf we execute
962   @type instance: L{objects.Instance}
963   @param instance: the instance whose disks we should create
964   @type disks: None or list of tuple of (number, L{objects.Disk}, number)
965   @param disks: Disk details; tuple contains disk index, disk object and the
966     start offset
967
968   """
969   node = instance.primary_node
970
971   if disks is None:
972     disks = [(idx, disk, 0)
973              for (idx, disk) in enumerate(instance.disks)]
974
975   for (_, device, _) in disks:
976     lu.cfg.SetDiskID(device, node)
977
978   logging.info("Pausing synchronization of disks of instance '%s'",
979                instance.name)
980   result = lu.rpc.call_blockdev_pause_resume_sync(node,
981                                                   (map(compat.snd, disks),
982                                                    instance),
983                                                   True)
984   result.Raise("Failed to pause disk synchronization on node '%s'" % node)
985
986   for idx, success in enumerate(result.payload):
987     if not success:
988       logging.warn("Pausing synchronization of disk %s of instance '%s'"
989                    " failed", idx, instance.name)
990
991   try:
992     for (idx, device, offset) in disks:
993       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
994       # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
995       wipe_chunk_size = \
996         int(min(constants.MAX_WIPE_CHUNK,
997                 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
998
999       size = device.size
1000       last_output = 0
1001       start_time = time.time()
1002
1003       if offset == 0:
1004         info_text = ""
1005       else:
1006         info_text = (" (from %s to %s)" %
1007                      (utils.FormatUnit(offset, "h"),
1008                       utils.FormatUnit(size, "h")))
1009
1010       lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1011
1012       logging.info("Wiping disk %d for instance %s on node %s using"
1013                    " chunk size %s", idx, instance.name, node, wipe_chunk_size)
1014
1015       while offset < size:
1016         wipe_size = min(wipe_chunk_size, size - offset)
1017
1018         logging.debug("Wiping disk %d, offset %s, chunk %s",
1019                       idx, offset, wipe_size)
1020
1021         result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
1022                                            wipe_size)
1023         result.Raise("Could not wipe disk %d at offset %d for size %d" %
1024                      (idx, offset, wipe_size))
1025
1026         now = time.time()
1027         offset += wipe_size
1028         if now - last_output >= 60:
1029           eta = _CalcEta(now - start_time, offset, size)
1030           lu.LogInfo(" - done: %.1f%% ETA: %s",
1031                      offset / float(size) * 100, utils.FormatSeconds(eta))
1032           last_output = now
1033   finally:
1034     logging.info("Resuming synchronization of disks for instance '%s'",
1035                  instance.name)
1036
1037     result = lu.rpc.call_blockdev_pause_resume_sync(node,
1038                                                     (map(compat.snd, disks),
1039                                                      instance),
1040                                                     False)
1041
1042     if result.fail_msg:
1043       lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1044                     node, result.fail_msg)
1045     else:
1046       for idx, success in enumerate(result.payload):
1047         if not success:
1048           lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1049                         " failed", idx, instance.name)
1050
1051
1052 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1053   """Wrapper for L{WipeDisks} that handles errors.
1054
1055   @type lu: L{LogicalUnit}
1056   @param lu: the logical unit on whose behalf we execute
1057   @type instance: L{objects.Instance}
1058   @param instance: the instance whose disks we should wipe
1059   @param disks: see L{WipeDisks}
1060   @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1061       case of error
1062   @raise errors.OpPrereqError: in case of failure
1063
1064   """
1065   try:
1066     WipeDisks(lu, instance, disks=disks)
1067   except errors.OpExecError:
1068     logging.warning("Wiping disks for instance '%s' failed",
1069                     instance.name)
1070     _UndoCreateDisks(lu, cleanup)
1071     raise
1072
1073
1074 def ExpandCheckDisks(instance, disks):
1075   """Return the instance disks selected by the disks list
1076
1077   @type disks: list of L{objects.Disk} or None
1078   @param disks: selected disks
1079   @rtype: list of L{objects.Disk}
1080   @return: selected instance disks to act on
1081
1082   """
1083   if disks is None:
1084     return instance.disks
1085   else:
1086     if not set(disks).issubset(instance.disks):
1087       raise errors.ProgrammerError("Can only act on disks belonging to the"
1088                                    " target instance: expected a subset of %r,"
1089                                    " got %r" % (instance.disks, disks))
1090     return disks
1091
1092
1093 def WaitForSync(lu, instance, disks=None, oneshot=False):
1094   """Sleep and poll for an instance's disk to sync.
1095
1096   """
1097   if not instance.disks or disks is not None and not disks:
1098     return True
1099
1100   disks = ExpandCheckDisks(instance, disks)
1101
1102   if not oneshot:
1103     lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1104
1105   node = instance.primary_node
1106
1107   for dev in disks:
1108     lu.cfg.SetDiskID(dev, node)
1109
1110   # TODO: Convert to utils.Retry
1111
1112   retries = 0
1113   degr_retries = 10 # in seconds, as we sleep 1 second each time
1114   while True:
1115     max_time = 0
1116     done = True
1117     cumul_degraded = False
1118     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1119     msg = rstats.fail_msg
1120     if msg:
1121       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1122       retries += 1
1123       if retries >= 10:
1124         raise errors.RemoteError("Can't contact node %s for mirror data,"
1125                                  " aborting." % node)
1126       time.sleep(6)
1127       continue
1128     rstats = rstats.payload
1129     retries = 0
1130     for i, mstat in enumerate(rstats):
1131       if mstat is None:
1132         lu.LogWarning("Can't compute data for node %s/%s",
1133                       node, disks[i].iv_name)
1134         continue
1135
1136       cumul_degraded = (cumul_degraded or
1137                         (mstat.is_degraded and mstat.sync_percent is None))
1138       if mstat.sync_percent is not None:
1139         done = False
1140         if mstat.estimated_time is not None:
1141           rem_time = ("%s remaining (estimated)" %
1142                       utils.FormatSeconds(mstat.estimated_time))
1143           max_time = mstat.estimated_time
1144         else:
1145           rem_time = "no time estimate"
1146         lu.LogInfo("- device %s: %5.2f%% done, %s",
1147                    disks[i].iv_name, mstat.sync_percent, rem_time)
1148
1149     # if we're done but degraded, let's do a few small retries, to
1150     # make sure we see a stable and not transient situation; therefore
1151     # we force restart of the loop
1152     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1153       logging.info("Degraded disks found, %d retries left", degr_retries)
1154       degr_retries -= 1
1155       time.sleep(1)
1156       continue
1157
1158     if done or oneshot:
1159       break
1160
1161     time.sleep(min(60, max_time))
1162
1163   if done:
1164     lu.LogInfo("Instance %s's disks are in sync", instance.name)
1165
1166   return not cumul_degraded
1167
1168
1169 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1170   """Shutdown block devices of an instance.
1171
1172   This does the shutdown on all nodes of the instance.
1173
1174   If the ignore_primary is false, errors on the primary node are
1175   ignored.
1176
1177   """
1178   all_result = True
1179   disks = ExpandCheckDisks(instance, disks)
1180
1181   for disk in disks:
1182     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1183       lu.cfg.SetDiskID(top_disk, node)
1184       result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1185       msg = result.fail_msg
1186       if msg:
1187         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1188                       disk.iv_name, node, msg)
1189         if ((node == instance.primary_node and not ignore_primary) or
1190             (node != instance.primary_node and not result.offline)):
1191           all_result = False
1192   return all_result
1193
1194
1195 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1196   """Shutdown block devices of an instance.
1197
1198   This function checks if an instance is running, before calling
1199   _ShutdownInstanceDisks.
1200
1201   """
1202   CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1203   ShutdownInstanceDisks(lu, instance, disks=disks)
1204
1205
1206 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1207                            ignore_size=False):
1208   """Prepare the block devices for an instance.
1209
1210   This sets up the block devices on all nodes.
1211
1212   @type lu: L{LogicalUnit}
1213   @param lu: the logical unit on whose behalf we execute
1214   @type instance: L{objects.Instance}
1215   @param instance: the instance for whose disks we assemble
1216   @type disks: list of L{objects.Disk} or None
1217   @param disks: which disks to assemble (or all, if None)
1218   @type ignore_secondaries: boolean
1219   @param ignore_secondaries: if true, errors on secondary nodes
1220       won't result in an error return from the function
1221   @type ignore_size: boolean
1222   @param ignore_size: if true, the current known size of the disk
1223       will not be used during the disk activation, useful for cases
1224       when the size is wrong
1225   @return: False if the operation failed, otherwise a list of
1226       (host, instance_visible_name, node_visible_name)
1227       with the mapping from node devices to instance devices
1228
1229   """
1230   device_info = []
1231   disks_ok = True
1232   iname = instance.name
1233   disks = ExpandCheckDisks(instance, disks)
1234
1235   # With the two passes mechanism we try to reduce the window of
1236   # opportunity for the race condition of switching DRBD to primary
1237   # before handshaking occured, but we do not eliminate it
1238
1239   # The proper fix would be to wait (with some limits) until the
1240   # connection has been made and drbd transitions from WFConnection
1241   # into any other network-connected state (Connected, SyncTarget,
1242   # SyncSource, etc.)
1243
1244   # 1st pass, assemble on all nodes in secondary mode
1245   for idx, inst_disk in enumerate(disks):
1246     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1247       if ignore_size:
1248         node_disk = node_disk.Copy()
1249         node_disk.UnsetSize()
1250       lu.cfg.SetDiskID(node_disk, node)
1251       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1252                                              False, idx)
1253       msg = result.fail_msg
1254       if msg:
1255         is_offline_secondary = (node in instance.secondary_nodes and
1256                                 result.offline)
1257         lu.LogWarning("Could not prepare block device %s on node %s"
1258                       " (is_primary=False, pass=1): %s",
1259                       inst_disk.iv_name, node, msg)
1260         if not (ignore_secondaries or is_offline_secondary):
1261           disks_ok = False
1262
1263   # FIXME: race condition on drbd migration to primary
1264
1265   # 2nd pass, do only the primary node
1266   for idx, inst_disk in enumerate(disks):
1267     dev_path = None
1268
1269     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1270       if node != instance.primary_node:
1271         continue
1272       if ignore_size:
1273         node_disk = node_disk.Copy()
1274         node_disk.UnsetSize()
1275       lu.cfg.SetDiskID(node_disk, node)
1276       result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1277                                              True, idx)
1278       msg = result.fail_msg
1279       if msg:
1280         lu.LogWarning("Could not prepare block device %s on node %s"
1281                       " (is_primary=True, pass=2): %s",
1282                       inst_disk.iv_name, node, msg)
1283         disks_ok = False
1284       else:
1285         dev_path = result.payload
1286
1287     device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1288
1289   # leave the disks configured for the primary node
1290   # this is a workaround that would be fixed better by
1291   # improving the logical/physical id handling
1292   for disk in disks:
1293     lu.cfg.SetDiskID(disk, instance.primary_node)
1294
1295   return disks_ok, device_info
1296
1297
1298 def StartInstanceDisks(lu, instance, force):
1299   """Start the disks of an instance.
1300
1301   """
1302   disks_ok, _ = AssembleInstanceDisks(lu, instance,
1303                                       ignore_secondaries=force)
1304   if not disks_ok:
1305     ShutdownInstanceDisks(lu, instance)
1306     if force is not None and not force:
1307       lu.LogWarning("",
1308                     hint=("If the message above refers to a secondary node,"
1309                           " you can retry the operation using '--force'"))
1310     raise errors.OpExecError("Disk consistency error")
1311
1312
1313 class LUInstanceGrowDisk(LogicalUnit):
1314   """Grow a disk of an instance.
1315
1316   """
1317   HPATH = "disk-grow"
1318   HTYPE = constants.HTYPE_INSTANCE
1319   REQ_BGL = False
1320
1321   def ExpandNames(self):
1322     self._ExpandAndLockInstance()
1323     self.needed_locks[locking.LEVEL_NODE] = []
1324     self.needed_locks[locking.LEVEL_NODE_RES] = []
1325     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1326     self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1327
1328   def DeclareLocks(self, level):
1329     if level == locking.LEVEL_NODE:
1330       self._LockInstancesNodes()
1331     elif level == locking.LEVEL_NODE_RES:
1332       # Copy node locks
1333       self.needed_locks[locking.LEVEL_NODE_RES] = \
1334         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1335
1336   def BuildHooksEnv(self):
1337     """Build hooks env.
1338
1339     This runs on the master, the primary and all the secondaries.
1340
1341     """
1342     env = {
1343       "DISK": self.op.disk,
1344       "AMOUNT": self.op.amount,
1345       "ABSOLUTE": self.op.absolute,
1346       }
1347     env.update(BuildInstanceHookEnvByObject(self, self.instance))
1348     return env
1349
1350   def BuildHooksNodes(self):
1351     """Build hooks nodes.
1352
1353     """
1354     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1355     return (nl, nl)
1356
1357   def CheckPrereq(self):
1358     """Check prerequisites.
1359
1360     This checks that the instance is in the cluster.
1361
1362     """
1363     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1364     assert instance is not None, \
1365       "Cannot retrieve locked instance %s" % self.op.instance_name
1366     nodenames = list(instance.all_nodes)
1367     for node in nodenames:
1368       CheckNodeOnline(self, node)
1369
1370     self.instance = instance
1371
1372     if instance.disk_template not in constants.DTS_GROWABLE:
1373       raise errors.OpPrereqError("Instance's disk layout does not support"
1374                                  " growing", errors.ECODE_INVAL)
1375
1376     self.disk = instance.FindDisk(self.op.disk)
1377
1378     if self.op.absolute:
1379       self.target = self.op.amount
1380       self.delta = self.target - self.disk.size
1381       if self.delta < 0:
1382         raise errors.OpPrereqError("Requested size (%s) is smaller than "
1383                                    "current disk size (%s)" %
1384                                    (utils.FormatUnit(self.target, "h"),
1385                                     utils.FormatUnit(self.disk.size, "h")),
1386                                    errors.ECODE_STATE)
1387     else:
1388       self.delta = self.op.amount
1389       self.target = self.disk.size + self.delta
1390       if self.delta < 0:
1391         raise errors.OpPrereqError("Requested increment (%s) is negative" %
1392                                    utils.FormatUnit(self.delta, "h"),
1393                                    errors.ECODE_INVAL)
1394
1395     self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1396
1397   def _CheckDiskSpace(self, nodenames, req_vgspace):
1398     template = self.instance.disk_template
1399     if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1400       # TODO: check the free disk space for file, when that feature will be
1401       # supported
1402       nodes = map(self.cfg.GetNodeInfo, nodenames)
1403       es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1404                         nodes)
1405       if es_nodes:
1406         # With exclusive storage we need to something smarter than just looking
1407         # at free space; for now, let's simply abort the operation.
1408         raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1409                                    " is enabled", errors.ECODE_STATE)
1410       CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1411
1412   def Exec(self, feedback_fn):
1413     """Execute disk grow.
1414
1415     """
1416     instance = self.instance
1417     disk = self.disk
1418
1419     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1420     assert (self.owned_locks(locking.LEVEL_NODE) ==
1421             self.owned_locks(locking.LEVEL_NODE_RES))
1422
1423     wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1424
1425     disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1426     if not disks_ok:
1427       raise errors.OpExecError("Cannot activate block device to grow")
1428
1429     feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1430                 (self.op.disk, instance.name,
1431                  utils.FormatUnit(self.delta, "h"),
1432                  utils.FormatUnit(self.target, "h")))
1433
1434     # First run all grow ops in dry-run mode
1435     for node in instance.all_nodes:
1436       self.cfg.SetDiskID(disk, node)
1437       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1438                                            True, True)
1439       result.Raise("Dry-run grow request failed to node %s" % node)
1440
1441     if wipe_disks:
1442       # Get disk size from primary node for wiping
1443       result = self.rpc.call_blockdev_getdimensions(instance.primary_node,
1444                                                     [disk])
1445       result.Raise("Failed to retrieve disk size from node '%s'" %
1446                    instance.primary_node)
1447
1448       (disk_dimensions, ) = result.payload
1449
1450       if disk_dimensions is None:
1451         raise errors.OpExecError("Failed to retrieve disk size from primary"
1452                                  " node '%s'" % instance.primary_node)
1453       (disk_size_in_bytes, _) = disk_dimensions
1454
1455       old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1456
1457       assert old_disk_size >= disk.size, \
1458         ("Retrieved disk size too small (got %s, should be at least %s)" %
1459          (old_disk_size, disk.size))
1460     else:
1461       old_disk_size = None
1462
1463     # We know that (as far as we can test) operations across different
1464     # nodes will succeed, time to run it for real on the backing storage
1465     for node in instance.all_nodes:
1466       self.cfg.SetDiskID(disk, node)
1467       result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1468                                            False, True)
1469       result.Raise("Grow request failed to node %s" % node)
1470
1471     # And now execute it for logical storage, on the primary node
1472     node = instance.primary_node
1473     self.cfg.SetDiskID(disk, node)
1474     result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1475                                          False, False)
1476     result.Raise("Grow request failed to node %s" % node)
1477
1478     disk.RecordGrow(self.delta)
1479     self.cfg.Update(instance, feedback_fn)
1480
1481     # Changes have been recorded, release node lock
1482     ReleaseLocks(self, locking.LEVEL_NODE)
1483
1484     # Downgrade lock while waiting for sync
1485     self.glm.downgrade(locking.LEVEL_INSTANCE)
1486
1487     assert wipe_disks ^ (old_disk_size is None)
1488
1489     if wipe_disks:
1490       assert instance.disks[self.op.disk] == disk
1491
1492       # Wipe newly added disk space
1493       WipeDisks(self, instance,
1494                 disks=[(self.op.disk, disk, old_disk_size)])
1495
1496     if self.op.wait_for_sync:
1497       disk_abort = not WaitForSync(self, instance, disks=[disk])
1498       if disk_abort:
1499         self.LogWarning("Disk syncing has not returned a good status; check"
1500                         " the instance")
1501       if instance.admin_state != constants.ADMINST_UP:
1502         _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1503     elif instance.admin_state != constants.ADMINST_UP:
1504       self.LogWarning("Not shutting down the disk even if the instance is"
1505                       " not supposed to be running because no wait for"
1506                       " sync mode was requested")
1507
1508     assert self.owned_locks(locking.LEVEL_NODE_RES)
1509     assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1510
1511
1512 class LUInstanceReplaceDisks(LogicalUnit):
1513   """Replace the disks of an instance.
1514
1515   """
1516   HPATH = "mirrors-replace"
1517   HTYPE = constants.HTYPE_INSTANCE
1518   REQ_BGL = False
1519
1520   def CheckArguments(self):
1521     """Check arguments.
1522
1523     """
1524     remote_node = self.op.remote_node
1525     ialloc = self.op.iallocator
1526     if self.op.mode == constants.REPLACE_DISK_CHG:
1527       if remote_node is None and ialloc is None:
1528         raise errors.OpPrereqError("When changing the secondary either an"
1529                                    " iallocator script must be used or the"
1530                                    " new node given", errors.ECODE_INVAL)
1531       else:
1532         CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1533
1534     elif remote_node is not None or ialloc is not None:
1535       # Not replacing the secondary
1536       raise errors.OpPrereqError("The iallocator and new node options can"
1537                                  " only be used when changing the"
1538                                  " secondary node", errors.ECODE_INVAL)
1539
1540   def ExpandNames(self):
1541     self._ExpandAndLockInstance()
1542
1543     assert locking.LEVEL_NODE not in self.needed_locks
1544     assert locking.LEVEL_NODE_RES not in self.needed_locks
1545     assert locking.LEVEL_NODEGROUP not in self.needed_locks
1546
1547     assert self.op.iallocator is None or self.op.remote_node is None, \
1548       "Conflicting options"
1549
1550     if self.op.remote_node is not None:
1551       self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1552
1553       # Warning: do not remove the locking of the new secondary here
1554       # unless DRBD8Dev.AddChildren is changed to work in parallel;
1555       # currently it doesn't since parallel invocations of
1556       # FindUnusedMinor will conflict
1557       self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1558       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1559     else:
1560       self.needed_locks[locking.LEVEL_NODE] = []
1561       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1562
1563       if self.op.iallocator is not None:
1564         # iallocator will select a new node in the same group
1565         self.needed_locks[locking.LEVEL_NODEGROUP] = []
1566         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1567
1568     self.needed_locks[locking.LEVEL_NODE_RES] = []
1569
1570     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1571                                    self.op.iallocator, self.op.remote_node,
1572                                    self.op.disks, self.op.early_release,
1573                                    self.op.ignore_ipolicy)
1574
1575     self.tasklets = [self.replacer]
1576
1577   def DeclareLocks(self, level):
1578     if level == locking.LEVEL_NODEGROUP:
1579       assert self.op.remote_node is None
1580       assert self.op.iallocator is not None
1581       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1582
1583       self.share_locks[locking.LEVEL_NODEGROUP] = 1
1584       # Lock all groups used by instance optimistically; this requires going
1585       # via the node before it's locked, requiring verification later on
1586       self.needed_locks[locking.LEVEL_NODEGROUP] = \
1587         self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1588
1589     elif level == locking.LEVEL_NODE:
1590       if self.op.iallocator is not None:
1591         assert self.op.remote_node is None
1592         assert not self.needed_locks[locking.LEVEL_NODE]
1593         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1594
1595         # Lock member nodes of all locked groups
1596         self.needed_locks[locking.LEVEL_NODE] = \
1597           [node_name
1598            for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1599            for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1600       else:
1601         assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1602
1603         self._LockInstancesNodes()
1604
1605     elif level == locking.LEVEL_NODE_RES:
1606       # Reuse node locks
1607       self.needed_locks[locking.LEVEL_NODE_RES] = \
1608         self.needed_locks[locking.LEVEL_NODE]
1609
1610   def BuildHooksEnv(self):
1611     """Build hooks env.
1612
1613     This runs on the master, the primary and all the secondaries.
1614
1615     """
1616     instance = self.replacer.instance
1617     env = {
1618       "MODE": self.op.mode,
1619       "NEW_SECONDARY": self.op.remote_node,
1620       "OLD_SECONDARY": instance.secondary_nodes[0],
1621       }
1622     env.update(BuildInstanceHookEnvByObject(self, instance))
1623     return env
1624
1625   def BuildHooksNodes(self):
1626     """Build hooks nodes.
1627
1628     """
1629     instance = self.replacer.instance
1630     nl = [
1631       self.cfg.GetMasterNode(),
1632       instance.primary_node,
1633       ]
1634     if self.op.remote_node is not None:
1635       nl.append(self.op.remote_node)
1636     return nl, nl
1637
1638   def CheckPrereq(self):
1639     """Check prerequisites.
1640
1641     """
1642     assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1643             self.op.iallocator is None)
1644
1645     # Verify if node group locks are still correct
1646     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1647     if owned_groups:
1648       CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1649
1650     return LogicalUnit.CheckPrereq(self)
1651
1652
1653 class LUInstanceActivateDisks(NoHooksLU):
1654   """Bring up an instance's disks.
1655
1656   """
1657   REQ_BGL = False
1658
1659   def ExpandNames(self):
1660     self._ExpandAndLockInstance()
1661     self.needed_locks[locking.LEVEL_NODE] = []
1662     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1663
1664   def DeclareLocks(self, level):
1665     if level == locking.LEVEL_NODE:
1666       self._LockInstancesNodes()
1667
1668   def CheckPrereq(self):
1669     """Check prerequisites.
1670
1671     This checks that the instance is in the cluster.
1672
1673     """
1674     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1675     assert self.instance is not None, \
1676       "Cannot retrieve locked instance %s" % self.op.instance_name
1677     CheckNodeOnline(self, self.instance.primary_node)
1678
1679   def Exec(self, feedback_fn):
1680     """Activate the disks.
1681
1682     """
1683     disks_ok, disks_info = \
1684               AssembleInstanceDisks(self, self.instance,
1685                                     ignore_size=self.op.ignore_size)
1686     if not disks_ok:
1687       raise errors.OpExecError("Cannot activate block devices")
1688
1689     if self.op.wait_for_sync:
1690       if not WaitForSync(self, self.instance):
1691         raise errors.OpExecError("Some disks of the instance are degraded!")
1692
1693     return disks_info
1694
1695
1696 class LUInstanceDeactivateDisks(NoHooksLU):
1697   """Shutdown an instance's disks.
1698
1699   """
1700   REQ_BGL = False
1701
1702   def ExpandNames(self):
1703     self._ExpandAndLockInstance()
1704     self.needed_locks[locking.LEVEL_NODE] = []
1705     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1706
1707   def DeclareLocks(self, level):
1708     if level == locking.LEVEL_NODE:
1709       self._LockInstancesNodes()
1710
1711   def CheckPrereq(self):
1712     """Check prerequisites.
1713
1714     This checks that the instance is in the cluster.
1715
1716     """
1717     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1718     assert self.instance is not None, \
1719       "Cannot retrieve locked instance %s" % self.op.instance_name
1720
1721   def Exec(self, feedback_fn):
1722     """Deactivate the disks
1723
1724     """
1725     instance = self.instance
1726     if self.op.force:
1727       ShutdownInstanceDisks(self, instance)
1728     else:
1729       _SafeShutdownInstanceDisks(self, instance)
1730
1731
1732 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1733                                ldisk=False):
1734   """Check that mirrors are not degraded.
1735
1736   @attention: The device has to be annotated already.
1737
1738   The ldisk parameter, if True, will change the test from the
1739   is_degraded attribute (which represents overall non-ok status for
1740   the device(s)) to the ldisk (representing the local storage status).
1741
1742   """
1743   lu.cfg.SetDiskID(dev, node)
1744
1745   result = True
1746
1747   if on_primary or dev.AssembleOnSecondary():
1748     rstats = lu.rpc.call_blockdev_find(node, dev)
1749     msg = rstats.fail_msg
1750     if msg:
1751       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1752       result = False
1753     elif not rstats.payload:
1754       lu.LogWarning("Can't find disk on node %s", node)
1755       result = False
1756     else:
1757       if ldisk:
1758         result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1759       else:
1760         result = result and not rstats.payload.is_degraded
1761
1762   if dev.children:
1763     for child in dev.children:
1764       result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1765                                                      on_primary)
1766
1767   return result
1768
1769
1770 def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1771   """Wrapper around L{_CheckDiskConsistencyInner}.
1772
1773   """
1774   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1775   return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1776                                     ldisk=ldisk)
1777
1778
1779 def _BlockdevFind(lu, node, dev, instance):
1780   """Wrapper around call_blockdev_find to annotate diskparams.
1781
1782   @param lu: A reference to the lu object
1783   @param node: The node to call out
1784   @param dev: The device to find
1785   @param instance: The instance object the device belongs to
1786   @returns The result of the rpc call
1787
1788   """
1789   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1790   return lu.rpc.call_blockdev_find(node, disk)
1791
1792
1793 def _GenerateUniqueNames(lu, exts):
1794   """Generate a suitable LV name.
1795
1796   This will generate a logical volume name for the given instance.
1797
1798   """
1799   results = []
1800   for val in exts:
1801     new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1802     results.append("%s%s" % (new_id, val))
1803   return results
1804
1805
1806 class TLReplaceDisks(Tasklet):
1807   """Replaces disks for an instance.
1808
1809   Note: Locking is not within the scope of this class.
1810
1811   """
1812   def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1813                disks, early_release, ignore_ipolicy):
1814     """Initializes this class.
1815
1816     """
1817     Tasklet.__init__(self, lu)
1818
1819     # Parameters
1820     self.instance_name = instance_name
1821     self.mode = mode
1822     self.iallocator_name = iallocator_name
1823     self.remote_node = remote_node
1824     self.disks = disks
1825     self.early_release = early_release
1826     self.ignore_ipolicy = ignore_ipolicy
1827
1828     # Runtime data
1829     self.instance = None
1830     self.new_node = None
1831     self.target_node = None
1832     self.other_node = None
1833     self.remote_node_info = None
1834     self.node_secondary_ip = None
1835
1836   @staticmethod
1837   def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1838     """Compute a new secondary node using an IAllocator.
1839
1840     """
1841     req = iallocator.IAReqRelocate(name=instance_name,
1842                                    relocate_from=list(relocate_from))
1843     ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1844
1845     ial.Run(iallocator_name)
1846
1847     if not ial.success:
1848       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1849                                  " %s" % (iallocator_name, ial.info),
1850                                  errors.ECODE_NORES)
1851
1852     remote_node_name = ial.result[0]
1853
1854     lu.LogInfo("Selected new secondary for instance '%s': %s",
1855                instance_name, remote_node_name)
1856
1857     return remote_node_name
1858
1859   def _FindFaultyDisks(self, node_name):
1860     """Wrapper for L{FindFaultyInstanceDisks}.
1861
1862     """
1863     return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1864                                    node_name, True)
1865
1866   def _CheckDisksActivated(self, instance):
1867     """Checks if the instance disks are activated.
1868
1869     @param instance: The instance to check disks
1870     @return: True if they are activated, False otherwise
1871
1872     """
1873     nodes = instance.all_nodes
1874
1875     for idx, dev in enumerate(instance.disks):
1876       for node in nodes:
1877         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1878         self.cfg.SetDiskID(dev, node)
1879
1880         result = _BlockdevFind(self, node, dev, instance)
1881
1882         if result.offline:
1883           continue
1884         elif result.fail_msg or not result.payload:
1885           return False
1886
1887     return True
1888
1889   def CheckPrereq(self):
1890     """Check prerequisites.
1891
1892     This checks that the instance is in the cluster.
1893
1894     """
1895     self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1896     assert instance is not None, \
1897       "Cannot retrieve locked instance %s" % self.instance_name
1898
1899     if instance.disk_template != constants.DT_DRBD8:
1900       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1901                                  " instances", errors.ECODE_INVAL)
1902
1903     if len(instance.secondary_nodes) != 1:
1904       raise errors.OpPrereqError("The instance has a strange layout,"
1905                                  " expected one secondary but found %d" %
1906                                  len(instance.secondary_nodes),
1907                                  errors.ECODE_FAULT)
1908
1909     instance = self.instance
1910     secondary_node = instance.secondary_nodes[0]
1911
1912     if self.iallocator_name is None:
1913       remote_node = self.remote_node
1914     else:
1915       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1916                                        instance.name, instance.secondary_nodes)
1917
1918     if remote_node is None:
1919       self.remote_node_info = None
1920     else:
1921       assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1922              "Remote node '%s' is not locked" % remote_node
1923
1924       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1925       assert self.remote_node_info is not None, \
1926         "Cannot retrieve locked node %s" % remote_node
1927
1928     if remote_node == self.instance.primary_node:
1929       raise errors.OpPrereqError("The specified node is the primary node of"
1930                                  " the instance", errors.ECODE_INVAL)
1931
1932     if remote_node == secondary_node:
1933       raise errors.OpPrereqError("The specified node is already the"
1934                                  " secondary node of the instance",
1935                                  errors.ECODE_INVAL)
1936
1937     if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1938                                     constants.REPLACE_DISK_CHG):
1939       raise errors.OpPrereqError("Cannot specify disks to be replaced",
1940                                  errors.ECODE_INVAL)
1941
1942     if self.mode == constants.REPLACE_DISK_AUTO:
1943       if not self._CheckDisksActivated(instance):
1944         raise errors.OpPrereqError("Please run activate-disks on instance %s"
1945                                    " first" % self.instance_name,
1946                                    errors.ECODE_STATE)
1947       faulty_primary = self._FindFaultyDisks(instance.primary_node)
1948       faulty_secondary = self._FindFaultyDisks(secondary_node)
1949
1950       if faulty_primary and faulty_secondary:
1951         raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1952                                    " one node and can not be repaired"
1953                                    " automatically" % self.instance_name,
1954                                    errors.ECODE_STATE)
1955
1956       if faulty_primary:
1957         self.disks = faulty_primary
1958         self.target_node = instance.primary_node
1959         self.other_node = secondary_node
1960         check_nodes = [self.target_node, self.other_node]
1961       elif faulty_secondary:
1962         self.disks = faulty_secondary
1963         self.target_node = secondary_node
1964         self.other_node = instance.primary_node
1965         check_nodes = [self.target_node, self.other_node]
1966       else:
1967         self.disks = []
1968         check_nodes = []
1969
1970     else:
1971       # Non-automatic modes
1972       if self.mode == constants.REPLACE_DISK_PRI:
1973         self.target_node = instance.primary_node
1974         self.other_node = secondary_node
1975         check_nodes = [self.target_node, self.other_node]
1976
1977       elif self.mode == constants.REPLACE_DISK_SEC:
1978         self.target_node = secondary_node
1979         self.other_node = instance.primary_node
1980         check_nodes = [self.target_node, self.other_node]
1981
1982       elif self.mode == constants.REPLACE_DISK_CHG:
1983         self.new_node = remote_node
1984         self.other_node = instance.primary_node
1985         self.target_node = secondary_node
1986         check_nodes = [self.new_node, self.other_node]
1987
1988         CheckNodeNotDrained(self.lu, remote_node)
1989         CheckNodeVmCapable(self.lu, remote_node)
1990
1991         old_node_info = self.cfg.GetNodeInfo(secondary_node)
1992         assert old_node_info is not None
1993         if old_node_info.offline and not self.early_release:
1994           # doesn't make sense to delay the release
1995           self.early_release = True
1996           self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1997                           " early-release mode", secondary_node)
1998
1999       else:
2000         raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2001                                      self.mode)
2002
2003       # If not specified all disks should be replaced
2004       if not self.disks:
2005         self.disks = range(len(self.instance.disks))
2006
2007     # TODO: This is ugly, but right now we can't distinguish between internal
2008     # submitted opcode and external one. We should fix that.
2009     if self.remote_node_info:
2010       # We change the node, lets verify it still meets instance policy
2011       new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2012       cluster = self.cfg.GetClusterInfo()
2013       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2014                                                               new_group_info)
2015       CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
2016                              self.cfg, ignore=self.ignore_ipolicy)
2017
2018     for node in check_nodes:
2019       CheckNodeOnline(self.lu, node)
2020
2021     touched_nodes = frozenset(node_name for node_name in [self.new_node,
2022                                                           self.other_node,
2023                                                           self.target_node]
2024                               if node_name is not None)
2025
2026     # Release unneeded node and node resource locks
2027     ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2028     ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2029     ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2030
2031     # Release any owned node group
2032     ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2033
2034     # Check whether disks are valid
2035     for disk_idx in self.disks:
2036       instance.FindDisk(disk_idx)
2037
2038     # Get secondary node IP addresses
2039     self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
2040                                   in self.cfg.GetMultiNodeInfo(touched_nodes))
2041
2042   def Exec(self, feedback_fn):
2043     """Execute disk replacement.
2044
2045     This dispatches the disk replacement to the appropriate handler.
2046
2047     """
2048     if __debug__:
2049       # Verify owned locks before starting operation
2050       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2051       assert set(owned_nodes) == set(self.node_secondary_ip), \
2052           ("Incorrect node locks, owning %s, expected %s" %
2053            (owned_nodes, self.node_secondary_ip.keys()))
2054       assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2055               self.lu.owned_locks(locking.LEVEL_NODE_RES))
2056       assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2057
2058       owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2059       assert list(owned_instances) == [self.instance_name], \
2060           "Instance '%s' not locked" % self.instance_name
2061
2062       assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2063           "Should not own any node group lock at this point"
2064
2065     if not self.disks:
2066       feedback_fn("No disks need replacement for instance '%s'" %
2067                   self.instance.name)
2068       return
2069
2070     feedback_fn("Replacing disk(s) %s for instance '%s'" %
2071                 (utils.CommaJoin(self.disks), self.instance.name))
2072     feedback_fn("Current primary node: %s" % self.instance.primary_node)
2073     feedback_fn("Current seconary node: %s" %
2074                 utils.CommaJoin(self.instance.secondary_nodes))
2075
2076     activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
2077
2078     # Activate the instance disks if we're replacing them on a down instance
2079     if activate_disks:
2080       StartInstanceDisks(self.lu, self.instance, True)
2081
2082     try:
2083       # Should we replace the secondary node?
2084       if self.new_node is not None:
2085         fn = self._ExecDrbd8Secondary
2086       else:
2087         fn = self._ExecDrbd8DiskOnly
2088
2089       result = fn(feedback_fn)
2090     finally:
2091       # Deactivate the instance disks if we're replacing them on a
2092       # down instance
2093       if activate_disks:
2094         _SafeShutdownInstanceDisks(self.lu, self.instance)
2095
2096     assert not self.lu.owned_locks(locking.LEVEL_NODE)
2097
2098     if __debug__:
2099       # Verify owned locks
2100       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2101       nodes = frozenset(self.node_secondary_ip)
2102       assert ((self.early_release and not owned_nodes) or
2103               (not self.early_release and not (set(owned_nodes) - nodes))), \
2104         ("Not owning the correct locks, early_release=%s, owned=%r,"
2105          " nodes=%r" % (self.early_release, owned_nodes, nodes))
2106
2107     return result
2108
2109   def _CheckVolumeGroup(self, nodes):
2110     self.lu.LogInfo("Checking volume groups")
2111
2112     vgname = self.cfg.GetVGName()
2113
2114     # Make sure volume group exists on all involved nodes
2115     results = self.rpc.call_vg_list(nodes)
2116     if not results:
2117       raise errors.OpExecError("Can't list volume groups on the nodes")
2118
2119     for node in nodes:
2120       res = results[node]
2121       res.Raise("Error checking node %s" % node)
2122       if vgname not in res.payload:
2123         raise errors.OpExecError("Volume group '%s' not found on node %s" %
2124                                  (vgname, node))
2125
2126   def _CheckDisksExistence(self, nodes):
2127     # Check disk existence
2128     for idx, dev in enumerate(self.instance.disks):
2129       if idx not in self.disks:
2130         continue
2131
2132       for node in nodes:
2133         self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2134         self.cfg.SetDiskID(dev, node)
2135
2136         result = _BlockdevFind(self, node, dev, self.instance)
2137
2138         msg = result.fail_msg
2139         if msg or not result.payload:
2140           if not msg:
2141             msg = "disk not found"
2142           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2143                                    (idx, node, msg))
2144
2145   def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2146     for idx, dev in enumerate(self.instance.disks):
2147       if idx not in self.disks:
2148         continue
2149
2150       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2151                       (idx, node_name))
2152
2153       if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2154                                   on_primary, ldisk=ldisk):
2155         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2156                                  " replace disks for instance %s" %
2157                                  (node_name, self.instance.name))
2158
2159   def _CreateNewStorage(self, node_name):
2160     """Create new storage on the primary or secondary node.
2161
2162     This is only used for same-node replaces, not for changing the
2163     secondary node, hence we don't want to modify the existing disk.
2164
2165     """
2166     iv_names = {}
2167
2168     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2169     for idx, dev in enumerate(disks):
2170       if idx not in self.disks:
2171         continue
2172
2173       self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2174
2175       self.cfg.SetDiskID(dev, node_name)
2176
2177       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2178       names = _GenerateUniqueNames(self.lu, lv_names)
2179
2180       (data_disk, meta_disk) = dev.children
2181       vg_data = data_disk.logical_id[0]
2182       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2183                              logical_id=(vg_data, names[0]),
2184                              params=data_disk.params)
2185       vg_meta = meta_disk.logical_id[0]
2186       lv_meta = objects.Disk(dev_type=constants.LD_LV,
2187                              size=constants.DRBD_META_SIZE,
2188                              logical_id=(vg_meta, names[1]),
2189                              params=meta_disk.params)
2190
2191       new_lvs = [lv_data, lv_meta]
2192       old_lvs = [child.Copy() for child in dev.children]
2193       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2194       excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2195
2196       # we pass force_create=True to force the LVM creation
2197       for new_lv in new_lvs:
2198         _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2199                              GetInstanceInfoText(self.instance), False,
2200                              excl_stor)
2201
2202     return iv_names
2203
2204   def _CheckDevices(self, node_name, iv_names):
2205     for name, (dev, _, _) in iv_names.iteritems():
2206       self.cfg.SetDiskID(dev, node_name)
2207
2208       result = _BlockdevFind(self, node_name, dev, self.instance)
2209
2210       msg = result.fail_msg
2211       if msg or not result.payload:
2212         if not msg:
2213           msg = "disk not found"
2214         raise errors.OpExecError("Can't find DRBD device %s: %s" %
2215                                  (name, msg))
2216
2217       if result.payload.is_degraded:
2218         raise errors.OpExecError("DRBD device %s is degraded!" % name)
2219
2220   def _RemoveOldStorage(self, node_name, iv_names):
2221     for name, (_, old_lvs, _) in iv_names.iteritems():
2222       self.lu.LogInfo("Remove logical volumes for %s", name)
2223
2224       for lv in old_lvs:
2225         self.cfg.SetDiskID(lv, node_name)
2226
2227         msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2228         if msg:
2229           self.lu.LogWarning("Can't remove old LV: %s", msg,
2230                              hint="remove unused LVs manually")
2231
2232   def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2233     """Replace a disk on the primary or secondary for DRBD 8.
2234
2235     The algorithm for replace is quite complicated:
2236
2237       1. for each disk to be replaced:
2238
2239         1. create new LVs on the target node with unique names
2240         1. detach old LVs from the drbd device
2241         1. rename old LVs to name_replaced.<time_t>
2242         1. rename new LVs to old LVs
2243         1. attach the new LVs (with the old names now) to the drbd device
2244
2245       1. wait for sync across all devices
2246
2247       1. for each modified disk:
2248
2249         1. remove old LVs (which have the name name_replaces.<time_t>)
2250
2251     Failures are not very well handled.
2252
2253     """
2254     steps_total = 6
2255
2256     # Step: check device activation
2257     self.lu.LogStep(1, steps_total, "Check device existence")
2258     self._CheckDisksExistence([self.other_node, self.target_node])
2259     self._CheckVolumeGroup([self.target_node, self.other_node])
2260
2261     # Step: check other node consistency
2262     self.lu.LogStep(2, steps_total, "Check peer consistency")
2263     self._CheckDisksConsistency(self.other_node,
2264                                 self.other_node == self.instance.primary_node,
2265                                 False)
2266
2267     # Step: create new storage
2268     self.lu.LogStep(3, steps_total, "Allocate new storage")
2269     iv_names = self._CreateNewStorage(self.target_node)
2270
2271     # Step: for each lv, detach+rename*2+attach
2272     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2273     for dev, old_lvs, new_lvs in iv_names.itervalues():
2274       self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2275
2276       result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2277                                                      old_lvs)
2278       result.Raise("Can't detach drbd from local storage on node"
2279                    " %s for device %s" % (self.target_node, dev.iv_name))
2280       #dev.children = []
2281       #cfg.Update(instance)
2282
2283       # ok, we created the new LVs, so now we know we have the needed
2284       # storage; as such, we proceed on the target node to rename
2285       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2286       # using the assumption that logical_id == physical_id (which in
2287       # turn is the unique_id on that node)
2288
2289       # FIXME(iustin): use a better name for the replaced LVs
2290       temp_suffix = int(time.time())
2291       ren_fn = lambda d, suff: (d.physical_id[0],
2292                                 d.physical_id[1] + "_replaced-%s" % suff)
2293
2294       # Build the rename list based on what LVs exist on the node
2295       rename_old_to_new = []
2296       for to_ren in old_lvs:
2297         result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2298         if not result.fail_msg and result.payload:
2299           # device exists
2300           rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2301
2302       self.lu.LogInfo("Renaming the old LVs on the target node")
2303       result = self.rpc.call_blockdev_rename(self.target_node,
2304                                              rename_old_to_new)
2305       result.Raise("Can't rename old LVs on node %s" % self.target_node)
2306
2307       # Now we rename the new LVs to the old LVs
2308       self.lu.LogInfo("Renaming the new LVs on the target node")
2309       rename_new_to_old = [(new, old.physical_id)
2310                            for old, new in zip(old_lvs, new_lvs)]
2311       result = self.rpc.call_blockdev_rename(self.target_node,
2312                                              rename_new_to_old)
2313       result.Raise("Can't rename new LVs on node %s" % self.target_node)
2314
2315       # Intermediate steps of in memory modifications
2316       for old, new in zip(old_lvs, new_lvs):
2317         new.logical_id = old.logical_id
2318         self.cfg.SetDiskID(new, self.target_node)
2319
2320       # We need to modify old_lvs so that removal later removes the
2321       # right LVs, not the newly added ones; note that old_lvs is a
2322       # copy here
2323       for disk in old_lvs:
2324         disk.logical_id = ren_fn(disk, temp_suffix)
2325         self.cfg.SetDiskID(disk, self.target_node)
2326
2327       # Now that the new lvs have the old name, we can add them to the device
2328       self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2329       result = self.rpc.call_blockdev_addchildren(self.target_node,
2330                                                   (dev, self.instance), new_lvs)
2331       msg = result.fail_msg
2332       if msg:
2333         for new_lv in new_lvs:
2334           msg2 = self.rpc.call_blockdev_remove(self.target_node,
2335                                                new_lv).fail_msg
2336           if msg2:
2337             self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2338                                hint=("cleanup manually the unused logical"
2339                                      "volumes"))
2340         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2341
2342     cstep = itertools.count(5)
2343
2344     if self.early_release:
2345       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2346       self._RemoveOldStorage(self.target_node, iv_names)
2347       # TODO: Check if releasing locks early still makes sense
2348       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2349     else:
2350       # Release all resource locks except those used by the instance
2351       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2352                    keep=self.node_secondary_ip.keys())
2353
2354     # Release all node locks while waiting for sync
2355     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2356
2357     # TODO: Can the instance lock be downgraded here? Take the optional disk
2358     # shutdown in the caller into consideration.
2359
2360     # Wait for sync
2361     # This can fail as the old devices are degraded and _WaitForSync
2362     # does a combined result over all disks, so we don't check its return value
2363     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2364     WaitForSync(self.lu, self.instance)
2365
2366     # Check all devices manually
2367     self._CheckDevices(self.instance.primary_node, iv_names)
2368
2369     # Step: remove old storage
2370     if not self.early_release:
2371       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2372       self._RemoveOldStorage(self.target_node, iv_names)
2373
2374   def _ExecDrbd8Secondary(self, feedback_fn):
2375     """Replace the secondary node for DRBD 8.
2376
2377     The algorithm for replace is quite complicated:
2378       - for all disks of the instance:
2379         - create new LVs on the new node with same names
2380         - shutdown the drbd device on the old secondary
2381         - disconnect the drbd network on the primary
2382         - create the drbd device on the new secondary
2383         - network attach the drbd on the primary, using an artifice:
2384           the drbd code for Attach() will connect to the network if it
2385           finds a device which is connected to the good local disks but
2386           not network enabled
2387       - wait for sync across all devices
2388       - remove all disks from the old secondary
2389
2390     Failures are not very well handled.
2391
2392     """
2393     steps_total = 6
2394
2395     pnode = self.instance.primary_node
2396
2397     # Step: check device activation
2398     self.lu.LogStep(1, steps_total, "Check device existence")
2399     self._CheckDisksExistence([self.instance.primary_node])
2400     self._CheckVolumeGroup([self.instance.primary_node])
2401
2402     # Step: check other node consistency
2403     self.lu.LogStep(2, steps_total, "Check peer consistency")
2404     self._CheckDisksConsistency(self.instance.primary_node, True, True)
2405
2406     # Step: create new storage
2407     self.lu.LogStep(3, steps_total, "Allocate new storage")
2408     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2409     excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2410     for idx, dev in enumerate(disks):
2411       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2412                       (self.new_node, idx))
2413       # we pass force_create=True to force LVM creation
2414       for new_lv in dev.children:
2415         _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2416                              True, GetInstanceInfoText(self.instance), False,
2417                              excl_stor)
2418
2419     # Step 4: dbrd minors and drbd setups changes
2420     # after this, we must manually remove the drbd minors on both the
2421     # error and the success paths
2422     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2423     minors = self.cfg.AllocateDRBDMinor([self.new_node
2424                                          for dev in self.instance.disks],
2425                                         self.instance.name)
2426     logging.debug("Allocated minors %r", minors)
2427
2428     iv_names = {}
2429     for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2430       self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2431                       (self.new_node, idx))
2432       # create new devices on new_node; note that we create two IDs:
2433       # one without port, so the drbd will be activated without
2434       # networking information on the new node at this stage, and one
2435       # with network, for the latter activation in step 4
2436       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2437       if self.instance.primary_node == o_node1:
2438         p_minor = o_minor1
2439       else:
2440         assert self.instance.primary_node == o_node2, "Three-node instance?"
2441         p_minor = o_minor2
2442
2443       new_alone_id = (self.instance.primary_node, self.new_node, None,
2444                       p_minor, new_minor, o_secret)
2445       new_net_id = (self.instance.primary_node, self.new_node, o_port,
2446                     p_minor, new_minor, o_secret)
2447
2448       iv_names[idx] = (dev, dev.children, new_net_id)
2449       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2450                     new_net_id)
2451       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2452                               logical_id=new_alone_id,
2453                               children=dev.children,
2454                               size=dev.size,
2455                               params={})
2456       (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2457                                             self.cfg)
2458       try:
2459         CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2460                              anno_new_drbd,
2461                              GetInstanceInfoText(self.instance), False,
2462                              excl_stor)
2463       except errors.GenericError:
2464         self.cfg.ReleaseDRBDMinors(self.instance.name)
2465         raise
2466
2467     # We have new devices, shutdown the drbd on the old secondary
2468     for idx, dev in enumerate(self.instance.disks):
2469       self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2470       self.cfg.SetDiskID(dev, self.target_node)
2471       msg = self.rpc.call_blockdev_shutdown(self.target_node,
2472                                             (dev, self.instance)).fail_msg
2473       if msg:
2474         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2475                            "node: %s" % (idx, msg),
2476                            hint=("Please cleanup this device manually as"
2477                                  " soon as possible"))
2478
2479     self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2480     result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2481                                                self.instance.disks)[pnode]
2482
2483     msg = result.fail_msg
2484     if msg:
2485       # detaches didn't succeed (unlikely)
2486       self.cfg.ReleaseDRBDMinors(self.instance.name)
2487       raise errors.OpExecError("Can't detach the disks from the network on"
2488                                " old node: %s" % (msg,))
2489
2490     # if we managed to detach at least one, we update all the disks of
2491     # the instance to point to the new secondary
2492     self.lu.LogInfo("Updating instance configuration")
2493     for dev, _, new_logical_id in iv_names.itervalues():
2494       dev.logical_id = new_logical_id
2495       self.cfg.SetDiskID(dev, self.instance.primary_node)
2496
2497     self.cfg.Update(self.instance, feedback_fn)
2498
2499     # Release all node locks (the configuration has been updated)
2500     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2501
2502     # and now perform the drbd attach
2503     self.lu.LogInfo("Attaching primary drbds to new secondary"
2504                     " (standalone => connected)")
2505     result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2506                                             self.new_node],
2507                                            self.node_secondary_ip,
2508                                            (self.instance.disks, self.instance),
2509                                            self.instance.name,
2510                                            False)
2511     for to_node, to_result in result.items():
2512       msg = to_result.fail_msg
2513       if msg:
2514         self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2515                            to_node, msg,
2516                            hint=("please do a gnt-instance info to see the"
2517                                  " status of disks"))
2518
2519     cstep = itertools.count(5)
2520
2521     if self.early_release:
2522       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2523       self._RemoveOldStorage(self.target_node, iv_names)
2524       # TODO: Check if releasing locks early still makes sense
2525       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2526     else:
2527       # Release all resource locks except those used by the instance
2528       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2529                    keep=self.node_secondary_ip.keys())
2530
2531     # TODO: Can the instance lock be downgraded here? Take the optional disk
2532     # shutdown in the caller into consideration.
2533
2534     # Wait for sync
2535     # This can fail as the old devices are degraded and _WaitForSync
2536     # does a combined result over all disks, so we don't check its return value
2537     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2538     WaitForSync(self.lu, self.instance)
2539
2540     # Check all devices manually
2541     self._CheckDevices(self.instance.primary_node, iv_names)
2542
2543     # Step: remove old storage
2544     if not self.early_release:
2545       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2546       self._RemoveOldStorage(self.target_node, iv_names)