Merge branch 'stable-2.8' into stable-2.9
[ganeti-local] / lib / cmdlib / instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import rpc
38 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
39 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
40   AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
41   CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
42   IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
43   CheckDiskTemplateEnabled
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45   CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46   BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52   constants.DT_PLAIN: "",
53   constants.DT_RBD: ".rbd",
54   constants.DT_EXT: ".ext",
55   constants.DT_FILE: ".file",
56   constants.DT_SHARED_FILE: ".sharedfile",
57   }
58
59
60 def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
61                          excl_stor):
62   """Create a single block device on a given node.
63
64   This will not recurse over children of the device, so they must be
65   created in advance.
66
67   @param lu: the lu on whose behalf we execute
68   @param node_uuid: the node on which to create the device
69   @type instance: L{objects.Instance}
70   @param instance: the instance which owns the device
71   @type device: L{objects.Disk}
72   @param device: the device to create
73   @param info: the extra 'metadata' we should attach to the device
74       (this will be represented as a LVM tag)
75   @type force_open: boolean
76   @param force_open: this parameter will be passes to the
77       L{backend.BlockdevCreate} function where it specifies
78       whether we run on primary or not, and it affects both
79       the child assembly and the device own Open() execution
80   @type excl_stor: boolean
81   @param excl_stor: Whether exclusive_storage is active for the node
82
83   """
84   lu.cfg.SetDiskID(device, node_uuid)
85   result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
86                                        instance.name, force_open, info,
87                                        excl_stor)
88   result.Raise("Can't create block device %s on"
89                " node %s for instance %s" % (device,
90                                              lu.cfg.GetNodeName(node_uuid),
91                                              instance.name))
92   if device.physical_id is None:
93     device.physical_id = result.payload
94
95
96 def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
97                          info, force_open, excl_stor):
98   """Create a tree of block devices on a given node.
99
100   If this device type has to be created on secondaries, create it and
101   all its children.
102
103   If not, just recurse to children keeping the same 'force' value.
104
105   @attention: The device has to be annotated already.
106
107   @param lu: the lu on whose behalf we execute
108   @param node_uuid: the node on which to create the device
109   @type instance: L{objects.Instance}
110   @param instance: the instance which owns the device
111   @type device: L{objects.Disk}
112   @param device: the device to create
113   @type force_create: boolean
114   @param force_create: whether to force creation of this device; this
115       will be change to True whenever we find a device which has
116       CreateOnSecondary() attribute
117   @param info: the extra 'metadata' we should attach to the device
118       (this will be represented as a LVM tag)
119   @type force_open: boolean
120   @param force_open: this parameter will be passes to the
121       L{backend.BlockdevCreate} function where it specifies
122       whether we run on primary or not, and it affects both
123       the child assembly and the device own Open() execution
124   @type excl_stor: boolean
125   @param excl_stor: Whether exclusive_storage is active for the node
126
127   @return: list of created devices
128   """
129   created_devices = []
130   try:
131     if device.CreateOnSecondary():
132       force_create = True
133
134     if device.children:
135       for child in device.children:
136         devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
137                                     force_create, info, force_open, excl_stor)
138         created_devices.extend(devs)
139
140     if not force_create:
141       return created_devices
142
143     CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
144                          excl_stor)
145     # The device has been completely created, so there is no point in keeping
146     # its subdevices in the list. We just add the device itself instead.
147     created_devices = [(node_uuid, device)]
148     return created_devices
149
150   except errors.DeviceCreationError, e:
151     e.created_devices.extend(created_devices)
152     raise e
153   except errors.OpExecError, e:
154     raise errors.DeviceCreationError(str(e), created_devices)
155
156
157 def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
158   """Whether exclusive_storage is in effect for the given node.
159
160   @type cfg: L{config.ConfigWriter}
161   @param cfg: The cluster configuration
162   @type node_uuid: string
163   @param node_uuid: The node UUID
164   @rtype: bool
165   @return: The effective value of exclusive_storage
166   @raise errors.OpPrereqError: if no node exists with the given name
167
168   """
169   ni = cfg.GetNodeInfo(node_uuid)
170   if ni is None:
171     raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
172                                errors.ECODE_NOENT)
173   return IsExclusiveStorageEnabledNode(cfg, ni)
174
175
176 def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
177                     force_open):
178   """Wrapper around L{_CreateBlockDevInner}.
179
180   This method annotates the root device first.
181
182   """
183   (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
184   excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
185   return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
186                               force_open, excl_stor)
187
188
189 def _UndoCreateDisks(lu, disks_created):
190   """Undo the work performed by L{CreateDisks}.
191
192   This function is called in case of an error to undo the work of
193   L{CreateDisks}.
194
195   @type lu: L{LogicalUnit}
196   @param lu: the logical unit on whose behalf we execute
197   @param disks_created: the result returned by L{CreateDisks}
198
199   """
200   for (node_uuid, disk) in disks_created:
201     lu.cfg.SetDiskID(disk, node_uuid)
202     result = lu.rpc.call_blockdev_remove(node_uuid, disk)
203     result.Warn("Failed to remove newly-created disk %s on node %s" %
204                 (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
205
206
207 def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
208   """Create all disks for an instance.
209
210   This abstracts away some work from AddInstance.
211
212   @type lu: L{LogicalUnit}
213   @param lu: the logical unit on whose behalf we execute
214   @type instance: L{objects.Instance}
215   @param instance: the instance whose disks we should create
216   @type to_skip: list
217   @param to_skip: list of indices to skip
218   @type target_node_uuid: string
219   @param target_node_uuid: if passed, overrides the target node for creation
220   @type disks: list of {objects.Disk}
221   @param disks: the disks to create; if not specified, all the disks of the
222       instance are created
223   @return: information about the created disks, to be used to call
224       L{_UndoCreateDisks}
225   @raise errors.OpPrereqError: in case of error
226
227   """
228   info = GetInstanceInfoText(instance)
229   if target_node_uuid is None:
230     pnode_uuid = instance.primary_node
231     all_node_uuids = instance.all_nodes
232   else:
233     pnode_uuid = target_node_uuid
234     all_node_uuids = [pnode_uuid]
235
236   if disks is None:
237     disks = instance.disks
238
239   CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)
240
241   if instance.disk_template in constants.DTS_FILEBASED:
242     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
243     result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
244
245     result.Raise("Failed to create directory '%s' on"
246                  " node %s" % (file_storage_dir,
247                                lu.cfg.GetNodeName(pnode_uuid)))
248
249   disks_created = []
250   for idx, device in enumerate(disks):
251     if to_skip and idx in to_skip:
252       continue
253     logging.info("Creating disk %s for instance '%s'", idx, instance.name)
254     for node_uuid in all_node_uuids:
255       f_create = node_uuid == pnode_uuid
256       try:
257         _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
258                         f_create)
259         disks_created.append((node_uuid, device))
260       except errors.DeviceCreationError, e:
261         logging.warning("Creating disk %s for instance '%s' failed",
262                         idx, instance.name)
263         disks_created.extend(e.created_devices)
264         _UndoCreateDisks(lu, disks_created)
265         raise errors.OpExecError(e.message)
266   return disks_created
267
268
269 def ComputeDiskSizePerVG(disk_template, disks):
270   """Compute disk size requirements in the volume group
271
272   """
273   def _compute(disks, payload):
274     """Universal algorithm.
275
276     """
277     vgs = {}
278     for disk in disks:
279       vgs[disk[constants.IDISK_VG]] = \
280         vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
281
282     return vgs
283
284   # Required free disk space as a function of disk and swap space
285   req_size_dict = {
286     constants.DT_DISKLESS: {},
287     constants.DT_PLAIN: _compute(disks, 0),
288     # 128 MB are added for drbd metadata for each disk
289     constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
290     constants.DT_FILE: {},
291     constants.DT_SHARED_FILE: {},
292     }
293
294   if disk_template not in req_size_dict:
295     raise errors.ProgrammerError("Disk template '%s' size requirement"
296                                  " is unknown" % disk_template)
297
298   return req_size_dict[disk_template]
299
300
301 def ComputeDisks(op, default_vg):
302   """Computes the instance disks.
303
304   @param op: The instance opcode
305   @param default_vg: The default_vg to assume
306
307   @return: The computed disks
308
309   """
310   disks = []
311   for disk in op.disks:
312     mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
313     if mode not in constants.DISK_ACCESS_SET:
314       raise errors.OpPrereqError("Invalid disk access mode '%s'" %
315                                  mode, errors.ECODE_INVAL)
316     size = disk.get(constants.IDISK_SIZE, None)
317     if size is None:
318       raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
319     try:
320       size = int(size)
321     except (TypeError, ValueError):
322       raise errors.OpPrereqError("Invalid disk size '%s'" % size,
323                                  errors.ECODE_INVAL)
324
325     ext_provider = disk.get(constants.IDISK_PROVIDER, None)
326     if ext_provider and op.disk_template != constants.DT_EXT:
327       raise errors.OpPrereqError("The '%s' option is only valid for the %s"
328                                  " disk template, not %s" %
329                                  (constants.IDISK_PROVIDER, constants.DT_EXT,
330                                   op.disk_template), errors.ECODE_INVAL)
331
332     data_vg = disk.get(constants.IDISK_VG, default_vg)
333     name = disk.get(constants.IDISK_NAME, None)
334     if name is not None and name.lower() == constants.VALUE_NONE:
335       name = None
336     new_disk = {
337       constants.IDISK_SIZE: size,
338       constants.IDISK_MODE: mode,
339       constants.IDISK_VG: data_vg,
340       constants.IDISK_NAME: name,
341       }
342
343     for key in [
344       constants.IDISK_METAVG,
345       constants.IDISK_ADOPT,
346       constants.IDISK_SPINDLES,
347       ]:
348       if key in disk:
349         new_disk[key] = disk[key]
350
351     # For extstorage, demand the `provider' option and add any
352     # additional parameters (ext-params) to the dict
353     if op.disk_template == constants.DT_EXT:
354       if ext_provider:
355         new_disk[constants.IDISK_PROVIDER] = ext_provider
356         for key in disk:
357           if key not in constants.IDISK_PARAMS:
358             new_disk[key] = disk[key]
359       else:
360         raise errors.OpPrereqError("Missing provider for template '%s'" %
361                                    constants.DT_EXT, errors.ECODE_INVAL)
362
363     disks.append(new_disk)
364
365   return disks
366
367
368 def CheckRADOSFreeSpace():
369   """Compute disk size requirements inside the RADOS cluster.
370
371   """
372   # For the RADOS cluster we assume there is always enough space.
373   pass
374
375
376 def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
377                          iv_name, p_minor, s_minor):
378   """Generate a drbd8 device complete with its children.
379
380   """
381   assert len(vgnames) == len(names) == 2
382   port = lu.cfg.AllocatePort()
383   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
384
385   dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
386                           logical_id=(vgnames[0], names[0]),
387                           params={})
388   dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
389   dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
390                           size=constants.DRBD_META_SIZE,
391                           logical_id=(vgnames[1], names[1]),
392                           params={})
393   dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
394   drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
395                           logical_id=(primary_uuid, secondary_uuid, port,
396                                       p_minor, s_minor,
397                                       shared_secret),
398                           children=[dev_data, dev_meta],
399                           iv_name=iv_name, params={})
400   drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
401   return drbd_dev
402
403
404 def GenerateDiskTemplate(
405   lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
406   disk_info, file_storage_dir, file_driver, base_index,
407   feedback_fn, full_disk_params):
408   """Generate the entire disk layout for a given template type.
409
410   """
411   vgname = lu.cfg.GetVGName()
412   disk_count = len(disk_info)
413   disks = []
414
415   CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
416
417   if template_name == constants.DT_DISKLESS:
418     pass
419   elif template_name == constants.DT_DRBD8:
420     if len(secondary_node_uuids) != 1:
421       raise errors.ProgrammerError("Wrong template configuration")
422     remote_node_uuid = secondary_node_uuids[0]
423     minors = lu.cfg.AllocateDRBDMinor(
424       [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
425
426     (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
427                                                        full_disk_params)
428     drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
429
430     names = []
431     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
432                                                for i in range(disk_count)]):
433       names.append(lv_prefix + "_data")
434       names.append(lv_prefix + "_meta")
435     for idx, disk in enumerate(disk_info):
436       disk_index = idx + base_index
437       data_vg = disk.get(constants.IDISK_VG, vgname)
438       meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
439       disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
440                                       disk[constants.IDISK_SIZE],
441                                       [data_vg, meta_vg],
442                                       names[idx * 2:idx * 2 + 2],
443                                       "disk/%d" % disk_index,
444                                       minors[idx * 2], minors[idx * 2 + 1])
445       disk_dev.mode = disk[constants.IDISK_MODE]
446       disk_dev.name = disk.get(constants.IDISK_NAME, None)
447       disks.append(disk_dev)
448   else:
449     if secondary_node_uuids:
450       raise errors.ProgrammerError("Wrong template configuration")
451
452     name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
453     if name_prefix is None:
454       names = None
455     else:
456       names = _GenerateUniqueNames(lu, ["%s.disk%s" %
457                                         (name_prefix, base_index + i)
458                                         for i in range(disk_count)])
459
460     if template_name == constants.DT_PLAIN:
461
462       def logical_id_fn(idx, _, disk):
463         vg = disk.get(constants.IDISK_VG, vgname)
464         return (vg, names[idx])
465
466     elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
467       logical_id_fn = \
468         lambda _, disk_index, disk: (file_driver,
469                                      "%s/%s" % (file_storage_dir,
470                                                 names[idx]))
471     elif template_name == constants.DT_BLOCK:
472       logical_id_fn = \
473         lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
474                                        disk[constants.IDISK_ADOPT])
475     elif template_name == constants.DT_RBD:
476       logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
477     elif template_name == constants.DT_EXT:
478       def logical_id_fn(idx, _, disk):
479         provider = disk.get(constants.IDISK_PROVIDER, None)
480         if provider is None:
481           raise errors.ProgrammerError("Disk template is %s, but '%s' is"
482                                        " not found", constants.DT_EXT,
483                                        constants.IDISK_PROVIDER)
484         return (provider, names[idx])
485     else:
486       raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
487
488     dev_type = template_name
489
490     for idx, disk in enumerate(disk_info):
491       params = {}
492       # Only for the Ext template add disk_info to params
493       if template_name == constants.DT_EXT:
494         params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
495         for key in disk:
496           if key not in constants.IDISK_PARAMS:
497             params[key] = disk[key]
498       disk_index = idx + base_index
499       size = disk[constants.IDISK_SIZE]
500       feedback_fn("* disk %s, size %s" %
501                   (disk_index, utils.FormatUnit(size, "h")))
502       disk_dev = objects.Disk(dev_type=dev_type, size=size,
503                               logical_id=logical_id_fn(idx, disk_index, disk),
504                               iv_name="disk/%d" % disk_index,
505                               mode=disk[constants.IDISK_MODE],
506                               params=params,
507                               spindles=disk.get(constants.IDISK_SPINDLES))
508       disk_dev.name = disk.get(constants.IDISK_NAME, None)
509       disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
510       disks.append(disk_dev)
511
512   return disks
513
514
515 def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
516   """Check the presence of the spindle options with exclusive_storage.
517
518   @type diskdict: dict
519   @param diskdict: disk parameters
520   @type es_flag: bool
521   @param es_flag: the effective value of the exlusive_storage flag
522   @type required: bool
523   @param required: whether spindles are required or just optional
524   @raise errors.OpPrereqError when spindles are given and they should not
525
526   """
527   if (not es_flag and constants.IDISK_SPINDLES in diskdict and
528       diskdict[constants.IDISK_SPINDLES] is not None):
529     raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
530                                " when exclusive storage is not active",
531                                errors.ECODE_INVAL)
532   if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
533                                 diskdict[constants.IDISK_SPINDLES] is None)):
534     raise errors.OpPrereqError("You must specify spindles in instance disks"
535                                " when exclusive storage is active",
536                                errors.ECODE_INVAL)
537
538
539 class LUInstanceRecreateDisks(LogicalUnit):
540   """Recreate an instance's missing disks.
541
542   """
543   HPATH = "instance-recreate-disks"
544   HTYPE = constants.HTYPE_INSTANCE
545   REQ_BGL = False
546
547   _MODIFYABLE = compat.UniqueFrozenset([
548     constants.IDISK_SIZE,
549     constants.IDISK_MODE,
550     constants.IDISK_SPINDLES,
551     ])
552
553   # New or changed disk parameters may have different semantics
554   assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
555     constants.IDISK_ADOPT,
556
557     # TODO: Implement support changing VG while recreating
558     constants.IDISK_VG,
559     constants.IDISK_METAVG,
560     constants.IDISK_PROVIDER,
561     constants.IDISK_NAME,
562     ]))
563
564   def _RunAllocator(self):
565     """Run the allocator based on input opcode.
566
567     """
568     be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
569
570     # FIXME
571     # The allocator should actually run in "relocate" mode, but current
572     # allocators don't support relocating all the nodes of an instance at
573     # the same time. As a workaround we use "allocate" mode, but this is
574     # suboptimal for two reasons:
575     # - The instance name passed to the allocator is present in the list of
576     #   existing instances, so there could be a conflict within the
577     #   internal structures of the allocator. This doesn't happen with the
578     #   current allocators, but it's a liability.
579     # - The allocator counts the resources used by the instance twice: once
580     #   because the instance exists already, and once because it tries to
581     #   allocate a new instance.
582     # The allocator could choose some of the nodes on which the instance is
583     # running, but that's not a problem. If the instance nodes are broken,
584     # they should be already be marked as drained or offline, and hence
585     # skipped by the allocator. If instance disks have been lost for other
586     # reasons, then recreating the disks on the same nodes should be fine.
587     disk_template = self.instance.disk_template
588     spindle_use = be_full[constants.BE_SPINDLE_USE]
589     disks = [{
590       constants.IDISK_SIZE: d.size,
591       constants.IDISK_MODE: d.mode,
592       constants.IDISK_SPINDLES: d.spindles,
593       } for d in self.instance.disks]
594     req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
595                                         disk_template=disk_template,
596                                         tags=list(self.instance.GetTags()),
597                                         os=self.instance.os,
598                                         nics=[{}],
599                                         vcpus=be_full[constants.BE_VCPUS],
600                                         memory=be_full[constants.BE_MAXMEM],
601                                         spindle_use=spindle_use,
602                                         disks=disks,
603                                         hypervisor=self.instance.hypervisor,
604                                         node_whitelist=None)
605     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
606
607     ial.Run(self.op.iallocator)
608
609     assert req.RequiredNodes() == len(self.instance.all_nodes)
610
611     if not ial.success:
612       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
613                                  " %s" % (self.op.iallocator, ial.info),
614                                  errors.ECODE_NORES)
615
616     (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
617     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
618                  self.op.instance_name, self.op.iallocator,
619                  utils.CommaJoin(self.op.nodes))
620
621   def CheckArguments(self):
622     if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
623       # Normalize and convert deprecated list of disk indices
624       self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
625
626     duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
627     if duplicates:
628       raise errors.OpPrereqError("Some disks have been specified more than"
629                                  " once: %s" % utils.CommaJoin(duplicates),
630                                  errors.ECODE_INVAL)
631
632     # We don't want _CheckIAllocatorOrNode selecting the default iallocator
633     # when neither iallocator nor nodes are specified
634     if self.op.iallocator or self.op.nodes:
635       CheckIAllocatorOrNode(self, "iallocator", "nodes")
636
637     for (idx, params) in self.op.disks:
638       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
639       unsupported = frozenset(params.keys()) - self._MODIFYABLE
640       if unsupported:
641         raise errors.OpPrereqError("Parameters for disk %s try to change"
642                                    " unmodifyable parameter(s): %s" %
643                                    (idx, utils.CommaJoin(unsupported)),
644                                    errors.ECODE_INVAL)
645
646   def ExpandNames(self):
647     self._ExpandAndLockInstance()
648     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
649
650     if self.op.nodes:
651       (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
652       self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
653     else:
654       self.needed_locks[locking.LEVEL_NODE] = []
655       if self.op.iallocator:
656         # iallocator will select a new node in the same group
657         self.needed_locks[locking.LEVEL_NODEGROUP] = []
658         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
659
660     self.needed_locks[locking.LEVEL_NODE_RES] = []
661
662   def DeclareLocks(self, level):
663     if level == locking.LEVEL_NODEGROUP:
664       assert self.op.iallocator is not None
665       assert not self.op.nodes
666       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
667       self.share_locks[locking.LEVEL_NODEGROUP] = 1
668       # Lock the primary group used by the instance optimistically; this
669       # requires going via the node before it's locked, requiring
670       # verification later on
671       self.needed_locks[locking.LEVEL_NODEGROUP] = \
672         self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
673
674     elif level == locking.LEVEL_NODE:
675       # If an allocator is used, then we lock all the nodes in the current
676       # instance group, as we don't know yet which ones will be selected;
677       # if we replace the nodes without using an allocator, locks are
678       # already declared in ExpandNames; otherwise, we need to lock all the
679       # instance nodes for disk re-creation
680       if self.op.iallocator:
681         assert not self.op.nodes
682         assert not self.needed_locks[locking.LEVEL_NODE]
683         assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
684
685         # Lock member nodes of the group of the primary node
686         for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
687           self.needed_locks[locking.LEVEL_NODE].extend(
688             self.cfg.GetNodeGroup(group_uuid).members)
689
690         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
691       elif not self.op.nodes:
692         self._LockInstancesNodes(primary_only=False)
693     elif level == locking.LEVEL_NODE_RES:
694       # Copy node locks
695       self.needed_locks[locking.LEVEL_NODE_RES] = \
696         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
697
698   def BuildHooksEnv(self):
699     """Build hooks env.
700
701     This runs on master, primary and secondary nodes of the instance.
702
703     """
704     return BuildInstanceHookEnvByObject(self, self.instance)
705
706   def BuildHooksNodes(self):
707     """Build hooks nodes.
708
709     """
710     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
711     return (nl, nl)
712
713   def CheckPrereq(self):
714     """Check prerequisites.
715
716     This checks that the instance is in the cluster and is not running.
717
718     """
719     instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
720     assert instance is not None, \
721       "Cannot retrieve locked instance %s" % self.op.instance_name
722     if self.op.node_uuids:
723       if len(self.op.node_uuids) != len(instance.all_nodes):
724         raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
725                                    " %d replacement nodes were specified" %
726                                    (instance.name, len(instance.all_nodes),
727                                     len(self.op.node_uuids)),
728                                    errors.ECODE_INVAL)
729       assert instance.disk_template != constants.DT_DRBD8 or \
730              len(self.op.node_uuids) == 2
731       assert instance.disk_template != constants.DT_PLAIN or \
732              len(self.op.node_uuids) == 1
733       primary_node = self.op.node_uuids[0]
734     else:
735       primary_node = instance.primary_node
736     if not self.op.iallocator:
737       CheckNodeOnline(self, primary_node)
738
739     if instance.disk_template == constants.DT_DISKLESS:
740       raise errors.OpPrereqError("Instance '%s' has no disks" %
741                                  self.op.instance_name, errors.ECODE_INVAL)
742
743     # Verify if node group locks are still correct
744     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
745     if owned_groups:
746       # Node group locks are acquired only for the primary node (and only
747       # when the allocator is used)
748       CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
749                               primary_only=True)
750
751     # if we replace nodes *and* the old primary is offline, we don't
752     # check the instance state
753     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
754     if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
755       CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
756                          msg="cannot recreate disks")
757
758     if self.op.disks:
759       self.disks = dict(self.op.disks)
760     else:
761       self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
762
763     maxidx = max(self.disks.keys())
764     if maxidx >= len(instance.disks):
765       raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
766                                  errors.ECODE_INVAL)
767
768     if ((self.op.node_uuids or self.op.iallocator) and
769          sorted(self.disks.keys()) != range(len(instance.disks))):
770       raise errors.OpPrereqError("Can't recreate disks partially and"
771                                  " change the nodes at the same time",
772                                  errors.ECODE_INVAL)
773
774     self.instance = instance
775
776     if self.op.iallocator:
777       self._RunAllocator()
778       # Release unneeded node and node resource locks
779       ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
780       ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
781       ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
782
783     assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
784
785     if self.op.node_uuids:
786       node_uuids = self.op.node_uuids
787     else:
788       node_uuids = instance.all_nodes
789     excl_stor = compat.any(
790       rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
791       )
792     for new_params in self.disks.values():
793       CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
794
795   def Exec(self, feedback_fn):
796     """Recreate the disks.
797
798     """
799     assert (self.owned_locks(locking.LEVEL_NODE) ==
800             self.owned_locks(locking.LEVEL_NODE_RES))
801
802     to_skip = []
803     mods = [] # keeps track of needed changes
804
805     for idx, disk in enumerate(self.instance.disks):
806       try:
807         changes = self.disks[idx]
808       except KeyError:
809         # Disk should not be recreated
810         to_skip.append(idx)
811         continue
812
813       # update secondaries for disks, if needed
814       if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
815         # need to update the nodes and minors
816         assert len(self.op.node_uuids) == 2
817         assert len(disk.logical_id) == 6 # otherwise disk internals
818                                          # have changed
819         (_, _, old_port, _, _, old_secret) = disk.logical_id
820         new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
821                                                 self.instance.uuid)
822         new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
823                   new_minors[0], new_minors[1], old_secret)
824         assert len(disk.logical_id) == len(new_id)
825       else:
826         new_id = None
827
828       mods.append((idx, new_id, changes))
829
830     # now that we have passed all asserts above, we can apply the mods
831     # in a single run (to avoid partial changes)
832     for idx, new_id, changes in mods:
833       disk = self.instance.disks[idx]
834       if new_id is not None:
835         assert disk.dev_type == constants.DT_DRBD8
836         disk.logical_id = new_id
837       if changes:
838         disk.Update(size=changes.get(constants.IDISK_SIZE, None),
839                     mode=changes.get(constants.IDISK_MODE, None),
840                     spindles=changes.get(constants.IDISK_SPINDLES, None))
841
842     # change primary node, if needed
843     if self.op.node_uuids:
844       self.instance.primary_node = self.op.node_uuids[0]
845       self.LogWarning("Changing the instance's nodes, you will have to"
846                       " remove any disks left on the older nodes manually")
847
848     if self.op.node_uuids:
849       self.cfg.Update(self.instance, feedback_fn)
850
851     # All touched nodes must be locked
852     mylocks = self.owned_locks(locking.LEVEL_NODE)
853     assert mylocks.issuperset(frozenset(self.instance.all_nodes))
854     new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
855
856     # TODO: Release node locks before wiping, or explain why it's not possible
857     if self.cfg.GetClusterInfo().prealloc_wipe_disks:
858       wipedisks = [(idx, disk, 0)
859                    for (idx, disk) in enumerate(self.instance.disks)
860                    if idx not in to_skip]
861       WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
862                          cleanup=new_disks)
863
864
865 def _PerformNodeInfoCall(lu, node_uuids, vg):
866   """Prepares the input and performs a node info call.
867
868   @type lu: C{LogicalUnit}
869   @param lu: a logical unit from which we get configuration data
870   @type node_uuids: list of string
871   @param node_uuids: list of node UUIDs to perform the call for
872   @type vg: string
873   @param vg: the volume group's name
874
875   """
876   lvm_storage_units = [(constants.ST_LVM_VG, vg)]
877   storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
878                                                   node_uuids)
879   hvname = lu.cfg.GetHypervisorType()
880   hvparams = lu.cfg.GetClusterInfo().hvparams
881   nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
882                                    [(hvname, hvparams[hvname])])
883   return nodeinfo
884
885
886 def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
887   """Checks the vg capacity for a given node.
888
889   @type node_info: tuple (_, list of dicts, _)
890   @param node_info: the result of the node info call for one node
891   @type node_name: string
892   @param node_name: the name of the node
893   @type vg: string
894   @param vg: volume group name
895   @type requested: int
896   @param requested: the amount of disk in MiB to check for
897   @raise errors.OpPrereqError: if the node doesn't have enough disk,
898       or we cannot check the node
899
900   """
901   (_, space_info, _) = node_info
902   lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
903       space_info, constants.ST_LVM_VG)
904   if not lvm_vg_info:
905     raise errors.OpPrereqError("Can't retrieve storage information for LVM")
906   vg_free = lvm_vg_info.get("storage_free", None)
907   if not isinstance(vg_free, int):
908     raise errors.OpPrereqError("Can't compute free disk space on node"
909                                " %s for vg %s, result was '%s'" %
910                                (node_name, vg, vg_free), errors.ECODE_ENVIRON)
911   if requested > vg_free:
912     raise errors.OpPrereqError("Not enough disk space on target node %s"
913                                " vg %s: required %d MiB, available %d MiB" %
914                                (node_name, vg, requested, vg_free),
915                                errors.ECODE_NORES)
916
917
918 def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
919   """Checks if nodes have enough free disk space in the specified VG.
920
921   This function checks if all given nodes have the needed amount of
922   free disk. In case any node has less disk or we cannot get the
923   information from the node, this function raises an OpPrereqError
924   exception.
925
926   @type lu: C{LogicalUnit}
927   @param lu: a logical unit from which we get configuration data
928   @type node_uuids: C{list}
929   @param node_uuids: the list of node UUIDs to check
930   @type vg: C{str}
931   @param vg: the volume group to check
932   @type requested: C{int}
933   @param requested: the amount of disk in MiB to check for
934   @raise errors.OpPrereqError: if the node doesn't have enough disk,
935       or we cannot check the node
936
937   """
938   nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
939   for node in node_uuids:
940     node_name = lu.cfg.GetNodeName(node)
941     info = nodeinfo[node]
942     info.Raise("Cannot get current information from node %s" % node_name,
943                prereq=True, ecode=errors.ECODE_ENVIRON)
944     _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
945
946
947 def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
948   """Checks if nodes have enough free disk space in all the VGs.
949
950   This function checks if all given nodes have the needed amount of
951   free disk. In case any node has less disk or we cannot get the
952   information from the node, this function raises an OpPrereqError
953   exception.
954
955   @type lu: C{LogicalUnit}
956   @param lu: a logical unit from which we get configuration data
957   @type node_uuids: C{list}
958   @param node_uuids: the list of node UUIDs to check
959   @type req_sizes: C{dict}
960   @param req_sizes: the hash of vg and corresponding amount of disk in
961       MiB to check for
962   @raise errors.OpPrereqError: if the node doesn't have enough disk,
963       or we cannot check the node
964
965   """
966   for vg, req_size in req_sizes.items():
967     _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
968
969
970 def _DiskSizeInBytesToMebibytes(lu, size):
971   """Converts a disk size in bytes to mebibytes.
972
973   Warns and rounds up if the size isn't an even multiple of 1 MiB.
974
975   """
976   (mib, remainder) = divmod(size, 1024 * 1024)
977
978   if remainder != 0:
979     lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
980                   " to not overwrite existing data (%s bytes will not be"
981                   " wiped)", (1024 * 1024) - remainder)
982     mib += 1
983
984   return mib
985
986
987 def _CalcEta(time_taken, written, total_size):
988   """Calculates the ETA based on size written and total size.
989
990   @param time_taken: The time taken so far
991   @param written: amount written so far
992   @param total_size: The total size of data to be written
993   @return: The remaining time in seconds
994
995   """
996   avg_time = time_taken / float(written)
997   return (total_size - written) * avg_time
998
999
1000 def WipeDisks(lu, instance, disks=None):
1001   """Wipes instance disks.
1002
1003   @type lu: L{LogicalUnit}
1004   @param lu: the logical unit on whose behalf we execute
1005   @type instance: L{objects.Instance}
1006   @param instance: the instance whose disks we should create
1007   @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1008   @param disks: Disk details; tuple contains disk index, disk object and the
1009     start offset
1010
1011   """
1012   node_uuid = instance.primary_node
1013   node_name = lu.cfg.GetNodeName(node_uuid)
1014
1015   if disks is None:
1016     disks = [(idx, disk, 0)
1017              for (idx, disk) in enumerate(instance.disks)]
1018
1019   for (_, device, _) in disks:
1020     lu.cfg.SetDiskID(device, node_uuid)
1021
1022   logging.info("Pausing synchronization of disks of instance '%s'",
1023                instance.name)
1024   result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1025                                                   (map(compat.snd, disks),
1026                                                    instance),
1027                                                   True)
1028   result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1029
1030   for idx, success in enumerate(result.payload):
1031     if not success:
1032       logging.warn("Pausing synchronization of disk %s of instance '%s'"
1033                    " failed", idx, instance.name)
1034
1035   try:
1036     for (idx, device, offset) in disks:
1037       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1038       # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1039       wipe_chunk_size = \
1040         int(min(constants.MAX_WIPE_CHUNK,
1041                 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1042
1043       size = device.size
1044       last_output = 0
1045       start_time = time.time()
1046
1047       if offset == 0:
1048         info_text = ""
1049       else:
1050         info_text = (" (from %s to %s)" %
1051                      (utils.FormatUnit(offset, "h"),
1052                       utils.FormatUnit(size, "h")))
1053
1054       lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1055
1056       logging.info("Wiping disk %d for instance %s on node %s using"
1057                    " chunk size %s", idx, instance.name, node_name,
1058                    wipe_chunk_size)
1059
1060       while offset < size:
1061         wipe_size = min(wipe_chunk_size, size - offset)
1062
1063         logging.debug("Wiping disk %d, offset %s, chunk %s",
1064                       idx, offset, wipe_size)
1065
1066         result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1067                                            offset, wipe_size)
1068         result.Raise("Could not wipe disk %d at offset %d for size %d" %
1069                      (idx, offset, wipe_size))
1070
1071         now = time.time()
1072         offset += wipe_size
1073         if now - last_output >= 60:
1074           eta = _CalcEta(now - start_time, offset, size)
1075           lu.LogInfo(" - done: %.1f%% ETA: %s",
1076                      offset / float(size) * 100, utils.FormatSeconds(eta))
1077           last_output = now
1078   finally:
1079     logging.info("Resuming synchronization of disks for instance '%s'",
1080                  instance.name)
1081
1082     result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1083                                                     (map(compat.snd, disks),
1084                                                      instance),
1085                                                     False)
1086
1087     if result.fail_msg:
1088       lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1089                     node_name, result.fail_msg)
1090     else:
1091       for idx, success in enumerate(result.payload):
1092         if not success:
1093           lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1094                         " failed", idx, instance.name)
1095
1096
1097 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1098   """Wrapper for L{WipeDisks} that handles errors.
1099
1100   @type lu: L{LogicalUnit}
1101   @param lu: the logical unit on whose behalf we execute
1102   @type instance: L{objects.Instance}
1103   @param instance: the instance whose disks we should wipe
1104   @param disks: see L{WipeDisks}
1105   @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1106       case of error
1107   @raise errors.OpPrereqError: in case of failure
1108
1109   """
1110   try:
1111     WipeDisks(lu, instance, disks=disks)
1112   except errors.OpExecError:
1113     logging.warning("Wiping disks for instance '%s' failed",
1114                     instance.name)
1115     _UndoCreateDisks(lu, cleanup)
1116     raise
1117
1118
1119 def ExpandCheckDisks(instance, disks):
1120   """Return the instance disks selected by the disks list
1121
1122   @type disks: list of L{objects.Disk} or None
1123   @param disks: selected disks
1124   @rtype: list of L{objects.Disk}
1125   @return: selected instance disks to act on
1126
1127   """
1128   if disks is None:
1129     return instance.disks
1130   else:
1131     if not set(disks).issubset(instance.disks):
1132       raise errors.ProgrammerError("Can only act on disks belonging to the"
1133                                    " target instance: expected a subset of %r,"
1134                                    " got %r" % (instance.disks, disks))
1135     return disks
1136
1137
1138 def WaitForSync(lu, instance, disks=None, oneshot=False):
1139   """Sleep and poll for an instance's disk to sync.
1140
1141   """
1142   if not instance.disks or disks is not None and not disks:
1143     return True
1144
1145   disks = ExpandCheckDisks(instance, disks)
1146
1147   if not oneshot:
1148     lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1149
1150   node_uuid = instance.primary_node
1151   node_name = lu.cfg.GetNodeName(node_uuid)
1152
1153   for dev in disks:
1154     lu.cfg.SetDiskID(dev, node_uuid)
1155
1156   # TODO: Convert to utils.Retry
1157
1158   retries = 0
1159   degr_retries = 10 # in seconds, as we sleep 1 second each time
1160   while True:
1161     max_time = 0
1162     done = True
1163     cumul_degraded = False
1164     rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1165     msg = rstats.fail_msg
1166     if msg:
1167       lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1168       retries += 1
1169       if retries >= 10:
1170         raise errors.RemoteError("Can't contact node %s for mirror data,"
1171                                  " aborting." % node_name)
1172       time.sleep(6)
1173       continue
1174     rstats = rstats.payload
1175     retries = 0
1176     for i, mstat in enumerate(rstats):
1177       if mstat is None:
1178         lu.LogWarning("Can't compute data for node %s/%s",
1179                       node_name, disks[i].iv_name)
1180         continue
1181
1182       cumul_degraded = (cumul_degraded or
1183                         (mstat.is_degraded and mstat.sync_percent is None))
1184       if mstat.sync_percent is not None:
1185         done = False
1186         if mstat.estimated_time is not None:
1187           rem_time = ("%s remaining (estimated)" %
1188                       utils.FormatSeconds(mstat.estimated_time))
1189           max_time = mstat.estimated_time
1190         else:
1191           rem_time = "no time estimate"
1192         lu.LogInfo("- device %s: %5.2f%% done, %s",
1193                    disks[i].iv_name, mstat.sync_percent, rem_time)
1194
1195     # if we're done but degraded, let's do a few small retries, to
1196     # make sure we see a stable and not transient situation; therefore
1197     # we force restart of the loop
1198     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1199       logging.info("Degraded disks found, %d retries left", degr_retries)
1200       degr_retries -= 1
1201       time.sleep(1)
1202       continue
1203
1204     if done or oneshot:
1205       break
1206
1207     time.sleep(min(60, max_time))
1208
1209   if done:
1210     lu.LogInfo("Instance %s's disks are in sync", instance.name)
1211
1212   return not cumul_degraded
1213
1214
1215 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1216   """Shutdown block devices of an instance.
1217
1218   This does the shutdown on all nodes of the instance.
1219
1220   If the ignore_primary is false, errors on the primary node are
1221   ignored.
1222
1223   """
1224   lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1225   all_result = True
1226   disks = ExpandCheckDisks(instance, disks)
1227
1228   for disk in disks:
1229     for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1230       lu.cfg.SetDiskID(top_disk, node_uuid)
1231       result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1232       msg = result.fail_msg
1233       if msg:
1234         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1235                       disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1236         if ((node_uuid == instance.primary_node and not ignore_primary) or
1237             (node_uuid != instance.primary_node and not result.offline)):
1238           all_result = False
1239   return all_result
1240
1241
1242 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1243   """Shutdown block devices of an instance.
1244
1245   This function checks if an instance is running, before calling
1246   _ShutdownInstanceDisks.
1247
1248   """
1249   CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1250   ShutdownInstanceDisks(lu, instance, disks=disks)
1251
1252
1253 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1254                            ignore_size=False):
1255   """Prepare the block devices for an instance.
1256
1257   This sets up the block devices on all nodes.
1258
1259   @type lu: L{LogicalUnit}
1260   @param lu: the logical unit on whose behalf we execute
1261   @type instance: L{objects.Instance}
1262   @param instance: the instance for whose disks we assemble
1263   @type disks: list of L{objects.Disk} or None
1264   @param disks: which disks to assemble (or all, if None)
1265   @type ignore_secondaries: boolean
1266   @param ignore_secondaries: if true, errors on secondary nodes
1267       won't result in an error return from the function
1268   @type ignore_size: boolean
1269   @param ignore_size: if true, the current known size of the disk
1270       will not be used during the disk activation, useful for cases
1271       when the size is wrong
1272   @return: False if the operation failed, otherwise a list of
1273       (host, instance_visible_name, node_visible_name)
1274       with the mapping from node devices to instance devices
1275
1276   """
1277   device_info = []
1278   disks_ok = True
1279   disks = ExpandCheckDisks(instance, disks)
1280
1281   # With the two passes mechanism we try to reduce the window of
1282   # opportunity for the race condition of switching DRBD to primary
1283   # before handshaking occured, but we do not eliminate it
1284
1285   # The proper fix would be to wait (with some limits) until the
1286   # connection has been made and drbd transitions from WFConnection
1287   # into any other network-connected state (Connected, SyncTarget,
1288   # SyncSource, etc.)
1289
1290   # mark instance disks as active before doing actual work, so watcher does
1291   # not try to shut them down erroneously
1292   lu.cfg.MarkInstanceDisksActive(instance.uuid)
1293
1294   # 1st pass, assemble on all nodes in secondary mode
1295   for idx, inst_disk in enumerate(disks):
1296     for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1297                                   instance.primary_node):
1298       if ignore_size:
1299         node_disk = node_disk.Copy()
1300         node_disk.UnsetSize()
1301       lu.cfg.SetDiskID(node_disk, node_uuid)
1302       result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1303                                              instance.name, False, idx)
1304       msg = result.fail_msg
1305       if msg:
1306         is_offline_secondary = (node_uuid in instance.secondary_nodes and
1307                                 result.offline)
1308         lu.LogWarning("Could not prepare block device %s on node %s"
1309                       " (is_primary=False, pass=1): %s",
1310                       inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1311         if not (ignore_secondaries or is_offline_secondary):
1312           disks_ok = False
1313
1314   # FIXME: race condition on drbd migration to primary
1315
1316   # 2nd pass, do only the primary node
1317   for idx, inst_disk in enumerate(disks):
1318     dev_path = None
1319
1320     for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1321                                   instance.primary_node):
1322       if node_uuid != instance.primary_node:
1323         continue
1324       if ignore_size:
1325         node_disk = node_disk.Copy()
1326         node_disk.UnsetSize()
1327       lu.cfg.SetDiskID(node_disk, node_uuid)
1328       result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1329                                              instance.name, True, idx)
1330       msg = result.fail_msg
1331       if msg:
1332         lu.LogWarning("Could not prepare block device %s on node %s"
1333                       " (is_primary=True, pass=2): %s",
1334                       inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1335         disks_ok = False
1336       else:
1337         dev_path = result.payload
1338
1339     device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1340                         inst_disk.iv_name, dev_path))
1341
1342   # leave the disks configured for the primary node
1343   # this is a workaround that would be fixed better by
1344   # improving the logical/physical id handling
1345   for disk in disks:
1346     lu.cfg.SetDiskID(disk, instance.primary_node)
1347
1348   if not disks_ok:
1349     lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1350
1351   return disks_ok, device_info
1352
1353
1354 def StartInstanceDisks(lu, instance, force):
1355   """Start the disks of an instance.
1356
1357   """
1358   disks_ok, _ = AssembleInstanceDisks(lu, instance,
1359                                       ignore_secondaries=force)
1360   if not disks_ok:
1361     ShutdownInstanceDisks(lu, instance)
1362     if force is not None and not force:
1363       lu.LogWarning("",
1364                     hint=("If the message above refers to a secondary node,"
1365                           " you can retry the operation using '--force'"))
1366     raise errors.OpExecError("Disk consistency error")
1367
1368
1369 class LUInstanceGrowDisk(LogicalUnit):
1370   """Grow a disk of an instance.
1371
1372   """
1373   HPATH = "disk-grow"
1374   HTYPE = constants.HTYPE_INSTANCE
1375   REQ_BGL = False
1376
1377   def ExpandNames(self):
1378     self._ExpandAndLockInstance()
1379     self.needed_locks[locking.LEVEL_NODE] = []
1380     self.needed_locks[locking.LEVEL_NODE_RES] = []
1381     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1382     self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1383
1384   def DeclareLocks(self, level):
1385     if level == locking.LEVEL_NODE:
1386       self._LockInstancesNodes()
1387     elif level == locking.LEVEL_NODE_RES:
1388       # Copy node locks
1389       self.needed_locks[locking.LEVEL_NODE_RES] = \
1390         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1391
1392   def BuildHooksEnv(self):
1393     """Build hooks env.
1394
1395     This runs on the master, the primary and all the secondaries.
1396
1397     """
1398     env = {
1399       "DISK": self.op.disk,
1400       "AMOUNT": self.op.amount,
1401       "ABSOLUTE": self.op.absolute,
1402       }
1403     env.update(BuildInstanceHookEnvByObject(self, self.instance))
1404     return env
1405
1406   def BuildHooksNodes(self):
1407     """Build hooks nodes.
1408
1409     """
1410     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1411     return (nl, nl)
1412
1413   def CheckPrereq(self):
1414     """Check prerequisites.
1415
1416     This checks that the instance is in the cluster.
1417
1418     """
1419     self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1420     assert self.instance is not None, \
1421       "Cannot retrieve locked instance %s" % self.op.instance_name
1422     node_uuids = list(self.instance.all_nodes)
1423     for node_uuid in node_uuids:
1424       CheckNodeOnline(self, node_uuid)
1425     self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1426
1427     if self.instance.disk_template not in constants.DTS_GROWABLE:
1428       raise errors.OpPrereqError("Instance's disk layout does not support"
1429                                  " growing", errors.ECODE_INVAL)
1430
1431     self.disk = self.instance.FindDisk(self.op.disk)
1432
1433     if self.op.absolute:
1434       self.target = self.op.amount
1435       self.delta = self.target - self.disk.size
1436       if self.delta < 0:
1437         raise errors.OpPrereqError("Requested size (%s) is smaller than "
1438                                    "current disk size (%s)" %
1439                                    (utils.FormatUnit(self.target, "h"),
1440                                     utils.FormatUnit(self.disk.size, "h")),
1441                                    errors.ECODE_STATE)
1442     else:
1443       self.delta = self.op.amount
1444       self.target = self.disk.size + self.delta
1445       if self.delta < 0:
1446         raise errors.OpPrereqError("Requested increment (%s) is negative" %
1447                                    utils.FormatUnit(self.delta, "h"),
1448                                    errors.ECODE_INVAL)
1449
1450     self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1451
1452   def _CheckDiskSpace(self, node_uuids, req_vgspace):
1453     template = self.instance.disk_template
1454     if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
1455         not any(self.node_es_flags.values())):
1456       # TODO: check the free disk space for file, when that feature will be
1457       # supported
1458       # With exclusive storage we need to do something smarter than just looking
1459       # at free space, which, in the end, is basically a dry run. So we rely on
1460       # the dry run performed in Exec() instead.
1461       CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1462
1463   def Exec(self, feedback_fn):
1464     """Execute disk grow.
1465
1466     """
1467     assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1468     assert (self.owned_locks(locking.LEVEL_NODE) ==
1469             self.owned_locks(locking.LEVEL_NODE_RES))
1470
1471     wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1472
1473     disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1474     if not disks_ok:
1475       raise errors.OpExecError("Cannot activate block device to grow")
1476
1477     feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1478                 (self.op.disk, self.instance.name,
1479                  utils.FormatUnit(self.delta, "h"),
1480                  utils.FormatUnit(self.target, "h")))
1481
1482     # First run all grow ops in dry-run mode
1483     for node_uuid in self.instance.all_nodes:
1484       self.cfg.SetDiskID(self.disk, node_uuid)
1485       result = self.rpc.call_blockdev_grow(node_uuid,
1486                                            (self.disk, self.instance),
1487                                            self.delta, True, True,
1488                                            self.node_es_flags[node_uuid])
1489       result.Raise("Dry-run grow request failed to node %s" %
1490                    self.cfg.GetNodeName(node_uuid))
1491
1492     if wipe_disks:
1493       # Get disk size from primary node for wiping
1494       self.cfg.SetDiskID(self.disk, self.instance.primary_node)
1495       result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
1496                                                     [self.disk])
1497       result.Raise("Failed to retrieve disk size from node '%s'" %
1498                    self.instance.primary_node)
1499
1500       (disk_dimensions, ) = result.payload
1501
1502       if disk_dimensions is None:
1503         raise errors.OpExecError("Failed to retrieve disk size from primary"
1504                                  " node '%s'" % self.instance.primary_node)
1505       (disk_size_in_bytes, _) = disk_dimensions
1506
1507       old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1508
1509       assert old_disk_size >= self.disk.size, \
1510         ("Retrieved disk size too small (got %s, should be at least %s)" %
1511          (old_disk_size, self.disk.size))
1512     else:
1513       old_disk_size = None
1514
1515     # We know that (as far as we can test) operations across different
1516     # nodes will succeed, time to run it for real on the backing storage
1517     for node_uuid in self.instance.all_nodes:
1518       self.cfg.SetDiskID(self.disk, node_uuid)
1519       result = self.rpc.call_blockdev_grow(node_uuid,
1520                                            (self.disk, self.instance),
1521                                            self.delta, False, True,
1522                                            self.node_es_flags[node_uuid])
1523       result.Raise("Grow request failed to node %s" %
1524                    self.cfg.GetNodeName(node_uuid))
1525
1526     # And now execute it for logical storage, on the primary node
1527     node_uuid = self.instance.primary_node
1528     self.cfg.SetDiskID(self.disk, node_uuid)
1529     result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1530                                          self.delta, False, False,
1531                                          self.node_es_flags[node_uuid])
1532     result.Raise("Grow request failed to node %s" %
1533                  self.cfg.GetNodeName(node_uuid))
1534
1535     self.disk.RecordGrow(self.delta)
1536     self.cfg.Update(self.instance, feedback_fn)
1537
1538     # Changes have been recorded, release node lock
1539     ReleaseLocks(self, locking.LEVEL_NODE)
1540
1541     # Downgrade lock while waiting for sync
1542     self.glm.downgrade(locking.LEVEL_INSTANCE)
1543
1544     assert wipe_disks ^ (old_disk_size is None)
1545
1546     if wipe_disks:
1547       assert self.instance.disks[self.op.disk] == self.disk
1548
1549       # Wipe newly added disk space
1550       WipeDisks(self, self.instance,
1551                 disks=[(self.op.disk, self.disk, old_disk_size)])
1552
1553     if self.op.wait_for_sync:
1554       disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1555       if disk_abort:
1556         self.LogWarning("Disk syncing has not returned a good status; check"
1557                         " the instance")
1558       if not self.instance.disks_active:
1559         _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1560     elif not self.instance.disks_active:
1561       self.LogWarning("Not shutting down the disk even if the instance is"
1562                       " not supposed to be running because no wait for"
1563                       " sync mode was requested")
1564
1565     assert self.owned_locks(locking.LEVEL_NODE_RES)
1566     assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1567
1568
1569 class LUInstanceReplaceDisks(LogicalUnit):
1570   """Replace the disks of an instance.
1571
1572   """
1573   HPATH = "mirrors-replace"
1574   HTYPE = constants.HTYPE_INSTANCE
1575   REQ_BGL = False
1576
1577   def CheckArguments(self):
1578     """Check arguments.
1579
1580     """
1581     if self.op.mode == constants.REPLACE_DISK_CHG:
1582       if self.op.remote_node is None and self.op.iallocator is None:
1583         raise errors.OpPrereqError("When changing the secondary either an"
1584                                    " iallocator script must be used or the"
1585                                    " new node given", errors.ECODE_INVAL)
1586       else:
1587         CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1588
1589     elif self.op.remote_node is not None or self.op.iallocator is not None:
1590       # Not replacing the secondary
1591       raise errors.OpPrereqError("The iallocator and new node options can"
1592                                  " only be used when changing the"
1593                                  " secondary node", errors.ECODE_INVAL)
1594
1595   def ExpandNames(self):
1596     self._ExpandAndLockInstance()
1597
1598     assert locking.LEVEL_NODE not in self.needed_locks
1599     assert locking.LEVEL_NODE_RES not in self.needed_locks
1600     assert locking.LEVEL_NODEGROUP not in self.needed_locks
1601
1602     assert self.op.iallocator is None or self.op.remote_node is None, \
1603       "Conflicting options"
1604
1605     if self.op.remote_node is not None:
1606       (self.op.remote_node_uuid, self.op.remote_node) = \
1607         ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1608                               self.op.remote_node)
1609
1610       # Warning: do not remove the locking of the new secondary here
1611       # unless DRBD8Dev.AddChildren is changed to work in parallel;
1612       # currently it doesn't since parallel invocations of
1613       # FindUnusedMinor will conflict
1614       self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1615       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1616     else:
1617       self.needed_locks[locking.LEVEL_NODE] = []
1618       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1619
1620       if self.op.iallocator is not None:
1621         # iallocator will select a new node in the same group
1622         self.needed_locks[locking.LEVEL_NODEGROUP] = []
1623         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1624
1625     self.needed_locks[locking.LEVEL_NODE_RES] = []
1626
1627     self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1628                                    self.op.instance_name, self.op.mode,
1629                                    self.op.iallocator, self.op.remote_node_uuid,
1630                                    self.op.disks, self.op.early_release,
1631                                    self.op.ignore_ipolicy)
1632
1633     self.tasklets = [self.replacer]
1634
1635   def DeclareLocks(self, level):
1636     if level == locking.LEVEL_NODEGROUP:
1637       assert self.op.remote_node_uuid is None
1638       assert self.op.iallocator is not None
1639       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1640
1641       self.share_locks[locking.LEVEL_NODEGROUP] = 1
1642       # Lock all groups used by instance optimistically; this requires going
1643       # via the node before it's locked, requiring verification later on
1644       self.needed_locks[locking.LEVEL_NODEGROUP] = \
1645         self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1646
1647     elif level == locking.LEVEL_NODE:
1648       if self.op.iallocator is not None:
1649         assert self.op.remote_node_uuid is None
1650         assert not self.needed_locks[locking.LEVEL_NODE]
1651         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1652
1653         # Lock member nodes of all locked groups
1654         self.needed_locks[locking.LEVEL_NODE] = \
1655           [node_uuid
1656            for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1657            for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1658       else:
1659         assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1660
1661         self._LockInstancesNodes()
1662
1663     elif level == locking.LEVEL_NODE_RES:
1664       # Reuse node locks
1665       self.needed_locks[locking.LEVEL_NODE_RES] = \
1666         self.needed_locks[locking.LEVEL_NODE]
1667
1668   def BuildHooksEnv(self):
1669     """Build hooks env.
1670
1671     This runs on the master, the primary and all the secondaries.
1672
1673     """
1674     instance = self.replacer.instance
1675     env = {
1676       "MODE": self.op.mode,
1677       "NEW_SECONDARY": self.op.remote_node,
1678       "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
1679       }
1680     env.update(BuildInstanceHookEnvByObject(self, instance))
1681     return env
1682
1683   def BuildHooksNodes(self):
1684     """Build hooks nodes.
1685
1686     """
1687     instance = self.replacer.instance
1688     nl = [
1689       self.cfg.GetMasterNode(),
1690       instance.primary_node,
1691       ]
1692     if self.op.remote_node_uuid is not None:
1693       nl.append(self.op.remote_node_uuid)
1694     return nl, nl
1695
1696   def CheckPrereq(self):
1697     """Check prerequisites.
1698
1699     """
1700     assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1701             self.op.iallocator is None)
1702
1703     # Verify if node group locks are still correct
1704     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1705     if owned_groups:
1706       CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
1707
1708     return LogicalUnit.CheckPrereq(self)
1709
1710
1711 class LUInstanceActivateDisks(NoHooksLU):
1712   """Bring up an instance's disks.
1713
1714   """
1715   REQ_BGL = False
1716
1717   def ExpandNames(self):
1718     self._ExpandAndLockInstance()
1719     self.needed_locks[locking.LEVEL_NODE] = []
1720     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1721
1722   def DeclareLocks(self, level):
1723     if level == locking.LEVEL_NODE:
1724       self._LockInstancesNodes()
1725
1726   def CheckPrereq(self):
1727     """Check prerequisites.
1728
1729     This checks that the instance is in the cluster.
1730
1731     """
1732     self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1733     assert self.instance is not None, \
1734       "Cannot retrieve locked instance %s" % self.op.instance_name
1735     CheckNodeOnline(self, self.instance.primary_node)
1736
1737   def Exec(self, feedback_fn):
1738     """Activate the disks.
1739
1740     """
1741     disks_ok, disks_info = \
1742               AssembleInstanceDisks(self, self.instance,
1743                                     ignore_size=self.op.ignore_size)
1744     if not disks_ok:
1745       raise errors.OpExecError("Cannot activate block devices")
1746
1747     if self.op.wait_for_sync:
1748       if not WaitForSync(self, self.instance):
1749         self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
1750         raise errors.OpExecError("Some disks of the instance are degraded!")
1751
1752     return disks_info
1753
1754
1755 class LUInstanceDeactivateDisks(NoHooksLU):
1756   """Shutdown an instance's disks.
1757
1758   """
1759   REQ_BGL = False
1760
1761   def ExpandNames(self):
1762     self._ExpandAndLockInstance()
1763     self.needed_locks[locking.LEVEL_NODE] = []
1764     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1765
1766   def DeclareLocks(self, level):
1767     if level == locking.LEVEL_NODE:
1768       self._LockInstancesNodes()
1769
1770   def CheckPrereq(self):
1771     """Check prerequisites.
1772
1773     This checks that the instance is in the cluster.
1774
1775     """
1776     self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1777     assert self.instance is not None, \
1778       "Cannot retrieve locked instance %s" % self.op.instance_name
1779
1780   def Exec(self, feedback_fn):
1781     """Deactivate the disks
1782
1783     """
1784     if self.op.force:
1785       ShutdownInstanceDisks(self, self.instance)
1786     else:
1787       _SafeShutdownInstanceDisks(self, self.instance)
1788
1789
1790 def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
1791                                ldisk=False):
1792   """Check that mirrors are not degraded.
1793
1794   @attention: The device has to be annotated already.
1795
1796   The ldisk parameter, if True, will change the test from the
1797   is_degraded attribute (which represents overall non-ok status for
1798   the device(s)) to the ldisk (representing the local storage status).
1799
1800   """
1801   lu.cfg.SetDiskID(dev, node_uuid)
1802
1803   result = True
1804
1805   if on_primary or dev.AssembleOnSecondary():
1806     rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
1807     msg = rstats.fail_msg
1808     if msg:
1809       lu.LogWarning("Can't find disk on node %s: %s",
1810                     lu.cfg.GetNodeName(node_uuid), msg)
1811       result = False
1812     elif not rstats.payload:
1813       lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1814       result = False
1815     else:
1816       if ldisk:
1817         result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1818       else:
1819         result = result and not rstats.payload.is_degraded
1820
1821   if dev.children:
1822     for child in dev.children:
1823       result = result and _CheckDiskConsistencyInner(lu, instance, child,
1824                                                      node_uuid, on_primary)
1825
1826   return result
1827
1828
1829 def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
1830   """Wrapper around L{_CheckDiskConsistencyInner}.
1831
1832   """
1833   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1834   return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
1835                                     ldisk=ldisk)
1836
1837
1838 def _BlockdevFind(lu, node_uuid, dev, instance):
1839   """Wrapper around call_blockdev_find to annotate diskparams.
1840
1841   @param lu: A reference to the lu object
1842   @param node_uuid: The node to call out
1843   @param dev: The device to find
1844   @param instance: The instance object the device belongs to
1845   @returns The result of the rpc call
1846
1847   """
1848   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1849   return lu.rpc.call_blockdev_find(node_uuid, disk)
1850
1851
1852 def _GenerateUniqueNames(lu, exts):
1853   """Generate a suitable LV name.
1854
1855   This will generate a logical volume name for the given instance.
1856
1857   """
1858   results = []
1859   for val in exts:
1860     new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1861     results.append("%s%s" % (new_id, val))
1862   return results
1863
1864
1865 class TLReplaceDisks(Tasklet):
1866   """Replaces disks for an instance.
1867
1868   Note: Locking is not within the scope of this class.
1869
1870   """
1871   def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1872                remote_node_uuid, disks, early_release, ignore_ipolicy):
1873     """Initializes this class.
1874
1875     """
1876     Tasklet.__init__(self, lu)
1877
1878     # Parameters
1879     self.instance_uuid = instance_uuid
1880     self.instance_name = instance_name
1881     self.mode = mode
1882     self.iallocator_name = iallocator_name
1883     self.remote_node_uuid = remote_node_uuid
1884     self.disks = disks
1885     self.early_release = early_release
1886     self.ignore_ipolicy = ignore_ipolicy
1887
1888     # Runtime data
1889     self.instance = None
1890     self.new_node_uuid = None
1891     self.target_node_uuid = None
1892     self.other_node_uuid = None
1893     self.remote_node_info = None
1894     self.node_secondary_ip = None
1895
1896   @staticmethod
1897   def _RunAllocator(lu, iallocator_name, instance_uuid,
1898                     relocate_from_node_uuids):
1899     """Compute a new secondary node using an IAllocator.
1900
1901     """
1902     req = iallocator.IAReqRelocate(
1903           inst_uuid=instance_uuid,
1904           relocate_from_node_uuids=list(relocate_from_node_uuids))
1905     ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1906
1907     ial.Run(iallocator_name)
1908
1909     if not ial.success:
1910       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1911                                  " %s" % (iallocator_name, ial.info),
1912                                  errors.ECODE_NORES)
1913
1914     remote_node_name = ial.result[0]
1915     remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1916
1917     if remote_node is None:
1918       raise errors.OpPrereqError("Node %s not found in configuration" %
1919                                  remote_node_name, errors.ECODE_NOENT)
1920
1921     lu.LogInfo("Selected new secondary for instance '%s': %s",
1922                instance_uuid, remote_node_name)
1923
1924     return remote_node.uuid
1925
1926   def _FindFaultyDisks(self, node_uuid):
1927     """Wrapper for L{FindFaultyInstanceDisks}.
1928
1929     """
1930     return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1931                                    node_uuid, True)
1932
1933   def _CheckDisksActivated(self, instance):
1934     """Checks if the instance disks are activated.
1935
1936     @param instance: The instance to check disks
1937     @return: True if they are activated, False otherwise
1938
1939     """
1940     node_uuids = instance.all_nodes
1941
1942     for idx, dev in enumerate(instance.disks):
1943       for node_uuid in node_uuids:
1944         self.lu.LogInfo("Checking disk/%d on %s", idx,
1945                         self.cfg.GetNodeName(node_uuid))
1946         self.cfg.SetDiskID(dev, node_uuid)
1947
1948         result = _BlockdevFind(self, node_uuid, dev, instance)
1949
1950         if result.offline:
1951           continue
1952         elif result.fail_msg or not result.payload:
1953           return False
1954
1955     return True
1956
1957   def CheckPrereq(self):
1958     """Check prerequisites.
1959
1960     This checks that the instance is in the cluster.
1961
1962     """
1963     self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
1964     assert self.instance is not None, \
1965       "Cannot retrieve locked instance %s" % self.instance_name
1966
1967     if self.instance.disk_template != constants.DT_DRBD8:
1968       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1969                                  " instances", errors.ECODE_INVAL)
1970
1971     if len(self.instance.secondary_nodes) != 1:
1972       raise errors.OpPrereqError("The instance has a strange layout,"
1973                                  " expected one secondary but found %d" %
1974                                  len(self.instance.secondary_nodes),
1975                                  errors.ECODE_FAULT)
1976
1977     secondary_node_uuid = self.instance.secondary_nodes[0]
1978
1979     if self.iallocator_name is None:
1980       remote_node_uuid = self.remote_node_uuid
1981     else:
1982       remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
1983                                             self.instance.uuid,
1984                                             self.instance.secondary_nodes)
1985
1986     if remote_node_uuid is None:
1987       self.remote_node_info = None
1988     else:
1989       assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
1990              "Remote node '%s' is not locked" % remote_node_uuid
1991
1992       self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
1993       assert self.remote_node_info is not None, \
1994         "Cannot retrieve locked node %s" % remote_node_uuid
1995
1996     if remote_node_uuid == self.instance.primary_node:
1997       raise errors.OpPrereqError("The specified node is the primary node of"
1998                                  " the instance", errors.ECODE_INVAL)
1999
2000     if remote_node_uuid == secondary_node_uuid:
2001       raise errors.OpPrereqError("The specified node is already the"
2002                                  " secondary node of the instance",
2003                                  errors.ECODE_INVAL)
2004
2005     if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
2006                                     constants.REPLACE_DISK_CHG):
2007       raise errors.OpPrereqError("Cannot specify disks to be replaced",
2008                                  errors.ECODE_INVAL)
2009
2010     if self.mode == constants.REPLACE_DISK_AUTO:
2011       if not self._CheckDisksActivated(self.instance):
2012         raise errors.OpPrereqError("Please run activate-disks on instance %s"
2013                                    " first" % self.instance_name,
2014                                    errors.ECODE_STATE)
2015       faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2016       faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2017
2018       if faulty_primary and faulty_secondary:
2019         raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2020                                    " one node and can not be repaired"
2021                                    " automatically" % self.instance_name,
2022                                    errors.ECODE_STATE)
2023
2024       if faulty_primary:
2025         self.disks = faulty_primary
2026         self.target_node_uuid = self.instance.primary_node
2027         self.other_node_uuid = secondary_node_uuid
2028         check_nodes = [self.target_node_uuid, self.other_node_uuid]
2029       elif faulty_secondary:
2030         self.disks = faulty_secondary
2031         self.target_node_uuid = secondary_node_uuid
2032         self.other_node_uuid = self.instance.primary_node
2033         check_nodes = [self.target_node_uuid, self.other_node_uuid]
2034       else:
2035         self.disks = []
2036         check_nodes = []
2037
2038     else:
2039       # Non-automatic modes
2040       if self.mode == constants.REPLACE_DISK_PRI:
2041         self.target_node_uuid = self.instance.primary_node
2042         self.other_node_uuid = secondary_node_uuid
2043         check_nodes = [self.target_node_uuid, self.other_node_uuid]
2044
2045       elif self.mode == constants.REPLACE_DISK_SEC:
2046         self.target_node_uuid = secondary_node_uuid
2047         self.other_node_uuid = self.instance.primary_node
2048         check_nodes = [self.target_node_uuid, self.other_node_uuid]
2049
2050       elif self.mode == constants.REPLACE_DISK_CHG:
2051         self.new_node_uuid = remote_node_uuid
2052         self.other_node_uuid = self.instance.primary_node
2053         self.target_node_uuid = secondary_node_uuid
2054         check_nodes = [self.new_node_uuid, self.other_node_uuid]
2055
2056         CheckNodeNotDrained(self.lu, remote_node_uuid)
2057         CheckNodeVmCapable(self.lu, remote_node_uuid)
2058
2059         old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2060         assert old_node_info is not None
2061         if old_node_info.offline and not self.early_release:
2062           # doesn't make sense to delay the release
2063           self.early_release = True
2064           self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2065                           " early-release mode", secondary_node_uuid)
2066
2067       else:
2068         raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2069                                      self.mode)
2070
2071       # If not specified all disks should be replaced
2072       if not self.disks:
2073         self.disks = range(len(self.instance.disks))
2074
2075     # TODO: This is ugly, but right now we can't distinguish between internal
2076     # submitted opcode and external one. We should fix that.
2077     if self.remote_node_info:
2078       # We change the node, lets verify it still meets instance policy
2079       new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2080       cluster = self.cfg.GetClusterInfo()
2081       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2082                                                               new_group_info)
2083       CheckTargetNodeIPolicy(self, ipolicy, self.instance,
2084                              self.remote_node_info, self.cfg,
2085                              ignore=self.ignore_ipolicy)
2086
2087     for node_uuid in check_nodes:
2088       CheckNodeOnline(self.lu, node_uuid)
2089
2090     touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2091                                                           self.other_node_uuid,
2092                                                           self.target_node_uuid]
2093                               if node_uuid is not None)
2094
2095     # Release unneeded node and node resource locks
2096     ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2097     ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2098     ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2099
2100     # Release any owned node group
2101     ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2102
2103     # Check whether disks are valid
2104     for disk_idx in self.disks:
2105       self.instance.FindDisk(disk_idx)
2106
2107     # Get secondary node IP addresses
2108     self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2109                                   in self.cfg.GetMultiNodeInfo(touched_nodes))
2110
2111   def Exec(self, feedback_fn):
2112     """Execute disk replacement.
2113
2114     This dispatches the disk replacement to the appropriate handler.
2115
2116     """
2117     if __debug__:
2118       # Verify owned locks before starting operation
2119       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2120       assert set(owned_nodes) == set(self.node_secondary_ip), \
2121           ("Incorrect node locks, owning %s, expected %s" %
2122            (owned_nodes, self.node_secondary_ip.keys()))
2123       assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2124               self.lu.owned_locks(locking.LEVEL_NODE_RES))
2125       assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2126
2127       owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2128       assert list(owned_instances) == [self.instance_name], \
2129           "Instance '%s' not locked" % self.instance_name
2130
2131       assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2132           "Should not own any node group lock at this point"
2133
2134     if not self.disks:
2135       feedback_fn("No disks need replacement for instance '%s'" %
2136                   self.instance.name)
2137       return
2138
2139     feedback_fn("Replacing disk(s) %s for instance '%s'" %
2140                 (utils.CommaJoin(self.disks), self.instance.name))
2141     feedback_fn("Current primary node: %s" %
2142                 self.cfg.GetNodeName(self.instance.primary_node))
2143     feedback_fn("Current seconary node: %s" %
2144                 utils.CommaJoin(self.cfg.GetNodeNames(
2145                                   self.instance.secondary_nodes)))
2146
2147     activate_disks = not self.instance.disks_active
2148
2149     # Activate the instance disks if we're replacing them on a down instance
2150     if activate_disks:
2151       StartInstanceDisks(self.lu, self.instance, True)
2152
2153     try:
2154       # Should we replace the secondary node?
2155       if self.new_node_uuid is not None:
2156         fn = self._ExecDrbd8Secondary
2157       else:
2158         fn = self._ExecDrbd8DiskOnly
2159
2160       result = fn(feedback_fn)
2161     finally:
2162       # Deactivate the instance disks if we're replacing them on a
2163       # down instance
2164       if activate_disks:
2165         _SafeShutdownInstanceDisks(self.lu, self.instance)
2166
2167     assert not self.lu.owned_locks(locking.LEVEL_NODE)
2168
2169     if __debug__:
2170       # Verify owned locks
2171       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2172       nodes = frozenset(self.node_secondary_ip)
2173       assert ((self.early_release and not owned_nodes) or
2174               (not self.early_release and not (set(owned_nodes) - nodes))), \
2175         ("Not owning the correct locks, early_release=%s, owned=%r,"
2176          " nodes=%r" % (self.early_release, owned_nodes, nodes))
2177
2178     return result
2179
2180   def _CheckVolumeGroup(self, node_uuids):
2181     self.lu.LogInfo("Checking volume groups")
2182
2183     vgname = self.cfg.GetVGName()
2184
2185     # Make sure volume group exists on all involved nodes
2186     results = self.rpc.call_vg_list(node_uuids)
2187     if not results:
2188       raise errors.OpExecError("Can't list volume groups on the nodes")
2189
2190     for node_uuid in node_uuids:
2191       res = results[node_uuid]
2192       res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2193       if vgname not in res.payload:
2194         raise errors.OpExecError("Volume group '%s' not found on node %s" %
2195                                  (vgname, self.cfg.GetNodeName(node_uuid)))
2196
2197   def _CheckDisksExistence(self, node_uuids):
2198     # Check disk existence
2199     for idx, dev in enumerate(self.instance.disks):
2200       if idx not in self.disks:
2201         continue
2202
2203       for node_uuid in node_uuids:
2204         self.lu.LogInfo("Checking disk/%d on %s", idx,
2205                         self.cfg.GetNodeName(node_uuid))
2206         self.cfg.SetDiskID(dev, node_uuid)
2207
2208         result = _BlockdevFind(self, node_uuid, dev, self.instance)
2209
2210         msg = result.fail_msg
2211         if msg or not result.payload:
2212           if not msg:
2213             msg = "disk not found"
2214           if not self._CheckDisksActivated(self.instance):
2215             extra_hint = ("\nDisks seem to be not properly activated. Try"
2216                           " running activate-disks on the instance before"
2217                           " using replace-disks.")
2218           else:
2219             extra_hint = ""
2220           raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2221                                    (idx, self.cfg.GetNodeName(node_uuid), msg,
2222                                     extra_hint))
2223
2224   def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2225     for idx, dev in enumerate(self.instance.disks):
2226       if idx not in self.disks:
2227         continue
2228
2229       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2230                       (idx, self.cfg.GetNodeName(node_uuid)))
2231
2232       if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2233                                   on_primary, ldisk=ldisk):
2234         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2235                                  " replace disks for instance %s" %
2236                                  (self.cfg.GetNodeName(node_uuid),
2237                                   self.instance.name))
2238
2239   def _CreateNewStorage(self, node_uuid):
2240     """Create new storage on the primary or secondary node.
2241
2242     This is only used for same-node replaces, not for changing the
2243     secondary node, hence we don't want to modify the existing disk.
2244
2245     """
2246     iv_names = {}
2247
2248     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2249     for idx, dev in enumerate(disks):
2250       if idx not in self.disks:
2251         continue
2252
2253       self.lu.LogInfo("Adding storage on %s for disk/%d",
2254                       self.cfg.GetNodeName(node_uuid), idx)
2255
2256       self.cfg.SetDiskID(dev, node_uuid)
2257
2258       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2259       names = _GenerateUniqueNames(self.lu, lv_names)
2260
2261       (data_disk, meta_disk) = dev.children
2262       vg_data = data_disk.logical_id[0]
2263       lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
2264                              logical_id=(vg_data, names[0]),
2265                              params=data_disk.params)
2266       vg_meta = meta_disk.logical_id[0]
2267       lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
2268                              size=constants.DRBD_META_SIZE,
2269                              logical_id=(vg_meta, names[1]),
2270                              params=meta_disk.params)
2271
2272       new_lvs = [lv_data, lv_meta]
2273       old_lvs = [child.Copy() for child in dev.children]
2274       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2275       excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2276
2277       # we pass force_create=True to force the LVM creation
2278       for new_lv in new_lvs:
2279         try:
2280           _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2281                                GetInstanceInfoText(self.instance), False,
2282                                excl_stor)
2283         except errors.DeviceCreationError, e:
2284           raise errors.OpExecError("Can't create block device: %s" % e.message)
2285
2286     return iv_names
2287
2288   def _CheckDevices(self, node_uuid, iv_names):
2289     for name, (dev, _, _) in iv_names.iteritems():
2290       self.cfg.SetDiskID(dev, node_uuid)
2291
2292       result = _BlockdevFind(self, node_uuid, dev, self.instance)
2293
2294       msg = result.fail_msg
2295       if msg or not result.payload:
2296         if not msg:
2297           msg = "disk not found"
2298         raise errors.OpExecError("Can't find DRBD device %s: %s" %
2299                                  (name, msg))
2300
2301       if result.payload.is_degraded:
2302         raise errors.OpExecError("DRBD device %s is degraded!" % name)
2303
2304   def _RemoveOldStorage(self, node_uuid, iv_names):
2305     for name, (_, old_lvs, _) in iv_names.iteritems():
2306       self.lu.LogInfo("Remove logical volumes for %s", name)
2307
2308       for lv in old_lvs:
2309         self.cfg.SetDiskID(lv, node_uuid)
2310
2311         msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
2312         if msg:
2313           self.lu.LogWarning("Can't remove old LV: %s", msg,
2314                              hint="remove unused LVs manually")
2315
2316   def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2317     """Replace a disk on the primary or secondary for DRBD 8.
2318
2319     The algorithm for replace is quite complicated:
2320
2321       1. for each disk to be replaced:
2322
2323         1. create new LVs on the target node with unique names
2324         1. detach old LVs from the drbd device
2325         1. rename old LVs to name_replaced.<time_t>
2326         1. rename new LVs to old LVs
2327         1. attach the new LVs (with the old names now) to the drbd device
2328
2329       1. wait for sync across all devices
2330
2331       1. for each modified disk:
2332
2333         1. remove old LVs (which have the name name_replaces.<time_t>)
2334
2335     Failures are not very well handled.
2336
2337     """
2338     steps_total = 6
2339
2340     # Step: check device activation
2341     self.lu.LogStep(1, steps_total, "Check device existence")
2342     self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2343     self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2344
2345     # Step: check other node consistency
2346     self.lu.LogStep(2, steps_total, "Check peer consistency")
2347     self._CheckDisksConsistency(
2348       self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2349       False)
2350
2351     # Step: create new storage
2352     self.lu.LogStep(3, steps_total, "Allocate new storage")
2353     iv_names = self._CreateNewStorage(self.target_node_uuid)
2354
2355     # Step: for each lv, detach+rename*2+attach
2356     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2357     for dev, old_lvs, new_lvs in iv_names.itervalues():
2358       self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2359
2360       result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
2361                                                      old_lvs)
2362       result.Raise("Can't detach drbd from local storage on node"
2363                    " %s for device %s" %
2364                    (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2365       #dev.children = []
2366       #cfg.Update(instance)
2367
2368       # ok, we created the new LVs, so now we know we have the needed
2369       # storage; as such, we proceed on the target node to rename
2370       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2371       # using the assumption that logical_id == physical_id (which in
2372       # turn is the unique_id on that node)
2373
2374       # FIXME(iustin): use a better name for the replaced LVs
2375       temp_suffix = int(time.time())
2376       ren_fn = lambda d, suff: (d.physical_id[0],
2377                                 d.physical_id[1] + "_replaced-%s" % suff)
2378
2379       # Build the rename list based on what LVs exist on the node
2380       rename_old_to_new = []
2381       for to_ren in old_lvs:
2382         result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
2383         if not result.fail_msg and result.payload:
2384           # device exists
2385           rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2386
2387       self.lu.LogInfo("Renaming the old LVs on the target node")
2388       result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2389                                              rename_old_to_new)
2390       result.Raise("Can't rename old LVs on node %s" %
2391                    self.cfg.GetNodeName(self.target_node_uuid))
2392
2393       # Now we rename the new LVs to the old LVs
2394       self.lu.LogInfo("Renaming the new LVs on the target node")
2395       rename_new_to_old = [(new, old.physical_id)
2396                            for old, new in zip(old_lvs, new_lvs)]
2397       result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2398                                              rename_new_to_old)
2399       result.Raise("Can't rename new LVs on node %s" %
2400                    self.cfg.GetNodeName(self.target_node_uuid))
2401
2402       # Intermediate steps of in memory modifications
2403       for old, new in zip(old_lvs, new_lvs):
2404         new.logical_id = old.logical_id
2405         self.cfg.SetDiskID(new, self.target_node_uuid)
2406
2407       # We need to modify old_lvs so that removal later removes the
2408       # right LVs, not the newly added ones; note that old_lvs is a
2409       # copy here
2410       for disk in old_lvs:
2411         disk.logical_id = ren_fn(disk, temp_suffix)
2412         self.cfg.SetDiskID(disk, self.target_node_uuid)
2413
2414       # Now that the new lvs have the old name, we can add them to the device
2415       self.lu.LogInfo("Adding new mirror component on %s",
2416                       self.cfg.GetNodeName(self.target_node_uuid))
2417       result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2418                                                   (dev, self.instance), new_lvs)
2419       msg = result.fail_msg
2420       if msg:
2421         for new_lv in new_lvs:
2422           msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2423                                                new_lv).fail_msg
2424           if msg2:
2425             self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2426                                hint=("cleanup manually the unused logical"
2427                                      "volumes"))
2428         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2429
2430     cstep = itertools.count(5)
2431
2432     if self.early_release:
2433       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2434       self._RemoveOldStorage(self.target_node_uuid, iv_names)
2435       # TODO: Check if releasing locks early still makes sense
2436       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2437     else:
2438       # Release all resource locks except those used by the instance
2439       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2440                    keep=self.node_secondary_ip.keys())
2441
2442     # Release all node locks while waiting for sync
2443     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2444
2445     # TODO: Can the instance lock be downgraded here? Take the optional disk
2446     # shutdown in the caller into consideration.
2447
2448     # Wait for sync
2449     # This can fail as the old devices are degraded and _WaitForSync
2450     # does a combined result over all disks, so we don't check its return value
2451     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2452     WaitForSync(self.lu, self.instance)
2453
2454     # Check all devices manually
2455     self._CheckDevices(self.instance.primary_node, iv_names)
2456
2457     # Step: remove old storage
2458     if not self.early_release:
2459       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2460       self._RemoveOldStorage(self.target_node_uuid, iv_names)
2461
2462   def _ExecDrbd8Secondary(self, feedback_fn):
2463     """Replace the secondary node for DRBD 8.
2464
2465     The algorithm for replace is quite complicated:
2466       - for all disks of the instance:
2467         - create new LVs on the new node with same names
2468         - shutdown the drbd device on the old secondary
2469         - disconnect the drbd network on the primary
2470         - create the drbd device on the new secondary
2471         - network attach the drbd on the primary, using an artifice:
2472           the drbd code for Attach() will connect to the network if it
2473           finds a device which is connected to the good local disks but
2474           not network enabled
2475       - wait for sync across all devices
2476       - remove all disks from the old secondary
2477
2478     Failures are not very well handled.
2479
2480     """
2481     steps_total = 6
2482
2483     pnode = self.instance.primary_node
2484
2485     # Step: check device activation
2486     self.lu.LogStep(1, steps_total, "Check device existence")
2487     self._CheckDisksExistence([self.instance.primary_node])
2488     self._CheckVolumeGroup([self.instance.primary_node])
2489
2490     # Step: check other node consistency
2491     self.lu.LogStep(2, steps_total, "Check peer consistency")
2492     self._CheckDisksConsistency(self.instance.primary_node, True, True)
2493
2494     # Step: create new storage
2495     self.lu.LogStep(3, steps_total, "Allocate new storage")
2496     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2497     excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2498                                                   self.new_node_uuid)
2499     for idx, dev in enumerate(disks):
2500       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2501                       (self.cfg.GetNodeName(self.new_node_uuid), idx))
2502       # we pass force_create=True to force LVM creation
2503       for new_lv in dev.children:
2504         try:
2505           _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2506                                new_lv, True, GetInstanceInfoText(self.instance),
2507                                False, excl_stor)
2508         except errors.DeviceCreationError, e:
2509           raise errors.OpExecError("Can't create block device: %s" % e.message)
2510
2511     # Step 4: dbrd minors and drbd setups changes
2512     # after this, we must manually remove the drbd minors on both the
2513     # error and the success paths
2514     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2515     minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2516                                          for _ in self.instance.disks],
2517                                         self.instance.uuid)
2518     logging.debug("Allocated minors %r", minors)
2519
2520     iv_names = {}
2521     for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2522       self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2523                       (self.cfg.GetNodeName(self.new_node_uuid), idx))
2524       # create new devices on new_node; note that we create two IDs:
2525       # one without port, so the drbd will be activated without
2526       # networking information on the new node at this stage, and one
2527       # with network, for the latter activation in step 4
2528       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2529       if self.instance.primary_node == o_node1:
2530         p_minor = o_minor1
2531       else:
2532         assert self.instance.primary_node == o_node2, "Three-node instance?"
2533         p_minor = o_minor2
2534
2535       new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2536                       p_minor, new_minor, o_secret)
2537       new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2538                     p_minor, new_minor, o_secret)
2539
2540       iv_names[idx] = (dev, dev.children, new_net_id)
2541       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2542                     new_net_id)
2543       new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
2544                               logical_id=new_alone_id,
2545                               children=dev.children,
2546                               size=dev.size,
2547                               params={})
2548       (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2549                                             self.cfg)
2550       try:
2551         CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2552                              anno_new_drbd,
2553                              GetInstanceInfoText(self.instance), False,
2554                              excl_stor)
2555       except errors.GenericError:
2556         self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2557         raise
2558
2559     # We have new devices, shutdown the drbd on the old secondary
2560     for idx, dev in enumerate(self.instance.disks):
2561       self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2562       self.cfg.SetDiskID(dev, self.target_node_uuid)
2563       msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2564                                             (dev, self.instance)).fail_msg
2565       if msg:
2566         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2567                            "node: %s" % (idx, msg),
2568                            hint=("Please cleanup this device manually as"
2569                                  " soon as possible"))
2570
2571     self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2572     result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2573                                                self.instance.disks)[pnode]
2574
2575     msg = result.fail_msg
2576     if msg:
2577       # detaches didn't succeed (unlikely)
2578       self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2579       raise errors.OpExecError("Can't detach the disks from the network on"
2580                                " old node: %s" % (msg,))
2581
2582     # if we managed to detach at least one, we update all the disks of
2583     # the instance to point to the new secondary
2584     self.lu.LogInfo("Updating instance configuration")
2585     for dev, _, new_logical_id in iv_names.itervalues():
2586       dev.logical_id = new_logical_id
2587       self.cfg.SetDiskID(dev, self.instance.primary_node)
2588
2589     self.cfg.Update(self.instance, feedback_fn)
2590
2591     # Release all node locks (the configuration has been updated)
2592     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2593
2594     # and now perform the drbd attach
2595     self.lu.LogInfo("Attaching primary drbds to new secondary"
2596                     " (standalone => connected)")
2597     result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2598                                             self.new_node_uuid],
2599                                            self.node_secondary_ip,
2600                                            (self.instance.disks, self.instance),
2601                                            self.instance.name,
2602                                            False)
2603     for to_node, to_result in result.items():
2604       msg = to_result.fail_msg
2605       if msg:
2606         self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2607                            self.cfg.GetNodeName(to_node), msg,
2608                            hint=("please do a gnt-instance info to see the"
2609                                  " status of disks"))
2610
2611     cstep = itertools.count(5)
2612
2613     if self.early_release:
2614       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2615       self._RemoveOldStorage(self.target_node_uuid, iv_names)
2616       # TODO: Check if releasing locks early still makes sense
2617       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2618     else:
2619       # Release all resource locks except those used by the instance
2620       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2621                    keep=self.node_secondary_ip.keys())
2622
2623     # TODO: Can the instance lock be downgraded here? Take the optional disk
2624     # shutdown in the caller into consideration.
2625
2626     # Wait for sync
2627     # This can fail as the old devices are degraded and _WaitForSync
2628     # does a combined result over all disks, so we don't check its return value
2629     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2630     WaitForSync(self.lu, self.instance)
2631
2632     # Check all devices manually
2633     self._CheckDevices(self.instance.primary_node, iv_names)
2634
2635     # Step: remove old storage
2636     if not self.early_release:
2637       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2638       self._RemoveOldStorage(self.target_node_uuid, iv_names)