gnt-cluster modify: factor out ipolicy check
[ganeti-local] / lib / cmdlib / instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import rpc
38 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
39 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
40   AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
41   CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
42   IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
43   CheckDiskTemplateEnabled
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45   CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46   BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52   constants.DT_PLAIN: "",
53   constants.DT_RBD: ".rbd",
54   constants.DT_EXT: ".ext",
55   }
56
57
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59   constants.DT_PLAIN: constants.LD_LV,
60   constants.DT_FILE: constants.LD_FILE,
61   constants.DT_SHARED_FILE: constants.LD_FILE,
62   constants.DT_BLOCK: constants.LD_BLOCKDEV,
63   constants.DT_RBD: constants.LD_RBD,
64   constants.DT_EXT: constants.LD_EXT,
65   }
66
67
68 def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
69                          excl_stor):
70   """Create a single block device on a given node.
71
72   This will not recurse over children of the device, so they must be
73   created in advance.
74
75   @param lu: the lu on whose behalf we execute
76   @param node_uuid: the node on which to create the device
77   @type instance: L{objects.Instance}
78   @param instance: the instance which owns the device
79   @type device: L{objects.Disk}
80   @param device: the device to create
81   @param info: the extra 'metadata' we should attach to the device
82       (this will be represented as a LVM tag)
83   @type force_open: boolean
84   @param force_open: this parameter will be passes to the
85       L{backend.BlockdevCreate} function where it specifies
86       whether we run on primary or not, and it affects both
87       the child assembly and the device own Open() execution
88   @type excl_stor: boolean
89   @param excl_stor: Whether exclusive_storage is active for the node
90
91   """
92   lu.cfg.SetDiskID(device, node_uuid)
93   result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
94                                        instance.name, force_open, info,
95                                        excl_stor)
96   result.Raise("Can't create block device %s on"
97                " node %s for instance %s" % (device,
98                                              lu.cfg.GetNodeName(node_uuid),
99                                              instance.name))
100   if device.physical_id is None:
101     device.physical_id = result.payload
102
103
104 def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
105                          info, force_open, excl_stor):
106   """Create a tree of block devices on a given node.
107
108   If this device type has to be created on secondaries, create it and
109   all its children.
110
111   If not, just recurse to children keeping the same 'force' value.
112
113   @attention: The device has to be annotated already.
114
115   @param lu: the lu on whose behalf we execute
116   @param node_uuid: the node on which to create the device
117   @type instance: L{objects.Instance}
118   @param instance: the instance which owns the device
119   @type device: L{objects.Disk}
120   @param device: the device to create
121   @type force_create: boolean
122   @param force_create: whether to force creation of this device; this
123       will be change to True whenever we find a device which has
124       CreateOnSecondary() attribute
125   @param info: the extra 'metadata' we should attach to the device
126       (this will be represented as a LVM tag)
127   @type force_open: boolean
128   @param force_open: this parameter will be passes to the
129       L{backend.BlockdevCreate} function where it specifies
130       whether we run on primary or not, and it affects both
131       the child assembly and the device own Open() execution
132   @type excl_stor: boolean
133   @param excl_stor: Whether exclusive_storage is active for the node
134
135   @return: list of created devices
136   """
137   created_devices = []
138   try:
139     if device.CreateOnSecondary():
140       force_create = True
141
142     if device.children:
143       for child in device.children:
144         devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
145                                     force_create, info, force_open, excl_stor)
146         created_devices.extend(devs)
147
148     if not force_create:
149       return created_devices
150
151     CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
152                          excl_stor)
153     # The device has been completely created, so there is no point in keeping
154     # its subdevices in the list. We just add the device itself instead.
155     created_devices = [(node_uuid, device)]
156     return created_devices
157
158   except errors.DeviceCreationError, e:
159     e.created_devices.extend(created_devices)
160     raise e
161   except errors.OpExecError, e:
162     raise errors.DeviceCreationError(str(e), created_devices)
163
164
165 def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
166   """Whether exclusive_storage is in effect for the given node.
167
168   @type cfg: L{config.ConfigWriter}
169   @param cfg: The cluster configuration
170   @type node_uuid: string
171   @param node_uuid: The node UUID
172   @rtype: bool
173   @return: The effective value of exclusive_storage
174   @raise errors.OpPrereqError: if no node exists with the given name
175
176   """
177   ni = cfg.GetNodeInfo(node_uuid)
178   if ni is None:
179     raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
180                                errors.ECODE_NOENT)
181   return IsExclusiveStorageEnabledNode(cfg, ni)
182
183
184 def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
185                     force_open):
186   """Wrapper around L{_CreateBlockDevInner}.
187
188   This method annotates the root device first.
189
190   """
191   (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
192   excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
193   return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
194                               force_open, excl_stor)
195
196
197 def _UndoCreateDisks(lu, disks_created):
198   """Undo the work performed by L{CreateDisks}.
199
200   This function is called in case of an error to undo the work of
201   L{CreateDisks}.
202
203   @type lu: L{LogicalUnit}
204   @param lu: the logical unit on whose behalf we execute
205   @param disks_created: the result returned by L{CreateDisks}
206
207   """
208   for (node_uuid, disk) in disks_created:
209     lu.cfg.SetDiskID(disk, node_uuid)
210     result = lu.rpc.call_blockdev_remove(node_uuid, disk)
211     result.Warn("Failed to remove newly-created disk %s on node %s" %
212                 (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
213
214
215 def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
216   """Create all disks for an instance.
217
218   This abstracts away some work from AddInstance.
219
220   @type lu: L{LogicalUnit}
221   @param lu: the logical unit on whose behalf we execute
222   @type instance: L{objects.Instance}
223   @param instance: the instance whose disks we should create
224   @type to_skip: list
225   @param to_skip: list of indices to skip
226   @type target_node_uuid: string
227   @param target_node_uuid: if passed, overrides the target node for creation
228   @type disks: list of {objects.Disk}
229   @param disks: the disks to create; if not specified, all the disks of the
230       instance are created
231   @return: information about the created disks, to be used to call
232       L{_UndoCreateDisks}
233   @raise errors.OpPrereqError: in case of error
234
235   """
236   info = GetInstanceInfoText(instance)
237   if target_node_uuid is None:
238     pnode_uuid = instance.primary_node
239     all_node_uuids = instance.all_nodes
240   else:
241     pnode_uuid = target_node_uuid
242     all_node_uuids = [pnode_uuid]
243
244   if disks is None:
245     disks = instance.disks
246
247   CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)
248
249   if instance.disk_template in constants.DTS_FILEBASED:
250     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
251     result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
252
253     result.Raise("Failed to create directory '%s' on"
254                  " node %s" % (file_storage_dir,
255                                lu.cfg.GetNodeName(pnode_uuid)))
256
257   disks_created = []
258   for idx, device in enumerate(disks):
259     if to_skip and idx in to_skip:
260       continue
261     logging.info("Creating disk %s for instance '%s'", idx, instance.name)
262     for node_uuid in all_node_uuids:
263       f_create = node_uuid == pnode_uuid
264       try:
265         _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
266                         f_create)
267         disks_created.append((node_uuid, device))
268       except errors.DeviceCreationError, e:
269         logging.warning("Creating disk %s for instance '%s' failed",
270                         idx, instance.name)
271         disks_created.extend(e.created_devices)
272         _UndoCreateDisks(lu, disks_created)
273         raise errors.OpExecError(e.message)
274   return disks_created
275
276
277 def ComputeDiskSizePerVG(disk_template, disks):
278   """Compute disk size requirements in the volume group
279
280   """
281   def _compute(disks, payload):
282     """Universal algorithm.
283
284     """
285     vgs = {}
286     for disk in disks:
287       vgs[disk[constants.IDISK_VG]] = \
288         vgs.get(constants.IDISK_VG, 0) + disk[constants.IDISK_SIZE] + payload
289
290     return vgs
291
292   # Required free disk space as a function of disk and swap space
293   req_size_dict = {
294     constants.DT_DISKLESS: {},
295     constants.DT_PLAIN: _compute(disks, 0),
296     # 128 MB are added for drbd metadata for each disk
297     constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
298     constants.DT_FILE: {},
299     constants.DT_SHARED_FILE: {},
300     }
301
302   if disk_template not in req_size_dict:
303     raise errors.ProgrammerError("Disk template '%s' size requirement"
304                                  " is unknown" % disk_template)
305
306   return req_size_dict[disk_template]
307
308
309 def ComputeDisks(op, default_vg):
310   """Computes the instance disks.
311
312   @param op: The instance opcode
313   @param default_vg: The default_vg to assume
314
315   @return: The computed disks
316
317   """
318   disks = []
319   for disk in op.disks:
320     mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
321     if mode not in constants.DISK_ACCESS_SET:
322       raise errors.OpPrereqError("Invalid disk access mode '%s'" %
323                                  mode, errors.ECODE_INVAL)
324     size = disk.get(constants.IDISK_SIZE, None)
325     if size is None:
326       raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
327     try:
328       size = int(size)
329     except (TypeError, ValueError):
330       raise errors.OpPrereqError("Invalid disk size '%s'" % size,
331                                  errors.ECODE_INVAL)
332
333     ext_provider = disk.get(constants.IDISK_PROVIDER, None)
334     if ext_provider and op.disk_template != constants.DT_EXT:
335       raise errors.OpPrereqError("The '%s' option is only valid for the %s"
336                                  " disk template, not %s" %
337                                  (constants.IDISK_PROVIDER, constants.DT_EXT,
338                                   op.disk_template), errors.ECODE_INVAL)
339
340     data_vg = disk.get(constants.IDISK_VG, default_vg)
341     name = disk.get(constants.IDISK_NAME, None)
342     if name is not None and name.lower() == constants.VALUE_NONE:
343       name = None
344     new_disk = {
345       constants.IDISK_SIZE: size,
346       constants.IDISK_MODE: mode,
347       constants.IDISK_VG: data_vg,
348       constants.IDISK_NAME: name,
349       }
350
351     for key in [
352       constants.IDISK_METAVG,
353       constants.IDISK_ADOPT,
354       constants.IDISK_SPINDLES,
355       ]:
356       if key in disk:
357         new_disk[key] = disk[key]
358
359     # For extstorage, demand the `provider' option and add any
360     # additional parameters (ext-params) to the dict
361     if op.disk_template == constants.DT_EXT:
362       if ext_provider:
363         new_disk[constants.IDISK_PROVIDER] = ext_provider
364         for key in disk:
365           if key not in constants.IDISK_PARAMS:
366             new_disk[key] = disk[key]
367       else:
368         raise errors.OpPrereqError("Missing provider for template '%s'" %
369                                    constants.DT_EXT, errors.ECODE_INVAL)
370
371     disks.append(new_disk)
372
373   return disks
374
375
376 def CheckRADOSFreeSpace():
377   """Compute disk size requirements inside the RADOS cluster.
378
379   """
380   # For the RADOS cluster we assume there is always enough space.
381   pass
382
383
384 def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
385                          iv_name, p_minor, s_minor):
386   """Generate a drbd8 device complete with its children.
387
388   """
389   assert len(vgnames) == len(names) == 2
390   port = lu.cfg.AllocatePort()
391   shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
392
393   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
394                           logical_id=(vgnames[0], names[0]),
395                           params={})
396   dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
397   dev_meta = objects.Disk(dev_type=constants.LD_LV,
398                           size=constants.DRBD_META_SIZE,
399                           logical_id=(vgnames[1], names[1]),
400                           params={})
401   dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
402   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
403                           logical_id=(primary_uuid, secondary_uuid, port,
404                                       p_minor, s_minor,
405                                       shared_secret),
406                           children=[dev_data, dev_meta],
407                           iv_name=iv_name, params={})
408   drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
409   return drbd_dev
410
411
412 def GenerateDiskTemplate(
413   lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
414   disk_info, file_storage_dir, file_driver, base_index,
415   feedback_fn, full_disk_params):
416   """Generate the entire disk layout for a given template type.
417
418   """
419   vgname = lu.cfg.GetVGName()
420   disk_count = len(disk_info)
421   disks = []
422
423   CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
424
425   if template_name == constants.DT_DISKLESS:
426     pass
427   elif template_name == constants.DT_DRBD8:
428     if len(secondary_node_uuids) != 1:
429       raise errors.ProgrammerError("Wrong template configuration")
430     remote_node_uuid = secondary_node_uuids[0]
431     minors = lu.cfg.AllocateDRBDMinor(
432       [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
433
434     (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
435                                                        full_disk_params)
436     drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
437
438     names = []
439     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
440                                                for i in range(disk_count)]):
441       names.append(lv_prefix + "_data")
442       names.append(lv_prefix + "_meta")
443     for idx, disk in enumerate(disk_info):
444       disk_index = idx + base_index
445       data_vg = disk.get(constants.IDISK_VG, vgname)
446       meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
447       disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
448                                       disk[constants.IDISK_SIZE],
449                                       [data_vg, meta_vg],
450                                       names[idx * 2:idx * 2 + 2],
451                                       "disk/%d" % disk_index,
452                                       minors[idx * 2], minors[idx * 2 + 1])
453       disk_dev.mode = disk[constants.IDISK_MODE]
454       disk_dev.name = disk.get(constants.IDISK_NAME, None)
455       disks.append(disk_dev)
456   else:
457     if secondary_node_uuids:
458       raise errors.ProgrammerError("Wrong template configuration")
459
460     name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
461     if name_prefix is None:
462       names = None
463     else:
464       names = _GenerateUniqueNames(lu, ["%s.disk%s" %
465                                         (name_prefix, base_index + i)
466                                         for i in range(disk_count)])
467
468     if template_name == constants.DT_PLAIN:
469
470       def logical_id_fn(idx, _, disk):
471         vg = disk.get(constants.IDISK_VG, vgname)
472         return (vg, names[idx])
473
474     elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
475       logical_id_fn = \
476         lambda _, disk_index, disk: (file_driver,
477                                      "%s/disk%d" % (file_storage_dir,
478                                                     disk_index))
479     elif template_name == constants.DT_BLOCK:
480       logical_id_fn = \
481         lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
482                                        disk[constants.IDISK_ADOPT])
483     elif template_name == constants.DT_RBD:
484       logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
485     elif template_name == constants.DT_EXT:
486       def logical_id_fn(idx, _, disk):
487         provider = disk.get(constants.IDISK_PROVIDER, None)
488         if provider is None:
489           raise errors.ProgrammerError("Disk template is %s, but '%s' is"
490                                        " not found", constants.DT_EXT,
491                                        constants.IDISK_PROVIDER)
492         return (provider, names[idx])
493     else:
494       raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
495
496     dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
497
498     for idx, disk in enumerate(disk_info):
499       params = {}
500       # Only for the Ext template add disk_info to params
501       if template_name == constants.DT_EXT:
502         params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
503         for key in disk:
504           if key not in constants.IDISK_PARAMS:
505             params[key] = disk[key]
506       disk_index = idx + base_index
507       size = disk[constants.IDISK_SIZE]
508       feedback_fn("* disk %s, size %s" %
509                   (disk_index, utils.FormatUnit(size, "h")))
510       disk_dev = objects.Disk(dev_type=dev_type, size=size,
511                               logical_id=logical_id_fn(idx, disk_index, disk),
512                               iv_name="disk/%d" % disk_index,
513                               mode=disk[constants.IDISK_MODE],
514                               params=params,
515                               spindles=disk.get(constants.IDISK_SPINDLES))
516       disk_dev.name = disk.get(constants.IDISK_NAME, None)
517       disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
518       disks.append(disk_dev)
519
520   return disks
521
522
523 def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
524   """Check the presence of the spindle options with exclusive_storage.
525
526   @type diskdict: dict
527   @param diskdict: disk parameters
528   @type es_flag: bool
529   @param es_flag: the effective value of the exlusive_storage flag
530   @type required: bool
531   @param required: whether spindles are required or just optional
532   @raise errors.OpPrereqError when spindles are given and they should not
533
534   """
535   if (not es_flag and constants.IDISK_SPINDLES in diskdict and
536       diskdict[constants.IDISK_SPINDLES] is not None):
537     raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
538                                " when exclusive storage is not active",
539                                errors.ECODE_INVAL)
540   if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
541                                 diskdict[constants.IDISK_SPINDLES] is None)):
542     raise errors.OpPrereqError("You must specify spindles in instance disks"
543                                " when exclusive storage is active",
544                                errors.ECODE_INVAL)
545
546
547 class LUInstanceRecreateDisks(LogicalUnit):
548   """Recreate an instance's missing disks.
549
550   """
551   HPATH = "instance-recreate-disks"
552   HTYPE = constants.HTYPE_INSTANCE
553   REQ_BGL = False
554
555   _MODIFYABLE = compat.UniqueFrozenset([
556     constants.IDISK_SIZE,
557     constants.IDISK_MODE,
558     constants.IDISK_SPINDLES,
559     ])
560
561   # New or changed disk parameters may have different semantics
562   assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
563     constants.IDISK_ADOPT,
564
565     # TODO: Implement support changing VG while recreating
566     constants.IDISK_VG,
567     constants.IDISK_METAVG,
568     constants.IDISK_PROVIDER,
569     constants.IDISK_NAME,
570     ]))
571
572   def _RunAllocator(self):
573     """Run the allocator based on input opcode.
574
575     """
576     be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
577
578     # FIXME
579     # The allocator should actually run in "relocate" mode, but current
580     # allocators don't support relocating all the nodes of an instance at
581     # the same time. As a workaround we use "allocate" mode, but this is
582     # suboptimal for two reasons:
583     # - The instance name passed to the allocator is present in the list of
584     #   existing instances, so there could be a conflict within the
585     #   internal structures of the allocator. This doesn't happen with the
586     #   current allocators, but it's a liability.
587     # - The allocator counts the resources used by the instance twice: once
588     #   because the instance exists already, and once because it tries to
589     #   allocate a new instance.
590     # The allocator could choose some of the nodes on which the instance is
591     # running, but that's not a problem. If the instance nodes are broken,
592     # they should be already be marked as drained or offline, and hence
593     # skipped by the allocator. If instance disks have been lost for other
594     # reasons, then recreating the disks on the same nodes should be fine.
595     disk_template = self.instance.disk_template
596     spindle_use = be_full[constants.BE_SPINDLE_USE]
597     disks = [{
598       constants.IDISK_SIZE: d.size,
599       constants.IDISK_MODE: d.mode,
600       constants.IDISK_SPINDLES: d.spindles,
601       } for d in self.instance.disks]
602     req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
603                                         disk_template=disk_template,
604                                         tags=list(self.instance.GetTags()),
605                                         os=self.instance.os,
606                                         nics=[{}],
607                                         vcpus=be_full[constants.BE_VCPUS],
608                                         memory=be_full[constants.BE_MAXMEM],
609                                         spindle_use=spindle_use,
610                                         disks=disks,
611                                         hypervisor=self.instance.hypervisor,
612                                         node_whitelist=None)
613     ial = iallocator.IAllocator(self.cfg, self.rpc, req)
614
615     ial.Run(self.op.iallocator)
616
617     assert req.RequiredNodes() == len(self.instance.all_nodes)
618
619     if not ial.success:
620       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
621                                  " %s" % (self.op.iallocator, ial.info),
622                                  errors.ECODE_NORES)
623
624     (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
625     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
626                  self.op.instance_name, self.op.iallocator,
627                  utils.CommaJoin(self.op.nodes))
628
629   def CheckArguments(self):
630     if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
631       # Normalize and convert deprecated list of disk indices
632       self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
633
634     duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
635     if duplicates:
636       raise errors.OpPrereqError("Some disks have been specified more than"
637                                  " once: %s" % utils.CommaJoin(duplicates),
638                                  errors.ECODE_INVAL)
639
640     # We don't want _CheckIAllocatorOrNode selecting the default iallocator
641     # when neither iallocator nor nodes are specified
642     if self.op.iallocator or self.op.nodes:
643       CheckIAllocatorOrNode(self, "iallocator", "nodes")
644
645     for (idx, params) in self.op.disks:
646       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
647       unsupported = frozenset(params.keys()) - self._MODIFYABLE
648       if unsupported:
649         raise errors.OpPrereqError("Parameters for disk %s try to change"
650                                    " unmodifyable parameter(s): %s" %
651                                    (idx, utils.CommaJoin(unsupported)),
652                                    errors.ECODE_INVAL)
653
654   def ExpandNames(self):
655     self._ExpandAndLockInstance()
656     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
657
658     if self.op.nodes:
659       (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
660       self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
661     else:
662       self.needed_locks[locking.LEVEL_NODE] = []
663       if self.op.iallocator:
664         # iallocator will select a new node in the same group
665         self.needed_locks[locking.LEVEL_NODEGROUP] = []
666         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
667
668     self.needed_locks[locking.LEVEL_NODE_RES] = []
669
670   def DeclareLocks(self, level):
671     if level == locking.LEVEL_NODEGROUP:
672       assert self.op.iallocator is not None
673       assert not self.op.nodes
674       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
675       self.share_locks[locking.LEVEL_NODEGROUP] = 1
676       # Lock the primary group used by the instance optimistically; this
677       # requires going via the node before it's locked, requiring
678       # verification later on
679       self.needed_locks[locking.LEVEL_NODEGROUP] = \
680         self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
681
682     elif level == locking.LEVEL_NODE:
683       # If an allocator is used, then we lock all the nodes in the current
684       # instance group, as we don't know yet which ones will be selected;
685       # if we replace the nodes without using an allocator, locks are
686       # already declared in ExpandNames; otherwise, we need to lock all the
687       # instance nodes for disk re-creation
688       if self.op.iallocator:
689         assert not self.op.nodes
690         assert not self.needed_locks[locking.LEVEL_NODE]
691         assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
692
693         # Lock member nodes of the group of the primary node
694         for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
695           self.needed_locks[locking.LEVEL_NODE].extend(
696             self.cfg.GetNodeGroup(group_uuid).members)
697
698         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
699       elif not self.op.nodes:
700         self._LockInstancesNodes(primary_only=False)
701     elif level == locking.LEVEL_NODE_RES:
702       # Copy node locks
703       self.needed_locks[locking.LEVEL_NODE_RES] = \
704         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
705
706   def BuildHooksEnv(self):
707     """Build hooks env.
708
709     This runs on master, primary and secondary nodes of the instance.
710
711     """
712     return BuildInstanceHookEnvByObject(self, self.instance)
713
714   def BuildHooksNodes(self):
715     """Build hooks nodes.
716
717     """
718     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
719     return (nl, nl)
720
721   def CheckPrereq(self):
722     """Check prerequisites.
723
724     This checks that the instance is in the cluster and is not running.
725
726     """
727     instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
728     assert instance is not None, \
729       "Cannot retrieve locked instance %s" % self.op.instance_name
730     if self.op.node_uuids:
731       if len(self.op.node_uuids) != len(instance.all_nodes):
732         raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
733                                    " %d replacement nodes were specified" %
734                                    (instance.name, len(instance.all_nodes),
735                                     len(self.op.node_uuids)),
736                                    errors.ECODE_INVAL)
737       assert instance.disk_template != constants.DT_DRBD8 or \
738              len(self.op.node_uuids) == 2
739       assert instance.disk_template != constants.DT_PLAIN or \
740              len(self.op.node_uuids) == 1
741       primary_node = self.op.node_uuids[0]
742     else:
743       primary_node = instance.primary_node
744     if not self.op.iallocator:
745       CheckNodeOnline(self, primary_node)
746
747     if instance.disk_template == constants.DT_DISKLESS:
748       raise errors.OpPrereqError("Instance '%s' has no disks" %
749                                  self.op.instance_name, errors.ECODE_INVAL)
750
751     # Verify if node group locks are still correct
752     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
753     if owned_groups:
754       # Node group locks are acquired only for the primary node (and only
755       # when the allocator is used)
756       CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
757                               primary_only=True)
758
759     # if we replace nodes *and* the old primary is offline, we don't
760     # check the instance state
761     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
762     if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
763       CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
764                          msg="cannot recreate disks")
765
766     if self.op.disks:
767       self.disks = dict(self.op.disks)
768     else:
769       self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
770
771     maxidx = max(self.disks.keys())
772     if maxidx >= len(instance.disks):
773       raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
774                                  errors.ECODE_INVAL)
775
776     if ((self.op.node_uuids or self.op.iallocator) and
777          sorted(self.disks.keys()) != range(len(instance.disks))):
778       raise errors.OpPrereqError("Can't recreate disks partially and"
779                                  " change the nodes at the same time",
780                                  errors.ECODE_INVAL)
781
782     self.instance = instance
783
784     if self.op.iallocator:
785       self._RunAllocator()
786       # Release unneeded node and node resource locks
787       ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
788       ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
789       ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
790
791     assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
792
793     if self.op.node_uuids:
794       node_uuids = self.op.node_uuids
795     else:
796       node_uuids = instance.all_nodes
797     excl_stor = compat.any(
798       rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
799       )
800     for new_params in self.disks.values():
801       CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
802
803   def Exec(self, feedback_fn):
804     """Recreate the disks.
805
806     """
807     assert (self.owned_locks(locking.LEVEL_NODE) ==
808             self.owned_locks(locking.LEVEL_NODE_RES))
809
810     to_skip = []
811     mods = [] # keeps track of needed changes
812
813     for idx, disk in enumerate(self.instance.disks):
814       try:
815         changes = self.disks[idx]
816       except KeyError:
817         # Disk should not be recreated
818         to_skip.append(idx)
819         continue
820
821       # update secondaries for disks, if needed
822       if self.op.node_uuids and disk.dev_type == constants.LD_DRBD8:
823         # need to update the nodes and minors
824         assert len(self.op.node_uuids) == 2
825         assert len(disk.logical_id) == 6 # otherwise disk internals
826                                          # have changed
827         (_, _, old_port, _, _, old_secret) = disk.logical_id
828         new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
829                                                 self.instance.uuid)
830         new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
831                   new_minors[0], new_minors[1], old_secret)
832         assert len(disk.logical_id) == len(new_id)
833       else:
834         new_id = None
835
836       mods.append((idx, new_id, changes))
837
838     # now that we have passed all asserts above, we can apply the mods
839     # in a single run (to avoid partial changes)
840     for idx, new_id, changes in mods:
841       disk = self.instance.disks[idx]
842       if new_id is not None:
843         assert disk.dev_type == constants.LD_DRBD8
844         disk.logical_id = new_id
845       if changes:
846         disk.Update(size=changes.get(constants.IDISK_SIZE, None),
847                     mode=changes.get(constants.IDISK_MODE, None),
848                     spindles=changes.get(constants.IDISK_SPINDLES, None))
849
850     # change primary node, if needed
851     if self.op.node_uuids:
852       self.instance.primary_node = self.op.node_uuids[0]
853       self.LogWarning("Changing the instance's nodes, you will have to"
854                       " remove any disks left on the older nodes manually")
855
856     if self.op.node_uuids:
857       self.cfg.Update(self.instance, feedback_fn)
858
859     # All touched nodes must be locked
860     mylocks = self.owned_locks(locking.LEVEL_NODE)
861     assert mylocks.issuperset(frozenset(self.instance.all_nodes))
862     new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
863
864     # TODO: Release node locks before wiping, or explain why it's not possible
865     if self.cfg.GetClusterInfo().prealloc_wipe_disks:
866       wipedisks = [(idx, disk, 0)
867                    for (idx, disk) in enumerate(self.instance.disks)
868                    if idx not in to_skip]
869       WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
870                          cleanup=new_disks)
871
872
873 def _PerformNodeInfoCall(lu, node_uuids, vg):
874   """Prepares the input and performs a node info call.
875
876   @type lu: C{LogicalUnit}
877   @param lu: a logical unit from which we get configuration data
878   @type node_uuids: list of string
879   @param node_uuids: list of node UUIDs to perform the call for
880   @type vg: string
881   @param vg: the volume group's name
882
883   """
884   lvm_storage_units = [(constants.ST_LVM_VG, vg)]
885   storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
886                                                   node_uuids)
887   hvname = lu.cfg.GetHypervisorType()
888   hvparams = lu.cfg.GetClusterInfo().hvparams
889   nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
890                                    [(hvname, hvparams[hvname])])
891   return nodeinfo
892
893
894 def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
895   """Checks the vg capacity for a given node.
896
897   @type node_info: tuple (_, list of dicts, _)
898   @param node_info: the result of the node info call for one node
899   @type node_name: string
900   @param node_name: the name of the node
901   @type vg: string
902   @param vg: volume group name
903   @type requested: int
904   @param requested: the amount of disk in MiB to check for
905   @raise errors.OpPrereqError: if the node doesn't have enough disk,
906       or we cannot check the node
907
908   """
909   (_, space_info, _) = node_info
910   lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
911       space_info, constants.ST_LVM_VG)
912   if not lvm_vg_info:
913     raise errors.OpPrereqError("Can't retrieve storage information for LVM")
914   vg_free = lvm_vg_info.get("storage_free", None)
915   if not isinstance(vg_free, int):
916     raise errors.OpPrereqError("Can't compute free disk space on node"
917                                " %s for vg %s, result was '%s'" %
918                                (node_name, vg, vg_free), errors.ECODE_ENVIRON)
919   if requested > vg_free:
920     raise errors.OpPrereqError("Not enough disk space on target node %s"
921                                " vg %s: required %d MiB, available %d MiB" %
922                                (node_name, vg, requested, vg_free),
923                                errors.ECODE_NORES)
924
925
926 def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
927   """Checks if nodes have enough free disk space in the specified VG.
928
929   This function checks if all given nodes have the needed amount of
930   free disk. In case any node has less disk or we cannot get the
931   information from the node, this function raises an OpPrereqError
932   exception.
933
934   @type lu: C{LogicalUnit}
935   @param lu: a logical unit from which we get configuration data
936   @type node_uuids: C{list}
937   @param node_uuids: the list of node UUIDs to check
938   @type vg: C{str}
939   @param vg: the volume group to check
940   @type requested: C{int}
941   @param requested: the amount of disk in MiB to check for
942   @raise errors.OpPrereqError: if the node doesn't have enough disk,
943       or we cannot check the node
944
945   """
946   nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
947   for node in node_uuids:
948     node_name = lu.cfg.GetNodeName(node)
949     info = nodeinfo[node]
950     info.Raise("Cannot get current information from node %s" % node_name,
951                prereq=True, ecode=errors.ECODE_ENVIRON)
952     _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
953
954
955 def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
956   """Checks if nodes have enough free disk space in all the VGs.
957
958   This function checks if all given nodes have the needed amount of
959   free disk. In case any node has less disk or we cannot get the
960   information from the node, this function raises an OpPrereqError
961   exception.
962
963   @type lu: C{LogicalUnit}
964   @param lu: a logical unit from which we get configuration data
965   @type node_uuids: C{list}
966   @param node_uuids: the list of node UUIDs to check
967   @type req_sizes: C{dict}
968   @param req_sizes: the hash of vg and corresponding amount of disk in
969       MiB to check for
970   @raise errors.OpPrereqError: if the node doesn't have enough disk,
971       or we cannot check the node
972
973   """
974   for vg, req_size in req_sizes.items():
975     _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
976
977
978 def _DiskSizeInBytesToMebibytes(lu, size):
979   """Converts a disk size in bytes to mebibytes.
980
981   Warns and rounds up if the size isn't an even multiple of 1 MiB.
982
983   """
984   (mib, remainder) = divmod(size, 1024 * 1024)
985
986   if remainder != 0:
987     lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
988                   " to not overwrite existing data (%s bytes will not be"
989                   " wiped)", (1024 * 1024) - remainder)
990     mib += 1
991
992   return mib
993
994
995 def _CalcEta(time_taken, written, total_size):
996   """Calculates the ETA based on size written and total size.
997
998   @param time_taken: The time taken so far
999   @param written: amount written so far
1000   @param total_size: The total size of data to be written
1001   @return: The remaining time in seconds
1002
1003   """
1004   avg_time = time_taken / float(written)
1005   return (total_size - written) * avg_time
1006
1007
1008 def WipeDisks(lu, instance, disks=None):
1009   """Wipes instance disks.
1010
1011   @type lu: L{LogicalUnit}
1012   @param lu: the logical unit on whose behalf we execute
1013   @type instance: L{objects.Instance}
1014   @param instance: the instance whose disks we should create
1015   @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1016   @param disks: Disk details; tuple contains disk index, disk object and the
1017     start offset
1018
1019   """
1020   node_uuid = instance.primary_node
1021   node_name = lu.cfg.GetNodeName(node_uuid)
1022
1023   if disks is None:
1024     disks = [(idx, disk, 0)
1025              for (idx, disk) in enumerate(instance.disks)]
1026
1027   for (_, device, _) in disks:
1028     lu.cfg.SetDiskID(device, node_uuid)
1029
1030   logging.info("Pausing synchronization of disks of instance '%s'",
1031                instance.name)
1032   result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1033                                                   (map(compat.snd, disks),
1034                                                    instance),
1035                                                   True)
1036   result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1037
1038   for idx, success in enumerate(result.payload):
1039     if not success:
1040       logging.warn("Pausing synchronization of disk %s of instance '%s'"
1041                    " failed", idx, instance.name)
1042
1043   try:
1044     for (idx, device, offset) in disks:
1045       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1046       # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1047       wipe_chunk_size = \
1048         int(min(constants.MAX_WIPE_CHUNK,
1049                 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1050
1051       size = device.size
1052       last_output = 0
1053       start_time = time.time()
1054
1055       if offset == 0:
1056         info_text = ""
1057       else:
1058         info_text = (" (from %s to %s)" %
1059                      (utils.FormatUnit(offset, "h"),
1060                       utils.FormatUnit(size, "h")))
1061
1062       lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1063
1064       logging.info("Wiping disk %d for instance %s on node %s using"
1065                    " chunk size %s", idx, instance.name, node_name,
1066                    wipe_chunk_size)
1067
1068       while offset < size:
1069         wipe_size = min(wipe_chunk_size, size - offset)
1070
1071         logging.debug("Wiping disk %d, offset %s, chunk %s",
1072                       idx, offset, wipe_size)
1073
1074         result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1075                                            offset, wipe_size)
1076         result.Raise("Could not wipe disk %d at offset %d for size %d" %
1077                      (idx, offset, wipe_size))
1078
1079         now = time.time()
1080         offset += wipe_size
1081         if now - last_output >= 60:
1082           eta = _CalcEta(now - start_time, offset, size)
1083           lu.LogInfo(" - done: %.1f%% ETA: %s",
1084                      offset / float(size) * 100, utils.FormatSeconds(eta))
1085           last_output = now
1086   finally:
1087     logging.info("Resuming synchronization of disks for instance '%s'",
1088                  instance.name)
1089
1090     result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1091                                                     (map(compat.snd, disks),
1092                                                      instance),
1093                                                     False)
1094
1095     if result.fail_msg:
1096       lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1097                     node_name, result.fail_msg)
1098     else:
1099       for idx, success in enumerate(result.payload):
1100         if not success:
1101           lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1102                         " failed", idx, instance.name)
1103
1104
1105 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1106   """Wrapper for L{WipeDisks} that handles errors.
1107
1108   @type lu: L{LogicalUnit}
1109   @param lu: the logical unit on whose behalf we execute
1110   @type instance: L{objects.Instance}
1111   @param instance: the instance whose disks we should wipe
1112   @param disks: see L{WipeDisks}
1113   @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1114       case of error
1115   @raise errors.OpPrereqError: in case of failure
1116
1117   """
1118   try:
1119     WipeDisks(lu, instance, disks=disks)
1120   except errors.OpExecError:
1121     logging.warning("Wiping disks for instance '%s' failed",
1122                     instance.name)
1123     _UndoCreateDisks(lu, cleanup)
1124     raise
1125
1126
1127 def ExpandCheckDisks(instance, disks):
1128   """Return the instance disks selected by the disks list
1129
1130   @type disks: list of L{objects.Disk} or None
1131   @param disks: selected disks
1132   @rtype: list of L{objects.Disk}
1133   @return: selected instance disks to act on
1134
1135   """
1136   if disks is None:
1137     return instance.disks
1138   else:
1139     if not set(disks).issubset(instance.disks):
1140       raise errors.ProgrammerError("Can only act on disks belonging to the"
1141                                    " target instance: expected a subset of %r,"
1142                                    " got %r" % (instance.disks, disks))
1143     return disks
1144
1145
1146 def WaitForSync(lu, instance, disks=None, oneshot=False):
1147   """Sleep and poll for an instance's disk to sync.
1148
1149   """
1150   if not instance.disks or disks is not None and not disks:
1151     return True
1152
1153   disks = ExpandCheckDisks(instance, disks)
1154
1155   if not oneshot:
1156     lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1157
1158   node_uuid = instance.primary_node
1159   node_name = lu.cfg.GetNodeName(node_uuid)
1160
1161   for dev in disks:
1162     lu.cfg.SetDiskID(dev, node_uuid)
1163
1164   # TODO: Convert to utils.Retry
1165
1166   retries = 0
1167   degr_retries = 10 # in seconds, as we sleep 1 second each time
1168   while True:
1169     max_time = 0
1170     done = True
1171     cumul_degraded = False
1172     rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1173     msg = rstats.fail_msg
1174     if msg:
1175       lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1176       retries += 1
1177       if retries >= 10:
1178         raise errors.RemoteError("Can't contact node %s for mirror data,"
1179                                  " aborting." % node_name)
1180       time.sleep(6)
1181       continue
1182     rstats = rstats.payload
1183     retries = 0
1184     for i, mstat in enumerate(rstats):
1185       if mstat is None:
1186         lu.LogWarning("Can't compute data for node %s/%s",
1187                       node_name, disks[i].iv_name)
1188         continue
1189
1190       cumul_degraded = (cumul_degraded or
1191                         (mstat.is_degraded and mstat.sync_percent is None))
1192       if mstat.sync_percent is not None:
1193         done = False
1194         if mstat.estimated_time is not None:
1195           rem_time = ("%s remaining (estimated)" %
1196                       utils.FormatSeconds(mstat.estimated_time))
1197           max_time = mstat.estimated_time
1198         else:
1199           rem_time = "no time estimate"
1200         lu.LogInfo("- device %s: %5.2f%% done, %s",
1201                    disks[i].iv_name, mstat.sync_percent, rem_time)
1202
1203     # if we're done but degraded, let's do a few small retries, to
1204     # make sure we see a stable and not transient situation; therefore
1205     # we force restart of the loop
1206     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1207       logging.info("Degraded disks found, %d retries left", degr_retries)
1208       degr_retries -= 1
1209       time.sleep(1)
1210       continue
1211
1212     if done or oneshot:
1213       break
1214
1215     time.sleep(min(60, max_time))
1216
1217   if done:
1218     lu.LogInfo("Instance %s's disks are in sync", instance.name)
1219
1220   return not cumul_degraded
1221
1222
1223 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1224   """Shutdown block devices of an instance.
1225
1226   This does the shutdown on all nodes of the instance.
1227
1228   If the ignore_primary is false, errors on the primary node are
1229   ignored.
1230
1231   """
1232   lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1233   all_result = True
1234   disks = ExpandCheckDisks(instance, disks)
1235
1236   for disk in disks:
1237     for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1238       lu.cfg.SetDiskID(top_disk, node_uuid)
1239       result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1240       msg = result.fail_msg
1241       if msg:
1242         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1243                       disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1244         if ((node_uuid == instance.primary_node and not ignore_primary) or
1245             (node_uuid != instance.primary_node and not result.offline)):
1246           all_result = False
1247   return all_result
1248
1249
1250 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1251   """Shutdown block devices of an instance.
1252
1253   This function checks if an instance is running, before calling
1254   _ShutdownInstanceDisks.
1255
1256   """
1257   CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1258   ShutdownInstanceDisks(lu, instance, disks=disks)
1259
1260
1261 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1262                            ignore_size=False):
1263   """Prepare the block devices for an instance.
1264
1265   This sets up the block devices on all nodes.
1266
1267   @type lu: L{LogicalUnit}
1268   @param lu: the logical unit on whose behalf we execute
1269   @type instance: L{objects.Instance}
1270   @param instance: the instance for whose disks we assemble
1271   @type disks: list of L{objects.Disk} or None
1272   @param disks: which disks to assemble (or all, if None)
1273   @type ignore_secondaries: boolean
1274   @param ignore_secondaries: if true, errors on secondary nodes
1275       won't result in an error return from the function
1276   @type ignore_size: boolean
1277   @param ignore_size: if true, the current known size of the disk
1278       will not be used during the disk activation, useful for cases
1279       when the size is wrong
1280   @return: False if the operation failed, otherwise a list of
1281       (host, instance_visible_name, node_visible_name)
1282       with the mapping from node devices to instance devices
1283
1284   """
1285   device_info = []
1286   disks_ok = True
1287   disks = ExpandCheckDisks(instance, disks)
1288
1289   # With the two passes mechanism we try to reduce the window of
1290   # opportunity for the race condition of switching DRBD to primary
1291   # before handshaking occured, but we do not eliminate it
1292
1293   # The proper fix would be to wait (with some limits) until the
1294   # connection has been made and drbd transitions from WFConnection
1295   # into any other network-connected state (Connected, SyncTarget,
1296   # SyncSource, etc.)
1297
1298   # mark instance disks as active before doing actual work, so watcher does
1299   # not try to shut them down erroneously
1300   lu.cfg.MarkInstanceDisksActive(instance.uuid)
1301
1302   # 1st pass, assemble on all nodes in secondary mode
1303   for idx, inst_disk in enumerate(disks):
1304     for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1305                                   instance.primary_node):
1306       if ignore_size:
1307         node_disk = node_disk.Copy()
1308         node_disk.UnsetSize()
1309       lu.cfg.SetDiskID(node_disk, node_uuid)
1310       result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1311                                              instance.name, False, idx)
1312       msg = result.fail_msg
1313       if msg:
1314         is_offline_secondary = (node_uuid in instance.secondary_nodes and
1315                                 result.offline)
1316         lu.LogWarning("Could not prepare block device %s on node %s"
1317                       " (is_primary=False, pass=1): %s",
1318                       inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1319         if not (ignore_secondaries or is_offline_secondary):
1320           disks_ok = False
1321
1322   # FIXME: race condition on drbd migration to primary
1323
1324   # 2nd pass, do only the primary node
1325   for idx, inst_disk in enumerate(disks):
1326     dev_path = None
1327
1328     for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1329                                   instance.primary_node):
1330       if node_uuid != instance.primary_node:
1331         continue
1332       if ignore_size:
1333         node_disk = node_disk.Copy()
1334         node_disk.UnsetSize()
1335       lu.cfg.SetDiskID(node_disk, node_uuid)
1336       result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1337                                              instance.name, True, idx)
1338       msg = result.fail_msg
1339       if msg:
1340         lu.LogWarning("Could not prepare block device %s on node %s"
1341                       " (is_primary=True, pass=2): %s",
1342                       inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1343         disks_ok = False
1344       else:
1345         dev_path = result.payload
1346
1347     device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1348                         inst_disk.iv_name, dev_path))
1349
1350   # leave the disks configured for the primary node
1351   # this is a workaround that would be fixed better by
1352   # improving the logical/physical id handling
1353   for disk in disks:
1354     lu.cfg.SetDiskID(disk, instance.primary_node)
1355
1356   if not disks_ok:
1357     lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1358
1359   return disks_ok, device_info
1360
1361
1362 def StartInstanceDisks(lu, instance, force):
1363   """Start the disks of an instance.
1364
1365   """
1366   disks_ok, _ = AssembleInstanceDisks(lu, instance,
1367                                       ignore_secondaries=force)
1368   if not disks_ok:
1369     ShutdownInstanceDisks(lu, instance)
1370     if force is not None and not force:
1371       lu.LogWarning("",
1372                     hint=("If the message above refers to a secondary node,"
1373                           " you can retry the operation using '--force'"))
1374     raise errors.OpExecError("Disk consistency error")
1375
1376
1377 class LUInstanceGrowDisk(LogicalUnit):
1378   """Grow a disk of an instance.
1379
1380   """
1381   HPATH = "disk-grow"
1382   HTYPE = constants.HTYPE_INSTANCE
1383   REQ_BGL = False
1384
1385   def ExpandNames(self):
1386     self._ExpandAndLockInstance()
1387     self.needed_locks[locking.LEVEL_NODE] = []
1388     self.needed_locks[locking.LEVEL_NODE_RES] = []
1389     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1390     self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1391
1392   def DeclareLocks(self, level):
1393     if level == locking.LEVEL_NODE:
1394       self._LockInstancesNodes()
1395     elif level == locking.LEVEL_NODE_RES:
1396       # Copy node locks
1397       self.needed_locks[locking.LEVEL_NODE_RES] = \
1398         CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1399
1400   def BuildHooksEnv(self):
1401     """Build hooks env.
1402
1403     This runs on the master, the primary and all the secondaries.
1404
1405     """
1406     env = {
1407       "DISK": self.op.disk,
1408       "AMOUNT": self.op.amount,
1409       "ABSOLUTE": self.op.absolute,
1410       }
1411     env.update(BuildInstanceHookEnvByObject(self, self.instance))
1412     return env
1413
1414   def BuildHooksNodes(self):
1415     """Build hooks nodes.
1416
1417     """
1418     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1419     return (nl, nl)
1420
1421   def CheckPrereq(self):
1422     """Check prerequisites.
1423
1424     This checks that the instance is in the cluster.
1425
1426     """
1427     self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1428     assert self.instance is not None, \
1429       "Cannot retrieve locked instance %s" % self.op.instance_name
1430     node_uuids = list(self.instance.all_nodes)
1431     for node_uuid in node_uuids:
1432       CheckNodeOnline(self, node_uuid)
1433     self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1434
1435     if self.instance.disk_template not in constants.DTS_GROWABLE:
1436       raise errors.OpPrereqError("Instance's disk layout does not support"
1437                                  " growing", errors.ECODE_INVAL)
1438
1439     self.disk = self.instance.FindDisk(self.op.disk)
1440
1441     if self.op.absolute:
1442       self.target = self.op.amount
1443       self.delta = self.target - self.disk.size
1444       if self.delta < 0:
1445         raise errors.OpPrereqError("Requested size (%s) is smaller than "
1446                                    "current disk size (%s)" %
1447                                    (utils.FormatUnit(self.target, "h"),
1448                                     utils.FormatUnit(self.disk.size, "h")),
1449                                    errors.ECODE_STATE)
1450     else:
1451       self.delta = self.op.amount
1452       self.target = self.disk.size + self.delta
1453       if self.delta < 0:
1454         raise errors.OpPrereqError("Requested increment (%s) is negative" %
1455                                    utils.FormatUnit(self.delta, "h"),
1456                                    errors.ECODE_INVAL)
1457
1458     self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1459
1460   def _CheckDiskSpace(self, node_uuids, req_vgspace):
1461     template = self.instance.disk_template
1462     if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
1463         not any(self.node_es_flags.values())):
1464       # TODO: check the free disk space for file, when that feature will be
1465       # supported
1466       # With exclusive storage we need to do something smarter than just looking
1467       # at free space, which, in the end, is basically a dry run. So we rely on
1468       # the dry run performed in Exec() instead.
1469       CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1470
1471   def Exec(self, feedback_fn):
1472     """Execute disk grow.
1473
1474     """
1475     assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1476     assert (self.owned_locks(locking.LEVEL_NODE) ==
1477             self.owned_locks(locking.LEVEL_NODE_RES))
1478
1479     wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1480
1481     disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1482     if not disks_ok:
1483       raise errors.OpExecError("Cannot activate block device to grow")
1484
1485     feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1486                 (self.op.disk, self.instance.name,
1487                  utils.FormatUnit(self.delta, "h"),
1488                  utils.FormatUnit(self.target, "h")))
1489
1490     # First run all grow ops in dry-run mode
1491     for node_uuid in self.instance.all_nodes:
1492       self.cfg.SetDiskID(self.disk, node_uuid)
1493       result = self.rpc.call_blockdev_grow(node_uuid,
1494                                            (self.disk, self.instance),
1495                                            self.delta, True, True,
1496                                            self.node_es_flags[node_uuid])
1497       result.Raise("Dry-run grow request failed to node %s" %
1498                    self.cfg.GetNodeName(node_uuid))
1499
1500     if wipe_disks:
1501       # Get disk size from primary node for wiping
1502       self.cfg.SetDiskID(self.disk, self.instance.primary_node)
1503       result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
1504                                                     [self.disk])
1505       result.Raise("Failed to retrieve disk size from node '%s'" %
1506                    self.instance.primary_node)
1507
1508       (disk_dimensions, ) = result.payload
1509
1510       if disk_dimensions is None:
1511         raise errors.OpExecError("Failed to retrieve disk size from primary"
1512                                  " node '%s'" % self.instance.primary_node)
1513       (disk_size_in_bytes, _) = disk_dimensions
1514
1515       old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1516
1517       assert old_disk_size >= self.disk.size, \
1518         ("Retrieved disk size too small (got %s, should be at least %s)" %
1519          (old_disk_size, self.disk.size))
1520     else:
1521       old_disk_size = None
1522
1523     # We know that (as far as we can test) operations across different
1524     # nodes will succeed, time to run it for real on the backing storage
1525     for node_uuid in self.instance.all_nodes:
1526       self.cfg.SetDiskID(self.disk, node_uuid)
1527       result = self.rpc.call_blockdev_grow(node_uuid,
1528                                            (self.disk, self.instance),
1529                                            self.delta, False, True,
1530                                            self.node_es_flags[node_uuid])
1531       result.Raise("Grow request failed to node %s" %
1532                    self.cfg.GetNodeName(node_uuid))
1533
1534     # And now execute it for logical storage, on the primary node
1535     node_uuid = self.instance.primary_node
1536     self.cfg.SetDiskID(self.disk, node_uuid)
1537     result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1538                                          self.delta, False, False,
1539                                          self.node_es_flags[node_uuid])
1540     result.Raise("Grow request failed to node %s" %
1541                  self.cfg.GetNodeName(node_uuid))
1542
1543     self.disk.RecordGrow(self.delta)
1544     self.cfg.Update(self.instance, feedback_fn)
1545
1546     # Changes have been recorded, release node lock
1547     ReleaseLocks(self, locking.LEVEL_NODE)
1548
1549     # Downgrade lock while waiting for sync
1550     self.glm.downgrade(locking.LEVEL_INSTANCE)
1551
1552     assert wipe_disks ^ (old_disk_size is None)
1553
1554     if wipe_disks:
1555       assert self.instance.disks[self.op.disk] == self.disk
1556
1557       # Wipe newly added disk space
1558       WipeDisks(self, self.instance,
1559                 disks=[(self.op.disk, self.disk, old_disk_size)])
1560
1561     if self.op.wait_for_sync:
1562       disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1563       if disk_abort:
1564         self.LogWarning("Disk syncing has not returned a good status; check"
1565                         " the instance")
1566       if not self.instance.disks_active:
1567         _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1568     elif not self.instance.disks_active:
1569       self.LogWarning("Not shutting down the disk even if the instance is"
1570                       " not supposed to be running because no wait for"
1571                       " sync mode was requested")
1572
1573     assert self.owned_locks(locking.LEVEL_NODE_RES)
1574     assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1575
1576
1577 class LUInstanceReplaceDisks(LogicalUnit):
1578   """Replace the disks of an instance.
1579
1580   """
1581   HPATH = "mirrors-replace"
1582   HTYPE = constants.HTYPE_INSTANCE
1583   REQ_BGL = False
1584
1585   def CheckArguments(self):
1586     """Check arguments.
1587
1588     """
1589     if self.op.mode == constants.REPLACE_DISK_CHG:
1590       if self.op.remote_node is None and self.op.iallocator is None:
1591         raise errors.OpPrereqError("When changing the secondary either an"
1592                                    " iallocator script must be used or the"
1593                                    " new node given", errors.ECODE_INVAL)
1594       else:
1595         CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1596
1597     elif self.op.remote_node is not None or self.op.iallocator is not None:
1598       # Not replacing the secondary
1599       raise errors.OpPrereqError("The iallocator and new node options can"
1600                                  " only be used when changing the"
1601                                  " secondary node", errors.ECODE_INVAL)
1602
1603   def ExpandNames(self):
1604     self._ExpandAndLockInstance()
1605
1606     assert locking.LEVEL_NODE not in self.needed_locks
1607     assert locking.LEVEL_NODE_RES not in self.needed_locks
1608     assert locking.LEVEL_NODEGROUP not in self.needed_locks
1609
1610     assert self.op.iallocator is None or self.op.remote_node is None, \
1611       "Conflicting options"
1612
1613     if self.op.remote_node is not None:
1614       (self.op.remote_node_uuid, self.op.remote_node) = \
1615         ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1616                               self.op.remote_node)
1617
1618       # Warning: do not remove the locking of the new secondary here
1619       # unless DRBD8Dev.AddChildren is changed to work in parallel;
1620       # currently it doesn't since parallel invocations of
1621       # FindUnusedMinor will conflict
1622       self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1623       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1624     else:
1625       self.needed_locks[locking.LEVEL_NODE] = []
1626       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1627
1628       if self.op.iallocator is not None:
1629         # iallocator will select a new node in the same group
1630         self.needed_locks[locking.LEVEL_NODEGROUP] = []
1631         self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1632
1633     self.needed_locks[locking.LEVEL_NODE_RES] = []
1634
1635     self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1636                                    self.op.instance_name, self.op.mode,
1637                                    self.op.iallocator, self.op.remote_node_uuid,
1638                                    self.op.disks, self.op.early_release,
1639                                    self.op.ignore_ipolicy)
1640
1641     self.tasklets = [self.replacer]
1642
1643   def DeclareLocks(self, level):
1644     if level == locking.LEVEL_NODEGROUP:
1645       assert self.op.remote_node_uuid is None
1646       assert self.op.iallocator is not None
1647       assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1648
1649       self.share_locks[locking.LEVEL_NODEGROUP] = 1
1650       # Lock all groups used by instance optimistically; this requires going
1651       # via the node before it's locked, requiring verification later on
1652       self.needed_locks[locking.LEVEL_NODEGROUP] = \
1653         self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1654
1655     elif level == locking.LEVEL_NODE:
1656       if self.op.iallocator is not None:
1657         assert self.op.remote_node_uuid is None
1658         assert not self.needed_locks[locking.LEVEL_NODE]
1659         assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1660
1661         # Lock member nodes of all locked groups
1662         self.needed_locks[locking.LEVEL_NODE] = \
1663           [node_uuid
1664            for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1665            for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1666       else:
1667         assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1668
1669         self._LockInstancesNodes()
1670
1671     elif level == locking.LEVEL_NODE_RES:
1672       # Reuse node locks
1673       self.needed_locks[locking.LEVEL_NODE_RES] = \
1674         self.needed_locks[locking.LEVEL_NODE]
1675
1676   def BuildHooksEnv(self):
1677     """Build hooks env.
1678
1679     This runs on the master, the primary and all the secondaries.
1680
1681     """
1682     instance = self.replacer.instance
1683     env = {
1684       "MODE": self.op.mode,
1685       "NEW_SECONDARY": self.op.remote_node,
1686       "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
1687       }
1688     env.update(BuildInstanceHookEnvByObject(self, instance))
1689     return env
1690
1691   def BuildHooksNodes(self):
1692     """Build hooks nodes.
1693
1694     """
1695     instance = self.replacer.instance
1696     nl = [
1697       self.cfg.GetMasterNode(),
1698       instance.primary_node,
1699       ]
1700     if self.op.remote_node_uuid is not None:
1701       nl.append(self.op.remote_node_uuid)
1702     return nl, nl
1703
1704   def CheckPrereq(self):
1705     """Check prerequisites.
1706
1707     """
1708     assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1709             self.op.iallocator is None)
1710
1711     # Verify if node group locks are still correct
1712     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1713     if owned_groups:
1714       CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
1715
1716     return LogicalUnit.CheckPrereq(self)
1717
1718
1719 class LUInstanceActivateDisks(NoHooksLU):
1720   """Bring up an instance's disks.
1721
1722   """
1723   REQ_BGL = False
1724
1725   def ExpandNames(self):
1726     self._ExpandAndLockInstance()
1727     self.needed_locks[locking.LEVEL_NODE] = []
1728     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1729
1730   def DeclareLocks(self, level):
1731     if level == locking.LEVEL_NODE:
1732       self._LockInstancesNodes()
1733
1734   def CheckPrereq(self):
1735     """Check prerequisites.
1736
1737     This checks that the instance is in the cluster.
1738
1739     """
1740     self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1741     assert self.instance is not None, \
1742       "Cannot retrieve locked instance %s" % self.op.instance_name
1743     CheckNodeOnline(self, self.instance.primary_node)
1744
1745   def Exec(self, feedback_fn):
1746     """Activate the disks.
1747
1748     """
1749     disks_ok, disks_info = \
1750               AssembleInstanceDisks(self, self.instance,
1751                                     ignore_size=self.op.ignore_size)
1752     if not disks_ok:
1753       raise errors.OpExecError("Cannot activate block devices")
1754
1755     if self.op.wait_for_sync:
1756       if not WaitForSync(self, self.instance):
1757         self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
1758         raise errors.OpExecError("Some disks of the instance are degraded!")
1759
1760     return disks_info
1761
1762
1763 class LUInstanceDeactivateDisks(NoHooksLU):
1764   """Shutdown an instance's disks.
1765
1766   """
1767   REQ_BGL = False
1768
1769   def ExpandNames(self):
1770     self._ExpandAndLockInstance()
1771     self.needed_locks[locking.LEVEL_NODE] = []
1772     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1773
1774   def DeclareLocks(self, level):
1775     if level == locking.LEVEL_NODE:
1776       self._LockInstancesNodes()
1777
1778   def CheckPrereq(self):
1779     """Check prerequisites.
1780
1781     This checks that the instance is in the cluster.
1782
1783     """
1784     self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1785     assert self.instance is not None, \
1786       "Cannot retrieve locked instance %s" % self.op.instance_name
1787
1788   def Exec(self, feedback_fn):
1789     """Deactivate the disks
1790
1791     """
1792     if self.op.force:
1793       ShutdownInstanceDisks(self, self.instance)
1794     else:
1795       _SafeShutdownInstanceDisks(self, self.instance)
1796
1797
1798 def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
1799                                ldisk=False):
1800   """Check that mirrors are not degraded.
1801
1802   @attention: The device has to be annotated already.
1803
1804   The ldisk parameter, if True, will change the test from the
1805   is_degraded attribute (which represents overall non-ok status for
1806   the device(s)) to the ldisk (representing the local storage status).
1807
1808   """
1809   lu.cfg.SetDiskID(dev, node_uuid)
1810
1811   result = True
1812
1813   if on_primary or dev.AssembleOnSecondary():
1814     rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
1815     msg = rstats.fail_msg
1816     if msg:
1817       lu.LogWarning("Can't find disk on node %s: %s",
1818                     lu.cfg.GetNodeName(node_uuid), msg)
1819       result = False
1820     elif not rstats.payload:
1821       lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1822       result = False
1823     else:
1824       if ldisk:
1825         result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1826       else:
1827         result = result and not rstats.payload.is_degraded
1828
1829   if dev.children:
1830     for child in dev.children:
1831       result = result and _CheckDiskConsistencyInner(lu, instance, child,
1832                                                      node_uuid, on_primary)
1833
1834   return result
1835
1836
1837 def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
1838   """Wrapper around L{_CheckDiskConsistencyInner}.
1839
1840   """
1841   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1842   return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
1843                                     ldisk=ldisk)
1844
1845
1846 def _BlockdevFind(lu, node_uuid, dev, instance):
1847   """Wrapper around call_blockdev_find to annotate diskparams.
1848
1849   @param lu: A reference to the lu object
1850   @param node_uuid: The node to call out
1851   @param dev: The device to find
1852   @param instance: The instance object the device belongs to
1853   @returns The result of the rpc call
1854
1855   """
1856   (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1857   return lu.rpc.call_blockdev_find(node_uuid, disk)
1858
1859
1860 def _GenerateUniqueNames(lu, exts):
1861   """Generate a suitable LV name.
1862
1863   This will generate a logical volume name for the given instance.
1864
1865   """
1866   results = []
1867   for val in exts:
1868     new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1869     results.append("%s%s" % (new_id, val))
1870   return results
1871
1872
1873 class TLReplaceDisks(Tasklet):
1874   """Replaces disks for an instance.
1875
1876   Note: Locking is not within the scope of this class.
1877
1878   """
1879   def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1880                remote_node_uuid, disks, early_release, ignore_ipolicy):
1881     """Initializes this class.
1882
1883     """
1884     Tasklet.__init__(self, lu)
1885
1886     # Parameters
1887     self.instance_uuid = instance_uuid
1888     self.instance_name = instance_name
1889     self.mode = mode
1890     self.iallocator_name = iallocator_name
1891     self.remote_node_uuid = remote_node_uuid
1892     self.disks = disks
1893     self.early_release = early_release
1894     self.ignore_ipolicy = ignore_ipolicy
1895
1896     # Runtime data
1897     self.instance = None
1898     self.new_node_uuid = None
1899     self.target_node_uuid = None
1900     self.other_node_uuid = None
1901     self.remote_node_info = None
1902     self.node_secondary_ip = None
1903
1904   @staticmethod
1905   def _RunAllocator(lu, iallocator_name, instance_uuid,
1906                     relocate_from_node_uuids):
1907     """Compute a new secondary node using an IAllocator.
1908
1909     """
1910     req = iallocator.IAReqRelocate(
1911           inst_uuid=instance_uuid,
1912           relocate_from_node_uuids=list(relocate_from_node_uuids))
1913     ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1914
1915     ial.Run(iallocator_name)
1916
1917     if not ial.success:
1918       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1919                                  " %s" % (iallocator_name, ial.info),
1920                                  errors.ECODE_NORES)
1921
1922     remote_node_name = ial.result[0]
1923     remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1924
1925     if remote_node is None:
1926       raise errors.OpPrereqError("Node %s not found in configuration" %
1927                                  remote_node_name, errors.ECODE_NOENT)
1928
1929     lu.LogInfo("Selected new secondary for instance '%s': %s",
1930                instance_uuid, remote_node_name)
1931
1932     return remote_node.uuid
1933
1934   def _FindFaultyDisks(self, node_uuid):
1935     """Wrapper for L{FindFaultyInstanceDisks}.
1936
1937     """
1938     return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1939                                    node_uuid, True)
1940
1941   def _CheckDisksActivated(self, instance):
1942     """Checks if the instance disks are activated.
1943
1944     @param instance: The instance to check disks
1945     @return: True if they are activated, False otherwise
1946
1947     """
1948     node_uuids = instance.all_nodes
1949
1950     for idx, dev in enumerate(instance.disks):
1951       for node_uuid in node_uuids:
1952         self.lu.LogInfo("Checking disk/%d on %s", idx,
1953                         self.cfg.GetNodeName(node_uuid))
1954         self.cfg.SetDiskID(dev, node_uuid)
1955
1956         result = _BlockdevFind(self, node_uuid, dev, instance)
1957
1958         if result.offline:
1959           continue
1960         elif result.fail_msg or not result.payload:
1961           return False
1962
1963     return True
1964
1965   def CheckPrereq(self):
1966     """Check prerequisites.
1967
1968     This checks that the instance is in the cluster.
1969
1970     """
1971     self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
1972     assert self.instance is not None, \
1973       "Cannot retrieve locked instance %s" % self.instance_name
1974
1975     if self.instance.disk_template != constants.DT_DRBD8:
1976       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1977                                  " instances", errors.ECODE_INVAL)
1978
1979     if len(self.instance.secondary_nodes) != 1:
1980       raise errors.OpPrereqError("The instance has a strange layout,"
1981                                  " expected one secondary but found %d" %
1982                                  len(self.instance.secondary_nodes),
1983                                  errors.ECODE_FAULT)
1984
1985     secondary_node_uuid = self.instance.secondary_nodes[0]
1986
1987     if self.iallocator_name is None:
1988       remote_node_uuid = self.remote_node_uuid
1989     else:
1990       remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
1991                                             self.instance.uuid,
1992                                             self.instance.secondary_nodes)
1993
1994     if remote_node_uuid is None:
1995       self.remote_node_info = None
1996     else:
1997       assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
1998              "Remote node '%s' is not locked" % remote_node_uuid
1999
2000       self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
2001       assert self.remote_node_info is not None, \
2002         "Cannot retrieve locked node %s" % remote_node_uuid
2003
2004     if remote_node_uuid == self.instance.primary_node:
2005       raise errors.OpPrereqError("The specified node is the primary node of"
2006                                  " the instance", errors.ECODE_INVAL)
2007
2008     if remote_node_uuid == secondary_node_uuid:
2009       raise errors.OpPrereqError("The specified node is already the"
2010                                  " secondary node of the instance",
2011                                  errors.ECODE_INVAL)
2012
2013     if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
2014                                     constants.REPLACE_DISK_CHG):
2015       raise errors.OpPrereqError("Cannot specify disks to be replaced",
2016                                  errors.ECODE_INVAL)
2017
2018     if self.mode == constants.REPLACE_DISK_AUTO:
2019       if not self._CheckDisksActivated(self.instance):
2020         raise errors.OpPrereqError("Please run activate-disks on instance %s"
2021                                    " first" % self.instance_name,
2022                                    errors.ECODE_STATE)
2023       faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2024       faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2025
2026       if faulty_primary and faulty_secondary:
2027         raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2028                                    " one node and can not be repaired"
2029                                    " automatically" % self.instance_name,
2030                                    errors.ECODE_STATE)
2031
2032       if faulty_primary:
2033         self.disks = faulty_primary
2034         self.target_node_uuid = self.instance.primary_node
2035         self.other_node_uuid = secondary_node_uuid
2036         check_nodes = [self.target_node_uuid, self.other_node_uuid]
2037       elif faulty_secondary:
2038         self.disks = faulty_secondary
2039         self.target_node_uuid = secondary_node_uuid
2040         self.other_node_uuid = self.instance.primary_node
2041         check_nodes = [self.target_node_uuid, self.other_node_uuid]
2042       else:
2043         self.disks = []
2044         check_nodes = []
2045
2046     else:
2047       # Non-automatic modes
2048       if self.mode == constants.REPLACE_DISK_PRI:
2049         self.target_node_uuid = self.instance.primary_node
2050         self.other_node_uuid = secondary_node_uuid
2051         check_nodes = [self.target_node_uuid, self.other_node_uuid]
2052
2053       elif self.mode == constants.REPLACE_DISK_SEC:
2054         self.target_node_uuid = secondary_node_uuid
2055         self.other_node_uuid = self.instance.primary_node
2056         check_nodes = [self.target_node_uuid, self.other_node_uuid]
2057
2058       elif self.mode == constants.REPLACE_DISK_CHG:
2059         self.new_node_uuid = remote_node_uuid
2060         self.other_node_uuid = self.instance.primary_node
2061         self.target_node_uuid = secondary_node_uuid
2062         check_nodes = [self.new_node_uuid, self.other_node_uuid]
2063
2064         CheckNodeNotDrained(self.lu, remote_node_uuid)
2065         CheckNodeVmCapable(self.lu, remote_node_uuid)
2066
2067         old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2068         assert old_node_info is not None
2069         if old_node_info.offline and not self.early_release:
2070           # doesn't make sense to delay the release
2071           self.early_release = True
2072           self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2073                           " early-release mode", secondary_node_uuid)
2074
2075       else:
2076         raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2077                                      self.mode)
2078
2079       # If not specified all disks should be replaced
2080       if not self.disks:
2081         self.disks = range(len(self.instance.disks))
2082
2083     # TODO: This is ugly, but right now we can't distinguish between internal
2084     # submitted opcode and external one. We should fix that.
2085     if self.remote_node_info:
2086       # We change the node, lets verify it still meets instance policy
2087       new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2088       cluster = self.cfg.GetClusterInfo()
2089       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2090                                                               new_group_info)
2091       CheckTargetNodeIPolicy(self, ipolicy, self.instance,
2092                              self.remote_node_info, self.cfg,
2093                              ignore=self.ignore_ipolicy)
2094
2095     for node_uuid in check_nodes:
2096       CheckNodeOnline(self.lu, node_uuid)
2097
2098     touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2099                                                           self.other_node_uuid,
2100                                                           self.target_node_uuid]
2101                               if node_uuid is not None)
2102
2103     # Release unneeded node and node resource locks
2104     ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2105     ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2106     ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2107
2108     # Release any owned node group
2109     ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2110
2111     # Check whether disks are valid
2112     for disk_idx in self.disks:
2113       self.instance.FindDisk(disk_idx)
2114
2115     # Get secondary node IP addresses
2116     self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2117                                   in self.cfg.GetMultiNodeInfo(touched_nodes))
2118
2119   def Exec(self, feedback_fn):
2120     """Execute disk replacement.
2121
2122     This dispatches the disk replacement to the appropriate handler.
2123
2124     """
2125     if __debug__:
2126       # Verify owned locks before starting operation
2127       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2128       assert set(owned_nodes) == set(self.node_secondary_ip), \
2129           ("Incorrect node locks, owning %s, expected %s" %
2130            (owned_nodes, self.node_secondary_ip.keys()))
2131       assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2132               self.lu.owned_locks(locking.LEVEL_NODE_RES))
2133       assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2134
2135       owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2136       assert list(owned_instances) == [self.instance_name], \
2137           "Instance '%s' not locked" % self.instance_name
2138
2139       assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2140           "Should not own any node group lock at this point"
2141
2142     if not self.disks:
2143       feedback_fn("No disks need replacement for instance '%s'" %
2144                   self.instance.name)
2145       return
2146
2147     feedback_fn("Replacing disk(s) %s for instance '%s'" %
2148                 (utils.CommaJoin(self.disks), self.instance.name))
2149     feedback_fn("Current primary node: %s" %
2150                 self.cfg.GetNodeName(self.instance.primary_node))
2151     feedback_fn("Current seconary node: %s" %
2152                 utils.CommaJoin(self.cfg.GetNodeNames(
2153                                   self.instance.secondary_nodes)))
2154
2155     activate_disks = not self.instance.disks_active
2156
2157     # Activate the instance disks if we're replacing them on a down instance
2158     if activate_disks:
2159       StartInstanceDisks(self.lu, self.instance, True)
2160
2161     try:
2162       # Should we replace the secondary node?
2163       if self.new_node_uuid is not None:
2164         fn = self._ExecDrbd8Secondary
2165       else:
2166         fn = self._ExecDrbd8DiskOnly
2167
2168       result = fn(feedback_fn)
2169     finally:
2170       # Deactivate the instance disks if we're replacing them on a
2171       # down instance
2172       if activate_disks:
2173         _SafeShutdownInstanceDisks(self.lu, self.instance)
2174
2175     assert not self.lu.owned_locks(locking.LEVEL_NODE)
2176
2177     if __debug__:
2178       # Verify owned locks
2179       owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2180       nodes = frozenset(self.node_secondary_ip)
2181       assert ((self.early_release and not owned_nodes) or
2182               (not self.early_release and not (set(owned_nodes) - nodes))), \
2183         ("Not owning the correct locks, early_release=%s, owned=%r,"
2184          " nodes=%r" % (self.early_release, owned_nodes, nodes))
2185
2186     return result
2187
2188   def _CheckVolumeGroup(self, node_uuids):
2189     self.lu.LogInfo("Checking volume groups")
2190
2191     vgname = self.cfg.GetVGName()
2192
2193     # Make sure volume group exists on all involved nodes
2194     results = self.rpc.call_vg_list(node_uuids)
2195     if not results:
2196       raise errors.OpExecError("Can't list volume groups on the nodes")
2197
2198     for node_uuid in node_uuids:
2199       res = results[node_uuid]
2200       res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2201       if vgname not in res.payload:
2202         raise errors.OpExecError("Volume group '%s' not found on node %s" %
2203                                  (vgname, self.cfg.GetNodeName(node_uuid)))
2204
2205   def _CheckDisksExistence(self, node_uuids):
2206     # Check disk existence
2207     for idx, dev in enumerate(self.instance.disks):
2208       if idx not in self.disks:
2209         continue
2210
2211       for node_uuid in node_uuids:
2212         self.lu.LogInfo("Checking disk/%d on %s", idx,
2213                         self.cfg.GetNodeName(node_uuid))
2214         self.cfg.SetDiskID(dev, node_uuid)
2215
2216         result = _BlockdevFind(self, node_uuid, dev, self.instance)
2217
2218         msg = result.fail_msg
2219         if msg or not result.payload:
2220           if not msg:
2221             msg = "disk not found"
2222           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2223                                    (idx, self.cfg.GetNodeName(node_uuid), msg))
2224
2225   def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2226     for idx, dev in enumerate(self.instance.disks):
2227       if idx not in self.disks:
2228         continue
2229
2230       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2231                       (idx, self.cfg.GetNodeName(node_uuid)))
2232
2233       if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2234                                   on_primary, ldisk=ldisk):
2235         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2236                                  " replace disks for instance %s" %
2237                                  (self.cfg.GetNodeName(node_uuid),
2238                                   self.instance.name))
2239
2240   def _CreateNewStorage(self, node_uuid):
2241     """Create new storage on the primary or secondary node.
2242
2243     This is only used for same-node replaces, not for changing the
2244     secondary node, hence we don't want to modify the existing disk.
2245
2246     """
2247     iv_names = {}
2248
2249     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2250     for idx, dev in enumerate(disks):
2251       if idx not in self.disks:
2252         continue
2253
2254       self.lu.LogInfo("Adding storage on %s for disk/%d",
2255                       self.cfg.GetNodeName(node_uuid), idx)
2256
2257       self.cfg.SetDiskID(dev, node_uuid)
2258
2259       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2260       names = _GenerateUniqueNames(self.lu, lv_names)
2261
2262       (data_disk, meta_disk) = dev.children
2263       vg_data = data_disk.logical_id[0]
2264       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2265                              logical_id=(vg_data, names[0]),
2266                              params=data_disk.params)
2267       vg_meta = meta_disk.logical_id[0]
2268       lv_meta = objects.Disk(dev_type=constants.LD_LV,
2269                              size=constants.DRBD_META_SIZE,
2270                              logical_id=(vg_meta, names[1]),
2271                              params=meta_disk.params)
2272
2273       new_lvs = [lv_data, lv_meta]
2274       old_lvs = [child.Copy() for child in dev.children]
2275       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2276       excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2277
2278       # we pass force_create=True to force the LVM creation
2279       for new_lv in new_lvs:
2280         try:
2281           _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2282                                GetInstanceInfoText(self.instance), False,
2283                                excl_stor)
2284         except errors.DeviceCreationError, e:
2285           raise errors.OpExecError("Can't create block device: %s" % e.message)
2286
2287     return iv_names
2288
2289   def _CheckDevices(self, node_uuid, iv_names):
2290     for name, (dev, _, _) in iv_names.iteritems():
2291       self.cfg.SetDiskID(dev, node_uuid)
2292
2293       result = _BlockdevFind(self, node_uuid, dev, self.instance)
2294
2295       msg = result.fail_msg
2296       if msg or not result.payload:
2297         if not msg:
2298           msg = "disk not found"
2299         raise errors.OpExecError("Can't find DRBD device %s: %s" %
2300                                  (name, msg))
2301
2302       if result.payload.is_degraded:
2303         raise errors.OpExecError("DRBD device %s is degraded!" % name)
2304
2305   def _RemoveOldStorage(self, node_uuid, iv_names):
2306     for name, (_, old_lvs, _) in iv_names.iteritems():
2307       self.lu.LogInfo("Remove logical volumes for %s", name)
2308
2309       for lv in old_lvs:
2310         self.cfg.SetDiskID(lv, node_uuid)
2311
2312         msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
2313         if msg:
2314           self.lu.LogWarning("Can't remove old LV: %s", msg,
2315                              hint="remove unused LVs manually")
2316
2317   def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2318     """Replace a disk on the primary or secondary for DRBD 8.
2319
2320     The algorithm for replace is quite complicated:
2321
2322       1. for each disk to be replaced:
2323
2324         1. create new LVs on the target node with unique names
2325         1. detach old LVs from the drbd device
2326         1. rename old LVs to name_replaced.<time_t>
2327         1. rename new LVs to old LVs
2328         1. attach the new LVs (with the old names now) to the drbd device
2329
2330       1. wait for sync across all devices
2331
2332       1. for each modified disk:
2333
2334         1. remove old LVs (which have the name name_replaces.<time_t>)
2335
2336     Failures are not very well handled.
2337
2338     """
2339     steps_total = 6
2340
2341     # Step: check device activation
2342     self.lu.LogStep(1, steps_total, "Check device existence")
2343     self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2344     self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2345
2346     # Step: check other node consistency
2347     self.lu.LogStep(2, steps_total, "Check peer consistency")
2348     self._CheckDisksConsistency(
2349       self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2350       False)
2351
2352     # Step: create new storage
2353     self.lu.LogStep(3, steps_total, "Allocate new storage")
2354     iv_names = self._CreateNewStorage(self.target_node_uuid)
2355
2356     # Step: for each lv, detach+rename*2+attach
2357     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2358     for dev, old_lvs, new_lvs in iv_names.itervalues():
2359       self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2360
2361       result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
2362                                                      old_lvs)
2363       result.Raise("Can't detach drbd from local storage on node"
2364                    " %s for device %s" %
2365                    (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2366       #dev.children = []
2367       #cfg.Update(instance)
2368
2369       # ok, we created the new LVs, so now we know we have the needed
2370       # storage; as such, we proceed on the target node to rename
2371       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2372       # using the assumption that logical_id == physical_id (which in
2373       # turn is the unique_id on that node)
2374
2375       # FIXME(iustin): use a better name for the replaced LVs
2376       temp_suffix = int(time.time())
2377       ren_fn = lambda d, suff: (d.physical_id[0],
2378                                 d.physical_id[1] + "_replaced-%s" % suff)
2379
2380       # Build the rename list based on what LVs exist on the node
2381       rename_old_to_new = []
2382       for to_ren in old_lvs:
2383         result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
2384         if not result.fail_msg and result.payload:
2385           # device exists
2386           rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2387
2388       self.lu.LogInfo("Renaming the old LVs on the target node")
2389       result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2390                                              rename_old_to_new)
2391       result.Raise("Can't rename old LVs on node %s" %
2392                    self.cfg.GetNodeName(self.target_node_uuid))
2393
2394       # Now we rename the new LVs to the old LVs
2395       self.lu.LogInfo("Renaming the new LVs on the target node")
2396       rename_new_to_old = [(new, old.physical_id)
2397                            for old, new in zip(old_lvs, new_lvs)]
2398       result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2399                                              rename_new_to_old)
2400       result.Raise("Can't rename new LVs on node %s" %
2401                    self.cfg.GetNodeName(self.target_node_uuid))
2402
2403       # Intermediate steps of in memory modifications
2404       for old, new in zip(old_lvs, new_lvs):
2405         new.logical_id = old.logical_id
2406         self.cfg.SetDiskID(new, self.target_node_uuid)
2407
2408       # We need to modify old_lvs so that removal later removes the
2409       # right LVs, not the newly added ones; note that old_lvs is a
2410       # copy here
2411       for disk in old_lvs:
2412         disk.logical_id = ren_fn(disk, temp_suffix)
2413         self.cfg.SetDiskID(disk, self.target_node_uuid)
2414
2415       # Now that the new lvs have the old name, we can add them to the device
2416       self.lu.LogInfo("Adding new mirror component on %s",
2417                       self.cfg.GetNodeName(self.target_node_uuid))
2418       result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2419                                                   (dev, self.instance), new_lvs)
2420       msg = result.fail_msg
2421       if msg:
2422         for new_lv in new_lvs:
2423           msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2424                                                new_lv).fail_msg
2425           if msg2:
2426             self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2427                                hint=("cleanup manually the unused logical"
2428                                      "volumes"))
2429         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2430
2431     cstep = itertools.count(5)
2432
2433     if self.early_release:
2434       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2435       self._RemoveOldStorage(self.target_node_uuid, iv_names)
2436       # TODO: Check if releasing locks early still makes sense
2437       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2438     else:
2439       # Release all resource locks except those used by the instance
2440       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2441                    keep=self.node_secondary_ip.keys())
2442
2443     # Release all node locks while waiting for sync
2444     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2445
2446     # TODO: Can the instance lock be downgraded here? Take the optional disk
2447     # shutdown in the caller into consideration.
2448
2449     # Wait for sync
2450     # This can fail as the old devices are degraded and _WaitForSync
2451     # does a combined result over all disks, so we don't check its return value
2452     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2453     WaitForSync(self.lu, self.instance)
2454
2455     # Check all devices manually
2456     self._CheckDevices(self.instance.primary_node, iv_names)
2457
2458     # Step: remove old storage
2459     if not self.early_release:
2460       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2461       self._RemoveOldStorage(self.target_node_uuid, iv_names)
2462
2463   def _ExecDrbd8Secondary(self, feedback_fn):
2464     """Replace the secondary node for DRBD 8.
2465
2466     The algorithm for replace is quite complicated:
2467       - for all disks of the instance:
2468         - create new LVs on the new node with same names
2469         - shutdown the drbd device on the old secondary
2470         - disconnect the drbd network on the primary
2471         - create the drbd device on the new secondary
2472         - network attach the drbd on the primary, using an artifice:
2473           the drbd code for Attach() will connect to the network if it
2474           finds a device which is connected to the good local disks but
2475           not network enabled
2476       - wait for sync across all devices
2477       - remove all disks from the old secondary
2478
2479     Failures are not very well handled.
2480
2481     """
2482     steps_total = 6
2483
2484     pnode = self.instance.primary_node
2485
2486     # Step: check device activation
2487     self.lu.LogStep(1, steps_total, "Check device existence")
2488     self._CheckDisksExistence([self.instance.primary_node])
2489     self._CheckVolumeGroup([self.instance.primary_node])
2490
2491     # Step: check other node consistency
2492     self.lu.LogStep(2, steps_total, "Check peer consistency")
2493     self._CheckDisksConsistency(self.instance.primary_node, True, True)
2494
2495     # Step: create new storage
2496     self.lu.LogStep(3, steps_total, "Allocate new storage")
2497     disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2498     excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2499                                                   self.new_node_uuid)
2500     for idx, dev in enumerate(disks):
2501       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2502                       (self.cfg.GetNodeName(self.new_node_uuid), idx))
2503       # we pass force_create=True to force LVM creation
2504       for new_lv in dev.children:
2505         try:
2506           _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2507                                new_lv, True, GetInstanceInfoText(self.instance),
2508                                False, excl_stor)
2509         except errors.DeviceCreationError, e:
2510           raise errors.OpExecError("Can't create block device: %s" % e.message)
2511
2512     # Step 4: dbrd minors and drbd setups changes
2513     # after this, we must manually remove the drbd minors on both the
2514     # error and the success paths
2515     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2516     minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2517                                          for _ in self.instance.disks],
2518                                         self.instance.uuid)
2519     logging.debug("Allocated minors %r", minors)
2520
2521     iv_names = {}
2522     for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2523       self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2524                       (self.cfg.GetNodeName(self.new_node_uuid), idx))
2525       # create new devices on new_node; note that we create two IDs:
2526       # one without port, so the drbd will be activated without
2527       # networking information on the new node at this stage, and one
2528       # with network, for the latter activation in step 4
2529       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2530       if self.instance.primary_node == o_node1:
2531         p_minor = o_minor1
2532       else:
2533         assert self.instance.primary_node == o_node2, "Three-node instance?"
2534         p_minor = o_minor2
2535
2536       new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2537                       p_minor, new_minor, o_secret)
2538       new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2539                     p_minor, new_minor, o_secret)
2540
2541       iv_names[idx] = (dev, dev.children, new_net_id)
2542       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2543                     new_net_id)
2544       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2545                               logical_id=new_alone_id,
2546                               children=dev.children,
2547                               size=dev.size,
2548                               params={})
2549       (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2550                                             self.cfg)
2551       try:
2552         CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2553                              anno_new_drbd,
2554                              GetInstanceInfoText(self.instance), False,
2555                              excl_stor)
2556       except errors.GenericError:
2557         self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2558         raise
2559
2560     # We have new devices, shutdown the drbd on the old secondary
2561     for idx, dev in enumerate(self.instance.disks):
2562       self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2563       self.cfg.SetDiskID(dev, self.target_node_uuid)
2564       msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2565                                             (dev, self.instance)).fail_msg
2566       if msg:
2567         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2568                            "node: %s" % (idx, msg),
2569                            hint=("Please cleanup this device manually as"
2570                                  " soon as possible"))
2571
2572     self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2573     result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2574                                                self.instance.disks)[pnode]
2575
2576     msg = result.fail_msg
2577     if msg:
2578       # detaches didn't succeed (unlikely)
2579       self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2580       raise errors.OpExecError("Can't detach the disks from the network on"
2581                                " old node: %s" % (msg,))
2582
2583     # if we managed to detach at least one, we update all the disks of
2584     # the instance to point to the new secondary
2585     self.lu.LogInfo("Updating instance configuration")
2586     for dev, _, new_logical_id in iv_names.itervalues():
2587       dev.logical_id = new_logical_id
2588       self.cfg.SetDiskID(dev, self.instance.primary_node)
2589
2590     self.cfg.Update(self.instance, feedback_fn)
2591
2592     # Release all node locks (the configuration has been updated)
2593     ReleaseLocks(self.lu, locking.LEVEL_NODE)
2594
2595     # and now perform the drbd attach
2596     self.lu.LogInfo("Attaching primary drbds to new secondary"
2597                     " (standalone => connected)")
2598     result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2599                                             self.new_node_uuid],
2600                                            self.node_secondary_ip,
2601                                            (self.instance.disks, self.instance),
2602                                            self.instance.name,
2603                                            False)
2604     for to_node, to_result in result.items():
2605       msg = to_result.fail_msg
2606       if msg:
2607         self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2608                            self.cfg.GetNodeName(to_node), msg,
2609                            hint=("please do a gnt-instance info to see the"
2610                                  " status of disks"))
2611
2612     cstep = itertools.count(5)
2613
2614     if self.early_release:
2615       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2616       self._RemoveOldStorage(self.target_node_uuid, iv_names)
2617       # TODO: Check if releasing locks early still makes sense
2618       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2619     else:
2620       # Release all resource locks except those used by the instance
2621       ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2622                    keep=self.node_secondary_ip.keys())
2623
2624     # TODO: Can the instance lock be downgraded here? Take the optional disk
2625     # shutdown in the caller into consideration.
2626
2627     # Wait for sync
2628     # This can fail as the old devices are degraded and _WaitForSync
2629     # does a combined result over all disks, so we don't check its return value
2630     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2631     WaitForSync(self.lu, self.instance)
2632
2633     # Check all devices manually
2634     self._CheckDevices(self.instance.primary_node, iv_names)
2635
2636     # Step: remove old storage
2637     if not self.early_release:
2638       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2639       self._RemoveOldStorage(self.target_node_uuid, iv_names)